tools/ctdb: Improve auto-all settings for some commands
[ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_version.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
/* Tool-wide option values read by the command handlers below.
 * NOTE(review): presumably filled in from the command line; the code
 * that populates them is outside this part of the file - confirm.
 */
static struct {
        int timelimit;          /* base timeout handed to timeval_current_ofs() by TIMELIMIT()/LONGTIMELIMIT() */
        uint32_t pnn;           /* node the controls are sent to; may be CTDB_BROADCAST_ALL */
        uint32_t *nodes;
        int machinereadable;    /* non-zero selects colon-separated output */
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;
56
/* Timeouts derived from options.timelimit via timeval_current_ofs():
 * the normal one, and one built from ten times that value.
 */
#define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
59
60 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
61 {
62         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
63         return 0;
64 }
65
/* If p is NULL (or otherwise false), log an out-of-memory alert that
 * names the source location and abort() the process.
 */
#define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
                                   "Out of memory in " __location__ )); \
                abort();                                                \
        }} while (0)
71
72 /* Pretty print the flags to a static buffer in human-readable format.
73  * This never returns NULL!
74  */
75 static const char *pretty_print_flags(uint32_t flags)
76 {
77         int j;
78         static const struct {
79                 uint32_t flag;
80                 const char *name;
81         } flag_names[] = {
82                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
83                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
84                 { NODE_FLAGS_BANNED,                "BANNED" },
85                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
86                 { NODE_FLAGS_DELETED,               "DELETED" },
87                 { NODE_FLAGS_STOPPED,               "STOPPED" },
88                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
89         };
90         static char flags_str[512]; /* Big enough to contain all flag names */
91
92         flags_str[0] = '\0';
93         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
94                 if (flags & flag_names[j].flag) {
95                         if (flags_str[0] == '\0') {
96                                 (void) strcpy(flags_str, flag_names[j].name);
97                         } else {
98                                 (void) strcat(flags_str, "|");
99                                 (void) strcat(flags_str, flag_names[j].name);
100                         }
101                 }
102         }
103         if (flags_str[0] == '\0') {
104                 (void) strcpy(flags_str, "OK");
105         }
106
107         return flags_str;
108 }
109
/*
  convert one hexadecimal digit character to its value (0-15).

  'a'-'f' and 'A'-'F' both map to 10-15.  No validation is done: any
  other character is treated as a decimal digit, so the result is only
  meaningful for '0'-'9'.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') return h - 'a' + 10;
        if (h >= 'A' && h <= 'F') return h - 'A' + 10;
        return h - '0';
}
116
117 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
118 {
119         int i, len;
120         TDB_DATA key = {NULL, 0};
121
122         len = strlen(str);
123         if (len & 0x01) {
124                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
125                 return key;
126         }
127
128         key.dsize = len>>1;
129         key.dptr  = talloc_size(mem_ctx, key.dsize);
130
131         for (i=0; i < len/2; i++) {
132                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
133         }
134         return key;
135 }
136
137 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
138  * that are disconnected or deleted.  If dd_ok is true those nodes are
139  * included in the output list of nodes.  If dd_ok is false, those
140  * nodes are filtered from the "all" case and cause an error if
141  * explicitly specified.
142  */
143 static bool parse_nodestring(struct ctdb_context *ctdb,
144                              const char * nodestring,
145                              uint32_t current_pnn,
146                              bool dd_ok,
147                              uint32_t **nodes,
148                              uint32_t *pnn_mode)
149 {
150         int n;
151         uint32_t i;
152         struct ctdb_node_map *nodemap;
153        
154         *nodes = NULL;
155
156         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
157                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
158                 exit(10);
159         }
160
161         if (nodestring != NULL) {
162                 *nodes = talloc_array(ctdb, uint32_t, 0);
163                 CTDB_NOMEM_ABORT(*nodes);
164                
165                 n = 0;
166
167                 if (strcmp(nodestring, "all") == 0) {
168                         *pnn_mode = CTDB_BROADCAST_ALL;
169
170                         /* all */
171                         for (i = 0; i < nodemap->num; i++) {
172                                 if ((nodemap->nodes[i].flags & 
173                                      (NODE_FLAGS_DISCONNECTED |
174                                       NODE_FLAGS_DELETED)) && !dd_ok) {
175                                         continue;
176                                 }
177                                 *nodes = talloc_realloc(ctdb, *nodes,
178                                                         uint32_t, n+1);
179                                 CTDB_NOMEM_ABORT(*nodes);
180                                 (*nodes)[n] = i;
181                                 n++;
182                         }
183                 } else {
184                         /* x{,y...} */
185                         char *ns, *tok;
186                        
187                         ns = talloc_strdup(ctdb, nodestring);
188                         tok = strtok(ns, ",");
189                         while (tok != NULL) {
190                                 uint32_t pnn;
191                                 i = (uint32_t)strtoul(tok, NULL, 0);
192                                 if (i >= nodemap->num) {
193                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
194                                         exit(ERR_NONODE);
195                                 }
196                                 if ((nodemap->nodes[i].flags & 
197                                      (NODE_FLAGS_DISCONNECTED |
198                                       NODE_FLAGS_DELETED)) && !dd_ok) {
199                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
200                                         exit(ERR_DISNODE);
201                                 }
202                                 if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
203                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
204                                         exit(10);
205                                 }
206
207                                 *nodes = talloc_realloc(ctdb, *nodes,
208                                                         uint32_t, n+1);
209                                 CTDB_NOMEM_ABORT(*nodes);
210
211                                 (*nodes)[n] = i;
212                                 n++;
213
214                                 tok = strtok(NULL, ",");
215                         }
216                         talloc_free(ns);
217
218                         if (n == 1) {
219                                 *pnn_mode = (*nodes)[0];
220                         } else {
221                                 *pnn_mode = CTDB_MULTICAST;
222                         }
223                 }
224         } else {
225                 /* default - no nodes specified */
226                 *nodes = talloc_array(ctdb, uint32_t, 1);
227                 CTDB_NOMEM_ABORT(*nodes);
228                 *pnn_mode = CTDB_CURRENT_NODE;
229
230                 if (!ctdb_getpnn(ctdb_connection, current_pnn,
231                                  &((*nodes)[0]))) {
232                         return false;
233                 }
234         }
235
236         ctdb_free_nodemap(nodemap);
237
238         return true;
239 }
240
241 /*
242  check if a database exists
243 */
244 static bool db_exists(struct ctdb_context *ctdb, const char *dbarg, uint32_t *dbid, uint8_t *flags)
245 {
246         int i, ret;
247         struct ctdb_dbid_map *dbmap=NULL;
248         bool dbid_given = false, found = false;
249         uint32_t id;
250         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
251
252         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
253         if (ret != 0) {
254                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
255                 goto fail;
256         }
257
258         if (strncmp(dbarg, "0x", 2) == 0) {
259                 id = strtoul(dbarg, NULL, 0);
260                 dbid_given = true;
261         }
262
263         for(i=0; i<dbmap->num; i++) {
264                 if (dbid_given) {
265                         if (id == dbmap->dbs[i].dbid) {
266                                 found = true;
267                                 break;
268                         }
269                 } else {
270                         const char *name;
271                         ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
272                         if (ret != 0) {
273                                 DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
274                                 goto fail;
275                         }
276
277                         if (strcmp(name, dbarg) == 0) {
278                                 id = dbmap->dbs[i].dbid;
279                                 found = true;
280                                 break;
281                         }
282                 }
283         }
284
285         if (found) {
286                 if (dbid) *dbid = id;
287                 if (flags) *flags = dbmap->dbs[i].flags;
288         } else {
289                 DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
290         }
291
292 fail:
293         talloc_free(tmp_ctx);
294         return found;
295 }
296
297 /*
298   see if a process exists
299  */
300 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
301 {
302         uint32_t pnn, pid;
303         int ret;
304         if (argc < 1) {
305                 usage();
306         }
307
308         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
309                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
310                 return -1;
311         }
312
313         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
314         if (ret == 0) {
315                 printf("%u:%u exists\n", pnn, pid);
316         } else {
317                 printf("%u:%u does not exist\n", pnn, pid);
318         }
319         return ret;
320 }
321
322 /*
323   display statistics structure
324  */
325 static void show_statistics(struct ctdb_statistics *s, int show_header)
326 {
327         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
328         int i;
329         const char *prefix=NULL;
330         int preflen=0;
331         int tmp, days, hours, minutes, seconds;
332         const struct {
333                 const char *name;
334                 uint32_t offset;
335         } fields[] = {
336 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
337                 STATISTICS_FIELD(num_clients),
338                 STATISTICS_FIELD(frozen),
339                 STATISTICS_FIELD(recovering),
340                 STATISTICS_FIELD(num_recoveries),
341                 STATISTICS_FIELD(client_packets_sent),
342                 STATISTICS_FIELD(client_packets_recv),
343                 STATISTICS_FIELD(node_packets_sent),
344                 STATISTICS_FIELD(node_packets_recv),
345                 STATISTICS_FIELD(keepalive_packets_sent),
346                 STATISTICS_FIELD(keepalive_packets_recv),
347                 STATISTICS_FIELD(node.req_call),
348                 STATISTICS_FIELD(node.reply_call),
349                 STATISTICS_FIELD(node.req_dmaster),
350                 STATISTICS_FIELD(node.reply_dmaster),
351                 STATISTICS_FIELD(node.reply_error),
352                 STATISTICS_FIELD(node.req_message),
353                 STATISTICS_FIELD(node.req_control),
354                 STATISTICS_FIELD(node.reply_control),
355                 STATISTICS_FIELD(client.req_call),
356                 STATISTICS_FIELD(client.req_message),
357                 STATISTICS_FIELD(client.req_control),
358                 STATISTICS_FIELD(timeouts.call),
359                 STATISTICS_FIELD(timeouts.control),
360                 STATISTICS_FIELD(timeouts.traverse),
361                 STATISTICS_FIELD(locks.num_calls),
362                 STATISTICS_FIELD(locks.num_current),
363                 STATISTICS_FIELD(locks.num_pending),
364                 STATISTICS_FIELD(locks.num_failed),
365                 STATISTICS_FIELD(total_calls),
366                 STATISTICS_FIELD(pending_calls),
367                 STATISTICS_FIELD(childwrite_calls),
368                 STATISTICS_FIELD(pending_childwrite_calls),
369                 STATISTICS_FIELD(memory_used),
370                 STATISTICS_FIELD(max_hop_count),
371                 STATISTICS_FIELD(total_ro_delegations),
372                 STATISTICS_FIELD(total_ro_revokes),
373         };
374         
375         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
376         seconds = tmp%60;
377         tmp    /= 60;
378         minutes = tmp%60;
379         tmp    /= 60;
380         hours   = tmp%24;
381         tmp    /= 24;
382         days    = tmp;
383
384         if (options.machinereadable){
385                 if (show_header) {
386                         printf("CTDB version:");
387                         printf("Current time of statistics:");
388                         printf("Statistics collected since:");
389                         for (i=0;i<ARRAY_SIZE(fields);i++) {
390                                 printf("%s:", fields[i].name);
391                         }
392                         printf("num_reclock_ctdbd_latency:");
393                         printf("min_reclock_ctdbd_latency:");
394                         printf("avg_reclock_ctdbd_latency:");
395                         printf("max_reclock_ctdbd_latency:");
396
397                         printf("num_reclock_recd_latency:");
398                         printf("min_reclock_recd_latency:");
399                         printf("avg_reclock_recd_latency:");
400                         printf("max_reclock_recd_latency:");
401
402                         printf("num_call_latency:");
403                         printf("min_call_latency:");
404                         printf("avg_call_latency:");
405                         printf("max_call_latency:");
406
407                         printf("num_lockwait_latency:");
408                         printf("min_lockwait_latency:");
409                         printf("avg_lockwait_latency:");
410                         printf("max_lockwait_latency:");
411
412                         printf("num_childwrite_latency:");
413                         printf("min_childwrite_latency:");
414                         printf("avg_childwrite_latency:");
415                         printf("max_childwrite_latency:");
416                         printf("\n");
417                 }
418                 printf("%d:", CTDB_VERSION);
419                 printf("%d:", (int)s->statistics_current_time.tv_sec);
420                 printf("%d:", (int)s->statistics_start_time.tv_sec);
421                 for (i=0;i<ARRAY_SIZE(fields);i++) {
422                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
423                 }
424                 printf("%d:", s->reclock.ctdbd.num);
425                 printf("%.6f:", s->reclock.ctdbd.min);
426                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
427                 printf("%.6f:", s->reclock.ctdbd.max);
428
429                 printf("%d:", s->reclock.recd.num);
430                 printf("%.6f:", s->reclock.recd.min);
431                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
432                 printf("%.6f:", s->reclock.recd.max);
433
434                 printf("%d:", s->call_latency.num);
435                 printf("%.6f:", s->call_latency.min);
436                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
437                 printf("%.6f:", s->call_latency.max);
438
439                 printf("%d:", s->childwrite_latency.num);
440                 printf("%.6f:", s->childwrite_latency.min);
441                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
442                 printf("%.6f:", s->childwrite_latency.max);
443                 printf("\n");
444         } else {
445                 printf("CTDB version %u\n", CTDB_VERSION);
446                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
447                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
448
449                 for (i=0;i<ARRAY_SIZE(fields);i++) {
450                         if (strchr(fields[i].name, '.')) {
451                                 preflen = strcspn(fields[i].name, ".")+1;
452                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
453                                         prefix = fields[i].name;
454                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
455                                 }
456                         } else {
457                                 preflen = 0;
458                         }
459                         printf(" %*s%-22s%*s%10u\n", 
460                                preflen?4:0, "",
461                                fields[i].name+preflen, 
462                                preflen?0:4, "",
463                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
464                 }
465                 printf(" hop_count_buckets:");
466                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
467                         printf(" %d", s->hop_count_bucket[i]);
468                 }
469                 printf("\n");
470                 printf(" lock_buckets:");
471                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
472                         printf(" %d", s->locks.buckets[i]);
473                 }
474                 printf("\n");
475                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
476
477                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
478
479                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
480
481                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
482                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
483         }
484
485         talloc_free(tmp_ctx);
486 }
487
488 /*
489   display remote ctdb statistics combined from all nodes
490  */
491 static int control_statistics_all(struct ctdb_context *ctdb)
492 {
493         int ret, i;
494         struct ctdb_statistics statistics;
495         uint32_t *nodes;
496         uint32_t num_nodes;
497
498         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
499         CTDB_NO_MEMORY(ctdb, nodes);
500         
501         ZERO_STRUCT(statistics);
502
503         for (i=0;i<num_nodes;i++) {
504                 struct ctdb_statistics s1;
505                 int j;
506                 uint32_t *v1 = (uint32_t *)&s1;
507                 uint32_t *v2 = (uint32_t *)&statistics;
508                 uint32_t num_ints = 
509                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
510                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
511                 if (ret != 0) {
512                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
513                         return ret;
514                 }
515                 for (j=0;j<num_ints;j++) {
516                         v2[j] += v1[j];
517                 }
518                 statistics.max_hop_count = 
519                         MAX(statistics.max_hop_count, s1.max_hop_count);
520                 statistics.call_latency.max = 
521                         MAX(statistics.call_latency.max, s1.call_latency.max);
522         }
523         talloc_free(nodes);
524         printf("Gathered statistics for %u nodes\n", num_nodes);
525         show_statistics(&statistics, 1);
526         return 0;
527 }
528
529 /*
530   display remote ctdb statistics
531  */
532 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
533 {
534         int ret;
535         struct ctdb_statistics statistics;
536
537         if (options.pnn == CTDB_BROADCAST_ALL) {
538                 return control_statistics_all(ctdb);
539         }
540
541         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
542         if (ret != 0) {
543                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
544                 return ret;
545         }
546         show_statistics(&statistics, 1);
547         return 0;
548 }
549
550
551 /*
552   reset remote ctdb statistics
553  */
554 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
555 {
556         int ret;
557
558         ret = ctdb_statistics_reset(ctdb, options.pnn);
559         if (ret != 0) {
560                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
561                 return ret;
562         }
563         return 0;
564 }
565
566
567 /*
568   display remote ctdb rolling statistics
569  */
570 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
571 {
572         int ret;
573         struct ctdb_statistics_wire *stats;
574         int i, num_records = -1;
575
576         if (argc ==1) {
577                 num_records = atoi(argv[0]) - 1;
578         }
579
580         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
581         if (ret != 0) {
582                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
583                 return ret;
584         }
585         for (i=0;i<stats->num;i++) {
586                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
587                         continue;
588                 }
589                 show_statistics(&stats->stats[i], i==0);
590                 if (i == num_records) {
591                         break;
592                 }
593         }
594         return 0;
595 }
596
597
598 /*
599   display remote ctdb db statistics
600  */
601 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
602 {
603         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
604         struct ctdb_db_statistics *dbstat;
605         int i;
606         uint32_t db_id;
607         int num_hot_keys;
608
609         if (argc < 1) {
610                 usage();
611         }
612
613         if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
614                 return -1;
615         }
616
617         if (!ctdb_getdbstat(ctdb_connection, options.pnn, db_id, &dbstat)) {
618                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
619                 talloc_free(tmp_ctx);
620                 return -1;
621         }
622
623         printf("DB Statistics: %s\n", argv[0]);
624         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
625                 dbstat->db_ro_delegations);
626         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
627                 dbstat->db_ro_delegations);
628         printf(" %s\n", "locks");
629         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
630                 dbstat->locks.num_calls);
631         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
632                 dbstat->locks.num_failed);
633         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
634                 dbstat->locks.num_current);
635         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
636                 dbstat->locks.num_pending);
637         printf(" %s", "hop_count_buckets:");
638         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
639                 printf(" %d", dbstat->hop_count_bucket[i]);
640         }
641         printf("\n");
642         printf(" %s", "lock_buckets:");
643         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
644                 printf(" %d", dbstat->locks.buckets[i]);
645         }
646         printf("\n");
647         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
648                 "locks_latency      MIN/AVG/MAX",
649                 dbstat->locks.latency.min,
650                 (dbstat->locks.latency.num ?
651                  dbstat->locks.latency.total /dbstat->locks.latency.num :
652                  0.0),
653                 dbstat->locks.latency.max,
654                 dbstat->locks.latency.num);
655         num_hot_keys = 0;
656         for (i=0; i<dbstat->num_hot_keys; i++) {
657                 if (dbstat->hot_keys[i].count > 0) {
658                         num_hot_keys++;
659                 }
660         }
661         dbstat->num_hot_keys = num_hot_keys;
662
663         printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
664         for (i = 0; i < dbstat->num_hot_keys; i++) {
665                 int j;
666                 printf("     Count:%d Key:", dbstat->hot_keys[i].count);
667                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
668                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
669                 }
670                 printf("\n");
671         }
672
673         ctdb_free_dbstat(dbstat);
674         return 0;
675 }
676
677 /*
678   display uptime of remote node
679  */
680 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
681 {
682         int ret;
683         struct ctdb_uptime *uptime = NULL;
684         int tmp, days, hours, minutes, seconds;
685
686         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
687         if (ret != 0) {
688                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
689                 return ret;
690         }
691
692         if (options.machinereadable){
693                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
694                 printf(":%u:%u:%u:%lf\n",
695                         (unsigned int)uptime->current_time.tv_sec,
696                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
697                         (unsigned int)uptime->last_recovery_finished.tv_sec,
698                         timeval_delta(&uptime->last_recovery_finished,
699                                       &uptime->last_recovery_started)
700                 );
701                 return 0;
702         }
703
704         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
705
706         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
707         seconds = tmp%60;
708         tmp    /= 60;
709         minutes = tmp%60;
710         tmp    /= 60;
711         hours   = tmp%24;
712         tmp    /= 24;
713         days    = tmp;
714         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
715
716         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
717         seconds = tmp%60;
718         tmp    /= 60;
719         minutes = tmp%60;
720         tmp    /= 60;
721         hours   = tmp%24;
722         tmp    /= 24;
723         days    = tmp;
724         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
725         
726         printf("Duration of last recovery/failover: %lf seconds\n",
727                 timeval_delta(&uptime->last_recovery_finished,
728                               &uptime->last_recovery_started));
729
730         return 0;
731 }
732
733 /*
734   show the PNN of the current node
735  */
736 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
737 {
738         uint32_t mypnn;
739         bool ret;
740
741         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
742         if (!ret) {
743                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
744                 return -1;
745         }
746
747         printf("PNN:%d\n", mypnn);
748         return 0;
749 }
750
751
/* One node entry of the nodes file, as built by read_nodes_file(). */
struct pnn_node {
        struct pnn_node *next;  /* next entry of the singly linked list */
        const char *addr;       /* the line from the nodes file, leading blanks stripped */
        int pnn;                /* node number, derived from the line's position in the file */
};
757
758 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
759 {
760         const char *nodes_list;
761         int nlines;
762         char **lines;
763         int i, pnn;
764         struct pnn_node *pnn_nodes = NULL;
765         struct pnn_node *pnn_node;
766         struct pnn_node *tmp_node;
767
768         /* read the nodes file */
769         nodes_list = getenv("CTDB_NODES");
770         if (nodes_list == NULL) {
771                 nodes_list = "/etc/ctdb/nodes";
772         }
773         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
774         if (lines == NULL) {
775                 return NULL;
776         }
777         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
778                 nlines--;
779         }
780         for (i=0, pnn=0; i<nlines; i++) {
781                 char *node;
782
783                 node = lines[i];
784                 /* strip leading spaces */
785                 while((*node == ' ') || (*node == '\t')) {
786                         node++;
787                 }
788                 if (*node == '#') {
789                         pnn++;
790                         continue;
791                 }
792                 if (strcmp(node, "") == 0) {
793                         continue;
794                 }
795                 pnn_node = talloc(mem_ctx, struct pnn_node);
796                 pnn_node->pnn = pnn++;
797                 pnn_node->addr = talloc_strdup(pnn_node, node);
798                 pnn_node->next = pnn_nodes;
799                 pnn_nodes = pnn_node;
800         }
801
802         /* swap them around so we return them in incrementing order */
803         pnn_node = pnn_nodes;
804         pnn_nodes = NULL;
805         while (pnn_node) {
806                 tmp_node = pnn_node;
807                 pnn_node = pnn_node->next;
808
809                 tmp_node->next = pnn_nodes;
810                 pnn_nodes = tmp_node;
811         }
812
813         return pnn_nodes;
814 }
815
816 /*
817   show the PNN of the current node
818   discover the pnn by loading the nodes file and try to bind to all
819   addresses one at a time until the ip address is found.
820  */
821 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
822 {
823         TALLOC_CTX *mem_ctx = talloc_new(NULL);
824         struct pnn_node *pnn_nodes;
825         struct pnn_node *pnn_node;
826
827         pnn_nodes = read_nodes_file(mem_ctx);
828         if (pnn_nodes == NULL) {
829                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
830                 talloc_free(mem_ctx);
831                 return -1;
832         }
833
834         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
835                 ctdb_sock_addr addr;
836
837                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
838                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
839                         talloc_free(mem_ctx);
840                         return -1;
841                 }
842
843                 if (ctdb_sys_have_ip(&addr)) {
844                         printf("PNN:%d\n", pnn_node->pnn);
845                         talloc_free(mem_ctx);
846                         return 0;
847                 }
848         }
849
850         printf("Failed to detect which PNN this node is\n");
851         talloc_free(mem_ctx);
852         return -1;
853 }
854
855 /* Helpers for ctdb status
856  */
857 static bool is_partially_online(struct ctdb_node_and_flags *node)
858 {
859         int j;
860         bool ret = false;
861
862         if (node->flags == 0) {
863                 struct ctdb_ifaces_list *ifaces;
864
865                 if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
866                         for (j=0; j < ifaces->num; j++) {
867                                 if (ifaces->ifaces[j].link_state != 0) {
868                                         continue;
869                                 }
870                                 ret = true;
871                                 break;
872                         }
873                         ctdb_free_ifaces(ifaces);
874                 }
875         }
876
877         return ret;
878 }
879
/* Print the column header used by the machine-readable status lines. */
static void control_status_header_machine(void)
{
        static const char header[] =
                ":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
                ":Inactive:PartiallyOnline:ThisNode:\n";

        fputs(header, stdout);
}
885
886 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
887 {
888         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
889                ctdb_addr_to_str(&node->addr),
890                !!(node->flags&NODE_FLAGS_DISCONNECTED),
891                !!(node->flags&NODE_FLAGS_BANNED),
892                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
893                !!(node->flags&NODE_FLAGS_UNHEALTHY),
894                !!(node->flags&NODE_FLAGS_STOPPED),
895                !!(node->flags&NODE_FLAGS_INACTIVE),
896                is_partially_online(node) ? 1 : 0,
897                (node->pnn == mypnn)?'Y':'N');
898
899         return node->flags;
900 }
901
902 static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
903 {
904        printf("pnn:%d %-16s %s%s\n", node->pnn,
905               ctdb_addr_to_str(&node->addr),
906               is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
907               node->pnn == mypnn?" (THIS NODE)":"");
908
909        return node->flags;
910 }
911
912 /*
913   display remote ctdb status
914  */
915 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
916 {
917         int i;
918         struct ctdb_vnn_map *vnnmap=NULL;
919         struct ctdb_node_map *nodemap=NULL;
920         uint32_t recmode, recmaster, mypnn;
921         int num_deleted_nodes = 0;
922
923         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
924                 return -1;
925         }
926
927         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
928                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
929                 return -1;
930         }
931
932         if (options.machinereadable) {
933                 control_status_header_machine();
934                 for (i=0;i<nodemap->num;i++) {
935                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
936                                 continue;
937                         }
938                         (void) control_status_1_machine(mypnn,
939                                                         &nodemap->nodes[i]);
940                 }
941                 return 0;
942         }
943
944         for (i=0; i<nodemap->num; i++) {
945                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
946                         num_deleted_nodes++;
947                 }
948         }
949         if (num_deleted_nodes == 0) {
950                 printf("Number of nodes:%d\n", nodemap->num);
951         } else {
952                 printf("Number of nodes:%d (including %d deleted nodes)\n",
953                        nodemap->num, num_deleted_nodes);
954         }
955         for(i=0;i<nodemap->num;i++){
956                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
957                         continue;
958                 }
959                 (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
960         }
961
962         if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
963                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
964                 return -1;
965         }
966         if (vnnmap->generation == INVALID_GENERATION) {
967                 printf("Generation:INVALID\n");
968         } else {
969                 printf("Generation:%d\n",vnnmap->generation);
970         }
971         printf("Size:%d\n",vnnmap->size);
972         for(i=0;i<vnnmap->size;i++){
973                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
974         }
975         ctdb_free_vnnmap(vnnmap);
976
977         if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
978                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
979                 return -1;
980         }
981         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
982
983         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
984                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
985                 return -1;
986         }
987         printf("Recovery master:%d\n",recmaster);
988
989         return 0;
990 }
991
992 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
993 {
994         int i, ret;
995         struct ctdb_node_map *nodemap=NULL;
996         uint32_t * nodes;
997         uint32_t pnn_mode, mypnn;
998
999         if (argc > 1) {
1000                 usage();
1001         }
1002
1003         if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
1004                               options.pnn, true, &nodes, &pnn_mode)) {
1005                 return -1;
1006         }
1007
1008         if (options.machinereadable) {
1009                 control_status_header_machine();
1010         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
1011                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
1012         }
1013
1014         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
1015                 DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
1016                 return -1;
1017         }
1018
1019         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
1020                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1021                 return -1;
1022         }
1023
1024         ret = 0;
1025
1026         for (i = 0; i < talloc_array_length(nodes); i++) {
1027                 if (options.machinereadable) {
1028                         ret |= control_status_1_machine(mypnn,
1029                                                         &nodemap->nodes[nodes[i]]);
1030                 } else {
1031                         ret |= control_status_1_human(mypnn,
1032                                                       &nodemap->nodes[nodes[i]]);
1033                 }
1034         }
1035         return ret;
1036 }
1037
/* One entry of the natgw nodes file, kept in a singly linked list. */
struct natgw_node {
        struct natgw_node *next;  /* next entry, NULL at the end of the list */
        const char *addr;         /* address text as read from the natgw nodes file */
};
1042
1043 static int find_natgw(struct ctdb_context *ctdb,
1044                        struct ctdb_node_map *nodemap, uint32_t flags,
1045                        uint32_t *pnn, const char **ip)
1046 {
1047         int i;
1048         uint32_t capabilities;
1049
1050         for (i=0;i<nodemap->num;i++) {
1051                 if (!(nodemap->nodes[i].flags & flags)) {
1052                         if (!ctdb_getcapabilities(ctdb_connection, nodemap->nodes[i].pnn, &capabilities)) {
1053                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1054                                 return -1;
1055                         }
1056                         if (!(capabilities&CTDB_CAP_NATGW)) {
1057                                 continue;
1058                         }
1059                         *pnn = nodemap->nodes[i].pnn;
1060                         *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
1061                         return 0;
1062                 }
1063         }
1064
1065         return 2; /* matches ENOENT */
1066 }
1067
1068 /*
1069   display the list of nodes belonging to this natgw configuration
1070  */
1071 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1072 {
1073         int i, ret;
1074         const char *natgw_list;
1075         int nlines;
1076         char **lines;
1077         struct natgw_node *natgw_nodes = NULL;
1078         struct natgw_node *natgw_node;
1079         struct ctdb_node_map *nodemap=NULL;
1080         uint32_t mypnn, pnn;
1081         const char *ip;
1082
1083         /* When we have some nodes that could be the NATGW, make a
1084          * series of attempts to find the first node that doesn't have
1085          * certain status flags set.
1086          */
1087         uint32_t exclude_flags[] = {
1088                 /* Look for a nice healthy node */
1089                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1090                 /* If not found, an UNHEALTHY/BANNED node will do */
1091                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1092                 /* If not found, a STOPPED node will do */
1093                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1094                 0,
1095         };
1096
1097         /* read the natgw nodes file into a linked list */
1098         natgw_list = getenv("CTDB_NATGW_NODES");
1099         if (natgw_list == NULL) {
1100                 natgw_list = "/etc/ctdb/natgw_nodes";
1101         }
1102         lines = file_lines_load(natgw_list, &nlines, ctdb);
1103         if (lines == NULL) {
1104                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1105                 return -1;
1106         }
1107         for (i=0;i<nlines;i++) {
1108                 char *node;
1109
1110                 node = lines[i];
1111                 /* strip leading spaces */
1112                 while((*node == ' ') || (*node == '\t')) {
1113                         node++;
1114                 }
1115                 if (*node == '#') {
1116                         continue;
1117                 }
1118                 if (strcmp(node, "") == 0) {
1119                         continue;
1120                 }
1121                 natgw_node = talloc(ctdb, struct natgw_node);
1122                 natgw_node->addr = talloc_strdup(natgw_node, node);
1123                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1124                 natgw_node->next = natgw_nodes;
1125                 natgw_nodes = natgw_node;
1126         }
1127
1128         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
1129                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1130                 return -1;
1131         }
1132
1133         /* Trim the nodemap so it only includes connected nodes in the
1134          * current natgw group.
1135          */
1136         i=0;
1137         while(i<nodemap->num) {
1138                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1139                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1140                                 break;
1141                         }
1142                 }
1143
1144                 /* this node was not in the natgw so we just remove it from
1145                  * the list
1146                  */
1147                 if ((natgw_node == NULL) 
1148                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1149                         int j;
1150
1151                         for (j=i+1; j<nodemap->num; j++) {
1152                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1153                         }
1154                         nodemap->num--;
1155                         continue;
1156                 }
1157
1158                 i++;
1159         }
1160
1161         ret = 2; /* matches ENOENT */
1162         pnn = -1;
1163         ip = "0.0.0.0";
1164         for (i = 0; exclude_flags[i] != 0; i++) {
1165                 ret = find_natgw(ctdb, nodemap,
1166                                  exclude_flags[i],
1167                                  &pnn, &ip);
1168                 if (ret == -1) {
1169                         goto done;
1170                 }
1171                 if (ret == 0) {
1172                         break;
1173                 }
1174         }
1175
1176         if (options.machinereadable) {
1177                 printf(":Node:IP:\n");
1178                 printf(":%d:%s:\n", pnn, ip);
1179         } else {
1180                 printf("%d %s\n", pnn, ip);
1181         }
1182
1183         /* print the pruned list of nodes belonging to this natgw list */
1184         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
1185                 DEBUG(DEBUG_NOTICE, ("Unable to get PNN from node %u\n", options.pnn));
1186                 /* This is actually harmless and will only result in
1187                  * the "this node" indication being missing
1188                  */
1189                 mypnn = -1;
1190         }
1191         if (options.machinereadable) {
1192                 control_status_header_machine();
1193         } else {
1194                 printf("Number of nodes:%d\n", nodemap->num);
1195         }
1196         for(i=0;i<nodemap->num;i++){
1197                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1198                         continue;
1199                 }
1200                 if (options.machinereadable) {
1201                         control_status_1_machine(mypnn, &(nodemap->nodes[i]));
1202                 } else {
1203                         control_status_1_human(mypnn, &(nodemap->nodes[i]));
1204                 }
1205         }
1206
1207 done:
1208         ctdb_free_nodemap(nodemap);
1209         return ret;
1210 }
1211
1212 /*
1213   display the status of the scripts for monitoring (or other events)
1214  */
1215 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1216                                     enum ctdb_eventscript_call type)
1217 {
1218         struct ctdb_scripts_wire *script_status;
1219         int ret, i;
1220
1221         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1222         if (ret != 0) {
1223                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1224                 return ret;
1225         }
1226
1227         if (script_status == NULL) {
1228                 if (!options.machinereadable) {
1229                         printf("%s cycle never run\n",
1230                                ctdb_eventscript_call_names[type]);
1231                 }
1232                 return 0;
1233         }
1234
1235         if (!options.machinereadable) {
1236                 printf("%d scripts were executed last %s cycle\n",
1237                        script_status->num_scripts,
1238                        ctdb_eventscript_call_names[type]);
1239         }
1240         for (i=0; i<script_status->num_scripts; i++) {
1241                 const char *status = NULL;
1242
1243                 switch (script_status->scripts[i].status) {
1244                 case -ETIME:
1245                         status = "TIMEDOUT";
1246                         break;
1247                 case -ENOEXEC:
1248                         status = "DISABLED";
1249                         break;
1250                 case 0:
1251                         status = "OK";
1252                         break;
1253                 default:
1254                         if (script_status->scripts[i].status > 0)
1255                                 status = "ERROR";
1256                         break;
1257                 }
1258                 if (options.machinereadable) {
1259                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1260                                ctdb_eventscript_call_names[type],
1261                                script_status->scripts[i].name,
1262                                script_status->scripts[i].status,
1263                                status,
1264                                (long)script_status->scripts[i].start.tv_sec,
1265                                (long)script_status->scripts[i].start.tv_usec,
1266                                (long)script_status->scripts[i].finished.tv_sec,
1267                                (long)script_status->scripts[i].finished.tv_usec,
1268                                script_status->scripts[i].output);
1269                         continue;
1270                 }
1271                 if (status)
1272                         printf("%-20s Status:%s    ",
1273                                script_status->scripts[i].name, status);
1274                 else
1275                         /* Some other error, eg from stat. */
1276                         printf("%-20s Status:CANNOT RUN (%s)",
1277                                script_status->scripts[i].name,
1278                                strerror(-script_status->scripts[i].status));
1279
1280                 if (script_status->scripts[i].status >= 0) {
1281                         printf("Duration:%.3lf ",
1282                         timeval_delta(&script_status->scripts[i].finished,
1283                               &script_status->scripts[i].start));
1284                 }
1285                 if (script_status->scripts[i].status != -ENOEXEC) {
1286                         printf("%s",
1287                                ctime(&script_status->scripts[i].start.tv_sec));
1288                         if (script_status->scripts[i].status != 0) {
1289                                 printf("   OUTPUT:%s\n",
1290                                        script_status->scripts[i].output);
1291                         }
1292                 } else {
1293                         printf("\n");
1294                 }
1295         }
1296         return 0;
1297 }
1298
1299
1300 static int control_scriptstatus(struct ctdb_context *ctdb,
1301                                 int argc, const char **argv)
1302 {
1303         int ret;
1304         enum ctdb_eventscript_call type, min, max;
1305         const char *arg;
1306
1307         if (argc > 1) {
1308                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1309                 return -1;
1310         }
1311
1312         if (argc == 0)
1313                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1314         else
1315                 arg = argv[0];
1316
1317         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1318                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1319                         min = type;
1320                         max = type+1;
1321                         break;
1322                 }
1323         }
1324         if (type == CTDB_EVENT_MAX) {
1325                 if (strcmp(arg, "all") == 0) {
1326                         min = 0;
1327                         max = CTDB_EVENT_MAX;
1328                 } else {
1329                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1330                         return -1;
1331                 }
1332         }
1333
1334         if (options.machinereadable) {
1335                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1336         }
1337
1338         for (type = min; type < max; type++) {
1339                 ret = control_one_scriptstatus(ctdb, type);
1340                 if (ret != 0) {
1341                         return ret;
1342                 }
1343         }
1344
1345         return 0;
1346 }
1347
1348 /*
1349   enable an eventscript
1350  */
1351 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1352 {
1353         int ret;
1354
1355         if (argc < 1) {
1356                 usage();
1357         }
1358
1359         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1360         if (ret != 0) {
1361           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1362                 return ret;
1363         }
1364
1365         return 0;
1366 }
1367
1368 /*
1369   disable an eventscript
1370  */
1371 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1372 {
1373         int ret;
1374
1375         if (argc < 1) {
1376                 usage();
1377         }
1378
1379         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1380         if (ret != 0) {
1381           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1382                 return ret;
1383         }
1384
1385         return 0;
1386 }
1387
1388 /*
1389   display the pnn of the recovery master
1390  */
1391 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1392 {
1393         uint32_t recmaster;
1394
1395         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1396                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1397                 return -1;
1398         }
1399         printf("%d\n",recmaster);
1400
1401         return 0;
1402 }
1403
1404 /*
1405   add a tickle to a public address
1406  */
1407 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1408 {
1409         struct ctdb_tcp_connection t;
1410         TDB_DATA data;
1411         int ret;
1412
1413         if (argc < 2) {
1414                 usage();
1415         }
1416
1417         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1418                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1419                 return -1;
1420         }
1421         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1422                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1423                 return -1;
1424         }
1425
1426         data.dptr = (uint8_t *)&t;
1427         data.dsize = sizeof(t);
1428
1429         /* tell all nodes about this tcp connection */
1430         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1431                            0, data, ctdb, NULL, NULL, NULL, NULL);
1432         if (ret != 0) {
1433                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1434                 return -1;
1435         }
1436         
1437         return 0;
1438 }
1439
1440
1441 /*
1442   delete a tickle from a node
1443  */
1444 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1445 {
1446         struct ctdb_tcp_connection t;
1447         TDB_DATA data;
1448         int ret;
1449
1450         if (argc < 2) {
1451                 usage();
1452         }
1453
1454         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1455                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1456                 return -1;
1457         }
1458         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1459                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1460                 return -1;
1461         }
1462
1463         data.dptr = (uint8_t *)&t;
1464         data.dsize = sizeof(t);
1465
1466         /* tell all nodes about this tcp connection */
1467         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1468                            0, data, ctdb, NULL, NULL, NULL, NULL);
1469         if (ret != 0) {
1470                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1471                 return -1;
1472         }
1473         
1474         return 0;
1475 }
1476
1477
1478 /*
1479   get a list of all tickles for this pnn
1480  */
1481 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1482 {
1483         struct ctdb_control_tcp_tickle_list *list;
1484         ctdb_sock_addr addr;
1485         int i, ret;
1486         unsigned port = 0;
1487
1488         if (argc < 1) {
1489                 usage();
1490         }
1491
1492         if (argc == 2) {
1493                 port = atoi(argv[1]);
1494         }
1495
1496         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1497                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1498                 return -1;
1499         }
1500
1501         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1502         if (ret == -1) {
1503                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1504                 return -1;
1505         }
1506
1507         if (options.machinereadable){
1508                 printf(":source ip:port:destination ip:port:\n");
1509                 for (i=0;i<list->tickles.num;i++) {
1510                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1511                                 continue;
1512                         }
1513                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1514                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1515                 }
1516         } else {
1517                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1518                 printf("Num tickles:%u\n", list->tickles.num);
1519                 for (i=0;i<list->tickles.num;i++) {
1520                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1521                                 continue;
1522                         }
1523                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1524                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1525                 }
1526         }
1527
1528         talloc_free(list);
1529         
1530         return 0;
1531 }
1532
1533
1534 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1535 {
1536         struct ctdb_all_public_ips *ips;
1537         struct ctdb_public_ip ip;
1538         int i, ret;
1539         uint32_t *nodes;
1540         uint32_t disable_time;
1541         TDB_DATA data;
1542         struct ctdb_node_map *nodemap=NULL;
1543         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1544
1545         disable_time = 30;
1546         data.dptr  = (uint8_t*)&disable_time;
1547         data.dsize = sizeof(disable_time);
1548         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1549         if (ret != 0) {
1550                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1551                 return -1;
1552         }
1553
1554
1555
1556         /* read the public ip list from the node */
1557         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1558         if (ret != 0) {
1559                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1560                 talloc_free(tmp_ctx);
1561                 return -1;
1562         }
1563
1564         for (i=0;i<ips->num;i++) {
1565                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1566                         break;
1567                 }
1568         }
1569         if (i==ips->num) {
1570                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1571                         pnn, ctdb_addr_to_str(addr)));
1572                 talloc_free(tmp_ctx);
1573                 return -1;
1574         }
1575
1576         ip.pnn  = pnn;
1577         ip.addr = *addr;
1578
1579         data.dptr  = (uint8_t *)&ip;
1580         data.dsize = sizeof(ip);
1581
1582         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1583         if (ret != 0) {
1584                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1585                 talloc_free(tmp_ctx);
1586                 return ret;
1587         }
1588
1589         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1590         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1591                                         nodes, 0,
1592                                         LONGTIMELIMIT(),
1593                                         false, data,
1594                                         NULL, NULL,
1595                                         NULL);
1596         if (ret != 0) {
1597                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1598                 talloc_free(tmp_ctx);
1599                 return -1;
1600         }
1601
1602         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1603         if (ret != 0) {
1604                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1605                 talloc_free(tmp_ctx);
1606                 return -1;
1607         }
1608
1609         /* update the recovery daemon so it now knows to expect the new
1610            node assignment for this ip.
1611         */
1612         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1613         if (ret != 0) {
1614                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1615                 return -1;
1616         }
1617
1618         talloc_free(tmp_ctx);
1619         return 0;
1620 }
1621
1622
1623 /* 
1624  * scans all other nodes and returns a pnn for another node that can host this 
1625  * ip address or -1
1626  */
1627 static int
1628 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1629 {
1630         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1631         struct ctdb_all_public_ips *ips;
1632         struct ctdb_node_map *nodemap=NULL;
1633         int i, j, ret;
1634
1635         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1636         if (ret != 0) {
1637                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1638                 talloc_free(tmp_ctx);
1639                 return ret;
1640         }
1641
1642         for(i=0;i<nodemap->num;i++){
1643                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1644                         continue;
1645                 }
1646                 if (nodemap->nodes[i].pnn == options.pnn) {
1647                         continue;
1648                 }
1649
1650                 /* read the public ip list from this node */
1651                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1652                 if (ret != 0) {
1653                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1654                         return -1;
1655                 }
1656
1657                 for (j=0;j<ips->num;j++) {
1658                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1659                                 talloc_free(tmp_ctx);
1660                                 return nodemap->nodes[i].pnn;
1661                         }
1662                 }
1663                 talloc_free(ips);
1664         }
1665
1666         talloc_free(tmp_ctx);
1667         return -1;
1668 }
1669
1670 /* If pnn is -1 then try to find a node to move IP to... */
1671 static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1672 {
1673         bool pnn_specified = (pnn == -1 ? false : true);
1674         int retries = 0;
1675
1676         while (retries < 5) {
1677                 if (!pnn_specified) {
1678                         pnn = find_other_host_for_public_ip(ctdb, addr);
1679                         if (pnn == -1) {
1680                                 return false;
1681                         }
1682                         DEBUG(DEBUG_NOTICE,
1683                               ("Trying to move public IP to node %u\n", pnn));
1684                 }
1685
1686                 if (move_ip(ctdb, addr, pnn) == 0) {
1687                         return true;
1688                 }
1689
1690                 sleep(3);
1691                 retries++;
1692         }
1693
1694         return false;
1695 }
1696
1697
1698 /*
1699   move/failover an ip address to a specific node
1700  */
1701 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1702 {
1703         uint32_t pnn;
1704         ctdb_sock_addr addr;
1705
1706         if (argc < 2) {
1707                 usage();
1708                 return -1;
1709         }
1710
1711         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1712                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1713                 return -1;
1714         }
1715
1716
1717         if (sscanf(argv[1], "%u", &pnn) != 1) {
1718                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1719                 return -1;
1720         }
1721
1722         if (!try_moveip(ctdb, &addr, pnn)) {
1723                 DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
1724                 return -1;
1725         }
1726
1727         return 0;
1728 }
1729
1730 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1731 {
1732         uint32_t recmaster;
1733         TDB_DATA data;
1734
1735         if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
1736                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
1737                 return -1;
1738         }
1739
1740         data.dptr  = (uint8_t *)&pnn;
1741         data.dsize = sizeof(uint32_t);
1742         if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1743                 DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
1744                 return -1;
1745         }
1746
1747         return 0;
1748 }
1749
1750
1751 /*
1752   rebalance a node by setting it to allow failback and triggering a
1753   takeover run
1754  */
1755 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1756 {
1757         switch (options.pnn) {
1758         case CTDB_BROADCAST_ALL:
1759         case CTDB_CURRENT_NODE:
1760                 DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
1761                 return -1;
1762         }
1763
1764         return rebalance_node(ctdb, options.pnn);
1765 }
1766
1767
1768 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1769 {
1770         struct ctdb_public_ip ip;
1771         int ret;
1772         uint32_t *nodes;
1773         uint32_t disable_time;
1774         TDB_DATA data;
1775         struct ctdb_node_map *nodemap=NULL;
1776         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1777
1778         disable_time = 30;
1779         data.dptr  = (uint8_t*)&disable_time;
1780         data.dsize = sizeof(disable_time);
1781         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1782         if (ret != 0) {
1783                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1784                 return -1;
1785         }
1786
1787         ip.pnn  = -1;
1788         ip.addr = *addr;
1789
1790         data.dptr  = (uint8_t *)&ip;
1791         data.dsize = sizeof(ip);
1792
1793         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1794         if (ret != 0) {
1795                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1796                 talloc_free(tmp_ctx);
1797                 return ret;
1798         }
1799
1800         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1801         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1802                                         nodes, 0,
1803                                         LONGTIMELIMIT(),
1804                                         false, data,
1805                                         NULL, NULL,
1806                                         NULL);
1807         if (ret != 0) {
1808                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1809                 talloc_free(tmp_ctx);
1810                 return -1;
1811         }
1812
1813         talloc_free(tmp_ctx);
1814         return 0;
1815 }
1816
1817 /*
1818   release an ip form all nodes and have it re-assigned by recd
1819  */
1820 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1821 {
1822         ctdb_sock_addr addr;
1823
1824         if (argc < 1) {
1825                 usage();
1826                 return -1;
1827         }
1828
1829         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1830                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1831                 return -1;
1832         }
1833
1834         if (rebalance_ip(ctdb, &addr) != 0) {
1835                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1836                 return -1;
1837         }
1838
1839         return 0;
1840 }
1841
1842 static int getips_store_callback(void *param, void *data)
1843 {
1844         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1845         struct ctdb_all_public_ips *ips = param;
1846         int i;
1847
1848         i = ips->num++;
1849         ips->ips[i].pnn  = node_ip->pnn;
1850         ips->ips[i].addr = node_ip->addr;
1851         return 0;
1852 }
1853
/*
 * Traversal callback: count the visited entries.  param points to a
 * uint32_t counter that is incremented once per call; data is ignored.
 * Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
	uint32_t *counter = param;

	*counter += 1;

	return 0;
}
1861
1862 #define IP_KEYLEN       4
1863 static uint32_t *ip_key(ctdb_sock_addr *ip)
1864 {
1865         static uint32_t key[IP_KEYLEN];
1866
1867         bzero(key, sizeof(key));
1868
1869         switch (ip->sa.sa_family) {
1870         case AF_INET:
1871                 key[0]  = ip->ip.sin_addr.s_addr;
1872                 break;
1873         case AF_INET6: {
1874                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1875                 key[0]  = s6_a32[3];
1876                 key[1]  = s6_a32[2];
1877                 key[2]  = s6_a32[1];
1878                 key[3]  = s6_a32[0];
1879                 break;
1880         }
1881         default:
1882                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1883                 return key;
1884         }
1885
1886         return key;
1887 }
1888
/*
 * Insert callback passed to trbt_insertarray32_callback(): hands back
 * parm (the new entry) unchanged and ignores data.
 * NOTE(review): presumably data is the entry already stored under the
 * key, making the newest entry win - confirm against the tree code.
 */
