recoverd: Fix backward compatibility for CTDB_SRVID_TAKEOVER_RUN
[ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 static void usage(void);
39
40 static struct {
41         int timelimit;
42         uint32_t pnn;
43         uint32_t *nodes;
44         int machinereadable;
45         int verbose;
46         int maxruntime;
47         int printemptyrecords;
48         int printdatasize;
49         int printlmaster;
50         int printhash;
51         int printrecordflags;
52 } options;
53
54 #define LONGTIMEOUT options.timelimit*10
55
56 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
57 #define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
58
59 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
60 {
61         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
62         return 0;
63 }
64
65 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
66                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
67                                    "Out of memory in " __location__ )); \
68                 abort();                                                \
69         }} while (0)
70
71 static uint32_t getpnn(struct ctdb_context *ctdb)
72 {
73         if ((options.pnn == CTDB_BROADCAST_ALL) ||
74             (options.pnn == CTDB_MULTICAST)) {
75                 DEBUG(DEBUG_ERR,
76                       ("Cannot get PNN for node %u\n", options.pnn));
77                 exit(1);
78         }
79
80         if (options.pnn == CTDB_CURRENT_NODE) {
81                 return ctdb_get_pnn(ctdb);
82         } else {
83                 return options.pnn;
84         }
85 }
86
87 static void assert_single_node_only(void)
88 {
89         if ((options.pnn == CTDB_BROADCAST_ALL) ||
90             (options.pnn == CTDB_MULTICAST)) {
91                 DEBUG(DEBUG_ERR,
92                       ("This control can not be applied to multiple PNNs\n"));
93                 exit(1);
94         }
95 }
96
97 /* Pretty print the flags to a static buffer in human-readable format.
98  * This never returns NULL!
99  */
100 static const char *pretty_print_flags(uint32_t flags)
101 {
102         int j;
103         static const struct {
104                 uint32_t flag;
105                 const char *name;
106         } flag_names[] = {
107                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
108                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
109                 { NODE_FLAGS_BANNED,                "BANNED" },
110                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
111                 { NODE_FLAGS_DELETED,               "DELETED" },
112                 { NODE_FLAGS_STOPPED,               "STOPPED" },
113                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
114         };
115         static char flags_str[512]; /* Big enough to contain all flag names */
116
117         flags_str[0] = '\0';
118         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
119                 if (flags & flag_names[j].flag) {
120                         if (flags_str[0] == '\0') {
121                                 (void) strcpy(flags_str, flag_names[j].name);
122                         } else {
123                                 (void) strncat(flags_str, "|", sizeof(flags_str)-1);
124                                 (void) strncat(flags_str, flag_names[j].name,
125                                                sizeof(flags_str)-1);
126                         }
127                 }
128         }
129         if (flags_str[0] == '\0') {
130                 (void) strcpy(flags_str, "OK");
131         }
132
133         return flags_str;
134 }
135
136 static int h2i(char h)
137 {
138         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
139         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
140         return h - '0';
141 }
142
143 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
144 {
145         int i, len;
146         TDB_DATA key = {NULL, 0};
147
148         len = strlen(str);
149         if (len & 0x01) {
150                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
151                 return key;
152         }
153
154         key.dsize = len>>1;
155         key.dptr  = talloc_size(mem_ctx, key.dsize);
156
157         for (i=0; i < len/2; i++) {
158                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
159         }
160         return key;
161 }
162
163 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
164  * that are disconnected or deleted.  If dd_ok is true those nodes are
165  * included in the output list of nodes.  If dd_ok is false, those
166  * nodes are filtered from the "all" case and cause an error if
167  * explicitly specified.
168  */
169 static bool parse_nodestring(struct ctdb_context *ctdb,
170                              TALLOC_CTX *mem_ctx,
171                              const char * nodestring,
172                              uint32_t current_pnn,
173                              bool dd_ok,
174                              uint32_t **nodes,
175                              uint32_t *pnn_mode)
176 {
177         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
178         int n;
179         uint32_t i;
180         struct ctdb_node_map *nodemap;
181         int ret;
182
183         *nodes = NULL;
184
185         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
186         if (ret != 0) {
187                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
188                 talloc_free(tmp_ctx);
189                 exit(10);
190         }
191
192         if (nodestring != NULL) {
193                 *nodes = talloc_array(mem_ctx, uint32_t, 0);
194                 if (*nodes == NULL) {
195                         goto failed;
196                 }
197
198                 n = 0;
199
200                 if (strcmp(nodestring, "all") == 0) {
201                         *pnn_mode = CTDB_BROADCAST_ALL;
202
203                         /* all */
204                         for (i = 0; i < nodemap->num; i++) {
205                                 if ((nodemap->nodes[i].flags &
206                                      (NODE_FLAGS_DISCONNECTED |
207                                       NODE_FLAGS_DELETED)) && !dd_ok) {
208                                         continue;
209                                 }
210                                 *nodes = talloc_realloc(mem_ctx, *nodes,
211                                                         uint32_t, n+1);
212                                 if (*nodes == NULL) {
213                                         goto failed;
214                                 }
215                                 (*nodes)[n] = i;
216                                 n++;
217                         }
218                 } else {
219                         /* x{,y...} */
220                         char *ns, *tok;
221
222                         ns = talloc_strdup(tmp_ctx, nodestring);
223                         tok = strtok(ns, ",");
224                         while (tok != NULL) {
225                                 uint32_t pnn;
226                                 i = (uint32_t)strtoul(tok, NULL, 0);
227                                 if (i >= nodemap->num) {
228                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
229                                         talloc_free(tmp_ctx);
230                                         exit(ERR_NONODE);
231                                 }
232                                 if ((nodemap->nodes[i].flags & 
233                                      (NODE_FLAGS_DISCONNECTED |
234                                       NODE_FLAGS_DELETED)) && !dd_ok) {
235                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
236                                         talloc_free(tmp_ctx);
237                                         exit(ERR_DISNODE);
238                                 }
239                                 if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
240                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
241                                         talloc_free(tmp_ctx);
242                                         exit(10);
243                                 }
244
245                                 *nodes = talloc_realloc(mem_ctx, *nodes,
246                                                         uint32_t, n+1);
247                                 if (*nodes == NULL) {
248                                         goto failed;
249                                 }
250
251                                 (*nodes)[n] = i;
252                                 n++;
253
254                                 tok = strtok(NULL, ",");
255                         }
256                         talloc_free(ns);
257
258                         if (n == 1) {
259                                 *pnn_mode = (*nodes)[0];
260                         } else {
261                                 *pnn_mode = CTDB_MULTICAST;
262                         }
263                 }
264         } else {
265                 /* default - no nodes specified */
266                 *nodes = talloc_array(mem_ctx, uint32_t, 1);
267                 if (*nodes == NULL) {
268                         goto failed;
269                 }
270                 *pnn_mode = CTDB_CURRENT_NODE;
271
272                 if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
273                         goto failed;
274                 }
275         }
276
277         talloc_free(tmp_ctx);
278         return true;
279
280 failed:
281         talloc_free(tmp_ctx);
282         return false;
283 }
284
285 /*
286  check if a database exists
287 */
288 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
289                       uint32_t *dbid, const char **dbname, uint8_t *flags)
290 {
291         int i, ret;
292         struct ctdb_dbid_map *dbmap=NULL;
293         bool dbid_given = false, found = false;
294         uint32_t id;
295         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
296         const char *name;
297
298         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
299         if (ret != 0) {
300                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
301                 goto fail;
302         }
303
304         if (strncmp(dbarg, "0x", 2) == 0) {
305                 id = strtoul(dbarg, NULL, 0);
306                 dbid_given = true;
307         }
308
309         for(i=0; i<dbmap->num; i++) {
310                 if (dbid_given) {
311                         if (id == dbmap->dbs[i].dbid) {
312                                 found = true;
313                                 break;
314                         }
315                 } else {
316                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
317                         if (ret != 0) {
318                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
319                                 goto fail;
320                         }
321
322                         if (strcmp(name, dbarg) == 0) {
323                                 id = dbmap->dbs[i].dbid;
324                                 found = true;
325                                 break;
326                         }
327                 }
328         }
329
330         if (found && dbid_given && dbname != NULL) {
331                 ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
332                 if (ret != 0) {
333                         DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
334                         found = false;
335                         goto fail;
336                 }
337         }
338
339         if (found) {
340                 if (dbid) *dbid = id;
341                 if (dbname) *dbname = talloc_strdup(ctdb, name);
342                 if (flags) *flags = dbmap->dbs[i].flags;
343         } else {
344                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
345         }
346
347 fail:
348         talloc_free(tmp_ctx);
349         return found;
350 }
351
352 /*
353   see if a process exists
354  */
355 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
356 {
357         uint32_t pnn, pid;
358         int ret;
359         if (argc < 1) {
360                 usage();
361         }
362
363         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
364                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
365                 return -1;
366         }
367
368         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
369         if (ret == 0) {
370                 printf("%u:%u exists\n", pnn, pid);
371         } else {
372                 printf("%u:%u does not exist\n", pnn, pid);
373         }
374         return ret;
375 }
376
377 /*
378   display statistics structure
379  */
380 static void show_statistics(struct ctdb_statistics *s, int show_header)
381 {
382         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
383         int i;
384         const char *prefix=NULL;
385         int preflen=0;
386         int tmp, days, hours, minutes, seconds;
387         const struct {
388                 const char *name;
389                 uint32_t offset;
390         } fields[] = {
391 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
392                 STATISTICS_FIELD(num_clients),
393                 STATISTICS_FIELD(frozen),
394                 STATISTICS_FIELD(recovering),
395                 STATISTICS_FIELD(num_recoveries),
396                 STATISTICS_FIELD(client_packets_sent),
397                 STATISTICS_FIELD(client_packets_recv),
398                 STATISTICS_FIELD(node_packets_sent),
399                 STATISTICS_FIELD(node_packets_recv),
400                 STATISTICS_FIELD(keepalive_packets_sent),
401                 STATISTICS_FIELD(keepalive_packets_recv),
402                 STATISTICS_FIELD(node.req_call),
403                 STATISTICS_FIELD(node.reply_call),
404                 STATISTICS_FIELD(node.req_dmaster),
405                 STATISTICS_FIELD(node.reply_dmaster),
406                 STATISTICS_FIELD(node.reply_error),
407                 STATISTICS_FIELD(node.req_message),
408                 STATISTICS_FIELD(node.req_control),
409                 STATISTICS_FIELD(node.reply_control),
410                 STATISTICS_FIELD(client.req_call),
411                 STATISTICS_FIELD(client.req_message),
412                 STATISTICS_FIELD(client.req_control),
413                 STATISTICS_FIELD(timeouts.call),
414                 STATISTICS_FIELD(timeouts.control),
415                 STATISTICS_FIELD(timeouts.traverse),
416                 STATISTICS_FIELD(locks.num_calls),
417                 STATISTICS_FIELD(locks.num_current),
418                 STATISTICS_FIELD(locks.num_pending),
419                 STATISTICS_FIELD(locks.num_failed),
420                 STATISTICS_FIELD(total_calls),
421                 STATISTICS_FIELD(pending_calls),
422                 STATISTICS_FIELD(childwrite_calls),
423                 STATISTICS_FIELD(pending_childwrite_calls),
424                 STATISTICS_FIELD(memory_used),
425                 STATISTICS_FIELD(max_hop_count),
426                 STATISTICS_FIELD(total_ro_delegations),
427                 STATISTICS_FIELD(total_ro_revokes),
428         };
429         
430         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
431         seconds = tmp%60;
432         tmp    /= 60;
433         minutes = tmp%60;
434         tmp    /= 60;
435         hours   = tmp%24;
436         tmp    /= 24;
437         days    = tmp;
438
439         if (options.machinereadable){
440                 if (show_header) {
441                         printf("CTDB version:");
442                         printf("Current time of statistics:");
443                         printf("Statistics collected since:");
444                         for (i=0;i<ARRAY_SIZE(fields);i++) {
445                                 printf("%s:", fields[i].name);
446                         }
447                         printf("num_reclock_ctdbd_latency:");
448                         printf("min_reclock_ctdbd_latency:");
449                         printf("avg_reclock_ctdbd_latency:");
450                         printf("max_reclock_ctdbd_latency:");
451
452                         printf("num_reclock_recd_latency:");
453                         printf("min_reclock_recd_latency:");
454                         printf("avg_reclock_recd_latency:");
455                         printf("max_reclock_recd_latency:");
456
457                         printf("num_call_latency:");
458                         printf("min_call_latency:");
459                         printf("avg_call_latency:");
460                         printf("max_call_latency:");
461
462                         printf("num_lockwait_latency:");
463                         printf("min_lockwait_latency:");
464                         printf("avg_lockwait_latency:");
465                         printf("max_lockwait_latency:");
466
467                         printf("num_childwrite_latency:");
468                         printf("min_childwrite_latency:");
469                         printf("avg_childwrite_latency:");
470                         printf("max_childwrite_latency:");
471                         printf("\n");
472                 }
473                 printf("%d:", CTDB_VERSION);
474                 printf("%d:", (int)s->statistics_current_time.tv_sec);
475                 printf("%d:", (int)s->statistics_start_time.tv_sec);
476                 for (i=0;i<ARRAY_SIZE(fields);i++) {
477                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
478                 }
479                 printf("%d:", s->reclock.ctdbd.num);
480                 printf("%.6f:", s->reclock.ctdbd.min);
481                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
482                 printf("%.6f:", s->reclock.ctdbd.max);
483
484                 printf("%d:", s->reclock.recd.num);
485                 printf("%.6f:", s->reclock.recd.min);
486                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
487                 printf("%.6f:", s->reclock.recd.max);
488
489                 printf("%d:", s->call_latency.num);
490                 printf("%.6f:", s->call_latency.min);
491                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
492                 printf("%.6f:", s->call_latency.max);
493
494                 printf("%d:", s->childwrite_latency.num);
495                 printf("%.6f:", s->childwrite_latency.min);
496                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
497                 printf("%.6f:", s->childwrite_latency.max);
498                 printf("\n");
499         } else {
500                 printf("CTDB version %u\n", CTDB_VERSION);
501                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
502                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
503
504                 for (i=0;i<ARRAY_SIZE(fields);i++) {
505                         if (strchr(fields[i].name, '.')) {
506                                 preflen = strcspn(fields[i].name, ".")+1;
507                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
508                                         prefix = fields[i].name;
509                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
510                                 }
511                         } else {
512                                 preflen = 0;
513                         }
514                         printf(" %*s%-22s%*s%10u\n", 
515                                preflen?4:0, "",
516                                fields[i].name+preflen, 
517                                preflen?0:4, "",
518                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
519                 }
520                 printf(" hop_count_buckets:");
521                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
522                         printf(" %d", s->hop_count_bucket[i]);
523                 }
524                 printf("\n");
525                 printf(" lock_buckets:");
526                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
527                         printf(" %d", s->locks.buckets[i]);
528                 }
529                 printf("\n");
530                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
531
532                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
533
534                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
535
536                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
537                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
538         }
539
540         talloc_free(tmp_ctx);
541 }
542
543 /*
544   display remote ctdb statistics combined from all nodes
545  */
546 static int control_statistics_all(struct ctdb_context *ctdb)
547 {
548         int ret, i;
549         struct ctdb_statistics statistics;
550         uint32_t *nodes;
551         uint32_t num_nodes;
552
553         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
554         CTDB_NO_MEMORY(ctdb, nodes);
555         
556         ZERO_STRUCT(statistics);
557
558         for (i=0;i<num_nodes;i++) {
559                 struct ctdb_statistics s1;
560                 int j;
561                 uint32_t *v1 = (uint32_t *)&s1;
562                 uint32_t *v2 = (uint32_t *)&statistics;
563                 uint32_t num_ints = 
564                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
565                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
566                 if (ret != 0) {
567                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
568                         return ret;
569                 }
570                 for (j=0;j<num_ints;j++) {
571                         v2[j] += v1[j];
572                 }
573                 statistics.max_hop_count = 
574                         MAX(statistics.max_hop_count, s1.max_hop_count);
575                 statistics.call_latency.max = 
576                         MAX(statistics.call_latency.max, s1.call_latency.max);
577         }
578         talloc_free(nodes);
579         printf("Gathered statistics for %u nodes\n", num_nodes);
580         show_statistics(&statistics, 1);
581         return 0;
582 }
583
584 /*
585   display remote ctdb statistics
586  */
587 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
588 {
589         int ret;
590         struct ctdb_statistics statistics;
591
592         if (options.pnn == CTDB_BROADCAST_ALL) {
593                 return control_statistics_all(ctdb);
594         }
595
596         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
597         if (ret != 0) {
598                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
599                 return ret;
600         }
601         show_statistics(&statistics, 1);
602         return 0;
603 }
604
605
606 /*
607   reset remote ctdb statistics
608  */
609 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
610 {
611         int ret;
612
613         ret = ctdb_statistics_reset(ctdb, options.pnn);
614         if (ret != 0) {
615                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
616                 return ret;
617         }
618         return 0;
619 }
620
621
622 /*
623   display remote ctdb rolling statistics
624  */
625 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
626 {
627         int ret;
628         struct ctdb_statistics_wire *stats;
629         int i, num_records = -1;
630
631         assert_single_node_only();
632
633         if (argc ==1) {
634                 num_records = atoi(argv[0]) - 1;
635         }
636
637         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
638         if (ret != 0) {
639                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
640                 return ret;
641         }
642         for (i=0;i<stats->num;i++) {
643                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
644                         continue;
645                 }
646                 show_statistics(&stats->stats[i], i==0);
647                 if (i == num_records) {
648                         break;
649                 }
650         }
651         return 0;
652 }
653
654
655 /*
656   display remote ctdb db statistics
657  */
658 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
659 {
660         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
661         struct ctdb_db_statistics *dbstat;
662         int i;
663         uint32_t db_id;
664         int num_hot_keys;
665         int ret;
666
667         if (argc < 1) {
668                 usage();
669         }
670
671         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
672                 return -1;
673         }
674
675         ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
676         if (ret != 0) {
677                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
678                 talloc_free(tmp_ctx);
679                 return -1;
680         }
681
682         printf("DB Statistics: %s\n", argv[0]);
683         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
684                 dbstat->db_ro_delegations);
685         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
686                 dbstat->db_ro_delegations);
687         printf(" %s\n", "locks");
688         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
689                 dbstat->locks.num_calls);
690         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
691                 dbstat->locks.num_failed);
692         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
693                 dbstat->locks.num_current);
694         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
695                 dbstat->locks.num_pending);
696         printf(" %s", "hop_count_buckets:");
697         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
698                 printf(" %d", dbstat->hop_count_bucket[i]);
699         }
700         printf("\n");
701         printf(" %s", "lock_buckets:");
702         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
703                 printf(" %d", dbstat->locks.buckets[i]);
704         }
705         printf("\n");
706         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
707                 "locks_latency      MIN/AVG/MAX",
708                 dbstat->locks.latency.min,
709                 (dbstat->locks.latency.num ?
710                  dbstat->locks.latency.total /dbstat->locks.latency.num :
711                  0.0),
712                 dbstat->locks.latency.max,
713                 dbstat->locks.latency.num);
714         num_hot_keys = 0;
715         for (i=0; i<dbstat->num_hot_keys; i++) {
716                 if (dbstat->hot_keys[i].count > 0) {
717                         num_hot_keys++;
718                 }
719         }
720         dbstat->num_hot_keys = num_hot_keys;
721
722         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
723         for (i = 0; i < dbstat->num_hot_keys; i++) {
724                 int j;
725                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
726                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
727                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
728                 }
729                 printf("\n");
730         }
731
732         talloc_free(tmp_ctx);
733         return 0;
734 }
735
736 /*
737   display uptime of remote node
738  */
739 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
740 {
741         int ret;
742         struct ctdb_uptime *uptime = NULL;
743         int tmp, days, hours, minutes, seconds;
744
745         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
746         if (ret != 0) {
747                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
748                 return ret;
749         }
750
751         if (options.machinereadable){
752                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
753                 printf(":%u:%u:%u:%lf\n",
754                         (unsigned int)uptime->current_time.tv_sec,
755                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
756                         (unsigned int)uptime->last_recovery_finished.tv_sec,
757                         timeval_delta(&uptime->last_recovery_finished,
758                                       &uptime->last_recovery_started)
759                 );
760                 return 0;
761         }
762
763         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
764
765         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
766         seconds = tmp%60;
767         tmp    /= 60;
768         minutes = tmp%60;
769         tmp    /= 60;
770         hours   = tmp%24;
771         tmp    /= 24;
772         days    = tmp;
773         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
774
775         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
776         seconds = tmp%60;
777         tmp    /= 60;
778         minutes = tmp%60;
779         tmp    /= 60;
780         hours   = tmp%24;
781         tmp    /= 24;
782         days    = tmp;
783         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
784         
785         printf("Duration of last recovery/failover: %lf seconds\n",
786                 timeval_delta(&uptime->last_recovery_finished,
787                               &uptime->last_recovery_started));
788
789         return 0;
790 }
791
792 /*
793   show the PNN of the current node
794  */
795 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
796 {
797         uint32_t mypnn;
798
799         mypnn = getpnn(ctdb);
800
801         printf("PNN:%d\n", mypnn);
802         return 0;
803 }
804
805
806 struct pnn_node {
807         struct pnn_node *next;
808         const char *addr;
809         int pnn;
810 };
811
812 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
813 {
814         const char *nodes_list;
815         int nlines;
816         char **lines;
817         int i, pnn;
818         struct pnn_node *pnn_nodes = NULL;
819         struct pnn_node *pnn_node;
820         struct pnn_node *tmp_node;
821
822         /* read the nodes file */
823         nodes_list = getenv("CTDB_NODES");
824         if (nodes_list == NULL) {
825                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
826                                              getenv("CTDB_BASE"));
827                 if (nodes_list == NULL) {
828                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
829                         exit(1);
830                 }
831         }
832         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
833         if (lines == NULL) {
834                 return NULL;
835         }
836         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
837                 nlines--;
838         }
839         for (i=0, pnn=0; i<nlines; i++) {
840                 char *node;
841
842                 node = lines[i];
843                 /* strip leading spaces */
844                 while((*node == ' ') || (*node == '\t')) {
845                         node++;
846                 }
847                 if (*node == '#') {
848                         pnn++;
849                         continue;
850                 }
851                 if (strcmp(node, "") == 0) {
852                         continue;
853                 }
854                 pnn_node = talloc(mem_ctx, struct pnn_node);
855                 pnn_node->pnn = pnn++;
856                 pnn_node->addr = talloc_strdup(pnn_node, node);
857                 pnn_node->next = pnn_nodes;
858                 pnn_nodes = pnn_node;
859         }
860
861         /* swap them around so we return them in incrementing order */
862         pnn_node = pnn_nodes;
863         pnn_nodes = NULL;
864         while (pnn_node) {
865                 tmp_node = pnn_node;
866                 pnn_node = pnn_node->next;
867
868                 tmp_node->next = pnn_nodes;
869                 pnn_nodes = tmp_node;
870         }
871
872         return pnn_nodes;
873 }
874
875 /*
876   show the PNN of the current node
877   discover the pnn by loading the nodes file and try to bind to all
878   addresses one at a time until the ip address is found.
879  */
880 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
881 {
882         TALLOC_CTX *mem_ctx = talloc_new(NULL);
883         struct pnn_node *pnn_nodes;
884         struct pnn_node *pnn_node;
885
886         assert_single_node_only();
887
888         pnn_nodes = read_nodes_file(mem_ctx);
889         if (pnn_nodes == NULL) {
890                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
891                 talloc_free(mem_ctx);
892                 return -1;
893         }
894
895         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
896                 ctdb_sock_addr addr;
897
898                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
899                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
900                         talloc_free(mem_ctx);
901                         return -1;
902                 }
903
904                 if (ctdb_sys_have_ip(&addr)) {
905                         printf("PNN:%d\n", pnn_node->pnn);
906                         talloc_free(mem_ctx);
907                         return 0;
908                 }
909         }
910
911         printf("Failed to detect which PNN this node is\n");
912         talloc_free(mem_ctx);
913         return -1;
914 }
915
916 /* Helpers for ctdb status
917  */
918 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
919 {
920         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
921         int j;
922         bool ret = false;
923
924         if (node->flags == 0) {
925                 struct ctdb_control_get_ifaces *ifaces;
926
927                 if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
928                                          tmp_ctx, &ifaces) == 0) {
929                         for (j=0; j < ifaces->num; j++) {
930                                 if (ifaces->ifaces[j].link_state != 0) {
931                                         continue;
932                                 }
933                                 ret = true;
934                                 break;
935                         }
936                 }
937         }
938         talloc_free(tmp_ctx);
939
940         return ret;
941 }
942
943 static void control_status_header_machine(void)
944 {
945         printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
946                ":Inactive:PartiallyOnline:ThisNode:\n");
947 }
948
949 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
950                                     struct ctdb_node_and_flags *node)
951 {
952         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
953                ctdb_addr_to_str(&node->addr),
954                !!(node->flags&NODE_FLAGS_DISCONNECTED),
955                !!(node->flags&NODE_FLAGS_BANNED),
956                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
957                !!(node->flags&NODE_FLAGS_UNHEALTHY),
958                !!(node->flags&NODE_FLAGS_STOPPED),
959                !!(node->flags&NODE_FLAGS_INACTIVE),
960                is_partially_online(ctdb, node) ? 1 : 0,
961                (node->pnn == mypnn)?'Y':'N');
962
963         return node->flags;
964 }
965
966 static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
967                                   struct ctdb_node_and_flags *node)
968 {
969        printf("pnn:%d %-16s %s%s\n", node->pnn,
970               ctdb_addr_to_str(&node->addr),
971               is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
972               node->pnn == mypnn?" (THIS NODE)":"");
973
974        return node->flags;
975 }
976
977 /*
978   display remote ctdb status
979  */
980 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
981 {
982         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
983         int i;
984         struct ctdb_vnn_map *vnnmap=NULL;
985         struct ctdb_node_map *nodemap=NULL;
986         uint32_t recmode, recmaster, mypnn;
987         int num_deleted_nodes = 0;
988         int ret;
989
990         mypnn = getpnn(ctdb);
991
992         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
993         if (ret != 0) {
994                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
995                 talloc_free(tmp_ctx);
996                 return -1;
997         }
998
999         if (options.machinereadable) {
1000                 control_status_header_machine();
1001                 for (i=0;i<nodemap->num;i++) {
1002                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1003                                 continue;
1004                         }
1005                         (void) control_status_1_machine(ctdb, mypnn,
1006                                                         &nodemap->nodes[i]);
1007                 }
1008                 talloc_free(tmp_ctx);
1009                 return 0;
1010         }
1011
1012         for (i=0; i<nodemap->num; i++) {
1013                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1014                         num_deleted_nodes++;
1015                 }
1016         }
1017         if (num_deleted_nodes == 0) {
1018                 printf("Number of nodes:%d\n", nodemap->num);
1019         } else {
1020                 printf("Number of nodes:%d (including %d deleted nodes)\n",
1021                        nodemap->num, num_deleted_nodes);
1022         }
1023         for(i=0;i<nodemap->num;i++){
1024                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1025                         continue;
1026                 }
1027                 (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
1028         }
1029
1030         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
1031         if (ret != 0) {
1032                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
1033                 talloc_free(tmp_ctx);
1034                 return -1;
1035         }
1036         if (vnnmap->generation == INVALID_GENERATION) {
1037                 printf("Generation:INVALID\n");
1038         } else {
1039                 printf("Generation:%d\n",vnnmap->generation);
1040         }
1041         printf("Size:%d\n",vnnmap->size);
1042         for(i=0;i<vnnmap->size;i++){
1043                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
1044         }
1045
1046         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
1047         if (ret != 0) {
1048                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
1049                 talloc_free(tmp_ctx);
1050                 return -1;
1051         }
1052         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
1053
1054         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
1055         if (ret != 0) {
1056                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1057                 talloc_free(tmp_ctx);
1058                 return -1;
1059         }
1060         printf("Recovery master:%d\n",recmaster);
1061
1062         talloc_free(tmp_ctx);
1063         return 0;
1064 }
1065
1066 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
1067 {
1068         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1069         int i, ret;
1070         struct ctdb_node_map *nodemap=NULL;
1071         uint32_t * nodes;
1072         uint32_t pnn_mode, mypnn;
1073
1074         if (argc > 1) {
1075                 usage();
1076         }
1077
1078         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1079                               options.pnn, true, &nodes, &pnn_mode)) {
1080                 return -1;
1081         }
1082
1083         if (options.machinereadable) {
1084                 control_status_header_machine();
1085         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1086                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1087         }
1088
1089         mypnn = getpnn(ctdb);
1090
1091         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1092         if (ret != 0) {
1093                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1094                 talloc_free(tmp_ctx);
1095                 return -1;
1096         }
1097
1098         ret = 0;
1099
1100         for (i = 0; i < talloc_array_length(nodes); i++) {
1101                 if (options.machinereadable) {
1102                         ret |= control_status_1_machine(ctdb, mypnn,
1103                                                         &nodemap->nodes[nodes[i]]);
1104                 } else {
1105                         ret |= control_status_1_human(ctdb, mypnn,
1106                                                       &nodemap->nodes[nodes[i]]);
1107                 }
1108         }
1109
1110         talloc_free(tmp_ctx);
1111         return ret;
1112 }
1113
1114 struct natgw_node {
1115         struct natgw_node *next;
1116         const char *addr;
1117 };
1118
1119 static int find_natgw(struct ctdb_context *ctdb,
1120                        struct ctdb_node_map *nodemap, uint32_t flags,
1121                        uint32_t *pnn, const char **ip)
1122 {
1123         int i;
1124         uint32_t capabilities;
1125         int ret;
1126
1127         for (i=0;i<nodemap->num;i++) {
1128                 if (!(nodemap->nodes[i].flags & flags)) {
1129                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
1130                                                         nodemap->nodes[i].pnn,
1131                                                         &capabilities);
1132                         if (ret != 0) {
1133                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
1134                                                   nodemap->nodes[i].pnn));
1135                                 return -1;
1136                         }
1137                         if (!(capabilities&CTDB_CAP_NATGW)) {
1138                                 continue;
1139                         }
1140                         *pnn = nodemap->nodes[i].pnn;
1141                         *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
1142                         return 0;
1143                 }
1144         }
1145
1146         return 2; /* matches ENOENT */
1147 }
1148
1149 /*
1150   display the list of nodes belonging to this natgw configuration
1151  */
1152 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1153 {
1154         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1155         int i, ret;
1156         const char *natgw_list;
1157         int nlines;
1158         char **lines;
1159         struct natgw_node *natgw_nodes = NULL;
1160         struct natgw_node *natgw_node;
1161         struct ctdb_node_map *nodemap=NULL;
1162         uint32_t mypnn, pnn;
1163         const char *ip;
1164
1165         /* When we have some nodes that could be the NATGW, make a
1166          * series of attempts to find the first node that doesn't have
1167          * certain status flags set.
1168          */
1169         uint32_t exclude_flags[] = {
1170                 /* Look for a nice healthy node */
1171                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1172                 /* If not found, an UNHEALTHY/BANNED node will do */
1173                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1174                 /* If not found, a STOPPED node will do */
1175                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1176                 0,
1177         };
1178
1179         /* read the natgw nodes file into a linked list */
1180         natgw_list = getenv("CTDB_NATGW_NODES");
1181         if (natgw_list == NULL) {
1182                 natgw_list = talloc_asprintf(tmp_ctx, "%s/natgw_nodes",
1183                                              getenv("CTDB_BASE"));
1184                 if (natgw_list == NULL) {
1185                         DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
1186                         exit(1);
1187                 }
1188         }
1189         lines = file_lines_load(natgw_list, &nlines, ctdb);
1190         if (lines == NULL) {
1191                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1192                 talloc_free(tmp_ctx);
1193                 return -1;
1194         }
1195         for (i=0;i<nlines;i++) {
1196                 char *node;
1197
1198                 node = lines[i];
1199                 /* strip leading spaces */
1200                 while((*node == ' ') || (*node == '\t')) {
1201                         node++;
1202                 }
1203                 if (*node == '#') {
1204                         continue;
1205                 }
1206                 if (strcmp(node, "") == 0) {
1207                         continue;
1208                 }
1209                 natgw_node = talloc(ctdb, struct natgw_node);
1210                 natgw_node->addr = talloc_strdup(natgw_node, node);
1211                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1212                 natgw_node->next = natgw_nodes;
1213                 natgw_nodes = natgw_node;
1214         }
1215
1216         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1217         if (ret != 0) {
1218                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1219                 talloc_free(tmp_ctx);
1220                 return -1;
1221         }
1222
1223         /* Trim the nodemap so it only includes connected nodes in the
1224          * current natgw group.
1225          */
1226         i=0;
1227         while(i<nodemap->num) {
1228                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1229                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1230                                 break;
1231                         }
1232                 }
1233
1234                 /* this node was not in the natgw so we just remove it from
1235                  * the list
1236                  */
1237                 if ((natgw_node == NULL) 
1238                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1239                         int j;
1240
1241                         for (j=i+1; j<nodemap->num; j++) {
1242                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1243                         }
1244                         nodemap->num--;
1245                         continue;
1246                 }
1247
1248                 i++;
1249         }
1250
1251         ret = 2; /* matches ENOENT */
1252         pnn = -1;
1253         ip = "0.0.0.0";
1254         for (i = 0; exclude_flags[i] != 0; i++) {
1255                 ret = find_natgw(ctdb, nodemap,
1256                                  exclude_flags[i],
1257                                  &pnn, &ip);
1258                 if (ret == -1) {
1259                         goto done;
1260                 }
1261                 if (ret == 0) {
1262                         break;
1263                 }
1264         }
1265
1266         if (options.machinereadable) {
1267                 printf(":Node:IP:\n");
1268                 printf(":%d:%s:\n", pnn, ip);
1269         } else {
1270                 printf("%d %s\n", pnn, ip);
1271         }
1272
1273         /* print the pruned list of nodes belonging to this natgw list */
1274         mypnn = getpnn(ctdb);
1275         if (options.machinereadable) {
1276                 control_status_header_machine();
1277         } else {
1278                 printf("Number of nodes:%d\n", nodemap->num);
1279         }
1280         for(i=0;i<nodemap->num;i++){
1281                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1282                         continue;
1283                 }
1284                 if (options.machinereadable) {
1285                         control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
1286                 } else {
1287                         control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
1288                 }
1289         }
1290
1291 done:
1292         talloc_free(tmp_ctx);
1293         return ret;
1294 }
1295
1296 /*
1297   display the status of the scripts for monitoring (or other events)
1298  */
1299 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1300                                     enum ctdb_eventscript_call type)
1301 {
1302         struct ctdb_scripts_wire *script_status;
1303         int ret, i;
1304
1305         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1306         if (ret != 0) {
1307                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1308                 return ret;
1309         }
1310
1311         if (script_status == NULL) {
1312                 if (!options.machinereadable) {
1313                         printf("%s cycle never run\n",
1314                                ctdb_eventscript_call_names[type]);
1315                 }
1316                 return 0;
1317         }
1318
1319         if (!options.machinereadable) {
1320                 printf("%d scripts were executed last %s cycle\n",
1321                        script_status->num_scripts,
1322                        ctdb_eventscript_call_names[type]);
1323         }
1324         for (i=0; i<script_status->num_scripts; i++) {
1325                 const char *status = NULL;
1326
1327                 switch (script_status->scripts[i].status) {
1328                 case -ETIME:
1329                         status = "TIMEDOUT";
1330                         break;
1331                 case -ENOEXEC:
1332                         status = "DISABLED";
1333                         break;
1334                 case 0:
1335                         status = "OK";
1336                         break;
1337                 default:
1338                         if (script_status->scripts[i].status > 0)
1339                                 status = "ERROR";
1340                         break;
1341                 }
1342                 if (options.machinereadable) {
1343                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1344                                ctdb_eventscript_call_names[type],
1345                                script_status->scripts[i].name,
1346                                script_status->scripts[i].status,
1347                                status,
1348                                (long)script_status->scripts[i].start.tv_sec,
1349                                (long)script_status->scripts[i].start.tv_usec,
1350                                (long)script_status->scripts[i].finished.tv_sec,
1351                                (long)script_status->scripts[i].finished.tv_usec,
1352                                script_status->scripts[i].output);
1353                         continue;
1354                 }
1355                 if (status)
1356                         printf("%-20s Status:%s    ",
1357                                script_status->scripts[i].name, status);
1358                 else
1359                         /* Some other error, eg from stat. */
1360                         printf("%-20s Status:CANNOT RUN (%s)",
1361                                script_status->scripts[i].name,
1362                                strerror(-script_status->scripts[i].status));
1363
1364                 if (script_status->scripts[i].status >= 0) {
1365                         printf("Duration:%.3lf ",
1366                         timeval_delta(&script_status->scripts[i].finished,
1367                               &script_status->scripts[i].start));
1368                 }
1369                 if (script_status->scripts[i].status != -ENOEXEC) {
1370                         printf("%s",
1371                                ctime(&script_status->scripts[i].start.tv_sec));
1372                         if (script_status->scripts[i].status != 0) {
1373                                 printf("   OUTPUT:%s\n",
1374                                        script_status->scripts[i].output);
1375                         }
1376                 } else {
1377                         printf("\n");
1378                 }
1379         }
1380         return 0;
1381 }
1382
1383
1384 static int control_scriptstatus(struct ctdb_context *ctdb,
1385                                 int argc, const char **argv)
1386 {
1387         int ret;
1388         enum ctdb_eventscript_call type, min, max;
1389         const char *arg;
1390
1391         if (argc > 1) {
1392                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1393                 return -1;
1394         }
1395
1396         if (argc == 0)
1397                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1398         else
1399                 arg = argv[0];
1400
1401         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1402                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1403                         min = type;
1404                         max = type+1;
1405                         break;
1406                 }
1407         }
1408         if (type == CTDB_EVENT_MAX) {
1409                 if (strcmp(arg, "all") == 0) {
1410                         min = 0;
1411                         max = CTDB_EVENT_MAX;
1412                 } else {
1413                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1414                         return -1;
1415                 }
1416         }
1417
1418         if (options.machinereadable) {
1419                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1420         }
1421
1422         for (type = min; type < max; type++) {
1423                 ret = control_one_scriptstatus(ctdb, type);
1424                 if (ret != 0) {
1425                         return ret;
1426                 }
1427         }
1428
1429         return 0;
1430 }
1431
1432 /*
1433   enable an eventscript
1434  */
1435 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1436 {
1437         int ret;
1438
1439         if (argc < 1) {
1440                 usage();
1441         }
1442
1443         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1444         if (ret != 0) {
1445           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1446                 return ret;
1447         }
1448
1449         return 0;
1450 }
1451
1452 /*
1453   disable an eventscript
1454  */
1455 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1456 {
1457         int ret;
1458
1459         if (argc < 1) {
1460                 usage();
1461         }
1462
1463         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1464         if (ret != 0) {
1465           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1466                 return ret;
1467         }
1468
1469         return 0;
1470 }
1471
1472 /*
1473   display the pnn of the recovery master
1474  */
1475 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1476 {
1477         uint32_t recmaster;
1478         int ret;
1479
1480         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
1481         if (ret != 0) {
1482                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1483                 return -1;
1484         }
1485         printf("%d\n",recmaster);
1486
1487         return 0;
1488 }
1489
1490 /*
1491   add a tickle to a public address
1492  */
1493 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1494 {
1495         struct ctdb_tcp_connection t;
1496         TDB_DATA data;
1497         int ret;
1498
1499         assert_single_node_only();
1500
1501         if (argc < 2) {
1502                 usage();
1503         }
1504
1505         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1506                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1507                 return -1;
1508         }
1509         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1510                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1511                 return -1;
1512         }
1513
1514         data.dptr = (uint8_t *)&t;
1515         data.dsize = sizeof(t);
1516
1517         /* tell all nodes about this tcp connection */
1518         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1519                            0, data, ctdb, NULL, NULL, NULL, NULL);
1520         if (ret != 0) {
1521                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1522                 return -1;
1523         }
1524         
1525         return 0;
1526 }
1527
1528
1529 /*
1530   delete a tickle from a node
1531  */
1532 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1533 {
1534         struct ctdb_tcp_connection t;
1535         TDB_DATA data;
1536         int ret;
1537
1538         assert_single_node_only();
1539
1540         if (argc < 2) {
1541                 usage();
1542         }
1543
1544         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1545                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1546                 return -1;
1547         }
1548         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1549                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1550                 return -1;
1551         }
1552
1553         data.dptr = (uint8_t *)&t;
1554         data.dsize = sizeof(t);
1555
1556         /* tell all nodes about this tcp connection */
1557         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1558                            0, data, ctdb, NULL, NULL, NULL, NULL);
1559         if (ret != 0) {
1560                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1561                 return -1;
1562         }
1563         
1564         return 0;
1565 }
1566
1567
1568 /*
1569   get a list of all tickles for this pnn
1570  */
1571 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1572 {
1573         struct ctdb_control_tcp_tickle_list *list;
1574         ctdb_sock_addr addr;
1575         int i, ret;
1576         unsigned port = 0;
1577
1578         assert_single_node_only();
1579
1580         if (argc < 1) {
1581                 usage();
1582         }
1583
1584         if (argc == 2) {
1585                 port = atoi(argv[1]);
1586         }
1587
1588         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1589                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1590                 return -1;
1591         }
1592
1593         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1594         if (ret == -1) {
1595                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1596                 return -1;
1597         }
1598
1599         if (options.machinereadable){
1600                 printf(":source ip:port:destination ip:port:\n");
1601                 for (i=0;i<list->tickles.num;i++) {
1602                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1603                                 continue;
1604                         }
1605                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1606                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1607                 }
1608         } else {
1609                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1610                 printf("Num tickles:%u\n", list->tickles.num);
1611                 for (i=0;i<list->tickles.num;i++) {
1612                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1613                                 continue;
1614                         }
1615                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1616                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1617                 }
1618         }
1619
1620         talloc_free(list);
1621         
1622         return 0;
1623 }
1624
1625
1626 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1627 {
1628         struct ctdb_all_public_ips *ips;
1629         struct ctdb_public_ip ip;
1630         int i, ret;
1631         uint32_t *nodes;
1632         uint32_t disable_time;
1633         TDB_DATA data;
1634         struct ctdb_node_map *nodemap=NULL;
1635         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1636
1637         disable_time = 30;
1638         data.dptr  = (uint8_t*)&disable_time;
1639         data.dsize = sizeof(disable_time);
1640         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1641         if (ret != 0) {
1642                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1643                 return -1;
1644         }
1645
1646
1647
1648         /* read the public ip list from the node */
1649         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1650         if (ret != 0) {
1651                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1652                 talloc_free(tmp_ctx);
1653                 return -1;
1654         }
1655
1656         for (i=0;i<ips->num;i++) {
1657                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1658                         break;
1659                 }
1660         }
1661         if (i==ips->num) {
1662                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1663                         pnn, ctdb_addr_to_str(addr)));
1664                 talloc_free(tmp_ctx);
1665                 return -1;
1666         }
1667
1668         ip.pnn  = pnn;
1669         ip.addr = *addr;
1670
1671         data.dptr  = (uint8_t *)&ip;
1672         data.dsize = sizeof(ip);
1673
1674         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1675         if (ret != 0) {
1676                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1677                 talloc_free(tmp_ctx);
1678                 return ret;
1679         }
1680
1681         nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
1682         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1683                                         nodes, 0,
1684                                         LONGTIMELIMIT(),
1685                                         false, data,
1686                                         NULL, NULL,
1687                                         NULL);
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1690                 talloc_free(tmp_ctx);
1691                 return -1;
1692         }
1693
1694         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1695         if (ret != 0) {
1696                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1697                 talloc_free(tmp_ctx);
1698                 return -1;
1699         }
1700
1701         /* update the recovery daemon so it now knows to expect the new
1702            node assignment for this ip.
1703         */
1704         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1707                 return -1;
1708         }
1709
1710         talloc_free(tmp_ctx);
1711         return 0;
1712 }
1713
1714
1715 /* 
1716  * scans all other nodes and returns a pnn for another node that can host this 
1717  * ip address or -1
1718  */
1719 static int
1720 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1721 {
1722         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1723         struct ctdb_all_public_ips *ips;
1724         struct ctdb_node_map *nodemap=NULL;
1725         int i, j, ret;
1726
1727         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1728         if (ret != 0) {
1729                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1730                 talloc_free(tmp_ctx);
1731                 return ret;
1732         }
1733
1734         for(i=0;i<nodemap->num;i++){
1735                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1736                         continue;
1737                 }
1738                 if (nodemap->nodes[i].pnn == options.pnn) {
1739                         continue;
1740                 }
1741
1742                 /* read the public ip list from this node */
1743                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1744                 if (ret != 0) {
1745                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1746                         return -1;
1747                 }
1748
1749                 for (j=0;j<ips->num;j++) {
1750                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1751                                 talloc_free(tmp_ctx);
1752                                 return nodemap->nodes[i].pnn;
1753                         }
1754                 }
1755                 talloc_free(ips);
1756         }
1757
1758         talloc_free(tmp_ctx);
1759         return -1;
1760 }
1761
1762 /* If pnn is -1 then try to find a node to move IP to... */
1763 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1764 {
1765         bool pnn_specified = (pnn == -1 ? false : true);
1766         int retries = 0;
1767
1768         while (retries < 5) {
1769                 if (!pnn_specified) {
1770                         pnn = find_other_host_for_public_ip(ctdb, addr);
1771                         if (pnn == -1) {
1772                                 return false;
1773                         }
1774                         DEBUG(DEBUG_NOTICE,
1775                               ("Trying to move public IP to node %u\n", pnn));
1776                 }
1777
1778                 if (move_ip(ctdb, addr, pnn) == 0) {
1779                         return true;
1780                 }
1781
1782                 sleep(3);
1783                 retries++;
1784         }
1785
1786         return false;
1787 }
1788
1789
1790 /*
1791   move/failover an ip address to a specific node
1792  */
1793 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1794 {
1795         uint32_t pnn;
1796         ctdb_sock_addr addr;
1797
1798         assert_single_node_only();
1799
1800         if (argc < 2) {
1801                 usage();
1802                 return -1;
1803         }
1804
1805         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1806                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1807                 return -1;
1808         }
1809
1810
1811         if (sscanf(argv[1], "%u", &pnn) != 1) {
1812                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1813                 return -1;
1814         }
1815
1816         if (!try_moveip(ctdb, &addr, pnn)) {
1817                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1818                 return -1;
1819         }
1820
1821         return 0;
1822 }
1823
1824 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1825 {
1826         TDB_DATA data;
1827
1828         data.dptr  = (uint8_t *)&pnn;
1829         data.dsize = sizeof(uint32_t);
1830         if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1831                 DEBUG(DEBUG_ERR,
1832                       ("Failed to send message to force node %u to be a rebalancing target\n",
1833                        pnn));
1834                 return -1;
1835         }
1836
1837         return 0;
1838 }
1839
1840
1841 /*
1842   rebalance a node by setting it to allow failback and triggering a
1843   takeover run
1844  */
1845 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1846 {
1847         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1848         uint32_t *nodes;
1849         uint32_t pnn_mode;
1850         int i, ret;
1851
1852         assert_single_node_only();
1853
1854         if (argc > 1) {
1855                 usage();
1856         }
1857
1858         /* Determine the nodes where IPs need to be reloaded */
1859         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
1860                               options.pnn, true, &nodes, &pnn_mode)) {
1861                 ret = -1;
1862                 goto done;
1863         }
1864
1865         for (i = 0; i < talloc_array_length(nodes); i++) {
1866                 if (!rebalance_node(ctdb, nodes[i])) {
1867                         ret = -1;
1868                 }
1869         }
1870
1871 done:
1872         talloc_free(tmp_ctx);
1873         return ret;
1874 }
1875
1876 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1877 {
1878         struct ctdb_public_ip ip;
1879         int ret;
1880         uint32_t *nodes;
1881         uint32_t disable_time;
1882         TDB_DATA data;
1883         struct ctdb_node_map *nodemap=NULL;
1884         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1885
1886         disable_time = 30;
1887         data.dptr  = (uint8_t*)&disable_time;
1888         data.dsize = sizeof(disable_time);
1889         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1890         if (ret != 0) {
1891                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1892                 return -1;
1893         }
1894
1895         ip.pnn  = -1;
1896         ip.addr = *addr;
1897
1898         data.dptr  = (uint8_t *)&ip;
1899         data.dsize = sizeof(ip);
1900
1901         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1902         if (ret != 0) {
1903                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1904                 talloc_free(tmp_ctx);
1905                 return ret;
1906         }
1907
1908         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1909         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1910                                         nodes, 0,
1911                                         LONGTIMELIMIT(),
1912                                         false, data,
1913                                         NULL, NULL,
1914                                         NULL);
1915         if (ret != 0) {
1916                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1917                 talloc_free(tmp_ctx);
1918                 return -1;
1919         }
1920
1921         talloc_free(tmp_ctx);
1922         return 0;
1923 }
1924
1925 /*
1926   release an ip form all nodes and have it re-assigned by recd
1927  */
1928 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1929 {
1930         ctdb_sock_addr addr;
1931
1932         assert_single_node_only();
1933
1934         if (argc < 1) {
1935                 usage();
1936                 return -1;
1937         }
1938
1939         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1940                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1941                 return -1;
1942         }
1943
1944         if (rebalance_ip(ctdb, &addr) != 0) {
1945                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1946                 return -1;
1947         }
1948
1949         return 0;
1950 }
1951
1952 static int getips_store_callback(void *param, void *data)
1953 {
1954         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1955         struct ctdb_all_public_ips *ips = param;
1956         int i;
1957
1958         i = ips->num++;
1959         ips->ips[i].pnn  = node_ip->pnn;
1960         ips->ips[i].addr = node_ip->addr;
1961         return 0;
1962 }
1963
1964 static int getips_count_callback(void *param, void *data)
1965 {
1966         uint32_t *count = param;
1967
1968         (*count)++;
1969         return 0;
1970 }
1971
1972 #define IP_KEYLEN       4
1973 static uint32_t *ip_key(ctdb_sock_addr *ip)
1974 {
1975         static uint32_t key[IP_KEYLEN];
1976
1977         bzero(key, sizeof(key));
1978
1979         switch (ip->sa.sa_family) {
1980         case AF_INET:
1981                 key[0]  = ip->ip.sin_addr.s_addr;
1982                 break;
1983         case AF_INET6: {
1984                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1985                 key[0]  = s6_a32[3];
1986                 key[1]  = s6_a32[2];
1987                 key[2]  = s6_a32[1];
1988                 key[3]  = s6_a32[0];
1989                 break;
1990         }
1991         default:
1992                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1993                 return key;
1994         }
1995
1996         return key;
1997 }
1998
1999 static void *add_ip_callback(void *parm, void *data)
2000 {
2001         return parm;
2002 }
2003
2004 static int
2005 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
2006 {
2007         struct ctdb_all_public_ips *tmp_ips;
2008         struct ctdb_node_map *nodemap=NULL;
2009         trbt_tree_t *ip_tree;
2010         int i, j, len, ret;
2011         uint32_t count;
2012
2013         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2014         if (ret != 0) {
2015                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2016                 return ret;
2017         }
2018
2019         ip_tree = trbt_create(tmp_ctx, 0);
2020
2021         for(i=0;i<nodemap->num;i++){
2022                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
2023                         continue;
2024                 }
2025                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2026                         continue;
2027                 }
2028
2029                 /* read the public ip list from this node */
2030                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
2031                 if (ret != 0) {
2032                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
2033                         return -1;
2034                 }
2035         
2036                 for (j=0; j<tmp_ips->num;j++) {
2037                         struct ctdb_public_ip *node_ip;
2038
2039                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
2040                         node_ip->pnn  = tmp_ips->ips[j].pnn;
2041                         node_ip->addr = tmp_ips->ips[j].addr;
2042
2043                         trbt_insertarray32_callback(ip_tree,
2044                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
2045                                 add_ip_callback,
2046                                 node_ip);
2047                 }
2048                 talloc_free(tmp_ips);
2049         }
2050
2051         /* traverse */
2052         count = 0;
2053         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
2054
2055         len = offsetof(struct ctdb_all_public_ips, ips) + 
2056                 count*sizeof(struct ctdb_public_ip);
2057         tmp_ips = talloc_zero_size(tmp_ctx, len);
2058         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
2059
2060         *ips = tmp_ips;
2061
2062         return 0;
2063 }
2064
2065
2066 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2067 {
2068         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
2069
2070         event_add_timed(ctdb->ev, ctdb, 
2071                                 timeval_current_ofs(1, 0),
2072                                 ctdb_every_second, ctdb);
2073 }
2074
2075 struct srvid_reply_handler_data {
2076         bool done;
2077         bool wait_for_all;
2078         uint32_t *nodes;
2079         const char *srvid_str;
2080 };
2081
2082 static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
2083                                          uint64_t srvid,
2084                                          TDB_DATA data,
2085                                          void *private_data)
2086 {
2087         struct srvid_reply_handler_data *d =
2088                 (struct srvid_reply_handler_data *)private_data;
2089         int i;
2090         int32_t ret;
2091
2092         if (data.dsize != sizeof(ret)) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
2094                 return;
2095         }
2096
2097         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
2098         ret = *(int32_t *)data.dptr;
2099         if (ret < 0) {
2100                 DEBUG(DEBUG_ERR,
2101                       ("%s failed with result %d\n", d->srvid_str, ret));
2102                 return;
2103         }
2104
2105         if (!d->wait_for_all) {
2106                 d->done = true;
2107                 return;
2108         }
2109
2110         /* Wait for all replies */
2111         d->done = true;
2112         for (i = 0; i < talloc_array_length(d->nodes); i++) {
2113                 if (d->nodes[i] == ret) {
2114                         DEBUG(DEBUG_INFO,
2115                               ("%s reply received from node %u\n",
2116                                d->srvid_str, ret));
2117                         d->nodes[i] = -1;
2118                 }
2119                 if (d->nodes[i] != -1) {
2120                         /* Found a node that hasn't yet replied */
2121                         d->done = false;
2122                 }
2123         }
2124 }
2125
2126 /* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
2127  * or replies from all connected nodes.  arg is the data argument to
2128  * pass in the srvid_request structure - pass 0 if this isn't needed.
2129  */
2130 static int srvid_broadcast(struct ctdb_context *ctdb,
2131                            uint64_t srvid, uint32_t *arg,
2132                            const char *srvid_str, bool wait_for_all)
2133 {
2134         int ret;
2135         TDB_DATA data;
2136         uint32_t pnn;
2137         uint64_t reply_srvid;
2138         struct srvid_request request;
2139         struct srvid_request_data request_data;
2140         struct srvid_reply_handler_data reply_data;
2141         struct timeval tv;
2142
2143         ZERO_STRUCT(request);
2144
2145         /* Time ticks to enable timeouts to be processed */
2146         event_add_timed(ctdb->ev, ctdb, 
2147                                 timeval_current_ofs(1, 0),
2148                                 ctdb_every_second, ctdb);
2149
2150         pnn = ctdb_get_pnn(ctdb);
2151         reply_srvid = getpid();
2152
2153         if (arg == NULL) {
2154                 request.pnn = pnn;
2155                 request.srvid = reply_srvid;
2156
2157                 data.dptr = (uint8_t *)&request;
2158                 data.dsize = sizeof(request);
2159         } else {
2160                 request_data.pnn = pnn;
2161                 request_data.srvid = reply_srvid;
2162                 request_data.data = *arg;
2163
2164                 data.dptr = (uint8_t *)&request_data;
2165                 data.dsize = sizeof(request_data);
2166         }
2167
2168         /* Register message port for reply from recovery master */
2169         ctdb_client_set_message_handler(ctdb, reply_srvid,
2170                                         srvid_broadcast_reply_handler,
2171                                         &reply_data);
2172
2173         reply_data.wait_for_all = wait_for_all;
2174         reply_data.nodes = NULL;
2175         reply_data.srvid_str = srvid_str;
2176
2177 again:
2178         reply_data.done = false;
2179
2180         if (wait_for_all) {
2181                 struct ctdb_node_map *nodemap;
2182
2183                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
2184                                            CTDB_CURRENT_NODE, ctdb, &nodemap);
2185                 if (ret != 0) {
2186                         DEBUG(DEBUG_ERR,
2187                               ("Unable to get nodemap from current node, try again\n"));
2188                         sleep(1);
2189                         goto again;
2190                 }
2191
2192                 if (reply_data.nodes != NULL) {
2193                         talloc_free(reply_data.nodes);
2194                 }
2195                 reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
2196                                                            NULL, true);
2197
2198                 talloc_free(nodemap);
2199         }
2200
2201         /* Send to all connected nodes. Only recmaster replies */
2202         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2203                                        srvid, data);
2204         if (ret != 0) {
2205                 /* This can only happen if the socket is closed and
2206                  * there's no way to recover from that, so don't try
2207                  * again.
2208                  */
2209                 DEBUG(DEBUG_ERR,
2210                       ("Failed to send %s request to connected nodes\n",
2211                        srvid_str));
2212                 return -1;
2213         }
2214
2215         tv = timeval_current();
2216         /* This loop terminates the reply is received */
2217         while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
2218                 event_loop_once(ctdb->ev);
2219         }
2220
2221         if (!reply_data.done) {
2222                 DEBUG(DEBUG_NOTICE,
2223                       ("Still waiting for confirmation of %s\n", srvid_str));
2224                 sleep(1);
2225                 goto again;
2226         }
2227
2228         ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
2229
2230         talloc_free(reply_data.nodes);
2231
2232         return 0;
2233 }
2234
2235 static int ipreallocate(struct ctdb_context *ctdb)
2236 {
2237         return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
2238                                "IP reallocation", false);
2239 }
2240
2241
2242 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
2243 {
2244         return ipreallocate(ctdb);
2245 }
2246
2247 /*
2248   add a public ip address to a node
2249  */
2250 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2251 {
2252         int i, ret;
2253         int len, retries = 0;
2254         unsigned mask;
2255         ctdb_sock_addr addr;
2256         struct ctdb_control_ip_iface *pub;
2257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2258         struct ctdb_all_public_ips *ips;
2259
2260
2261         if (argc != 2) {
2262                 talloc_free(tmp_ctx);
2263                 usage();
2264         }
2265
2266         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2267                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2268                 talloc_free(tmp_ctx);
2269                 return -1;
2270         }
2271
2272         /* read the public ip list from the node */
2273         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2274         if (ret != 0) {
2275                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2276                 talloc_free(tmp_ctx);
2277                 return -1;
2278         }
2279         for (i=0;i<ips->num;i++) {
2280                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2281                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2282                         return 0;
2283                 }
2284         }
2285
2286
2287
2288         /* Dont timeout. This command waits for an ip reallocation
2289            which sometimes can take wuite a while if there has
2290            been a recent recovery
2291         */
2292         alarm(0);
2293
2294         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2295         pub = talloc_size(tmp_ctx, len); 
2296         CTDB_NO_MEMORY(ctdb, pub);
2297
2298         pub->addr  = addr;
2299         pub->mask  = mask;
2300         pub->len   = strlen(argv[1])+1;
2301         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2302
2303         do {
2304                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2305                 if (ret != 0) {
2306                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2307                         sleep(3);
2308                         retries++;
2309                 }
2310         } while (retries < 5 && ret != 0);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2313                 talloc_free(tmp_ctx);
2314                 return ret;
2315         }
2316
2317         if (rebalance_node(ctdb, options.pnn) != 0) {
2318                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2319                 return ret;
2320         }
2321
2322         talloc_free(tmp_ctx);
2323         return 0;
2324 }
2325
2326 /*
2327   add a public ip address to a node
2328  */
2329 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2330 {
2331         ctdb_sock_addr addr;
2332
2333         if (argc != 1) {
2334                 usage();
2335         }
2336
2337         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2338                 printf("Badly formed ip : %s\n", argv[0]);
2339                 return -1;
2340         }
2341
2342         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2343
2344         return 0;
2345 }
2346
2347 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2348
2349 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2350 {
2351         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2352         struct ctdb_node_map *nodemap=NULL;
2353         struct ctdb_all_public_ips *ips;
2354         int ret, i, j;
2355
2356         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2357         if (ret != 0) {
2358                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2359                 return ret;
2360         }
2361
2362         /* remove it from the nodes that are not hosting the ip currently */
2363         for(i=0;i<nodemap->num;i++){
2364                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2365                         continue;
2366                 }
2367                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2368                 if (ret != 0) {
2369                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2370                         continue;
2371                 }
2372
2373                 for (j=0;j<ips->num;j++) {
2374                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2375                                 break;
2376                         }
2377                 }
2378                 if (j==ips->num) {
2379                         continue;
2380                 }
2381
2382                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2383                         continue;
2384                 }
2385
2386                 options.pnn = nodemap->nodes[i].pnn;
2387                 control_delip(ctdb, argc, argv);
2388         }
2389
2390
2391         /* remove it from every node (also the one hosting it) */
2392         for(i=0;i<nodemap->num;i++){
2393                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2394                         continue;
2395                 }
2396                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
2397                 if (ret != 0) {
2398                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2399                         continue;
2400                 }
2401
2402                 for (j=0;j<ips->num;j++) {
2403                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2404                                 break;
2405                         }
2406                 }
2407                 if (j==ips->num) {
2408                         continue;
2409                 }
2410
2411                 options.pnn = nodemap->nodes[i].pnn;
2412                 control_delip(ctdb, argc, argv);
2413         }
2414
2415         talloc_free(tmp_ctx);
2416         return 0;
2417 }
2418         
2419 /*
2420   delete a public ip address from a node
2421  */
2422 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2423 {
2424         int i, ret;
2425         ctdb_sock_addr addr;
2426         struct ctdb_control_ip_iface pub;
2427         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2428         struct ctdb_all_public_ips *ips;
2429
2430         if (argc != 1) {
2431                 talloc_free(tmp_ctx);
2432                 usage();
2433         }
2434
2435         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2436                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2437                 return -1;
2438         }
2439
2440         if (options.pnn == CTDB_BROADCAST_ALL) {
2441                 return control_delip_all(ctdb, argc, argv, &addr);
2442         }
2443
2444         pub.addr  = addr;
2445         pub.mask  = 0;
2446         pub.len   = 0;
2447
2448         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2449         if (ret != 0) {
2450                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2451                 talloc_free(tmp_ctx);
2452                 return ret;
2453         }
2454         
2455         for (i=0;i<ips->num;i++) {
2456                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2457                         break;
2458                 }
2459         }
2460
2461         if (i==ips->num) {
2462                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2463                         ctdb_addr_to_str(&addr)));
2464                 talloc_free(tmp_ctx);
2465                 return -1;
2466         }
2467
2468         /* This is an optimisation.  If this node is hosting the IP
2469          * then try to move it somewhere else without invoking a full
2470          * takeover run.  We don't care if this doesn't work!
2471          */
2472         if (ips->ips[i].pnn == options.pnn) {
2473                 (void) try_moveip(ctdb, &addr, -1);
2474         }
2475
2476         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2477         if (ret != 0) {
2478                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2479                 talloc_free(tmp_ctx);
2480                 return ret;
2481         }
2482
2483         talloc_free(tmp_ctx);
2484         return 0;
2485 }
2486
2487 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2488                               int argc, const char **argv)
2489 {
2490         struct ctdb_control_killtcp *killtcp;
2491         int max_entries, current, i;
2492         struct timeval timeout;
2493         char line[128], src[128], dst[128];
2494         int linenum;
2495         TDB_DATA data;
2496         struct client_async_data *async_data;
2497         struct ctdb_client_control_state *state;
2498
2499         if (argc != 0) {
2500                 usage();
2501         }
2502
2503         linenum = 1;
2504         killtcp = NULL;
2505         max_entries = 0;
2506         current = 0;
2507         while (!feof(stdin)) {
2508                 if (fgets(line, sizeof(line), stdin) == NULL) {
2509                         continue;
2510                 }
2511
2512                 /* Silently skip empty lines */
2513                 if (line[0] == '\n') {
2514                         continue;
2515                 }
2516
2517                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2518                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2519                                           linenum, line));
2520                         talloc_free(killtcp);
2521                         return -1;
2522                 }
2523
2524                 if (current >= max_entries) {
2525                         max_entries += 1024;
2526                         killtcp = talloc_realloc(ctdb, killtcp,
2527                                                  struct ctdb_control_killtcp,
2528                                                  max_entries);
2529                         CTDB_NO_MEMORY(ctdb, killtcp);
2530                 }
2531
2532                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2533                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2534                                           linenum, src));
2535                         talloc_free(killtcp);
2536                         return -1;
2537                 }
2538
2539                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2540                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2541                                           linenum, dst));
2542                         talloc_free(killtcp);
2543                         return -1;
2544                 }
2545
2546                 current++;
2547         }
2548
2549         async_data = talloc_zero(ctdb, struct client_async_data);
2550         if (async_data == NULL) {
2551                 talloc_free(killtcp);
2552                 return -1;
2553         }
2554
2555         for (i = 0; i < current; i++) {
2556
2557                 data.dsize = sizeof(struct ctdb_control_killtcp);
2558                 data.dptr  = (unsigned char *)&killtcp[i];
2559
2560                 timeout = TIMELIMIT();
2561                 state = ctdb_control_send(ctdb, options.pnn, 0,
2562                                           CTDB_CONTROL_KILL_TCP, 0, data,
2563                                           async_data, &timeout, NULL);
2564
2565                 if (state == NULL) {
2566                         DEBUG(DEBUG_ERR,
2567                               ("Failed to call async killtcp control to node %u\n",
2568                                options.pnn));
2569                         talloc_free(killtcp);
2570                         return -1;
2571                 }
2572                 
2573                 ctdb_client_async_add(async_data, state);
2574         }
2575
2576         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2577                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2578                 talloc_free(killtcp);
2579                 return -1;
2580         }
2581
2582         talloc_free(killtcp);
2583         return 0;
2584 }
2585
2586
2587 /*
2588   kill a tcp connection
2589  */
2590 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2591 {
2592         int ret;
2593         struct ctdb_control_killtcp killtcp;
2594
2595         assert_single_node_only();
2596
2597         if (argc == 0) {
2598                 return kill_tcp_from_file(ctdb, argc, argv);
2599         }
2600
2601         if (argc < 2) {
2602                 usage();
2603         }
2604
2605         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2606                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2607                 return -1;
2608         }
2609
2610         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2611                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2612                 return -1;
2613         }
2614
2615         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2616         if (ret != 0) {
2617                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2618                 return ret;
2619         }
2620
2621         return 0;
2622 }
2623
2624
2625 /*
2626   send a gratious arp
2627  */
2628 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2629 {
2630         int ret;
2631         ctdb_sock_addr addr;
2632
2633         assert_single_node_only();
2634
2635         if (argc < 2) {
2636                 usage();
2637         }
2638
2639         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2640                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2641                 return -1;
2642         }
2643
2644         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2645         if (ret != 0) {
2646                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2647                 return ret;
2648         }
2649
2650         return 0;
2651 }
2652
2653 /*
2654   register a server id
2655  */
2656 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2657 {
2658         int ret;
2659         struct ctdb_server_id server_id;
2660
2661         if (argc < 3) {
2662                 usage();
2663         }
2664
2665         server_id.pnn       = strtoul(argv[0], NULL, 0);
2666         server_id.type      = strtoul(argv[1], NULL, 0);
2667         server_id.server_id = strtoul(argv[2], NULL, 0);
2668
2669         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2670         if (ret != 0) {
2671                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2672                 return ret;
2673         }
2674         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2675         sleep(999);
2676         return -1;
2677 }
2678
2679 /*
2680   unregister a server id
2681  */
2682 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2683 {
2684         int ret;
2685         struct ctdb_server_id server_id;
2686
2687         if (argc < 3) {
2688                 usage();
2689         }
2690
2691         server_id.pnn       = strtoul(argv[0], NULL, 0);
2692         server_id.type      = strtoul(argv[1], NULL, 0);
2693         server_id.server_id = strtoul(argv[2], NULL, 0);
2694
2695         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2696         if (ret != 0) {
2697                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2698                 return ret;
2699         }
2700         return -1;
2701 }
2702
2703 /*
2704   check if a server id exists
2705  */
2706 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2707 {
2708         uint32_t status;
2709         int ret;
2710         struct ctdb_server_id server_id;
2711
2712         if (argc < 3) {
2713                 usage();
2714         }
2715
2716         server_id.pnn       = strtoul(argv[0], NULL, 0);
2717         server_id.type      = strtoul(argv[1], NULL, 0);
2718         server_id.server_id = strtoul(argv[2], NULL, 0);
2719
2720         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2721         if (ret != 0) {
2722                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2723                 return ret;
2724         }
2725
2726         if (status) {
2727                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2728         } else {
2729                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2730         }
2731         return 0;
2732 }
2733
2734 /*
2735   get a list of all server ids that are registered on a node
2736  */
2737 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2738 {
2739         int i, ret;
2740         struct ctdb_server_id_list *server_ids;
2741
2742         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2743         if (ret != 0) {
2744                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2745                 return ret;
2746         }
2747
2748         for (i=0; i<server_ids->num; i++) {
2749                 printf("Server id %d:%d:%d\n", 
2750                         server_ids->server_ids[i].pnn, 
2751                         server_ids->server_ids[i].type, 
2752                         server_ids->server_ids[i].server_id); 
2753         }
2754
2755         return -1;
2756 }
2757
2758 /*
2759   check if a server id exists
2760  */
2761 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2762 {
2763         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2764         uint64_t *ids;
2765         uint8_t *result;
2766         int i;
2767
2768         if (argc < 1) {
2769                 talloc_free(tmp_ctx);
2770                 usage();
2771         }
2772
2773         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2774         result = talloc_array(tmp_ctx, uint8_t, argc);
2775
2776         for (i = 0; i < argc; i++) {
2777                 ids[i] = strtoull(argv[i], NULL, 0);
2778         }
2779
2780         if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
2781                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2782                                   options.pnn));
2783                 talloc_free(tmp_ctx);
2784                 return -1;
2785         }
2786
2787         for (i=0; i < argc; i++) {
2788                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2789                        result[i] ? "exists" : "does not exist");
2790         }
2791
2792         talloc_free(tmp_ctx);
2793         return 0;
2794 }
2795
2796 /*
2797   send a tcp tickle ack
2798  */
2799 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2800 {
2801         int ret;
2802         ctdb_sock_addr  src, dst;
2803
2804         if (argc < 2) {
2805                 usage();
2806         }
2807
2808         if (!parse_ip_port(argv[0], &src)) {
2809                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2810                 return -1;
2811         }
2812
2813         if (!parse_ip_port(argv[1], &dst)) {
2814                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2815                 return -1;
2816         }
2817
2818         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2819         if (ret==0) {
2820                 return 0;
2821         }
2822         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2823
2824         return -1;
2825 }
2826
2827
2828 /*
2829   display public ip status
2830  */
2831 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2832 {
2833         int i, ret;
2834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2835         struct ctdb_all_public_ips *ips;
2836
2837         if (options.pnn == CTDB_BROADCAST_ALL) {
2838                 /* read the list of public ips from all nodes */
2839                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2840         } else {
2841                 /* read the public ip list from this node */
2842                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2843         }
2844         if (ret != 0) {
2845                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2846                 talloc_free(tmp_ctx);
2847                 return ret;
2848         }
2849
2850         if (options.machinereadable){
2851                 printf(":Public IP:Node:");
2852                 if (options.verbose){
2853                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2854                 }
2855                 printf("\n");
2856         } else {
2857                 if (options.pnn == CTDB_BROADCAST_ALL) {
2858                         printf("Public IPs on ALL nodes\n");
2859                 } else {
2860                         printf("Public IPs on node %u\n", options.pnn);
2861                 }
2862         }
2863
2864         for (i=1;i<=ips->num;i++) {
2865                 struct ctdb_control_public_ip_info *info = NULL;
2866                 int32_t pnn;
2867                 char *aciface = NULL;
2868                 char *avifaces = NULL;
2869                 char *cifaces = NULL;
2870
2871                 if (options.pnn == CTDB_BROADCAST_ALL) {
2872                         pnn = ips->ips[ips->num-i].pnn;
2873                 } else {
2874                         pnn = options.pnn;
2875                 }
2876
2877                 if (pnn != -1) {
2878                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2879                                                    &ips->ips[ips->num-i].addr, &info);
2880                 } else {
2881                         ret = -1;
2882                 }
2883
2884                 if (ret == 0) {
2885                         int j;
2886                         for (j=0; j < info->num; j++) {
2887                                 if (cifaces == NULL) {
2888                                         cifaces = talloc_strdup(info,
2889                                                                 info->ifaces[j].name);
2890                                 } else {
2891                                         cifaces = talloc_asprintf_append(cifaces,
2892                                                                          ",%s",
2893                                                                          info->ifaces[j].name);
2894                                 }
2895
2896                                 if (info->active_idx == j) {
2897                                         aciface = info->ifaces[j].name;
2898                                 }
2899
2900                                 if (info->ifaces[j].link_state == 0) {
2901                                         continue;
2902                                 }
2903
2904                                 if (avifaces == NULL) {
2905                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2906                                 } else {
2907                                         avifaces = talloc_asprintf_append(avifaces,
2908                                                                           ",%s",
2909                                                                           info->ifaces[j].name);
2910                                 }
2911                         }
2912                 }
2913
2914                 if (options.machinereadable){
2915                         printf(":%s:%d:",
2916                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2917                                 ips->ips[ips->num-i].pnn);
2918                         if (options.verbose){
2919                                 printf("%s:%s:%s:",
2920                                         aciface?aciface:"",
2921                                         avifaces?avifaces:"",
2922                                         cifaces?cifaces:"");
2923                         }
2924                         printf("\n");
2925                 } else {
2926                         if (options.verbose) {
2927                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2928                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2929                                         ips->ips[ips->num-i].pnn,
2930                                         aciface?aciface:"",
2931                                         avifaces?avifaces:"",
2932                                         cifaces?cifaces:"");
2933                         } else {
2934                                 printf("%s %d\n",
2935                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2936                                         ips->ips[ips->num-i].pnn);
2937                         }
2938                 }
2939                 talloc_free(info);
2940         }
2941
2942         talloc_free(tmp_ctx);
2943         return 0;
2944 }
2945
2946 /*
2947   public ip info
2948  */
2949 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2950 {
2951         int i, ret;
2952         ctdb_sock_addr addr;
2953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2954         struct ctdb_control_public_ip_info *info;
2955
2956         if (argc != 1) {
2957                 talloc_free(tmp_ctx);
2958                 usage();
2959         }
2960
2961         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2962                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2963                 return -1;
2964         }
2965
2966         /* read the public ip info from this node */
2967         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2968                                            tmp_ctx, &addr, &info);
2969         if (ret != 0) {
2970                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2971                                   argv[0], options.pnn));
2972                 talloc_free(tmp_ctx);
2973                 return ret;
2974         }
2975
2976         printf("Public IP[%s] info on node %u\n",
2977                ctdb_addr_to_str(&info->ip.addr),
2978                options.pnn);
2979
2980         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2981                ctdb_addr_to_str(&info->ip.addr),
2982                info->ip.pnn, info->num);
2983
2984         for (i=0; i<info->num; i++) {
2985                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2986
2987                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2988                        i+1, info->ifaces[i].name,
2989                        info->ifaces[i].link_state?"up":"down",
2990                        (unsigned int)info->ifaces[i].references,
2991                        (i==info->active_idx)?" (active)":"");
2992         }
2993
2994         talloc_free(tmp_ctx);
2995         return 0;
2996 }
2997
2998 /*
2999   display interfaces status
3000  */
3001 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
3002 {
3003         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3004         int i;
3005         struct ctdb_control_get_ifaces *ifaces;
3006         int ret;
3007
3008         /* read the public ip list from this node */
3009         ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
3010         if (ret != 0) {
3011                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
3012                                   options.pnn));
3013                 talloc_free(tmp_ctx);
3014                 return -1;
3015         }
3016
3017         if (options.machinereadable){
3018                 printf(":Name:LinkStatus:References:\n");
3019         } else {
3020                 printf("Interfaces on node %u\n", options.pnn);
3021         }
3022
3023         for (i=0; i<ifaces->num; i++) {
3024                 if (options.machinereadable){
3025                         printf(":%s:%s:%u\n",
3026                                ifaces->ifaces[i].name,
3027                                ifaces->ifaces[i].link_state?"1":"0",
3028                                (unsigned int)ifaces->ifaces[i].references);
3029                 } else {
3030                         printf("name:%s link:%s references:%u\n",
3031                                ifaces->ifaces[i].name,
3032                                ifaces->ifaces[i].link_state?"up":"down",
3033                                (unsigned int)ifaces->ifaces[i].references);
3034                 }
3035         }
3036
3037         talloc_free(tmp_ctx);
3038         return 0;
3039 }
3040
3041
3042 /*
3043   set link status of an interface
3044  */
3045 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
3046 {
3047         int ret;
3048         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3049         struct ctdb_control_iface_info info;
3050
3051         ZERO_STRUCT(info);
3052
3053         if (argc != 2) {
3054                 usage();
3055         }
3056
3057         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
3058                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
3059                                   argv[0]));
3060                 talloc_free(tmp_ctx);
3061                 return -1;
3062         }
3063         strcpy(info.name, argv[0]);
3064
3065         if (strcmp(argv[1], "up") == 0) {
3066                 info.link_state = 1;
3067         } else if (strcmp(argv[1], "down") == 0) {
3068                 info.link_state = 0;
3069         } else {
3070                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
3071                                   argv[1]));
3072                 talloc_free(tmp_ctx);
3073                 return -1;
3074         }
3075
3076         /* read the public ip list from this node */
3077         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
3078                                    tmp_ctx, &info);
3079         if (ret != 0) {
3080                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
3081                                   argv[0], options.pnn));
3082                 talloc_free(tmp_ctx);
3083                 return ret;
3084         }
3085
3086         talloc_free(tmp_ctx);
3087         return 0;
3088 }
3089
3090 /*
3091   display pid of a ctdb daemon
3092  */
3093 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
3094 {
3095         uint32_t pid;
3096         int ret;
3097
3098         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
3099         if (ret != 0) {
3100                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
3101                 return ret;
3102         }
3103         printf("Pid:%d\n", pid);
3104
3105         return 0;
3106 }
3107
3108 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
3109
3110 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
3111                                               void *data,
3112                                               update_flags_handler_t handler,
3113                                               uint32_t flag,
3114                                               const char *desc,
3115                                               bool set_flag)
3116 {
3117         struct ctdb_node_map *nodemap = NULL;
3118         bool flag_is_set;
3119         int ret;
3120
3121         /* Check if the node is already in the desired state */
3122         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3123         if (ret != 0) {
3124                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3125                 exit(10);
3126         }
3127         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3128         if (set_flag == flag_is_set) {
3129                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
3130                                      (set_flag ? "already" : "not"), desc));
3131                 return 0;
3132         }
3133
3134         do {
3135                 if (!handler(ctdb, data)) {
3136                         DEBUG(DEBUG_WARNING,
3137                               ("Failed to send control to set state %s on node %u, try again\n",
3138                                desc, options.pnn));
3139                 }
3140
3141                 sleep(1);
3142
3143                 /* Read the nodemap and verify the change took effect.
3144                  * Even if the above control/hanlder timed out then it
3145                  * could still have worked!
3146                  */
3147                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
3148                                          ctdb, &nodemap);
3149                 if (ret != 0) {
3150                         DEBUG(DEBUG_WARNING,
3151                               ("Unable to get nodemap from local node, try again\n"));
3152                 }
3153                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
3154         } while (nodemap == NULL || (set_flag != flag_is_set));
3155
3156         return ipreallocate(ctdb);
3157 }
3158
3159 /* Administratively disable a node */
3160 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
3161 {
3162         int ret;
3163
3164         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3165                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
3166         return ret == 0;
3167 }
3168
3169 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
3170 {
3171         return update_flags_and_ipreallocate(ctdb, NULL,
3172                                                   update_flags_disabled,
3173                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3174                                                   "disabled",
3175                                                   true /* set_flag*/);
3176 }
3177
3178 /* Administratively re-enable a node */
3179 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
3180 {
3181         int ret;
3182
3183         ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
3184                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
3185         return ret == 0;
3186 }
3187
3188 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
3189 {
3190         return update_flags_and_ipreallocate(ctdb, NULL,
3191                                                   update_flags_not_disabled,
3192                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
3193                                                   "disabled",
3194                                                   false /* set_flag*/);
3195 }
3196
3197 /* Stop a node */
3198 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
3199 {
3200         int ret;
3201
3202         ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
3203
3204         return ret == 0;
3205 }
3206
3207 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
3208 {
3209         return update_flags_and_ipreallocate(ctdb, NULL,
3210                                                   update_flags_stopped,
3211                                                   NODE_FLAGS_STOPPED,
3212                                                   "stopped",
3213                                                   true /* set_flag*/);
3214 }
3215
3216 /* Continue a stopped node */
3217 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
3218 {
3219         int ret;
3220
3221         ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
3222
3223         return ret == 0;
3224 }
3225
3226 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
3227 {
3228         return update_flags_and_ipreallocate(ctdb, NULL,
3229                                                   update_flags_not_stopped,
3230                                                   NODE_FLAGS_STOPPED,
3231                                                   "stopped",
3232                                                   false /* set_flag */);
3233 }
3234
3235 static uint32_t get_generation(struct ctdb_context *ctdb)
3236 {
3237         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3238         struct ctdb_vnn_map *vnnmap=NULL;
3239         int ret;
3240         uint32_t generation;
3241
3242         /* wait until the recmaster is not in recovery mode */
3243         while (1) {
3244                 uint32_t recmode, recmaster;
3245                 
3246                 if (vnnmap != NULL) {
3247                         talloc_free(vnnmap);
3248                         vnnmap = NULL;
3249                 }
3250
3251                 /* get the recmaster */
3252                 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
3253                 if (ret != 0) {
3254                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3255                         talloc_free(tmp_ctx);
3256                         exit(10);
3257                 }
3258
3259                 /* get recovery mode */
3260                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
3261                 if (ret != 0) {
3262                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3263                         talloc_free(tmp_ctx);
3264                         exit(10);
3265                 }
3266
3267                 /* get the current generation number */
3268                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
3269                 if (ret != 0) {
3270                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3271                         talloc_free(tmp_ctx);
3272                         exit(10);
3273                 }
3274
3275                 if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
3276                         generation = vnnmap->generation;
3277                         talloc_free(tmp_ctx);
3278                         return generation;
3279                 }
3280                 sleep(1);
3281         }
3282 }
3283
3284 /* Ban a node */
3285 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3286 {
3287         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3288         int ret;
3289
3290         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
3291
3292         return ret == 0;
3293 }
3294
3295 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3296 {
3297         struct ctdb_ban_time bantime;
3298
3299         if (argc < 1) {
3300                 usage();
3301         }
3302         
3303         bantime.pnn  = options.pnn;
3304         bantime.time = strtoul(argv[0], NULL, 0);
3305
3306         if (bantime.time == 0) {
3307                 DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
3308                 return -1;
3309         }
3310
3311         return update_flags_and_ipreallocate(ctdb, &bantime,
3312                                                   update_state_banned,
3313                                                   NODE_FLAGS_BANNED,
3314                                                   "banned",
3315                                                   true /* set_flag*/);
3316 }
3317
3318
3319 /* Unban a node */
3320 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3321 {
3322         struct ctdb_ban_time bantime;
3323
3324         bantime.pnn  = options.pnn;
3325         bantime.time = 0;
3326
3327         return update_flags_and_ipreallocate(ctdb, &bantime,
3328                                                   update_state_banned,
3329                                                   NODE_FLAGS_BANNED,
3330                                                   "banned",
3331                                                   false /* set_flag*/);
3332 }
3333
3334 /*
3335   show ban information for a node
3336  */
3337 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3338 {
3339         int ret;
3340         struct ctdb_node_map *nodemap=NULL;
3341         struct ctdb_ban_time *bantime;
3342
3343         /* verify the node exists */
3344         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3345         if (ret != 0) {
3346                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3347                 return ret;
3348         }
3349
3350         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3351         if (ret != 0) {
3352                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3353                 return -1;
3354         }       
3355
3356         if (bantime->time == 0) {
3357                 printf("Node %u is not banned\n", bantime->pnn);
3358         } else {
3359                 printf("Node %u is banned, %d seconds remaining\n",
3360                        bantime->pnn, bantime->time);
3361         }
3362
3363         return 0;
3364 }
3365
3366 /*
3367   shutdown a daemon
3368  */
3369 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3370 {
3371         int ret;
3372
3373         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3374         if (ret != 0) {
3375                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3376                 return ret;
3377         }
3378
3379         return 0;
3380 }
3381
3382 /*
3383   trigger a recovery
3384  */
3385 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3386 {
3387         int ret;
3388         uint32_t generation, next_generation;
3389         bool force;
3390
3391         /* "force" option ignores freeze failure and forces recovery */
3392         force = (argc == 1) && (strcasecmp(argv[0], "force") == 0);
3393
3394         /* record the current generation number */
3395         generation = get_generation(ctdb);
3396
3397         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3398         if (ret != 0) {
3399                 if (!force) {
3400                         DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3401                         return ret;
3402                 }
3403                 DEBUG(DEBUG_WARNING, ("Unable to freeze node but proceeding because \"force\" option given\n"));
3404         }
3405
3406         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3407         if (ret != 0) {
3408                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3409                 return ret;
3410         }
3411
3412         /* wait until we are in a new generation */
3413         while (1) {
3414                 next_generation = get_generation(ctdb);
3415                 if (next_generation != generation) {
3416                         return 0;
3417                 }
3418                 sleep(1);
3419         }
3420
3421         return 0;
3422 }
3423
3424
3425 /*
3426   display monitoring mode of a remote node
3427  */
3428 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3429 {
3430         uint32_t monmode;
3431         int ret;
3432
3433         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3434         if (ret != 0) {
3435                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3436                 return ret;
3437         }
3438         if (!options.machinereadable){
3439                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3440         } else {
3441                 printf(":mode:\n");
3442                 printf(":%d:\n",monmode);
3443         }
3444         return 0;
3445 }
3446
3447
3448 /*
3449   display capabilities of a remote node
3450  */
3451 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3452 {
3453         uint32_t capabilities;
3454         int ret;
3455
3456         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3457         if (ret != 0) {
3458                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3459                 return -1;
3460         }
3461         
3462         if (!options.machinereadable){
3463                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3464                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3465                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3466                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3467         } else {
3468                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3469                 printf(":%d:%d:%d:%d:\n",
3470                         !!(capabilities&CTDB_CAP_RECMASTER),
3471                         !!(capabilities&CTDB_CAP_LMASTER),
3472                         !!(capabilities&CTDB_CAP_LVS),
3473                         !!(capabilities&CTDB_CAP_NATGW));
3474         }
3475         return 0;
3476 }
3477
3478 /*
3479   display lvs configuration
3480  */
3481 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3482 {
3483         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3484         uint32_t *capabilities;
3485         struct ctdb_node_map *nodemap=NULL;
3486         int i, ret;
3487         int healthy_count = 0;
3488
3489         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
3490         if (ret != 0) {
3491                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3492                 talloc_free(tmp_ctx);
3493                 return -1;
3494         }
3495
3496         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3497         CTDB_NO_MEMORY(ctdb, capabilities);
3498         
3499         ret = 0;
3500
3501         /* collect capabilities for all connected nodes */
3502         for (i=0; i<nodemap->num; i++) {
3503                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3504                         continue;
3505                 }
3506                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3507                         continue;
3508                 }
3509
3510                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3511                 if (ret != 0) {
3512                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3513                         ret = -1;
3514                         goto done;
3515                 }
3516
3517                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3518                         continue;
3519                 }
3520
3521                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3522                         healthy_count++;
3523                 }
3524         }
3525
3526         /* Print all LVS nodes */
3527         for (i=0; i<nodemap->num; i++) {
3528                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3529                         continue;
3530                 }
3531                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3532                         continue;
3533                 }
3534                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3535                         continue;
3536                 }
3537
3538                 if (healthy_count != 0) {
3539                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3540                                 continue;
3541                         }
3542                 }
3543
3544                 printf("%d:%s\n", i, 
3545                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3546         }
3547
3548 done:
3549         talloc_free(tmp_ctx);
3550         return ret;
3551 }
3552
3553 /*
3554   display who is the lvs master
3555  */
3556 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3557 {
3558         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3559         uint32_t *capabilities;
3560         struct ctdb_node_map *nodemap=NULL;
3561         int i, ret;
3562         int healthy_count = 0;
3563
3564         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
3565         if (ret != 0) {
3566                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3567                 talloc_free(tmp_ctx);
3568                 return -1;
3569         }
3570
3571         capabilities = talloc_array(tmp_ctx, uint32_t, nodemap->num);
3572         if (capabilities == NULL) {
3573                 talloc_free(tmp_ctx);
3574                 CTDB_NO_MEMORY(ctdb, capabilities);
3575         }
3576
3577         /* collect capabilities for all connected nodes */
3578         for (i=0; i<nodemap->num; i++) {
3579                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3580                         continue;
3581                 }
3582                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3583                         continue;
3584                 }
3585         
3586                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3587                 if (ret != 0) {
3588                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3589                         ret = -1;
3590                         goto done;
3591                 }
3592
3593                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3594                         continue;
3595                 }
3596
3597                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3598                         healthy_count++;
3599                 }
3600         }
3601
3602         ret = -1;
3603
3604         /* find and show the lvsmaster */
3605         for (i=0; i<nodemap->num; i++) {
3606                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3607                         continue;
3608                 }
3609                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3610                         continue;
3611                 }
3612                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3613                         continue;
3614                 }
3615
3616                 if (healthy_count != 0) {
3617                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3618                                 continue;
3619                         }
3620                 }
3621
3622                 if (options.machinereadable){
3623                         printf("%d\n", i);
3624                 } else {
3625                         printf("Node %d is LVS master\n", i);
3626                 }
3627                 ret = 0;
3628                 goto done;
3629         }
3630
3631         printf("There is no LVS master\n");
3632 done:
3633         talloc_free(tmp_ctx);
3634         return ret;
3635 }
3636
3637 /*
3638   disable monitoring on a  node
3639  */
3640 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3641 {
3642         
3643         int ret;
3644
3645         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3646         if (ret != 0) {
3647                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3648                 return ret;
3649         }
3650         printf("Monitoring mode:%s\n","DISABLED");
3651
3652         return 0;
3653 }
3654
3655 /*
3656   enable monitoring on a  node
3657  */
3658 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3659 {
3660         
3661         int ret;
3662
3663         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3664         if (ret != 0) {
3665                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3666                 return ret;
3667         }
3668         printf("Monitoring mode:%s\n","ACTIVE");
3669
3670         return 0;
3671 }
3672
3673 /*
3674   display remote list of keys/data for a db
3675  */
3676 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3677 {
3678         const char *db_name;
3679         struct ctdb_db_context *ctdb_db;
3680         int ret;
3681         struct ctdb_dump_db_context c;
3682         uint8_t flags;
3683
3684         if (argc < 1) {
3685                 usage();
3686         }
3687
3688         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3689                 return -1;
3690         }
3691
3692         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3693         if (ctdb_db == NULL) {
3694                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3695                 return -1;
3696         }
3697
3698         if (options.printlmaster) {
3699                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3700                                           ctdb, &ctdb->vnn_map);
3701                 if (ret != 0) {
3702                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3703                                           options.pnn));
3704                         return ret;
3705                 }
3706         }
3707
3708         ZERO_STRUCT(c);
3709         c.f = stdout;
3710         c.printemptyrecords = (bool)options.printemptyrecords;
3711         c.printdatasize = (bool)options.printdatasize;
3712         c.printlmaster = (bool)options.printlmaster;
3713         c.printhash = (bool)options.printhash;
3714         c.printrecordflags = (bool)options.printrecordflags;
3715
3716         /* traverse and dump the cluster tdb */
3717         ret = ctdb_dump_db(ctdb_db, &c);
3718         if (ret == -1) {
3719                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3720                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3721                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3722                                   db_name));
3723                 return -1;
3724         }
3725         talloc_free(ctdb_db);
3726
3727         printf("Dumped %d records\n", ret);
3728         return 0;
3729 }
3730
3731 struct cattdb_data {
3732         struct ctdb_context *ctdb;
3733         uint32_t count;
3734 };
3735
3736 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3737 {
3738         struct cattdb_data *d = private_data;
3739         struct ctdb_dump_db_context c;
3740
3741         d->count++;
3742
3743         ZERO_STRUCT(c);
3744         c.f = stdout;
3745         c.printemptyrecords = (bool)options.printemptyrecords;
3746         c.printdatasize = (bool)options.printdatasize;
3747         c.printlmaster = false;
3748         c.printhash = (bool)options.printhash;
3749         c.printrecordflags = true;
3750
3751         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3752 }
3753
3754 /*
3755   cat the local tdb database using same format as catdb
3756  */
3757 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3758 {
3759         const char *db_name;
3760         struct ctdb_db_context *ctdb_db;
3761         struct cattdb_data d;
3762         uint8_t flags;
3763
3764         if (argc < 1) {
3765                 usage();
3766         }
3767
3768         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3769                 return -1;
3770         }
3771
3772         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3773         if (ctdb_db == NULL) {
3774                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3775                 return -1;
3776         }
3777
3778         /* traverse the local tdb */
3779         d.count = 0;
3780         d.ctdb  = ctdb;
3781         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3782                 printf("Failed to cattdb data\n");
3783                 exit(10);
3784         }
3785         talloc_free(ctdb_db);
3786
3787         printf("Dumped %d records\n", d.count);
3788         return 0;
3789 }
3790
3791 /*
3792   display the content of a database key
3793  */
3794 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3795 {
3796         const char *db_name;
3797         struct ctdb_db_context *ctdb_db;
3798         struct ctdb_record_handle *h;
3799         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3800         TDB_DATA key, data;
3801         uint8_t flags;
3802
3803         if (argc < 2) {
3804                 usage();
3805         }
3806
3807         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3808                 return -1;
3809         }
3810
3811         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3812         if (ctdb_db == NULL) {
3813                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3814                 return -1;
3815         }
3816
3817         key.dptr  = discard_const(argv[1]);
3818         key.dsize = strlen((char *)key.dptr);
3819
3820         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3821         if (h == NULL) {
3822                 printf("Failed to fetch record '%s' on node %d\n", 
3823                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3824                 talloc_free(tmp_ctx);
3825                 exit(10);
3826         }
3827
3828         printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
3829
3830         talloc_free(ctdb_db);
3831         talloc_free(tmp_ctx);
3832         return 0;
3833 }
3834
3835 /*
3836   display the content of a database key
3837  */
3838 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3839 {
3840         const char *db_name;
3841         struct ctdb_db_context *ctdb_db;
3842         struct ctdb_record_handle *h;
3843         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3844         TDB_DATA key, data;
3845         uint8_t flags;
3846
3847         if (argc < 3) {
3848                 usage();
3849         }
3850
3851         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3852                 return -1;
3853         }
3854
3855         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3856         if (ctdb_db == NULL) {
3857                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3858                 return -1;
3859         }
3860
3861         key.dptr  = discard_const(argv[1]);
3862         key.dsize = strlen((char *)key.dptr);
3863
3864         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3865         if (h == NULL) {
3866                 printf("Failed to fetch record '%s' on node %d\n", 
3867                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3868                 talloc_free(tmp_ctx);
3869                 exit(10);
3870         }
3871
3872         data.dptr  = discard_const(argv[2]);
3873         data.dsize = strlen((char *)data.dptr);
3874
3875         if (ctdb_record_store(h, data) != 0) {
3876                 printf("Failed to store record\n");
3877         }
3878
3879         talloc_free(h);
3880         talloc_free(ctdb_db);
3881         talloc_free(tmp_ctx);
3882         return 0;
3883 }
3884
3885 /*
3886   fetch a record from a persistent database
3887  */
3888 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3889 {
3890         const char *db_name;
3891         struct ctdb_db_context *ctdb_db;
3892         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3893         struct ctdb_transaction_handle *h;
3894         TDB_DATA key, data;
3895         int fd, ret;
3896         bool persistent;
3897         uint8_t flags;
3898
3899         if (argc < 2) {
3900                 talloc_free(tmp_ctx);
3901                 usage();
3902         }
3903
3904         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
3905                 talloc_free(tmp_ctx);
3906                 return -1;
3907         }
3908
3909         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3910         if (!persistent) {
3911                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3912                 talloc_free(tmp_ctx);
3913                 return -1;
3914         }
3915
3916         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3917         if (ctdb_db == NULL) {
3918                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3919                 talloc_free(tmp_ctx);
3920                 return -1;
3921         }
3922
3923         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3924         if (h == NULL) {
3925                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3926                 talloc_free(tmp_ctx);
3927                 return -1;
3928         }
3929
3930         key.dptr  = discard_const(argv[1]);
3931         key.dsize = strlen(argv[1]);
3932         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3933         if (ret != 0) {
3934                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3935                 talloc_free(tmp_ctx);
3936                 return -1;
3937         }
3938
3939         if (data.dsize == 0 || data.dptr == NULL) {
3940                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3941                 talloc_free(tmp_ctx);
3942                 return -1;
3943         }
3944
3945         if (argc == 3) {
3946           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3947                 if (fd == -1) {
3948                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3949                         talloc_free(tmp_ctx);
3950                         return -1;
3951                 }
3952                 write(fd, data.dptr, data.dsize);
3953                 close(fd);
3954         } else {
3955                 write(1, data.dptr, data.dsize);
3956         }
3957
3958         /* abort the transaction */
3959         talloc_free(h);
3960
3961
3962         talloc_free(tmp_ctx);
3963         return 0;
3964 }
3965
3966 /*
3967   fetch a record from a tdb-file
3968  */
3969 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3970 {
3971         const char *tdb_file;
3972         TDB_CONTEXT *tdb;
3973         TDB_DATA key, data;
3974         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3975         int fd;
3976
3977         if (argc < 2) {
3978                 usage();
3979         }
3980
3981         tdb_file = argv[0];
3982
3983         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3984         if (tdb == NULL) {
3985                 printf("Failed to open TDB file %s\n", tdb_file);
3986                 return -1;
3987         }
3988
3989         if (!strncmp(argv[1], "0x", 2)) {
3990                 key = hextodata(tmp_ctx, argv[1] + 2);
3991                 if (key.dsize == 0) {
3992                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3993                         return -1;
3994                 }
3995         } else {
3996                 key.dptr  = discard_const(argv[1]);
3997                 key.dsize = strlen(argv[1]);
3998         }
3999
4000         data = tdb_fetch(tdb, key);
4001         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
4002                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
4003                 tdb_close(tdb);
4004                 return -1;
4005         }
4006
4007         tdb_close(tdb);
4008
4009         if (argc == 3) {
4010           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
4011                 if (fd == -1) {
4012                         printf("Failed to open output file %s\n", argv[2]);
4013                         return -1;
4014                 }
4015                 if (options.verbose){
4016                         write(fd, data.dptr, data.dsize);
4017                 } else {
4018                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4019                 }
4020                 close(fd);
4021         } else {
4022                 if (options.verbose){
4023                         write(1, data.dptr, data.dsize);
4024                 } else {
4025                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
4026                 }
4027         }
4028
4029         talloc_free(tmp_ctx);
4030         return 0;
4031 }
4032
4033 /*
4034   store a record and header to a tdb-file
4035  */
4036 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
4037 {
4038         const char *tdb_file;
4039         TDB_CONTEXT *tdb;
4040         TDB_DATA key, data;
4041         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4042
4043         if (argc < 3) {
4044                 usage();
4045         }
4046
4047         tdb_file = argv[0];
4048
4049         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
4050         if (tdb == NULL) {
4051                 printf("Failed to open TDB file %s\n", tdb_file);
4052                 return -1;
4053         }
4054
4055         if (!strncmp(argv[1], "0x", 2)) {
4056                 key = hextodata(tmp_ctx, argv[1] + 2);
4057                 if (key.dsize == 0) {
4058                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
4059                         return -1;
4060                 }
4061         } else {
4062                 key.dptr  = discard_const(argv[1]);
4063                 key.dsize = strlen(argv[1]);
4064         }
4065
4066         if (!strncmp(argv[2], "0x", 2)) {
4067                 data = hextodata(tmp_ctx, argv[2] + 2);
4068                 if (data.dsize == 0) {
4069                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
4070                         return -1;
4071                 }
4072         } else {
4073                 data.dptr  = discard_const(argv[2]);
4074                 data.dsize = strlen(argv[2]);
4075         }
4076
4077         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
4078                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
4079                 return -1;
4080         }
4081         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
4082                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
4083                 tdb_close(tdb);
4084                 return -1;
4085         }
4086
4087         tdb_close(tdb);
4088
4089         talloc_free(tmp_ctx);
4090         return 0;
4091 }
4092
4093 /*
4094   write a record to a persistent database
4095  */
4096 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
4097 {
4098         const char *db_name;
4099         struct ctdb_db_context *ctdb_db;
4100         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4101         struct ctdb_transaction_handle *h;
4102         struct stat st;
4103         TDB_DATA key, data;
4104         int fd, ret;
4105
4106         if (argc < 3) {
4107                 talloc_free(tmp_ctx);
4108                 usage();
4109         }
4110
4111         fd = open(argv[2], O_RDONLY);
4112         if (fd == -1) {
4113                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
4114                 talloc_free(tmp_ctx);
4115                 return -1;
4116         }
4117         
4118         ret = fstat(fd, &st);
4119         if (ret == -1) {
4120                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
4121                 close(fd);
4122                 talloc_free(tmp_ctx);
4123                 return -1;
4124         }
4125
4126         if (!S_ISREG(st.st_mode)) {
4127                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
4128                 close(fd);
4129                 talloc_free(tmp_ctx);
4130                 return -1;
4131         }
4132
4133         data.dsize = st.st_size;
4134         if (data.dsize == 0) {
4135                 data.dptr  = NULL;
4136         } else {
4137                 data.dptr = talloc_size(tmp_ctx, data.dsize);
4138                 if (data.dptr == NULL) {
4139                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
4140                         close(fd);
4141                         talloc_free(tmp_ctx);
4142                         return -1;
4143                 }
4144                 ret = read(fd, data.dptr, data.dsize);
4145                 if (ret != data.dsize) {
4146                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
4147                         close(fd);
4148                         talloc_free(tmp_ctx);
4149                         return -1;
4150                 }
4151         }
4152         close(fd);
4153
4154
4155         db_name = argv[0];
4156
4157         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
4158         if (ctdb_db == NULL) {
4159                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4160                 talloc_free(tmp_ctx);
4161                 return -1;
4162         }
4163
4164         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4165         if (h == NULL) {
4166                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
4167                 talloc_free(tmp_ctx);
4168                 return -1;
4169         }
4170
4171         key.dptr  = discard_const(argv[1]);
4172         key.dsize = strlen(argv[1]);
4173         ret = ctdb_transaction_store(h, key, data);
4174         if (ret != 0) {
4175                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4176                 talloc_free(tmp_ctx);
4177                 return -1;
4178         }
4179
4180         ret = ctdb_transaction_commit(h);
4181         if (ret != 0) {
4182                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4183                 talloc_free(tmp_ctx);
4184                 return -1;
4185         }
4186
4187
4188         talloc_free(tmp_ctx);
4189         return 0;
4190 }
4191
4192 /*
4193  * delete a record from a persistent database
4194  */
4195 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
4196 {
4197         const char *db_name;
4198         struct ctdb_db_context *ctdb_db;
4199         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4200         struct ctdb_transaction_handle *h;
4201         TDB_DATA key;
4202         int ret;
4203         bool persistent;
4204         uint8_t flags;
4205
4206         if (argc < 2) {
4207                 talloc_free(tmp_ctx);
4208                 usage();
4209         }
4210
4211         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
4212                 talloc_free(tmp_ctx);
4213                 return -1;
4214         }
4215
4216         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4217         if (!persistent) {
4218                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
4219                 talloc_free(tmp_ctx);
4220                 return -1;
4221         }
4222
4223         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4224         if (ctdb_db == NULL) {
4225                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
4226                 talloc_free(tmp_ctx);
4227                 return -1;
4228         }
4229
4230         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4231         if (h == NULL) {
4232                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
4233                 talloc_free(tmp_ctx);
4234                 return -1;
4235         }
4236
4237         key.dptr = discard_const(argv[1]);
4238         key.dsize = strlen(argv[1]);
4239         ret = ctdb_transaction_store(h, key, tdb_null);
4240         if (ret != 0) {
4241                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
4242                 talloc_free(tmp_ctx);
4243                 return -1;
4244         }
4245
4246         ret = ctdb_transaction_commit(h);
4247         if (ret != 0) {
4248                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
4249                 talloc_free(tmp_ctx);
4250                 return -1;
4251         }
4252
4253         talloc_free(tmp_ctx);
4254         return 0;
4255 }
4256
4257 /*
4258   check if a service is bound to a port or not
4259  */
4260 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
4261 {
4262         int s, ret;
4263         int v;
4264         int port;
4265         struct sockaddr_in sin;
4266
4267         if (argc != 1) {
4268                 printf("Use: ctdb chktcport <port>\n");
4269                 return EINVAL;
4270         }
4271
4272         port = atoi(argv[0]);
4273
4274         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
4275         if (s == -1) {
4276                 printf("Failed to open local socket\n");
4277                 return errno;
4278         }
4279
4280         v = fcntl(s, F_GETFL, 0);
4281         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
4282                 printf("Unable to set socket non-blocking: %s\n", strerror(errno));
4283         }
4284
4285         bzero(&sin, sizeof(sin));
4286         sin.sin_family = PF_INET;
4287         sin.sin_port   = htons(port);
4288         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
4289         close(s);
4290         if (ret == -1) {
4291                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
4292                 return errno;
4293         }
4294
4295         return 0;
4296 }
4297
4298
4299
4300 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4301                              TDB_DATA data, void *private_data)
4302 {
4303         DEBUG(DEBUG_ERR,("Log data received\n"));
4304         if (data.dsize > 0) {
4305                 printf("%s", data.dptr);
4306         }
4307
4308         exit(0);
4309 }
4310
4311 /*
4312   display a list of log messages from the in memory ringbuffer
4313  */
4314 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
4315 {
4316         int ret, i;
4317         bool main_daemon;
4318         struct ctdb_get_log_addr log_addr;
4319         TDB_DATA data;
4320         struct timeval tv;
4321
4322         /* Process options */
4323         main_daemon = true;
4324         log_addr.pnn = ctdb_get_pnn(ctdb);
4325         log_addr.level = DEBUG_NOTICE;
4326         for (i = 0; i < argc; i++) {
4327                 if (strcmp(argv[i], "recoverd") == 0) {
4328                         main_daemon = false;
4329                 } else {
4330                         if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
4331                                 log_addr.level = get_debug_by_desc(argv[i]);
4332                         } else {
4333                                 log_addr.level = strtol(argv[i], NULL, 0);
4334                         }
4335                 }
4336         }
4337
4338         /* Our message port is our PID */
4339         log_addr.srvid = getpid();
4340
4341         data.dptr = (unsigned char *)&log_addr;
4342         data.dsize = sizeof(log_addr);
4343
4344         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
4345
4346         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
4347         sleep(1);
4348
4349         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
4350
4351         if (main_daemon) {
4352                 int32_t res;
4353                 char *errmsg;
4354                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4355
4356                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4357                                    0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4358                 if (ret != 0 || res != 0) {
4359                         DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4360                         talloc_free(tmp_ctx);
4361                         return -1;
4362                 }
4363                 talloc_free(tmp_ctx);
4364         } else {
4365                 ret = ctdb_client_send_message(ctdb, options.pnn,
4366                                                CTDB_SRVID_GETLOG, data);
4367                 if (ret != 0) {
4368                         DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
4369                         return -1;
4370                 }
4371         }
4372
4373         tv = timeval_current();
4374         /* this loop will terminate when we have received the reply */
4375         while (timeval_elapsed(&tv) < (double)options.timelimit) {
4376                 event_loop_once(ctdb->ev);
4377         }
4378
4379         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4380
4381         return 0;
4382 }
4383
4384 /*
4385   clear the in memory log area
4386  */
4387 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4388 {
4389         int ret;
4390
4391         if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
4392                 /* "recoverd" not given - get logs from main daemon */
4393                 int32_t res;
4394                 char *errmsg;
4395                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4396
4397                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4398                                    0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4399                 if (ret != 0 || res != 0) {
4400                         DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4401                         talloc_free(tmp_ctx);
4402                         return -1;
4403                 }
4404
4405                 talloc_free(tmp_ctx);
4406         } else {
4407                 TDB_DATA data; /* unused in recoverd... */
4408                 data.dsize = 0;
4409
4410                 ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
4411                 if (ret != 0) {
4412                         DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
4413                         return -1;
4414                 }
4415         }
4416
4417         return 0;
4418 }
4419
4420 /* Reload public IPs on a specified nodes */
4421 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4422 {
4423         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4424         uint32_t *nodes;
4425         uint32_t pnn_mode;
4426         uint32_t timeout;
4427         int ret;
4428
4429         assert_single_node_only();
4430
4431         if (argc > 1) {
4432                 usage();
4433         }
4434
4435         /* Determine the nodes where IPs need to be reloaded */
4436         if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
4437                               options.pnn, true, &nodes, &pnn_mode)) {
4438                 ret = -1;
4439                 goto done;
4440         }
4441
4442 again:
4443         /* Disable takeover runs on all connected nodes.  A reply
4444          * indicating success is needed from each node so all nodes
4445          * will need to be active.  This will retry until maxruntime
4446          * is exceeded, hence no error handling.
4447          * 
4448          * A check could be added to not allow reloading of IPs when
4449          * there are disconnected nodes.  However, this should
4450          * probably be left up to the administrator.
4451          */
4452         timeout = LONGTIMEOUT;
4453         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4454                         "Disable takeover runs", true);
4455
4456         /* Now tell all the desired nodes to reload their public IPs.
4457          * Keep trying this until it succeeds.  This assumes all
4458          * failures are transient, which might not be true...
4459          */
4460         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4461                                       nodes, 0, LONGTIMELIMIT(),
4462                                       false, tdb_null,
4463                                       NULL, NULL, NULL) != 0) {
4464                 DEBUG(DEBUG_ERR,
4465                       ("Unable to reload IPs on some nodes, try again.\n"));
4466                 goto again;
4467         }
4468
4469         /* It isn't strictly necessary to wait until takeover runs are
4470          * re-enabled but doing so can't hurt.
4471          */
4472         timeout = 0;
4473         srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
4474                         "Enable takeover runs", true);
4475
4476         ipreallocate(ctdb);
4477
4478         ret = 0;
4479 done:
4480         talloc_free(tmp_ctx);
4481         return ret;
4482 }
4483
4484 /*
4485   display a list of the databases on a remote ctdb
4486  */
4487 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4488 {
4489         int i, ret;
4490         struct ctdb_dbid_map *dbmap=NULL;
4491
4492         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4493         if (ret != 0) {
4494                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4495                 return ret;
4496         }
4497
4498         if(options.machinereadable){
4499                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4500                 for(i=0;i<dbmap->num;i++){
4501                         const char *path;
4502                         const char *name;
4503                         const char *health;
4504                         bool persistent;
4505                         bool readonly;
4506                         bool sticky;
4507
4508                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4509                                             dbmap->dbs[i].dbid, ctdb, &path);
4510                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4511                                             dbmap->dbs[i].dbid, ctdb, &name);
4512                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4513                                               dbmap->dbs[i].dbid, ctdb, &health);
4514                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4515                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4516                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4517                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4518                                dbmap->dbs[i].dbid, name, path,
4519                                !!(persistent), !!(sticky),
4520                                !!(health), !!(readonly));
4521                 }
4522                 return 0;
4523         }
4524
4525         printf("Number of databases:%d\n", dbmap->num);
4526         for(i=0;i<dbmap->num;i++){
4527                 const char *path;
4528                 const char *name;
4529                 const char *health;
4530                 bool persistent;
4531                 bool readonly;
4532                 bool sticky;
4533
4534                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4535                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4536                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4537                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4538                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4539                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4540                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4541                        dbmap->dbs[i].dbid, name, path,
4542                        persistent?" PERSISTENT":"",
4543                        sticky?" STICKY":"",
4544                        readonly?" READONLY":"",
4545                        health?" UNHEALTHY":"");
4546         }
4547
4548         return 0;
4549 }
4550
4551 /*
4552   display the status of a database on a remote ctdb
4553  */
4554 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4555 {
4556         const char *db_name;
4557         uint32_t db_id;
4558         uint8_t flags;
4559         const char *path;
4560         const char *health;
4561
4562         if (argc < 1) {
4563                 usage();
4564         }
4565
4566         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
4567                 return -1;
4568         }
4569
4570         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4571         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4572         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4573                db_id, db_name, path,
4574                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4575                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4576                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4577                (health ? health : "OK"));
4578
4579         return 0;
4580 }
4581
4582 /*
4583   check if the local node is recmaster or not
4584   it will return 1 if this node is the recmaster and 0 if it is not
4585   or if the local ctdb daemon could not be contacted
4586  */
4587 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4588 {
4589         uint32_t mypnn, recmaster;
4590         int ret;
4591
4592         assert_single_node_only();
4593
4594         mypnn = getpnn(ctdb);
4595
4596         ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
4597         if (ret != 0) {
4598                 printf("Failed to get the recmaster\n");
4599                 return 1;
4600         }
4601
4602         if (recmaster != mypnn) {
4603                 printf("this node is not the recmaster\n");
4604                 return 1;
4605         }
4606
4607         printf("this node is the recmaster\n");
4608         return 0;
4609 }
4610
4611 /*
4612   ping a node
4613  */
4614 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4615 {
4616         int ret;
4617         struct timeval tv = timeval_current();
4618         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4619         if (ret == -1) {
4620                 printf("Unable to get ping response from node %u\n", options.pnn);
4621                 return -1;
4622         } else {
4623                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4624                        options.pnn, timeval_elapsed(&tv), ret);
4625         }
4626         return 0;
4627 }
4628
4629
4630 /*
4631   get a node's runstate
4632  */
4633 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4634 {
4635         int ret;
4636         enum ctdb_runstate runstate;
4637
4638         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4639         if (ret == -1) {
4640                 printf("Unable to get runstate response from node %u\n",
4641                        options.pnn);
4642                 return -1;
4643         } else {
4644                 bool found = true;
4645                 enum ctdb_runstate t;
4646                 int i;
4647                 for (i=0; i<argc; i++) {
4648                         found = false;
4649                         t = runstate_from_string(argv[i]);
4650                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4651                                 printf("Invalid run state (%s)\n", argv[i]);
4652                                 return -1;
4653                         }
4654
4655                         if (t == runstate) {
4656                                 found = true;
4657                                 break;
4658                         }
4659                 }
4660
4661                 if (!found) {
4662                         printf("CTDB not in required run state (got %s)\n", 
4663                                runstate_to_string((enum ctdb_runstate)runstate));
4664                         return -1;
4665                 }
4666         }
4667
4668         printf("%s\n", runstate_to_string(runstate));
4669         return 0;
4670 }
4671
4672
4673 /*
4674   get a tunable
4675  */
4676 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4677 {
4678         const char *name;
4679         uint32_t value;
4680         int ret;
4681
4682         if (argc < 1) {
4683                 usage();
4684         }
4685
4686         name = argv[0];
4687         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4688         if (ret != 0) {
4689                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4690                 return -1;
4691         }
4692
4693         printf("%-23s = %u\n", name, value);
4694         return 0;
4695 }
4696
4697 /*
4698   set a tunable
4699  */
4700 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4701 {
4702         const char *name;
4703         uint32_t value;
4704         int ret;
4705
4706         if (argc < 2) {
4707                 usage();
4708         }
4709
4710         name = argv[0];
4711         value = strtoul(argv[1], NULL, 0);
4712
4713         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4714         if (ret == -1) {
4715                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4716                 return -1;
4717         }
4718         return 0;
4719 }
4720
4721 /*
4722   list all tunables
4723  */
4724 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4725 {
4726         uint32_t count;
4727         const char **list;
4728         int ret, i;
4729
4730         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4731         if (ret == -1) {
4732                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4733                 return -1;
4734         }
4735
4736         for (i=0;i<count;i++) {
4737                 control_getvar(ctdb, 1, &list[i]);
4738         }
4739
4740         talloc_free(list);
4741         
4742         return 0;
4743 }
4744
4745 /*
4746   display debug level on a node
4747  */
4748 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4749 {
4750         int ret;
4751         int32_t level;
4752
4753         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4754         if (ret != 0) {
4755                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4756                 return ret;
4757         } else {
4758                 if (options.machinereadable){
4759                         printf(":Name:Level:\n");
4760                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4761                 } else {
4762                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4763                 }
4764         }
4765         return 0;
4766 }
4767
4768 /*
4769   display reclock file of a node
4770  */
4771 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4772 {
4773         int ret;
4774         const char *reclock;
4775
4776         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4777         if (ret != 0) {
4778                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4779                 return ret;
4780         } else {
4781                 if (options.machinereadable){
4782                         if (reclock != NULL) {
4783                                 printf("%s", reclock);
4784                         }
4785                 } else {
4786                         if (reclock == NULL) {
4787                                 printf("No reclock file used.\n");
4788                         } else {
4789                                 printf("Reclock file:%s\n", reclock);
4790                         }
4791                 }
4792         }
4793         return 0;
4794 }
4795
4796 /*
4797   set the reclock file of a node
4798  */
4799 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4800 {
4801         int ret;
4802         const char *reclock;
4803
4804         if (argc == 0) {
4805                 reclock = NULL;
4806         } else if (argc == 1) {
4807                 reclock = argv[0];
4808         } else {
4809                 usage();
4810         }
4811
4812         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4813         if (ret != 0) {
4814                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4815                 return ret;
4816         }
4817         return 0;
4818 }
4819
4820 /*
4821   set the natgw state on/off
4822  */
4823 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4824 {
4825         int ret;
4826         uint32_t natgwstate;
4827
4828         if (argc == 0) {
4829                 usage();
4830         }
4831
4832         if (!strcmp(argv[0], "on")) {
4833                 natgwstate = 1;
4834         } else if (!strcmp(argv[0], "off")) {
4835                 natgwstate = 0;
4836         } else {
4837                 usage();
4838         }
4839
4840         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4841         if (ret != 0) {
4842                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4843                 return ret;
4844         }
4845
4846         return 0;
4847 }
4848
4849 /*
4850   set the lmaster role on/off
4851  */
4852 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4853 {
4854         int ret;
4855         uint32_t lmasterrole;
4856
4857         if (argc == 0) {
4858                 usage();
4859         }
4860
4861         if (!strcmp(argv[0], "on")) {
4862                 lmasterrole = 1;
4863         } else if (!strcmp(argv[0], "off")) {
4864                 lmasterrole = 0;
4865         } else {
4866                 usage();
4867         }
4868
4869         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4870         if (ret != 0) {
4871                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4872                 return ret;
4873         }
4874
4875         return 0;
4876 }
4877
4878 /*
4879   set the recmaster role on/off
4880  */
4881 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4882 {
4883         int ret;
4884         uint32_t recmasterrole;
4885
4886         if (argc == 0) {
4887                 usage();
4888         }
4889
4890         if (!strcmp(argv[0], "on")) {
4891                 recmasterrole = 1;
4892         } else if (!strcmp(argv[0], "off")) {
4893                 recmasterrole = 0;
4894         } else {
4895                 usage();
4896         }
4897
4898         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4899         if (ret != 0) {
4900                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4901                 return ret;
4902         }
4903
4904         return 0;
4905 }
4906
4907 /*
4908   set debug level on a node or all nodes
4909  */
4910 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4911 {
4912         int i, ret;
4913         int32_t level;
4914
4915         if (argc == 0) {
4916                 printf("You must specify the debug level. Valid levels are:\n");
4917                 for (i=0; debug_levels[i].description != NULL; i++) {
4918                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4919                 }
4920
4921                 return 0;
4922         }
4923
4924         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4925                 level = get_debug_by_desc(argv[0]);
4926         } else {
4927                 level = strtol(argv[0], NULL, 0);
4928         }
4929
4930         for (i=0; debug_levels[i].description != NULL; i++) {
4931                 if (level == debug_levels[i].level) {
4932                         break;
4933                 }
4934         }
4935         if (debug_levels[i].description == NULL) {
4936                 printf("Invalid debug level, must be one of\n");
4937                 for (i=0; debug_levels[i].description != NULL; i++) {
4938                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4939                 }
4940                 return -1;
4941         }
4942
4943         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4944         if (ret != 0) {
4945                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4946         }
4947         return 0;
4948 }
4949
4950
4951 /*
4952   thaw a node
4953  */
4954 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4955 {
4956         int ret;
4957         uint32_t priority;
4958         
4959         if (argc == 1) {
4960                 priority = strtol(argv[0], NULL, 0);
4961         } else {
4962                 priority = 0;
4963         }
4964         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4965
4966         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4967         if (ret != 0) {
4968                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4969         }               
4970         return 0;
4971 }
4972
4973
4974 /*
4975   attach to a database
4976  */
4977 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4978 {
4979         const char *db_name;
4980         struct ctdb_db_context *ctdb_db;
4981         bool persistent = false;
4982
4983         if (argc < 1) {
4984                 usage();
4985         }
4986         db_name = argv[0];
4987         if (argc > 2) {
4988                 usage();
4989         }
4990         if (argc == 2) {
4991                 if (strcmp(argv[1], "persistent") != 0) {
4992                         usage();
4993                 }
4994                 persistent = true;
4995         }
4996
4997         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4998         if (ctdb_db == NULL) {
4999                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
5000                 return -1;
5001         }
5002
5003         return 0;
5004 }
5005
5006 /*
5007   set db priority
5008  */
5009 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5010 {
5011         struct ctdb_db_priority db_prio;
5012         int ret;
5013
5014         if (argc < 2) {
5015                 usage();
5016         }
5017
5018         db_prio.db_id    = strtoul(argv[0], NULL, 0);
5019         db_prio.priority = strtoul(argv[1], NULL, 0);
5020
5021         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
5022         if (ret != 0) {
5023                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
5024                 return -1;
5025         }
5026
5027         return 0;
5028 }
5029
5030 /*
5031   get db priority
5032  */
5033 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
5034 {
5035         uint32_t db_id, priority;
5036         int ret;
5037
5038         if (argc < 1) {
5039                 usage();
5040         }
5041
5042         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5043                 return -1;
5044         }
5045
5046         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
5047         if (ret != 0) {
5048                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
5049                 return -1;
5050         }
5051
5052         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
5053
5054         return 0;
5055 }
5056
5057 /*
5058   set the sticky records capability for a database
5059  */
5060 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
5061 {
5062         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5063         uint32_t db_id;
5064         int ret;
5065
5066         if (argc < 1) {
5067                 usage();
5068         }
5069
5070         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5071                 return -1;
5072         }
5073
5074         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
5075         if (ret != 0) {
5076                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
5077                 talloc_free(tmp_ctx);
5078                 return -1;
5079         }
5080
5081         talloc_free(tmp_ctx);
5082         return 0;
5083 }
5084
5085 /*
5086   set the readonly capability for a database
5087  */
5088 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
5089 {
5090         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5091         uint32_t db_id;
5092         int ret;
5093
5094         if (argc < 1) {
5095                 usage();
5096         }
5097
5098         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5099                 return -1;
5100         }
5101
5102         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
5103         if (ret != 0) {
5104                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
5105                 talloc_free(tmp_ctx);
5106                 return -1;
5107         }
5108
5109         talloc_free(tmp_ctx);
5110         return 0;
5111 }
5112
5113 /*
5114   get db seqnum
5115  */
5116 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
5117 {
5118         uint32_t db_id;
5119         uint64_t seqnum;
5120         int ret;
5121
5122         if (argc < 1) {
5123                 usage();
5124         }
5125
5126         if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
5127                 return -1;
5128         }
5129
5130         ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
5131         if (ret != 0) {
5132                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
5133                 return -1;
5134         }
5135
5136         printf("Sequence number:%lld\n", (long long)seqnum);
5137
5138         return 0;
5139 }
5140
5141 /*
5142   run an eventscript on a node
5143  */
5144 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5145 {
5146         TDB_DATA data;
5147         int ret;
5148         int32_t res;
5149         char *errmsg;
5150         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5151
5152         if (argc != 1) {
5153                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5154                 return -1;
5155         }
5156
5157         data.dptr = (unsigned char *)discard_const(argv[0]);
5158         data.dsize = strlen((char *)data.dptr) + 1;
5159
5160         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5161
5162         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5163                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5164         if (ret != 0 || res != 0) {
5165                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5166                 talloc_free(tmp_ctx);
5167                 return -1;
5168         }
5169         talloc_free(tmp_ctx);
5170         return 0;
5171 }
5172
5173 #define DB_VERSION 1
5174 #define MAX_DB_NAME 64
5175 struct db_file_header {
5176         unsigned long version;
5177         time_t timestamp;
5178         unsigned long persistent;
5179         unsigned long size;
5180         const char name[MAX_DB_NAME];
5181 };
5182
5183 struct backup_data {
5184         struct ctdb_marshall_buffer *records;
5185         uint32_t len;
5186         uint32_t total;
5187         bool traverse_error;
5188 };
5189
5190 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5191 {
5192         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5193         struct ctdb_rec_data *rec;
5194
5195         /* add the record */
5196         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5197         if (rec == NULL) {
5198                 bd->traverse_error = true;
5199                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5200                 return -1;
5201         }
5202         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5203         if (bd->records == NULL) {
5204                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5205                 bd->traverse_error = true;
5206                 return -1;
5207         }
5208         bd->records->count++;
5209         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5210         bd->len += rec->length;
5211         talloc_free(rec);
5212
5213         bd->total++;
5214         return 0;
5215 }
5216
5217 /*
5218  * backup a database to a file 
5219  */
5220 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5221 {
5222         const char *db_name;
5223         int ret;
5224         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5225         struct db_file_header dbhdr;
5226         struct ctdb_db_context *ctdb_db;
5227         struct backup_data *bd;
5228         int fh = -1;
5229         int status = -1;
5230         const char *reason = NULL;
5231         uint32_t db_id;
5232         uint8_t flags;
5233
5234         assert_single_node_only();
5235
5236         if (argc != 2) {
5237                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5238                 return -1;
5239         }
5240
5241         if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
5242                 return -1;
5243         }
5244
5245         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5246                                     db_id, tmp_ctx, &reason);
5247         if (ret != 0) {
5248                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5249                                  argv[0]));
5250                 talloc_free(tmp_ctx);
5251                 return -1;
5252         }
5253         if (reason) {
5254                 uint32_t allow_unhealthy = 0;
5255
5256                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5257                                       "AllowUnhealthyDBRead",
5258                                       &allow_unhealthy);
5259
5260                 if (allow_unhealthy != 1) {
5261                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5262                                          argv[0], reason));
5263
5264                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5265                                          allow_unhealthy));
5266                         talloc_free(tmp_ctx);
5267                         return -1;
5268                 }
5269
5270                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5271                                      argv[0], argv[0]));
5272                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5273                                      "tunnable AllowUnhealthyDBRead = %u\n",
5274                                      allow_unhealthy));
5275         }
5276
5277         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5278         if (ctdb_db == NULL) {
5279                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5280                 talloc_free(tmp_ctx);
5281                 return -1;
5282         }
5283
5284
5285         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5286         if (ret == -1) {
5287                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5288                 talloc_free(tmp_ctx);
5289                 return -1;
5290         }
5291
5292
5293         bd = talloc_zero(tmp_ctx, struct backup_data);
5294         if (bd == NULL) {
5295                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5296                 talloc_free(tmp_ctx);
5297                 return -1;
5298         }
5299
5300         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5301         if (bd->records == NULL) {
5302                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5303                 talloc_free(tmp_ctx);
5304                 return -1;
5305         }
5306
5307         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5308         bd->records->db_id = ctdb_db->db_id;
5309         /* traverse the database collecting all records */
5310         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5311             bd->traverse_error) {
5312                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5313                 talloc_free(tmp_ctx);
5314                 return -1;              
5315         }
5316
5317         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5318
5319
5320         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5321         if (fh == -1) {
5322                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5323                 talloc_free(tmp_ctx);
5324                 return -1;
5325         }
5326
5327         ZERO_STRUCT(dbhdr);
5328         dbhdr.version = DB_VERSION;
5329         dbhdr.timestamp = time(NULL);
5330         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5331         dbhdr.size = bd->len;
5332         if (strlen(argv[0]) >= MAX_DB_NAME) {
5333                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5334                 goto done;
5335         }
5336         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
5337         ret = write(fh, &dbhdr, sizeof(dbhdr));
5338         if (ret == -1) {
5339                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5340                 goto done;
5341         }
5342         ret = write(fh, bd->records, bd->len);
5343         if (ret == -1) {
5344                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5345                 goto done;
5346         }
5347
5348         status = 0;
5349 done:
5350         if (fh != -1) {
5351                 ret = close(fh);
5352                 if (ret == -1) {
5353                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5354                 }
5355         }
5356
5357         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5358
5359         talloc_free(tmp_ctx);
5360         return status;
5361 }
5362
5363 /*
5364  * restore a database from a file 
5365  */
5366 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5367 {
5368         int ret;
5369         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5370         TDB_DATA outdata;
5371         TDB_DATA data;
5372         struct db_file_header dbhdr;
5373         struct ctdb_db_context *ctdb_db;
5374         struct ctdb_node_map *nodemap=NULL;
5375         struct ctdb_vnn_map *vnnmap=NULL;
5376         int i, fh;
5377         struct ctdb_control_wipe_database w;
5378         uint32_t *nodes;
5379         uint32_t generation;
5380         struct tm *tm;
5381         char tbuf[100];
5382         char *dbname;
5383
5384         assert_single_node_only();
5385
5386         if (argc < 1 || argc > 2) {
5387                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5388                 return -1;
5389         }
5390
5391         fh = open(argv[0], O_RDONLY);
5392         if (fh == -1) {
5393                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5394                 talloc_free(tmp_ctx);
5395                 return -1;
5396         }
5397
5398         read(fh, &dbhdr, sizeof(dbhdr));
5399         if (dbhdr.version != DB_VERSION) {
5400                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5401                 close(fh);
5402                 talloc_free(tmp_ctx);
5403                 return -1;
5404         }
5405
5406         dbname = discard_const(dbhdr.name);
5407         if (argc == 2) {
5408                 dbname = discard_const(argv[1]);
5409         }
5410
5411         outdata.dsize = dbhdr.size;
5412         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5413         if (outdata.dptr == NULL) {
5414                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5415                 close(fh);
5416                 talloc_free(tmp_ctx);
5417                 return -1;
5418         }               
5419         read(fh, outdata.dptr, outdata.dsize);
5420         close(fh);
5421
5422         tm = localtime(&dbhdr.timestamp);
5423         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5424         printf("Restoring database '%s' from backup @ %s\n",
5425                 dbname, tbuf);
5426
5427
5428         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5429         if (ctdb_db == NULL) {
5430                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5431                 talloc_free(tmp_ctx);
5432                 return -1;
5433         }
5434
5435         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5436         if (ret != 0) {
5437                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5438                 talloc_free(tmp_ctx);
5439                 return ret;
5440         }
5441
5442
5443         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5444         if (ret != 0) {
5445                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5446                 talloc_free(tmp_ctx);
5447                 return ret;
5448         }
5449
5450         /* freeze all nodes */
5451         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5452         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5453                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5454                                         nodes, i,
5455                                         TIMELIMIT(),
5456                                         false, tdb_null,
5457                                         NULL, NULL,
5458                                         NULL) != 0) {
5459                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5460                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5461                         talloc_free(tmp_ctx);
5462                         return -1;
5463                 }
5464         }
5465
5466         generation = vnnmap->generation;
5467         data.dptr = (void *)&generation;
5468         data.dsize = sizeof(generation);
5469
5470         /* start a cluster wide transaction */
5471         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5472         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5473                                         nodes, 0,
5474                                         TIMELIMIT(), false, data,
5475                                         NULL, NULL,
5476                                         NULL) != 0) {
5477                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5478                 return -1;
5479         }
5480
5481
5482         w.db_id = ctdb_db->db_id;
5483         w.transaction_id = generation;
5484
5485         data.dptr = (void *)&w;
5486         data.dsize = sizeof(w);
5487
5488         /* wipe all the remote databases. */
5489         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5490         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5491                                         nodes, 0,
5492                                         TIMELIMIT(), false, data,
5493                                         NULL, NULL,
5494                                         NULL) != 0) {
5495                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5496                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5497                 talloc_free(tmp_ctx);
5498                 return -1;
5499         }
5500         
5501         /* push the database */
5502         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5503         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5504                                         nodes, 0,
5505                                         TIMELIMIT(), false, outdata,
5506                                         NULL, NULL,
5507                                         NULL) != 0) {
5508                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5509                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5510                 talloc_free(tmp_ctx);
5511                 return -1;
5512         }
5513
5514         data.dptr = (void *)&ctdb_db->db_id;
5515         data.dsize = sizeof(ctdb_db->db_id);
5516
5517         /* mark the database as healthy */
5518         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5519         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5520                                         nodes, 0,
5521                                         TIMELIMIT(), false, data,
5522                                         NULL, NULL,
5523                                         NULL) != 0) {
5524                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5525                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5526                 talloc_free(tmp_ctx);
5527                 return -1;
5528         }
5529
5530         data.dptr = (void *)&generation;
5531         data.dsize = sizeof(generation);
5532
5533         /* commit all the changes */
5534         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5535                                         nodes, 0,
5536                                         TIMELIMIT(), false, data,
5537                                         NULL, NULL,
5538                                         NULL) != 0) {
5539                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5540                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5541                 talloc_free(tmp_ctx);
5542                 return -1;
5543         }
5544
5545
5546         /* thaw all nodes */
5547         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5548         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5549                                         nodes, 0,
5550                                         TIMELIMIT(),
5551                                         false, tdb_null,
5552                                         NULL, NULL,
5553                                         NULL) != 0) {
5554                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5555                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5556                 talloc_free(tmp_ctx);
5557                 return -1;
5558         }
5559
5560
5561         talloc_free(tmp_ctx);
5562         return 0;
5563 }
5564
5565 /*
5566  * dump a database backup from a file
5567  */
5568 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5569 {
5570         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5571         TDB_DATA outdata;
5572         struct db_file_header dbhdr;
5573         int i, fh;
5574         struct tm *tm;
5575         char tbuf[100];
5576         struct ctdb_rec_data *rec = NULL;
5577         struct ctdb_marshall_buffer *m;
5578         struct ctdb_dump_db_context c;
5579
5580         assert_single_node_only();
5581
5582         if (argc != 1) {
5583                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5584                 return -1;
5585         }
5586
5587         fh = open(argv[0], O_RDONLY);
5588         if (fh == -1) {
5589                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5590                 talloc_free(tmp_ctx);
5591                 return -1;
5592         }
5593
5594         read(fh, &dbhdr, sizeof(dbhdr));
5595         if (dbhdr.version != DB_VERSION) {
5596                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5597                 close(fh);
5598                 talloc_free(tmp_ctx);
5599                 return -1;
5600         }
5601
5602         outdata.dsize = dbhdr.size;
5603         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5604         if (outdata.dptr == NULL) {
5605                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5606                 close(fh);
5607                 talloc_free(tmp_ctx);
5608                 return -1;
5609         }
5610         read(fh, outdata.dptr, outdata.dsize);
5611         close(fh);
5612         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5613
5614         tm = localtime(&dbhdr.timestamp);
5615         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5616         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5617                 dbhdr.name, m->db_id, tbuf);
5618
5619         ZERO_STRUCT(c);
5620         c.f = stdout;
5621         c.printemptyrecords = (bool)options.printemptyrecords;
5622         c.printdatasize = (bool)options.printdatasize;
5623         c.printlmaster = false;
5624         c.printhash = (bool)options.printhash;
5625         c.printrecordflags = (bool)options.printrecordflags;
5626
5627         for (i=0; i < m->count; i++) {
5628                 uint32_t reqid = 0;
5629                 TDB_DATA key, data;
5630
5631                 /* we do not want the header splitted, so we pass NULL*/
5632                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5633                                               NULL, &key, &data);
5634
5635                 ctdb_dumpdb_record(ctdb, key, data, &c);
5636         }
5637
5638         printf("Dumped %d records\n", i);
5639         talloc_free(tmp_ctx);
5640         return 0;
5641 }
5642
5643 /*
5644  * wipe a database from a file
5645  */
5646 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5647                           const char **argv)
5648 {
5649         const char *db_name;
5650         int ret;
5651         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5652         TDB_DATA data;
5653         struct ctdb_db_context *ctdb_db;
5654         struct ctdb_node_map *nodemap = NULL;
5655         struct ctdb_vnn_map *vnnmap = NULL;
5656         int i;
5657         struct ctdb_control_wipe_database w;
5658         uint32_t *nodes;
5659         uint32_t generation;
5660         uint8_t flags;
5661
5662         assert_single_node_only();
5663
5664         if (argc != 1) {
5665                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5666                 return -1;
5667         }
5668
5669         if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
5670                 return -1;
5671         }
5672
5673         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5674         if (ctdb_db == NULL) {
5675                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5676                                   argv[0]));
5677                 talloc_free(tmp_ctx);
5678                 return -1;
5679         }
5680
5681         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5682                                    &nodemap);
5683         if (ret != 0) {
5684                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5685                                   options.pnn));
5686                 talloc_free(tmp_ctx);
5687                 return ret;
5688         }
5689
5690         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5691                                   &vnnmap);
5692         if (ret != 0) {
5693                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5694                                   options.pnn));
5695                 talloc_free(tmp_ctx);
5696                 return ret;
5697         }
5698
5699         /* freeze all nodes */
5700         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5701         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5702                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5703                                                 nodes, i,
5704                                                 TIMELIMIT(),
5705                                                 false, tdb_null,
5706                                                 NULL, NULL,
5707                                                 NULL);
5708                 if (ret != 0) {
5709                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5710                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5711                                              CTDB_RECOVERY_ACTIVE);
5712                         talloc_free(tmp_ctx);
5713                         return -1;
5714                 }
5715         }
5716
5717         generation = vnnmap->generation;
5718         data.dptr = (void *)&generation;
5719         data.dsize = sizeof(generation);
5720
5721         /* start a cluster wide transaction */
5722         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5723         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5724                                         nodes, 0,
5725                                         TIMELIMIT(), false, data,
5726                                         NULL, NULL,
5727                                         NULL);
5728         if (ret!= 0) {
5729                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5730                                   "transactions.\n"));
5731                 return -1;
5732         }
5733
5734         w.db_id = ctdb_db->db_id;
5735         w.transaction_id = generation;
5736
5737         data.dptr = (void *)&w;
5738         data.dsize = sizeof(w);
5739
5740         /* wipe all the remote databases. */
5741         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5742         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5743                                         nodes, 0,
5744                                         TIMELIMIT(), false, data,
5745                                         NULL, NULL,
5746                                         NULL) != 0) {
5747                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5748                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5749                 talloc_free(tmp_ctx);
5750                 return -1;
5751         }
5752
5753         data.dptr = (void *)&ctdb_db->db_id;
5754         data.dsize = sizeof(ctdb_db->db_id);
5755
5756         /* mark the database as healthy */
5757         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5758         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5759                                         nodes, 0,
5760                                         TIMELIMIT(), false, data,
5761                                         NULL, NULL,
5762                                         NULL) != 0) {
5763                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5764                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5765                 talloc_free(tmp_ctx);
5766                 return -1;
5767         }
5768
5769         data.dptr = (void *)&generation;
5770         data.dsize = sizeof(generation);
5771
5772         /* commit all the changes */
5773         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5774                                         nodes, 0,
5775                                         TIMELIMIT(), false, data,
5776                                         NULL, NULL,
5777                                         NULL) != 0) {
5778                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5779                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5780                 talloc_free(tmp_ctx);
5781                 return -1;
5782         }
5783
5784         /* thaw all nodes */
5785         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5786         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5787                                         nodes, 0,
5788                                         TIMELIMIT(),
5789                                         false, tdb_null,
5790                                         NULL, NULL,
5791                                         NULL) != 0) {
5792                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5793                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5794                 talloc_free(tmp_ctx);
5795                 return -1;
5796         }
5797
5798         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5799
5800         talloc_free(tmp_ctx);
5801         return 0;
5802 }
5803
5804 /*
5805   dump memory usage
5806  */
5807 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5808 {
5809         TDB_DATA data;
5810         int ret;
5811         int32_t res;
5812         char *errmsg;
5813         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5814         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5815                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5816         if (ret != 0 || res != 0) {
5817                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5818                 talloc_free(tmp_ctx);
5819                 return -1;
5820         }
5821         write(1, data.dptr, data.dsize);
5822         talloc_free(tmp_ctx);
5823         return 0;
5824 }
5825
5826 /*
5827   handler for memory dumps
5828 */
5829 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5830                              TDB_DATA data, void *private_data)
5831 {
5832         write(1, data.dptr, data.dsize);
5833         exit(0);
5834 }
5835
5836 /*
5837   dump memory usage on the recovery daemon
5838  */
5839 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5840 {
5841         int ret;
5842         TDB_DATA data;
5843         struct srvid_request rd;
5844
5845         rd.pnn = ctdb_get_pnn(ctdb);
5846         rd.srvid = getpid();
5847
5848         /* register a message port for receiveing the reply so that we
5849            can receive the reply
5850         */
5851         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5852
5853
5854         data.dptr = (uint8_t *)&rd;
5855         data.dsize = sizeof(rd);
5856
5857         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5858         if (ret != 0) {
5859                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5860                 return -1;
5861         }
5862
5863         /* this loop will terminate when we have received the reply */
5864         while (1) {     
5865                 event_loop_once(ctdb->ev);
5866         }
5867
5868         return 0;
5869 }
5870
5871 /*
5872   send a message to a srvid
5873  */
5874 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5875 {
5876         unsigned long srvid;
5877         int ret;
5878         TDB_DATA data;
5879
5880         if (argc < 2) {
5881                 usage();
5882         }
5883
5884         srvid      = strtoul(argv[0], NULL, 0);
5885
5886         data.dptr = (uint8_t *)discard_const(argv[1]);
5887         data.dsize= strlen(argv[1]);
5888
5889         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5890         if (ret != 0) {
5891                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5892                 return -1;
5893         }
5894
5895         return 0;
5896 }
5897
5898 /*
5899   handler for msglisten
5900 */
5901 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5902                              TDB_DATA data, void *private_data)
5903 {
5904         int i;
5905
5906         printf("Message received: ");
5907         for (i=0;i<data.dsize;i++) {
5908                 printf("%c", data.dptr[i]);
5909         }
5910         printf("\n");
5911 }
5912
5913 /*
5914   listen for messages on a messageport
5915  */
5916 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5917 {
5918         uint64_t srvid;
5919
5920         srvid = getpid();
5921
5922         /* register a message port and listen for messages
5923         */
5924         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5925         printf("Listening for messages on srvid:%d\n", (int)srvid);
5926
5927         while (1) {     
5928                 event_loop_once(ctdb->ev);
5929         }
5930
5931         return 0;
5932 }
5933
5934 /*
5935   list all nodes in the cluster
5936   we parse the nodes file directly
5937  */
5938 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5939 {
5940         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5941         struct pnn_node *pnn_nodes;
5942         struct pnn_node *pnn_node;
5943
5944         assert_single_node_only();
5945
5946         pnn_nodes = read_nodes_file(mem_ctx);
5947         if (pnn_nodes == NULL) {
5948                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5949                 talloc_free(mem_ctx);
5950                 return -1;
5951         }
5952
5953         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5954                 ctdb_sock_addr addr;
5955                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5956                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5957                         talloc_free(mem_ctx);
5958                         return -1;
5959                 }
5960                 if (options.machinereadable){
5961                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5962                 } else {
5963                         printf("%s\n", pnn_node->addr);
5964                 }
5965         }
5966         talloc_free(mem_ctx);
5967
5968         return 0;
5969 }
5970
5971 /*
5972   reload the nodes file on the local node
5973  */
5974 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5975 {
5976         int i, ret;
5977         int mypnn;
5978         struct ctdb_node_map *nodemap=NULL;
5979
5980         assert_single_node_only();
5981
5982         mypnn = ctdb_get_pnn(ctdb);
5983
5984         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5985         if (ret != 0) {
5986                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5987                 return ret;
5988         }
5989
5990         /* reload the nodes file on all remote nodes */
5991         for (i=0;i<nodemap->num;i++) {
5992                 if (nodemap->nodes[i].pnn == mypnn) {
5993                         continue;
5994                 }
5995                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5996                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5997                         nodemap->nodes[i].pnn);
5998                 if (ret != 0) {
5999                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
6000                 }
6001         }
6002
6003         /* reload the nodes file on the local node */
6004         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
6005         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
6006         if (ret != 0) {
6007                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
6008         }
6009
6010         /* initiate a recovery */
6011         control_recover(ctdb, argc, argv);
6012
6013         return 0;
6014 }
6015
6016
6017 static const struct {
6018         const char *name;
6019         int (*fn)(struct ctdb_context *, int, const char **);
6020         bool auto_all;
6021         bool without_daemon; /* can be run without daemon running ? */
6022         const char *msg;
6023         const char *args;
6024 } ctdb_commands[] = {
6025         { "version",         control_version,           true,   true,   "show version of ctdb" },
6026         { "status",          control_status,            true,   false,  "show node status" },
6027         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
6028         { "ping",            control_ping,              true,   false,  "ping all nodes" },
6029         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
6030         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
6031         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
6032         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
6033         { "statistics",      control_statistics,        false,  false, "show statistics" },
6034         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
6035         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
6036         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
6037         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
6038         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
6039         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
6040         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
6041         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
6042         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
6043         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
6044         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
6045         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
6046         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
6047         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
6048         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
6049         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
6050         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
6051         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
6052         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
6053         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
6054         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
6055         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
6056         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
6057         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
6058         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
6059         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
6060         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
6061         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
6062         { "stop",            control_stop,              true,   false,  "stop a node" },
6063         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
6064         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
6065         { "unban",           control_unban,             true,   false,  "unban a node" },
6066         { "showban",         control_showban,           true,   false,  "show ban information"},
6067         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
6068         { "recover",         control_recover,           true,   false,  "force recovery" },
6069         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
6070         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
6071         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
6072         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
6073         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
6074         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
6075         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
6076         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
6077         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
6078
6079         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
6080
6081         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
6082         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
6083         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
6084         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
6085         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
6086         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
6087         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
6088         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
6089         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
6090         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
6091         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
6092         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
6093         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
6094         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
6095         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
6096         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
6097         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
6098         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
6099         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
6100         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
6101         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
6102         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
6103         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
6104         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
6105         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
6106         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
6107         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
6108         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
6109         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
6110         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
6111         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
6112         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
6113         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
6114         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
6115         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
6116         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
6117         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
6118         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
6119         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
6120         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
6121         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
6122         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
6123         { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
6124         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
6125         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
6126         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
6127         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
6128         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
6129 };
6130
6131 /*
6132   show usage message
6133  */
6134 static void usage(void)
6135 {
6136         int i;
6137         printf(
6138 "Usage: ctdb [options] <control>\n" \
6139 "Options:\n" \
6140 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
6141 "   -Y                 generate machinereadable output\n"
6142 "   -v                 generate verbose output\n"
6143 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
6144         printf("Controls:\n");
6145         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6146                 printf("  %-15s %-27s  %s\n", 
6147                        ctdb_commands[i].name, 
6148                        ctdb_commands[i].args?ctdb_commands[i].args:"",
6149                        ctdb_commands[i].msg);
6150         }
6151         exit(1);
6152 }
6153
6154
6155 static void ctdb_alarm(int sig)
6156 {
6157         printf("Maximum runtime exceeded - exiting\n");
6158         _exit(ERR_TIMEOUT);
6159 }
6160
6161 /*
6162   main program
6163 */
6164 int main(int argc, const char *argv[])
6165 {
6166         struct ctdb_context *ctdb;
6167         char *nodestring = NULL;
6168         struct poptOption popt_options[] = {
6169                 POPT_AUTOHELP
6170                 POPT_CTDB_CMDLINE
6171                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6172                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6173                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
6174                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6175                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6176                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6177                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6178                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6179                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6180                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6181                 POPT_TABLEEND
6182         };
6183         int opt;
6184         const char **extra_argv;
6185         int extra_argc = 0;
6186         int ret=-1, i;
6187         poptContext pc;
6188         struct event_context *ev;
6189         const char *control;
6190
6191         setlinebuf(stdout);
6192         
6193         /* set some defaults */
6194         options.maxruntime = 0;
6195         options.timelimit = 10;
6196         options.pnn = CTDB_CURRENT_NODE;
6197
6198         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6199
6200         while ((opt = poptGetNextOpt(pc)) != -1) {
6201                 switch (opt) {
6202                 default:
6203                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6204                                 poptBadOption(pc, 0), poptStrerror(opt)));
6205                         exit(1);
6206                 }
6207         }
6208
6209         /* setup the remaining options for the main program to use */
6210         extra_argv = poptGetArgs(pc);
6211         if (extra_argv) {
6212                 extra_argv++;
6213                 while (extra_argv[extra_argc]) extra_argc++;
6214         }
6215
6216         if (extra_argc < 1) {
6217                 usage();
6218         }
6219
6220         if (options.maxruntime == 0) {
6221                 const char *ctdb_timeout;
6222                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6223                 if (ctdb_timeout != NULL) {
6224                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6225                 } else {
6226                         /* default timeout is 120 seconds */
6227                         options.maxruntime = 120;
6228                 }
6229         }
6230
6231         signal(SIGALRM, ctdb_alarm);
6232         alarm(options.maxruntime);
6233
6234         control = extra_argv[0];
6235
6236         /* Default value for CTDB_BASE - don't override */
6237         setenv("CTDB_BASE", ETCDIR "/ctdb", 0);
6238
6239         ev = event_context_init(NULL);
6240         if (!ev) {
6241                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6242                 exit(1);
6243         }
6244
6245         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6246                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6247                         break;
6248                 }
6249         }
6250
6251         if (i == ARRAY_SIZE(ctdb_commands)) {
6252                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6253                 exit(1);
6254         }
6255
6256         if (ctdb_commands[i].without_daemon == true) {
6257                 if (nodestring != NULL) {
6258                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6259                         exit(1);
6260                 }
6261                 close(2);
6262                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6263         }
6264
6265         /* initialise ctdb */
6266         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6267
6268         if (ctdb == NULL) {
6269                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6270                 exit(1);
6271         }
6272
6273         /* setup the node number(s) to contact */
6274         if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
6275                               &options.nodes, &options.pnn)) {
6276                 usage();
6277         }
6278
6279         if (options.pnn == CTDB_CURRENT_NODE) {
6280                 options.pnn = options.nodes[0];
6281         }
6282
6283         if (ctdb_commands[i].auto_all && 
6284             ((options.pnn == CTDB_BROADCAST_ALL) ||
6285              (options.pnn == CTDB_MULTICAST))) {
6286                 int j;
6287
6288                 ret = 0;
6289                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6290                         options.pnn = options.nodes[j];
6291                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6292                 }
6293         } else {
6294                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6295         }
6296
6297         talloc_free(ctdb);
6298         talloc_free(ev);
6299         (void)poptFreeContext(pc);
6300
6301         return ret;
6302
6303 }