tools/ctdb: Add pdelete command to delete a record from persistent database
[obnox/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/version.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
43 static struct {
44         int timelimit;
45         uint32_t pnn;
46         uint32_t *nodes;
47         int machinereadable;
48         int verbose;
49         int maxruntime;
50         int printemptyrecords;
51         int printdatasize;
52         int printlmaster;
53         int printhash;
54         int printrecordflags;
55 } options;
56
57 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
58 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
59
60 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
61 {
62         printf("CTDB version: %s\n", CTDB_VERSION_STRING);
63         return 0;
64 }
65
66 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
67                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
68                                    "Out of memory in " __location__ )); \
69                 abort();                                                \
70         }} while (0)
71
72 /* Pretty print the flags to a static buffer in human-readable format.
73  * This never returns NULL!
74  */
75 static const char *pretty_print_flags(uint32_t flags)
76 {
77         int j;
78         static const struct {
79                 uint32_t flag;
80                 const char *name;
81         } flag_names[] = {
82                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
83                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
84                 { NODE_FLAGS_BANNED,                "BANNED" },
85                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
86                 { NODE_FLAGS_DELETED,               "DELETED" },
87                 { NODE_FLAGS_STOPPED,               "STOPPED" },
88                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
89         };
90         static char flags_str[512]; /* Big enough to contain all flag names */
91
92         flags_str[0] = '\0';
93         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
94                 if (flags & flag_names[j].flag) {
95                         if (flags_str[0] == '\0') {
96                                 (void) strcpy(flags_str, flag_names[j].name);
97                         } else {
98                                 (void) strcat(flags_str, "|");
99                                 (void) strcat(flags_str, flag_names[j].name);
100                         }
101                 }
102         }
103         if (flags_str[0] == '\0') {
104                 (void) strcpy(flags_str, "OK");
105         }
106
107         return flags_str;
108 }
109
110 static int h2i(char h)
111 {
112         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
113         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
114         return h - '0';
115 }
116
117 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
118 {
119         int i, len;
120         TDB_DATA key = {NULL, 0};
121
122         len = strlen(str);
123         if (len & 0x01) {
124                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
125                 return key;
126         }
127
128         key.dsize = len>>1;
129         key.dptr  = talloc_size(mem_ctx, key.dsize);
130
131         for (i=0; i < len/2; i++) {
132                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
133         }
134         return key;
135 }
136
137 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
138  * that are disconnected or deleted.  If dd_ok is true those nodes are
139  * included in the output list of nodes.  If dd_ok is false, those
140  * nodes are filtered from the "all" case and cause an error if
141  * explicitly specified.
142  */
143 static bool parse_nodestring(struct ctdb_context *ctdb,
144                              const char * nodestring,
145                              uint32_t current_pnn,
146                              bool dd_ok,
147                              uint32_t **nodes,
148                              uint32_t *pnn_mode)
149 {
150         int n;
151         uint32_t i;
152         struct ctdb_node_map *nodemap;
153        
154         *nodes = NULL;
155
156         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
157                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
158                 exit(10);
159         }
160
161         if (nodestring != NULL) {
162                 *nodes = talloc_array(ctdb, uint32_t, 0);
163                 CTDB_NOMEM_ABORT(*nodes);
164                
165                 n = 0;
166
167                 if (strcmp(nodestring, "all") == 0) {
168                         *pnn_mode = CTDB_BROADCAST_ALL;
169
170                         /* all */
171                         for (i = 0; i < nodemap->num; i++) {
172                                 if ((nodemap->nodes[i].flags & 
173                                      (NODE_FLAGS_DISCONNECTED |
174                                       NODE_FLAGS_DELETED)) && !dd_ok) {
175                                         continue;
176                                 }
177                                 *nodes = talloc_realloc(ctdb, *nodes,
178                                                         uint32_t, n+1);
179                                 CTDB_NOMEM_ABORT(*nodes);
180                                 (*nodes)[n] = i;
181                                 n++;
182                         }
183                 } else {
184                         /* x{,y...} */
185                         char *ns, *tok;
186                        
187                         ns = talloc_strdup(ctdb, nodestring);
188                         tok = strtok(ns, ",");
189                         while (tok != NULL) {
190                                 uint32_t pnn;
191                                 i = (uint32_t)strtoul(tok, NULL, 0);
192                                 if (i >= nodemap->num) {
193                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
194                                         exit(ERR_NONODE);
195                                 }
196                                 if ((nodemap->nodes[i].flags & 
197                                      (NODE_FLAGS_DISCONNECTED |
198                                       NODE_FLAGS_DELETED)) && !dd_ok) {
199                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
200                                         exit(ERR_DISNODE);
201                                 }
202                                 if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
203                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
204                                         exit(10);
205                                 }
206
207                                 *nodes = talloc_realloc(ctdb, *nodes,
208                                                         uint32_t, n+1);
209                                 CTDB_NOMEM_ABORT(*nodes);
210
211                                 (*nodes)[n] = i;
212                                 n++;
213
214                                 tok = strtok(NULL, ",");
215                         }
216                         talloc_free(ns);
217
218                         if (n == 1) {
219                                 *pnn_mode = (*nodes)[0];
220                         } else {
221                                 *pnn_mode = CTDB_MULTICAST;
222                         }
223                 }
224         } else {
225                 /* default - no nodes specified */
226                 *nodes = talloc_array(ctdb, uint32_t, 1);
227                 CTDB_NOMEM_ABORT(*nodes);
228                 *pnn_mode = CTDB_CURRENT_NODE;
229
230                 if (!ctdb_getpnn(ctdb_connection, current_pnn,
231                                  &((*nodes)[0]))) {
232                         return false;
233                 }
234         }
235
236         ctdb_free_nodemap(nodemap);
237
238         return true;
239 }
240
241 /*
242  check if a database exists
243 */
244 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
245 {
246         int i, ret;
247         struct ctdb_dbid_map *dbmap=NULL;
248
249         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
250         if (ret != 0) {
251                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
252                 return -1;
253         }
254
255         for(i=0;i<dbmap->num;i++){
256                 const char *name;
257
258                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
259                 if (!strcmp(name, db_name)) {
260                         if (persistent) {
261                                 *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
262                         }
263                         return 0;
264                 }
265         }
266
267         return -1;
268 }
269
270 /*
271   see if a process exists
272  */
273 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
274 {
275         uint32_t pnn, pid;
276         int ret;
277         if (argc < 1) {
278                 usage();
279         }
280
281         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
282                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
283                 return -1;
284         }
285
286         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
287         if (ret == 0) {
288                 printf("%u:%u exists\n", pnn, pid);
289         } else {
290                 printf("%u:%u does not exist\n", pnn, pid);
291         }
292         return ret;
293 }
294
295 /*
296   display statistics structure
297  */
298 static void show_statistics(struct ctdb_statistics *s, int show_header)
299 {
300         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
301         int i;
302         const char *prefix=NULL;
303         int preflen=0;
304         int tmp, days, hours, minutes, seconds;
305         const struct {
306                 const char *name;
307                 uint32_t offset;
308         } fields[] = {
309 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
310                 STATISTICS_FIELD(num_clients),
311                 STATISTICS_FIELD(frozen),
312                 STATISTICS_FIELD(recovering),
313                 STATISTICS_FIELD(num_recoveries),
314                 STATISTICS_FIELD(client_packets_sent),
315                 STATISTICS_FIELD(client_packets_recv),
316                 STATISTICS_FIELD(node_packets_sent),
317                 STATISTICS_FIELD(node_packets_recv),
318                 STATISTICS_FIELD(keepalive_packets_sent),
319                 STATISTICS_FIELD(keepalive_packets_recv),
320                 STATISTICS_FIELD(node.req_call),
321                 STATISTICS_FIELD(node.reply_call),
322                 STATISTICS_FIELD(node.req_dmaster),
323                 STATISTICS_FIELD(node.reply_dmaster),
324                 STATISTICS_FIELD(node.reply_error),
325                 STATISTICS_FIELD(node.req_message),
326                 STATISTICS_FIELD(node.req_control),
327                 STATISTICS_FIELD(node.reply_control),
328                 STATISTICS_FIELD(client.req_call),
329                 STATISTICS_FIELD(client.req_message),
330                 STATISTICS_FIELD(client.req_control),
331                 STATISTICS_FIELD(timeouts.call),
332                 STATISTICS_FIELD(timeouts.control),
333                 STATISTICS_FIELD(timeouts.traverse),
334                 STATISTICS_FIELD(locks.num_calls),
335                 STATISTICS_FIELD(locks.num_current),
336                 STATISTICS_FIELD(locks.num_pending),
337                 STATISTICS_FIELD(locks.num_failed),
338                 STATISTICS_FIELD(total_calls),
339                 STATISTICS_FIELD(pending_calls),
340                 STATISTICS_FIELD(childwrite_calls),
341                 STATISTICS_FIELD(pending_childwrite_calls),
342                 STATISTICS_FIELD(memory_used),
343                 STATISTICS_FIELD(max_hop_count),
344                 STATISTICS_FIELD(total_ro_delegations),
345                 STATISTICS_FIELD(total_ro_revokes),
346         };
347         
348         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
349         seconds = tmp%60;
350         tmp    /= 60;
351         minutes = tmp%60;
352         tmp    /= 60;
353         hours   = tmp%24;
354         tmp    /= 24;
355         days    = tmp;
356
357         if (options.machinereadable){
358                 if (show_header) {
359                         printf("CTDB version:");
360                         printf("Current time of statistics:");
361                         printf("Statistics collected since:");
362                         for (i=0;i<ARRAY_SIZE(fields);i++) {
363                                 printf("%s:", fields[i].name);
364                         }
365                         printf("num_reclock_ctdbd_latency:");
366                         printf("min_reclock_ctdbd_latency:");
367                         printf("avg_reclock_ctdbd_latency:");
368                         printf("max_reclock_ctdbd_latency:");
369
370                         printf("num_reclock_recd_latency:");
371                         printf("min_reclock_recd_latency:");
372                         printf("avg_reclock_recd_latency:");
373                         printf("max_reclock_recd_latency:");
374
375                         printf("num_call_latency:");
376                         printf("min_call_latency:");
377                         printf("avg_call_latency:");
378                         printf("max_call_latency:");
379
380                         printf("num_lockwait_latency:");
381                         printf("min_lockwait_latency:");
382                         printf("avg_lockwait_latency:");
383                         printf("max_lockwait_latency:");
384
385                         printf("num_childwrite_latency:");
386                         printf("min_childwrite_latency:");
387                         printf("avg_childwrite_latency:");
388                         printf("max_childwrite_latency:");
389                         printf("\n");
390                 }
391                 printf("%d:", CTDB_VERSION);
392                 printf("%d:", (int)s->statistics_current_time.tv_sec);
393                 printf("%d:", (int)s->statistics_start_time.tv_sec);
394                 for (i=0;i<ARRAY_SIZE(fields);i++) {
395                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
396                 }
397                 printf("%d:", s->reclock.ctdbd.num);
398                 printf("%.6f:", s->reclock.ctdbd.min);
399                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
400                 printf("%.6f:", s->reclock.ctdbd.max);
401
402                 printf("%d:", s->reclock.recd.num);
403                 printf("%.6f:", s->reclock.recd.min);
404                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
405                 printf("%.6f:", s->reclock.recd.max);
406
407                 printf("%d:", s->call_latency.num);
408                 printf("%.6f:", s->call_latency.min);
409                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
410                 printf("%.6f:", s->call_latency.max);
411
412                 printf("%d:", s->childwrite_latency.num);
413                 printf("%.6f:", s->childwrite_latency.min);
414                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
415                 printf("%.6f:", s->childwrite_latency.max);
416                 printf("\n");
417         } else {
418                 printf("CTDB version %u\n", CTDB_VERSION);
419                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
420                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
421
422                 for (i=0;i<ARRAY_SIZE(fields);i++) {
423                         if (strchr(fields[i].name, '.')) {
424                                 preflen = strcspn(fields[i].name, ".")+1;
425                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
426                                         prefix = fields[i].name;
427                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
428                                 }
429                         } else {
430                                 preflen = 0;
431                         }
432                         printf(" %*s%-22s%*s%10u\n", 
433                                preflen?4:0, "",
434                                fields[i].name+preflen, 
435                                preflen?0:4, "",
436                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
437                 }
438                 printf(" hop_count_buckets:");
439                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
440                         printf(" %d", s->hop_count_bucket[i]);
441                 }
442                 printf("\n");
443                 printf(" lock_buckets:");
444                 for (i=0; i<MAX_COUNT_BUCKETS; i++) {
445                         printf(" %d", s->locks.buckets[i]);
446                 }
447                 printf("\n");
448                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
449
450                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
451
452                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
453
454                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
455                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
456         }
457
458         talloc_free(tmp_ctx);
459 }
460
461 /*
462   display remote ctdb statistics combined from all nodes
463  */
464 static int control_statistics_all(struct ctdb_context *ctdb)
465 {
466         int ret, i;
467         struct ctdb_statistics statistics;
468         uint32_t *nodes;
469         uint32_t num_nodes;
470
471         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
472         CTDB_NO_MEMORY(ctdb, nodes);
473         
474         ZERO_STRUCT(statistics);
475
476         for (i=0;i<num_nodes;i++) {
477                 struct ctdb_statistics s1;
478                 int j;
479                 uint32_t *v1 = (uint32_t *)&s1;
480                 uint32_t *v2 = (uint32_t *)&statistics;
481                 uint32_t num_ints = 
482                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
483                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
484                 if (ret != 0) {
485                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
486                         return ret;
487                 }
488                 for (j=0;j<num_ints;j++) {
489                         v2[j] += v1[j];
490                 }
491                 statistics.max_hop_count = 
492                         MAX(statistics.max_hop_count, s1.max_hop_count);
493                 statistics.call_latency.max = 
494                         MAX(statistics.call_latency.max, s1.call_latency.max);
495         }
496         talloc_free(nodes);
497         printf("Gathered statistics for %u nodes\n", num_nodes);
498         show_statistics(&statistics, 1);
499         return 0;
500 }
501
502 /*
503   display remote ctdb statistics
504  */
505 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
506 {
507         int ret;
508         struct ctdb_statistics statistics;
509
510         if (options.pnn == CTDB_BROADCAST_ALL) {
511                 return control_statistics_all(ctdb);
512         }
513
514         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
515         if (ret != 0) {
516                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
517                 return ret;
518         }
519         show_statistics(&statistics, 1);
520         return 0;
521 }
522
523
524 /*
525   reset remote ctdb statistics
526  */
527 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
528 {
529         int ret;
530
531         ret = ctdb_statistics_reset(ctdb, options.pnn);
532         if (ret != 0) {
533                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
534                 return ret;
535         }
536         return 0;
537 }
538
539
540 /*
541   display remote ctdb rolling statistics
542  */
543 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
544 {
545         int ret;
546         struct ctdb_statistics_wire *stats;
547         int i, num_records = -1;
548
549         if (argc ==1) {
550                 num_records = atoi(argv[0]) - 1;
551         }
552
553         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
554         if (ret != 0) {
555                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
556                 return ret;
557         }
558         for (i=0;i<stats->num;i++) {
559                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
560                         continue;
561                 }
562                 show_statistics(&stats->stats[i], i==0);
563                 if (i == num_records) {
564                         break;
565                 }
566         }
567         return 0;
568 }
569
570
571 /*
572   display remote ctdb db statistics
573  */
574 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
575 {
576         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
577         struct ctdb_db_statistics *dbstat;
578         struct ctdb_dbid_map *dbmap=NULL;
579         int i, ret;
580
581         if (argc < 1) {
582                 usage();
583         }
584
585         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
586         if (ret != 0) {
587                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
588                 return ret;
589         }
590         for(i=0;i<dbmap->num;i++){
591                 const char *name;
592
593                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
594                 if(!strcmp(argv[0], name)){
595                         talloc_free(discard_const(name));
596                         break;
597                 }
598                 talloc_free(discard_const(name));
599         }
600         if (i == dbmap->num) {
601                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
602                 talloc_free(tmp_ctx);
603                 return -1;
604         }
605
606         if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstat)) {
607                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
608                 talloc_free(tmp_ctx);
609                 return -1;
610         }
611
612         printf("DB Statistics:\n");
613         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
614                 dbstat->db_ro_delegations);
615         printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
616                 dbstat->db_ro_delegations);
617         printf(" %s\n", "locks");
618         printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
619                 dbstat->locks.num_calls);
620         printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
621                 dbstat->locks.num_failed);
622         printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
623                 dbstat->locks.num_current);
624         printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
625                 dbstat->locks.num_pending);
626         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
627                 "    latency_ctdbd  MIN/AVG/MAX",
628                 dbstat->locks.latency.min,
629                 (dbstat->locks.latency.num ?
630                  dbstat->locks.latency.total /dbstat->locks.latency.num :
631                  0.0),
632                 dbstat->locks.latency.max,
633                 dbstat->locks.latency.num);
634         printf(" %s", "    buckets:");
635         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
636                 printf(" %d", dbstat->hop_count_bucket[i]);
637         }
638         printf("\n");
639         printf("Num Hot Keys:     %d\n", dbstat->num_hot_keys);
640         for (i = 0; i < dbstat->num_hot_keys; i++) {
641                 int j;
642                 printf("Count:%d Key:", dbstat->hot_keys[i].count);
643                 for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
644                         printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
645                 }
646                 printf("\n");
647         }
648
649         ctdb_free_dbstat(dbstat);
650         return 0;
651 }
652
653 /*
654   display uptime of remote node
655  */
656 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
657 {
658         int ret;
659         struct ctdb_uptime *uptime = NULL;
660         int tmp, days, hours, minutes, seconds;
661
662         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
663         if (ret != 0) {
664                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
665                 return ret;
666         }
667
668         if (options.machinereadable){
669                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
670                 printf(":%u:%u:%u:%lf\n",
671                         (unsigned int)uptime->current_time.tv_sec,
672                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
673                         (unsigned int)uptime->last_recovery_finished.tv_sec,
674                         timeval_delta(&uptime->last_recovery_finished,
675                                       &uptime->last_recovery_started)
676                 );
677                 return 0;
678         }
679
680         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
681
682         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
683         seconds = tmp%60;
684         tmp    /= 60;
685         minutes = tmp%60;
686         tmp    /= 60;
687         hours   = tmp%24;
688         tmp    /= 24;
689         days    = tmp;
690         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
691
692         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
693         seconds = tmp%60;
694         tmp    /= 60;
695         minutes = tmp%60;
696         tmp    /= 60;
697         hours   = tmp%24;
698         tmp    /= 24;
699         days    = tmp;
700         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
701         
702         printf("Duration of last recovery/failover: %lf seconds\n",
703                 timeval_delta(&uptime->last_recovery_finished,
704                               &uptime->last_recovery_started));
705
706         return 0;
707 }
708
709 /*
710   show the PNN of the current node
711  */
712 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
713 {
714         uint32_t mypnn;
715         bool ret;
716
717         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
718         if (!ret) {
719                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
720                 return -1;
721         }
722
723         printf("PNN:%d\n", mypnn);
724         return 0;
725 }
726
727
728 struct pnn_node {
729         struct pnn_node *next;
730         const char *addr;
731         int pnn;
732 };
733
734 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
735 {
736         const char *nodes_list;
737         int nlines;
738         char **lines;
739         int i, pnn;
740         struct pnn_node *pnn_nodes = NULL;
741         struct pnn_node *pnn_node;
742         struct pnn_node *tmp_node;
743
744         /* read the nodes file */
745         nodes_list = getenv("CTDB_NODES");
746         if (nodes_list == NULL) {
747                 nodes_list = "/etc/ctdb/nodes";
748         }
749         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
750         if (lines == NULL) {
751                 return NULL;
752         }
753         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
754                 nlines--;
755         }
756         for (i=0, pnn=0; i<nlines; i++) {
757                 char *node;
758
759                 node = lines[i];
760                 /* strip leading spaces */
761                 while((*node == ' ') || (*node == '\t')) {
762                         node++;
763                 }
764                 if (*node == '#') {
765                         pnn++;
766                         continue;
767                 }
768                 if (strcmp(node, "") == 0) {
769                         continue;
770                 }
771                 pnn_node = talloc(mem_ctx, struct pnn_node);
772                 pnn_node->pnn = pnn++;
773                 pnn_node->addr = talloc_strdup(pnn_node, node);
774                 pnn_node->next = pnn_nodes;
775                 pnn_nodes = pnn_node;
776         }
777
778         /* swap them around so we return them in incrementing order */
779         pnn_node = pnn_nodes;
780         pnn_nodes = NULL;
781         while (pnn_node) {
782                 tmp_node = pnn_node;
783                 pnn_node = pnn_node->next;
784
785                 tmp_node->next = pnn_nodes;
786                 pnn_nodes = tmp_node;
787         }
788
789         return pnn_nodes;
790 }
791
792 /*
793   show the PNN of the current node
794   discover the pnn by loading the nodes file and try to bind to all
795   addresses one at a time until the ip address is found.
796  */
797 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
798 {
799         TALLOC_CTX *mem_ctx = talloc_new(NULL);
800         struct pnn_node *pnn_nodes;
801         struct pnn_node *pnn_node;
802
803         pnn_nodes = read_nodes_file(mem_ctx);
804         if (pnn_nodes == NULL) {
805                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
806                 talloc_free(mem_ctx);
807                 return -1;
808         }
809
810         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
811                 ctdb_sock_addr addr;
812
813                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
814                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
815                         talloc_free(mem_ctx);
816                         return -1;
817                 }
818
819                 if (ctdb_sys_have_ip(&addr)) {
820                         printf("PNN:%d\n", pnn_node->pnn);
821                         talloc_free(mem_ctx);
822                         return 0;
823                 }
824         }
825
826         printf("Failed to detect which PNN this node is\n");
827         talloc_free(mem_ctx);
828         return -1;
829 }
830
831 /* Helpers for ctdb status
832  */
833 static bool is_partially_online(struct ctdb_node_and_flags *node)
834 {
835         int j;
836         bool ret = false;
837
838         if (node->flags == 0) {
839                 struct ctdb_ifaces_list *ifaces;
840
841                 if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
842                         for (j=0; j < ifaces->num; j++) {
843                                 if (ifaces->ifaces[j].link_state != 0) {
844                                         continue;
845                                 }
846                                 ret = true;
847                                 break;
848                         }
849                         ctdb_free_ifaces(ifaces);
850                 }
851         }
852
853         return ret;
854 }
855
856 static void control_status_header_machine(void)
857 {
858         printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
859                ":Inactive:PartiallyOnline:ThisNode:\n");
860 }
861
862 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
863 {
864         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
865                ctdb_addr_to_str(&node->addr),
866                !!(node->flags&NODE_FLAGS_DISCONNECTED),
867                !!(node->flags&NODE_FLAGS_BANNED),
868                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
869                !!(node->flags&NODE_FLAGS_UNHEALTHY),
870                !!(node->flags&NODE_FLAGS_STOPPED),
871                !!(node->flags&NODE_FLAGS_INACTIVE),
872                is_partially_online(node) ? 1 : 0,
873                (node->pnn == mypnn)?'Y':'N');
874
875         return node->flags;
876 }
877
878 static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
879 {
880        printf("pnn:%d %-16s %s%s\n", node->pnn,
881               ctdb_addr_to_str(&node->addr),
882               is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
883               node->pnn == mypnn?" (THIS NODE)":"");
884
885        return node->flags;
886 }
887
888 /*
889   display remote ctdb status
890  */
891 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
892 {
893         int i;
894         struct ctdb_vnn_map *vnnmap=NULL;
895         struct ctdb_node_map *nodemap=NULL;
896         uint32_t recmode, recmaster, mypnn;
897
898         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
899                 return -1;
900         }
901
902         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
903                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
904                 return -1;
905         }
906
907         if (options.machinereadable) {
908                 control_status_header_machine();
909                 for (i=0;i<nodemap->num;i++) {
910                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
911                                 continue;
912                         }
913                         (void) control_status_1_machine(mypnn,
914                                                         &nodemap->nodes[i]);
915                 }
916                 return 0;
917         }
918
919         printf("Number of nodes:%d\n", nodemap->num);
920         for(i=0;i<nodemap->num;i++){
921                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
922                         continue;
923                 }
924                 (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
925         }
926
927         if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
928                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
929                 return -1;
930         }
931         if (vnnmap->generation == INVALID_GENERATION) {
932                 printf("Generation:INVALID\n");
933         } else {
934                 printf("Generation:%d\n",vnnmap->generation);
935         }
936         printf("Size:%d\n",vnnmap->size);
937         for(i=0;i<vnnmap->size;i++){
938                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
939         }
940         ctdb_free_vnnmap(vnnmap);
941
942         if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
943                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
944                 return -1;
945         }
946         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
947
948         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
949                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
950                 return -1;
951         }
952         printf("Recovery master:%d\n",recmaster);
953
954         return 0;
955 }
956
957 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
958 {
959         int i, ret;
960         struct ctdb_node_map *nodemap=NULL;
961         uint32_t * nodes;
962         uint32_t pnn_mode, mypnn;
963
964         if (argc > 1) {
965                 usage();
966         }
967
968         if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
969                               options.pnn, true, &nodes, &pnn_mode)) {
970                 return -1;
971         }
972
973         if (options.machinereadable) {
974                 control_status_header_machine();
975         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
976                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
977         }
978
979         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
980                 DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
981                 return -1;
982         }
983
984         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
985                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
986                 return -1;
987         }
988
989         ret = 0;
990
991         for (i = 0; i < talloc_array_length(nodes); i++) {
992                 if (options.machinereadable) {
993                         ret |= control_status_1_machine(mypnn,
994                                                         &nodemap->nodes[nodes[i]]);
995                 } else {
996                         ret |= control_status_1_human(mypnn,
997                                                       &nodemap->nodes[nodes[i]]);
998                 }
999         }
1000         return ret;
1001 }
1002
1003 struct natgw_node {
1004         struct natgw_node *next;
1005         const char *addr;
1006 };
1007
1008 static int find_natgw(struct ctdb_context *ctdb,
1009                        struct ctdb_node_map *nodemap, uint32_t flags,
1010                        uint32_t *pnn, const char **ip)
1011 {
1012         int i;
1013         uint32_t capabilities;
1014
1015         for (i=0;i<nodemap->num;i++) {
1016                 if (!(nodemap->nodes[i].flags & flags)) {
1017                         if (!ctdb_getcapabilities(ctdb_connection, nodemap->nodes[i].pnn, &capabilities)) {
1018                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1019                                 return -1;
1020                         }
1021                         if (!(capabilities&CTDB_CAP_NATGW)) {
1022                                 continue;
1023                         }
1024                         *pnn = nodemap->nodes[i].pnn;
1025                         *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
1026                         return 0;
1027                 }
1028         }
1029
1030         return 2; /* matches ENOENT */
1031 }
1032
1033 /*
1034   display the list of nodes belonging to this natgw configuration
1035  */
1036 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
1037 {
1038         int i, ret;
1039         const char *natgw_list;
1040         int nlines;
1041         char **lines;
1042         struct natgw_node *natgw_nodes = NULL;
1043         struct natgw_node *natgw_node;
1044         struct ctdb_node_map *nodemap=NULL;
1045         uint32_t mypnn, pnn;
1046         const char *ip;
1047
1048         /* When we have some nodes that could be the NATGW, make a
1049          * series of attempts to find the first node that doesn't have
1050          * certain status flags set.
1051          */
1052         uint32_t exclude_flags[] = {
1053                 /* Look for a nice healthy node */
1054                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
1055                 /* If not found, an UNHEALTHY/BANNED node will do */
1056                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
1057                 /* If not found, a STOPPED node will do */
1058                 NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
1059                 0,
1060         };
1061
1062         /* read the natgw nodes file into a linked list */
1063         natgw_list = getenv("CTDB_NATGW_NODES");
1064         if (natgw_list == NULL) {
1065                 natgw_list = "/etc/ctdb/natgw_nodes";
1066         }
1067         lines = file_lines_load(natgw_list, &nlines, ctdb);
1068         if (lines == NULL) {
1069                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1070                 return -1;
1071         }
1072         for (i=0;i<nlines;i++) {
1073                 char *node;
1074
1075                 node = lines[i];
1076                 /* strip leading spaces */
1077                 while((*node == ' ') || (*node == '\t')) {
1078                         node++;
1079                 }
1080                 if (*node == '#') {
1081                         continue;
1082                 }
1083                 if (strcmp(node, "") == 0) {
1084                         continue;
1085                 }
1086                 natgw_node = talloc(ctdb, struct natgw_node);
1087                 natgw_node->addr = talloc_strdup(natgw_node, node);
1088                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1089                 natgw_node->next = natgw_nodes;
1090                 natgw_nodes = natgw_node;
1091         }
1092
1093         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
1094                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1095                 return -1;
1096         }
1097
1098         /* Trim the nodemap so it only includes connected nodes in the
1099          * current natgw group.
1100          */
1101         i=0;
1102         while(i<nodemap->num) {
1103                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1104                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1105                                 break;
1106                         }
1107                 }
1108
1109                 /* this node was not in the natgw so we just remove it from
1110                  * the list
1111                  */
1112                 if ((natgw_node == NULL) 
1113                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1114                         int j;
1115
1116                         for (j=i+1; j<nodemap->num; j++) {
1117                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1118                         }
1119                         nodemap->num--;
1120                         continue;
1121                 }
1122
1123                 i++;
1124         }
1125
1126         ret = 2; /* matches ENOENT */
1127         pnn = -1;
1128         ip = "0.0.0.0";
1129         for (i = 0; exclude_flags[i] != 0; i++) {
1130                 ret = find_natgw(ctdb, nodemap,
1131                                  exclude_flags[i],
1132                                  &pnn, &ip);
1133                 if (ret == -1) {
1134                         goto done;
1135                 }
1136                 if (ret == 0) {
1137                         break;
1138                 }
1139         }
1140
1141         if (options.machinereadable) {
1142                 printf(":Node:IP:\n");
1143                 printf(":%d:%s:\n", pnn, ip);
1144         } else {
1145                 printf("%d %s\n", pnn, ip);
1146         }
1147
1148         /* print the pruned list of nodes belonging to this natgw list */
1149         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
1150                 DEBUG(DEBUG_NOTICE, ("Unable to get PNN from node %u\n", options.pnn));
1151                 /* This is actually harmless and will only result in
1152                  * the "this node" indication being missing
1153                  */
1154                 mypnn = -1;
1155         }
1156         if (options.machinereadable) {
1157                 control_status_header_machine();
1158         } else {
1159                 printf("Number of nodes:%d\n", nodemap->num);
1160         }
1161         for(i=0;i<nodemap->num;i++){
1162                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1163                         continue;
1164                 }
1165                 if (options.machinereadable) {
1166                         control_status_1_machine(mypnn, &(nodemap->nodes[i]));
1167                 } else {
1168                         control_status_1_human(mypnn, &(nodemap->nodes[i]));
1169                 }
1170         }
1171
1172 done:
1173         ctdb_free_nodemap(nodemap);
1174         return ret;
1175 }
1176
1177 /*
1178   display the status of the scripts for monitoring (or other events)
1179  */
1180 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1181                                     enum ctdb_eventscript_call type)
1182 {
1183         struct ctdb_scripts_wire *script_status;
1184         int ret, i;
1185
1186         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1187         if (ret != 0) {
1188                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1189                 return ret;
1190         }
1191
1192         if (script_status == NULL) {
1193                 if (!options.machinereadable) {
1194                         printf("%s cycle never run\n",
1195                                ctdb_eventscript_call_names[type]);
1196                 }
1197                 return 0;
1198         }
1199
1200         if (!options.machinereadable) {
1201                 printf("%d scripts were executed last %s cycle\n",
1202                        script_status->num_scripts,
1203                        ctdb_eventscript_call_names[type]);
1204         }
1205         for (i=0; i<script_status->num_scripts; i++) {
1206                 const char *status = NULL;
1207
1208                 switch (script_status->scripts[i].status) {
1209                 case -ETIME:
1210                         status = "TIMEDOUT";
1211                         break;
1212                 case -ENOEXEC:
1213                         status = "DISABLED";
1214                         break;
1215                 case 0:
1216                         status = "OK";
1217                         break;
1218                 default:
1219                         if (script_status->scripts[i].status > 0)
1220                                 status = "ERROR";
1221                         break;
1222                 }
1223                 if (options.machinereadable) {
1224                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1225                                ctdb_eventscript_call_names[type],
1226                                script_status->scripts[i].name,
1227                                script_status->scripts[i].status,
1228                                status,
1229                                (long)script_status->scripts[i].start.tv_sec,
1230                                (long)script_status->scripts[i].start.tv_usec,
1231                                (long)script_status->scripts[i].finished.tv_sec,
1232                                (long)script_status->scripts[i].finished.tv_usec,
1233                                script_status->scripts[i].output);
1234                         continue;
1235                 }
1236                 if (status)
1237                         printf("%-20s Status:%s    ",
1238                                script_status->scripts[i].name, status);
1239                 else
1240                         /* Some other error, eg from stat. */
1241                         printf("%-20s Status:CANNOT RUN (%s)",
1242                                script_status->scripts[i].name,
1243                                strerror(-script_status->scripts[i].status));
1244
1245                 if (script_status->scripts[i].status >= 0) {
1246                         printf("Duration:%.3lf ",
1247                         timeval_delta(&script_status->scripts[i].finished,
1248                               &script_status->scripts[i].start));
1249                 }
1250                 if (script_status->scripts[i].status != -ENOEXEC) {
1251                         printf("%s",
1252                                ctime(&script_status->scripts[i].start.tv_sec));
1253                         if (script_status->scripts[i].status != 0) {
1254                                 printf("   OUTPUT:%s\n",
1255                                        script_status->scripts[i].output);
1256                         }
1257                 } else {
1258                         printf("\n");
1259                 }
1260         }
1261         return 0;
1262 }
1263
1264
1265 static int control_scriptstatus(struct ctdb_context *ctdb,
1266                                 int argc, const char **argv)
1267 {
1268         int ret;
1269         enum ctdb_eventscript_call type, min, max;
1270         const char *arg;
1271
1272         if (argc > 1) {
1273                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1274                 return -1;
1275         }
1276
1277         if (argc == 0)
1278                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1279         else
1280                 arg = argv[0];
1281
1282         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1283                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1284                         min = type;
1285                         max = type+1;
1286                         break;
1287                 }
1288         }
1289         if (type == CTDB_EVENT_MAX) {
1290                 if (strcmp(arg, "all") == 0) {
1291                         min = 0;
1292                         max = CTDB_EVENT_MAX;
1293                 } else {
1294                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1295                         return -1;
1296                 }
1297         }
1298
1299         if (options.machinereadable) {
1300                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1301         }
1302
1303         for (type = min; type < max; type++) {
1304                 ret = control_one_scriptstatus(ctdb, type);
1305                 if (ret != 0) {
1306                         return ret;
1307                 }
1308         }
1309
1310         return 0;
1311 }
1312
1313 /*
1314   enable an eventscript
1315  */
1316 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1317 {
1318         int ret;
1319
1320         if (argc < 1) {
1321                 usage();
1322         }
1323
1324         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1325         if (ret != 0) {
1326           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1327                 return ret;
1328         }
1329
1330         return 0;
1331 }
1332
1333 /*
1334   disable an eventscript
1335  */
1336 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1337 {
1338         int ret;
1339
1340         if (argc < 1) {
1341                 usage();
1342         }
1343
1344         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1345         if (ret != 0) {
1346           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1347                 return ret;
1348         }
1349
1350         return 0;
1351 }
1352
1353 /*
1354   display the pnn of the recovery master
1355  */
1356 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1357 {
1358         uint32_t recmaster;
1359
1360         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1361                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1362                 return -1;
1363         }
1364         printf("%d\n",recmaster);
1365
1366         return 0;
1367 }
1368
1369 /*
1370   add a tickle to a public address
1371  */
1372 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1373 {
1374         struct ctdb_tcp_connection t;
1375         TDB_DATA data;
1376         int ret;
1377
1378         if (argc < 2) {
1379                 usage();
1380         }
1381
1382         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1383                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1384                 return -1;
1385         }
1386         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1387                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1388                 return -1;
1389         }
1390
1391         data.dptr = (uint8_t *)&t;
1392         data.dsize = sizeof(t);
1393
1394         /* tell all nodes about this tcp connection */
1395         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1396                            0, data, ctdb, NULL, NULL, NULL, NULL);
1397         if (ret != 0) {
1398                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1399                 return -1;
1400         }
1401         
1402         return 0;
1403 }
1404
1405
1406 /*
1407   delete a tickle from a node
1408  */
1409 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1410 {
1411         struct ctdb_tcp_connection t;
1412         TDB_DATA data;
1413         int ret;
1414
1415         if (argc < 2) {
1416                 usage();
1417         }
1418
1419         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1420                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1421                 return -1;
1422         }
1423         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1424                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1425                 return -1;
1426         }
1427
1428         data.dptr = (uint8_t *)&t;
1429         data.dsize = sizeof(t);
1430
1431         /* tell all nodes about this tcp connection */
1432         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1433                            0, data, ctdb, NULL, NULL, NULL, NULL);
1434         if (ret != 0) {
1435                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1436                 return -1;
1437         }
1438         
1439         return 0;
1440 }
1441
1442
1443 /*
1444   get a list of all tickles for this pnn
1445  */
1446 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1447 {
1448         struct ctdb_control_tcp_tickle_list *list;
1449         ctdb_sock_addr addr;
1450         int i, ret;
1451         unsigned port = 0;
1452
1453         if (argc < 1) {
1454                 usage();
1455         }
1456
1457         if (argc == 2) {
1458                 port = atoi(argv[1]);
1459         }
1460
1461         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1462                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1463                 return -1;
1464         }
1465
1466         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1467         if (ret == -1) {
1468                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1469                 return -1;
1470         }
1471
1472         if (options.machinereadable){
1473                 printf(":source ip:port:destination ip:port:\n");
1474                 for (i=0;i<list->tickles.num;i++) {
1475                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1476                                 continue;
1477                         }
1478                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1479                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1480                 }
1481         } else {
1482                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1483                 printf("Num tickles:%u\n", list->tickles.num);
1484                 for (i=0;i<list->tickles.num;i++) {
1485                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1486                                 continue;
1487                         }
1488                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1489                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1490                 }
1491         }
1492
1493         talloc_free(list);
1494         
1495         return 0;
1496 }
1497
1498
1499 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1500 {
1501         struct ctdb_all_public_ips *ips;
1502         struct ctdb_public_ip ip;
1503         int i, ret;
1504         uint32_t *nodes;
1505         uint32_t disable_time;
1506         TDB_DATA data;
1507         struct ctdb_node_map *nodemap=NULL;
1508         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1509
1510         disable_time = 30;
1511         data.dptr  = (uint8_t*)&disable_time;
1512         data.dsize = sizeof(disable_time);
1513         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1514         if (ret != 0) {
1515                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1516                 return -1;
1517         }
1518
1519
1520
1521         /* read the public ip list from the node */
1522         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1523         if (ret != 0) {
1524                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1525                 talloc_free(tmp_ctx);
1526                 return -1;
1527         }
1528
1529         for (i=0;i<ips->num;i++) {
1530                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1531                         break;
1532                 }
1533         }
1534         if (i==ips->num) {
1535                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1536                         pnn, ctdb_addr_to_str(addr)));
1537                 talloc_free(tmp_ctx);
1538                 return -1;
1539         }
1540
1541         ip.pnn  = pnn;
1542         ip.addr = *addr;
1543
1544         data.dptr  = (uint8_t *)&ip;
1545         data.dsize = sizeof(ip);
1546
1547         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1548         if (ret != 0) {
1549                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1550                 talloc_free(tmp_ctx);
1551                 return ret;
1552         }
1553
1554         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1555         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1556                                         nodes, 0,
1557                                         LONGTIMELIMIT(),
1558                                         false, data,
1559                                         NULL, NULL,
1560                                         NULL);
1561         if (ret != 0) {
1562                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1563                 talloc_free(tmp_ctx);
1564                 return -1;
1565         }
1566
1567         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1570                 talloc_free(tmp_ctx);
1571                 return -1;
1572         }
1573
1574         /* update the recovery daemon so it now knows to expect the new
1575            node assignment for this ip.
1576         */
1577         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1580                 return -1;
1581         }
1582
1583         talloc_free(tmp_ctx);
1584         return 0;
1585 }
1586
1587 /*
1588   move/failover an ip address to a specific node
1589  */
1590 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1591 {
1592         uint32_t pnn;
1593         int ret, retries = 0;
1594         ctdb_sock_addr addr;
1595
1596         if (argc < 2) {
1597                 usage();
1598                 return -1;
1599         }
1600
1601         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1602                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1603                 return -1;
1604         }
1605
1606
1607         if (sscanf(argv[1], "%u", &pnn) != 1) {
1608                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1609                 return -1;
1610         }
1611
1612         do {
1613                 ret = move_ip(ctdb, &addr, pnn);
1614                 if (ret != 0) {
1615                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
1616                         sleep(3);
1617                         retries++;
1618                 }
1619         } while (retries < 5 && ret != 0);
1620         if (ret != 0) {
1621                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
1622                 return -1;
1623         }
1624
1625         return 0;
1626 }
1627
1628 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1629 {
1630         uint32_t recmaster;
1631         TDB_DATA data;
1632
1633         if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
1634                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
1635                 return -1;
1636         }
1637
1638         data.dptr  = (uint8_t *)&pnn;
1639         data.dsize = sizeof(uint32_t);
1640         if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1641                 DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
1642                 return -1;
1643         }
1644
1645         return 0;
1646 }
1647
1648
1649 /*
1650   rebalance a node by setting it to allow failback and triggering a
1651   takeover run
1652  */
1653 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1654 {
1655         switch (options.pnn) {
1656         case CTDB_BROADCAST_ALL:
1657         case CTDB_CURRENT_NODE:
1658                 DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
1659                 return -1;
1660         }
1661
1662         return rebalance_node(ctdb, options.pnn);
1663 }
1664
1665
1666 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1667 {
1668         struct ctdb_public_ip ip;
1669         int ret;
1670         uint32_t *nodes;
1671         uint32_t disable_time;
1672         TDB_DATA data;
1673         struct ctdb_node_map *nodemap=NULL;
1674         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1675
1676         disable_time = 30;
1677         data.dptr  = (uint8_t*)&disable_time;
1678         data.dsize = sizeof(disable_time);
1679         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1680         if (ret != 0) {
1681                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1682                 return -1;
1683         }
1684
1685         ip.pnn  = -1;
1686         ip.addr = *addr;
1687
1688         data.dptr  = (uint8_t *)&ip;
1689         data.dsize = sizeof(ip);
1690
1691         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1692         if (ret != 0) {
1693                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1694                 talloc_free(tmp_ctx);
1695                 return ret;
1696         }
1697
1698         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1699         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1700                                         nodes, 0,
1701                                         LONGTIMELIMIT(),
1702                                         false, data,
1703                                         NULL, NULL,
1704                                         NULL);
1705         if (ret != 0) {
1706                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1707                 talloc_free(tmp_ctx);
1708                 return -1;
1709         }
1710
1711         talloc_free(tmp_ctx);
1712         return 0;
1713 }
1714
1715 /*
1716   release an ip form all nodes and have it re-assigned by recd
1717  */
1718 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1719 {
1720         ctdb_sock_addr addr;
1721
1722         if (argc < 1) {
1723                 usage();
1724                 return -1;
1725         }
1726
1727         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1728                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1729                 return -1;
1730         }
1731
1732         if (rebalance_ip(ctdb, &addr) != 0) {
1733                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1734                 return -1;
1735         }
1736
1737         return 0;
1738 }
1739
1740 static int getips_store_callback(void *param, void *data)
1741 {
1742         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1743         struct ctdb_all_public_ips *ips = param;
1744         int i;
1745
1746         i = ips->num++;
1747         ips->ips[i].pnn  = node_ip->pnn;
1748         ips->ips[i].addr = node_ip->addr;
1749         return 0;
1750 }
1751
1752 static int getips_count_callback(void *param, void *data)
1753 {
1754         uint32_t *count = param;
1755
1756         (*count)++;
1757         return 0;
1758 }
1759
1760 #define IP_KEYLEN       4
1761 static uint32_t *ip_key(ctdb_sock_addr *ip)
1762 {
1763         static uint32_t key[IP_KEYLEN];
1764
1765         bzero(key, sizeof(key));
1766
1767         switch (ip->sa.sa_family) {
1768         case AF_INET:
1769                 key[0]  = ip->ip.sin_addr.s_addr;
1770                 break;
1771         case AF_INET6: {
1772                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1773                 key[0]  = s6_a32[3];
1774                 key[1]  = s6_a32[2];
1775                 key[2]  = s6_a32[1];
1776                 key[3]  = s6_a32[0];
1777                 break;
1778         }
1779         default:
1780                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1781                 return key;
1782         }
1783
1784         return key;
1785 }
1786
1787 static void *add_ip_callback(void *parm, void *data)
1788 {
1789         return parm;
1790 }
1791
1792 static int
1793 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1794 {
1795         struct ctdb_all_public_ips *tmp_ips;
1796         struct ctdb_node_map *nodemap=NULL;
1797         trbt_tree_t *ip_tree;
1798         int i, j, len, ret;
1799         uint32_t count;
1800
1801         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1802         if (ret != 0) {
1803                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1804                 return ret;
1805         }
1806
1807         ip_tree = trbt_create(tmp_ctx, 0);
1808
1809         for(i=0;i<nodemap->num;i++){
1810                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1811                         continue;
1812                 }
1813                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1814                         continue;
1815                 }
1816
1817                 /* read the public ip list from this node */
1818                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1819                 if (ret != 0) {
1820                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1821                         return -1;
1822                 }
1823         
1824                 for (j=0; j<tmp_ips->num;j++) {
1825                         struct ctdb_public_ip *node_ip;
1826
1827                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1828                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1829                         node_ip->addr = tmp_ips->ips[j].addr;
1830
1831                         trbt_insertarray32_callback(ip_tree,
1832                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1833                                 add_ip_callback,
1834                                 node_ip);
1835                 }
1836                 talloc_free(tmp_ips);
1837         }
1838
1839         /* traverse */
1840         count = 0;
1841         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1842
1843         len = offsetof(struct ctdb_all_public_ips, ips) + 
1844                 count*sizeof(struct ctdb_public_ip);
1845         tmp_ips = talloc_zero_size(tmp_ctx, len);
1846         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1847
1848         *ips = tmp_ips;
1849
1850         return 0;
1851 }
1852
1853
1854 /* 
1855  * scans all other nodes and returns a pnn for another node that can host this 
1856  * ip address or -1
1857  */
1858 static int
1859 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1860 {
1861         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1862         struct ctdb_all_public_ips *ips;
1863         struct ctdb_node_map *nodemap=NULL;
1864         int i, j, ret;
1865
1866         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1867         if (ret != 0) {
1868                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1869                 talloc_free(tmp_ctx);
1870                 return ret;
1871         }
1872
1873         for(i=0;i<nodemap->num;i++){
1874                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1875                         continue;
1876                 }
1877                 if (nodemap->nodes[i].pnn == options.pnn) {
1878                         continue;
1879                 }
1880
1881                 /* read the public ip list from this node */
1882                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1883                 if (ret != 0) {
1884                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1885                         return -1;
1886                 }
1887
1888                 for (j=0;j<ips->num;j++) {
1889                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1890                                 talloc_free(tmp_ctx);
1891                                 return nodemap->nodes[i].pnn;
1892                         }
1893                 }
1894                 talloc_free(ips);
1895         }
1896
1897         talloc_free(tmp_ctx);
1898         return -1;
1899 }
1900
1901 static uint32_t ipreallocate_finished;
1902
1903 /*
1904   handler for receiving the response to ipreallocate
1905 */
1906 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1907                              TDB_DATA data, void *private_data)
1908 {
1909         ipreallocate_finished = 1;
1910 }
1911
1912 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1913 {
1914         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1915
1916         event_add_timed(ctdb->ev, ctdb, 
1917                                 timeval_current_ofs(1, 0),
1918                                 ctdb_every_second, ctdb);
1919 }
1920
1921 /*
1922   ask the recovery daemon on the recovery master to perform a ip reallocation
1923  */
1924 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1925 {
1926         int i, ret;
1927         TDB_DATA data;
1928         struct takeover_run_reply rd;
1929         uint32_t recmaster;
1930         struct ctdb_node_map *nodemap=NULL;
1931         int retries=0;
1932         struct timeval tv = timeval_current();
1933
1934         /* we need some events to trigger so we can timeout and restart
1935            the loop
1936         */
1937         event_add_timed(ctdb->ev, ctdb, 
1938                                 timeval_current_ofs(1, 0),
1939                                 ctdb_every_second, ctdb);
1940
1941         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1942         if (rd.pnn == -1) {
1943                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1944                 return -1;
1945         }
1946         rd.srvid = getpid();
1947
1948         /* register a message port for receiveing the reply so that we
1949            can receive the reply
1950         */
1951         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1952
1953         data.dptr = (uint8_t *)&rd;
1954         data.dsize = sizeof(rd);
1955
1956 again:
1957         /* check that there are valid nodes available */
1958         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1959                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1960                 return -1;
1961         }
1962         for (i=0; i<nodemap->num;i++) {
1963                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1964                         break;
1965                 }
1966         }
1967         if (i==nodemap->num) {
1968                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1969                 return 0;
1970         }
1971
1972
1973         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1974                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1975                 return -1;
1976         }
1977
1978         /* verify the node exists */
1979         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1980                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1981                 return -1;
1982         }
1983
1984
1985         /* check tha there are nodes available that can act as a recmaster */
1986         for (i=0; i<nodemap->num; i++) {
1987                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1988                         continue;
1989                 }
1990                 break;
1991         }
1992         if (i == nodemap->num) {
1993                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1994                 return 0;
1995         }
1996
1997         /* verify the recovery master is not STOPPED, nor BANNED */
1998         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1999                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2000                 retries++;
2001                 sleep(1);
2002                 goto again;
2003         } 
2004         
2005         /* verify the recovery master is not STOPPED, nor BANNED */
2006         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
2007                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
2008                 retries++;
2009                 sleep(1);
2010                 goto again;
2011         } 
2012
2013         ipreallocate_finished = 0;
2014         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2015         if (ret != 0) {
2016                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
2017                 return -1;
2018         }
2019
2020         tv = timeval_current();
2021         /* this loop will terminate when we have received the reply */
2022         while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
2023                 event_loop_once(ctdb->ev);
2024         }
2025         if (ipreallocate_finished == 1) {
2026                 return 0;
2027         }
2028
2029         retries++;
2030         sleep(1);
2031         goto again;
2032
2033         return 0;
2034 }
2035
2036
2037 /*
2038   add a public ip address to a node
2039  */
2040 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2041 {
2042         int i, ret;
2043         int len, retries = 0;
2044         unsigned mask;
2045         ctdb_sock_addr addr;
2046         struct ctdb_control_ip_iface *pub;
2047         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2048         struct ctdb_all_public_ips *ips;
2049
2050
2051         if (argc != 2) {
2052                 talloc_free(tmp_ctx);
2053                 usage();
2054         }
2055
2056         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2057                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2058                 talloc_free(tmp_ctx);
2059                 return -1;
2060         }
2061
2062         /* read the public ip list from the node */
2063         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2064         if (ret != 0) {
2065                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2066                 talloc_free(tmp_ctx);
2067                 return -1;
2068         }
2069         for (i=0;i<ips->num;i++) {
2070                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2071                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2072                         return 0;
2073                 }
2074         }
2075
2076
2077
2078         /* Dont timeout. This command waits for an ip reallocation
2079            which sometimes can take wuite a while if there has
2080            been a recent recovery
2081         */
2082         alarm(0);
2083
2084         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2085         pub = talloc_size(tmp_ctx, len); 
2086         CTDB_NO_MEMORY(ctdb, pub);
2087
2088         pub->addr  = addr;
2089         pub->mask  = mask;
2090         pub->len   = strlen(argv[1])+1;
2091         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2092
2093         do {
2094                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2095                 if (ret != 0) {
2096                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2097                         sleep(3);
2098                         retries++;
2099                 }
2100         } while (retries < 5 && ret != 0);
2101         if (ret != 0) {
2102                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2103                 talloc_free(tmp_ctx);
2104                 return ret;
2105         }
2106
2107         if (rebalance_node(ctdb, options.pnn) != 0) {
2108                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2109                 return ret;
2110         }
2111
2112         talloc_free(tmp_ctx);
2113         return 0;
2114 }
2115
2116 /*
2117   add a public ip address to a node
2118  */
2119 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2120 {
2121         ctdb_sock_addr addr;
2122
2123         if (argc != 1) {
2124                 usage();
2125         }
2126
2127         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2128                 printf("Badly formed ip : %s\n", argv[0]);
2129                 return -1;
2130         }
2131
2132         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2133
2134         return 0;
2135 }
2136
2137 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2138
2139 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2140 {
2141         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2142         struct ctdb_node_map *nodemap=NULL;
2143         struct ctdb_all_public_ips *ips;
2144         int ret, i, j;
2145
2146         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2147         if (ret != 0) {
2148                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2149                 return ret;
2150         }
2151
2152         /* remove it from the nodes that are not hosting the ip currently */
2153         for(i=0;i<nodemap->num;i++){
2154                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2155                         continue;
2156                 }
2157                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2158                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2159                         continue;
2160                 }
2161
2162                 for (j=0;j<ips->num;j++) {
2163                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2164                                 break;
2165                         }
2166                 }
2167                 if (j==ips->num) {
2168                         continue;
2169                 }
2170
2171                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2172                         continue;
2173                 }
2174
2175                 options.pnn = nodemap->nodes[i].pnn;
2176                 control_delip(ctdb, argc, argv);
2177         }
2178
2179
2180         /* remove it from every node (also the one hosting it) */
2181         for(i=0;i<nodemap->num;i++){
2182                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2183                         continue;
2184                 }
2185                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2186                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2187                         continue;
2188                 }
2189
2190                 for (j=0;j<ips->num;j++) {
2191                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2192                                 break;
2193                         }
2194                 }
2195                 if (j==ips->num) {
2196                         continue;
2197                 }
2198
2199                 options.pnn = nodemap->nodes[i].pnn;
2200                 control_delip(ctdb, argc, argv);
2201         }
2202
2203         talloc_free(tmp_ctx);
2204         return 0;
2205 }
2206         
2207 /*
2208   delete a public ip address from a node
2209  */
2210 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2211 {
2212         int i, ret;
2213         int retries = 0;
2214         ctdb_sock_addr addr;
2215         struct ctdb_control_ip_iface pub;
2216         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2217         struct ctdb_all_public_ips *ips;
2218
2219         if (argc != 1) {
2220                 talloc_free(tmp_ctx);
2221                 usage();
2222         }
2223
2224         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2225                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2226                 return -1;
2227         }
2228
2229         if (options.pnn == CTDB_BROADCAST_ALL) {
2230                 return control_delip_all(ctdb, argc, argv, &addr);
2231         }
2232
2233         pub.addr  = addr;
2234         pub.mask  = 0;
2235         pub.len   = 0;
2236
2237         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2238         if (ret != 0) {
2239                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2240                 talloc_free(tmp_ctx);
2241                 return ret;
2242         }
2243         
2244         for (i=0;i<ips->num;i++) {
2245                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2246                         break;
2247                 }
2248         }
2249
2250         if (i==ips->num) {
2251                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2252                         ctdb_addr_to_str(&addr)));
2253                 talloc_free(tmp_ctx);
2254                 return -1;
2255         }
2256
2257         if (ips->ips[i].pnn == options.pnn) {
2258                 int pnn;
2259
2260                 pnn = find_other_host_for_public_ip(ctdb, &addr);
2261                 if (pnn != -1) {
2262                         do {
2263                                 ret = move_ip(ctdb, &addr, pnn);
2264                                 if (ret != 0) {
2265                                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
2266                                         sleep(3);
2267                                         retries++;
2268                                 }
2269                         } while (retries < 5 && ret != 0);
2270                         if (ret != 0) {
2271                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
2272                                 return -1;
2273                         }
2274                 }
2275         }
2276
2277         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2278         if (ret != 0) {
2279                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2280                 talloc_free(tmp_ctx);
2281                 return ret;
2282         }
2283
2284         talloc_free(tmp_ctx);
2285         return 0;
2286 }
2287
2288 /*
2289   kill a tcp connection
2290  */
2291 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2292 {
2293         int ret;
2294         struct ctdb_control_killtcp killtcp;
2295
2296         if (argc < 2) {
2297                 usage();
2298         }
2299
2300         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2301                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2302                 return -1;
2303         }
2304
2305         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2306                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2307                 return -1;
2308         }
2309
2310         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2311         if (ret != 0) {
2312                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2313                 return ret;
2314         }
2315
2316         return 0;
2317 }
2318
2319
2320 /*
2321   send a gratious arp
2322  */
2323 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2324 {
2325         int ret;
2326         ctdb_sock_addr addr;
2327
2328         if (argc < 2) {
2329                 usage();
2330         }
2331
2332         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2333                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2334                 return -1;
2335         }
2336
2337         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2338         if (ret != 0) {
2339                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2340                 return ret;
2341         }
2342
2343         return 0;
2344 }
2345
2346 /*
2347   register a server id
2348  */
2349 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2350 {
2351         int ret;
2352         struct ctdb_server_id server_id;
2353
2354         if (argc < 3) {
2355                 usage();
2356         }
2357
2358         server_id.pnn       = strtoul(argv[0], NULL, 0);
2359         server_id.type      = strtoul(argv[1], NULL, 0);
2360         server_id.server_id = strtoul(argv[2], NULL, 0);
2361
2362         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2363         if (ret != 0) {
2364                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2365                 return ret;
2366         }
2367         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2368         sleep(999);
2369         return -1;
2370 }
2371
2372 /*
2373   unregister a server id
2374  */
2375 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2376 {
2377         int ret;
2378         struct ctdb_server_id server_id;
2379
2380         if (argc < 3) {
2381                 usage();
2382         }
2383
2384         server_id.pnn       = strtoul(argv[0], NULL, 0);
2385         server_id.type      = strtoul(argv[1], NULL, 0);
2386         server_id.server_id = strtoul(argv[2], NULL, 0);
2387
2388         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2389         if (ret != 0) {
2390                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2391                 return ret;
2392         }
2393         return -1;
2394 }
2395
2396 /*
2397   check if a server id exists
2398  */
2399 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2400 {
2401         uint32_t status;
2402         int ret;
2403         struct ctdb_server_id server_id;
2404
2405         if (argc < 3) {
2406                 usage();
2407         }
2408
2409         server_id.pnn       = strtoul(argv[0], NULL, 0);
2410         server_id.type      = strtoul(argv[1], NULL, 0);
2411         server_id.server_id = strtoul(argv[2], NULL, 0);
2412
2413         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2414         if (ret != 0) {
2415                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2416                 return ret;
2417         }
2418
2419         if (status) {
2420                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2421         } else {
2422                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2423         }
2424         return 0;
2425 }
2426
2427 /*
2428   get a list of all server ids that are registered on a node
2429  */
2430 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2431 {
2432         int i, ret;
2433         struct ctdb_server_id_list *server_ids;
2434
2435         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2436         if (ret != 0) {
2437                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2438                 return ret;
2439         }
2440
2441         for (i=0; i<server_ids->num; i++) {
2442                 printf("Server id %d:%d:%d\n", 
2443                         server_ids->server_ids[i].pnn, 
2444                         server_ids->server_ids[i].type, 
2445                         server_ids->server_ids[i].server_id); 
2446         }
2447
2448         return -1;
2449 }
2450
2451 /*
2452   check if a server id exists
2453  */
2454 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2455 {
2456         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2457         uint64_t *ids;
2458         uint8_t *result;
2459         int i;
2460
2461         if (argc < 1) {
2462                 talloc_free(tmp_ctx);
2463                 usage();
2464         }
2465
2466         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2467         result = talloc_array(tmp_ctx, uint8_t, argc);
2468
2469         for (i = 0; i < argc; i++) {
2470                 ids[i] = strtoull(argv[i], NULL, 0);
2471         }
2472
2473         if (!ctdb_check_message_handlers(ctdb_connection,
2474                 options.pnn, argc, ids, result)) {
2475                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2476                                   options.pnn));
2477                 talloc_free(tmp_ctx);
2478                 return -1;
2479         }
2480
2481         for (i=0; i < argc; i++) {
2482                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2483                        result[i] ? "exists" : "does not exist");
2484         }
2485
2486         talloc_free(tmp_ctx);
2487         return 0;
2488 }
2489
2490 /*
2491   send a tcp tickle ack
2492  */
2493 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2494 {
2495         int ret;
2496         ctdb_sock_addr  src, dst;
2497
2498         if (argc < 2) {
2499                 usage();
2500         }
2501
2502         if (!parse_ip_port(argv[0], &src)) {
2503                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2504                 return -1;
2505         }
2506
2507         if (!parse_ip_port(argv[1], &dst)) {
2508                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2509                 return -1;
2510         }
2511
2512         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2513         if (ret==0) {
2514                 return 0;
2515         }
2516         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2517
2518         return -1;
2519 }
2520
2521
2522 /*
2523   display public ip status
2524  */
2525 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2526 {
2527         int i, ret;
2528         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2529         struct ctdb_all_public_ips *ips;
2530
2531         if (options.pnn == CTDB_BROADCAST_ALL) {
2532                 /* read the list of public ips from all nodes */
2533                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2534         } else {
2535                 /* read the public ip list from this node */
2536                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2537         }
2538         if (ret != 0) {
2539                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2540                 talloc_free(tmp_ctx);
2541                 return ret;
2542         }
2543
2544         if (options.machinereadable){
2545                 printf(":Public IP:Node:");
2546                 if (options.verbose){
2547                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2548                 }
2549                 printf("\n");
2550         } else {
2551                 if (options.pnn == CTDB_BROADCAST_ALL) {
2552                         printf("Public IPs on ALL nodes\n");
2553                 } else {
2554                         printf("Public IPs on node %u\n", options.pnn);
2555                 }
2556         }
2557
2558         for (i=1;i<=ips->num;i++) {
2559                 struct ctdb_control_public_ip_info *info = NULL;
2560                 int32_t pnn;
2561                 char *aciface = NULL;
2562                 char *avifaces = NULL;
2563                 char *cifaces = NULL;
2564
2565                 if (options.pnn == CTDB_BROADCAST_ALL) {
2566                         pnn = ips->ips[ips->num-i].pnn;
2567                 } else {
2568                         pnn = options.pnn;
2569                 }
2570
2571                 if (pnn != -1) {
2572                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2573                                                    &ips->ips[ips->num-i].addr, &info);
2574                 } else {
2575                         ret = -1;
2576                 }
2577
2578                 if (ret == 0) {
2579                         int j;
2580                         for (j=0; j < info->num; j++) {
2581                                 if (cifaces == NULL) {
2582                                         cifaces = talloc_strdup(info,
2583                                                                 info->ifaces[j].name);
2584                                 } else {
2585                                         cifaces = talloc_asprintf_append(cifaces,
2586                                                                          ",%s",
2587                                                                          info->ifaces[j].name);
2588                                 }
2589
2590                                 if (info->active_idx == j) {
2591                                         aciface = info->ifaces[j].name;
2592                                 }
2593
2594                                 if (info->ifaces[j].link_state == 0) {
2595                                         continue;
2596                                 }
2597
2598                                 if (avifaces == NULL) {
2599                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2600                                 } else {
2601                                         avifaces = talloc_asprintf_append(avifaces,
2602                                                                           ",%s",
2603                                                                           info->ifaces[j].name);
2604                                 }
2605                         }
2606                 }
2607
2608                 if (options.machinereadable){
2609                         printf(":%s:%d:",
2610                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2611                                 ips->ips[ips->num-i].pnn);
2612                         if (options.verbose){
2613                                 printf("%s:%s:%s:",
2614                                         aciface?aciface:"",
2615                                         avifaces?avifaces:"",
2616                                         cifaces?cifaces:"");
2617                         }
2618                         printf("\n");
2619                 } else {
2620                         if (options.verbose) {
2621                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2622                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2623                                         ips->ips[ips->num-i].pnn,
2624                                         aciface?aciface:"",
2625                                         avifaces?avifaces:"",
2626                                         cifaces?cifaces:"");
2627                         } else {
2628                                 printf("%s %d\n",
2629                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2630                                         ips->ips[ips->num-i].pnn);
2631                         }
2632                 }
2633                 talloc_free(info);
2634         }
2635
2636         talloc_free(tmp_ctx);
2637         return 0;
2638 }
2639
2640 /*
2641   public ip info
2642  */
2643 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2644 {
2645         int i, ret;
2646         ctdb_sock_addr addr;
2647         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2648         struct ctdb_control_public_ip_info *info;
2649
2650         if (argc != 1) {
2651                 talloc_free(tmp_ctx);
2652                 usage();
2653         }
2654
2655         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2656                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2657                 return -1;
2658         }
2659
2660         /* read the public ip info from this node */
2661         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2662                                            tmp_ctx, &addr, &info);
2663         if (ret != 0) {
2664                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2665                                   argv[0], options.pnn));
2666                 talloc_free(tmp_ctx);
2667                 return ret;
2668         }
2669
2670         printf("Public IP[%s] info on node %u\n",
2671                ctdb_addr_to_str(&info->ip.addr),
2672                options.pnn);
2673
2674         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2675                ctdb_addr_to_str(&info->ip.addr),
2676                info->ip.pnn, info->num);
2677
2678         for (i=0; i<info->num; i++) {
2679                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2680
2681                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2682                        i+1, info->ifaces[i].name,
2683                        info->ifaces[i].link_state?"up":"down",
2684                        (unsigned int)info->ifaces[i].references,
2685                        (i==info->active_idx)?" (active)":"");
2686         }
2687
2688         talloc_free(tmp_ctx);
2689         return 0;
2690 }
2691
2692 /*
2693   display interfaces status
2694  */
2695 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2696 {
2697         int i;
2698         struct ctdb_ifaces_list *ifaces;
2699
2700         /* read the public ip list from this node */
2701         if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
2702                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2703                                   options.pnn));
2704                 return -1;
2705         }
2706
2707         if (options.machinereadable){
2708                 printf(":Name:LinkStatus:References:\n");
2709         } else {
2710                 printf("Interfaces on node %u\n", options.pnn);
2711         }
2712
2713         for (i=0; i<ifaces->num; i++) {
2714                 if (options.machinereadable){
2715                         printf(":%s:%s:%u\n",
2716                                ifaces->ifaces[i].name,
2717                                ifaces->ifaces[i].link_state?"1":"0",
2718                                (unsigned int)ifaces->ifaces[i].references);
2719                 } else {
2720                         printf("name:%s link:%s references:%u\n",
2721                                ifaces->ifaces[i].name,
2722                                ifaces->ifaces[i].link_state?"up":"down",
2723                                (unsigned int)ifaces->ifaces[i].references);
2724                 }
2725         }
2726
2727         ctdb_free_ifaces(ifaces);
2728         return 0;
2729 }
2730
2731
2732 /*
2733   set link status of an interface
2734  */
2735 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2736 {
2737         int ret;
2738         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2739         struct ctdb_control_iface_info info;
2740
2741         ZERO_STRUCT(info);
2742
2743         if (argc != 2) {
2744                 usage();
2745         }
2746
2747         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2748                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2749                                   argv[0]));
2750                 talloc_free(tmp_ctx);
2751                 return -1;
2752         }
2753         strcpy(info.name, argv[0]);
2754
2755         if (strcmp(argv[1], "up") == 0) {
2756                 info.link_state = 1;
2757         } else if (strcmp(argv[1], "down") == 0) {
2758                 info.link_state = 0;
2759         } else {
2760                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2761                                   argv[1]));
2762                 talloc_free(tmp_ctx);
2763                 return -1;
2764         }
2765
2766         /* read the public ip list from this node */
2767         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2768                                    tmp_ctx, &info);
2769         if (ret != 0) {
2770                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2771                                   argv[0], options.pnn));
2772                 talloc_free(tmp_ctx);
2773                 return ret;
2774         }
2775
2776         talloc_free(tmp_ctx);
2777         return 0;
2778 }
2779
2780 /*
2781   display pid of a ctdb daemon
2782  */
2783 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2784 {
2785         uint32_t pid;
2786         int ret;
2787
2788         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2789         if (ret != 0) {
2790                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2791                 return ret;
2792         }
2793         printf("Pid:%d\n", pid);
2794
2795         return 0;
2796 }
2797
2798 /*
2799   disable a remote node
2800  */
2801 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2802 {
2803         int ret;
2804         struct ctdb_node_map *nodemap=NULL;
2805
2806         /* check if the node is already disabled */
2807         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2808                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2809                 exit(10);
2810         }
2811         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2812                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2813                 return 0;
2814         }
2815
2816         do {
2817                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2818                 if (ret != 0) {
2819                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2820                         return ret;
2821                 }
2822
2823                 sleep(1);
2824
2825                 /* read the nodemap and verify the change took effect */
2826                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2827                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2828                         exit(10);
2829                 }
2830
2831         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2832         ret = control_ipreallocate(ctdb, argc, argv);
2833         if (ret != 0) {
2834                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2835                 return ret;
2836         }
2837
2838         return 0;
2839 }
2840
2841 /*
2842   enable a disabled remote node
2843  */
2844 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2845 {
2846         int ret;
2847
2848         struct ctdb_node_map *nodemap=NULL;
2849
2850
2851         /* check if the node is already enabled */
2852         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2853                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2854                 exit(10);
2855         }
2856         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2857                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2858                 return 0;
2859         }
2860
2861         do {
2862                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2863                 if (ret != 0) {
2864                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2865                         return ret;
2866                 }
2867
2868                 sleep(1);
2869
2870                 /* read the nodemap and verify the change took effect */
2871                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2872                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2873                         exit(10);
2874                 }
2875
2876         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2877
2878         ret = control_ipreallocate(ctdb, argc, argv);
2879         if (ret != 0) {
2880                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2881                 return ret;
2882         }
2883
2884         return 0;
2885 }
2886
2887 /*
2888   stop a remote node
2889  */
2890 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2891 {
2892         int ret;
2893         struct ctdb_node_map *nodemap=NULL;
2894
2895         do {
2896                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2897                 if (ret != 0) {
2898                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2899                 }
2900         
2901                 sleep(1);
2902
2903                 /* read the nodemap and verify the change took effect */
2904                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2905                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2906                         exit(10);
2907                 }
2908
2909         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2910         ret = control_ipreallocate(ctdb, argc, argv);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2913                 return ret;
2914         }
2915
2916         return 0;
2917 }
2918
2919 /*
2920   restart a stopped remote node
2921  */
2922 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2923 {
2924         int ret;
2925
2926         struct ctdb_node_map *nodemap=NULL;
2927
2928         do {
2929                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2930                 if (ret != 0) {
2931                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2932                         return ret;
2933                 }
2934         
2935                 sleep(1);
2936
2937                 /* read the nodemap and verify the change took effect */
2938                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2939                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2940                         exit(10);
2941                 }
2942
2943         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2944         ret = control_ipreallocate(ctdb, argc, argv);
2945         if (ret != 0) {
2946                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2947                 return ret;
2948         }
2949
2950         return 0;
2951 }
2952
2953 static uint32_t get_generation(struct ctdb_context *ctdb)
2954 {
2955         struct ctdb_vnn_map *vnnmap=NULL;
2956         int ret;
2957
2958         /* wait until the recmaster is not in recovery mode */
2959         while (1) {
2960                 uint32_t recmode, recmaster;
2961                 
2962                 if (vnnmap != NULL) {
2963                         talloc_free(vnnmap);
2964                         vnnmap = NULL;
2965                 }
2966
2967                 /* get the recmaster */
2968                 if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
2969                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2970                         exit(10);
2971                 }
2972
2973                 /* get recovery mode */
2974                 if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
2975                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2976                         exit(10);
2977                 }
2978
2979                 /* get the current generation number */
2980                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2981                 if (ret != 0) {
2982                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2983                         exit(10);
2984                 }
2985
2986                 if ((recmode == CTDB_RECOVERY_NORMAL)
2987                 &&  (vnnmap->generation != 1)){
2988                         return vnnmap->generation;
2989                 }
2990                 sleep(1);
2991         }
2992 }
2993
2994 /*
2995   ban a node from the cluster
2996  */
2997 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2998 {
2999         int ret;
3000         struct ctdb_node_map *nodemap=NULL;
3001         struct ctdb_ban_time bantime;
3002
3003         if (argc < 1) {
3004                 usage();
3005         }
3006         
3007         /* verify the node exists */
3008         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3009         if (ret != 0) {
3010                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3011                 return ret;
3012         }
3013
3014         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
3015                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
3016                 return -1;
3017         }
3018
3019         bantime.pnn  = options.pnn;
3020         bantime.time = strtoul(argv[0], NULL, 0);
3021
3022         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
3023         if (ret != 0) {
3024                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
3025                 return -1;
3026         }       
3027
3028         ret = control_ipreallocate(ctdb, argc, argv);
3029         if (ret != 0) {
3030                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
3031                 return ret;
3032         }
3033
3034         return 0;
3035 }
3036
3037
3038 /*
3039   unban a node from the cluster
3040  */
3041 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3042 {
3043         int ret;
3044         struct ctdb_node_map *nodemap=NULL;
3045         struct ctdb_ban_time bantime;
3046
3047         /* verify the node exists */
3048         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3049         if (ret != 0) {
3050                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3051                 return ret;
3052         }
3053
3054         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
3055                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
3056                 return -1;
3057         }
3058
3059         bantime.pnn  = options.pnn;
3060         bantime.time = 0;
3061
3062         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
3063         if (ret != 0) {
3064                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
3065                 return -1;
3066         }       
3067
3068         ret = control_ipreallocate(ctdb, argc, argv);
3069         if (ret != 0) {
3070                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
3071                 return ret;
3072         }
3073
3074         return 0;
3075 }
3076
3077
3078 /*
3079   show ban information for a node
3080  */
3081 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3082 {
3083         int ret;
3084         struct ctdb_node_map *nodemap=NULL;
3085         struct ctdb_ban_time *bantime;
3086
3087         /* verify the node exists */
3088         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3089         if (ret != 0) {
3090                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3091                 return ret;
3092         }
3093
3094         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3095         if (ret != 0) {
3096                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3097                 return -1;
3098         }       
3099
3100         if (bantime->time == 0) {
3101                 printf("Node %u is not banned\n", bantime->pnn);
3102         } else {
3103                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
3104         }
3105
3106         return 0;
3107 }
3108
3109 /*
3110   shutdown a daemon
3111  */
3112 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3113 {
3114         int ret;
3115
3116         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3117         if (ret != 0) {
3118                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3119                 return ret;
3120         }
3121
3122         return 0;
3123 }
3124
3125 /*
3126   trigger a recovery
3127  */
3128 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3129 {
3130         int ret;
3131         uint32_t generation, next_generation;
3132
3133         /* record the current generation number */
3134         generation = get_generation(ctdb);
3135
3136         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3137         if (ret != 0) {
3138                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3139                 return ret;
3140         }
3141
3142         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3143         if (ret != 0) {
3144                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3145                 return ret;
3146         }
3147
3148         /* wait until we are in a new generation */
3149         while (1) {
3150                 next_generation = get_generation(ctdb);
3151                 if (next_generation != generation) {
3152                         return 0;
3153                 }
3154                 sleep(1);
3155         }
3156
3157         return 0;
3158 }
3159
3160
3161 /*
3162   display monitoring mode of a remote node
3163  */
3164 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3165 {
3166         uint32_t monmode;
3167         int ret;
3168
3169         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3170         if (ret != 0) {
3171                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3172                 return ret;
3173         }
3174         if (!options.machinereadable){
3175                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3176         } else {
3177                 printf(":mode:\n");
3178                 printf(":%d:\n",monmode);
3179         }
3180         return 0;
3181 }
3182
3183
3184 /*
3185   display capabilities of a remote node
3186  */
3187 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3188 {
3189         uint32_t capabilities;
3190
3191         if (!ctdb_getcapabilities(ctdb_connection, options.pnn, &capabilities)) {
3192                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3193                 return -1;
3194         }
3195         
3196         if (!options.machinereadable){
3197                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3198                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3199                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3200                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3201         } else {
3202                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3203                 printf(":%d:%d:%d:%d:\n",
3204                         !!(capabilities&CTDB_CAP_RECMASTER),
3205                         !!(capabilities&CTDB_CAP_LMASTER),
3206                         !!(capabilities&CTDB_CAP_LVS),
3207                         !!(capabilities&CTDB_CAP_NATGW));
3208         }
3209         return 0;
3210 }
3211
3212 /*
3213   display lvs configuration
3214  */
3215 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3216 {
3217         uint32_t *capabilities;
3218         struct ctdb_node_map *nodemap=NULL;
3219         int i, ret;
3220         int healthy_count = 0;
3221
3222         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
3223                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3224                 return -1;
3225         }
3226
3227         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3228         CTDB_NO_MEMORY(ctdb, capabilities);
3229         
3230         ret = 0;
3231
3232         /* collect capabilities for all connected nodes */
3233         for (i=0; i<nodemap->num; i++) {
3234                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3235                         continue;
3236                 }
3237                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3238                         continue;
3239                 }
3240         
3241                 if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
3242                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3243                         ret = -1;
3244                         goto done;
3245                 }
3246
3247                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3248                         continue;
3249                 }
3250
3251                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3252                         healthy_count++;
3253                 }
3254         }
3255
3256         /* Print all LVS nodes */
3257         for (i=0; i<nodemap->num; i++) {
3258                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3259                         continue;
3260                 }
3261                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3262                         continue;
3263                 }
3264                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3265                         continue;
3266                 }
3267
3268                 if (healthy_count != 0) {
3269                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3270                                 continue;
3271                         }
3272                 }
3273
3274                 printf("%d:%s\n", i, 
3275                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3276         }
3277
3278 done:
3279         ctdb_free_nodemap(nodemap);
3280         return ret;
3281 }
3282
3283 /*
3284   display who is the lvs master
3285  */
3286 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3287 {
3288         uint32_t *capabilities;
3289         struct ctdb_node_map *nodemap=NULL;
3290         int i, ret;
3291         int healthy_count = 0;
3292
3293         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
3294                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3295                 return -1;
3296         }
3297
3298         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3299         CTDB_NO_MEMORY(ctdb, capabilities);
3300
3301         ret = -1;
3302         
3303         /* collect capabilities for all connected nodes */
3304         for (i=0; i<nodemap->num; i++) {
3305                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3306                         continue;
3307                 }
3308                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3309                         continue;
3310                 }
3311         
3312                 if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
3313                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3314                         ret = -1;
3315                         goto done;
3316                 }
3317
3318                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3319                         continue;
3320                 }
3321
3322                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3323                         healthy_count++;
3324                 }
3325         }
3326
3327         /* find and show the lvsmaster */
3328         for (i=0; i<nodemap->num; i++) {
3329                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3330                         continue;
3331                 }
3332                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3333                         continue;
3334                 }
3335                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3336                         continue;
3337                 }
3338
3339                 if (healthy_count != 0) {
3340                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3341                                 continue;
3342                         }
3343                 }
3344
3345                 if (options.machinereadable){
3346                         printf("%d\n", i);
3347                 } else {
3348                         printf("Node %d is LVS master\n", i);
3349                 }
3350                 ret = 0;
3351                 goto done;
3352         }
3353
3354         printf("There is no LVS master\n");
3355 done:
3356         ctdb_free_nodemap(nodemap);
3357         return ret;
3358 }
3359
3360 /*
3361   disable monitoring on a  node
3362  */
3363 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3364 {
3365         
3366         int ret;
3367
3368         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3369         if (ret != 0) {
3370                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3371                 return ret;
3372         }
3373         printf("Monitoring mode:%s\n","DISABLED");
3374
3375         return 0;
3376 }
3377
3378 /*
3379   enable monitoring on a  node
3380  */
3381 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3382 {
3383         
3384         int ret;
3385
3386         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3387         if (ret != 0) {
3388                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3389                 return ret;
3390         }
3391         printf("Monitoring mode:%s\n","ACTIVE");
3392
3393         return 0;
3394 }
3395
3396 /*
3397   display remote list of keys/data for a db
3398  */
3399 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3400 {
3401         const char *db_name;
3402         struct ctdb_db_context *ctdb_db;
3403         int ret;
3404         bool persistent;
3405         struct ctdb_dump_db_context c;
3406
3407         if (argc < 1) {
3408                 usage();
3409         }
3410
3411         db_name = argv[0];
3412
3413
3414         if (db_exists(ctdb, db_name, &persistent)) {
3415                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3416                 return -1;
3417         }
3418
3419         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3420
3421         if (ctdb_db == NULL) {
3422                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3423                 return -1;
3424         }
3425
3426         if (options.printlmaster) {
3427                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3428                                           ctdb, &ctdb->vnn_map);
3429                 if (ret != 0) {
3430                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3431                                           options.pnn));
3432                         return ret;
3433                 }
3434         }
3435
3436         ZERO_STRUCT(c);
3437         c.f = stdout;
3438         c.printemptyrecords = (bool)options.printemptyrecords;
3439         c.printdatasize = (bool)options.printdatasize;
3440         c.printlmaster = (bool)options.printlmaster;
3441         c.printhash = (bool)options.printhash;
3442         c.printrecordflags = (bool)options.printrecordflags;
3443
3444         /* traverse and dump the cluster tdb */
3445         ret = ctdb_dump_db(ctdb_db, &c);
3446         if (ret == -1) {
3447                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3448                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3449                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3450                                   db_name));
3451                 return -1;
3452         }
3453         talloc_free(ctdb_db);
3454
3455         printf("Dumped %d records\n", ret);
3456         return 0;
3457 }
3458
3459 struct cattdb_data {
3460         struct ctdb_context *ctdb;
3461         uint32_t count;
3462 };
3463
3464 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3465 {
3466         struct cattdb_data *d = private_data;
3467         struct ctdb_dump_db_context c;
3468
3469         d->count++;
3470
3471         ZERO_STRUCT(c);
3472         c.f = stdout;
3473         c.printemptyrecords = (bool)options.printemptyrecords;
3474         c.printdatasize = (bool)options.printdatasize;
3475         c.printlmaster = false;
3476         c.printhash = (bool)options.printhash;
3477         c.printrecordflags = true;
3478
3479         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3480 }
3481
3482 /*
3483   cat the local tdb database using same format as catdb
3484  */
3485 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3486 {
3487         const char *db_name;
3488         struct ctdb_db_context *ctdb_db;
3489         struct cattdb_data d;
3490         bool persistent;
3491
3492         if (argc < 1) {
3493                 usage();
3494         }
3495
3496         db_name = argv[0];
3497
3498
3499         if (db_exists(ctdb, db_name, &persistent)) {
3500                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3501                 return -1;
3502         }
3503
3504         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3505
3506         if (ctdb_db == NULL) {
3507                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3508                 return -1;
3509         }
3510
3511         /* traverse the local tdb */
3512         d.count = 0;
3513         d.ctdb  = ctdb;
3514         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3515                 printf("Failed to cattdb data\n");
3516                 exit(10);
3517         }
3518         talloc_free(ctdb_db);
3519
3520         printf("Dumped %d records\n", d.count);
3521         return 0;
3522 }
3523
3524 /*
3525   display the content of a database key
3526  */
3527 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3528 {
3529         const char *db_name;
3530         struct ctdb_db_context *ctdb_db;
3531         struct ctdb_record_handle *h;
3532         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3533         TDB_DATA key, data;
3534         bool persistent;
3535
3536         if (argc < 2) {
3537                 usage();
3538         }
3539
3540         db_name = argv[0];
3541
3542         if (db_exists(ctdb, db_name, &persistent)) {
3543                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3544                 return -1;
3545         }
3546
3547         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3548
3549         if (ctdb_db == NULL) {
3550                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3551                 return -1;
3552         }
3553
3554         key.dptr  = discard_const(argv[1]);
3555         key.dsize = strlen((char *)key.dptr);
3556
3557         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3558         if (h == NULL) {
3559                 printf("Failed to fetch record '%s' on node %d\n", 
3560                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3561                 talloc_free(tmp_ctx);
3562                 exit(10);
3563         }
3564
3565         printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
3566
3567         talloc_free(ctdb_db);
3568         talloc_free(tmp_ctx);
3569         return 0;
3570 }
3571
3572 /*
3573   display the content of a database key
3574  */
3575 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3576 {
3577         const char *db_name;
3578         struct ctdb_db_context *ctdb_db;
3579         struct ctdb_record_handle *h;
3580         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3581         TDB_DATA key, data;
3582         bool persistent;
3583
3584         if (argc < 3) {
3585                 usage();
3586         }
3587
3588         db_name = argv[0];
3589
3590         if (db_exists(ctdb, db_name, &persistent)) {
3591                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3592                 return -1;
3593         }
3594
3595         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3596
3597         if (ctdb_db == NULL) {
3598                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3599                 return -1;
3600         }
3601
3602         key.dptr  = discard_const(argv[1]);
3603         key.dsize = strlen((char *)key.dptr);
3604
3605         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3606         if (h == NULL) {
3607                 printf("Failed to fetch record '%s' on node %d\n", 
3608                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3609                 talloc_free(tmp_ctx);
3610                 exit(10);
3611         }
3612
3613         data.dptr  = discard_const(argv[2]);
3614         data.dsize = strlen((char *)data.dptr);
3615
3616         if (ctdb_record_store(h, data) != 0) {
3617                 printf("Failed to store record\n");
3618         }
3619
3620         talloc_free(h);
3621         talloc_free(ctdb_db);
3622         talloc_free(tmp_ctx);
3623         return 0;
3624 }
3625
3626 /*
3627   fetch a record from a persistent database
3628  */
3629 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3630 {
3631         const char *db_name;
3632         struct ctdb_db_context *ctdb_db;
3633         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3634         struct ctdb_transaction_handle *h;
3635         TDB_DATA key, data;
3636         int fd, ret;
3637         bool persistent;
3638
3639         if (argc < 2) {
3640                 talloc_free(tmp_ctx);
3641                 usage();
3642         }
3643
3644         db_name = argv[0];
3645
3646
3647         if (db_exists(ctdb, db_name, &persistent)) {
3648                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3649                 talloc_free(tmp_ctx);
3650                 return -1;
3651         }
3652
3653         if (!persistent) {
3654                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3655                 talloc_free(tmp_ctx);
3656                 return -1;
3657         }
3658
3659         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3660
3661         if (ctdb_db == NULL) {
3662                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3663                 talloc_free(tmp_ctx);
3664                 return -1;
3665         }
3666
3667         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3668         if (h == NULL) {
3669                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3670                 talloc_free(tmp_ctx);
3671                 return -1;
3672         }
3673
3674         key.dptr  = discard_const(argv[1]);
3675         key.dsize = strlen(argv[1]);
3676         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3677         if (ret != 0) {
3678                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3679                 talloc_free(tmp_ctx);
3680                 return -1;
3681         }
3682
3683         if (data.dsize == 0 || data.dptr == NULL) {
3684                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3685                 talloc_free(tmp_ctx);
3686                 return -1;
3687         }
3688
3689         if (argc == 3) {
3690           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3691                 if (fd == -1) {
3692                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3693                         talloc_free(tmp_ctx);
3694                         return -1;
3695                 }
3696                 write(fd, data.dptr, data.dsize);
3697                 close(fd);
3698         } else {
3699                 write(1, data.dptr, data.dsize);
3700         }
3701
3702         /* abort the transaction */
3703         talloc_free(h);
3704
3705
3706         talloc_free(tmp_ctx);
3707         return 0;
3708 }
3709
3710 /*
3711   fetch a record from a tdb-file
3712  */
3713 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3714 {
3715         const char *tdb_file;
3716         TDB_CONTEXT *tdb;
3717         TDB_DATA key, data;
3718         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3719         int fd;
3720
3721         if (argc < 2) {
3722                 usage();
3723         }
3724
3725         tdb_file = argv[0];
3726
3727         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3728         if (tdb == NULL) {
3729                 printf("Failed to open TDB file %s\n", tdb_file);
3730                 return -1;
3731         }
3732
3733         if (!strncmp(argv[1], "0x", 2)) {
3734                 key = hextodata(tmp_ctx, argv[1] + 2);
3735                 if (key.dsize == 0) {
3736                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3737                         return -1;
3738                 }
3739         } else {
3740                 key.dptr  = discard_const(argv[1]);
3741                 key.dsize = strlen(argv[1]);
3742         }
3743
3744         data = tdb_fetch(tdb, key);
3745         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3746                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
3747                 tdb_close(tdb);
3748                 return -1;
3749         }
3750
3751         tdb_close(tdb);
3752
3753         if (argc == 3) {
3754           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3755                 if (fd == -1) {
3756                         printf("Failed to open output file %s\n", argv[2]);
3757                         return -1;
3758                 }
3759                 if (options.verbose){
3760                         write(fd, data.dptr, data.dsize);
3761                 } else {
3762                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3763                 }
3764                 close(fd);
3765         } else {
3766                 if (options.verbose){
3767                         write(1, data.dptr, data.dsize);
3768                 } else {
3769                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3770                 }
3771         }
3772
3773         talloc_free(tmp_ctx);
3774         return 0;
3775 }
3776
3777 /*
3778   store a record and header to a tdb-file
3779  */
3780 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
3781 {
3782         const char *tdb_file;
3783         TDB_CONTEXT *tdb;
3784         TDB_DATA key, data;
3785         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3786
3787         if (argc < 3) {
3788                 usage();
3789         }
3790
3791         tdb_file = argv[0];
3792
3793         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
3794         if (tdb == NULL) {
3795                 printf("Failed to open TDB file %s\n", tdb_file);
3796                 return -1;
3797         }
3798
3799         if (!strncmp(argv[1], "0x", 2)) {
3800                 key = hextodata(tmp_ctx, argv[1] + 2);
3801                 if (key.dsize == 0) {
3802                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3803                         return -1;
3804                 }
3805         } else {
3806                 key.dptr  = discard_const(argv[1]);
3807                 key.dsize = strlen(argv[1]);
3808         }
3809
3810         if (!strncmp(argv[2], "0x", 2)) {
3811                 data = hextodata(tmp_ctx, argv[2] + 2);
3812                 if (data.dsize == 0) {
3813                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
3814                         return -1;
3815                 }
3816         } else {
3817                 data.dptr  = discard_const(argv[2]);
3818                 data.dsize = strlen(argv[2]);
3819         }
3820
3821         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3822                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
3823                 return -1;
3824         }
3825         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
3826                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
3827                 tdb_close(tdb);
3828                 return -1;
3829         }
3830
3831         tdb_close(tdb);
3832
3833         talloc_free(tmp_ctx);
3834         return 0;
3835 }
3836
3837 /*
3838   write a record to a persistent database
3839  */
3840 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3841 {
3842         const char *db_name;
3843         struct ctdb_db_context *ctdb_db;
3844         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3845         struct ctdb_transaction_handle *h;
3846         struct stat st;
3847         TDB_DATA key, data;
3848         int fd, ret;
3849
3850         if (argc < 3) {
3851                 talloc_free(tmp_ctx);
3852                 usage();
3853         }
3854
3855         fd = open(argv[2], O_RDONLY);
3856         if (fd == -1) {
3857                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3858                 talloc_free(tmp_ctx);
3859                 return -1;
3860         }
3861         
3862         ret = fstat(fd, &st);
3863         if (ret == -1) {
3864                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3865                 close(fd);
3866                 talloc_free(tmp_ctx);
3867                 return -1;
3868         }
3869
3870         if (!S_ISREG(st.st_mode)) {
3871                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3872                 close(fd);
3873                 talloc_free(tmp_ctx);
3874                 return -1;
3875         }
3876
3877         data.dsize = st.st_size;
3878         if (data.dsize == 0) {
3879                 data.dptr  = NULL;
3880         } else {
3881                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3882                 if (data.dptr == NULL) {
3883                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3884                         close(fd);
3885                         talloc_free(tmp_ctx);
3886                         return -1;
3887                 }
3888                 ret = read(fd, data.dptr, data.dsize);
3889                 if (ret != data.dsize) {
3890                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3891                         close(fd);
3892                         talloc_free(tmp_ctx);
3893                         return -1;
3894                 }
3895         }
3896         close(fd);
3897
3898
3899         db_name = argv[0];
3900
3901         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
3902         if (ctdb_db == NULL) {
3903                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3904                 talloc_free(tmp_ctx);
3905                 return -1;
3906         }
3907
3908         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3909         if (h == NULL) {
3910                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3911                 talloc_free(tmp_ctx);
3912                 return -1;
3913         }
3914
3915         key.dptr  = discard_const(argv[1]);
3916         key.dsize = strlen(argv[1]);
3917         ret = ctdb_transaction_store(h, key, data);
3918         if (ret != 0) {
3919                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3920                 talloc_free(tmp_ctx);
3921                 return -1;
3922         }
3923
3924         ret = ctdb_transaction_commit(h);
3925         if (ret != 0) {
3926                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3927                 talloc_free(tmp_ctx);
3928                 return -1;
3929         }
3930
3931
3932         talloc_free(tmp_ctx);
3933         return 0;
3934 }
3935
3936 /*
3937  * delete a record from a persistent database
3938  */
3939 static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
3940 {
3941         const char *db_name;
3942         struct ctdb_db_context *ctdb_db;
3943         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3944         struct ctdb_transaction_handle *h;
3945         TDB_DATA key;
3946         int ret;
3947         bool persistent;
3948
3949         if (argc < 2) {
3950                 talloc_free(tmp_ctx);
3951                 usage();
3952         }
3953
3954         db_name = argv[0];
3955
3956         if (db_exists(ctdb, db_name, &persistent)) {
3957                 DEBUG(DEBUG_ERR, ("Database '%s' does not exist\n", db_name));
3958                 talloc_free(tmp_ctx);
3959                 return -1;
3960         }
3961
3962         if (!persistent) {
3963                 DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
3964                 talloc_free(tmp_ctx);
3965                 return -1;
3966         }
3967
3968         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3969         if (ctdb_db == NULL) {
3970                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
3971                 talloc_free(tmp_ctx);
3972                 return -1;
3973         }
3974
3975         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3976         if (h == NULL) {
3977                 DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
3978                 talloc_free(tmp_ctx);
3979                 return -1;
3980         }
3981
3982         key.dptr = discard_const(argv[1]);
3983         key.dsize = strlen(argv[1]);
3984         ret = ctdb_transaction_store(h, key, tdb_null);
3985         if (ret != 0) {
3986                 DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
3987                 talloc_free(tmp_ctx);
3988                 return -1;
3989         }
3990
3991         ret = ctdb_transaction_commit(h);
3992         if (ret != 0) {
3993                 DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
3994                 talloc_free(tmp_ctx);
3995                 return -1;
3996         }
3997
3998         talloc_free(tmp_ctx);
3999         return 0;
4000 }
4001
4002 /*
4003   check if a service is bound to a port or not
4004  */
4005 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
4006 {
4007         int s, ret;
4008         unsigned v;
4009         int port;
4010         struct sockaddr_in sin;
4011
4012         if (argc != 1) {
4013                 printf("Use: ctdb chktcport <port>\n");
4014                 return EINVAL;
4015         }
4016
4017         port = atoi(argv[0]);
4018
4019         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
4020         if (s == -1) {
4021                 printf("Failed to open local socket\n");
4022                 return errno;
4023         }
4024
4025         v = fcntl(s, F_GETFL, 0);
4026         fcntl(s, F_SETFL, v | O_NONBLOCK);
4027
4028         bzero(&sin, sizeof(sin));
4029         sin.sin_family = PF_INET;
4030         sin.sin_port   = htons(port);
4031         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
4032         close(s);
4033         if (ret == -1) {
4034                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
4035                 return errno;
4036         }
4037
4038         return 0;
4039 }
4040
4041
4042
4043 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4044                              TDB_DATA data, void *private_data)
4045 {
4046         DEBUG(DEBUG_ERR,("Log data received\n"));
4047         if (data.dsize > 0) {
4048                 printf("%s", data.dptr);
4049         }
4050
4051         exit(0);
4052 }
4053
4054 /*
4055   display a list of log messages from the in memory ringbuffer
4056  */
4057 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
4058 {
4059         int ret, i;
4060         bool main_daemon;
4061         struct ctdb_get_log_addr log_addr;
4062         TDB_DATA data;
4063         struct timeval tv;
4064
4065         /* Since this can fail, do it first */
4066         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4067         if (log_addr.pnn == -1) {
4068                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4069                 return -1;
4070         }
4071
4072         /* Process options */
4073         main_daemon = true;
4074         log_addr.level = DEBUG_NOTICE;
4075         for (i = 0; i < argc; i++) {
4076                 if (strcmp(argv[i], "recoverd") == 0) {
4077                         main_daemon = false;
4078                 } else {
4079                         if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
4080                                 log_addr.level = get_debug_by_desc(argv[i]);
4081                         } else {
4082                                 log_addr.level = strtol(argv[i], NULL, 0);
4083                         }
4084                 }
4085         }
4086
4087         /* Our message port is our PID */
4088         log_addr.srvid = getpid();
4089
4090         data.dptr = (unsigned char *)&log_addr;
4091         data.dsize = sizeof(log_addr);
4092
4093         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
4094
4095         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
4096         sleep(1);
4097
4098         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
4099
4100         if (main_daemon) {
4101                 int32_t res;
4102                 char *errmsg;
4103                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4104
4105                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4106                                    0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4107                 if (ret != 0 || res != 0) {
4108                         DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4109                         talloc_free(tmp_ctx);
4110                         return -1;
4111                 }
4112                 talloc_free(tmp_ctx);
4113         } else {
4114                 ret = ctdb_client_send_message(ctdb, options.pnn,
4115                                                CTDB_SRVID_GETLOG, data);
4116                 if (ret != 0) {
4117                         DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
4118                         return -1;
4119                 }
4120         }
4121
4122         tv = timeval_current();
4123         /* this loop will terminate when we have received the reply */
4124         while (timeval_elapsed(&tv) < (double)options.timelimit) {
4125                 event_loop_once(ctdb->ev);
4126         }
4127
4128         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4129
4130         return 0;
4131 }
4132
4133 /*
4134   clear the in memory log area
4135  */
4136 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4137 {
4138         int ret;
4139
4140         if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
4141                 /* "recoverd" not given - get logs from main daemon */
4142                 int32_t res;
4143                 char *errmsg;
4144                 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4145
4146                 ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4147                                    0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4148                 if (ret != 0 || res != 0) {
4149                         DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4150                         talloc_free(tmp_ctx);
4151                         return -1;
4152                 }
4153
4154                 talloc_free(tmp_ctx);
4155         } else {
4156                 TDB_DATA data; /* unused in recoverd... */
4157                 data.dsize = 0;
4158
4159                 ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
4160                 if (ret != 0) {
4161                         DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
4162                         return -1;
4163                 }
4164         }
4165
4166         return 0;
4167 }
4168
4169
4170 static uint32_t reloadips_finished;
4171
4172 static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4173                              TDB_DATA data, void *private_data)
4174 {
4175         reloadips_finished = 1;
4176 }
4177
4178 static int reloadips_all(struct ctdb_context *ctdb)
4179 {
4180         struct reloadips_all_reply rips;
4181         struct ctdb_node_map *nodemap=NULL;
4182         TDB_DATA data;
4183         uint32_t recmaster;
4184         int ret, i;
4185
4186         /* check that there are valid nodes available */
4187         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
4188                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4189                 return 1;
4190         }
4191         for (i=0; i<nodemap->num;i++) {
4192                 if (nodemap->nodes[i].flags != 0) {
4193                         DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
4194                         return 1;
4195                 }
4196         }
4197
4198
4199         rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4200         if (rips.pnn == -1) {
4201                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4202                 return 1;
4203         }
4204         rips.srvid = getpid();
4205
4206
4207         /* register a message port for receiveing the reply so that we
4208            can receive the reply
4209         */
4210         ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
4211
4212         if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
4213                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
4214                 return -1;
4215         }
4216
4217
4218         data.dptr = (uint8_t *)&rips;
4219         data.dsize = sizeof(rips);
4220
4221         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
4222         if (ret != 0) {
4223                 DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
4224                 return 1;
4225         }
4226
4227         reloadips_finished = 0;
4228         while (reloadips_finished == 0) {
4229                 event_loop_once(ctdb->ev);
4230         }
4231
4232         return 0;
4233 }
4234
4235 /*
4236   reload public ips on a specific node
4237  */
4238 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4239 {
4240         int ret;
4241         int32_t res;
4242         char *errmsg;
4243         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4244
4245         if (options.pnn == CTDB_BROADCAST_ALL) {
4246                 return reloadips_all(ctdb);
4247         }
4248
4249         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4250                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4251         if (ret != 0 || res != 0) {
4252                 DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
4253                 talloc_free(tmp_ctx);
4254                 return -1;
4255         }
4256
4257         talloc_free(tmp_ctx);
4258         return 0;
4259 }
4260
4261 /*
4262   display a list of the databases on a remote ctdb
4263  */
4264 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4265 {
4266         int i, ret;
4267         struct ctdb_dbid_map *dbmap=NULL;
4268
4269         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4270         if (ret != 0) {
4271                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4272                 return ret;
4273         }
4274
4275         if(options.machinereadable){
4276                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4277                 for(i=0;i<dbmap->num;i++){
4278                         const char *path;
4279                         const char *name;
4280                         const char *health;
4281                         bool persistent;
4282                         bool readonly;
4283                         bool sticky;
4284
4285                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4286                                             dbmap->dbs[i].dbid, ctdb, &path);
4287                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4288                                             dbmap->dbs[i].dbid, ctdb, &name);
4289                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4290                                               dbmap->dbs[i].dbid, ctdb, &health);
4291                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4292                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4293                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4294                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4295                                dbmap->dbs[i].dbid, name, path,
4296                                !!(persistent), !!(sticky),
4297                                !!(health), !!(readonly));
4298                 }
4299                 return 0;
4300         }
4301
4302         printf("Number of databases:%d\n", dbmap->num);
4303         for(i=0;i<dbmap->num;i++){
4304                 const char *path;
4305                 const char *name;
4306                 const char *health;
4307                 bool persistent;
4308                 bool readonly;
4309                 bool sticky;
4310
4311                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4312                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4313                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4314                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4315                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4316                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4317                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4318                        dbmap->dbs[i].dbid, name, path,
4319                        persistent?" PERSISTENT":"",
4320                        sticky?" STICKY":"",
4321                        readonly?" READONLY":"",
4322                        health?" UNHEALTHY":"");
4323         }
4324
4325         return 0;
4326 }
4327
4328 /*
4329   display the status of a database on a remote ctdb
4330  */
4331 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4332 {
4333         int i, ret;
4334         struct ctdb_dbid_map *dbmap=NULL;
4335         const char *db_name;
4336
4337         if (argc < 1) {
4338                 usage();
4339         }
4340
4341         db_name = argv[0];
4342
4343         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4344         if (ret != 0) {
4345                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4346                 return ret;
4347         }
4348
4349         for(i=0;i<dbmap->num;i++){
4350                 const char *path;
4351                 const char *name;
4352                 const char *health;
4353                 bool persistent;
4354                 bool readonly;
4355                 bool sticky;
4356
4357                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4358                 if (strcmp(name, db_name) != 0) {
4359                         continue;
4360                 }
4361
4362                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4363                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4364                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4365                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4366                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4367                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4368                        dbmap->dbs[i].dbid, name, path,
4369                        persistent?"yes":"no",
4370                        sticky?"yes":"no",
4371                        readonly?"yes":"no",
4372                        health?health:"OK");
4373                 return 0;
4374         }
4375
4376         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
4377         return 0;
4378 }
4379
4380 /*
4381   check if the local node is recmaster or not
4382   it will return 1 if this node is the recmaster and 0 if it is not
4383   or if the local ctdb daemon could not be contacted
4384  */
4385 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4386 {
4387         uint32_t mypnn, recmaster;
4388
4389         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
4390         if (mypnn == -1) {
4391                 printf("Failed to get pnn of node\n");
4392                 return 1;
4393         }
4394
4395         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
4396                 printf("Failed to get the recmaster\n");
4397                 return 1;
4398         }
4399
4400         if (recmaster != mypnn) {
4401                 printf("this node is not the recmaster\n");
4402                 return 1;
4403         }
4404
4405         printf("this node is the recmaster\n");
4406         return 0;
4407 }
4408
4409 /*
4410   ping a node
4411  */
4412 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4413 {
4414         int ret;
4415         struct timeval tv = timeval_current();
4416         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4417         if (ret == -1) {
4418                 printf("Unable to get ping response from node %u\n", options.pnn);
4419                 return -1;
4420         } else {
4421                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4422                        options.pnn, timeval_elapsed(&tv), ret);
4423         }
4424         return 0;
4425 }
4426
4427
4428 /*
4429   get a tunable
4430  */
4431 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4432 {
4433         const char *name;
4434         uint32_t value;
4435         int ret;
4436
4437         if (argc < 1) {
4438                 usage();
4439         }
4440
4441         name = argv[0];
4442         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4443         if (ret == -1) {
4444                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4445                 return -1;
4446         }
4447
4448         printf("%-23s = %u\n", name, value);
4449         return 0;
4450 }
4451
4452 /*
4453   set a tunable
4454  */
4455 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4456 {
4457         const char *name;
4458         uint32_t value;
4459         int ret;
4460
4461         if (argc < 2) {
4462                 usage();
4463         }
4464
4465         name = argv[0];
4466         value = strtoul(argv[1], NULL, 0);
4467
4468         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4469         if (ret == -1) {
4470                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4471                 return -1;
4472         }
4473         return 0;
4474 }
4475
4476 /*
4477   list all tunables
4478  */
4479 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4480 {
4481         uint32_t count;
4482         const char **list;
4483         int ret, i;
4484
4485         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4486         if (ret == -1) {
4487                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4488                 return -1;
4489         }
4490
4491         for (i=0;i<count;i++) {
4492                 control_getvar(ctdb, 1, &list[i]);
4493         }
4494
4495         talloc_free(list);
4496         
4497         return 0;
4498 }
4499
4500 /*
4501   display debug level on a node
4502  */
4503 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4504 {
4505         int ret;
4506         int32_t level;
4507
4508         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4509         if (ret != 0) {
4510                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4511                 return ret;
4512         } else {
4513                 if (options.machinereadable){
4514                         printf(":Name:Level:\n");
4515                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4516                 } else {
4517                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4518                 }
4519         }
4520         return 0;
4521 }
4522
4523 /*
4524   display reclock file of a node
4525  */
4526 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4527 {
4528         int ret;
4529         const char *reclock;
4530
4531         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4532         if (ret != 0) {
4533                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4534                 return ret;
4535         } else {
4536                 if (options.machinereadable){
4537                         if (reclock != NULL) {
4538                                 printf("%s", reclock);
4539                         }
4540                 } else {
4541                         if (reclock == NULL) {
4542                                 printf("No reclock file used.\n");
4543                         } else {
4544                                 printf("Reclock file:%s\n", reclock);
4545                         }
4546                 }
4547         }
4548         return 0;
4549 }
4550
4551 /*
4552   set the reclock file of a node
4553  */
4554 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4555 {
4556         int ret;
4557         const char *reclock;
4558
4559         if (argc == 0) {
4560                 reclock = NULL;
4561         } else if (argc == 1) {
4562                 reclock = argv[0];
4563         } else {
4564                 usage();
4565         }
4566
4567         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4568         if (ret != 0) {
4569                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4570                 return ret;
4571         }
4572         return 0;
4573 }
4574
4575 /*
4576   set the natgw state on/off
4577  */
4578 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4579 {
4580         int ret;
4581         uint32_t natgwstate;
4582
4583         if (argc == 0) {
4584                 usage();
4585         }
4586
4587         if (!strcmp(argv[0], "on")) {
4588                 natgwstate = 1;
4589         } else if (!strcmp(argv[0], "off")) {
4590                 natgwstate = 0;
4591         } else {
4592                 usage();
4593         }
4594
4595         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4596         if (ret != 0) {
4597                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4598                 return ret;
4599         }
4600
4601         return 0;
4602 }
4603
4604 /*
4605   set the lmaster role on/off
4606  */
4607 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4608 {
4609         int ret;
4610         uint32_t lmasterrole;
4611
4612         if (argc == 0) {
4613                 usage();
4614         }
4615
4616         if (!strcmp(argv[0], "on")) {
4617                 lmasterrole = 1;
4618         } else if (!strcmp(argv[0], "off")) {
4619                 lmasterrole = 0;
4620         } else {
4621                 usage();
4622         }
4623
4624         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4625         if (ret != 0) {
4626                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4627                 return ret;
4628         }
4629
4630         return 0;
4631 }
4632
4633 /*
4634   set the recmaster role on/off
4635  */
4636 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4637 {
4638         int ret;
4639         uint32_t recmasterrole;
4640
4641         if (argc == 0) {
4642                 usage();
4643         }
4644
4645         if (!strcmp(argv[0], "on")) {
4646                 recmasterrole = 1;
4647         } else if (!strcmp(argv[0], "off")) {
4648                 recmasterrole = 0;
4649         } else {
4650                 usage();
4651         }
4652
4653         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4654         if (ret != 0) {
4655                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4656                 return ret;
4657         }
4658
4659         return 0;
4660 }
4661
4662 /*
4663   set debug level on a node or all nodes
4664  */
4665 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4666 {
4667         int i, ret;
4668         int32_t level;
4669
4670         if (argc == 0) {
4671                 printf("You must specify the debug level. Valid levels are:\n");
4672                 for (i=0; debug_levels[i].description != NULL; i++) {
4673                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4674                 }
4675
4676                 return 0;
4677         }
4678
4679         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4680                 level = get_debug_by_desc(argv[0]);
4681         } else {
4682                 level = strtol(argv[0], NULL, 0);
4683         }
4684
4685         for (i=0; debug_levels[i].description != NULL; i++) {
4686                 if (level == debug_levels[i].level) {
4687                         break;
4688                 }
4689         }
4690         if (debug_levels[i].description == NULL) {
4691                 printf("Invalid debug level, must be one of\n");
4692                 for (i=0; debug_levels[i].description != NULL; i++) {
4693                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4694                 }
4695                 return -1;
4696         }
4697
4698         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4699         if (ret != 0) {
4700                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4701         }
4702         return 0;
4703 }
4704
4705
4706 /*
4707   thaw a node
4708  */
4709 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4710 {
4711         int ret;
4712         uint32_t priority;
4713         
4714         if (argc == 1) {
4715                 priority = strtol(argv[0], NULL, 0);
4716         } else {
4717                 priority = 0;
4718         }
4719         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4720
4721         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4722         if (ret != 0) {
4723                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4724         }               
4725         return 0;
4726 }
4727
4728
4729 /*
4730   attach to a database
4731  */
4732 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4733 {
4734         const char *db_name;
4735         struct ctdb_db_context *ctdb_db;
4736         bool persistent = false;
4737
4738         if (argc < 1) {
4739                 usage();
4740         }
4741         db_name = argv[0];
4742         if (argc > 2) {
4743                 usage();
4744         }
4745         if (argc == 2) {
4746                 if (strcmp(argv[1], "persistent") != 0) {
4747                         usage();
4748                 }
4749                 persistent = true;
4750         }
4751
4752         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4753         if (ctdb_db == NULL) {
4754                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4755                 return -1;
4756         }
4757
4758         return 0;
4759 }
4760
4761 /*
4762   set db priority
4763  */
4764 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4765 {
4766         struct ctdb_db_priority db_prio;
4767         int ret;
4768
4769         if (argc < 2) {
4770                 usage();
4771         }
4772
4773         db_prio.db_id    = strtoul(argv[0], NULL, 0);
4774         db_prio.priority = strtoul(argv[1], NULL, 0);
4775
4776         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
4777         if (ret != 0) {
4778                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
4779                 return -1;
4780         }
4781
4782         return 0;
4783 }
4784
4785 /*
4786   get db priority
4787  */
4788 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4789 {
4790         uint32_t db_id, priority;
4791         int ret;
4792
4793         if (argc < 1) {
4794                 usage();
4795         }
4796
4797         db_id = strtoul(argv[0], NULL, 0);
4798
4799         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
4800         if (ret != 0) {
4801                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
4802                 return -1;
4803         }
4804
4805         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
4806
4807         return 0;
4808 }
4809
4810 /*
4811   set the sticky records capability for a database
4812  */
4813 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
4814 {
4815         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4816         uint32_t db_id;
4817         struct ctdb_dbid_map *dbmap=NULL;
4818         int i, ret;
4819
4820         if (argc < 1) {
4821                 usage();
4822         }
4823
4824         if (!strncmp(argv[0], "0x", 2)) {
4825                 db_id = strtoul(argv[0] + 2, NULL, 0);
4826         } else {
4827                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4828                 if (ret != 0) {
4829                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4830                         talloc_free(tmp_ctx);
4831                         return ret;
4832                 }
4833                 for(i=0;i<dbmap->num;i++){
4834                         const char *name;
4835
4836                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4837                         if(!strcmp(argv[0], name)){
4838                                 talloc_free(discard_const(name));
4839                                 break;
4840                         }
4841                         talloc_free(discard_const(name));
4842                 }
4843                 if (i == dbmap->num) {
4844                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4845                         talloc_free(tmp_ctx);
4846                         return -1;
4847                 }
4848                 db_id = dbmap->dbs[i].dbid;
4849         }
4850
4851         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
4852         if (ret != 0) {
4853                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
4854                 talloc_free(tmp_ctx);
4855                 return -1;
4856         }
4857
4858         talloc_free(tmp_ctx);
4859         return 0;
4860 }
4861
4862 /*
4863   set the readonly capability for a database
4864  */
4865 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
4866 {
4867         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4868         uint32_t db_id;
4869         struct ctdb_dbid_map *dbmap=NULL;
4870         int i, ret;
4871
4872         if (argc < 1) {
4873                 usage();
4874         }
4875
4876         if (!strncmp(argv[0], "0x", 2)) {
4877                 db_id = strtoul(argv[0] + 2, NULL, 0);
4878         } else {
4879                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4880                 if (ret != 0) {
4881                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4882                         talloc_free(tmp_ctx);
4883                         return ret;
4884                 }
4885                 for(i=0;i<dbmap->num;i++){
4886                         const char *name;
4887
4888                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4889                         if(!strcmp(argv[0], name)){
4890                                 talloc_free(discard_const(name));
4891                                 break;
4892                         }
4893                         talloc_free(discard_const(name));
4894                 }
4895                 if (i == dbmap->num) {
4896                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4897                         talloc_free(tmp_ctx);
4898                         return -1;
4899                 }
4900                 db_id = dbmap->dbs[i].dbid;
4901         }
4902
4903         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
4904         if (ret != 0) {
4905                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
4906                 talloc_free(tmp_ctx);
4907                 return -1;
4908         }
4909
4910         talloc_free(tmp_ctx);
4911         return 0;
4912 }
4913
4914 /*
4915   get db seqnum
4916  */
4917 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4918 {
4919         bool ret;
4920         uint32_t db_id;
4921         uint64_t seqnum;
4922
4923         if (argc < 1) {
4924                 usage();
4925         }
4926
4927         db_id = strtoul(argv[0], NULL, 0);
4928
4929         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
4930         if (!ret) {
4931                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4932                 return -1;
4933         }
4934
4935         printf("Sequence number:%lld\n", (long long)seqnum);
4936
4937         return 0;
4938 }
4939
4940 /*
4941   run an eventscript on a node
4942  */
4943 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
4944 {
4945         TDB_DATA data;
4946         int ret;
4947         int32_t res;
4948         char *errmsg;
4949         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4950
4951         if (argc != 1) {
4952                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4953                 return -1;
4954         }
4955
4956         data.dptr = (unsigned char *)discard_const(argv[0]);
4957         data.dsize = strlen((char *)data.dptr) + 1;
4958
4959         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
4960
4961         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
4962                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4963         if (ret != 0 || res != 0) {
4964                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
4965                 talloc_free(tmp_ctx);
4966                 return -1;
4967         }
4968         talloc_free(tmp_ctx);
4969         return 0;
4970 }
4971
4972 #define DB_VERSION 1
4973 #define MAX_DB_NAME 64
4974 struct db_file_header {
4975         unsigned long version;
4976         time_t timestamp;
4977         unsigned long persistent;
4978         unsigned long size;
4979         const char name[MAX_DB_NAME];
4980 };
4981
4982 struct backup_data {
4983         struct ctdb_marshall_buffer *records;
4984         uint32_t len;
4985         uint32_t total;
4986         bool traverse_error;
4987 };
4988
4989 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
4990 {
4991         struct backup_data *bd = talloc_get_type(private, struct backup_data);
4992         struct ctdb_rec_data *rec;
4993
4994         /* add the record */
4995         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
4996         if (rec == NULL) {
4997                 bd->traverse_error = true;
4998                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
4999                 return -1;
5000         }
5001         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
5002         if (bd->records == NULL) {
5003                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
5004                 bd->traverse_error = true;
5005                 return -1;
5006         }
5007         bd->records->count++;
5008         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
5009         bd->len += rec->length;
5010         talloc_free(rec);
5011
5012         bd->total++;
5013         return 0;
5014 }
5015
5016 /*
5017  * backup a database to a file 
5018  */
5019 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
5020 {
5021         int i, ret;
5022         struct ctdb_dbid_map *dbmap=NULL;
5023         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5024         struct db_file_header dbhdr;
5025         struct ctdb_db_context *ctdb_db;
5026         struct backup_data *bd;
5027         int fh = -1;
5028         int status = -1;
5029         const char *reason = NULL;
5030
5031         if (argc != 2) {
5032                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5033                 return -1;
5034         }
5035
5036         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
5037         if (ret != 0) {
5038                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
5039                 return ret;
5040         }
5041
5042         for(i=0;i<dbmap->num;i++){
5043                 const char *name;
5044
5045                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
5046                 if(!strcmp(argv[0], name)){
5047                         talloc_free(discard_const(name));
5048                         break;
5049                 }
5050                 talloc_free(discard_const(name));
5051         }
5052         if (i == dbmap->num) {
5053                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
5054                 talloc_free(tmp_ctx);
5055                 return -1;
5056         }
5057
5058         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
5059                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
5060         if (ret != 0) {
5061                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
5062                                  argv[0]));
5063                 talloc_free(tmp_ctx);
5064                 return -1;
5065         }
5066         if (reason) {
5067                 uint32_t allow_unhealthy = 0;
5068
5069                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
5070                                       "AllowUnhealthyDBRead",
5071                                       &allow_unhealthy);
5072
5073                 if (allow_unhealthy != 1) {
5074                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
5075                                          argv[0], reason));
5076
5077                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
5078                                          allow_unhealthy));
5079                         talloc_free(tmp_ctx);
5080                         return -1;
5081                 }
5082
5083                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
5084                                      argv[0], argv[0]));
5085                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
5086                                      "tunnable AllowUnhealthyDBRead = %u\n",
5087                                      allow_unhealthy));
5088         }
5089
5090         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5091         if (ctdb_db == NULL) {
5092                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
5093                 talloc_free(tmp_ctx);
5094                 return -1;
5095         }
5096
5097
5098         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
5099         if (ret == -1) {
5100                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
5101                 talloc_free(tmp_ctx);
5102                 return -1;
5103         }
5104
5105
5106         bd = talloc_zero(tmp_ctx, struct backup_data);
5107         if (bd == NULL) {
5108                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
5109                 talloc_free(tmp_ctx);
5110                 return -1;
5111         }
5112
5113         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
5114         if (bd->records == NULL) {
5115                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
5116                 talloc_free(tmp_ctx);
5117                 return -1;
5118         }
5119
5120         bd->len = offsetof(struct ctdb_marshall_buffer, data);
5121         bd->records->db_id = ctdb_db->db_id;
5122         /* traverse the database collecting all records */
5123         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
5124             bd->traverse_error) {
5125                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5126                 talloc_free(tmp_ctx);
5127                 return -1;              
5128         }
5129
5130         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5131
5132
5133         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5134         if (fh == -1) {
5135                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5136                 talloc_free(tmp_ctx);
5137                 return -1;
5138         }
5139
5140         dbhdr.version = DB_VERSION;
5141         dbhdr.timestamp = time(NULL);
5142         dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
5143         dbhdr.size = bd->len;
5144         if (strlen(argv[0]) >= MAX_DB_NAME) {
5145                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5146                 goto done;
5147         }
5148         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
5149         ret = write(fh, &dbhdr, sizeof(dbhdr));
5150         if (ret == -1) {
5151                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5152                 goto done;
5153         }
5154         ret = write(fh, bd->records, bd->len);
5155         if (ret == -1) {
5156                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5157                 goto done;
5158         }
5159
5160         status = 0;
5161 done:
5162         if (fh != -1) {
5163                 ret = close(fh);
5164                 if (ret == -1) {
5165                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5166                 }
5167         }
5168
5169         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5170
5171         talloc_free(tmp_ctx);
5172         return status;
5173 }
5174
5175 /*
5176  * restore a database from a file 
5177  */
5178 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5179 {
5180         int ret;
5181         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5182         TDB_DATA outdata;
5183         TDB_DATA data;
5184         struct db_file_header dbhdr;
5185         struct ctdb_db_context *ctdb_db;
5186         struct ctdb_node_map *nodemap=NULL;
5187         struct ctdb_vnn_map *vnnmap=NULL;
5188         int i, fh;
5189         struct ctdb_control_wipe_database w;
5190         uint32_t *nodes;
5191         uint32_t generation;
5192         struct tm *tm;
5193         char tbuf[100];
5194         char *dbname;
5195
5196         if (argc < 1 || argc > 2) {
5197                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5198                 return -1;
5199         }
5200
5201         fh = open(argv[0], O_RDONLY);
5202         if (fh == -1) {
5203                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5204                 talloc_free(tmp_ctx);
5205                 return -1;
5206         }
5207
5208         read(fh, &dbhdr, sizeof(dbhdr));
5209         if (dbhdr.version != DB_VERSION) {
5210                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5211                 talloc_free(tmp_ctx);
5212                 return -1;
5213         }
5214
5215         dbname = discard_const(dbhdr.name);
5216         if (argc == 2) {
5217                 dbname = discard_const(argv[1]);
5218         }
5219
5220         outdata.dsize = dbhdr.size;
5221         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5222         if (outdata.dptr == NULL) {
5223                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5224                 close(fh);
5225                 talloc_free(tmp_ctx);
5226                 return -1;
5227         }               
5228         read(fh, outdata.dptr, outdata.dsize);
5229         close(fh);
5230
5231         tm = localtime(&dbhdr.timestamp);
5232         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5233         printf("Restoring database '%s' from backup @ %s\n",
5234                 dbname, tbuf);
5235
5236
5237         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5238         if (ctdb_db == NULL) {
5239                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5240                 talloc_free(tmp_ctx);
5241                 return -1;
5242         }
5243
5244         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5245         if (ret != 0) {
5246                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5247                 talloc_free(tmp_ctx);
5248                 return ret;
5249         }
5250
5251
5252         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5253         if (ret != 0) {
5254                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5255                 talloc_free(tmp_ctx);
5256                 return ret;
5257         }
5258
5259         /* freeze all nodes */
5260         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5261         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5262                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5263                                         nodes, i,
5264                                         TIMELIMIT(),
5265                                         false, tdb_null,
5266                                         NULL, NULL,
5267                                         NULL) != 0) {
5268                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5269                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5270                         talloc_free(tmp_ctx);
5271                         return -1;
5272                 }
5273         }
5274
5275         generation = vnnmap->generation;
5276         data.dptr = (void *)&generation;
5277         data.dsize = sizeof(generation);
5278
5279         /* start a cluster wide transaction */
5280         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5281         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5282                                         nodes, 0,
5283                                         TIMELIMIT(), false, data,
5284                                         NULL, NULL,
5285                                         NULL) != 0) {
5286                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5287                 return -1;
5288         }
5289
5290
5291         w.db_id = ctdb_db->db_id;
5292         w.transaction_id = generation;
5293
5294         data.dptr = (void *)&w;
5295         data.dsize = sizeof(w);
5296
5297         /* wipe all the remote databases. */
5298         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5299         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5300                                         nodes, 0,
5301                                         TIMELIMIT(), false, data,
5302                                         NULL, NULL,
5303                                         NULL) != 0) {
5304                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5305                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5306                 talloc_free(tmp_ctx);
5307                 return -1;
5308         }
5309         
5310         /* push the database */
5311         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5312         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5313                                         nodes, 0,
5314                                         TIMELIMIT(), false, outdata,
5315                                         NULL, NULL,
5316                                         NULL) != 0) {
5317                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5318                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5319                 talloc_free(tmp_ctx);
5320                 return -1;
5321         }
5322
5323         data.dptr = (void *)&ctdb_db->db_id;
5324         data.dsize = sizeof(ctdb_db->db_id);
5325
5326         /* mark the database as healthy */
5327         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5329                                         nodes, 0,
5330                                         TIMELIMIT(), false, data,
5331                                         NULL, NULL,
5332                                         NULL) != 0) {
5333                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5334                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5335                 talloc_free(tmp_ctx);
5336                 return -1;
5337         }
5338
5339         data.dptr = (void *)&generation;
5340         data.dsize = sizeof(generation);
5341
5342         /* commit all the changes */
5343         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5344                                         nodes, 0,
5345                                         TIMELIMIT(), false, data,
5346                                         NULL, NULL,
5347                                         NULL) != 0) {
5348                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5349                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5350                 talloc_free(tmp_ctx);
5351                 return -1;
5352         }
5353
5354
5355         /* thaw all nodes */
5356         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5357         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5358                                         nodes, 0,
5359                                         TIMELIMIT(),
5360                                         false, tdb_null,
5361                                         NULL, NULL,
5362                                         NULL) != 0) {
5363                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5364                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5365                 talloc_free(tmp_ctx);
5366                 return -1;
5367         }
5368
5369
5370         talloc_free(tmp_ctx);
5371         return 0;
5372 }
5373
5374 /*
5375  * dump a database backup from a file
5376  */
5377 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5378 {
5379         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5380         TDB_DATA outdata;
5381         struct db_file_header dbhdr;
5382         int i, fh;
5383         struct tm *tm;
5384         char tbuf[100];
5385         struct ctdb_rec_data *rec = NULL;
5386         struct ctdb_marshall_buffer *m;
5387         struct ctdb_dump_db_context c;
5388
5389         if (argc != 1) {
5390                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5391                 return -1;
5392         }
5393
5394         fh = open(argv[0], O_RDONLY);
5395         if (fh == -1) {
5396                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5397                 talloc_free(tmp_ctx);
5398                 return -1;
5399         }
5400
5401         read(fh, &dbhdr, sizeof(dbhdr));
5402         if (dbhdr.version != DB_VERSION) {
5403                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5404                 talloc_free(tmp_ctx);
5405                 return -1;
5406         }
5407
5408         outdata.dsize = dbhdr.size;
5409         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5410         if (outdata.dptr == NULL) {
5411                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5412                 close(fh);
5413                 talloc_free(tmp_ctx);
5414                 return -1;
5415         }
5416         read(fh, outdata.dptr, outdata.dsize);
5417         close(fh);
5418         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5419
5420         tm = localtime(&dbhdr.timestamp);
5421         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5422         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5423                 dbhdr.name, m->db_id, tbuf);
5424
5425         ZERO_STRUCT(c);
5426         c.f = stdout;
5427         c.printemptyrecords = (bool)options.printemptyrecords;
5428         c.printdatasize = (bool)options.printdatasize;
5429         c.printlmaster = false;
5430         c.printhash = (bool)options.printhash;
5431         c.printrecordflags = (bool)options.printrecordflags;
5432
5433         for (i=0; i < m->count; i++) {
5434                 uint32_t reqid = 0;
5435                 TDB_DATA key, data;
5436
5437                 /* we do not want the header splitted, so we pass NULL*/
5438                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5439                                               NULL, &key, &data);
5440
5441                 ctdb_dumpdb_record(ctdb, key, data, &c);
5442         }
5443
5444         printf("Dumped %d records\n", i);
5445         talloc_free(tmp_ctx);
5446         return 0;
5447 }
5448
5449 /*
5450  * wipe a database from a file
5451  */
5452 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5453                           const char **argv)
5454 {
5455         int ret;
5456         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5457         TDB_DATA data;
5458         struct ctdb_db_context *ctdb_db;
5459         struct ctdb_node_map *nodemap = NULL;
5460         struct ctdb_vnn_map *vnnmap = NULL;
5461         int i;
5462         struct ctdb_control_wipe_database w;
5463         uint32_t *nodes;
5464         uint32_t generation;
5465         struct ctdb_dbid_map *dbmap = NULL;
5466
5467         if (argc != 1) {
5468                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5469                 return -1;
5470         }
5471
5472         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5473                                  &dbmap);
5474         if (ret != 0) {
5475                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
5476                                   options.pnn));
5477                 return ret;
5478         }
5479
5480         for(i=0;i<dbmap->num;i++){
5481                 const char *name;
5482
5483                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
5484                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
5485                 if(!strcmp(argv[0], name)){
5486                         talloc_free(discard_const(name));
5487                         break;
5488                 }
5489                 talloc_free(discard_const(name));
5490         }
5491         if (i == dbmap->num) {
5492                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
5493                                   argv[0]));
5494                 talloc_free(tmp_ctx);
5495                 return -1;
5496         }
5497
5498         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5499         if (ctdb_db == NULL) {
5500                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5501                                   argv[0]));
5502                 talloc_free(tmp_ctx);
5503                 return -1;
5504         }
5505
5506         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5507                                    &nodemap);
5508         if (ret != 0) {
5509                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5510                                   options.pnn));
5511                 talloc_free(tmp_ctx);
5512                 return ret;
5513         }
5514
5515         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5516                                   &vnnmap);
5517         if (ret != 0) {
5518                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5519                                   options.pnn));
5520                 talloc_free(tmp_ctx);
5521                 return ret;
5522         }
5523
5524         /* freeze all nodes */
5525         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5526         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5527                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5528                                                 nodes, i,
5529                                                 TIMELIMIT(),
5530                                                 false, tdb_null,
5531                                                 NULL, NULL,
5532                                                 NULL);
5533                 if (ret != 0) {
5534                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5535                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5536                                              CTDB_RECOVERY_ACTIVE);
5537                         talloc_free(tmp_ctx);
5538                         return -1;
5539                 }
5540         }
5541
5542         generation = vnnmap->generation;
5543         data.dptr = (void *)&generation;
5544         data.dsize = sizeof(generation);
5545
5546         /* start a cluster wide transaction */
5547         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5548         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5549                                         nodes, 0,
5550                                         TIMELIMIT(), false, data,
5551                                         NULL, NULL,
5552                                         NULL);
5553         if (ret!= 0) {
5554                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5555                                   "transactions.\n"));
5556                 return -1;
5557         }
5558
5559         w.db_id = ctdb_db->db_id;
5560         w.transaction_id = generation;
5561
5562         data.dptr = (void *)&w;
5563         data.dsize = sizeof(w);
5564
5565         /* wipe all the remote databases. */
5566         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5567         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5568                                         nodes, 0,
5569                                         TIMELIMIT(), false, data,
5570                                         NULL, NULL,
5571                                         NULL) != 0) {
5572                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5573                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5574                 talloc_free(tmp_ctx);
5575                 return -1;
5576         }
5577
5578         data.dptr = (void *)&ctdb_db->db_id;
5579         data.dsize = sizeof(ctdb_db->db_id);
5580
5581         /* mark the database as healthy */
5582         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5583         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5584                                         nodes, 0,
5585                                         TIMELIMIT(), false, data,
5586                                         NULL, NULL,
5587                                         NULL) != 0) {
5588                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5589                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5590                 talloc_free(tmp_ctx);
5591                 return -1;
5592         }
5593
5594         data.dptr = (void *)&generation;
5595         data.dsize = sizeof(generation);
5596
5597         /* commit all the changes */
5598         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5599                                         nodes, 0,
5600                                         TIMELIMIT(), false, data,
5601                                         NULL, NULL,
5602                                         NULL) != 0) {
5603                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5604                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5605                 talloc_free(tmp_ctx);
5606                 return -1;
5607         }
5608
5609         /* thaw all nodes */
5610         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5611         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5612                                         nodes, 0,
5613                                         TIMELIMIT(),
5614                                         false, tdb_null,
5615                                         NULL, NULL,
5616                                         NULL) != 0) {
5617                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5618                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5619                 talloc_free(tmp_ctx);
5620                 return -1;
5621         }
5622
5623         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5624
5625         talloc_free(tmp_ctx);
5626         return 0;
5627 }
5628
5629 /*
5630   dump memory usage
5631  */
5632 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5633 {
5634         TDB_DATA data;
5635         int ret;
5636         int32_t res;
5637         char *errmsg;
5638         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5639         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5640                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5641         if (ret != 0 || res != 0) {
5642                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5643                 talloc_free(tmp_ctx);
5644                 return -1;
5645         }
5646         write(1, data.dptr, data.dsize);
5647         talloc_free(tmp_ctx);
5648         return 0;
5649 }
5650
5651 /*
5652   handler for memory dumps
5653 */
5654 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5655                              TDB_DATA data, void *private_data)
5656 {
5657         write(1, data.dptr, data.dsize);
5658         exit(0);
5659 }
5660
5661 /*
5662   dump memory usage on the recovery daemon
5663  */
5664 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5665 {
5666         int ret;
5667         TDB_DATA data;
5668         struct rd_memdump_reply rd;
5669
5670         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5671         if (rd.pnn == -1) {
5672                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
5673                 return -1;
5674         }
5675         rd.srvid = getpid();
5676
5677         /* register a message port for receiveing the reply so that we
5678            can receive the reply
5679         */
5680         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5681
5682
5683         data.dptr = (uint8_t *)&rd;
5684         data.dsize = sizeof(rd);
5685
5686         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5687         if (ret != 0) {
5688                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5689                 return -1;
5690         }
5691
5692         /* this loop will terminate when we have received the reply */
5693         while (1) {     
5694                 event_loop_once(ctdb->ev);
5695         }
5696
5697         return 0;
5698 }
5699
5700 /*
5701   send a message to a srvid
5702  */
5703 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5704 {
5705         unsigned long srvid;
5706         int ret;
5707         TDB_DATA data;
5708
5709         if (argc < 2) {
5710                 usage();
5711         }
5712
5713         srvid      = strtoul(argv[0], NULL, 0);
5714
5715         data.dptr = (uint8_t *)discard_const(argv[1]);
5716         data.dsize= strlen(argv[1]);
5717
5718         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5719         if (ret != 0) {
5720                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5721                 return -1;
5722         }
5723
5724         return 0;
5725 }
5726
5727 /*
5728   handler for msglisten
5729 */
5730 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5731                              TDB_DATA data, void *private_data)
5732 {
5733         int i;
5734
5735         printf("Message received: ");
5736         for (i=0;i<data.dsize;i++) {
5737                 printf("%c", data.dptr[i]);
5738         }
5739         printf("\n");
5740 }
5741
5742 /*
5743   listen for messages on a messageport
5744  */
5745 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5746 {
5747         uint64_t srvid;
5748
5749         srvid = getpid();
5750
5751         /* register a message port and listen for messages
5752         */
5753         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5754         printf("Listening for messages on srvid:%d\n", (int)srvid);
5755
5756         while (1) {     
5757                 event_loop_once(ctdb->ev);
5758         }
5759
5760         return 0;
5761 }
5762
5763 /*
5764   list all nodes in the cluster
5765   we parse the nodes file directly
5766  */
5767 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5768 {
5769         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5770         struct pnn_node *pnn_nodes;
5771         struct pnn_node *pnn_node;
5772
5773         pnn_nodes = read_nodes_file(mem_ctx);
5774         if (pnn_nodes == NULL) {
5775                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5776                 talloc_free(mem_ctx);
5777                 return -1;
5778         }
5779
5780         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5781                 ctdb_sock_addr addr;
5782                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5783                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5784                         talloc_free(mem_ctx);
5785                         return -1;
5786                 }
5787                 if (options.machinereadable){
5788                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5789                 } else {
5790                         printf("%s\n", pnn_node->addr);
5791                 }
5792         }
5793         talloc_free(mem_ctx);
5794
5795         return 0;
5796 }
5797
5798 /*
5799   reload the nodes file on the local node
5800  */
5801 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5802 {
5803         int i, ret;
5804         int mypnn;
5805         struct ctdb_node_map *nodemap=NULL;
5806
5807         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5808         if (mypnn == -1) {
5809                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
5810                 return -1;
5811         }
5812
5813         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5814         if (ret != 0) {
5815                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5816                 return ret;
5817         }
5818
5819         /* reload the nodes file on all remote nodes */
5820         for (i=0;i<nodemap->num;i++) {
5821                 if (nodemap->nodes[i].pnn == mypnn) {
5822                         continue;
5823                 }
5824                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5825                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5826                         nodemap->nodes[i].pnn);
5827                 if (ret != 0) {
5828                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
5829                 }
5830         }
5831
5832         /* reload the nodes file on the local node */
5833         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
5834         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
5835         if (ret != 0) {
5836                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
5837         }
5838
5839         /* initiate a recovery */
5840         control_recover(ctdb, argc, argv);
5841
5842         return 0;
5843 }
5844
5845
5846 static const struct {
5847         const char *name;
5848         int (*fn)(struct ctdb_context *, int, const char **);
5849         bool auto_all;
5850         bool without_daemon; /* can be run without daemon running ? */
5851         const char *msg;
5852         const char *args;
5853 } ctdb_commands[] = {
5854         { "version",         control_version,           true,   true,   "show version of ctdb" },
5855         { "status",          control_status,            true,   false,  "show node status" },
5856         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
5857         { "ping",            control_ping,              true,   false,  "ping all nodes" },
5858         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
5859         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
5860         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
5861         { "statistics",      control_statistics,        false,  false, "show statistics" },
5862         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
5863         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
5864         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
5865         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
5866         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
5867         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
5868         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
5869         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
5870         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
5871         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
5872         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
5873         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
5874         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
5875         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
5876         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
5877         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
5878         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
5879         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
5880         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
5881         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
5882         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
5883         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
5884         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
5885         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
5886         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
5887         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
5888         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
5889         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
5890         { "stop",            control_stop,              true,   false,  "stop a node" },
5891         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
5892         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
5893         { "unban",           control_unban,             true,   false,  "unban a node" },
5894         { "showban",         control_showban,           true,   false,  "show ban information"},
5895         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
5896         { "recover",         control_recover,           true,   false,  "force recovery" },
5897         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
5898         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
5899         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
5900         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
5901         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
5902         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
5903         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5904         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
5905         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
5906
5907         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
5908
5909         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
5910         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
5911         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
5912         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
5913         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
5914         { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
5915         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
5916         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
5917         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
5918         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
5919         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
5920         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
5921         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
5922         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
5923         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
5924         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
5925         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
5926         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
5927         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
5928         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
5929         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
5930         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
5931         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
5932         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
5933         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
5934         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
5935         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
5936         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
5937         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
5938         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
5939         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
5940         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>|<name>"},
5941         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbid>|<name>"},
5942         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
5943         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
5944         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
5945         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
5946         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
5947         { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<db> <key>" },
5948         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
5949         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
5950         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
5951         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
5952         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
5953         { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
5954         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
5955         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
5956         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
5957         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
5958         { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
5959 };
5960
5961 /*
5962   show usage message
5963  */
5964 static void usage(void)
5965 {
5966         int i;
5967         printf(
5968 "Usage: ctdb [options] <control>\n" \
5969 "Options:\n" \
5970 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
5971 "   -Y                 generate machinereadable output\n"
5972 "   -v                 generate verbose output\n"
5973 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
5974         printf("Controls:\n");
5975         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5976                 printf("  %-15s %-27s  %s\n", 
5977                        ctdb_commands[i].name, 
5978                        ctdb_commands[i].args?ctdb_commands[i].args:"",
5979                        ctdb_commands[i].msg);
5980         }
5981         exit(1);
5982 }
5983
5984
5985 static void ctdb_alarm(int sig)
5986 {
5987         printf("Maximum runtime exceeded - exiting\n");
5988         _exit(ERR_TIMEOUT);
5989 }
5990
5991 /*
5992   main program
5993 */
5994 int main(int argc, const char *argv[])
5995 {
5996         struct ctdb_context *ctdb;
5997         char *nodestring = NULL;
5998         struct poptOption popt_options[] = {
5999                 POPT_AUTOHELP
6000                 POPT_CTDB_CMDLINE
6001                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
6002                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
6003                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
6004                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
6005                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
6006                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
6007                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
6008                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
6009                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
6010                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
6011                 POPT_TABLEEND
6012         };
6013         int opt;
6014         const char **extra_argv;
6015         int extra_argc = 0;
6016         int ret=-1, i;
6017         poptContext pc;
6018         struct event_context *ev;
6019         const char *control;
6020         const char *socket_name;
6021
6022         setlinebuf(stdout);
6023         
6024         /* set some defaults */
6025         options.maxruntime = 0;
6026         options.timelimit = 3;
6027         options.pnn = CTDB_CURRENT_NODE;
6028
6029         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
6030
6031         while ((opt = poptGetNextOpt(pc)) != -1) {
6032                 switch (opt) {
6033                 default:
6034                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
6035                                 poptBadOption(pc, 0), poptStrerror(opt)));
6036                         exit(1);
6037                 }
6038         }
6039
6040         /* setup the remaining options for the main program to use */
6041         extra_argv = poptGetArgs(pc);
6042         if (extra_argv) {
6043                 extra_argv++;
6044                 while (extra_argv[extra_argc]) extra_argc++;
6045         }
6046
6047         if (extra_argc < 1) {
6048                 usage();
6049         }
6050
6051         if (options.maxruntime == 0) {
6052                 const char *ctdb_timeout;
6053                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6054                 if (ctdb_timeout != NULL) {
6055                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6056                 } else {
6057                         /* default timeout is 120 seconds */
6058                         options.maxruntime = 120;
6059                 }
6060         }
6061
6062         signal(SIGALRM, ctdb_alarm);
6063         alarm(options.maxruntime);
6064
6065         control = extra_argv[0];
6066
6067         ev = event_context_init(NULL);
6068         if (!ev) {
6069                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
6070                 exit(1);
6071         }
6072
6073         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
6074                 if (strcmp(control, ctdb_commands[i].name) == 0) {
6075                         break;
6076                 }
6077         }
6078
6079         if (i == ARRAY_SIZE(ctdb_commands)) {
6080                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
6081                 exit(1);
6082         }
6083
6084         if (ctdb_commands[i].without_daemon == true) {
6085                 if (nodestring != NULL) {
6086                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
6087                         exit(1);
6088                 }
6089                 close(2);
6090                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
6091         }
6092
6093         /* initialise ctdb */
6094         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
6095
6096         if (ctdb == NULL) {
6097                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
6098                 exit(1);
6099         }
6100
6101         /* initialize a libctdb connection as well */
6102         socket_name = ctdb_get_socketname(ctdb);
6103         ctdb_connection = ctdb_connect(socket_name,
6104                                        ctdb_log_file, stderr);
6105         if (ctdb_connection == NULL) {
6106                 DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
6107                 exit(1);
6108         }                               
6109
6110         /* setup the node number(s) to contact */
6111         if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
6112                               &options.nodes, &options.pnn)) {
6113                 usage();
6114         }
6115
6116         if (options.pnn == CTDB_CURRENT_NODE) {
6117                 options.pnn = options.nodes[0];
6118         }
6119
6120         if (ctdb_commands[i].auto_all && 
6121             ((options.pnn == CTDB_BROADCAST_ALL) ||
6122              (options.pnn == CTDB_MULTICAST))) {
6123                 int j;
6124
6125                 ret = 0;
6126                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6127                         options.pnn = options.nodes[j];
6128                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6129                 }
6130         } else {
6131                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6132         }
6133
6134         ctdb_disconnect(ctdb_connection);
6135         talloc_free(ctdb);
6136         talloc_free(ev);
6137         (void)poptFreeContext(pc);
6138
6139         return ret;
6140
6141 }