static void *add_ip_callback(void *parm, void *data)
{
	void *chosen = parm;

	return chosen;
}
1893
1894 static int
1895 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1896 {
1897         struct ctdb_all_public_ips *tmp_ips;
1898         struct ctdb_node_map *nodemap=NULL;
1899         trbt_tree_t *ip_tree;
1900         int i, j, len, ret;
1901         uint32_t count;
1902
1903         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1906                 return ret;
1907         }
1908
1909         ip_tree = trbt_create(tmp_ctx, 0);
1910
1911         for(i=0;i<nodemap->num;i++){
1912                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1913                         continue;
1914                 }
1915                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1916                         continue;
1917                 }
1918
1919                 /* read the public ip list from this node */
1920                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1921                 if (ret != 0) {
1922                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1923                         return -1;
1924                 }
1925         
1926                 for (j=0; j<tmp_ips->num;j++) {
1927                         struct ctdb_public_ip *node_ip;
1928
1929                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1930                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1931                         node_ip->addr = tmp_ips->ips[j].addr;
1932
1933                         trbt_insertarray32_callback(ip_tree,
1934                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1935                                 add_ip_callback,
1936                                 node_ip);
1937                 }
1938                 talloc_free(tmp_ips);
1939         }
1940
1941         /* traverse */
1942         count = 0;
1943         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1944
1945         len = offsetof(struct ctdb_all_public_ips, ips) + 
1946                 count*sizeof(struct ctdb_public_ip);
1947         tmp_ips = talloc_zero_size(tmp_ctx, len);
1948         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1949
1950         *ips = tmp_ips;
1951
1952         return 0;
1953 }
1954
1955
1956 static bool ipreallocate_finished;
1957
1958 /*
1959   handler for receiving the response to ipreallocate
1960 */
1961 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1962                              TDB_DATA data, void *private_data)
1963 {
1964         ipreallocate_finished = true;
1965 }
1966
1967 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1968 {
1969         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1970
1971         event_add_timed(ctdb->ev, ctdb, 
1972                                 timeval_current_ofs(1, 0),
1973                                 ctdb_every_second, ctdb);
1974 }
1975
1976 /* Send an ipreallocate to the recovery daemon on all nodes.  Only the
1977  * recovery master will answer.
1978  */
1979 static int ipreallocate(struct ctdb_context *ctdb)
1980 {
1981         int ret;
1982         TDB_DATA data;
1983         struct takeover_run_reply rd;
1984         struct timeval tv;
1985
1986         /* Time ticks to enable timeouts to be processed */
1987         event_add_timed(ctdb->ev, ctdb, 
1988                                 timeval_current_ofs(1, 0),
1989                                 ctdb_every_second, ctdb);
1990
1991         rd.pnn = ctdb_get_pnn(ctdb);
1992         rd.srvid = getpid();
1993
1994         /* Register message port for reply from recovery master */
1995         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1996
1997         data.dptr = (uint8_t *)&rd;
1998         data.dsize = sizeof(rd);
1999
2000 again:
2001         /* Send to all connected nodes. Only recmaster replies */
2002         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2003                                        CTDB_SRVID_TAKEOVER_RUN, data);
2004         if (ret != 0) {
2005                 /* This can only happen if the socket is closed and
2006                  * there's no way to recover from that, so don't try
2007                  * again.
2008                  */
2009                 DEBUG(DEBUG_WARNING,
2010                       ("Failed to send IP reallocation request to connected nodes\n"));
2011                 return -1;
2012         }
2013
2014         tv = timeval_current();
2015         /* This loop terminates the reply is received */
2016         while (timeval_elapsed(&tv) < 5.0 && !ipreallocate_finished) {
2017                 event_loop_once(ctdb->ev);
2018         }
2019
2020         if (!ipreallocate_finished) {
2021                 DEBUG(DEBUG_NOTICE,
2022                       ("Still waiting for confirmation of IP reallocation\n"));
2023                 goto again;
2024         }
2025
2026         return 0;
2027 }
2028
2029
/*
  command wrapper around ipreallocate(); the arguments are not used
 */
static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
{
	int status = ipreallocate(ctdb);

	return status;
}
2034
2035 /*
2036   add a public ip address to a node
2037  */
2038 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2039 {
2040         int i, ret;
2041         int len, retries = 0;
2042         unsigned mask;
2043         ctdb_sock_addr addr;
2044         struct ctdb_control_ip_iface *pub;
2045         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2046         struct ctdb_all_public_ips *ips;
2047
2048
2049         if (argc != 2) {
2050                 talloc_free(tmp_ctx);
2051                 usage();
2052         }
2053
2054         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2055                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2056                 talloc_free(tmp_ctx);
2057                 return -1;
2058         }
2059
2060         /* read the public ip list from the node */
2061         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2062         if (ret != 0) {
2063                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2064                 talloc_free(tmp_ctx);
2065                 return -1;
2066         }
2067         for (i=0;i<ips->num;i++) {
2068                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2069                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2070                         return 0;
2071                 }
2072         }
2073
2074
2075
2076         /* Dont timeout. This command waits for an ip reallocation
2077            which sometimes can take wuite a while if there has
2078            been a recent recovery
2079         */
2080         alarm(0);
2081
2082         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2083         pub = talloc_size(tmp_ctx, len); 
2084         CTDB_NO_MEMORY(ctdb, pub);
2085
2086         pub->addr  = addr;
2087         pub->mask  = mask;
2088         pub->len   = strlen(argv[1])+1;
2089         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2090
2091         do {
2092                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2093                 if (ret != 0) {
2094                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2095                         sleep(3);
2096                         retries++;
2097                 }
2098         } while (retries < 5 && ret != 0);
2099         if (ret != 0) {
2100                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2101                 talloc_free(tmp_ctx);
2102                 return ret;
2103         }
2104
2105         if (rebalance_node(ctdb, options.pnn) != 0) {
2106                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2107                 return ret;
2108         }
2109
2110         talloc_free(tmp_ctx);
2111         return 0;
2112 }
2113
2114 /*
2115   add a public ip address to a node
2116  */
2117 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2118 {
2119         ctdb_sock_addr addr;
2120
2121         if (argc != 1) {
2122                 usage();
2123         }
2124
2125         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2126                 printf("Badly formed ip : %s\n", argv[0]);
2127                 return -1;
2128         }
2129
2130         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2131
2132         return 0;
2133 }
2134
2135 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2136
2137 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2138 {
2139         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2140         struct ctdb_node_map *nodemap=NULL;
2141         struct ctdb_all_public_ips *ips;
2142         int ret, i, j;
2143
2144         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2145         if (ret != 0) {
2146                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2147                 return ret;
2148         }
2149
2150         /* remove it from the nodes that are not hosting the ip currently */
2151         for(i=0;i<nodemap->num;i++){
2152                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2153                         continue;
2154                 }
2155                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2156                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2157                         continue;
2158                 }
2159
2160                 for (j=0;j<ips->num;j++) {
2161                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2162                                 break;
2163                         }
2164                 }
2165                 if (j==ips->num) {
2166                         continue;
2167                 }
2168
2169                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2170                         continue;
2171                 }
2172
2173                 options.pnn = nodemap->nodes[i].pnn;
2174                 control_delip(ctdb, argc, argv);
2175         }
2176
2177
2178         /* remove it from every node (also the one hosting it) */
2179         for(i=0;i<nodemap->num;i++){
2180                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2181                         continue;
2182                 }
2183                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2184                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2185                         continue;
2186                 }
2187
2188                 for (j=0;j<ips->num;j++) {
2189                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2190                                 break;
2191                         }
2192                 }
2193                 if (j==ips->num) {
2194                         continue;
2195                 }
2196
2197                 options.pnn = nodemap->nodes[i].pnn;
2198                 control_delip(ctdb, argc, argv);
2199         }
2200
2201         talloc_free(tmp_ctx);
2202         return 0;
2203 }
2204         
2205 /*
2206   delete a public ip address from a node
2207  */
2208 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2209 {
2210         int i, ret;
2211         ctdb_sock_addr addr;
2212         struct ctdb_control_ip_iface pub;
2213         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2214         struct ctdb_all_public_ips *ips;
2215
2216         if (argc != 1) {
2217                 talloc_free(tmp_ctx);
2218                 usage();
2219         }
2220
2221         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2222                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2223                 return -1;
2224         }
2225
2226         if (options.pnn == CTDB_BROADCAST_ALL) {
2227                 return control_delip_all(ctdb, argc, argv, &addr);
2228         }
2229
2230         pub.addr  = addr;
2231         pub.mask  = 0;
2232         pub.len   = 0;
2233
2234         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2235         if (ret != 0) {
2236                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2237                 talloc_free(tmp_ctx);
2238                 return ret;
2239         }
2240         
2241         for (i=0;i<ips->num;i++) {
2242                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2243                         break;
2244                 }
2245         }
2246
2247         if (i==ips->num) {
2248                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2249                         ctdb_addr_to_str(&addr)));
2250                 talloc_free(tmp_ctx);
2251                 return -1;
2252         }
2253
2254         /* This is an optimisation.  If this node is hosting the IP
2255          * then try to move it somewhere else without invoking a full
2256          * takeover run.  We don't care if this doesn't work!
2257          */
2258         if (ips->ips[i].pnn == options.pnn) {
2259                 (void) try_moveip(ctdb, &addr, -1);
2260         }
2261
2262         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2263         if (ret != 0) {
2264                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2265                 talloc_free(tmp_ctx);
2266                 return ret;
2267         }
2268
2269         talloc_free(tmp_ctx);
2270         return 0;
2271 }
2272
2273 static int kill_tcp_from_file(struct ctdb_context *ctdb,
2274                               int argc, const char **argv)
2275 {
2276         struct ctdb_control_killtcp *killtcp;
2277         int max_entries, current, i;
2278         struct timeval timeout;
2279         char line[128], src[128], dst[128];
2280         int linenum;
2281         TDB_DATA data;
2282         struct client_async_data *async_data;
2283         struct ctdb_client_control_state *state;
2284
2285         if (argc != 0) {
2286                 usage();
2287         }
2288
2289         if (options.pnn == CTDB_BROADCAST_ALL ||
2290             options.pnn == CTDB_MULTICAST) {
2291                 DEBUG(DEBUG_ERR, ("Can not use killtcp to multiple nodes\n"));
2292                 return -1;
2293         }
2294
2295         linenum = 1;
2296         killtcp = NULL;
2297         max_entries = 0;
2298         current = 0;
2299         while (!feof(stdin)) {
2300                 if (fgets(line, sizeof(line), stdin) == NULL) {
2301                         continue;
2302                 }
2303
2304                 /* Silently skip empty lines */
2305                 if (line[0] == '\n') {
2306                         continue;
2307                 }
2308
2309                 if (sscanf(line, "%s %s\n", src, dst) != 2) {
2310                         DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
2311                                           linenum, line));
2312                         talloc_free(killtcp);
2313                         return -1;
2314                 }
2315
2316                 if (current >= max_entries) {
2317                         max_entries += 1024;
2318                         killtcp = talloc_realloc(ctdb, killtcp,
2319                                                  struct ctdb_control_killtcp,
2320                                                  max_entries);
2321                         CTDB_NO_MEMORY(ctdb, killtcp);
2322                 }
2323
2324                 if (!parse_ip_port(src, &killtcp[current].src_addr)) {
2325                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2326                                           linenum, src));
2327                         talloc_free(killtcp);
2328                         return -1;
2329                 }
2330
2331                 if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
2332                         DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
2333                                           linenum, dst));
2334                         talloc_free(killtcp);
2335                         return -1;
2336                 }
2337
2338                 current++;
2339         }
2340
2341         async_data = talloc_zero(ctdb, struct client_async_data);
2342         if (async_data == NULL) {
2343                 talloc_free(killtcp);
2344                 return -1;
2345         }
2346
2347         for (i = 0; i < current; i++) {
2348
2349                 data.dsize = sizeof(struct ctdb_control_killtcp);
2350                 data.dptr  = (unsigned char *)&killtcp[i];
2351
2352                 timeout = TIMELIMIT();
2353                 state = ctdb_control_send(ctdb, options.pnn, 0,
2354                                           CTDB_CONTROL_KILL_TCP, 0, data,
2355                                           async_data, &timeout, NULL);
2356
2357                 if (state == NULL) {
2358                         DEBUG(DEBUG_ERR,
2359                               ("Failed to call async killtcp control to node %u\n",
2360                                options.pnn));
2361                         talloc_free(killtcp);
2362                         return -1;
2363                 }
2364                 
2365                 ctdb_client_async_add(async_data, state);
2366         }
2367
2368         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2369                 DEBUG(DEBUG_ERR,("killtcp failed\n"));
2370                 talloc_free(killtcp);
2371                 return -1;
2372         }
2373
2374         talloc_free(killtcp);
2375         return 0;
2376 }
2377
2378
2379 /*
2380   kill a tcp connection
2381  */
2382 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2383 {
2384         int ret;
2385         struct ctdb_control_killtcp killtcp;
2386
2387         if (argc == 0) {
2388                 return kill_tcp_from_file(ctdb, argc, argv);
2389         }
2390
2391         if (argc < 2) {
2392                 usage();
2393         }
2394
2395         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2396                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2397                 return -1;
2398         }
2399
2400         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2401                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2402                 return -1;
2403         }
2404
2405         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2406         if (ret != 0) {
2407                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2408                 return ret;
2409         }
2410
2411         return 0;
2412 }
2413
2414
2415 /*
2416   send a gratious arp
2417  */
2418 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2419 {
2420         int ret;
2421         ctdb_sock_addr addr;
2422
2423         if (argc < 2) {
2424                 usage();
2425         }
2426
2427         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2428                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2429                 return -1;
2430         }
2431
2432         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2433         if (ret != 0) {
2434                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2435                 return ret;
2436         }
2437
2438         return 0;
2439 }
2440
2441 /*
2442   register a server id
2443  */
2444 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2445 {
2446         int ret;
2447         struct ctdb_server_id server_id;
2448
2449         if (argc < 3) {
2450                 usage();
2451         }
2452
2453         server_id.pnn       = strtoul(argv[0], NULL, 0);
2454         server_id.type      = strtoul(argv[1], NULL, 0);
2455         server_id.server_id = strtoul(argv[2], NULL, 0);
2456
2457         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2458         if (ret != 0) {
2459                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2460                 return ret;
2461         }
2462         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2463         sleep(999);
2464         return -1;
2465 }
2466
2467 /*
2468   unregister a server id
2469  */
2470 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2471 {
2472         int ret;
2473         struct ctdb_server_id server_id;
2474
2475         if (argc < 3) {
2476                 usage();
2477         }
2478
2479         server_id.pnn       = strtoul(argv[0], NULL, 0);
2480         server_id.type      = strtoul(argv[1], NULL, 0);
2481         server_id.server_id = strtoul(argv[2], NULL, 0);
2482
2483         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2484         if (ret != 0) {
2485                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2486                 return ret;
2487         }
2488         return -1;
2489 }
2490
2491 /*
2492   check if a server id exists
2493  */
2494 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2495 {
2496         uint32_t status;
2497         int ret;
2498         struct ctdb_server_id server_id;
2499
2500         if (argc < 3) {
2501                 usage();
2502         }
2503
2504         server_id.pnn       = strtoul(argv[0], NULL, 0);
2505         server_id.type      = strtoul(argv[1], NULL, 0);
2506         server_id.server_id = strtoul(argv[2], NULL, 0);
2507
2508         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2509         if (ret != 0) {
2510                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2511                 return ret;
2512         }
2513
2514         if (status) {
2515                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2516         } else {
2517                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2518         }
2519         return 0;
2520 }
2521
2522 /*
2523   get a list of all server ids that are registered on a node
2524  */
2525 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2526 {
2527         int i, ret;
2528         struct ctdb_server_id_list *server_ids;
2529
2530         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2531         if (ret != 0) {
2532                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2533                 return ret;
2534         }
2535
2536         for (i=0; i<server_ids->num; i++) {
2537                 printf("Server id %d:%d:%d\n", 
2538                         server_ids->server_ids[i].pnn, 
2539                         server_ids->server_ids[i].type, 
2540                         server_ids->server_ids[i].server_id); 
2541         }
2542
2543         return -1;
2544 }
2545
2546 /*
2547   check if a server id exists
2548  */
2549 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2550 {
2551         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2552         uint64_t *ids;
2553         uint8_t *result;
2554         int i;
2555
2556         if (argc < 1) {
2557                 talloc_free(tmp_ctx);
2558                 usage();
2559         }
2560
2561         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2562         result = talloc_array(tmp_ctx, uint8_t, argc);
2563
2564         for (i = 0; i < argc; i++) {
2565                 ids[i] = strtoull(argv[i], NULL, 0);
2566         }
2567
2568         if (!ctdb_check_message_handlers(ctdb_connection,
2569                 options.pnn, argc, ids, result)) {
2570                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2571                                   options.pnn));
2572                 talloc_free(tmp_ctx);
2573                 return -1;
2574         }
2575
2576         for (i=0; i < argc; i++) {
2577                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2578                        result[i] ? "exists" : "does not exist");
2579         }
2580
2581         talloc_free(tmp_ctx);
2582         return 0;
2583 }
2584
2585 /*
2586   send a tcp tickle ack
2587  */
2588 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2589 {
2590         int ret;
2591         ctdb_sock_addr  src, dst;
2592
2593         if (argc < 2) {
2594                 usage();
2595         }
2596
2597         if (!parse_ip_port(argv[0], &src)) {
2598                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2599                 return -1;
2600         }
2601
2602         if (!parse_ip_port(argv[1], &dst)) {
2603                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2604                 return -1;
2605         }
2606
2607         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2608         if (ret==0) {
2609                 return 0;
2610         }
2611         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2612
2613         return -1;
2614 }
2615
2616
2617 /*
2618   display public ip status
2619  */
2620 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2621 {
2622         int i, ret;
2623         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2624         struct ctdb_all_public_ips *ips;
2625
2626         if (options.pnn == CTDB_BROADCAST_ALL) {
2627                 /* read the list of public ips from all nodes */
2628                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2629         } else {
2630                 /* read the public ip list from this node */
2631                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2632         }
2633         if (ret != 0) {
2634                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2635                 talloc_free(tmp_ctx);
2636                 return ret;
2637         }
2638
2639         if (options.machinereadable){
2640                 printf(":Public IP:Node:");
2641                 if (options.verbose){
2642                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2643                 }
2644                 printf("\n");
2645         } else {
2646                 if (options.pnn == CTDB_BROADCAST_ALL) {
2647                         printf("Public IPs on ALL nodes\n");
2648                 } else {
2649                         printf("Public IPs on node %u\n", options.pnn);
2650                 }
2651         }
2652
2653         for (i=1;i<=ips->num;i++) {
2654                 struct ctdb_control_public_ip_info *info = NULL;
2655                 int32_t pnn;
2656                 char *aciface = NULL;
2657                 char *avifaces = NULL;
2658                 char *cifaces = NULL;
2659
2660                 if (options.pnn == CTDB_BROADCAST_ALL) {
2661                         pnn = ips->ips[ips->num-i].pnn;
2662                 } else {
2663                         pnn = options.pnn;
2664                 }
2665
2666                 if (pnn != -1) {
2667                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2668                                                    &ips->ips[ips->num-i].addr, &info);
2669                 } else {
2670                         ret = -1;
2671                 }
2672
2673                 if (ret == 0) {
2674                         int j;
2675                         for (j=0; j < info->num; j++) {
2676                                 if (cifaces == NULL) {
2677                                         cifaces = talloc_strdup(info,
2678                                                                 info->ifaces[j].name);
2679                                 } else {
2680                                         cifaces = talloc_asprintf_append(cifaces,
2681                                                                          ",%s",
2682                                                                          info->ifaces[j].name);
2683                                 }
2684
2685                                 if (info->active_idx == j) {
2686                                         aciface = info->ifaces[j].name;
2687                                 }
2688
2689                                 if (info->ifaces[j].link_state == 0) {
2690                                         continue;
2691                                 }
2692
2693                                 if (avifaces == NULL) {
2694                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2695                                 } else {
2696                                         avifaces = talloc_asprintf_append(avifaces,
2697                                                                           ",%s",
2698                                                                           info->ifaces[j].name);
2699                                 }
2700                         }
2701                 }
2702
2703                 if (options.machinereadable){
2704                         printf(":%s:%d:",
2705                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2706                                 ips->ips[ips->num-i].pnn);
2707                         if (options.verbose){
2708                                 printf("%s:%s:%s:",
2709                                         aciface?aciface:"",
2710                                         avifaces?avifaces:"",
2711                                         cifaces?cifaces:"");
2712                         }
2713                         printf("\n");
2714                 } else {
2715                         if (options.verbose) {
2716                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2717                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2718                                         ips->ips[ips->num-i].pnn,
2719                                         aciface?aciface:"",
2720                                         avifaces?avifaces:"",
2721                                         cifaces?cifaces:"");
2722                         } else {
2723                                 printf("%s %d\n",
2724                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2725                                         ips->ips[ips->num-i].pnn);
2726                         }
2727                 }
2728                 talloc_free(info);
2729         }
2730
2731         talloc_free(tmp_ctx);
2732         return 0;
2733 }
2734
2735 /*
2736   public ip info
2737  */
2738 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2739 {
2740         int i, ret;
2741         ctdb_sock_addr addr;
2742         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2743         struct ctdb_control_public_ip_info *info;
2744
2745         if (argc != 1) {
2746                 talloc_free(tmp_ctx);
2747                 usage();
2748         }
2749
2750         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2751                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2752                 return -1;
2753         }
2754
2755         /* read the public ip info from this node */
2756         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2757                                            tmp_ctx, &addr, &info);
2758         if (ret != 0) {
2759                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2760                                   argv[0], options.pnn));
2761                 talloc_free(tmp_ctx);
2762                 return ret;
2763         }
2764
2765         printf("Public IP[%s] info on node %u\n",
2766                ctdb_addr_to_str(&info->ip.addr),
2767                options.pnn);
2768
2769         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2770                ctdb_addr_to_str(&info->ip.addr),
2771                info->ip.pnn, info->num);
2772
2773         for (i=0; i<info->num; i++) {
2774                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2775
2776                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2777                        i+1, info->ifaces[i].name,
2778                        info->ifaces[i].link_state?"up":"down",
2779                        (unsigned int)info->ifaces[i].references,
2780                        (i==info->active_idx)?" (active)":"");
2781         }
2782
2783         talloc_free(tmp_ctx);
2784         return 0;
2785 }
2786
2787 /*
2788   display interfaces status
2789  */
2790 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2791 {
2792         int i;
2793         struct ctdb_ifaces_list *ifaces;
2794
2795         /* read the public ip list from this node */
2796         if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
2797                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2798                                   options.pnn));
2799                 return -1;
2800         }
2801
2802         if (options.machinereadable){
2803                 printf(":Name:LinkStatus:References:\n");
2804         } else {
2805                 printf("Interfaces on node %u\n", options.pnn);
2806         }
2807
2808         for (i=0; i<ifaces->num; i++) {
2809                 if (options.machinereadable){
2810                         printf(":%s:%s:%u\n",
2811                                ifaces->ifaces[i].name,
2812                                ifaces->ifaces[i].link_state?"1":"0",
2813                                (unsigned int)ifaces->ifaces[i].references);
2814                 } else {
2815                         printf("name:%s link:%s references:%u\n",
2816                                ifaces->ifaces[i].name,
2817                                ifaces->ifaces[i].link_state?"up":"down",
2818                                (unsigned int)ifaces->ifaces[i].references);
2819                 }
2820         }
2821
2822         ctdb_free_ifaces(ifaces);
2823         return 0;
2824 }
2825
2826
2827 /*
2828   set link status of an interface
2829  */
2830 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2831 {
2832         int ret;
2833         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2834         struct ctdb_control_iface_info info;
2835
2836         ZERO_STRUCT(info);
2837
2838         if (argc != 2) {
2839                 usage();
2840         }
2841
2842         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2843                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2844                                   argv[0]));
2845                 talloc_free(tmp_ctx);
2846                 return -1;
2847         }
2848         strcpy(info.name, argv[0]);
2849
2850         if (strcmp(argv[1], "up") == 0) {
2851                 info.link_state = 1;
2852         } else if (strcmp(argv[1], "down") == 0) {
2853                 info.link_state = 0;
2854         } else {
2855                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2856                                   argv[1]));
2857                 talloc_free(tmp_ctx);
2858                 return -1;
2859         }
2860
2861         /* read the public ip list from this node */
2862         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2863                                    tmp_ctx, &info);
2864         if (ret != 0) {
2865                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2866                                   argv[0], options.pnn));
2867                 talloc_free(tmp_ctx);
2868                 return ret;
2869         }
2870
2871         talloc_free(tmp_ctx);
2872         return 0;
2873 }
2874
2875 /*
2876   display pid of a ctdb daemon
2877  */
2878 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2879 {
2880         uint32_t pid;
2881         int ret;
2882
2883         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2884         if (ret != 0) {
2885                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2886                 return ret;
2887         }
2888         printf("Pid:%d\n", pid);
2889
2890         return 0;
2891 }
2892
2893 typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
2894
2895 static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
2896                                               void *data,
2897                                               update_flags_handler_t handler,
2898                                               uint32_t flag,
2899                                               const char *desc,
2900                                               bool set_flag)
2901 {
2902         struct ctdb_node_map *nodemap = NULL;
2903         bool flag_is_set;
2904
2905         /* Check if the node is already in the desired state */
2906         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2907                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2908                 exit(10);
2909         }
2910         flag_is_set = nodemap->nodes[options.pnn].flags & flag;
2911         if (set_flag == flag_is_set) {
2912                 DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
2913                                      (set_flag ? "already" : "not"), desc));
2914                 return 0;
2915         }
2916
2917         do {
2918                 if (!handler(ctdb, data)) {
2919                         DEBUG(DEBUG_WARNING,
2920                               ("Failed to send control to set state %s on node %u, try again\n",
2921                                desc, options.pnn));
2922                 }
2923
2924                 sleep(1);
2925
2926                 /* Read the nodemap and verify the change took effect.
2927                  * Even if the above control/hanlder timed out then it
2928                  * could still have worked!
2929                  */
2930                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
2931                                          ctdb, &nodemap) != 0) {
2932                         DEBUG(DEBUG_WARNING,
2933                               ("Unable to get nodemap from local node, try again\n"));
2934                 }
2935                 flag_is_set = nodemap->nodes[options.pnn].flags & flag;
2936         } while (nodemap == NULL || (set_flag != flag_is_set));
2937
2938         return ipreallocate(ctdb);
2939 }
2940
2941 /* Administratively disable a node */
2942 static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
2943 {
2944         return ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
2945                                   NODE_FLAGS_PERMANENTLY_DISABLED, 0) == 0;
2946 }
2947
2948 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2949 {
2950         return update_flags_and_ipreallocate(ctdb, NULL,
2951                                                   update_flags_disabled,
2952                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
2953                                                   "disabled",
2954                                                   true /* set_flag*/);
2955 }
2956
2957 /* Administratively re-enable a node */
2958 static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
2959 {
2960         return ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
2961                                   0, NODE_FLAGS_PERMANENTLY_DISABLED) == 0;
2962 }
2963
2964 static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
2965 {
2966         return update_flags_and_ipreallocate(ctdb, NULL,
2967                                                   update_flags_not_disabled,
2968                                                   NODE_FLAGS_PERMANENTLY_DISABLED,
2969                                                   "disabled",
2970                                                   false /* set_flag*/);
2971 }
2972
2973 /* Stop a node */
2974 static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
2975 {
2976         return ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn) == 0;
2977 }
2978
2979 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2980 {
2981         return update_flags_and_ipreallocate(ctdb, NULL,
2982                                                   update_flags_stopped,
2983                                                   NODE_FLAGS_STOPPED,
2984                                                   "stopped",
2985                                                   true /* set_flag*/);
2986 }
2987
2988 /* Continue a stopped node */
2989 static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
2990 {
2991         return ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn) == 0;
2992 }
2993
2994 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2995 {
2996         return update_flags_and_ipreallocate(ctdb, NULL,
2997                                                   update_flags_not_stopped,
2998                                                   NODE_FLAGS_STOPPED,
2999                                                   "stopped",
3000                                                   false /* set_flag */);
3001 }
3002
3003 static uint32_t get_generation(struct ctdb_context *ctdb)
3004 {
3005         struct ctdb_vnn_map *vnnmap=NULL;
3006         int ret;
3007
3008         /* wait until the recmaster is not in recovery mode */
3009         while (1) {
3010                 uint32_t recmode, recmaster;
3011                 
3012                 if (vnnmap != NULL) {
3013                         talloc_free(vnnmap);
3014                         vnnmap = NULL;
3015                 }
3016
3017                 /* get the recmaster */
3018                 if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
3019                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
3020                         exit(10);
3021                 }
3022
3023                 /* get recovery mode */
3024                 if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
3025                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
3026                         exit(10);
3027                 }
3028
3029                 /* get the current generation number */
3030                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
3031                 if (ret != 0) {
3032                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
3033                         exit(10);
3034                 }
3035
3036                 if ((recmode == CTDB_RECOVERY_NORMAL)
3037                 &&  (vnnmap->generation != 1)){
3038                         return vnnmap->generation;
3039                 }
3040                 sleep(1);
3041         }
3042 }
3043
3044 /* Ban a node */
3045 static bool update_state_banned(struct ctdb_context *ctdb, void *data)
3046 {
3047         struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
3048         return ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime) == 0;
3049 }
3050
3051 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
3052 {
3053         struct ctdb_ban_time bantime;
3054
3055         if (argc < 1) {
3056                 usage();
3057         }
3058         
3059         bantime.pnn  = options.pnn;
3060         bantime.time = strtoul(argv[0], NULL, 0);
3061
3062         return update_flags_and_ipreallocate(ctdb, &bantime,
3063                                                   update_state_banned,
3064                                                   NODE_FLAGS_BANNED,
3065                                                   "banned",
3066                                                   true /* set_flag*/);
3067 }
3068
3069
3070 /* Unban a node */
3071 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3072 {
3073         struct ctdb_ban_time bantime;
3074
3075         bantime.pnn  = options.pnn;
3076         bantime.time = 0;
3077
3078         return update_flags_and_ipreallocate(ctdb, &bantime,
3079                                                   update_state_banned,
3080                                                   NODE_FLAGS_BANNED,
3081                                                   "banned",
3082                                                   false /* set_flag*/);
3083 }
3084
3085 /*
3086   show ban information for a node
3087  */
3088 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3089 {
3090         int ret;
3091         struct ctdb_node_map *nodemap=NULL;
3092         struct ctdb_ban_time *bantime;
3093
3094         /* verify the node exists */
3095         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3096         if (ret != 0) {
3097                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3098                 return ret;
3099         }
3100
3101         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3102         if (ret != 0) {
3103                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3104                 return -1;
3105         }       
3106
3107         if (bantime->time == 0) {
3108                 printf("Node %u is not banned\n", bantime->pnn);
3109         } else {
3110                 printf("Node %u is banned, %d seconds remaining\n",
3111                        bantime->pnn, bantime->time);
3112         }
3113
3114         return 0;
3115 }
3116
3117 /*
3118   shutdown a daemon
3119  */
3120 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3121 {
3122         int ret;
3123
3124         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3125         if (ret != 0) {
3126                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3127                 return ret;
3128         }
3129
3130         return 0;
3131 }
3132
3133 /*
3134   trigger a recovery
3135  */
3136 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3137 {
3138         int ret;
3139         uint32_t generation, next_generation;
3140         bool force;
3141
3142         /* "force" option ignores freeze failure and forces recovery */
3143         force = (argc == 1) && (strcasecmp(argv[0], "force") == 0);
3144
3145         /* record the current generation number */
3146         generation = get_generation(ctdb);
3147
3148         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3149         if (ret != 0) {
3150                 if (!force) {
3151                         DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3152                         return ret;
3153                 }
3154                 DEBUG(DEBUG_WARNING, ("Unable to freeze node but proceeding because \"force\" option given\n"));
3155         }
3156
3157         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3158         if (ret != 0) {
3159                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3160                 return ret;
3161         }
3162
3163         /* wait until we are in a new generation */
3164         while (1) {
3165                 next_generation = get_generation(ctdb);
3166                 if (next_generation != generation) {
3167                         return 0;
3168                 }
3169                 sleep(1);
3170         }
3171
3172         return 0;
3173 }
3174
3175
3176 /*
3177   display monitoring mode of a remote node
3178  */
3179 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3180 {
3181         uint32_t monmode;
3182         int ret;
3183
3184         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3185         if (ret != 0) {
3186                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3187                 return ret;
3188         }
3189         if (!options.machinereadable){
3190                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3191         } else {
3192                 printf(":mode:\n");
3193                 printf(":%d:\n",monmode);
3194         }
3195         return 0;
3196 }
3197
3198
3199 /*
3200   display capabilities of a remote node
3201  */
3202 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3203 {
3204         uint32_t capabilities;
3205
3206         if (!ctdb_getcapabilities(ctdb_connection, options.pnn, &capabilities)) {
3207                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3208                 return -1;
3209         }
3210         
3211         if (!options.machinereadable){
3212                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3213                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3214                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3215                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3216         } else {
3217                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3218                 printf(":%d:%d:%d:%d:\n",
3219                         !!(capabilities&CTDB_CAP_RECMASTER),
3220                         !!(capabilities&CTDB_CAP_LMASTER),
3221                         !!(capabilities&CTDB_CAP_LVS),
3222                         !!(capabilities&CTDB_CAP_NATGW));
3223         }
3224         return 0;
3225 }
3226
3227 /*
3228   display lvs configuration
3229  */
3230 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3231 {
3232         uint32_t *capabilities;
3233         struct ctdb_node_map *nodemap=NULL;
3234         int i, ret;
3235         int healthy_count = 0;
3236
3237         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
3238                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3239                 return -1;
3240         }
3241
3242         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3243         CTDB_NO_MEMORY(ctdb, capabilities);
3244         
3245         ret = 0;
3246
3247         /* collect capabilities for all connected nodes */
3248         for (i=0; i<nodemap->num; i++) {
3249                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3250                         continue;
3251                 }
3252                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3253                         continue;
3254                 }
3255         
3256                 if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
3257                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3258                         ret = -1;
3259                         goto done;
3260                 }
3261
3262                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3263                         continue;
3264                 }
3265
3266                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3267                         healthy_count++;
3268                 }
3269         }
3270
3271         /* Print all LVS nodes */
3272         for (i=0; i<nodemap->num; i++) {
3273                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3274                         continue;
3275                 }
3276                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3277                         continue;
3278                 }
3279                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3280                         continue;
3281                 }
3282
3283                 if (healthy_count != 0) {
3284                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3285                                 continue;
3286                         }
3287                 }
3288
3289                 printf("%d:%s\n", i, 
3290                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3291         }
3292
3293 done:
3294         ctdb_free_nodemap(nodemap);
3295         return ret;
3296 }
3297
3298 /*
3299   display who is the lvs master
3300  */
3301 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3302 {
3303         uint32_t *capabilities;
3304         struct ctdb_node_map *nodemap=NULL;
3305         int i, ret;
3306         int healthy_count = 0;
3307
3308         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
3309                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3310                 return -1;
3311         }
3312
3313         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3314         CTDB_NO_MEMORY(ctdb, capabilities);
3315
3316         ret = -1;
3317         
3318         /* collect capabilities for all connected nodes */
3319         for (i=0; i<nodemap->num; i++) {
3320                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3321                         continue;
3322                 }
3323                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3324                         continue;
3325                 }
3326         
3327                 if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
3328                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3329                         ret = -1;
3330                         goto done;
3331                 }
3332
3333                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3334                         continue;
3335                 }
3336
3337                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3338                         healthy_count++;
3339                 }
3340         }
3341
3342         /* find and show the lvsmaster */
3343         for (i=0; i<nodemap->num; i++) {
3344                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3345                         continue;
3346                 }
3347                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3348                         continue;
3349                 }
3350                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3351                         continue;
3352                 }
3353
3354                 if (healthy_count != 0) {
3355                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3356                                 continue;
3357                         }
3358                 }
3359
3360                 if (options.machinereadable){
3361                         printf("%d\n", i);
3362                 } else {
3363                         printf("Node %d is LVS master\n", i);
3364                 }
3365                 ret = 0;
3366                 goto done;
3367         }
3368
3369         printf("There is no LVS master\n");
3370 done:
3371         ctdb_free_nodemap(nodemap);
3372         return ret;
3373 }
3374
3375 /*
3376   disable monitoring on a  node
3377  */
3378 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3379 {
3380         
3381         int ret;
3382
3383         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3384         if (ret != 0) {
3385                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3386                 return ret;
3387         }
3388         printf("Monitoring mode:%s\n","DISABLED");
3389
3390         return 0;
3391 }
3392
3393 /*
3394   enable monitoring on a  node
3395  */
3396 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3397 {
3398         
3399         int ret;
3400
3401         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3402         if (ret != 0) {
3403                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3404                 return ret;
3405         }
3406         printf("Monitoring mode:%s\n","ACTIVE");
3407
3408         return 0;
3409 }
3410
3411 /*
3412   display remote list of keys/data for a db
3413  */
3414 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3415 {
3416         const char *db_name;
3417         struct ctdb_db_context *ctdb_db;
3418         int ret;
3419         struct ctdb_dump_db_context c;
3420         uint8_t flags;
3421
3422         if (argc < 1) {
3423                 usage();
3424         }
3425
3426         db_name = argv[0];
3427
3428         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3429                 return -1;
3430         }
3431
3432         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3433         if (ctdb_db == NULL) {
3434                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3435                 return -1;
3436         }
3437
3438         if (options.printlmaster) {
3439                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3440                                           ctdb, &ctdb->vnn_map);
3441                 if (ret != 0) {
3442                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3443                                           options.pnn));
3444                         return ret;
3445                 }
3446         }
3447
3448         ZERO_STRUCT(c);
3449         c.f = stdout;
3450         c.printemptyrecords = (bool)options.printemptyrecords;
3451         c.printdatasize = (bool)options.printdatasize;
3452         c.printlmaster = (bool)options.printlmaster;
3453         c.printhash = (bool)options.printhash;
3454         c.printrecordflags = (bool)options.printrecordflags;
3455
3456         /* traverse and dump the cluster tdb */
3457         ret = ctdb_dump_db(ctdb_db, &c);
3458         if (ret == -1) {
3459                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3460                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3461                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3462                                   db_name));
3463                 return -1;
3464         }
3465         talloc_free(ctdb_db);
3466
3467         printf("Dumped %d records\n", ret);
3468         return 0;
3469 }
3470
/* traversal state handed to cattdb_traverse() through its private_data */
struct cattdb_data {
        struct ctdb_context *ctdb;      /* passed on to ctdb_dumpdb_record() */
        uint32_t count;                 /* incremented once per record visited */
};
3475
3476 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3477 {
3478         struct cattdb_data *d = private_data;
3479         struct ctdb_dump_db_context c;
3480
3481         d->count++;
3482
3483         ZERO_STRUCT(c);
3484         c.f = stdout;
3485         c.printemptyrecords = (bool)options.printemptyrecords;
3486         c.printdatasize = (bool)options.printdatasize;
3487         c.printlmaster = false;
3488         c.printhash = (bool)options.printhash;
3489         c.printrecordflags = true;
3490
3491         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3492 }
3493
3494 /*
3495   cat the local tdb database using same format as catdb
3496  */
3497 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3498 {
3499         const char *db_name;
3500         struct ctdb_db_context *ctdb_db;
3501         struct cattdb_data d;
3502         uint8_t flags;
3503
3504         if (argc < 1) {
3505                 usage();
3506         }
3507
3508         db_name = argv[0];
3509
3510         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3511                 return -1;
3512         }
3513
3514         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3515         if (ctdb_db == NULL) {
3516                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3517                 return -1;
3518         }
3519
3520         /* traverse the local tdb */
3521         d.count = 0;
3522         d.ctdb  = ctdb;
3523         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3524                 printf("Failed to cattdb data\n");
3525                 exit(10);
3526         }
3527         talloc_free(ctdb_db);
3528
3529         printf("Dumped %d records\n", d.count);
3530         return 0;
3531 }
3532
3533 /*
3534   display the content of a database key
3535  */
3536 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3537 {
3538         const char *db_name;
3539         struct ctdb_db_context *ctdb_db;
3540         struct ctdb_record_handle *h;
3541         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3542         TDB_DATA key, data;
3543         uint8_t flags;
3544
3545         if (argc < 2) {
3546                 usage();
3547         }
3548
3549         db_name = argv[0];
3550
3551         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3552                 return -1;
3553         }
3554
3555         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3556         if (ctdb_db == NULL) {
3557                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3558                 return -1;
3559         }
3560
3561         key.dptr  = discard_const(argv[1]);
3562         key.dsize = strlen((char *)key.dptr);
3563
3564         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3565         if (h == NULL) {
3566                 printf("Failed to fetch record '%s' on node %d\n", 
3567                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3568                 talloc_free(tmp_ctx);
3569                 exit(10);
3570         }
3571
3572         printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
3573
3574         talloc_free(ctdb_db);
3575         talloc_free(tmp_ctx);
3576         return 0;
3577 }
3578
3579 /*
3580   display the content of a database key
3581  */
3582 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3583 {
3584         const char *db_name;
3585         struct ctdb_db_context *ctdb_db;
3586         struct ctdb_record_handle *h;
3587         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3588         TDB_DATA key, data;
3589         uint8_t flags;
3590
3591         if (argc < 3) {
3592                 usage();
3593         }
3594
3595         db_name = argv[0];
3596
3597         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3598                 return -1;
3599         }
3600
3601         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
3602         if (ctdb_db == NULL) {
3603                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3604                 return -1;
3605         }
3606
3607         key.dptr  = discard_const(argv[1]);
3608         key.dsize = strlen((char *)key.dptr);
3609
3610         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3611         if (h == NULL) {
3612                 printf("Failed to fetch record '%s' on node %d\n", 
3613                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3614                 talloc_free(tmp_ctx);
3615                 exit(10);
3616         }
3617
3618         data.dptr  = discard_const(argv[2]);
3619         data.dsize = strlen((char *)data.dptr);
3620
3621         if (ctdb_record_store(h, data) != 0) {
3622                 printf("Failed to store record\n");
3623         }
3624
3625         talloc_free(h);
3626         talloc_free(ctdb_db);
3627         talloc_free(tmp_ctx);
3628         return 0;
3629 }
3630
3631 /*
3632   fetch a record from a persistent database
3633  */
3634 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3635 {
3636         const char *db_name;
3637         struct ctdb_db_context *ctdb_db;
3638         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3639         struct ctdb_transaction_handle *h;
3640         TDB_DATA key, data;
3641         int fd, ret;
3642         bool persistent;
3643         uint8_t flags;
3644
3645         if (argc < 2) {
3646                 talloc_free(tmp_ctx);
3647                 usage();
3648         }
3649
3650         db_name = argv[0];
3651
3652         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3653                 talloc_free(tmp_ctx);
3654                 return -1;
3655         }
3656
3657         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3658         if (!persistent) {
3659                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3660                 talloc_free(tmp_ctx);
3661                 return -1;
3662         }
3663
3664         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3665         if (ctdb_db == NULL) {
3666                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3667                 talloc_free(tmp_ctx);
3668                 return -1;
3669         }
3670
3671         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3672         if (h == NULL) {
3673                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3674                 talloc_free(tmp_ctx);
3675                 return -1;
3676         }
3677
3678         key.dptr  = discard_const(argv[1]);
3679         key.dsize = strlen(argv[1]);
3680         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3681         if (ret != 0) {
3682                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3683                 talloc_free(tmp_ctx);
3684                 return -1;
3685         }
3686
3687         if (data.dsize == 0 || data.dptr == NULL) {
3688                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3689                 talloc_free(tmp_ctx);
3690                 return -1;
3691         }
3692
3693         if (argc == 3) {
3694           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3695                 if (fd == -1) {
3696                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3697                         talloc_free(tmp_ctx);
3698                         return -1;
3699                 }
3700                 write(fd, data.dptr, data.dsize);
3701                 close(fd);
3702         } else {
3703                 write(1, data.dptr, data.dsize);
3704         }
3705
3706         /* abort the transaction */
3707         talloc_free(h);
3708
3709
3710         talloc_free(tmp_ctx);
3711         return 0;
3712 }
3713
3714 /*
3715   fetch a record from a tdb-file
3716  */
3717 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3718 {
3719         const char *tdb_file;
3720         TDB_CONTEXT *tdb;
3721         TDB_DATA key, data;
3722         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3723         int fd;
3724
3725         if (argc < 2) {
3726                 usage();
3727         }
3728
3729         tdb_file = argv[0];
3730
3731         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3732         if (tdb == NULL) {
3733                 printf("Failed to open TDB file %s\n", tdb_file);
3734                 return -1;
3735         }
3736
3737         if (!strncmp(argv[1], "0x", 2)) {
3738                 key = hextodata(tmp_ctx, argv[1] + 2);
3739                 if (key.dsize == 0) {
3740                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3741                         return -1;
3742                 }
3743         } else {
3744                 key.dptr  = discard_const(argv[1]);
3745                 key.dsize = strlen(argv[1]);
3746         }
3747
3748         data = tdb_fetch(tdb, key);
3749         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3750                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
3751                 tdb_close(tdb);
3752                 return -1;
3753         }
3754
3755         tdb_close(tdb);
3756
3757         if (argc == 3) {
3758           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3759                 if (fd == -1) {
3760                         printf("Failed to open output file %s\n", argv[2]);
3761                         return -1;
3762                 }
3763                 if (options.verbose){
3764                         write(fd, data.dptr, data.dsize);
3765                 } else {
3766                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3767                 }
3768                 close(fd);
3769         } else {
3770                 if (options.verbose){
3771                         write(1, data.dptr, data.dsize);
3772                 } else {
3773                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3774                 }
3775         }
3776
3777         talloc_free(tmp_ctx);
3778         return 0;
3779 }
3780
3781 /*
3782   store a record and header to a tdb-file
3783  */
3784 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
3785 {
3786         const char *tdb_file;
3787         TDB_CONTEXT *tdb;
3788         TDB_DATA key, data;
3789         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3790
3791         if (argc < 3) {
3792                 usage();
3793         }
3794
3795         tdb_file = argv[0];
3796
3797         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
3798         if (tdb == NULL) {
3799                 printf("Failed to open TDB file %s\n", tdb_file);
3800                 return -1;
3801         }
3802
3803         if (!strncmp(argv[1], "0x", 2)) {
3804                 key = hextodata(tmp_ctx, argv[1] + 2);
3805                 if (key.dsize == 0) {
3806                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3807                         return -1;
3808                 }
3809         } else {
3810                 key.dptr  = discard_const(argv[1]);
3811                 key.dsize = strlen(argv[1]);
3812         }
3813
3814         if (!strncmp(argv[2], "0x", 2)) {
3815                 data = hextodata(tmp_ctx, argv[2] + 2);
3816                 if (data.dsize == 0) {
3817                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
3818                         return -1;
3819                 }
3820         } else {
3821                 data.dptr  = discard_const(argv[2]);
3822                 data.dsize = strlen(argv[2]);
3823         }
3824
3825         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3826                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
3827                 return -1;
3828         }
3829         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
3830                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
3831                 tdb_close(tdb);
3832                 return -1;
3833         }
3834
3835         tdb_close(tdb);
3836
3837         talloc_free(tmp_ctx);
3838         return 0;
3839 }
3840
3841 /*
3842   write a record to a persistent database
3843  */
3844 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3845 {
3846         const char *db_name;
3847         struct ctdb_db_context *ctdb_db;
3848         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3849         struct ctdb_transaction_handle *h;
3850         struct stat st;
3851         TDB_DATA key, data;
3852         int fd, ret;
3853
3854         if (argc < 3) {
3855                 talloc_free(tmp_ctx);
3856                 usage();
3857         }
3858
3859         fd = open(argv[2], O_RDONLY);
3860         if (fd == -1) {
3861                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3862                 talloc_free(tmp_ctx);
3863                 return -1;
3864         }
3865         
3866         ret = fstat(fd, &st);
3867         if (ret == -1) {
3868                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3869                 close(fd);
3870                 talloc_free(tmp_ctx);
3871                 return -1;
3872         }
3873
3874         if (!S_ISREG(st.st_mode)) {
3875                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3876                 close(fd);
3877                 talloc_free(tmp_ctx);
3878                 return -1;
3879         }
3880
3881         data.dsize = st.st_size;
3882         if (data.dsize == 0) {
3883                 data.dptr  = NULL;
3884         } else {
3885                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3886                 if (data.dptr == NULL) {
3887                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3888                         close(fd);
3889                         talloc_free(tmp_ctx);
3890                         return -1;
3891                 }
3892                 ret = read(fd, data.dptr, data.dsize);
3893                 if (ret != data.dsize) {
3894                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3895                         close(fd);
3896                         talloc_free(tmp_ctx);
3897                         return -1;
3898                 }
3899         }
3900         close(fd);
3901
3902
3903         db_name = argv[0];
3904
3905         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
3906         if (ctdb_db == NULL) {
3907                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3908                 talloc_free(tmp_ctx);
3909                 return -1;
3910         }
3911
3912         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3913         if (h == NULL) {
3914                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3915                 talloc_free(tmp_ctx);
3916                 return -1;
3917         }
3918
3919         key.dptr  = discard_const(argv[1]);
3920         key.dsize = strlen(argv[1]);
3921         ret = ctdb_transaction_store(h, key, data);
3922         if (ret != 0) {
3923                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3924                 talloc_free(tmp_ctx);
3925                 return -1;
3926         }
3927
3928         ret = ctdb_transaction_commit(h);
3929         if (ret != 0) {
3930                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3931                 talloc_free(tmp_ctx);
3932                 return -1;
3933         }
3934
3935
3936         talloc_free(tmp_ctx);
3937         return 0;
3938 }
3939
3940 /*
3941  * delete a record from a persistent database
3942  */
3943 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
3944 {
3945         const char *db_name;
3946         struct ctdb_db_context *ctdb_db;
3947         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3948         struct ctdb_transaction_handle *h;
3949         TDB_DATA key;
3950         int ret;
3951         bool persistent;
3952         uint8_t flags;
3953
3954         if (argc < 2) {
3955                 talloc_free(tmp_ctx);
3956                 usage();
3957         }
3958
3959         db_name = argv[0];
3960
3961         if (!db_exists(ctdb, db_name, NULL, &flags)) {
3962                 talloc_free(tmp_ctx);
3963                 return -1;
3964         }
3965
3966         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
3967         if (!persistent) {
3968                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
3969                 talloc_free(tmp_ctx);
3970                 return -1;
3971         }
3972
3973         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3974         if (ctdb_db == NULL) {
3975                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
3976                 talloc_free(tmp_ctx);
3977                 return -1;
3978         }
3979
3980         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3981         if (h == NULL) {
3982                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
3983                 talloc_free(tmp_ctx);
3984                 return -1;
3985         }
3986
3987         key.dptr = discard_const(argv[1]);
3988         key.dsize = strlen(argv[1]);
3989         ret = ctdb_transaction_store(h, key, tdb_null);
3990         if (ret != 0) {
3991                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
3992                 talloc_free(tmp_ctx);
3993                 return -1;
3994         }
3995
3996         ret = ctdb_transaction_commit(h);
3997         if (ret != 0) {
3998                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
3999                 talloc_free(tmp_ctx);
4000                 return -1;
4001         }
4002
4003         talloc_free(tmp_ctx);
4004         return 0;
4005 }
4006
/*
  check if a service is bound to a port or not

  Takes exactly one argument, a decimal TCP port in the range 0-65535,
  and tries to bind a TCP socket to that port on the IPv4 wildcard
  address.

  Returns 0 when the bind succeeds, EINVAL for a missing or malformed
  port argument, and otherwise the errno of the failing socket() or
  bind() call.
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
        int s, ret, saved_errno;
        int v;
        long port;
        char *end;
        struct sockaddr_in sin;

        if (argc != 1) {
                printf("Use: ctdb chktcpport <port>\n");
                return EINVAL;
        }

        /* reject anything that is not a complete, in-range port number */
        errno = 0;
        port = strtol(argv[0], &end, 10);
        if (errno != 0 || end == argv[0] || *end != '\0' ||
            port < 0 || port > 65535) {
                printf("Invalid port '%s'\n", argv[0]);
                return EINVAL;
        }

        s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (s == -1) {
                /* capture errno before printf() gets a chance to change it */
                saved_errno = errno;
                printf("Failed to open local socket\n");
                return saved_errno;
        }

        v = fcntl(s, F_GETFL, 0);
        if (v != -1) {
                fcntl(s, F_SETFL, v | O_NONBLOCK);
        }

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_port   = htons((uint16_t)port);
        ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
        /* close() below may overwrite errno, so remember bind()'s value */
        saved_errno = errno;
        close(s);
        if (ret == -1) {
                printf("Failed to bind to local socket: %d %s\n", saved_errno, strerror(saved_errno));
                return saved_errno;
        }

        return 0;
}
4045
4046
4047
4048 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4049                              TDB_DATA data, void *private_data)
4050 {
4051         DEBUG(DEBUG_ERR,("Log data received\n"));
4052         if (data.dsize > 0) {
4053                 printf("%s", data.dptr);
4054         }
4055
4056         exit(0);
4057 }
4058
4059 /*
4060   display a list of log messages from the in memory ringbuffer
4061  */
4062 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
4063 {
4064         int ret, i;
4065         bool main_daemon;
4066         struct ctdb_get_log_addr log_addr;
4067         TDB_DATA data;
4068         struct timeval tv;
4069
4070         /* Process options */
4071         main_daemon = true;
4072         log_addr.pnn = ctdb_get_pnn(ctdb);
4073         log_addr.level = DEBUG_NOTICE;
4074         for (i = 0; i < argc; i++) {
4075                 if (strcmp(argv[i], "recoverd") == 0) {
4076                         main_daemon = false;
4077                 } else {
4078                         if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
4079                                 log_addr.level = get_debug_by_desc(argv[i]);
4080                         } else {
4081                                 log_addr.level = strtol(argv[i], NULL, 0);
4082                         }
4083                 }
4084         }
4085
4086         /* Our message port is our PID */
4087         log_addr.srvid = getpid();
4088
4089         data.dptr = (unsigned char *)&log_addr;
4090         data.dsize = sizeof(log_addr);
4091
4092         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
4093
4094         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
4095         sleep(1);
4096
4097         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
4098
4099         if (main_daemon) {
4100                 int32_t res;
4101                 char *errmsg;
4102                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4103
4104                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4105                                    0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4106                 if (ret != 0 || res != 0) {
4107                         DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4108                         talloc_free(tmp_ctx);
4109                         return -1;
4110                 }
4111                 talloc_free(tmp_ctx);
4112         } else {
4113                 ret = ctdb_client_send_message(ctdb, options.pnn,
4114                                                CTDB_SRVID_GETLOG, data);
4115                 if (ret != 0) {
4116                         DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
4117                         return -1;
4118                 }
4119         }
4120
4121         tv = timeval_current();
4122         /* this loop will terminate when we have received the reply */
4123         while (timeval_elapsed(&tv) < (double)options.timelimit) {
4124                 event_loop_once(ctdb->ev);
4125         }
4126
4127         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4128
4129         return 0;
4130 }
4131
4132 /*
4133   clear the in memory log area
4134  */
4135 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4136 {
4137         int ret;
4138
4139         if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
4140                 /* "recoverd" not given - get logs from main daemon */
4141                 int32_t res;
4142                 char *errmsg;
4143                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4144
4145                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4146                                    0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4147                 if (ret != 0 || res != 0) {
4148                         DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4149                         talloc_free(tmp_ctx);
4150                         return -1;
4151                 }
4152
4153                 talloc_free(tmp_ctx);
4154         } else {
4155                 TDB_DATA data; /* unused in recoverd... */
4156                 data.dsize = 0;
4157
4158                 ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
4159                 if (ret != 0) {
4160                         DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
4161                         return -1;
4162                 }
4163         }
4164
4165         return 0;
4166 }
4167
4168
/*
  completion flag for "reloadips -n all": reset to 0 by reloadips_all()
  before it waits and set to 1 by reloadips_handler()
 */
static uint32_t reloadips_finished;

/*
  message handler registered by reloadips_all(); any message arriving
  on the registered srvid marks the reload as finished.  The message
  content is not looked at.
 */
static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
                             TDB_DATA data, void *private_data)
{
        reloadips_finished = 1;
}
4176
4177 static int reloadips_all(struct ctdb_context *ctdb)
4178 {
4179         struct reloadips_all_reply rips;
4180         struct ctdb_node_map *nodemap=NULL;
4181         TDB_DATA data;
4182         uint32_t recmaster;
4183         int ret, i;
4184
4185         /* check that there are valid nodes available */
4186         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
4187                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4188                 return 1;
4189         }
4190         for (i=0; i<nodemap->num;i++) {
4191                 if (nodemap->nodes[i].flags != 0) {
4192                         DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
4193                         return 1;
4194                 }
4195         }
4196
4197         rips.pnn = ctdb_get_pnn(ctdb);
4198         rips.srvid = getpid();
4199
4200         /* register a message port for receiveing the reply so that we
4201            can receive the reply
4202         */
4203         ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
4204
4205         if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
4206                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
4207                 return -1;
4208         }
4209
4210
4211         data.dptr = (uint8_t *)&rips;
4212         data.dsize = sizeof(rips);
4213
4214         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
4215         if (ret != 0) {
4216                 DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
4217                 return 1;
4218         }
4219
4220         reloadips_finished = 0;
4221         while (reloadips_finished == 0) {
4222                 event_loop_once(ctdb->ev);
4223         }
4224
4225         return 0;
4226 }
4227
4228 /*
4229   reload public ips on a specific node
4230  */
4231 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4232 {
4233         int ret;
4234         int32_t res;
4235         char *errmsg;
4236         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4237
4238         if (options.pnn == CTDB_BROADCAST_ALL) {
4239                 return reloadips_all(ctdb);
4240         }
4241
4242         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4243                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4244         if (ret != 0 || res != 0) {
4245                 DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
4246                 talloc_free(tmp_ctx);
4247                 return -1;
4248         }
4249
4250         talloc_free(tmp_ctx);
4251         return 0;
4252 }
4253
4254 /*
4255   display a list of the databases on a remote ctdb
4256  */
4257 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4258 {
4259         int i, ret;
4260         struct ctdb_dbid_map *dbmap=NULL;
4261
4262         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4263         if (ret != 0) {
4264                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4265                 return ret;
4266         }
4267
4268         if(options.machinereadable){
4269                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4270                 for(i=0;i<dbmap->num;i++){
4271                         const char *path;
4272                         const char *name;
4273                         const char *health;
4274                         bool persistent;
4275                         bool readonly;
4276                         bool sticky;
4277
4278                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4279                                             dbmap->dbs[i].dbid, ctdb, &path);
4280                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4281                                             dbmap->dbs[i].dbid, ctdb, &name);
4282                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4283                                               dbmap->dbs[i].dbid, ctdb, &health);
4284                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4285                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4286                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4287                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4288                                dbmap->dbs[i].dbid, name, path,
4289                                !!(persistent), !!(sticky),
4290                                !!(health), !!(readonly));
4291                 }
4292                 return 0;
4293         }
4294
4295         printf("Number of databases:%d\n", dbmap->num);
4296         for(i=0;i<dbmap->num;i++){
4297                 const char *path;
4298                 const char *name;
4299                 const char *health;
4300                 bool persistent;
4301                 bool readonly;
4302                 bool sticky;
4303
4304                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4305                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4306                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4307                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4308                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4309                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4310                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4311                        dbmap->dbs[i].dbid, name, path,
4312                        persistent?" PERSISTENT":"",
4313                        sticky?" STICKY":"",
4314                        readonly?" READONLY":"",
4315                        health?" UNHEALTHY":"");
4316         }
4317
4318         return 0;
4319 }
4320
4321 /*
4322   display the status of a database on a remote ctdb
4323  */
4324 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4325 {
4326         const char *db_name;
4327         uint32_t db_id;
4328         uint8_t flags;
4329         const char *path;
4330         const char *health;
4331
4332         if (argc < 1) {
4333                 usage();
4334         }
4335
4336         db_name = argv[0];
4337
4338         if (!db_exists(ctdb, db_name, &db_id, &flags)) {
4339                 return -1;
4340         }
4341
4342         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
4343         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
4344         printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4345                db_id, db_name, path,
4346                (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
4347                (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
4348                (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
4349                (health ? health : "OK"));
4350
4351         return 0;
4352 }
4353
4354 /*
4355   check if the local node is recmaster or not
4356   it will return 1 if this node is the recmaster and 0 if it is not
4357   or if the local ctdb daemon could not be contacted
4358  */
4359 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4360 {
4361         uint32_t mypnn, recmaster;
4362
4363         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
4364         if (mypnn == -1) {
4365                 printf("Failed to get pnn of node\n");
4366                 return 1;
4367         }
4368
4369         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
4370                 printf("Failed to get the recmaster\n");
4371                 return 1;
4372         }
4373
4374         if (recmaster != mypnn) {
4375                 printf("this node is not the recmaster\n");
4376                 return 1;
4377         }
4378
4379         printf("this node is the recmaster\n");
4380         return 0;
4381 }
4382
4383 /*
4384   ping a node
4385  */
4386 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4387 {
4388         int ret;
4389         struct timeval tv = timeval_current();
4390         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4391         if (ret == -1) {
4392                 printf("Unable to get ping response from node %u\n", options.pnn);
4393                 return -1;
4394         } else {
4395                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4396                        options.pnn, timeval_elapsed(&tv), ret);
4397         }
4398         return 0;
4399 }
4400
4401
4402 /*
4403   get a node's runstate
4404  */
4405 static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
4406 {
4407         int ret;
4408         enum ctdb_runstate runstate;
4409
4410         ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
4411         if (ret == -1) {
4412                 printf("Unable to get runstate response from node %u\n",
4413                        options.pnn);
4414                 return -1;
4415         } else {
4416                 bool found = true;
4417                 enum ctdb_runstate t;
4418                 int i;
4419                 for (i=0; i<argc; i++) {
4420                         found = false;
4421                         t = runstate_from_string(argv[i]);
4422                         if (t == CTDB_RUNSTATE_UNKNOWN) {
4423                                 printf("Invalid run state (%s)\n", argv[i]);
4424                                 return -1;
4425                         }
4426
4427                         if (t == runstate) {
4428                                 found = true;
4429                                 break;
4430                         }
4431                 }
4432
4433                 if (!found) {
4434                         printf("CTDB not in required run state (got %s)\n", 
4435                                runstate_to_string((enum ctdb_runstate)runstate));
4436                         return -1;
4437                 }
4438         }
4439
4440         printf("%s\n", runstate_to_string(runstate));
4441         return 0;
4442 }
4443
4444
4445 /*
4446   get a tunable
4447  */
4448 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4449 {
4450         const char *name;
4451         uint32_t value;
4452         int ret;
4453
4454         if (argc < 1) {
4455                 usage();
4456         }
4457
4458         name = argv[0];
4459         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4460         if (ret != 0) {
4461                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4462                 return -1;
4463         }
4464
4465         printf("%-23s = %u\n", name, value);
4466         return 0;
4467 }
4468
4469 /*
4470   set a tunable
4471  */
4472 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4473 {
4474         const char *name;
4475         uint32_t value;
4476         int ret;
4477
4478         if (argc < 2) {
4479                 usage();
4480         }
4481
4482         name = argv[0];
4483         value = strtoul(argv[1], NULL, 0);
4484
4485         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4486         if (ret == -1) {
4487                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4488                 return -1;
4489         }
4490         return 0;
4491 }
4492
4493 /*
4494   list all tunables
4495  */
4496 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4497 {
4498         uint32_t count;
4499         const char **list;
4500         int ret, i;
4501
4502         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4503         if (ret == -1) {
4504                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4505                 return -1;
4506         }
4507
4508         for (i=0;i<count;i++) {
4509                 control_getvar(ctdb, 1, &list[i]);
4510         }
4511
4512         talloc_free(list);
4513         
4514         return 0;
4515 }
4516
4517 /*
4518   display debug level on a node
4519  */
4520 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4521 {
4522         int ret;
4523         int32_t level;
4524
4525         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4526         if (ret != 0) {
4527                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4528                 return ret;
4529         } else {
4530                 if (options.machinereadable){
4531                         printf(":Name:Level:\n");
4532                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4533                 } else {
4534                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4535                 }
4536         }
4537         return 0;
4538 }
4539
4540 /*
4541   display reclock file of a node
4542  */
4543 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4544 {
4545         int ret;
4546         const char *reclock;
4547
4548         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4549         if (ret != 0) {
4550                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4551                 return ret;
4552         } else {
4553                 if (options.machinereadable){
4554                         if (reclock != NULL) {
4555                                 printf("%s", reclock);
4556                         }
4557                 } else {
4558                         if (reclock == NULL) {
4559                                 printf("No reclock file used.\n");
4560                         } else {
4561                                 printf("Reclock file:%s\n", reclock);
4562                         }
4563                 }
4564         }
4565         return 0;
4566 }
4567
4568 /*
4569   set the reclock file of a node
4570  */
4571 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4572 {
4573         int ret;
4574         const char *reclock;
4575
4576         if (argc == 0) {
4577                 reclock = NULL;
4578         } else if (argc == 1) {
4579                 reclock = argv[0];
4580         } else {
4581                 usage();
4582         }
4583
4584         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4585         if (ret != 0) {
4586                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4587                 return ret;
4588         }
4589         return 0;
4590 }
4591
4592 /*
4593   set the natgw state on/off
4594  */
4595 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4596 {
4597         int ret;
4598         uint32_t natgwstate;
4599
4600         if (argc == 0) {
4601                 usage();
4602         }
4603
4604         if (!strcmp(argv[0], "on")) {
4605                 natgwstate = 1;
4606         } else if (!strcmp(argv[0], "off")) {
4607                 natgwstate = 0;
4608         } else {
4609                 usage();
4610         }
4611
4612         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4613         if (ret != 0) {
4614                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4615                 return ret;
4616         }
4617
4618         return 0;
4619 }
4620
4621 /*
4622   set the lmaster role on/off
4623  */
4624 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4625 {
4626         int ret;
4627         uint32_t lmasterrole;
4628
4629         if (argc == 0) {
4630                 usage();
4631         }
4632
4633         if (!strcmp(argv[0], "on")) {
4634                 lmasterrole = 1;
4635         } else if (!strcmp(argv[0], "off")) {
4636                 lmasterrole = 0;
4637         } else {
4638                 usage();
4639         }
4640
4641         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4642         if (ret != 0) {
4643                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4644                 return ret;
4645         }
4646
4647         return 0;
4648 }
4649
4650 /*
4651   set the recmaster role on/off
4652  */
4653 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4654 {
4655         int ret;
4656         uint32_t recmasterrole;
4657
4658         if (argc == 0) {
4659                 usage();
4660         }
4661
4662         if (!strcmp(argv[0], "on")) {
4663                 recmasterrole = 1;
4664         } else if (!strcmp(argv[0], "off")) {
4665                 recmasterrole = 0;
4666         } else {
4667                 usage();
4668         }
4669
4670         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4671         if (ret != 0) {
4672                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4673                 return ret;
4674         }
4675
4676         return 0;
4677 }
4678
4679 /*
4680   set debug level on a node or all nodes
4681  */
4682 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4683 {
4684         int i, ret;
4685         int32_t level;
4686
4687         if (argc == 0) {
4688                 printf("You must specify the debug level. Valid levels are:\n");
4689                 for (i=0; debug_levels[i].description != NULL; i++) {
4690                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4691                 }
4692
4693                 return 0;
4694         }
4695
4696         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4697                 level = get_debug_by_desc(argv[0]);
4698         } else {
4699                 level = strtol(argv[0], NULL, 0);
4700         }
4701
4702         for (i=0; debug_levels[i].description != NULL; i++) {
4703                 if (level == debug_levels[i].level) {
4704                         break;
4705                 }
4706         }
4707         if (debug_levels[i].description == NULL) {
4708                 printf("Invalid debug level, must be one of\n");
4709                 for (i=0; debug_levels[i].description != NULL; i++) {
4710                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4711                 }
4712                 return -1;
4713         }
4714
4715         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4716         if (ret != 0) {
4717                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4718         }
4719         return 0;
4720 }
4721
4722
4723 /*
4724   thaw a node
4725  */
4726 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4727 {
4728         int ret;
4729         uint32_t priority;
4730         
4731         if (argc == 1) {
4732                 priority = strtol(argv[0], NULL, 0);
4733         } else {
4734                 priority = 0;
4735         }
4736         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4737
4738         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4739         if (ret != 0) {
4740                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4741         }               
4742         return 0;
4743 }
4744
4745
4746 /*
4747   attach to a database
4748  */
4749 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4750 {
4751         const char *db_name;
4752         struct ctdb_db_context *ctdb_db;
4753         bool persistent = false;
4754
4755         if (argc < 1) {
4756                 usage();
4757         }
4758         db_name = argv[0];
4759         if (argc > 2) {
4760                 usage();
4761         }
4762         if (argc == 2) {
4763                 if (strcmp(argv[1], "persistent") != 0) {
4764                         usage();
4765                 }
4766                 persistent = true;
4767         }
4768
4769         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4770         if (ctdb_db == NULL) {
4771                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4772                 return -1;
4773         }
4774
4775         return 0;
4776 }
4777
4778 /*
4779   set db priority
4780  */
4781 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4782 {
4783         struct ctdb_db_priority db_prio;
4784         int ret;
4785
4786         if (argc < 2) {
4787                 usage();
4788         }
4789
4790         db_prio.db_id    = strtoul(argv[0], NULL, 0);
4791         db_prio.priority = strtoul(argv[1], NULL, 0);
4792
4793         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
4794         if (ret != 0) {
4795                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
4796                 return -1;
4797         }
4798
4799         return 0;
4800 }
4801
4802 /*
4803   get db priority
4804  */
4805 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4806 {
4807         uint32_t db_id, priority;
4808         int ret;
4809
4810         if (argc < 1) {
4811                 usage();
4812         }
4813
4814         if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
4815                 return -1;
4816         }
4817
4818         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
4819         if (ret != 0) {
4820                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
4821                 return -1;
4822         }
4823
4824         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
4825
4826         return 0;
4827 }
4828
4829 /*
4830   set the sticky records capability for a database
4831  */
4832 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
4833 {
4834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4835         uint32_t db_id;
4836         int ret;
4837
4838         if (argc < 1) {
4839                 usage();
4840         }
4841
4842         if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
4843                 return -1;
4844         }
4845
4846         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
4847         if (ret != 0) {
4848                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
4849                 talloc_free(tmp_ctx);
4850                 return -1;
4851         }
4852
4853         talloc_free(tmp_ctx);
4854         return 0;
4855 }
4856
4857 /*
4858   set the readonly capability for a database
4859  */
4860 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
4861 {
4862         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4863         uint32_t db_id;
4864         int ret;
4865
4866         if (argc < 1) {
4867                 usage();
4868         }
4869
4870         if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
4871                 return -1;
4872         }
4873
4874         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
4875         if (ret != 0) {
4876                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
4877                 talloc_free(tmp_ctx);
4878                 return -1;
4879         }
4880
4881         talloc_free(tmp_ctx);
4882         return 0;
4883 }
4884
4885 /*
4886   get db seqnum
4887  */
4888 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4889 {
4890         bool ret;
4891         uint32_t db_id;
4892         uint64_t seqnum;
4893
4894         if (argc < 1) {
4895                 usage();
4896         }
4897
4898         if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
4899                 return -1;
4900         }
4901
4902         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
4903         if (!ret) {
4904                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4905                 return -1;
4906         }
4907
4908         printf("Sequence number:%lld\n", (long long)seqnum);
4909
4910         return 0;
4911 }
4912
4913 /*
4914  * set db seqnum
4915  */
4916 static int control_setdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4917 {
4918         bool ret;
4919         struct ctdb_db_context *ctdb_db;
4920         uint32_t db_id;
4921         uint8_t flags;
4922         uint64_t old_seqnum, new_seqnum;
4923         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4924         struct ctdb_transaction_handle *h;
4925         TDB_DATA key, data;
4926         bool persistent;
4927
4928         if (argc != 2) {
4929                 talloc_free(tmp_ctx);
4930                 usage();
4931         }
4932
4933         if (!db_exists(ctdb, argv[0], &db_id, &flags)) {
4934                 talloc_free(tmp_ctx);
4935                 return -1;
4936         }
4937
4938         persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
4939         if (!persistent) {
4940                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", argv[0]));
4941                 talloc_free(tmp_ctx);
4942                 return -1;
4943         }
4944
4945         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &old_seqnum);
4946         if (!ret) {
4947                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4948                 talloc_free(tmp_ctx);
4949                 return -1;
4950         }
4951
4952         new_seqnum = strtoull(argv[1], NULL, 0);
4953         if (new_seqnum <= old_seqnum) {
4954                 DEBUG(DEBUG_ERR, ("New sequence number is less than current sequence number\n"));
4955                 talloc_free(tmp_ctx);
4956                 return -1;
4957         }
4958
4959         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], persistent, 0);
4960         if (ctdb_db == NULL) {
4961                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
4962                 talloc_free(tmp_ctx);
4963                 return -1;
4964         }
4965
4966         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
4967         if (h == NULL) {
4968                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", argv[0]));
4969                 talloc_free(tmp_ctx);
4970                 return -1;
4971         }
4972
4973         key.dptr  = (uint8_t *)discard_const(CTDB_DB_SEQNUM_KEY);
4974         key.dsize = strlen(CTDB_DB_SEQNUM_KEY) + 1;
4975
4976         data.dsize = sizeof(new_seqnum);
4977         data.dptr = talloc_size(tmp_ctx, data.dsize);
4978         *data.dptr = new_seqnum;
4979
4980         ret = ctdb_transaction_store(h, key, data);
4981         if (ret != 0) {
4982                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
4983                 talloc_free(tmp_ctx);
4984                 return -1;
4985         }
4986
4987         ret = ctdb_transaction_commit(h);
4988         if (ret != 0) {
4989                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
4990                 talloc_free(tmp_ctx);
4991                 return -1;
4992         }
4993
4994         talloc_free(tmp_ctx);
4995         return 0;
4996 }
4997
4998 /*
4999   run an eventscript on a node
5000  */
5001 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
5002 {
5003         TDB_DATA data;
5004         int ret;
5005         int32_t res;
5006         char *errmsg;
5007         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5008
5009         if (argc != 1) {
5010                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5011                 return -1;
5012         }
5013
5014         data.dptr = (unsigned char *)discard_const(argv[0]);
5015         data.dsize = strlen((char *)data.dptr) + 1;
5016
5017         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
5018
5019         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
5020                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
5021         if (ret != 0 || res != 0) {
5022                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
5023                 talloc_free(tmp_ctx);
5024                 return -1;
5025         }
5026         talloc_free(tmp_ctx);
5027         return 0;
5028 }
5029
/* version number kept in db_file_header.version */
#define DB_VERSION 1
/* size in bytes of the fixed name field in db_file_header */
#define MAX_DB_NAME 64

/*
  header describing a database backup

  The field order and types are left exactly as they were; only the
  const qualifier on the name buffer is dropped, because the header is
  declared uninitialised and then filled in, which a const member would
  only allow by casting the qualifier away.
 */
struct db_file_header {
        unsigned long version;
        time_t timestamp;
        unsigned long persistent;
        unsigned long size;
        char name[MAX_DB_NAME];
};

/* accumulator handed to backup_traverse() through its private pointer */
struct backup_data {
        struct ctdb_marshall_buffer *records;   /* marshall buffer grown per record */
        uint32_t len;                           /* bytes used in records so far */
        uint32_t total;                         /* number of records added */
        bool traverse_error;                    /* set when adding a record failed */
};
5046
5047 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
5048 {
5049         struct backup_data *bd = talloc_get_type(private, struct backup_data);
5050         struct ctdb_rec_data *rec;
5051
5052         /* add the record */
5053         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
5054         if (rec == NULL) {
5055                 bd->traverse_error = true;
5056                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
5057                 return -1;
5058         }
5059         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5060         if (bd->records == NULL) {
5061                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5062                 bd->traverse_error = true;
5063                 return -1;
5064         }
5065         bd->records->count++;
5066         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5067         bd->len += rec->length;
5068         talloc_free(rec);
5069
5070         bd->total++;
5071         return 0;
5072 }
5073
5074 /*
5075  * backup a database to a file 
5076  */
5077 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5078 {
5079         int ret;
5080         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5081         struct db_file_header dbhdr;
5082         struct ctdb_db_context *ctdb_db;
5083         struct backup_data *bd;
5084         int fh = -1;
5085         int status = -1;
5086         const char *reason = NULL;
5087         uint32_t db_id;
5088         uint8_t flags;
5089
5090         if (argc != 2) {
5091                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5092                 return -1;
5093         }
5094
5095         if (!db_exists(ctdb, argv[0], &db_id, &flags)) {
5096                 return -1;
5097         }
5098
5099         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5100                                     db_id, tmp_ctx, &reason);
5101         if (ret != 0) {
5102                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5103                                  argv[0]));
5104                 talloc_free(tmp_ctx);
5105                 return -1;
5106         }
5107         if (reason) {
5108                 uint32_t allow_unhealthy = 0;
5109
5110                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5111                                       "AllowUnhealthyDBRead",
5112                                       &allow_unhealthy);
5113
5114                 if (allow_unhealthy != 1) {
5115                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5116                                          argv[0], reason));
5117
5118                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5119                                          allow_unhealthy));
5120                         talloc_free(tmp_ctx);
5121                         return -1;
5122                 }
5123
5124                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5125                                      argv[0], argv[0]));
5126                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5127                                      "tunnable AllowUnhealthyDBRead = %u\n",
5128                                      allow_unhealthy));
5129         }
5130
5131         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5132         if (ctdb_db == NULL) {
5133                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5134                 talloc_free(tmp_ctx);
5135                 return -1;
5136         }
5137
5138
5139         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5140         if (ret == -1) {
5141                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5142                 talloc_free(tmp_ctx);
5143                 return -1;
5144         }
5145
5146
5147         bd = talloc_zero(tmp_ctx, struct backup_data);
5148         if (bd == NULL) {
5149                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5150                 talloc_free(tmp_ctx);
5151                 return -1;
5152         }
5153
5154         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5155         if (bd->records == NULL) {
5156                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5157                 talloc_free(tmp_ctx);
5158                 return -1;
5159         }
5160
5161         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5162         bd->records->db_id = ctdb_db->db_id;
5163         /* traverse the database collecting all records */
5164         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5165             bd->traverse_error) {
5166                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5167                 talloc_free(tmp_ctx);
5168                 return -1;              
5169         }
5170
5171         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5172
5173
5174         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5175         if (fh == -1) {
5176                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5177                 talloc_free(tmp_ctx);
5178                 return -1;
5179         }
5180
5181         dbhdr.version = DB_VERSION;
5182         dbhdr.timestamp = time(NULL);
5183         dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
5184         dbhdr.size = bd->len;
5185         if (strlen(argv[0]) >= MAX_DB_NAME) {
5186                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5187                 goto done;
5188         }
5189         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
5190         ret = write(fh, &dbhdr, sizeof(dbhdr));
5191         if (ret == -1) {
5192                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5193                 goto done;
5194         }
5195         ret = write(fh, bd->records, bd->len);
5196         if (ret == -1) {
5197                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5198                 goto done;
5199         }
5200
5201         status = 0;
5202 done:
5203         if (fh != -1) {
5204                 ret = close(fh);
5205                 if (ret == -1) {
5206                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5207                 }
5208         }
5209
5210         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5211
5212         talloc_free(tmp_ctx);
5213         return status;
5214 }
5215
5216 /*
5217  * restore a database from a file 
5218  */
5219 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5220 {
5221         int ret;
5222         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5223         TDB_DATA outdata;
5224         TDB_DATA data;
5225         struct db_file_header dbhdr;
5226         struct ctdb_db_context *ctdb_db;
5227         struct ctdb_node_map *nodemap=NULL;
5228         struct ctdb_vnn_map *vnnmap=NULL;
5229         int i, fh;
5230         struct ctdb_control_wipe_database w;
5231         uint32_t *nodes;
5232         uint32_t generation;
5233         struct tm *tm;
5234         char tbuf[100];
5235         char *dbname;
5236
5237         if (argc < 1 || argc > 2) {
5238                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5239                 return -1;
5240         }
5241
5242         fh = open(argv[0], O_RDONLY);
5243         if (fh == -1) {
5244                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5245                 talloc_free(tmp_ctx);
5246                 return -1;
5247         }
5248
5249         read(fh, &dbhdr, sizeof(dbhdr));
5250         if (dbhdr.version != DB_VERSION) {
5251                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5252                 talloc_free(tmp_ctx);
5253                 return -1;
5254         }
5255
5256         dbname = discard_const(dbhdr.name);
5257         if (argc == 2) {
5258                 dbname = discard_const(argv[1]);
5259         }
5260
5261         outdata.dsize = dbhdr.size;
5262         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5263         if (outdata.dptr == NULL) {
5264                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5265                 close(fh);
5266                 talloc_free(tmp_ctx);
5267                 return -1;
5268         }               
5269         read(fh, outdata.dptr, outdata.dsize);
5270         close(fh);
5271
5272         tm = localtime(&dbhdr.timestamp);
5273         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5274         printf("Restoring database '%s' from backup @ %s\n",
5275                 dbname, tbuf);
5276
5277
5278         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5279         if (ctdb_db == NULL) {
5280                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5281                 talloc_free(tmp_ctx);
5282                 return -1;
5283         }
5284
5285         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5286         if (ret != 0) {
5287                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5288                 talloc_free(tmp_ctx);
5289                 return ret;
5290         }
5291
5292
5293         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5294         if (ret != 0) {
5295                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5296                 talloc_free(tmp_ctx);
5297                 return ret;
5298         }
5299
5300         /* freeze all nodes */
5301         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5302         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5303                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5304                                         nodes, i,
5305                                         TIMELIMIT(),
5306                                         false, tdb_null,
5307                                         NULL, NULL,
5308                                         NULL) != 0) {
5309                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5310                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5311                         talloc_free(tmp_ctx);
5312                         return -1;
5313                 }
5314         }
5315
5316         generation = vnnmap->generation;
5317         data.dptr = (void *)&generation;
5318         data.dsize = sizeof(generation);
5319
5320         /* start a cluster wide transaction */
5321         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5322         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5323                                         nodes, 0,
5324                                         TIMELIMIT(), false, data,
5325                                         NULL, NULL,
5326                                         NULL) != 0) {
5327                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5328                 return -1;
5329         }
5330
5331
5332         w.db_id = ctdb_db->db_id;
5333         w.transaction_id = generation;
5334
5335         data.dptr = (void *)&w;
5336         data.dsize = sizeof(w);
5337
5338         /* wipe all the remote databases. */
5339         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5340         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5341                                         nodes, 0,
5342                                         TIMELIMIT(), false, data,
5343                                         NULL, NULL,
5344                                         NULL) != 0) {
5345                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5346                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5347                 talloc_free(tmp_ctx);
5348                 return -1;
5349         }
5350         
5351         /* push the database */
5352         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5353         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5354                                         nodes, 0,
5355                                         TIMELIMIT(), false, outdata,
5356                                         NULL, NULL,
5357                                         NULL) != 0) {
5358                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5359                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5360                 talloc_free(tmp_ctx);
5361                 return -1;
5362         }
5363
5364         data.dptr = (void *)&ctdb_db->db_id;
5365         data.dsize = sizeof(ctdb_db->db_id);
5366
5367         /* mark the database as healthy */
5368         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5369         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5370                                         nodes, 0,
5371                                         TIMELIMIT(), false, data,
5372                                         NULL, NULL,
5373                                         NULL) != 0) {
5374                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5375                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5376                 talloc_free(tmp_ctx);
5377                 return -1;
5378         }
5379
5380         data.dptr = (void *)&generation;
5381         data.dsize = sizeof(generation);
5382
5383         /* commit all the changes */
5384         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5385                                         nodes, 0,
5386                                         TIMELIMIT(), false, data,
5387                                         NULL, NULL,
5388                                         NULL) != 0) {
5389                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5390                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5391                 talloc_free(tmp_ctx);
5392                 return -1;
5393         }
5394
5395
5396         /* thaw all nodes */
5397         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5398         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5399                                         nodes, 0,
5400                                         TIMELIMIT(),
5401                                         false, tdb_null,
5402                                         NULL, NULL,
5403                                         NULL) != 0) {
5404                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5405                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5406                 talloc_free(tmp_ctx);
5407                 return -1;
5408         }
5409
5410
5411         talloc_free(tmp_ctx);
5412         return 0;
5413 }
5414
5415 /*
5416  * dump a database backup from a file
5417  */
5418 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5419 {
5420         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5421         TDB_DATA outdata;
5422         struct db_file_header dbhdr;
5423         int i, fh;
5424         struct tm *tm;
5425         char tbuf[100];
5426         struct ctdb_rec_data *rec = NULL;
5427         struct ctdb_marshall_buffer *m;
5428         struct ctdb_dump_db_context c;
5429
5430         if (argc != 1) {
5431                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5432                 return -1;
5433         }
5434
5435         fh = open(argv[0], O_RDONLY);
5436         if (fh == -1) {
5437                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5438                 talloc_free(tmp_ctx);
5439                 return -1;
5440         }
5441
5442         read(fh, &dbhdr, sizeof(dbhdr));
5443         if (dbhdr.version != DB_VERSION) {
5444                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5445                 talloc_free(tmp_ctx);
5446                 return -1;
5447         }
5448
5449         outdata.dsize = dbhdr.size;
5450         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5451         if (outdata.dptr == NULL) {
5452                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5453                 close(fh);
5454                 talloc_free(tmp_ctx);
5455                 return -1;
5456         }
5457         read(fh, outdata.dptr, outdata.dsize);
5458         close(fh);
5459         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5460
5461         tm = localtime(&dbhdr.timestamp);
5462         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5463         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5464                 dbhdr.name, m->db_id, tbuf);
5465
5466         ZERO_STRUCT(c);
5467         c.f = stdout;
5468         c.printemptyrecords = (bool)options.printemptyrecords;
5469         c.printdatasize = (bool)options.printdatasize;
5470         c.printlmaster = false;
5471         c.printhash = (bool)options.printhash;
5472         c.printrecordflags = (bool)options.printrecordflags;
5473
5474         for (i=0; i < m->count; i++) {
5475                 uint32_t reqid = 0;
5476                 TDB_DATA key, data;
5477
5478                 /* we do not want the header splitted, so we pass NULL*/
5479                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5480                                               NULL, &key, &data);
5481
5482                 ctdb_dumpdb_record(ctdb, key, data, &c);
5483         }
5484
5485         printf("Dumped %d records\n", i);
5486         talloc_free(tmp_ctx);
5487         return 0;
5488 }
5489
5490 /*
5491  * wipe a database from a file
5492  */
5493 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5494                           const char **argv)
5495 {
5496         int ret;
5497         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5498         TDB_DATA data;
5499         struct ctdb_db_context *ctdb_db;
5500         struct ctdb_node_map *nodemap = NULL;
5501         struct ctdb_vnn_map *vnnmap = NULL;
5502         int i;
5503         struct ctdb_control_wipe_database w;
5504         uint32_t *nodes;
5505         uint32_t generation;
5506         uint8_t flags;
5507
5508         if (argc != 1) {
5509                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5510                 return -1;
5511         }
5512
5513         if (!db_exists(ctdb, argv[0], NULL, &flags)) {
5514                 return -1;
5515         }
5516
5517         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5518         if (ctdb_db == NULL) {
5519                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5520                                   argv[0]));
5521                 talloc_free(tmp_ctx);
5522                 return -1;
5523         }
5524
5525         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5526                                    &nodemap);
5527         if (ret != 0) {
5528                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5529                                   options.pnn));
5530                 talloc_free(tmp_ctx);
5531                 return ret;
5532         }
5533
5534         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5535                                   &vnnmap);
5536         if (ret != 0) {
5537                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5538                                   options.pnn));
5539                 talloc_free(tmp_ctx);
5540                 return ret;
5541         }
5542
5543         /* freeze all nodes */
5544         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5545         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5546                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5547                                                 nodes, i,
5548                                                 TIMELIMIT(),
5549                                                 false, tdb_null,
5550                                                 NULL, NULL,
5551                                                 NULL);
5552                 if (ret != 0) {
5553                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5554                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5555                                              CTDB_RECOVERY_ACTIVE);
5556                         talloc_free(tmp_ctx);
5557                         return -1;
5558                 }
5559         }
5560
5561         generation = vnnmap->generation;
5562         data.dptr = (void *)&generation;
5563         data.dsize = sizeof(generation);
5564
5565         /* start a cluster wide transaction */
5566         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5567         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5568                                         nodes, 0,
5569                                         TIMELIMIT(), false, data,
5570                                         NULL, NULL,
5571                                         NULL);
5572         if (ret!= 0) {
5573                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5574                                   "transactions.\n"));
5575                 return -1;
5576         }
5577
5578         w.db_id = ctdb_db->db_id;
5579         w.transaction_id = generation;
5580
5581         data.dptr = (void *)&w;
5582         data.dsize = sizeof(w);
5583
5584         /* wipe all the remote databases. */
5585         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5586         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5587                                         nodes, 0,
5588                                         TIMELIMIT(), false, data,
5589                                         NULL, NULL,
5590                                         NULL) != 0) {
5591                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5592                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5593                 talloc_free(tmp_ctx);
5594                 return -1;
5595         }
5596
5597         data.dptr = (void *)&ctdb_db->db_id;
5598         data.dsize = sizeof(ctdb_db->db_id);
5599
5600         /* mark the database as healthy */
5601         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5602         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5603                                         nodes, 0,
5604                                         TIMELIMIT(), false, data,
5605                                         NULL, NULL,
5606                                         NULL) != 0) {
5607                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5608                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5609                 talloc_free(tmp_ctx);
5610                 return -1;
5611         }
5612
5613         data.dptr = (void *)&generation;
5614         data.dsize = sizeof(generation);
5615
5616         /* commit all the changes */
5617         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5618                                         nodes, 0,
5619                                         TIMELIMIT(), false, data,
5620                                         NULL, NULL,
5621                                         NULL) != 0) {
5622                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5623                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5624                 talloc_free(tmp_ctx);
5625                 return -1;
5626         }
5627
5628         /* thaw all nodes */
5629         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5630         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5631                                         nodes, 0,
5632                                         TIMELIMIT(),
5633                                         false, tdb_null,
5634                                         NULL, NULL,
5635                                         NULL) != 0) {
5636                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5637                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5638                 talloc_free(tmp_ctx);
5639                 return -1;
5640         }
5641
5642         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5643
5644         talloc_free(tmp_ctx);
5645         return 0;
5646 }
5647
5648 /*
5649   dump memory usage
5650  */
5651 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5652 {
5653         TDB_DATA data;
5654         int ret;
5655         int32_t res;
5656         char *errmsg;
5657         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5658         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5659                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5660         if (ret != 0 || res != 0) {
5661                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5662                 talloc_free(tmp_ctx);
5663                 return -1;
5664         }
5665         write(1, data.dptr, data.dsize);
5666         talloc_free(tmp_ctx);
5667         return 0;
5668 }
5669
5670 /*
5671   handler for memory dumps
5672 */
5673 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5674                              TDB_DATA data, void *private_data)
5675 {
5676         write(1, data.dptr, data.dsize);
5677         exit(0);
5678 }
5679
5680 /*
5681   dump memory usage on the recovery daemon
5682  */
5683 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5684 {
5685         int ret;
5686         TDB_DATA data;
5687         struct rd_memdump_reply rd;
5688
5689         rd.pnn = ctdb_get_pnn(ctdb);
5690         rd.srvid = getpid();
5691
5692         /* register a message port for receiveing the reply so that we
5693            can receive the reply
5694         */
5695         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5696
5697
5698         data.dptr = (uint8_t *)&rd;
5699         data.dsize = sizeof(rd);
5700
5701         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5702         if (ret != 0) {
5703                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5704                 return -1;
5705         }
5706
5707         /* this loop will terminate when we have received the reply */
5708         while (1) {     
5709                 event_loop_once(ctdb->ev);
5710         }
5711
5712         return 0;
5713 }
5714
5715 /*
5716   send a message to a srvid
5717  */
5718 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5719 {
5720         unsigned long srvid;
5721         int ret;
5722         TDB_DATA data;
5723
5724         if (argc < 2) {
5725                 usage();
5726         }
5727
5728         srvid      = strtoul(argv[0], NULL, 0);
5729
5730         data.dptr = (uint8_t *)discard_const(argv[1]);
5731         data.dsize= strlen(argv[1]);
5732
5733         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5734         if (ret != 0) {
5735                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5736                 return -1;
5737         }
5738
5739         return 0;
5740 }
5741
5742 /*
5743   handler for msglisten
5744 */
5745 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5746                              TDB_DATA data, void *private_data)
5747 {
5748         int i;
5749
5750         printf("Message received: ");
5751         for (i=0;i<data.dsize;i++) {
5752                 printf("%c", data.dptr[i]);
5753         }
5754         printf("\n");
5755 }
5756
5757 /*
5758   listen for messages on a messageport
5759  */
5760 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5761 {
5762         uint64_t srvid;
5763
5764         srvid = getpid();
5765
5766         /* register a message port and listen for messages
5767         */
5768         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5769         printf("Listening for messages on srvid:%d\n", (int)srvid);
5770
5771         while (1) {     
5772                 event_loop_once(ctdb->ev);
5773         }
5774
5775         return 0;
5776 }
5777
5778 /*
5779   list all nodes in the cluster
5780   we parse the nodes file directly
5781  */
5782 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5783 {
5784         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5785         struct pnn_node *pnn_nodes;
5786         struct pnn_node *pnn_node;
5787
5788         pnn_nodes = read_nodes_file(mem_ctx);
5789         if (pnn_nodes == NULL) {
5790                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5791                 talloc_free(mem_ctx);
5792                 return -1;
5793         }
5794
5795         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5796                 ctdb_sock_addr addr;
5797                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5798                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5799                         talloc_free(mem_ctx);
5800                         return -1;
5801                 }
5802                 if (options.machinereadable){
5803                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5804                 } else {
5805                         printf("%s\n", pnn_node->addr);
5806                 }
5807         }
5808         talloc_free(mem_ctx);
5809
5810         return 0;
5811 }
5812
5813 /*
5814   reload the nodes file on the local node
5815  */
5816 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5817 {
5818         int i, ret;
5819         int mypnn;
5820         struct ctdb_node_map *nodemap=NULL;
5821
5822         mypnn = ctdb_get_pnn(ctdb);
5823
5824         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5825         if (ret != 0) {
5826                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5827                 return ret;
5828         }
5829
5830         /* reload the nodes file on all remote nodes */
5831         for (i=0;i<nodemap->num;i++) {
5832                 if (nodemap->nodes[i].pnn == mypnn) {
5833                         continue;
5834                 }
5835                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5836                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5837                         nodemap->nodes[i].pnn);
5838                 if (ret != 0) {
5839                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
5840                 }
5841         }
5842
5843         /* reload the nodes file on the local node */
5844         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
5845         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
5846         if (ret != 0) {
5847                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
5848         }
5849
5850         /* initiate a recovery */
5851         control_recover(ctdb, argc, argv);
5852
5853         return 0;
5854 }
5855
5856
5857 static const struct {
5858         const char *name;
5859         int (*fn)(struct ctdb_context *, int, const char **);
5860         bool auto_all;
5861         bool without_daemon; /* can be run without daemon running ? */
5862         const char *msg;
5863         const char *args;
5864 } ctdb_commands[] = {
5865         { "version",         control_version,           true,   true,   "show version of ctdb" },
5866         { "status",          control_status,            true,   false,  "show node status" },
5867         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
5868         { "ping",            control_ping,              true,   false,  "ping all nodes" },
5869         { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
5870         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
5871         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
5872         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
5873         { "statistics",      control_statistics,        false,  false, "show statistics" },
5874         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
5875         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
5876         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
5877         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
5878         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
5879         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
5880         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
5881         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
5882         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
5883         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
5884         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
5885         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
5886         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
5887         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
5888         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
5889         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
5890         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
5891         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
5892         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
5893         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
5894         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
5895         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
5896         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
5897         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
5898         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
5899         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
5900         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
5901         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
5902         { "stop",            control_stop,              true,   false,  "stop a node" },
5903         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
5904         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
5905         { "unban",           control_unban,             true,   false,  "unban a node" },
5906         { "showban",         control_showban,           true,   false,  "show ban information"},
5907         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
5908         { "recover",         control_recover,           true,   false,  "force recovery" },
5909         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
5910         { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
5911         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
5912         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
5913         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
5914         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
5915         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5916         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
5917         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
5918
5919         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
5920
5921         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
5922         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
5923         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
5924         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
5925         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
5926         { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
5927         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
5928         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
5929         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
5930         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
5931         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
5932         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
5933         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
5934         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
5935         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
5936         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
5937         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
5938         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
5939         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
5940         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
5941         { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
5942         { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
5943         { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
5944         { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
5945         { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
5946         { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
5947         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
5948         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
5949         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
5950         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
5951         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
5952         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
5953         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
5954         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
5955         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
5956         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
5957         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
5958         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
5959         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
5960         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
5961         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
5962         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
5963         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
5964         { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
5965         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
5966         { "setdbseqnum",     control_setdbseqnum,       false,  false, "set the sequence number for a database", "<dbname|dbid> <seqnum>" },
5967         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
5968         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
5969         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
5970         { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hsoted on", "<ip>" },
5971 };
5972
5973 /*
5974   show usage message
5975  */
5976 static void usage(void)
5977 {
5978         int i;
5979         printf(
5980 "Usage: ctdb [options] <control>\n" \
5981 "Options:\n" \
5982 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
5983 "   -Y                 generate machinereadable output\n"
5984 "   -v                 generate verbose output\n"
5985 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
5986         printf("Controls:\n");
5987         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5988                 printf("  %-15s %-27s  %s\n", 
5989                        ctdb_commands[i].name, 
5990                        ctdb_commands[i].args?ctdb_commands[i].args:"",
5991                        ctdb_commands[i].msg);
5992         }
5993         exit(1);
5994 }
5995
5996
5997 static void ctdb_alarm(int sig)
5998 {
5999         printf("Maximum runtime exceeded - exiting\n");
6000         _exit(ERR_TIMEOUT);
6001 }
6002
6003 /*
6004   main program
6005 */
6006 int main(int argc, const char *argv[])
6007 {
6008         struct ctdb_context *ctdb;
6009         char *nodestring = NULL;
6010         struct poptOption popt_options[] = {
6011                 POPT_AUTOHELP
6012                 POPT_CTDB_CMDLINE
6013                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6014                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6015                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
6016                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6017                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6018                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6019                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6020                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6021                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6022                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6023                 POPT_TABLEEND
6024         };
6025         int opt;
6026         const char **extra_argv;
6027         int extra_argc = 0;
6028         int ret=-1, i;
6029         poptContext pc;
6030         struct event_context *ev;
6031         const char *control;
6032         const char *socket_name;
6033
6034         setlinebuf(stdout);
6035         
6036         /* set some defaults */
6037         options.maxruntime = 0;
6038         options.timelimit = 10;
6039         options.pnn = CTDB_CURRENT_NODE;
6040
6041         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6042
6043         while ((opt = poptGetNextOpt(pc)) != -1) {
6044                 switch (opt) {
6045                 default:
6046                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6047                                 poptBadOption(pc, 0), poptStrerror(opt)));
6048                         exit(1);
6049                 }
6050         }
6051
6052         /* setup the remaining options for the main program to use */
6053         extra_argv = poptGetArgs(pc);
6054         if (extra_argv) {
6055                 extra_argv++;
6056                 while (extra_argv[extra_argc]) extra_argc++;
6057         }
6058
6059         if (extra_argc < 1) {
6060                 usage();
6061         }
6062
6063         if (options.maxruntime == 0) {
6064                 const char *ctdb_timeout;
6065                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6066                 if (ctdb_timeout != NULL) {
6067                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6068                 } else {
6069                         /* default timeout is 120 seconds */
6070                         options.maxruntime = 120;
6071                 }
6072         }
6073
6074         signal(SIGALRM, ctdb_alarm);
6075         alarm(options.maxruntime);
6076
6077         control = extra_argv[0];
6078
6079         ev = event_context_init(NULL);
6080         if (!ev) {
6081                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6082                 exit(1);
6083         }
6084
6085         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6086                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6087                         break;
6088                 }
6089         }
6090
6091         if (i == ARRAY_SIZE(ctdb_commands)) {
6092                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6093                 exit(1);
6094         }
6095
6096         if (ctdb_commands[i].without_daemon == true) {
6097                 if (nodestring != NULL) {
6098                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6099                         exit(1);
6100                 }
6101                 close(2);
6102                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6103         }
6104
6105         /* initialise ctdb */
6106         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6107
6108         if (ctdb == NULL) {
6109                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6110                 exit(1);
6111         }
6112
6113         /* initialize a libctdb connection as well */
6114         socket_name = ctdb_get_socketname(ctdb);
6115         ctdb_connection = ctdb_connect(socket_name,
6116                                        ctdb_log_file, stderr);
6117         if (ctdb_connection == NULL) {
6118                 DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
6119                 exit(1);
6120         }                               
6121
6122         /* setup the node number(s) to contact */
6123         if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
6124                               &options.nodes, &options.pnn)) {
6125                 usage();
6126         }
6127
6128         if (options.pnn == CTDB_CURRENT_NODE) {
6129                 options.pnn = options.nodes[0];
6130         }
6131
6132         if (ctdb_commands[i].auto_all && 
6133             ((options.pnn == CTDB_BROADCAST_ALL) ||
6134              (options.pnn == CTDB_MULTICAST))) {
6135                 int j;
6136
6137                 ret = 0;
6138                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6139                         options.pnn = options.nodes[j];
6140                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6141                 }
6142         } else {
6143                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6144         }
6145
6146         ctdb_disconnect(ctdb_connection);
6147         talloc_free(ctdb);
6148         talloc_free(ev);
6149         (void)poptFreeContext(pc);
6150
6151         return ret;
6152
6153 }