tools/ctdb: Factor out printing of the machine readable status header
[obnox/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 struct ctdb_connection *ctdb_connection;
39
40 static void usage(void);
41
42 static struct {
43         int timelimit;
44         uint32_t pnn;
45         uint32_t *nodes;
46         int machinereadable;
47         int verbose;
48         int maxruntime;
49         int printemptyrecords;
50         int printdatasize;
51         int printlmaster;
52         int printhash;
53         int printrecordflags;
54 } options;
55
56 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
57 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
58
59 #ifdef CTDB_VERS
60 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
61 {
62 #define STR(x) #x
63 #define XSTR(x) STR(x)
64         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
65         return 0;
66 }
67 #endif
68
69 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
70                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
71                                    "Out of memory in " __location__ )); \
72                 abort();                                                \
73         }} while (0)
74
75 /* Pretty print the flags to a static buffer in human-readable format.
76  * This never returns NULL!
77  */
78 static const char *pretty_print_flags(uint32_t flags)
79 {
80         int j;
81         static const struct {
82                 uint32_t flag;
83                 const char *name;
84         } flag_names[] = {
85                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
86                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
87                 { NODE_FLAGS_BANNED,                "BANNED" },
88                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
89                 { NODE_FLAGS_DELETED,               "DELETED" },
90                 { NODE_FLAGS_STOPPED,               "STOPPED" },
91                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
92         };
93         static char flags_str[512]; /* Big enough to contain all flag names */
94
95         flags_str[0] = '\0';
96         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
97                 if (flags & flag_names[j].flag) {
98                         if (flags_str[0] == '\0') {
99                                 (void) strcpy(flags_str, flag_names[j].name);
100                         } else {
101                                 (void) strcat(flags_str, "|");
102                                 (void) strcat(flags_str, flag_names[j].name);
103                         }
104                 }
105         }
106         if (flags_str[0] == '\0') {
107                 (void) strcpy(flags_str, "OK");
108         }
109
110         return flags_str;
111 }
112
113 static int h2i(char h)
114 {
115         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
116         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
117         return h - '0';
118 }
119
120 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
121 {
122         int i, len;
123         TDB_DATA key = {NULL, 0};
124
125         len = strlen(str);
126         if (len & 0x01) {
127                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
128                 return key;
129         }
130
131         key.dsize = len>>1;
132         key.dptr  = talloc_size(mem_ctx, key.dsize);
133
134         for (i=0; i < len/2; i++) {
135                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
136         }
137         return key;
138 }
139
140 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
141  * that are disconnected or deleted.  If dd_ok is true those nodes are
142  * included in the output list of nodes.  If dd_ok is false, those
143  * nodes are filtered from the "all" case and cause an error if
144  * explicitly specified.
145  */
146 static bool parse_nodestring(struct ctdb_context *ctdb,
147                              const char * nodestring,
148                              uint32_t current_pnn,
149                              bool dd_ok,
150                              uint32_t **nodes,
151                              uint32_t *pnn_mode)
152 {
153         int n;
154         uint32_t i;
155         struct ctdb_node_map *nodemap;
156        
157         *nodes = NULL;
158
159         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
160                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
161                 exit(10);
162         }
163
164         if (nodestring != NULL) {
165                 *nodes = talloc_array(ctdb, uint32_t, 0);
166                 CTDB_NOMEM_ABORT(*nodes);
167                
168                 n = 0;
169
170                 if (strcmp(nodestring, "all") == 0) {
171                         *pnn_mode = CTDB_BROADCAST_ALL;
172
173                         /* all */
174                         for (i = 0; i < nodemap->num; i++) {
175                                 if ((nodemap->nodes[i].flags & 
176                                      (NODE_FLAGS_DISCONNECTED |
177                                       NODE_FLAGS_DELETED)) && !dd_ok) {
178                                         continue;
179                                 }
180                                 *nodes = talloc_realloc(ctdb, *nodes,
181                                                         uint32_t, n+1);
182                                 CTDB_NOMEM_ABORT(*nodes);
183                                 (*nodes)[n] = i;
184                                 n++;
185                         }
186                 } else {
187                         /* x{,y...} */
188                         char *ns, *tok;
189                        
190                         ns = talloc_strdup(ctdb, nodestring);
191                         tok = strtok(ns, ",");
192                         while (tok != NULL) {
193                                 uint32_t pnn;
194                                 i = (uint32_t)strtoul(tok, NULL, 0);
195                                 if (i >= nodemap->num) {
196                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
197                                         exit(ERR_NONODE);
198                                 }
199                                 if ((nodemap->nodes[i].flags & 
200                                      (NODE_FLAGS_DISCONNECTED |
201                                       NODE_FLAGS_DELETED)) && !dd_ok) {
202                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
203                                         exit(ERR_DISNODE);
204                                 }
205                                 if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
206                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
207                                         exit(10);
208                                 }
209
210                                 *nodes = talloc_realloc(ctdb, *nodes,
211                                                         uint32_t, n+1);
212                                 CTDB_NOMEM_ABORT(*nodes);
213
214                                 (*nodes)[n] = i;
215                                 n++;
216
217                                 tok = strtok(NULL, ",");
218                         }
219                         talloc_free(ns);
220
221                         if (n == 1) {
222                                 *pnn_mode = (*nodes)[0];
223                         } else {
224                                 *pnn_mode = CTDB_MULTICAST;
225                         }
226                 }
227         } else {
228                 /* default - no nodes specified */
229                 *nodes = talloc_array(ctdb, uint32_t, 1);
230                 CTDB_NOMEM_ABORT(*nodes);
231                 *pnn_mode = CTDB_CURRENT_NODE;
232
233                 if (!ctdb_getpnn(ctdb_connection, current_pnn,
234                                  &((*nodes)[0]))) {
235                         return false;
236                 }
237         }
238
239         ctdb_free_nodemap(nodemap);
240
241         return true;
242 }
243
244 /*
245  check if a database exists
246 */
247 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
248 {
249         int i, ret;
250         struct ctdb_dbid_map *dbmap=NULL;
251
252         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
253         if (ret != 0) {
254                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
255                 return -1;
256         }
257
258         for(i=0;i<dbmap->num;i++){
259                 const char *name;
260
261                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
262                 if (!strcmp(name, db_name)) {
263                         if (persistent) {
264                                 *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
265                         }
266                         return 0;
267                 }
268         }
269
270         return -1;
271 }
272
273 /*
274   see if a process exists
275  */
276 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
277 {
278         uint32_t pnn, pid;
279         int ret;
280         if (argc < 1) {
281                 usage();
282         }
283
284         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
285                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
286                 return -1;
287         }
288
289         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
290         if (ret == 0) {
291                 printf("%u:%u exists\n", pnn, pid);
292         } else {
293                 printf("%u:%u does not exist\n", pnn, pid);
294         }
295         return ret;
296 }
297
298 /*
299   display statistics structure
300  */
301 static void show_statistics(struct ctdb_statistics *s, int show_header)
302 {
303         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
304         int i;
305         const char *prefix=NULL;
306         int preflen=0;
307         int tmp, days, hours, minutes, seconds;
308         const struct {
309                 const char *name;
310                 uint32_t offset;
311         } fields[] = {
312 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
313                 STATISTICS_FIELD(num_clients),
314                 STATISTICS_FIELD(frozen),
315                 STATISTICS_FIELD(recovering),
316                 STATISTICS_FIELD(num_recoveries),
317                 STATISTICS_FIELD(client_packets_sent),
318                 STATISTICS_FIELD(client_packets_recv),
319                 STATISTICS_FIELD(node_packets_sent),
320                 STATISTICS_FIELD(node_packets_recv),
321                 STATISTICS_FIELD(keepalive_packets_sent),
322                 STATISTICS_FIELD(keepalive_packets_recv),
323                 STATISTICS_FIELD(node.req_call),
324                 STATISTICS_FIELD(node.reply_call),
325                 STATISTICS_FIELD(node.req_dmaster),
326                 STATISTICS_FIELD(node.reply_dmaster),
327                 STATISTICS_FIELD(node.reply_error),
328                 STATISTICS_FIELD(node.req_message),
329                 STATISTICS_FIELD(node.req_control),
330                 STATISTICS_FIELD(node.reply_control),
331                 STATISTICS_FIELD(client.req_call),
332                 STATISTICS_FIELD(client.req_message),
333                 STATISTICS_FIELD(client.req_control),
334                 STATISTICS_FIELD(timeouts.call),
335                 STATISTICS_FIELD(timeouts.control),
336                 STATISTICS_FIELD(timeouts.traverse),
337                 STATISTICS_FIELD(total_calls),
338                 STATISTICS_FIELD(pending_calls),
339                 STATISTICS_FIELD(lockwait_calls),
340                 STATISTICS_FIELD(pending_lockwait_calls),
341                 STATISTICS_FIELD(childwrite_calls),
342                 STATISTICS_FIELD(pending_childwrite_calls),
343                 STATISTICS_FIELD(memory_used),
344                 STATISTICS_FIELD(max_hop_count),
345                 STATISTICS_FIELD(total_ro_delegations),
346                 STATISTICS_FIELD(total_ro_revokes),
347         };
348         
349         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
350         seconds = tmp%60;
351         tmp    /= 60;
352         minutes = tmp%60;
353         tmp    /= 60;
354         hours   = tmp%24;
355         tmp    /= 24;
356         days    = tmp;
357
358         if (options.machinereadable){
359                 if (show_header) {
360                         printf("CTDB version:");
361                         printf("Current time of statistics:");
362                         printf("Statistics collected since:");
363                         for (i=0;i<ARRAY_SIZE(fields);i++) {
364                                 printf("%s:", fields[i].name);
365                         }
366                         printf("num_reclock_ctdbd_latency:");
367                         printf("min_reclock_ctdbd_latency:");
368                         printf("avg_reclock_ctdbd_latency:");
369                         printf("max_reclock_ctdbd_latency:");
370
371                         printf("num_reclock_recd_latency:");
372                         printf("min_reclock_recd_latency:");
373                         printf("avg_reclock_recd_latency:");
374                         printf("max_reclock_recd_latency:");
375
376                         printf("num_call_latency:");
377                         printf("min_call_latency:");
378                         printf("avg_call_latency:");
379                         printf("max_call_latency:");
380
381                         printf("num_lockwait_latency:");
382                         printf("min_lockwait_latency:");
383                         printf("avg_lockwait_latency:");
384                         printf("max_lockwait_latency:");
385
386                         printf("num_childwrite_latency:");
387                         printf("min_childwrite_latency:");
388                         printf("avg_childwrite_latency:");
389                         printf("max_childwrite_latency:");
390                         printf("\n");
391                 }
392                 printf("%d:", CTDB_VERSION);
393                 printf("%d:", (int)s->statistics_current_time.tv_sec);
394                 printf("%d:", (int)s->statistics_start_time.tv_sec);
395                 for (i=0;i<ARRAY_SIZE(fields);i++) {
396                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
397                 }
398                 printf("%d:", s->reclock.ctdbd.num);
399                 printf("%.6f:", s->reclock.ctdbd.min);
400                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
401                 printf("%.6f:", s->reclock.ctdbd.max);
402
403                 printf("%d:", s->reclock.recd.num);
404                 printf("%.6f:", s->reclock.recd.min);
405                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
406                 printf("%.6f:", s->reclock.recd.max);
407
408                 printf("%d:", s->call_latency.num);
409                 printf("%.6f:", s->call_latency.min);
410                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
411                 printf("%.6f:", s->call_latency.max);
412
413                 printf("%d:", s->lockwait_latency.num);
414                 printf("%.6f:", s->lockwait_latency.min);
415                 printf("%.6f:", s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0);
416                 printf("%.6f:", s->lockwait_latency.max);
417
418                 printf("%d:", s->childwrite_latency.num);
419                 printf("%.6f:", s->childwrite_latency.min);
420                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
421                 printf("%.6f:", s->childwrite_latency.max);
422                 printf("\n");
423         } else {
424                 printf("CTDB version %u\n", CTDB_VERSION);
425                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
426                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
427
428                 for (i=0;i<ARRAY_SIZE(fields);i++) {
429                         if (strchr(fields[i].name, '.')) {
430                                 preflen = strcspn(fields[i].name, ".")+1;
431                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
432                                         prefix = fields[i].name;
433                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
434                                 }
435                         } else {
436                                 preflen = 0;
437                         }
438                         printf(" %*s%-22s%*s%10u\n", 
439                                preflen?4:0, "",
440                                fields[i].name+preflen, 
441                                preflen?0:4, "",
442                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
443                 }
444                 printf(" hop_count_buckets:");
445                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
446                         printf(" %d", s->hop_count_bucket[i]);
447                 }
448                 printf("\n");
449                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
450
451                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
452
453                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
454                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "lockwait_latency   MIN/AVG/MAX", s->lockwait_latency.min, s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0, s->lockwait_latency.max, s->lockwait_latency.num);
455                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
456         }
457
458         talloc_free(tmp_ctx);
459 }
460
461 /*
462   display remote ctdb statistics combined from all nodes
463  */
464 static int control_statistics_all(struct ctdb_context *ctdb)
465 {
466         int ret, i;
467         struct ctdb_statistics statistics;
468         uint32_t *nodes;
469         uint32_t num_nodes;
470
471         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
472         CTDB_NO_MEMORY(ctdb, nodes);
473         
474         ZERO_STRUCT(statistics);
475
476         for (i=0;i<num_nodes;i++) {
477                 struct ctdb_statistics s1;
478                 int j;
479                 uint32_t *v1 = (uint32_t *)&s1;
480                 uint32_t *v2 = (uint32_t *)&statistics;
481                 uint32_t num_ints = 
482                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
483                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
484                 if (ret != 0) {
485                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
486                         return ret;
487                 }
488                 for (j=0;j<num_ints;j++) {
489                         v2[j] += v1[j];
490                 }
491                 statistics.max_hop_count = 
492                         MAX(statistics.max_hop_count, s1.max_hop_count);
493                 statistics.call_latency.max = 
494                         MAX(statistics.call_latency.max, s1.call_latency.max);
495                 statistics.lockwait_latency.max = 
496                         MAX(statistics.lockwait_latency.max, s1.lockwait_latency.max);
497         }
498         talloc_free(nodes);
499         printf("Gathered statistics for %u nodes\n", num_nodes);
500         show_statistics(&statistics, 1);
501         return 0;
502 }
503
504 /*
505   display remote ctdb statistics
506  */
507 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
508 {
509         int ret;
510         struct ctdb_statistics statistics;
511
512         if (options.pnn == CTDB_BROADCAST_ALL) {
513                 return control_statistics_all(ctdb);
514         }
515
516         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
517         if (ret != 0) {
518                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
519                 return ret;
520         }
521         show_statistics(&statistics, 1);
522         return 0;
523 }
524
525
526 /*
527   reset remote ctdb statistics
528  */
529 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
530 {
531         int ret;
532
533         ret = ctdb_statistics_reset(ctdb, options.pnn);
534         if (ret != 0) {
535                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
536                 return ret;
537         }
538         return 0;
539 }
540
541
542 /*
543   display remote ctdb rolling statistics
544  */
545 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
546 {
547         int ret;
548         struct ctdb_statistics_wire *stats;
549         int i, num_records = -1;
550
551         if (argc ==1) {
552                 num_records = atoi(argv[0]) - 1;
553         }
554
555         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
556         if (ret != 0) {
557                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
558                 return ret;
559         }
560         for (i=0;i<stats->num;i++) {
561                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
562                         continue;
563                 }
564                 show_statistics(&stats->stats[i], i==0);
565                 if (i == num_records) {
566                         break;
567                 }
568         }
569         return 0;
570 }
571
572
573 /*
574   display remote ctdb db statistics
575  */
576 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
577 {
578         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
579         struct ctdb_db_statistics *dbstatistics;
580         struct ctdb_dbid_map *dbmap=NULL;
581         int i, ret;
582
583         if (argc < 1) {
584                 usage();
585         }
586
587         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
588         if (ret != 0) {
589                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
590                 return ret;
591         }
592         for(i=0;i<dbmap->num;i++){
593                 const char *name;
594
595                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
596                 if(!strcmp(argv[0], name)){
597                         talloc_free(discard_const(name));
598                         break;
599                 }
600                 talloc_free(discard_const(name));
601         }
602         if (i == dbmap->num) {
603                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
604                 talloc_free(tmp_ctx);
605                 return -1;
606         }
607
608         if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstatistics)) {
609                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
610                 talloc_free(tmp_ctx);
611                 return -1;
612         }
613
614         printf("DB Statistics:\n");
615         printf("RO Delegations: %d\n", dbstatistics->db_ro_delegations);
616         printf("RO Revokes:     %d\n", dbstatistics->db_ro_revokes);
617         printf(" hop_count_buckets:");
618         for (i=0;i<MAX_COUNT_BUCKETS;i++) {
619                 printf(" %d", dbstatistics->hop_count_bucket[i]);
620         }
621         printf("\n");
622         printf("Num Hot Keys:     %d\n", dbstatistics->num_hot_keys);
623         for (i = 0; i < dbstatistics->num_hot_keys; i++) {
624                 int j;
625                 printf("Count:%d Key:", dbstatistics->hot_keys[i].count);
626                 for (j = 0; j < dbstatistics->hot_keys[i].key.dsize; j++) {
627                         printf("%02x", dbstatistics->hot_keys[i].key.dptr[j]&0xff);
628                 }
629                 printf("\n");
630         }
631
632         ctdb_free_dbstat(dbstatistics);
633         return 0;
634 }
635
636 /*
637   display uptime of remote node
638  */
639 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
640 {
641         int ret;
642         struct ctdb_uptime *uptime = NULL;
643         int tmp, days, hours, minutes, seconds;
644
645         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
646         if (ret != 0) {
647                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
648                 return ret;
649         }
650
651         if (options.machinereadable){
652                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
653                 printf(":%u:%u:%u:%lf\n",
654                         (unsigned int)uptime->current_time.tv_sec,
655                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
656                         (unsigned int)uptime->last_recovery_finished.tv_sec,
657                         timeval_delta(&uptime->last_recovery_finished,
658                                       &uptime->last_recovery_started)
659                 );
660                 return 0;
661         }
662
663         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
664
665         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
666         seconds = tmp%60;
667         tmp    /= 60;
668         minutes = tmp%60;
669         tmp    /= 60;
670         hours   = tmp%24;
671         tmp    /= 24;
672         days    = tmp;
673         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
674
675         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
676         seconds = tmp%60;
677         tmp    /= 60;
678         minutes = tmp%60;
679         tmp    /= 60;
680         hours   = tmp%24;
681         tmp    /= 24;
682         days    = tmp;
683         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
684         
685         printf("Duration of last recovery/failover: %lf seconds\n",
686                 timeval_delta(&uptime->last_recovery_finished,
687                               &uptime->last_recovery_started));
688
689         return 0;
690 }
691
692 /*
693   show the PNN of the current node
694  */
695 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
696 {
697         uint32_t mypnn;
698         bool ret;
699
700         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
701         if (!ret) {
702                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
703                 return -1;
704         }
705
706         printf("PNN:%d\n", mypnn);
707         return 0;
708 }
709
710
711 struct pnn_node {
712         struct pnn_node *next;
713         const char *addr;
714         int pnn;
715 };
716
717 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
718 {
719         const char *nodes_list;
720         int nlines;
721         char **lines;
722         int i, pnn;
723         struct pnn_node *pnn_nodes = NULL;
724         struct pnn_node *pnn_node;
725         struct pnn_node *tmp_node;
726
727         /* read the nodes file */
728         nodes_list = getenv("CTDB_NODES");
729         if (nodes_list == NULL) {
730                 nodes_list = "/etc/ctdb/nodes";
731         }
732         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
733         if (lines == NULL) {
734                 return NULL;
735         }
736         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
737                 nlines--;
738         }
739         for (i=0, pnn=0; i<nlines; i++) {
740                 char *node;
741
742                 node = lines[i];
743                 /* strip leading spaces */
744                 while((*node == ' ') || (*node == '\t')) {
745                         node++;
746                 }
747                 if (*node == '#') {
748                         pnn++;
749                         continue;
750                 }
751                 if (strcmp(node, "") == 0) {
752                         continue;
753                 }
754                 pnn_node = talloc(mem_ctx, struct pnn_node);
755                 pnn_node->pnn = pnn++;
756                 pnn_node->addr = talloc_strdup(pnn_node, node);
757                 pnn_node->next = pnn_nodes;
758                 pnn_nodes = pnn_node;
759         }
760
761         /* swap them around so we return them in incrementing order */
762         pnn_node = pnn_nodes;
763         pnn_nodes = NULL;
764         while (pnn_node) {
765                 tmp_node = pnn_node;
766                 pnn_node = pnn_node->next;
767
768                 tmp_node->next = pnn_nodes;
769                 pnn_nodes = tmp_node;
770         }
771
772         return pnn_nodes;
773 }
774
775 /*
776   show the PNN of the current node
777   discover the pnn by loading the nodes file and try to bind to all
778   addresses one at a time until the ip address is found.
779  */
780 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
781 {
782         TALLOC_CTX *mem_ctx = talloc_new(NULL);
783         struct pnn_node *pnn_nodes;
784         struct pnn_node *pnn_node;
785
786         pnn_nodes = read_nodes_file(mem_ctx);
787         if (pnn_nodes == NULL) {
788                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
789                 talloc_free(mem_ctx);
790                 return -1;
791         }
792
793         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
794                 ctdb_sock_addr addr;
795
796                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
797                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
798                         talloc_free(mem_ctx);
799                         return -1;
800                 }
801
802                 if (ctdb_sys_have_ip(&addr)) {
803                         printf("PNN:%d\n", pnn_node->pnn);
804                         talloc_free(mem_ctx);
805                         return 0;
806                 }
807         }
808
809         printf("Failed to detect which PNN this node is\n");
810         talloc_free(mem_ctx);
811         return -1;
812 }
813
814 /* Helpers for ctdb status
815  */
816 static bool is_partially_online(struct ctdb_node_and_flags *node)
817 {
818         int j;
819         bool ret = false;
820
821         if (node->flags == 0) {
822                 struct ctdb_ifaces_list *ifaces;
823
824                 if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
825                         for (j=0; j < ifaces->num; j++) {
826                                 if (ifaces->ifaces[j].link_state != 0) {
827                                         continue;
828                                 }
829                                 ret = true;
830                                 break;
831                         }
832                         ctdb_free_ifaces(ifaces);
833                 }
834         }
835
836         return ret;
837 }
838
839 static void control_status_header_machine(void)
840 {
841         printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
842                ":Inactive:PartiallyOnline:ThisNode:\n");
843 }
844
845 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
846 {
847         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
848                ctdb_addr_to_str(&node->addr),
849                !!(node->flags&NODE_FLAGS_DISCONNECTED),
850                !!(node->flags&NODE_FLAGS_BANNED),
851                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
852                !!(node->flags&NODE_FLAGS_UNHEALTHY),
853                !!(node->flags&NODE_FLAGS_STOPPED),
854                !!(node->flags&NODE_FLAGS_INACTIVE),
855                is_partially_online(node) ? 1 : 0,
856                (node->pnn == mypnn)?'Y':'N');
857
858         return node->flags;
859 }
860
861 static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
862 {
863        printf("pnn:%d %-16s %s%s\n", node->pnn,
864               ctdb_addr_to_str(&node->addr),
865               is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
866               node->pnn == mypnn?" (THIS NODE)":"");
867
868        return node->flags;
869 }
870
871 /*
872   display remote ctdb status
873  */
874 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
875 {
876         int i;
877         struct ctdb_vnn_map *vnnmap=NULL;
878         struct ctdb_node_map *nodemap=NULL;
879         uint32_t recmode, recmaster, mypnn;
880
881         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
882                 return -1;
883         }
884
885         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
886                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
887                 return -1;
888         }
889
890         if (options.machinereadable) {
891                 control_status_header_machine();
892                 for (i=0;i<nodemap->num;i++) {
893                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
894                                 continue;
895                         }
896                         (void) control_status_1_machine(mypnn,
897                                                         &nodemap->nodes[i]);
898                 }
899                 return 0;
900         }
901
902         printf("Number of nodes:%d\n", nodemap->num);
903         for(i=0;i<nodemap->num;i++){
904                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
905                         continue;
906                 }
907                 (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
908         }
909
910         if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
911                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
912                 return -1;
913         }
914         if (vnnmap->generation == INVALID_GENERATION) {
915                 printf("Generation:INVALID\n");
916         } else {
917                 printf("Generation:%d\n",vnnmap->generation);
918         }
919         printf("Size:%d\n",vnnmap->size);
920         for(i=0;i<vnnmap->size;i++){
921                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
922         }
923         ctdb_free_vnnmap(vnnmap);
924
925         if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
926                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
927                 return -1;
928         }
929         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
930
931         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
932                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
933                 return -1;
934         }
935         printf("Recovery master:%d\n",recmaster);
936
937         return 0;
938 }
939
940 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
941 {
942         int i, ret;
943         struct ctdb_node_map *nodemap=NULL;
944         uint32_t * nodes;
945         uint32_t pnn_mode, mypnn;
946
947         if (argc > 1) {
948                 usage();
949         }
950
951         if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
952                               options.pnn, true, &nodes, &pnn_mode)) {
953                 return -1;
954         }
955
956         if (options.machinereadable) {
957                 control_status_header_machine();
958         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
959                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
960         }
961
962         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
963                 DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
964                 return -1;
965         }
966
967         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
968                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
969                 return -1;
970         }
971
972         ret = 0;
973
974         for (i = 0; i < talloc_array_length(nodes); i++) {
975                 if (options.machinereadable) {
976                         ret |= control_status_1_machine(mypnn,
977                                                         &nodemap->nodes[nodes[i]]);
978                 } else {
979                         ret |= control_status_1_human(mypnn,
980                                                       &nodemap->nodes[nodes[i]]);
981                 }
982         }
983         return ret;
984 }
985
986 struct natgw_node {
987         struct natgw_node *next;
988         const char *addr;
989 };
990
991 /*
992   display the list of nodes belonging to this natgw configuration
993  */
994 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
995 {
996         int i, ret;
997         uint32_t capabilities;
998         const char *natgw_list;
999         int nlines;
1000         char **lines;
1001         struct natgw_node *natgw_nodes = NULL;
1002         struct natgw_node *natgw_node;
1003         struct ctdb_node_map *nodemap=NULL;
1004
1005
1006         /* read the natgw nodes file into a linked list */
1007         natgw_list = getenv("CTDB_NATGW_NODES");
1008         if (natgw_list == NULL) {
1009                 natgw_list = "/etc/ctdb/natgw_nodes";
1010         }
1011         lines = file_lines_load(natgw_list, &nlines, ctdb);
1012         if (lines == NULL) {
1013                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1014                 return -1;
1015         }
1016         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
1017                 nlines--;
1018         }
1019         for (i=0;i<nlines;i++) {
1020                 char *node;
1021
1022                 node = lines[i];
1023                 /* strip leading spaces */
1024                 while((*node == ' ') || (*node == '\t')) {
1025                         node++;
1026                 }
1027                 if (*node == '#') {
1028                         continue;
1029                 }
1030                 if (strcmp(node, "") == 0) {
1031                         continue;
1032                 }
1033                 natgw_node = talloc(ctdb, struct natgw_node);
1034                 natgw_node->addr = talloc_strdup(natgw_node, node);
1035                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1036                 natgw_node->next = natgw_nodes;
1037                 natgw_nodes = natgw_node;
1038         }
1039
1040         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1041         if (ret != 0) {
1042                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1043                 return ret;
1044         }
1045
1046         i=0;
1047         while(i<nodemap->num) {
1048                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1049                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1050                                 break;
1051                         }
1052                 }
1053
1054                 /* this node was not in the natgw so we just remove it from
1055                  * the list
1056                  */
1057                 if ((natgw_node == NULL) 
1058                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1059                         int j;
1060
1061                         for (j=i+1; j<nodemap->num; j++) {
1062                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1063                         }
1064                         nodemap->num--;
1065                         continue;
1066                 }
1067
1068                 i++;
1069         }               
1070
1071         /* pick a node to be natgwmaster
1072          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
1073          */
1074         for(i=0;i<nodemap->num;i++){
1075                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
1076                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1077                         if (ret != 0) {
1078                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1079                                 return ret;
1080                         }
1081                         if (!(capabilities&CTDB_CAP_NATGW)) {
1082                                 continue;
1083                         }
1084                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1085                         break;
1086                 }
1087         }
1088         /* we couldnt find any healthy node, try unhealthy ones */
1089         if (i == nodemap->num) {
1090                 for(i=0;i<nodemap->num;i++){
1091                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
1092                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1093                                 if (ret != 0) {
1094                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1095                                         return ret;
1096                                 }
1097                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1098                                         continue;
1099                                 }
1100                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1101                                 break;
1102                         }
1103                 }
1104         }
1105         /* unless all nodes are STOPPED, when we pick one anyway */
1106         if (i == nodemap->num) {
1107                 for(i=0;i<nodemap->num;i++){
1108                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
1109                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1110                                 if (ret != 0) {
1111                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1112                                         return ret;
1113                                 }
1114                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1115                                         continue;
1116                                 }
1117                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
1118                                 break;
1119                         }
1120                 }
1121                 /* or if we still can not find any */
1122                 if (i == nodemap->num) {
1123                         printf("-1 0.0.0.0\n");
1124                         ret = 2; /* matches ENOENT */
1125                 }
1126         }
1127
1128         /* print the pruned list of nodes belonging to this natgw list */
1129         for(i=0;i<nodemap->num;i++){
1130                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1131                         continue;
1132                 }
1133                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
1134                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
1135                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
1136                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
1137                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
1138                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
1139                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
1140         }
1141
1142         return ret;
1143 }
1144
1145 /*
1146   display the status of the scripts for monitoring (or other events)
1147  */
1148 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1149                                     enum ctdb_eventscript_call type)
1150 {
1151         struct ctdb_scripts_wire *script_status;
1152         int ret, i;
1153
1154         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1155         if (ret != 0) {
1156                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1157                 return ret;
1158         }
1159
1160         if (script_status == NULL) {
1161                 if (!options.machinereadable) {
1162                         printf("%s cycle never run\n",
1163                                ctdb_eventscript_call_names[type]);
1164                 }
1165                 return 0;
1166         }
1167
1168         if (!options.machinereadable) {
1169                 printf("%d scripts were executed last %s cycle\n",
1170                        script_status->num_scripts,
1171                        ctdb_eventscript_call_names[type]);
1172         }
1173         for (i=0; i<script_status->num_scripts; i++) {
1174                 const char *status = NULL;
1175
1176                 switch (script_status->scripts[i].status) {
1177                 case -ETIME:
1178                         status = "TIMEDOUT";
1179                         break;
1180                 case -ENOEXEC:
1181                         status = "DISABLED";
1182                         break;
1183                 case 0:
1184                         status = "OK";
1185                         break;
1186                 default:
1187                         if (script_status->scripts[i].status > 0)
1188                                 status = "ERROR";
1189                         break;
1190                 }
1191                 if (options.machinereadable) {
1192                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1193                                ctdb_eventscript_call_names[type],
1194                                script_status->scripts[i].name,
1195                                script_status->scripts[i].status,
1196                                status,
1197                                (long)script_status->scripts[i].start.tv_sec,
1198                                (long)script_status->scripts[i].start.tv_usec,
1199                                (long)script_status->scripts[i].finished.tv_sec,
1200                                (long)script_status->scripts[i].finished.tv_usec,
1201                                script_status->scripts[i].output);
1202                         continue;
1203                 }
1204                 if (status)
1205                         printf("%-20s Status:%s    ",
1206                                script_status->scripts[i].name, status);
1207                 else
1208                         /* Some other error, eg from stat. */
1209                         printf("%-20s Status:CANNOT RUN (%s)",
1210                                script_status->scripts[i].name,
1211                                strerror(-script_status->scripts[i].status));
1212
1213                 if (script_status->scripts[i].status >= 0) {
1214                         printf("Duration:%.3lf ",
1215                         timeval_delta(&script_status->scripts[i].finished,
1216                               &script_status->scripts[i].start));
1217                 }
1218                 if (script_status->scripts[i].status != -ENOEXEC) {
1219                         printf("%s",
1220                                ctime(&script_status->scripts[i].start.tv_sec));
1221                         if (script_status->scripts[i].status != 0) {
1222                                 printf("   OUTPUT:%s\n",
1223                                        script_status->scripts[i].output);
1224                         }
1225                 } else {
1226                         printf("\n");
1227                 }
1228         }
1229         return 0;
1230 }
1231
1232
1233 static int control_scriptstatus(struct ctdb_context *ctdb,
1234                                 int argc, const char **argv)
1235 {
1236         int ret;
1237         enum ctdb_eventscript_call type, min, max;
1238         const char *arg;
1239
1240         if (argc > 1) {
1241                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1242                 return -1;
1243         }
1244
1245         if (argc == 0)
1246                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1247         else
1248                 arg = argv[0];
1249
1250         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1251                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1252                         min = type;
1253                         max = type+1;
1254                         break;
1255                 }
1256         }
1257         if (type == CTDB_EVENT_MAX) {
1258                 if (strcmp(arg, "all") == 0) {
1259                         min = 0;
1260                         max = CTDB_EVENT_MAX;
1261                 } else {
1262                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1263                         return -1;
1264                 }
1265         }
1266
1267         if (options.machinereadable) {
1268                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1269         }
1270
1271         for (type = min; type < max; type++) {
1272                 ret = control_one_scriptstatus(ctdb, type);
1273                 if (ret != 0) {
1274                         return ret;
1275                 }
1276         }
1277
1278         return 0;
1279 }
1280
1281 /*
1282   enable an eventscript
1283  */
1284 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1285 {
1286         int ret;
1287
1288         if (argc < 1) {
1289                 usage();
1290         }
1291
1292         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1293         if (ret != 0) {
1294           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1295                 return ret;
1296         }
1297
1298         return 0;
1299 }
1300
1301 /*
1302   disable an eventscript
1303  */
1304 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1305 {
1306         int ret;
1307
1308         if (argc < 1) {
1309                 usage();
1310         }
1311
1312         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1313         if (ret != 0) {
1314           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1315                 return ret;
1316         }
1317
1318         return 0;
1319 }
1320
1321 /*
1322   display the pnn of the recovery master
1323  */
1324 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1325 {
1326         uint32_t recmaster;
1327
1328         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1329                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1330                 return -1;
1331         }
1332         printf("%d\n",recmaster);
1333
1334         return 0;
1335 }
1336
1337 /*
1338   add a tickle to a public address
1339  */
1340 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1341 {
1342         struct ctdb_tcp_connection t;
1343         TDB_DATA data;
1344         int ret;
1345
1346         if (argc < 2) {
1347                 usage();
1348         }
1349
1350         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1351                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1352                 return -1;
1353         }
1354         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1355                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1356                 return -1;
1357         }
1358
1359         data.dptr = (uint8_t *)&t;
1360         data.dsize = sizeof(t);
1361
1362         /* tell all nodes about this tcp connection */
1363         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1364                            0, data, ctdb, NULL, NULL, NULL, NULL);
1365         if (ret != 0) {
1366                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1367                 return -1;
1368         }
1369         
1370         return 0;
1371 }
1372
1373
1374 /*
1375   delete a tickle from a node
1376  */
1377 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1378 {
1379         struct ctdb_tcp_connection t;
1380         TDB_DATA data;
1381         int ret;
1382
1383         if (argc < 2) {
1384                 usage();
1385         }
1386
1387         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1388                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1389                 return -1;
1390         }
1391         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1392                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1393                 return -1;
1394         }
1395
1396         data.dptr = (uint8_t *)&t;
1397         data.dsize = sizeof(t);
1398
1399         /* tell all nodes about this tcp connection */
1400         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1401                            0, data, ctdb, NULL, NULL, NULL, NULL);
1402         if (ret != 0) {
1403                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1404                 return -1;
1405         }
1406         
1407         return 0;
1408 }
1409
1410
1411 /*
1412   get a list of all tickles for this pnn
1413  */
1414 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1415 {
1416         struct ctdb_control_tcp_tickle_list *list;
1417         ctdb_sock_addr addr;
1418         int i, ret;
1419         unsigned port = 0;
1420
1421         if (argc < 1) {
1422                 usage();
1423         }
1424
1425         if (argc == 2) {
1426                 port = atoi(argv[1]);
1427         }
1428
1429         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1430                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1431                 return -1;
1432         }
1433
1434         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1435         if (ret == -1) {
1436                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1437                 return -1;
1438         }
1439
1440         if (options.machinereadable){
1441                 printf(":source ip:port:destination ip:port:\n");
1442                 for (i=0;i<list->tickles.num;i++) {
1443                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1444                                 continue;
1445                         }
1446                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1447                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1448                 }
1449         } else {
1450                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1451                 printf("Num tickles:%u\n", list->tickles.num);
1452                 for (i=0;i<list->tickles.num;i++) {
1453                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1454                                 continue;
1455                         }
1456                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1457                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1458                 }
1459         }
1460
1461         talloc_free(list);
1462         
1463         return 0;
1464 }
1465
1466
1467 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1468 {
1469         struct ctdb_all_public_ips *ips;
1470         struct ctdb_public_ip ip;
1471         int i, ret;
1472         uint32_t *nodes;
1473         uint32_t disable_time;
1474         TDB_DATA data;
1475         struct ctdb_node_map *nodemap=NULL;
1476         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1477
1478         disable_time = 30;
1479         data.dptr  = (uint8_t*)&disable_time;
1480         data.dsize = sizeof(disable_time);
1481         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1482         if (ret != 0) {
1483                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1484                 return -1;
1485         }
1486
1487
1488
1489         /* read the public ip list from the node */
1490         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1491         if (ret != 0) {
1492                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1493                 talloc_free(tmp_ctx);
1494                 return -1;
1495         }
1496
1497         for (i=0;i<ips->num;i++) {
1498                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1499                         break;
1500                 }
1501         }
1502         if (i==ips->num) {
1503                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1504                         pnn, ctdb_addr_to_str(addr)));
1505                 talloc_free(tmp_ctx);
1506                 return -1;
1507         }
1508
1509         ip.pnn  = pnn;
1510         ip.addr = *addr;
1511
1512         data.dptr  = (uint8_t *)&ip;
1513         data.dsize = sizeof(ip);
1514
1515         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1516         if (ret != 0) {
1517                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1518                 talloc_free(tmp_ctx);
1519                 return ret;
1520         }
1521
1522         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1523         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1524                                         nodes, 0,
1525                                         LONGTIMELIMIT(),
1526                                         false, data,
1527                                         NULL, NULL,
1528                                         NULL);
1529         if (ret != 0) {
1530                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1531                 talloc_free(tmp_ctx);
1532                 return -1;
1533         }
1534
1535         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1536         if (ret != 0) {
1537                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1538                 talloc_free(tmp_ctx);
1539                 return -1;
1540         }
1541
1542         /* update the recovery daemon so it now knows to expect the new
1543            node assignment for this ip.
1544         */
1545         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1546         if (ret != 0) {
1547                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1548                 return -1;
1549         }
1550
1551         talloc_free(tmp_ctx);
1552         return 0;
1553 }
1554
1555 /*
1556   move/failover an ip address to a specific node
1557  */
1558 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1559 {
1560         uint32_t pnn;
1561         int ret, retries = 0;
1562         ctdb_sock_addr addr;
1563
1564         if (argc < 2) {
1565                 usage();
1566                 return -1;
1567         }
1568
1569         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1570                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1571                 return -1;
1572         }
1573
1574
1575         if (sscanf(argv[1], "%u", &pnn) != 1) {
1576                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1577                 return -1;
1578         }
1579
1580         do {
1581                 ret = move_ip(ctdb, &addr, pnn);
1582                 if (ret != 0) {
1583                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
1584                         sleep(3);
1585                         retries++;
1586                 }
1587         } while (retries < 5 && ret != 0);
1588         if (ret != 0) {
1589                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
1590                 return -1;
1591         }
1592
1593         return 0;
1594 }
1595
1596 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1597 {
1598         uint32_t recmaster;
1599         TDB_DATA data;
1600
1601         if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
1602                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
1603                 return -1;
1604         }
1605
1606         data.dptr  = (uint8_t *)&pnn;
1607         data.dsize = sizeof(uint32_t);
1608         if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1609                 DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
1610                 return -1;
1611         }
1612
1613         return 0;
1614 }
1615
1616
1617 /*
1618   rebalance a node by setting it to allow failback and triggering a
1619   takeover run
1620  */
1621 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1622 {
1623         switch (options.pnn) {
1624         case CTDB_BROADCAST_ALL:
1625         case CTDB_CURRENT_NODE:
1626                 DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
1627                 return -1;
1628         }
1629
1630         return rebalance_node(ctdb, options.pnn);
1631 }
1632
1633
1634 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1635 {
1636         struct ctdb_public_ip ip;
1637         int ret;
1638         uint32_t *nodes;
1639         uint32_t disable_time;
1640         TDB_DATA data;
1641         struct ctdb_node_map *nodemap=NULL;
1642         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1643
1644         disable_time = 30;
1645         data.dptr  = (uint8_t*)&disable_time;
1646         data.dsize = sizeof(disable_time);
1647         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1648         if (ret != 0) {
1649                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1650                 return -1;
1651         }
1652
1653         ip.pnn  = -1;
1654         ip.addr = *addr;
1655
1656         data.dptr  = (uint8_t *)&ip;
1657         data.dsize = sizeof(ip);
1658
1659         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1660         if (ret != 0) {
1661                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1662                 talloc_free(tmp_ctx);
1663                 return ret;
1664         }
1665
1666         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1667         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1668                                         nodes, 0,
1669                                         LONGTIMELIMIT(),
1670                                         false, data,
1671                                         NULL, NULL,
1672                                         NULL);
1673         if (ret != 0) {
1674                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1675                 talloc_free(tmp_ctx);
1676                 return -1;
1677         }
1678
1679         talloc_free(tmp_ctx);
1680         return 0;
1681 }
1682
1683 /*
1684   release an ip form all nodes and have it re-assigned by recd
1685  */
1686 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1687 {
1688         ctdb_sock_addr addr;
1689
1690         if (argc < 1) {
1691                 usage();
1692                 return -1;
1693         }
1694
1695         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1696                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1697                 return -1;
1698         }
1699
1700         if (rebalance_ip(ctdb, &addr) != 0) {
1701                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1702                 return -1;
1703         }
1704
1705         return 0;
1706 }
1707
1708 static int getips_store_callback(void *param, void *data)
1709 {
1710         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1711         struct ctdb_all_public_ips *ips = param;
1712         int i;
1713
1714         i = ips->num++;
1715         ips->ips[i].pnn  = node_ip->pnn;
1716         ips->ips[i].addr = node_ip->addr;
1717         return 0;
1718 }
1719
1720 static int getips_count_callback(void *param, void *data)
1721 {
1722         uint32_t *count = param;
1723
1724         (*count)++;
1725         return 0;
1726 }
1727
1728 #define IP_KEYLEN       4
1729 static uint32_t *ip_key(ctdb_sock_addr *ip)
1730 {
1731         static uint32_t key[IP_KEYLEN];
1732
1733         bzero(key, sizeof(key));
1734
1735         switch (ip->sa.sa_family) {
1736         case AF_INET:
1737                 key[0]  = ip->ip.sin_addr.s_addr;
1738                 break;
1739         case AF_INET6: {
1740                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1741                 key[0]  = s6_a32[3];
1742                 key[1]  = s6_a32[2];
1743                 key[2]  = s6_a32[1];
1744                 key[3]  = s6_a32[0];
1745                 break;
1746         }
1747         default:
1748                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1749                 return key;
1750         }
1751
1752         return key;
1753 }
1754
1755 static void *add_ip_callback(void *parm, void *data)
1756 {
1757         return parm;
1758 }
1759
1760 static int
1761 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1762 {
1763         struct ctdb_all_public_ips *tmp_ips;
1764         struct ctdb_node_map *nodemap=NULL;
1765         trbt_tree_t *ip_tree;
1766         int i, j, len, ret;
1767         uint32_t count;
1768
1769         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1770         if (ret != 0) {
1771                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1772                 return ret;
1773         }
1774
1775         ip_tree = trbt_create(tmp_ctx, 0);
1776
1777         for(i=0;i<nodemap->num;i++){
1778                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1779                         continue;
1780                 }
1781                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1782                         continue;
1783                 }
1784
1785                 /* read the public ip list from this node */
1786                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1787                 if (ret != 0) {
1788                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1789                         return -1;
1790                 }
1791         
1792                 for (j=0; j<tmp_ips->num;j++) {
1793                         struct ctdb_public_ip *node_ip;
1794
1795                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1796                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1797                         node_ip->addr = tmp_ips->ips[j].addr;
1798
1799                         trbt_insertarray32_callback(ip_tree,
1800                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1801                                 add_ip_callback,
1802                                 node_ip);
1803                 }
1804                 talloc_free(tmp_ips);
1805         }
1806
1807         /* traverse */
1808         count = 0;
1809         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1810
1811         len = offsetof(struct ctdb_all_public_ips, ips) + 
1812                 count*sizeof(struct ctdb_public_ip);
1813         tmp_ips = talloc_zero_size(tmp_ctx, len);
1814         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1815
1816         *ips = tmp_ips;
1817
1818         return 0;
1819 }
1820
1821
1822 /* 
1823  * scans all other nodes and returns a pnn for another node that can host this 
1824  * ip address or -1
1825  */
1826 static int
1827 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1828 {
1829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1830         struct ctdb_all_public_ips *ips;
1831         struct ctdb_node_map *nodemap=NULL;
1832         int i, j, ret;
1833
1834         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1835         if (ret != 0) {
1836                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1837                 talloc_free(tmp_ctx);
1838                 return ret;
1839         }
1840
1841         for(i=0;i<nodemap->num;i++){
1842                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1843                         continue;
1844                 }
1845                 if (nodemap->nodes[i].pnn == options.pnn) {
1846                         continue;
1847                 }
1848
1849                 /* read the public ip list from this node */
1850                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1851                 if (ret != 0) {
1852                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1853                         return -1;
1854                 }
1855
1856                 for (j=0;j<ips->num;j++) {
1857                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1858                                 talloc_free(tmp_ctx);
1859                                 return nodemap->nodes[i].pnn;
1860                         }
1861                 }
1862                 talloc_free(ips);
1863         }
1864
1865         talloc_free(tmp_ctx);
1866         return -1;
1867 }
1868
1869 static uint32_t ipreallocate_finished;
1870
1871 /*
1872   handler for receiving the response to ipreallocate
1873 */
1874 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1875                              TDB_DATA data, void *private_data)
1876 {
1877         ipreallocate_finished = 1;
1878 }
1879
1880 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1881 {
1882         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1883
1884         event_add_timed(ctdb->ev, ctdb, 
1885                                 timeval_current_ofs(1, 0),
1886                                 ctdb_every_second, ctdb);
1887 }
1888
1889 /*
1890   ask the recovery daemon on the recovery master to perform a ip reallocation
1891  */
1892 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1893 {
1894         int i, ret;
1895         TDB_DATA data;
1896         struct takeover_run_reply rd;
1897         uint32_t recmaster;
1898         struct ctdb_node_map *nodemap=NULL;
1899         int retries=0;
1900         struct timeval tv = timeval_current();
1901
1902         /* we need some events to trigger so we can timeout and restart
1903            the loop
1904         */
1905         event_add_timed(ctdb->ev, ctdb, 
1906                                 timeval_current_ofs(1, 0),
1907                                 ctdb_every_second, ctdb);
1908
1909         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1910         if (rd.pnn == -1) {
1911                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1912                 return -1;
1913         }
1914         rd.srvid = getpid();
1915
1916         /* register a message port for receiveing the reply so that we
1917            can receive the reply
1918         */
1919         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1920
1921         data.dptr = (uint8_t *)&rd;
1922         data.dsize = sizeof(rd);
1923
1924 again:
1925         /* check that there are valid nodes available */
1926         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1927                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1928                 return -1;
1929         }
1930         for (i=0; i<nodemap->num;i++) {
1931                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1932                         break;
1933                 }
1934         }
1935         if (i==nodemap->num) {
1936                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1937                 return 0;
1938         }
1939
1940
1941         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1942                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1943                 return -1;
1944         }
1945
1946         /* verify the node exists */
1947         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1948                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1949                 return -1;
1950         }
1951
1952
1953         /* check tha there are nodes available that can act as a recmaster */
1954         for (i=0; i<nodemap->num; i++) {
1955                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1956                         continue;
1957                 }
1958                 break;
1959         }
1960         if (i == nodemap->num) {
1961                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1962                 return 0;
1963         }
1964
1965         /* verify the recovery master is not STOPPED, nor BANNED */
1966         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1967                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1968                 retries++;
1969                 sleep(1);
1970                 goto again;
1971         } 
1972         
1973         /* verify the recovery master is not STOPPED, nor BANNED */
1974         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1975                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1976                 retries++;
1977                 sleep(1);
1978                 goto again;
1979         } 
1980
1981         ipreallocate_finished = 0;
1982         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1983         if (ret != 0) {
1984                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1985                 return -1;
1986         }
1987
1988         tv = timeval_current();
1989         /* this loop will terminate when we have received the reply */
1990         while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
1991                 event_loop_once(ctdb->ev);
1992         }
1993         if (ipreallocate_finished == 1) {
1994                 return 0;
1995         }
1996
1997         retries++;
1998         sleep(1);
1999         goto again;
2000
2001         return 0;
2002 }
2003
2004
2005 /*
2006   add a public ip address to a node
2007  */
2008 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2009 {
2010         int i, ret;
2011         int len, retries = 0;
2012         unsigned mask;
2013         ctdb_sock_addr addr;
2014         struct ctdb_control_ip_iface *pub;
2015         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2016         struct ctdb_all_public_ips *ips;
2017
2018
2019         if (argc != 2) {
2020                 talloc_free(tmp_ctx);
2021                 usage();
2022         }
2023
2024         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2025                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2026                 talloc_free(tmp_ctx);
2027                 return -1;
2028         }
2029
2030         /* read the public ip list from the node */
2031         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2032         if (ret != 0) {
2033                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2034                 talloc_free(tmp_ctx);
2035                 return -1;
2036         }
2037         for (i=0;i<ips->num;i++) {
2038                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2039                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2040                         return 0;
2041                 }
2042         }
2043
2044
2045
2046         /* Dont timeout. This command waits for an ip reallocation
2047            which sometimes can take wuite a while if there has
2048            been a recent recovery
2049         */
2050         alarm(0);
2051
2052         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2053         pub = talloc_size(tmp_ctx, len); 
2054         CTDB_NO_MEMORY(ctdb, pub);
2055
2056         pub->addr  = addr;
2057         pub->mask  = mask;
2058         pub->len   = strlen(argv[1])+1;
2059         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2060
2061         do {
2062                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2063                 if (ret != 0) {
2064                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2065                         sleep(3);
2066                         retries++;
2067                 }
2068         } while (retries < 5 && ret != 0);
2069         if (ret != 0) {
2070                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2071                 talloc_free(tmp_ctx);
2072                 return ret;
2073         }
2074
2075         if (rebalance_node(ctdb, options.pnn) != 0) {
2076                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2077                 return ret;
2078         }
2079
2080         talloc_free(tmp_ctx);
2081         return 0;
2082 }
2083
2084 /*
2085   add a public ip address to a node
2086  */
2087 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2088 {
2089         ctdb_sock_addr addr;
2090
2091         if (argc != 1) {
2092                 usage();
2093         }
2094
2095         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2096                 printf("Badly formed ip : %s\n", argv[0]);
2097                 return -1;
2098         }
2099
2100         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2101
2102         return 0;
2103 }
2104
2105 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2106
2107 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2108 {
2109         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2110         struct ctdb_node_map *nodemap=NULL;
2111         struct ctdb_all_public_ips *ips;
2112         int ret, i, j;
2113
2114         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2115         if (ret != 0) {
2116                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2117                 return ret;
2118         }
2119
2120         /* remove it from the nodes that are not hosting the ip currently */
2121         for(i=0;i<nodemap->num;i++){
2122                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2123                         continue;
2124                 }
2125                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2126                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2127                         continue;
2128                 }
2129
2130                 for (j=0;j<ips->num;j++) {
2131                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2132                                 break;
2133                         }
2134                 }
2135                 if (j==ips->num) {
2136                         continue;
2137                 }
2138
2139                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2140                         continue;
2141                 }
2142
2143                 options.pnn = nodemap->nodes[i].pnn;
2144                 control_delip(ctdb, argc, argv);
2145         }
2146
2147
2148         /* remove it from every node (also the one hosting it) */
2149         for(i=0;i<nodemap->num;i++){
2150                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2151                         continue;
2152                 }
2153                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2154                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2155                         continue;
2156                 }
2157
2158                 for (j=0;j<ips->num;j++) {
2159                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2160                                 break;
2161                         }
2162                 }
2163                 if (j==ips->num) {
2164                         continue;
2165                 }
2166
2167                 options.pnn = nodemap->nodes[i].pnn;
2168                 control_delip(ctdb, argc, argv);
2169         }
2170
2171         talloc_free(tmp_ctx);
2172         return 0;
2173 }
2174         
2175 /*
2176   delete a public ip address from a node
2177  */
2178 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2179 {
2180         int i, ret;
2181         int retries = 0;
2182         ctdb_sock_addr addr;
2183         struct ctdb_control_ip_iface pub;
2184         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2185         struct ctdb_all_public_ips *ips;
2186
2187         if (argc != 1) {
2188                 talloc_free(tmp_ctx);
2189                 usage();
2190         }
2191
2192         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2193                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2194                 return -1;
2195         }
2196
2197         if (options.pnn == CTDB_BROADCAST_ALL) {
2198                 return control_delip_all(ctdb, argc, argv, &addr);
2199         }
2200
2201         pub.addr  = addr;
2202         pub.mask  = 0;
2203         pub.len   = 0;
2204
2205         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2206         if (ret != 0) {
2207                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2208                 talloc_free(tmp_ctx);
2209                 return ret;
2210         }
2211         
2212         for (i=0;i<ips->num;i++) {
2213                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2214                         break;
2215                 }
2216         }
2217
2218         if (i==ips->num) {
2219                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2220                         ctdb_addr_to_str(&addr)));
2221                 talloc_free(tmp_ctx);
2222                 return -1;
2223         }
2224
2225         if (ips->ips[i].pnn == options.pnn) {
2226                 ret = find_other_host_for_public_ip(ctdb, &addr);
2227                 if (ret != -1) {
2228                         do {
2229                                 ret = move_ip(ctdb, &addr, ret);
2230                                 if (ret != 0) {
2231                                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
2232                                         sleep(3);
2233                                         retries++;
2234                                 }
2235                         } while (retries < 5 && ret != 0);
2236                         if (ret != 0) {
2237                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
2238                                 return -1;
2239                         }
2240                 }
2241         }
2242
2243         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2244         if (ret != 0) {
2245                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2246                 talloc_free(tmp_ctx);
2247                 return ret;
2248         }
2249
2250         talloc_free(tmp_ctx);
2251         return 0;
2252 }
2253
2254 /*
2255   kill a tcp connection
2256  */
2257 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2258 {
2259         int ret;
2260         struct ctdb_control_killtcp killtcp;
2261
2262         if (argc < 2) {
2263                 usage();
2264         }
2265
2266         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2267                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2268                 return -1;
2269         }
2270
2271         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2272                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2273                 return -1;
2274         }
2275
2276         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2277         if (ret != 0) {
2278                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2279                 return ret;
2280         }
2281
2282         return 0;
2283 }
2284
2285
2286 /*
2287   send a gratious arp
2288  */
2289 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2290 {
2291         int ret;
2292         ctdb_sock_addr addr;
2293
2294         if (argc < 2) {
2295                 usage();
2296         }
2297
2298         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2299                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2300                 return -1;
2301         }
2302
2303         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2304         if (ret != 0) {
2305                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2306                 return ret;
2307         }
2308
2309         return 0;
2310 }
2311
2312 /*
2313   register a server id
2314  */
2315 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2316 {
2317         int ret;
2318         struct ctdb_server_id server_id;
2319
2320         if (argc < 3) {
2321                 usage();
2322         }
2323
2324         server_id.pnn       = strtoul(argv[0], NULL, 0);
2325         server_id.type      = strtoul(argv[1], NULL, 0);
2326         server_id.server_id = strtoul(argv[2], NULL, 0);
2327
2328         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2329         if (ret != 0) {
2330                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2331                 return ret;
2332         }
2333         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2334         sleep(999);
2335         return -1;
2336 }
2337
2338 /*
2339   unregister a server id
2340  */
2341 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2342 {
2343         int ret;
2344         struct ctdb_server_id server_id;
2345
2346         if (argc < 3) {
2347                 usage();
2348         }
2349
2350         server_id.pnn       = strtoul(argv[0], NULL, 0);
2351         server_id.type      = strtoul(argv[1], NULL, 0);
2352         server_id.server_id = strtoul(argv[2], NULL, 0);
2353
2354         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2355         if (ret != 0) {
2356                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2357                 return ret;
2358         }
2359         return -1;
2360 }
2361
2362 /*
2363   check if a server id exists
2364  */
2365 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2366 {
2367         uint32_t status;
2368         int ret;
2369         struct ctdb_server_id server_id;
2370
2371         if (argc < 3) {
2372                 usage();
2373         }
2374
2375         server_id.pnn       = strtoul(argv[0], NULL, 0);
2376         server_id.type      = strtoul(argv[1], NULL, 0);
2377         server_id.server_id = strtoul(argv[2], NULL, 0);
2378
2379         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2380         if (ret != 0) {
2381                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2382                 return ret;
2383         }
2384
2385         if (status) {
2386                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2387         } else {
2388                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2389         }
2390         return 0;
2391 }
2392
2393 /*
2394   get a list of all server ids that are registered on a node
2395  */
2396 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2397 {
2398         int i, ret;
2399         struct ctdb_server_id_list *server_ids;
2400
2401         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2402         if (ret != 0) {
2403                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2404                 return ret;
2405         }
2406
2407         for (i=0; i<server_ids->num; i++) {
2408                 printf("Server id %d:%d:%d\n", 
2409                         server_ids->server_ids[i].pnn, 
2410                         server_ids->server_ids[i].type, 
2411                         server_ids->server_ids[i].server_id); 
2412         }
2413
2414         return -1;
2415 }
2416
2417 /*
2418   check if a server id exists
2419  */
2420 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2421 {
2422         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2423         uint64_t *ids;
2424         uint8_t *result;
2425         int i;
2426
2427         if (argc < 1) {
2428                 talloc_free(tmp_ctx);
2429                 usage();
2430         }
2431
2432         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2433         result = talloc_array(tmp_ctx, uint8_t, argc);
2434
2435         for (i = 0; i < argc; i++) {
2436                 ids[i] = strtoull(argv[i], NULL, 0);
2437         }
2438
2439         if (!ctdb_check_message_handlers(ctdb_connection,
2440                 options.pnn, argc, ids, result)) {
2441                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2442                                   options.pnn));
2443                 talloc_free(tmp_ctx);
2444                 return -1;
2445         }
2446
2447         for (i=0; i < argc; i++) {
2448                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2449                        result[i] ? "exists" : "does not exist");
2450         }
2451
2452         talloc_free(tmp_ctx);
2453         return 0;
2454 }
2455
2456 /*
2457   send a tcp tickle ack
2458  */
2459 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2460 {
2461         int ret;
2462         ctdb_sock_addr  src, dst;
2463
2464         if (argc < 2) {
2465                 usage();
2466         }
2467
2468         if (!parse_ip_port(argv[0], &src)) {
2469                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2470                 return -1;
2471         }
2472
2473         if (!parse_ip_port(argv[1], &dst)) {
2474                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2475                 return -1;
2476         }
2477
2478         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2479         if (ret==0) {
2480                 return 0;
2481         }
2482         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2483
2484         return -1;
2485 }
2486
2487
2488 /*
2489   display public ip status
2490  */
2491 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2492 {
2493         int i, ret;
2494         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2495         struct ctdb_all_public_ips *ips;
2496
2497         if (options.pnn == CTDB_BROADCAST_ALL) {
2498                 /* read the list of public ips from all nodes */
2499                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2500         } else {
2501                 /* read the public ip list from this node */
2502                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2503         }
2504         if (ret != 0) {
2505                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2506                 talloc_free(tmp_ctx);
2507                 return ret;
2508         }
2509
2510         if (options.machinereadable){
2511                 printf(":Public IP:Node:");
2512                 if (options.verbose){
2513                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2514                 }
2515                 printf("\n");
2516         } else {
2517                 if (options.pnn == CTDB_BROADCAST_ALL) {
2518                         printf("Public IPs on ALL nodes\n");
2519                 } else {
2520                         printf("Public IPs on node %u\n", options.pnn);
2521                 }
2522         }
2523
2524         for (i=1;i<=ips->num;i++) {
2525                 struct ctdb_control_public_ip_info *info = NULL;
2526                 int32_t pnn;
2527                 char *aciface = NULL;
2528                 char *avifaces = NULL;
2529                 char *cifaces = NULL;
2530
2531                 if (options.pnn == CTDB_BROADCAST_ALL) {
2532                         pnn = ips->ips[ips->num-i].pnn;
2533                 } else {
2534                         pnn = options.pnn;
2535                 }
2536
2537                 if (pnn != -1) {
2538                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2539                                                    &ips->ips[ips->num-i].addr, &info);
2540                 } else {
2541                         ret = -1;
2542                 }
2543
2544                 if (ret == 0) {
2545                         int j;
2546                         for (j=0; j < info->num; j++) {
2547                                 if (cifaces == NULL) {
2548                                         cifaces = talloc_strdup(info,
2549                                                                 info->ifaces[j].name);
2550                                 } else {
2551                                         cifaces = talloc_asprintf_append(cifaces,
2552                                                                          ",%s",
2553                                                                          info->ifaces[j].name);
2554                                 }
2555
2556                                 if (info->active_idx == j) {
2557                                         aciface = info->ifaces[j].name;
2558                                 }
2559
2560                                 if (info->ifaces[j].link_state == 0) {
2561                                         continue;
2562                                 }
2563
2564                                 if (avifaces == NULL) {
2565                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2566                                 } else {
2567                                         avifaces = talloc_asprintf_append(avifaces,
2568                                                                           ",%s",
2569                                                                           info->ifaces[j].name);
2570                                 }
2571                         }
2572                 }
2573
2574                 if (options.machinereadable){
2575                         printf(":%s:%d:",
2576                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2577                                 ips->ips[ips->num-i].pnn);
2578                         if (options.verbose){
2579                                 printf("%s:%s:%s:",
2580                                         aciface?aciface:"",
2581                                         avifaces?avifaces:"",
2582                                         cifaces?cifaces:"");
2583                         }
2584                         printf("\n");
2585                 } else {
2586                         if (options.verbose) {
2587                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2588                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2589                                         ips->ips[ips->num-i].pnn,
2590                                         aciface?aciface:"",
2591                                         avifaces?avifaces:"",
2592                                         cifaces?cifaces:"");
2593                         } else {
2594                                 printf("%s %d\n",
2595                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2596                                         ips->ips[ips->num-i].pnn);
2597                         }
2598                 }
2599                 talloc_free(info);
2600         }
2601
2602         talloc_free(tmp_ctx);
2603         return 0;
2604 }
2605
2606 /*
2607   public ip info
2608  */
2609 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2610 {
2611         int i, ret;
2612         ctdb_sock_addr addr;
2613         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2614         struct ctdb_control_public_ip_info *info;
2615
2616         if (argc != 1) {
2617                 talloc_free(tmp_ctx);
2618                 usage();
2619         }
2620
2621         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2622                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2623                 return -1;
2624         }
2625
2626         /* read the public ip info from this node */
2627         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2628                                            tmp_ctx, &addr, &info);
2629         if (ret != 0) {
2630                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2631                                   argv[0], options.pnn));
2632                 talloc_free(tmp_ctx);
2633                 return ret;
2634         }
2635
2636         printf("Public IP[%s] info on node %u\n",
2637                ctdb_addr_to_str(&info->ip.addr),
2638                options.pnn);
2639
2640         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2641                ctdb_addr_to_str(&info->ip.addr),
2642                info->ip.pnn, info->num);
2643
2644         for (i=0; i<info->num; i++) {
2645                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2646
2647                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2648                        i+1, info->ifaces[i].name,
2649                        info->ifaces[i].link_state?"up":"down",
2650                        (unsigned int)info->ifaces[i].references,
2651                        (i==info->active_idx)?" (active)":"");
2652         }
2653
2654         talloc_free(tmp_ctx);
2655         return 0;
2656 }
2657
2658 /*
2659   display interfaces status
2660  */
2661 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2662 {
2663         int i;
2664         struct ctdb_ifaces_list *ifaces;
2665
2666         /* read the public ip list from this node */
2667         if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
2668                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2669                                   options.pnn));
2670                 return -1;
2671         }
2672
2673         if (options.machinereadable){
2674                 printf(":Name:LinkStatus:References:\n");
2675         } else {
2676                 printf("Interfaces on node %u\n", options.pnn);
2677         }
2678
2679         for (i=0; i<ifaces->num; i++) {
2680                 if (options.machinereadable){
2681                         printf(":%s:%s:%u\n",
2682                                ifaces->ifaces[i].name,
2683                                ifaces->ifaces[i].link_state?"1":"0",
2684                                (unsigned int)ifaces->ifaces[i].references);
2685                 } else {
2686                         printf("name:%s link:%s references:%u\n",
2687                                ifaces->ifaces[i].name,
2688                                ifaces->ifaces[i].link_state?"up":"down",
2689                                (unsigned int)ifaces->ifaces[i].references);
2690                 }
2691         }
2692
2693         ctdb_free_ifaces(ifaces);
2694         return 0;
2695 }
2696
2697
2698 /*
2699   set link status of an interface
2700  */
2701 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2702 {
2703         int ret;
2704         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2705         struct ctdb_control_iface_info info;
2706
2707         ZERO_STRUCT(info);
2708
2709         if (argc != 2) {
2710                 usage();
2711         }
2712
2713         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2714                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2715                                   argv[0]));
2716                 talloc_free(tmp_ctx);
2717                 return -1;
2718         }
2719         strcpy(info.name, argv[0]);
2720
2721         if (strcmp(argv[1], "up") == 0) {
2722                 info.link_state = 1;
2723         } else if (strcmp(argv[1], "down") == 0) {
2724                 info.link_state = 0;
2725         } else {
2726                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2727                                   argv[1]));
2728                 talloc_free(tmp_ctx);
2729                 return -1;
2730         }
2731
2732         /* read the public ip list from this node */
2733         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2734                                    tmp_ctx, &info);
2735         if (ret != 0) {
2736                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2737                                   argv[0], options.pnn));
2738                 talloc_free(tmp_ctx);
2739                 return ret;
2740         }
2741
2742         talloc_free(tmp_ctx);
2743         return 0;
2744 }
2745
2746 /*
2747   display pid of a ctdb daemon
2748  */
2749 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2750 {
2751         uint32_t pid;
2752         int ret;
2753
2754         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2755         if (ret != 0) {
2756                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2757                 return ret;
2758         }
2759         printf("Pid:%d\n", pid);
2760
2761         return 0;
2762 }
2763
2764 /*
2765   disable a remote node
2766  */
2767 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2768 {
2769         int ret;
2770         struct ctdb_node_map *nodemap=NULL;
2771
2772         /* check if the node is already disabled */
2773         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2774                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2775                 exit(10);
2776         }
2777         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2778                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2779                 return 0;
2780         }
2781
2782         do {
2783                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2784                 if (ret != 0) {
2785                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2786                         return ret;
2787                 }
2788
2789                 sleep(1);
2790
2791                 /* read the nodemap and verify the change took effect */
2792                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2793                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2794                         exit(10);
2795                 }
2796
2797         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2798         ret = control_ipreallocate(ctdb, argc, argv);
2799         if (ret != 0) {
2800                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2801                 return ret;
2802         }
2803
2804         return 0;
2805 }
2806
2807 /*
2808   enable a disabled remote node
2809  */
2810 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2811 {
2812         int ret;
2813
2814         struct ctdb_node_map *nodemap=NULL;
2815
2816
2817         /* check if the node is already enabled */
2818         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2819                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2820                 exit(10);
2821         }
2822         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2823                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2824                 return 0;
2825         }
2826
2827         do {
2828                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2829                 if (ret != 0) {
2830                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2831                         return ret;
2832                 }
2833
2834                 sleep(1);
2835
2836                 /* read the nodemap and verify the change took effect */
2837                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2838                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2839                         exit(10);
2840                 }
2841
2842         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2843
2844         ret = control_ipreallocate(ctdb, argc, argv);
2845         if (ret != 0) {
2846                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2847                 return ret;
2848         }
2849
2850         return 0;
2851 }
2852
2853 /*
2854   stop a remote node
2855  */
2856 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2857 {
2858         int ret;
2859         struct ctdb_node_map *nodemap=NULL;
2860
2861         do {
2862                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2863                 if (ret != 0) {
2864                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2865                 }
2866         
2867                 sleep(1);
2868
2869                 /* read the nodemap and verify the change took effect */
2870                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2871                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2872                         exit(10);
2873                 }
2874
2875         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2876         ret = control_ipreallocate(ctdb, argc, argv);
2877         if (ret != 0) {
2878                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2879                 return ret;
2880         }
2881
2882         return 0;
2883 }
2884
2885 /*
2886   restart a stopped remote node
2887  */
2888 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2889 {
2890         int ret;
2891
2892         struct ctdb_node_map *nodemap=NULL;
2893
2894         do {
2895                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2896                 if (ret != 0) {
2897                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2898                         return ret;
2899                 }
2900         
2901                 sleep(1);
2902
2903                 /* read the nodemap and verify the change took effect */
2904                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2905                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2906                         exit(10);
2907                 }
2908
2909         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2910         ret = control_ipreallocate(ctdb, argc, argv);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2913                 return ret;
2914         }
2915
2916         return 0;
2917 }
2918
2919 static uint32_t get_generation(struct ctdb_context *ctdb)
2920 {
2921         struct ctdb_vnn_map *vnnmap=NULL;
2922         int ret;
2923
2924         /* wait until the recmaster is not in recovery mode */
2925         while (1) {
2926                 uint32_t recmode, recmaster;
2927                 
2928                 if (vnnmap != NULL) {
2929                         talloc_free(vnnmap);
2930                         vnnmap = NULL;
2931                 }
2932
2933                 /* get the recmaster */
2934                 if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
2935                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2936                         exit(10);
2937                 }
2938
2939                 /* get recovery mode */
2940                 if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
2941                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2942                         exit(10);
2943                 }
2944
2945                 /* get the current generation number */
2946                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2947                 if (ret != 0) {
2948                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2949                         exit(10);
2950                 }
2951
2952                 if ((recmode == CTDB_RECOVERY_NORMAL)
2953                 &&  (vnnmap->generation != 1)){
2954                         return vnnmap->generation;
2955                 }
2956                 sleep(1);
2957         }
2958 }
2959
2960 /*
2961   ban a node from the cluster
2962  */
2963 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2964 {
2965         int ret;
2966         struct ctdb_node_map *nodemap=NULL;
2967         struct ctdb_ban_time bantime;
2968
2969         if (argc < 1) {
2970                 usage();
2971         }
2972         
2973         /* verify the node exists */
2974         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2977                 return ret;
2978         }
2979
2980         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2981                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2982                 return -1;
2983         }
2984
2985         bantime.pnn  = options.pnn;
2986         bantime.time = strtoul(argv[0], NULL, 0);
2987
2988         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2989         if (ret != 0) {
2990                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2991                 return -1;
2992         }       
2993
2994         ret = control_ipreallocate(ctdb, argc, argv);
2995         if (ret != 0) {
2996                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2997                 return ret;
2998         }
2999
3000         return 0;
3001 }
3002
3003
3004 /*
3005   unban a node from the cluster
3006  */
3007 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3008 {
3009         int ret;
3010         struct ctdb_node_map *nodemap=NULL;
3011         struct ctdb_ban_time bantime;
3012
3013         /* verify the node exists */
3014         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3015         if (ret != 0) {
3016                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3017                 return ret;
3018         }
3019
3020         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
3021                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
3022                 return -1;
3023         }
3024
3025         bantime.pnn  = options.pnn;
3026         bantime.time = 0;
3027
3028         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
3029         if (ret != 0) {
3030                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
3031                 return -1;
3032         }       
3033
3034         ret = control_ipreallocate(ctdb, argc, argv);
3035         if (ret != 0) {
3036                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
3037                 return ret;
3038         }
3039
3040         return 0;
3041 }
3042
3043
3044 /*
3045   show ban information for a node
3046  */
3047 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3048 {
3049         int ret;
3050         struct ctdb_node_map *nodemap=NULL;
3051         struct ctdb_ban_time *bantime;
3052
3053         /* verify the node exists */
3054         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3055         if (ret != 0) {
3056                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3057                 return ret;
3058         }
3059
3060         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3061         if (ret != 0) {
3062                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3063                 return -1;
3064         }       
3065
3066         if (bantime->time == 0) {
3067                 printf("Node %u is not banned\n", bantime->pnn);
3068         } else {
3069                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
3070         }
3071
3072         return 0;
3073 }
3074
3075 /*
3076   shutdown a daemon
3077  */
3078 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3079 {
3080         int ret;
3081
3082         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3083         if (ret != 0) {
3084                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3085                 return ret;
3086         }
3087
3088         return 0;
3089 }
3090
3091 /*
3092   trigger a recovery
3093  */
3094 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3095 {
3096         int ret;
3097         uint32_t generation, next_generation;
3098
3099         /* record the current generation number */
3100         generation = get_generation(ctdb);
3101
3102         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3103         if (ret != 0) {
3104                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3105                 return ret;
3106         }
3107
3108         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3109         if (ret != 0) {
3110                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3111                 return ret;
3112         }
3113
3114         /* wait until we are in a new generation */
3115         while (1) {
3116                 next_generation = get_generation(ctdb);
3117                 if (next_generation != generation) {
3118                         return 0;
3119                 }
3120                 sleep(1);
3121         }
3122
3123         return 0;
3124 }
3125
3126
3127 /*
3128   display monitoring mode of a remote node
3129  */
3130 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3131 {
3132         uint32_t monmode;
3133         int ret;
3134
3135         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3136         if (ret != 0) {
3137                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3138                 return ret;
3139         }
3140         if (!options.machinereadable){
3141                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3142         } else {
3143                 printf(":mode:\n");
3144                 printf(":%d:\n",monmode);
3145         }
3146         return 0;
3147 }
3148
3149
3150 /*
3151   display capabilities of a remote node
3152  */
3153 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3154 {
3155         uint32_t capabilities;
3156         int ret;
3157
3158         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3159         if (ret != 0) {
3160                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3161                 return ret;
3162         }
3163         
3164         if (!options.machinereadable){
3165                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3166                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3167                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3168                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3169         } else {
3170                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3171                 printf(":%d:%d:%d:%d:\n",
3172                         !!(capabilities&CTDB_CAP_RECMASTER),
3173                         !!(capabilities&CTDB_CAP_LMASTER),
3174                         !!(capabilities&CTDB_CAP_LVS),
3175                         !!(capabilities&CTDB_CAP_NATGW));
3176         }
3177         return 0;
3178 }
3179
3180 /*
3181   display lvs configuration
3182  */
3183 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3184 {
3185         uint32_t *capabilities;
3186         struct ctdb_node_map *nodemap=NULL;
3187         int i, ret;
3188         int healthy_count = 0;
3189
3190         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3191         if (ret != 0) {
3192                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3193                 return ret;
3194         }
3195
3196         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3197         CTDB_NO_MEMORY(ctdb, capabilities);
3198         
3199         /* collect capabilities for all connected nodes */
3200         for (i=0; i<nodemap->num; i++) {
3201                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3202                         continue;
3203                 }
3204                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3205                         continue;
3206                 }
3207         
3208                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3209                 if (ret != 0) {
3210                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3211                         return ret;
3212                 }
3213
3214                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3215                         continue;
3216                 }
3217
3218                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3219                         healthy_count++;
3220                 }
3221         }
3222
3223         /* Print all LVS nodes */
3224         for (i=0; i<nodemap->num; i++) {
3225                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3226                         continue;
3227                 }
3228                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3229                         continue;
3230                 }
3231                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3232                         continue;
3233                 }
3234
3235                 if (healthy_count != 0) {
3236                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3237                                 continue;
3238                         }
3239                 }
3240
3241                 printf("%d:%s\n", i, 
3242                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3243         }
3244
3245         return 0;
3246 }
3247
3248 /*
3249   display who is the lvs master
3250  */
3251 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3252 {
3253         uint32_t *capabilities;
3254         struct ctdb_node_map *nodemap=NULL;
3255         int i, ret;
3256         int healthy_count = 0;
3257
3258         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3259         if (ret != 0) {
3260                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3261                 return ret;
3262         }
3263
3264         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3265         CTDB_NO_MEMORY(ctdb, capabilities);
3266         
3267         /* collect capabilities for all connected nodes */
3268         for (i=0; i<nodemap->num; i++) {
3269                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3270                         continue;
3271                 }
3272                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3273                         continue;
3274                 }
3275         
3276                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3277                 if (ret != 0) {
3278                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3279                         return ret;
3280                 }
3281
3282                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3283                         continue;
3284                 }
3285
3286                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3287                         healthy_count++;
3288                 }
3289         }
3290
3291         /* find and show the lvsmaster */
3292         for (i=0; i<nodemap->num; i++) {
3293                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3294                         continue;
3295                 }
3296                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3297                         continue;
3298                 }
3299                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3300                         continue;
3301                 }
3302
3303                 if (healthy_count != 0) {
3304                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3305                                 continue;
3306                         }
3307                 }
3308
3309                 if (options.machinereadable){
3310                         printf("%d\n", i);
3311                 } else {
3312                         printf("Node %d is LVS master\n", i);
3313                 }
3314                 return 0;
3315         }
3316
3317         printf("There is no LVS master\n");
3318         return -1;
3319 }
3320
3321 /*
3322   disable monitoring on a  node
3323  */
3324 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3325 {
3326         
3327         int ret;
3328
3329         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3330         if (ret != 0) {
3331                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3332                 return ret;
3333         }
3334         printf("Monitoring mode:%s\n","DISABLED");
3335
3336         return 0;
3337 }
3338
3339 /*
3340   enable monitoring on a  node
3341  */
3342 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3343 {
3344         
3345         int ret;
3346
3347         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3348         if (ret != 0) {
3349                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3350                 return ret;
3351         }
3352         printf("Monitoring mode:%s\n","ACTIVE");
3353
3354         return 0;
3355 }
3356
3357 /*
3358   display remote list of keys/data for a db
3359  */
3360 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3361 {
3362         const char *db_name;
3363         struct ctdb_db_context *ctdb_db;
3364         int ret;
3365         bool persistent;
3366         struct ctdb_dump_db_context c;
3367
3368         if (argc < 1) {
3369                 usage();
3370         }
3371
3372         db_name = argv[0];
3373
3374
3375         if (db_exists(ctdb, db_name, &persistent)) {
3376                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3377                 return -1;
3378         }
3379
3380         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3381
3382         if (ctdb_db == NULL) {
3383                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3384                 return -1;
3385         }
3386
3387         if (options.printlmaster) {
3388                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3389                                           ctdb, &ctdb->vnn_map);
3390                 if (ret != 0) {
3391                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3392                                           options.pnn));
3393                         return ret;
3394                 }
3395         }
3396
3397         ZERO_STRUCT(c);
3398         c.f = stdout;
3399         c.printemptyrecords = (bool)options.printemptyrecords;
3400         c.printdatasize = (bool)options.printdatasize;
3401         c.printlmaster = (bool)options.printlmaster;
3402         c.printhash = (bool)options.printhash;
3403         c.printrecordflags = (bool)options.printrecordflags;
3404
3405         /* traverse and dump the cluster tdb */
3406         ret = ctdb_dump_db(ctdb_db, &c);
3407         if (ret == -1) {
3408                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3409                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3410                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3411                                   db_name));
3412                 return -1;
3413         }
3414         talloc_free(ctdb_db);
3415
3416         printf("Dumped %d records\n", ret);
3417         return 0;
3418 }
3419
3420 struct cattdb_data {
3421         struct ctdb_context *ctdb;
3422         uint32_t count;
3423 };
3424
3425 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3426 {
3427         struct cattdb_data *d = private_data;
3428         struct ctdb_dump_db_context c;
3429
3430         d->count++;
3431
3432         ZERO_STRUCT(c);
3433         c.f = stdout;
3434         c.printemptyrecords = (bool)options.printemptyrecords;
3435         c.printdatasize = (bool)options.printdatasize;
3436         c.printlmaster = false;
3437         c.printhash = (bool)options.printhash;
3438         c.printrecordflags = true;
3439
3440         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3441 }
3442
3443 /*
3444   cat the local tdb database using same format as catdb
3445  */
3446 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3447 {
3448         const char *db_name;
3449         struct ctdb_db_context *ctdb_db;
3450         struct cattdb_data d;
3451         bool persistent;
3452
3453         if (argc < 1) {
3454                 usage();
3455         }
3456
3457         db_name = argv[0];
3458
3459
3460         if (db_exists(ctdb, db_name, &persistent)) {
3461                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3462                 return -1;
3463         }
3464
3465         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3466
3467         if (ctdb_db == NULL) {
3468                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3469                 return -1;
3470         }
3471
3472         /* traverse the local tdb */
3473         d.count = 0;
3474         d.ctdb  = ctdb;
3475         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3476                 printf("Failed to cattdb data\n");
3477                 exit(10);
3478         }
3479         talloc_free(ctdb_db);
3480
3481         printf("Dumped %d records\n", d.count);
3482         return 0;
3483 }
3484
3485 /*
3486   display the content of a database key
3487  */
3488 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3489 {
3490         const char *db_name;
3491         struct ctdb_db_context *ctdb_db;
3492         struct ctdb_record_handle *h;
3493         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3494         TDB_DATA key, data;
3495         bool persistent;
3496
3497         if (argc < 2) {
3498                 usage();
3499         }
3500
3501         db_name = argv[0];
3502
3503         if (db_exists(ctdb, db_name, &persistent)) {
3504                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3505                 return -1;
3506         }
3507
3508         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3509
3510         if (ctdb_db == NULL) {
3511                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3512                 return -1;
3513         }
3514
3515         key.dptr  = discard_const(argv[1]);
3516         key.dsize = strlen((char *)key.dptr);
3517
3518         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3519         if (h == NULL) {
3520                 printf("Failed to fetch record '%s' on node %d\n", 
3521                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3522                 talloc_free(tmp_ctx);
3523                 exit(10);
3524         }
3525
3526         printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
3527
3528         talloc_free(ctdb_db);
3529         talloc_free(tmp_ctx);
3530         return 0;
3531 }
3532
3533 /*
3534   display the content of a database key
3535  */
3536 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3537 {
3538         const char *db_name;
3539         struct ctdb_db_context *ctdb_db;
3540         struct ctdb_record_handle *h;
3541         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3542         TDB_DATA key, data;
3543         bool persistent;
3544
3545         if (argc < 3) {
3546                 usage();
3547         }
3548
3549         db_name = argv[0];
3550
3551         if (db_exists(ctdb, db_name, &persistent)) {
3552                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3553                 return -1;
3554         }
3555
3556         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3557
3558         if (ctdb_db == NULL) {
3559                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3560                 return -1;
3561         }
3562
3563         key.dptr  = discard_const(argv[1]);
3564         key.dsize = strlen((char *)key.dptr);
3565
3566         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3567         if (h == NULL) {
3568                 printf("Failed to fetch record '%s' on node %d\n", 
3569                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3570                 talloc_free(tmp_ctx);
3571                 exit(10);
3572         }
3573
3574         data.dptr  = discard_const(argv[2]);
3575         data.dsize = strlen((char *)data.dptr);
3576
3577         if (ctdb_record_store(h, data) != 0) {
3578                 printf("Failed to store record\n");
3579         }
3580
3581         talloc_free(h);
3582         talloc_free(ctdb_db);
3583         talloc_free(tmp_ctx);
3584         return 0;
3585 }
3586
3587 /*
3588   fetch a record from a persistent database
3589  */
3590 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3591 {
3592         const char *db_name;
3593         struct ctdb_db_context *ctdb_db;
3594         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3595         struct ctdb_transaction_handle *h;
3596         TDB_DATA key, data;
3597         int fd, ret;
3598         bool persistent;
3599
3600         if (argc < 2) {
3601                 talloc_free(tmp_ctx);
3602                 usage();
3603         }
3604
3605         db_name = argv[0];
3606
3607
3608         if (db_exists(ctdb, db_name, &persistent)) {
3609                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3610                 talloc_free(tmp_ctx);
3611                 return -1;
3612         }
3613
3614         if (!persistent) {
3615                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3616                 talloc_free(tmp_ctx);
3617                 return -1;
3618         }
3619
3620         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3621
3622         if (ctdb_db == NULL) {
3623                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3624                 talloc_free(tmp_ctx);
3625                 return -1;
3626         }
3627
3628         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3629         if (h == NULL) {
3630                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3631                 talloc_free(tmp_ctx);
3632                 return -1;
3633         }
3634
3635         key.dptr  = discard_const(argv[1]);
3636         key.dsize = strlen(argv[1]);
3637         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3638         if (ret != 0) {
3639                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3640                 talloc_free(tmp_ctx);
3641                 return -1;
3642         }
3643
3644         if (data.dsize == 0 || data.dptr == NULL) {
3645                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3646                 talloc_free(tmp_ctx);
3647                 return -1;
3648         }
3649
3650         if (argc == 3) {
3651           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3652                 if (fd == -1) {
3653                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3654                         talloc_free(tmp_ctx);
3655                         return -1;
3656                 }
3657                 write(fd, data.dptr, data.dsize);
3658                 close(fd);
3659         } else {
3660                 write(1, data.dptr, data.dsize);
3661         }
3662
3663         /* abort the transaction */
3664         talloc_free(h);
3665
3666
3667         talloc_free(tmp_ctx);
3668         return 0;
3669 }
3670
3671 /*
3672   fetch a record from a tdb-file
3673  */
3674 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3675 {
3676         const char *tdb_file;
3677         TDB_CONTEXT *tdb;
3678         TDB_DATA key, data;
3679         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3680         int fd;
3681
3682         if (argc < 2) {
3683                 usage();
3684         }
3685
3686         tdb_file = argv[0];
3687
3688         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3689         if (tdb == NULL) {
3690                 printf("Failed to open TDB file %s\n", tdb_file);
3691                 return -1;
3692         }
3693
3694         if (!strncmp(argv[1], "0x", 2)) {
3695                 key = hextodata(tmp_ctx, argv[1] + 2);
3696                 if (key.dsize == 0) {
3697                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3698                         return -1;
3699                 }
3700         } else {
3701                 key.dptr  = discard_const(argv[1]);
3702                 key.dsize = strlen(argv[1]);
3703         }
3704
3705         data = tdb_fetch(tdb, key);
3706         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3707                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
3708                 tdb_close(tdb);
3709                 return -1;
3710         }
3711
3712         tdb_close(tdb);
3713
3714         if (argc == 3) {
3715           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3716                 if (fd == -1) {
3717                         printf("Failed to open output file %s\n", argv[2]);
3718                         return -1;
3719                 }
3720                 if (options.verbose){
3721                         write(fd, data.dptr, data.dsize);
3722                 } else {
3723                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3724                 }
3725                 close(fd);
3726         } else {
3727                 if (options.verbose){
3728                         write(1, data.dptr, data.dsize);
3729                 } else {
3730                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3731                 }
3732         }
3733
3734         talloc_free(tmp_ctx);
3735         return 0;
3736 }
3737
3738 /*
3739   store a record and header to a tdb-file
3740  */
3741 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
3742 {
3743         const char *tdb_file;
3744         TDB_CONTEXT *tdb;
3745         TDB_DATA key, data;
3746         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3747
3748         if (argc < 3) {
3749                 usage();
3750         }
3751
3752         tdb_file = argv[0];
3753
3754         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
3755         if (tdb == NULL) {
3756                 printf("Failed to open TDB file %s\n", tdb_file);
3757                 return -1;
3758         }
3759
3760         if (!strncmp(argv[1], "0x", 2)) {
3761                 key = hextodata(tmp_ctx, argv[1] + 2);
3762                 if (key.dsize == 0) {
3763                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3764                         return -1;
3765                 }
3766         } else {
3767                 key.dptr  = discard_const(argv[1]);
3768                 key.dsize = strlen(argv[1]);
3769         }
3770
3771         if (!strncmp(argv[2], "0x", 2)) {
3772                 data = hextodata(tmp_ctx, argv[2] + 2);
3773                 if (data.dsize == 0) {
3774                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
3775                         return -1;
3776                 }
3777         } else {
3778                 data.dptr  = discard_const(argv[2]);
3779                 data.dsize = strlen(argv[2]);
3780         }
3781
3782         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3783                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
3784                 return -1;
3785         }
3786         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
3787                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
3788                 tdb_close(tdb);
3789                 return -1;
3790         }
3791
3792         tdb_close(tdb);
3793
3794         talloc_free(tmp_ctx);
3795         return 0;
3796 }
3797
3798 /*
3799   write a record to a persistent database
3800  */
3801 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3802 {
3803         const char *db_name;
3804         struct ctdb_db_context *ctdb_db;
3805         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3806         struct ctdb_transaction_handle *h;
3807         struct stat st;
3808         TDB_DATA key, data;
3809         int fd, ret;
3810
3811         if (argc < 3) {
3812                 talloc_free(tmp_ctx);
3813                 usage();
3814         }
3815
3816         fd = open(argv[2], O_RDONLY);
3817         if (fd == -1) {
3818                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3819                 talloc_free(tmp_ctx);
3820                 return -1;
3821         }
3822         
3823         ret = fstat(fd, &st);
3824         if (ret == -1) {
3825                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3826                 close(fd);
3827                 talloc_free(tmp_ctx);
3828                 return -1;
3829         }
3830
3831         if (!S_ISREG(st.st_mode)) {
3832                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3833                 close(fd);
3834                 talloc_free(tmp_ctx);
3835                 return -1;
3836         }
3837
3838         data.dsize = st.st_size;
3839         if (data.dsize == 0) {
3840                 data.dptr  = NULL;
3841         } else {
3842                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3843                 if (data.dptr == NULL) {
3844                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3845                         close(fd);
3846                         talloc_free(tmp_ctx);
3847                         return -1;
3848                 }
3849                 ret = read(fd, data.dptr, data.dsize);
3850                 if (ret != data.dsize) {
3851                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3852                         close(fd);
3853                         talloc_free(tmp_ctx);
3854                         return -1;
3855                 }
3856         }
3857         close(fd);
3858
3859
3860         db_name = argv[0];
3861
3862         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
3863         if (ctdb_db == NULL) {
3864                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3865                 talloc_free(tmp_ctx);
3866                 return -1;
3867         }
3868
3869         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3870         if (h == NULL) {
3871                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3872                 talloc_free(tmp_ctx);
3873                 return -1;
3874         }
3875
3876         key.dptr  = discard_const(argv[1]);
3877         key.dsize = strlen(argv[1]);
3878         ret = ctdb_transaction_store(h, key, data);
3879         if (ret != 0) {
3880                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3881                 talloc_free(tmp_ctx);
3882                 return -1;
3883         }
3884
3885         ret = ctdb_transaction_commit(h);
3886         if (ret != 0) {
3887                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3888                 talloc_free(tmp_ctx);
3889                 return -1;
3890         }
3891
3892
3893         talloc_free(tmp_ctx);
3894         return 0;
3895 }
3896
3897 /*
3898   check if a service is bound to a port or not
3899  */
3900 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
3901 {
3902         int s, ret;
3903         unsigned v;
3904         int port;
3905         struct sockaddr_in sin;
3906
3907         if (argc != 1) {
3908                 printf("Use: ctdb chktcport <port>\n");
3909                 return EINVAL;
3910         }
3911
3912         port = atoi(argv[0]);
3913
3914         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
3915         if (s == -1) {
3916                 printf("Failed to open local socket\n");
3917                 return errno;
3918         }
3919
3920         v = fcntl(s, F_GETFL, 0);
3921         fcntl(s, F_SETFL, v | O_NONBLOCK);
3922
3923         bzero(&sin, sizeof(sin));
3924         sin.sin_family = PF_INET;
3925         sin.sin_port   = htons(port);
3926         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
3927         close(s);
3928         if (ret == -1) {
3929                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
3930                 return errno;
3931         }
3932
3933         return 0;
3934 }
3935
3936
3937
3938 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3939                              TDB_DATA data, void *private_data)
3940 {
3941         DEBUG(DEBUG_ERR,("Log data received\n"));
3942         if (data.dsize > 0) {
3943                 printf("%s", data.dptr);
3944         }
3945
3946         exit(0);
3947 }
3948
3949 /*
3950   display a list of log messages from the in memory ringbuffer
3951  */
3952 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3953 {
3954         int ret;
3955         int32_t res;
3956         struct ctdb_get_log_addr log_addr;
3957         TDB_DATA data;
3958         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3959         char *errmsg;
3960         struct timeval tv;
3961
3962         if (argc != 1) {
3963                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3964                 talloc_free(tmp_ctx);
3965                 return -1;
3966         }
3967
3968         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3969         log_addr.srvid = getpid();
3970         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3971                 log_addr.level = get_debug_by_desc(argv[0]);
3972         } else {
3973                 log_addr.level = strtol(argv[0], NULL, 0);
3974         }
3975
3976
3977         data.dptr = (unsigned char *)&log_addr;
3978         data.dsize = sizeof(log_addr);
3979
3980         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3981
3982         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3983         sleep(1);
3984
3985         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3986
3987         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
3988                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3989         if (ret != 0 || res != 0) {
3990                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
3991                 talloc_free(tmp_ctx);
3992                 return -1;
3993         }
3994
3995
3996         tv = timeval_current();
3997         /* this loop will terminate when we have received the reply */
3998         while (timeval_elapsed(&tv) < 3.0) {    
3999                 event_loop_once(ctdb->ev);
4000         }
4001
4002         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4003
4004         talloc_free(tmp_ctx);
4005         return 0;
4006 }
4007
4008 /*
4009   clear the in memory log area
4010  */
4011 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4012 {
4013         int ret;
4014         int32_t res;
4015         char *errmsg;
4016         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4017
4018         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4019                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4020         if (ret != 0 || res != 0) {
4021                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4022                 talloc_free(tmp_ctx);
4023                 return -1;
4024         }
4025
4026         talloc_free(tmp_ctx);
4027         return 0;
4028 }
4029
4030
4031 static uint32_t reloadips_finished;
4032
4033 static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4034                              TDB_DATA data, void *private_data)
4035 {
4036         reloadips_finished = 1;
4037 }
4038
4039 static int reloadips_all(struct ctdb_context *ctdb)
4040 {
4041         struct reloadips_all_reply rips;
4042         struct ctdb_node_map *nodemap=NULL;
4043         TDB_DATA data;
4044         uint32_t recmaster;
4045         int ret, i;
4046
4047         /* check that there are valid nodes available */
4048         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
4049                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4050                 return 1;
4051         }
4052         for (i=0; i<nodemap->num;i++) {
4053                 if (nodemap->nodes[i].flags != 0) {
4054                         DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
4055                         return 1;
4056                 }
4057         }
4058
4059
4060         rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4061         if (rips.pnn == -1) {
4062                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4063                 return 1;
4064         }
4065         rips.srvid = getpid();
4066
4067
4068         /* register a message port for receiveing the reply so that we
4069            can receive the reply
4070         */
4071         ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
4072
4073         if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
4074                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
4075                 return -1;
4076         }
4077
4078
4079         data.dptr = (uint8_t *)&rips;
4080         data.dsize = sizeof(rips);
4081
4082         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
4083         if (ret != 0) {
4084                 DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
4085                 return 1;
4086         }
4087
4088         reloadips_finished = 0;
4089         while (reloadips_finished == 0) {
4090                 event_loop_once(ctdb->ev);
4091         }
4092
4093         return 0;
4094 }
4095
4096 /*
4097   reload public ips on a specific node
4098  */
4099 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4100 {
4101         int ret;
4102         int32_t res;
4103         char *errmsg;
4104         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4105
4106         if (options.pnn == CTDB_BROADCAST_ALL) {
4107                 return reloadips_all(ctdb);
4108         }
4109
4110         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4111                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4112         if (ret != 0 || res != 0) {
4113                 DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
4114                 talloc_free(tmp_ctx);
4115                 return -1;
4116         }
4117
4118         talloc_free(tmp_ctx);
4119         return 0;
4120 }
4121
4122 /*
4123   display a list of the databases on a remote ctdb
4124  */
4125 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4126 {
4127         int i, ret;
4128         struct ctdb_dbid_map *dbmap=NULL;
4129
4130         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4131         if (ret != 0) {
4132                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4133                 return ret;
4134         }
4135
4136         if(options.machinereadable){
4137                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4138                 for(i=0;i<dbmap->num;i++){
4139                         const char *path;
4140                         const char *name;
4141                         const char *health;
4142                         bool persistent;
4143                         bool readonly;
4144                         bool sticky;
4145
4146                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4147                                             dbmap->dbs[i].dbid, ctdb, &path);
4148                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4149                                             dbmap->dbs[i].dbid, ctdb, &name);
4150                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4151                                               dbmap->dbs[i].dbid, ctdb, &health);
4152                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4153                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4154                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4155                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4156                                dbmap->dbs[i].dbid, name, path,
4157                                !!(persistent), !!(sticky),
4158                                !!(health), !!(readonly));
4159                 }
4160                 return 0;
4161         }
4162
4163         printf("Number of databases:%d\n", dbmap->num);
4164         for(i=0;i<dbmap->num;i++){
4165                 const char *path;
4166                 const char *name;
4167                 const char *health;
4168                 bool persistent;
4169                 bool readonly;
4170                 bool sticky;
4171
4172                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4173                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4174                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4175                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4176                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4177                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4178                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4179                        dbmap->dbs[i].dbid, name, path,
4180                        persistent?" PERSISTENT":"",
4181                        sticky?" STICKY":"",
4182                        readonly?" READONLY":"",
4183                        health?" UNHEALTHY":"");
4184         }
4185
4186         return 0;
4187 }
4188
4189 /*
4190   display the status of a database on a remote ctdb
4191  */
4192 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4193 {
4194         int i, ret;
4195         struct ctdb_dbid_map *dbmap=NULL;
4196         const char *db_name;
4197
4198         if (argc < 1) {
4199                 usage();
4200         }
4201
4202         db_name = argv[0];
4203
4204         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4205         if (ret != 0) {
4206                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4207                 return ret;
4208         }
4209
4210         for(i=0;i<dbmap->num;i++){
4211                 const char *path;
4212                 const char *name;
4213                 const char *health;
4214                 bool persistent;
4215                 bool readonly;
4216                 bool sticky;
4217
4218                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4219                 if (strcmp(name, db_name) != 0) {
4220                         continue;
4221                 }
4222
4223                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4224                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4225                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4226                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4227                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4228                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4229                        dbmap->dbs[i].dbid, name, path,
4230                        persistent?"yes":"no",
4231                        sticky?"yes":"no",
4232                        readonly?"yes":"no",
4233                        health?health:"OK");
4234                 return 0;
4235         }
4236
4237         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
4238         return 0;
4239 }
4240
4241 /*
4242   check if the local node is recmaster or not
4243   it will return 1 if this node is the recmaster and 0 if it is not
4244   or if the local ctdb daemon could not be contacted
4245  */
4246 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4247 {
4248         uint32_t mypnn, recmaster;
4249
4250         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
4251         if (mypnn == -1) {
4252                 printf("Failed to get pnn of node\n");
4253                 return 1;
4254         }
4255
4256         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
4257                 printf("Failed to get the recmaster\n");
4258                 return 1;
4259         }
4260
4261         if (recmaster != mypnn) {
4262                 printf("this node is not the recmaster\n");
4263                 return 1;
4264         }
4265
4266         printf("this node is the recmaster\n");
4267         return 0;
4268 }
4269
4270 /*
4271   ping a node
4272  */
4273 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4274 {
4275         int ret;
4276         struct timeval tv = timeval_current();
4277         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4278         if (ret == -1) {
4279                 printf("Unable to get ping response from node %u\n", options.pnn);
4280                 return -1;
4281         } else {
4282                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4283                        options.pnn, timeval_elapsed(&tv), ret);
4284         }
4285         return 0;
4286 }
4287
4288
4289 /*
4290   get a tunable
4291  */
4292 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4293 {
4294         const char *name;
4295         uint32_t value;
4296         int ret;
4297
4298         if (argc < 1) {
4299                 usage();
4300         }
4301
4302         name = argv[0];
4303         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4304         if (ret == -1) {
4305                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4306                 return -1;
4307         }
4308
4309         printf("%-23s = %u\n", name, value);
4310         return 0;
4311 }
4312
4313 /*
4314   set a tunable
4315  */
4316 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4317 {
4318         const char *name;
4319         uint32_t value;
4320         int ret;
4321
4322         if (argc < 2) {
4323                 usage();
4324         }
4325
4326         name = argv[0];
4327         value = strtoul(argv[1], NULL, 0);
4328
4329         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4330         if (ret == -1) {
4331                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4332                 return -1;
4333         }
4334         return 0;
4335 }
4336
4337 /*
4338   list all tunables
4339  */
4340 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4341 {
4342         uint32_t count;
4343         const char **list;
4344         int ret, i;
4345
4346         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4347         if (ret == -1) {
4348                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4349                 return -1;
4350         }
4351
4352         for (i=0;i<count;i++) {
4353                 control_getvar(ctdb, 1, &list[i]);
4354         }
4355
4356         talloc_free(list);
4357         
4358         return 0;
4359 }
4360
4361 /*
4362   display debug level on a node
4363  */
4364 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4365 {
4366         int ret;
4367         int32_t level;
4368
4369         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4370         if (ret != 0) {
4371                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4372                 return ret;
4373         } else {
4374                 if (options.machinereadable){
4375                         printf(":Name:Level:\n");
4376                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4377                 } else {
4378                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4379                 }
4380         }
4381         return 0;
4382 }
4383
4384 /*
4385   display reclock file of a node
4386  */
4387 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4388 {
4389         int ret;
4390         const char *reclock;
4391
4392         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4393         if (ret != 0) {
4394                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4395                 return ret;
4396         } else {
4397                 if (options.machinereadable){
4398                         if (reclock != NULL) {
4399                                 printf("%s", reclock);
4400                         }
4401                 } else {
4402                         if (reclock == NULL) {
4403                                 printf("No reclock file used.\n");
4404                         } else {
4405                                 printf("Reclock file:%s\n", reclock);
4406                         }
4407                 }
4408         }
4409         return 0;
4410 }
4411
4412 /*
4413   set the reclock file of a node
4414  */
4415 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4416 {
4417         int ret;
4418         const char *reclock;
4419
4420         if (argc == 0) {
4421                 reclock = NULL;
4422         } else if (argc == 1) {
4423                 reclock = argv[0];
4424         } else {
4425                 usage();
4426         }
4427
4428         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4429         if (ret != 0) {
4430                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4431                 return ret;
4432         }
4433         return 0;
4434 }
4435
4436 /*
4437   set the natgw state on/off
4438  */
4439 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4440 {
4441         int ret;
4442         uint32_t natgwstate;
4443
4444         if (argc == 0) {
4445                 usage();
4446         }
4447
4448         if (!strcmp(argv[0], "on")) {
4449                 natgwstate = 1;
4450         } else if (!strcmp(argv[0], "off")) {
4451                 natgwstate = 0;
4452         } else {
4453                 usage();
4454         }
4455
4456         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4457         if (ret != 0) {
4458                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4459                 return ret;
4460         }
4461
4462         return 0;
4463 }
4464
4465 /*
4466   set the lmaster role on/off
4467  */
4468 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4469 {
4470         int ret;
4471         uint32_t lmasterrole;
4472
4473         if (argc == 0) {
4474                 usage();
4475         }
4476
4477         if (!strcmp(argv[0], "on")) {
4478                 lmasterrole = 1;
4479         } else if (!strcmp(argv[0], "off")) {
4480                 lmasterrole = 0;
4481         } else {
4482                 usage();
4483         }
4484
4485         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4486         if (ret != 0) {
4487                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4488                 return ret;
4489         }
4490
4491         return 0;
4492 }
4493
4494 /*
4495   set the recmaster role on/off
4496  */
4497 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4498 {
4499         int ret;
4500         uint32_t recmasterrole;
4501
4502         if (argc == 0) {
4503                 usage();
4504         }
4505
4506         if (!strcmp(argv[0], "on")) {
4507                 recmasterrole = 1;
4508         } else if (!strcmp(argv[0], "off")) {
4509                 recmasterrole = 0;
4510         } else {
4511                 usage();
4512         }
4513
4514         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4515         if (ret != 0) {
4516                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4517                 return ret;
4518         }
4519
4520         return 0;
4521 }
4522
4523 /*
4524   set debug level on a node or all nodes
4525  */
4526 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4527 {
4528         int i, ret;
4529         int32_t level;
4530
4531         if (argc == 0) {
4532                 printf("You must specify the debug level. Valid levels are:\n");
4533                 for (i=0; debug_levels[i].description != NULL; i++) {
4534                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4535                 }
4536
4537                 return 0;
4538         }
4539
4540         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4541                 level = get_debug_by_desc(argv[0]);
4542         } else {
4543                 level = strtol(argv[0], NULL, 0);
4544         }
4545
4546         for (i=0; debug_levels[i].description != NULL; i++) {
4547                 if (level == debug_levels[i].level) {
4548                         break;
4549                 }
4550         }
4551         if (debug_levels[i].description == NULL) {
4552                 printf("Invalid debug level, must be one of\n");
4553                 for (i=0; debug_levels[i].description != NULL; i++) {
4554                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4555                 }
4556                 return -1;
4557         }
4558
4559         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4560         if (ret != 0) {
4561                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4562         }
4563         return 0;
4564 }
4565
4566
4567 /*
4568   thaw a node
4569  */
4570 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4571 {
4572         int ret;
4573         uint32_t priority;
4574         
4575         if (argc == 1) {
4576                 priority = strtol(argv[0], NULL, 0);
4577         } else {
4578                 priority = 0;
4579         }
4580         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4581
4582         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4583         if (ret != 0) {
4584                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4585         }               
4586         return 0;
4587 }
4588
4589
4590 /*
4591   attach to a database
4592  */
4593 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4594 {
4595         const char *db_name;
4596         struct ctdb_db_context *ctdb_db;
4597         bool persistent = false;
4598
4599         if (argc < 1) {
4600                 usage();
4601         }
4602         db_name = argv[0];
4603         if (argc > 2) {
4604                 usage();
4605         }
4606         if (argc == 2) {
4607                 if (strcmp(argv[1], "persistent") != 0) {
4608                         usage();
4609                 }
4610                 persistent = true;
4611         }
4612
4613         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4614         if (ctdb_db == NULL) {
4615                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4616                 return -1;
4617         }
4618
4619         return 0;
4620 }
4621
4622 /*
4623   set db priority
4624  */
4625 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4626 {
4627         struct ctdb_db_priority db_prio;
4628         int ret;
4629
4630         if (argc < 2) {
4631                 usage();
4632         }
4633
4634         db_prio.db_id    = strtoul(argv[0], NULL, 0);
4635         db_prio.priority = strtoul(argv[1], NULL, 0);
4636
4637         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
4638         if (ret != 0) {
4639                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
4640                 return -1;
4641         }
4642
4643         return 0;
4644 }
4645
4646 /*
4647   get db priority
4648  */
4649 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4650 {
4651         uint32_t db_id, priority;
4652         int ret;
4653
4654         if (argc < 1) {
4655                 usage();
4656         }
4657
4658         db_id = strtoul(argv[0], NULL, 0);
4659
4660         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
4661         if (ret != 0) {
4662                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
4663                 return -1;
4664         }
4665
4666         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
4667
4668         return 0;
4669 }
4670
4671 /*
4672   set the sticky records capability for a database
4673  */
4674 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
4675 {
4676         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4677         uint32_t db_id;
4678         struct ctdb_dbid_map *dbmap=NULL;
4679         int i, ret;
4680
4681         if (argc < 1) {
4682                 usage();
4683         }
4684
4685         if (!strncmp(argv[0], "0x", 2)) {
4686                 db_id = strtoul(argv[0] + 2, NULL, 0);
4687         } else {
4688                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4689                 if (ret != 0) {
4690                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4691                         talloc_free(tmp_ctx);
4692                         return ret;
4693                 }
4694                 for(i=0;i<dbmap->num;i++){
4695                         const char *name;
4696
4697                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4698                         if(!strcmp(argv[0], name)){
4699                                 talloc_free(discard_const(name));
4700                                 break;
4701                         }
4702                         talloc_free(discard_const(name));
4703                 }
4704                 if (i == dbmap->num) {
4705                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4706                         talloc_free(tmp_ctx);
4707                         return -1;
4708                 }
4709                 db_id = dbmap->dbs[i].dbid;
4710         }
4711
4712         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
4713         if (ret != 0) {
4714                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
4715                 talloc_free(tmp_ctx);
4716                 return -1;
4717         }
4718
4719         talloc_free(tmp_ctx);
4720         return 0;
4721 }
4722
4723 /*
4724   set the readonly capability for a database
4725  */
4726 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
4727 {
4728         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4729         uint32_t db_id;
4730         struct ctdb_dbid_map *dbmap=NULL;
4731         int i, ret;
4732
4733         if (argc < 1) {
4734                 usage();
4735         }
4736
4737         if (!strncmp(argv[0], "0x", 2)) {
4738                 db_id = strtoul(argv[0] + 2, NULL, 0);
4739         } else {
4740                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4741                 if (ret != 0) {
4742                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4743                         talloc_free(tmp_ctx);
4744                         return ret;
4745                 }
4746                 for(i=0;i<dbmap->num;i++){
4747                         const char *name;
4748
4749                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4750                         if(!strcmp(argv[0], name)){
4751                                 talloc_free(discard_const(name));
4752                                 break;
4753                         }
4754                         talloc_free(discard_const(name));
4755                 }
4756                 if (i == dbmap->num) {
4757                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4758                         talloc_free(tmp_ctx);
4759                         return -1;
4760                 }
4761                 db_id = dbmap->dbs[i].dbid;
4762         }
4763
4764         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
4765         if (ret != 0) {
4766                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
4767                 talloc_free(tmp_ctx);
4768                 return -1;
4769         }
4770
4771         talloc_free(tmp_ctx);
4772         return 0;
4773 }
4774
4775 /*
4776   get db seqnum
4777  */
4778 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4779 {
4780         bool ret;
4781         uint32_t db_id;
4782         uint64_t seqnum;
4783
4784         if (argc < 1) {
4785                 usage();
4786         }
4787
4788         db_id = strtoul(argv[0], NULL, 0);
4789
4790         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
4791         if (!ret) {
4792                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4793                 return -1;
4794         }
4795
4796         printf("Sequence number:%lld\n", (long long)seqnum);
4797
4798         return 0;
4799 }
4800
4801 /*
4802   run an eventscript on a node
4803  */
4804 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
4805 {
4806         TDB_DATA data;
4807         int ret;
4808         int32_t res;
4809         char *errmsg;
4810         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4811
4812         if (argc != 1) {
4813                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4814                 return -1;
4815         }
4816
4817         data.dptr = (unsigned char *)discard_const(argv[0]);
4818         data.dsize = strlen((char *)data.dptr) + 1;
4819
4820         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
4821
4822         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
4823                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4824         if (ret != 0 || res != 0) {
4825                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
4826                 talloc_free(tmp_ctx);
4827                 return -1;
4828         }
4829         talloc_free(tmp_ctx);
4830         return 0;
4831 }
4832
4833 #define DB_VERSION 1
4834 #define MAX_DB_NAME 64
4835 struct db_file_header {
4836         unsigned long version;
4837         time_t timestamp;
4838         unsigned long persistent;
4839         unsigned long size;
4840         const char name[MAX_DB_NAME];
4841 };
4842
4843 struct backup_data {
4844         struct ctdb_marshall_buffer *records;
4845         uint32_t len;
4846         uint32_t total;
4847         bool traverse_error;
4848 };
4849
4850 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
4851 {
4852         struct backup_data *bd = talloc_get_type(private, struct backup_data);
4853         struct ctdb_rec_data *rec;
4854
4855         /* add the record */
4856         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
4857         if (rec == NULL) {
4858                 bd->traverse_error = true;
4859                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
4860                 return -1;
4861         }
4862         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
4863         if (bd->records == NULL) {
4864                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
4865                 bd->traverse_error = true;
4866                 return -1;
4867         }
4868         bd->records->count++;
4869         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
4870         bd->len += rec->length;
4871         talloc_free(rec);
4872
4873         bd->total++;
4874         return 0;
4875 }
4876
4877 /*
4878  * backup a database to a file 
4879  */
4880 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
4881 {
4882         int i, ret;
4883         struct ctdb_dbid_map *dbmap=NULL;
4884         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4885         struct db_file_header dbhdr;
4886         struct ctdb_db_context *ctdb_db;
4887         struct backup_data *bd;
4888         int fh = -1;
4889         int status = -1;
4890         const char *reason = NULL;
4891
4892         if (argc != 2) {
4893                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4894                 return -1;
4895         }
4896
4897         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4898         if (ret != 0) {
4899                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4900                 return ret;
4901         }
4902
4903         for(i=0;i<dbmap->num;i++){
4904                 const char *name;
4905
4906                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4907                 if(!strcmp(argv[0], name)){
4908                         talloc_free(discard_const(name));
4909                         break;
4910                 }
4911                 talloc_free(discard_const(name));
4912         }
4913         if (i == dbmap->num) {
4914                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4915                 talloc_free(tmp_ctx);
4916                 return -1;
4917         }
4918
4919         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4920                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
4921         if (ret != 0) {
4922                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
4923                                  argv[0]));
4924                 talloc_free(tmp_ctx);
4925                 return -1;
4926         }
4927         if (reason) {
4928                 uint32_t allow_unhealthy = 0;
4929
4930                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
4931                                       "AllowUnhealthyDBRead",
4932                                       &allow_unhealthy);
4933
4934                 if (allow_unhealthy != 1) {
4935                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
4936                                          argv[0], reason));
4937
4938                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
4939                                          allow_unhealthy));
4940                         talloc_free(tmp_ctx);
4941                         return -1;
4942                 }
4943
4944                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
4945                                      argv[0], argv[0]));
4946                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
4947                                      "tunnable AllowUnhealthyDBRead = %u\n",
4948                                      allow_unhealthy));
4949         }
4950
4951         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
4952         if (ctdb_db == NULL) {
4953                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
4954                 talloc_free(tmp_ctx);
4955                 return -1;
4956         }
4957
4958
4959         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
4960         if (ret == -1) {
4961                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
4962                 talloc_free(tmp_ctx);
4963                 return -1;
4964         }
4965
4966
4967         bd = talloc_zero(tmp_ctx, struct backup_data);
4968         if (bd == NULL) {
4969                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
4970                 talloc_free(tmp_ctx);
4971                 return -1;
4972         }
4973
4974         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
4975         if (bd->records == NULL) {
4976                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
4977                 talloc_free(tmp_ctx);
4978                 return -1;
4979         }
4980
4981         bd->len = offsetof(struct ctdb_marshall_buffer, data);
4982         bd->records->db_id = ctdb_db->db_id;
4983         /* traverse the database collecting all records */
4984         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
4985             bd->traverse_error) {
4986                 DEBUG(DEBUG_ERR,("Traverse error\n"));
4987                 talloc_free(tmp_ctx);
4988                 return -1;              
4989         }
4990
4991         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4992
4993
4994         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
4995         if (fh == -1) {
4996                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
4997                 talloc_free(tmp_ctx);
4998                 return -1;
4999         }
5000
5001         dbhdr.version = DB_VERSION;
5002         dbhdr.timestamp = time(NULL);
5003         dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
5004         dbhdr.size = bd->len;
5005         if (strlen(argv[0]) >= MAX_DB_NAME) {
5006                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5007                 goto done;
5008         }
5009         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
5010         ret = write(fh, &dbhdr, sizeof(dbhdr));
5011         if (ret == -1) {
5012                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5013                 goto done;
5014         }
5015         ret = write(fh, bd->records, bd->len);
5016         if (ret == -1) {
5017                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5018                 goto done;
5019         }
5020
5021         status = 0;
5022 done:
5023         if (fh != -1) {
5024                 ret = close(fh);
5025                 if (ret == -1) {
5026                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5027                 }
5028         }
5029
5030         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5031
5032         talloc_free(tmp_ctx);
5033         return status;
5034 }
5035
5036 /*
5037  * restore a database from a file 
5038  */
5039 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5040 {
5041         int ret;
5042         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5043         TDB_DATA outdata;
5044         TDB_DATA data;
5045         struct db_file_header dbhdr;
5046         struct ctdb_db_context *ctdb_db;
5047         struct ctdb_node_map *nodemap=NULL;
5048         struct ctdb_vnn_map *vnnmap=NULL;
5049         int i, fh;
5050         struct ctdb_control_wipe_database w;
5051         uint32_t *nodes;
5052         uint32_t generation;
5053         struct tm *tm;
5054         char tbuf[100];
5055         char *dbname;
5056
5057         if (argc < 1 || argc > 2) {
5058                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5059                 return -1;
5060         }
5061
5062         fh = open(argv[0], O_RDONLY);
5063         if (fh == -1) {
5064                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5065                 talloc_free(tmp_ctx);
5066                 return -1;
5067         }
5068
5069         read(fh, &dbhdr, sizeof(dbhdr));
5070         if (dbhdr.version != DB_VERSION) {
5071                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5072                 talloc_free(tmp_ctx);
5073                 return -1;
5074         }
5075
5076         dbname = discard_const(dbhdr.name);
5077         if (argc == 2) {
5078                 dbname = discard_const(argv[1]);
5079         }
5080
5081         outdata.dsize = dbhdr.size;
5082         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5083         if (outdata.dptr == NULL) {
5084                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5085                 close(fh);
5086                 talloc_free(tmp_ctx);
5087                 return -1;
5088         }               
5089         read(fh, outdata.dptr, outdata.dsize);
5090         close(fh);
5091
5092         tm = localtime(&dbhdr.timestamp);
5093         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5094         printf("Restoring database '%s' from backup @ %s\n",
5095                 dbname, tbuf);
5096
5097
5098         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5099         if (ctdb_db == NULL) {
5100                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5101                 talloc_free(tmp_ctx);
5102                 return -1;
5103         }
5104
5105         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5106         if (ret != 0) {
5107                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5108                 talloc_free(tmp_ctx);
5109                 return ret;
5110         }
5111
5112
5113         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5114         if (ret != 0) {
5115                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5116                 talloc_free(tmp_ctx);
5117                 return ret;
5118         }
5119
5120         /* freeze all nodes */
5121         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5122         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5123                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5124                                         nodes, i,
5125                                         TIMELIMIT(),
5126                                         false, tdb_null,
5127                                         NULL, NULL,
5128                                         NULL) != 0) {
5129                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5130                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5131                         talloc_free(tmp_ctx);
5132                         return -1;
5133                 }
5134         }
5135
5136         generation = vnnmap->generation;
5137         data.dptr = (void *)&generation;
5138         data.dsize = sizeof(generation);
5139
5140         /* start a cluster wide transaction */
5141         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5142         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5143                                         nodes, 0,
5144                                         TIMELIMIT(), false, data,
5145                                         NULL, NULL,
5146                                         NULL) != 0) {
5147                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5148                 return -1;
5149         }
5150
5151
5152         w.db_id = ctdb_db->db_id;
5153         w.transaction_id = generation;
5154
5155         data.dptr = (void *)&w;
5156         data.dsize = sizeof(w);
5157
5158         /* wipe all the remote databases. */
5159         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5160         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5161                                         nodes, 0,
5162                                         TIMELIMIT(), false, data,
5163                                         NULL, NULL,
5164                                         NULL) != 0) {
5165                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5166                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5167                 talloc_free(tmp_ctx);
5168                 return -1;
5169         }
5170         
5171         /* push the database */
5172         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5173         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5174                                         nodes, 0,
5175                                         TIMELIMIT(), false, outdata,
5176                                         NULL, NULL,
5177                                         NULL) != 0) {
5178                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5179                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5180                 talloc_free(tmp_ctx);
5181                 return -1;
5182         }
5183
5184         data.dptr = (void *)&ctdb_db->db_id;
5185         data.dsize = sizeof(ctdb_db->db_id);
5186
5187         /* mark the database as healthy */
5188         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5189         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5190                                         nodes, 0,
5191                                         TIMELIMIT(), false, data,
5192                                         NULL, NULL,
5193                                         NULL) != 0) {
5194                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5195                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5196                 talloc_free(tmp_ctx);
5197                 return -1;
5198         }
5199
5200         data.dptr = (void *)&generation;
5201         data.dsize = sizeof(generation);
5202
5203         /* commit all the changes */
5204         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5205                                         nodes, 0,
5206                                         TIMELIMIT(), false, data,
5207                                         NULL, NULL,
5208                                         NULL) != 0) {
5209                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5210                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5211                 talloc_free(tmp_ctx);
5212                 return -1;
5213         }
5214
5215
5216         /* thaw all nodes */
5217         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5218         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5219                                         nodes, 0,
5220                                         TIMELIMIT(),
5221                                         false, tdb_null,
5222                                         NULL, NULL,
5223                                         NULL) != 0) {
5224                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5225                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5226                 talloc_free(tmp_ctx);
5227                 return -1;
5228         }
5229
5230
5231         talloc_free(tmp_ctx);
5232         return 0;
5233 }
5234
5235 /*
5236  * dump a database backup from a file
5237  */
5238 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5239 {
5240         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5241         TDB_DATA outdata;
5242         struct db_file_header dbhdr;
5243         int i, fh;
5244         struct tm *tm;
5245         char tbuf[100];
5246         struct ctdb_rec_data *rec = NULL;
5247         struct ctdb_marshall_buffer *m;
5248         struct ctdb_dump_db_context c;
5249
5250         if (argc != 1) {
5251                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5252                 return -1;
5253         }
5254
5255         fh = open(argv[0], O_RDONLY);
5256         if (fh == -1) {
5257                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5258                 talloc_free(tmp_ctx);
5259                 return -1;
5260         }
5261
5262         read(fh, &dbhdr, sizeof(dbhdr));
5263         if (dbhdr.version != DB_VERSION) {
5264                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5265                 talloc_free(tmp_ctx);
5266                 return -1;
5267         }
5268
5269         outdata.dsize = dbhdr.size;
5270         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5271         if (outdata.dptr == NULL) {
5272                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5273                 close(fh);
5274                 talloc_free(tmp_ctx);
5275                 return -1;
5276         }
5277         read(fh, outdata.dptr, outdata.dsize);
5278         close(fh);
5279         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5280
5281         tm = localtime(&dbhdr.timestamp);
5282         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5283         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5284                 dbhdr.name, m->db_id, tbuf);
5285
5286         ZERO_STRUCT(c);
5287         c.f = stdout;
5288         c.printemptyrecords = (bool)options.printemptyrecords;
5289         c.printdatasize = (bool)options.printdatasize;
5290         c.printlmaster = false;
5291         c.printhash = (bool)options.printhash;
5292         c.printrecordflags = (bool)options.printrecordflags;
5293
5294         for (i=0; i < m->count; i++) {
5295                 uint32_t reqid = 0;
5296                 TDB_DATA key, data;
5297
5298                 /* we do not want the header splitted, so we pass NULL*/
5299                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5300                                               NULL, &key, &data);
5301
5302                 ctdb_dumpdb_record(ctdb, key, data, &c);
5303         }
5304
5305         printf("Dumped %d records\n", i);
5306         talloc_free(tmp_ctx);
5307         return 0;
5308 }
5309
5310 /*
5311  * wipe a database from a file
5312  */
5313 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5314                           const char **argv)
5315 {
5316         int ret;
5317         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5318         TDB_DATA data;
5319         struct ctdb_db_context *ctdb_db;
5320         struct ctdb_node_map *nodemap = NULL;
5321         struct ctdb_vnn_map *vnnmap = NULL;
5322         int i;
5323         struct ctdb_control_wipe_database w;
5324         uint32_t *nodes;
5325         uint32_t generation;
5326         struct ctdb_dbid_map *dbmap = NULL;
5327
5328         if (argc != 1) {
5329                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5330                 return -1;
5331         }
5332
5333         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5334                                  &dbmap);
5335         if (ret != 0) {
5336                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
5337                                   options.pnn));
5338                 return ret;
5339         }
5340
5341         for(i=0;i<dbmap->num;i++){
5342                 const char *name;
5343
5344                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
5345                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
5346                 if(!strcmp(argv[0], name)){
5347                         talloc_free(discard_const(name));
5348                         break;
5349                 }
5350                 talloc_free(discard_const(name));
5351         }
5352         if (i == dbmap->num) {
5353                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
5354                                   argv[0]));
5355                 talloc_free(tmp_ctx);
5356                 return -1;
5357         }
5358
5359         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5360         if (ctdb_db == NULL) {
5361                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5362                                   argv[0]));
5363                 talloc_free(tmp_ctx);
5364                 return -1;
5365         }
5366
5367         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5368                                    &nodemap);
5369         if (ret != 0) {
5370                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5371                                   options.pnn));
5372                 talloc_free(tmp_ctx);
5373                 return ret;
5374         }
5375
5376         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5377                                   &vnnmap);
5378         if (ret != 0) {
5379                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5380                                   options.pnn));
5381                 talloc_free(tmp_ctx);
5382                 return ret;
5383         }
5384
5385         /* freeze all nodes */
5386         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5387         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5388                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5389                                                 nodes, i,
5390                                                 TIMELIMIT(),
5391                                                 false, tdb_null,
5392                                                 NULL, NULL,
5393                                                 NULL);
5394                 if (ret != 0) {
5395                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5396                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5397                                              CTDB_RECOVERY_ACTIVE);
5398                         talloc_free(tmp_ctx);
5399                         return -1;
5400                 }
5401         }
5402
5403         generation = vnnmap->generation;
5404         data.dptr = (void *)&generation;
5405         data.dsize = sizeof(generation);
5406
5407         /* start a cluster wide transaction */
5408         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5409         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5410                                         nodes, 0,
5411                                         TIMELIMIT(), false, data,
5412                                         NULL, NULL,
5413                                         NULL);
5414         if (ret!= 0) {
5415                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5416                                   "transactions.\n"));
5417                 return -1;
5418         }
5419
5420         w.db_id = ctdb_db->db_id;
5421         w.transaction_id = generation;
5422
5423         data.dptr = (void *)&w;
5424         data.dsize = sizeof(w);
5425
5426         /* wipe all the remote databases. */
5427         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5428         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5429                                         nodes, 0,
5430                                         TIMELIMIT(), false, data,
5431                                         NULL, NULL,
5432                                         NULL) != 0) {
5433                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5434                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5435                 talloc_free(tmp_ctx);
5436                 return -1;
5437         }
5438
5439         data.dptr = (void *)&ctdb_db->db_id;
5440         data.dsize = sizeof(ctdb_db->db_id);
5441
5442         /* mark the database as healthy */
5443         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5444         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5445                                         nodes, 0,
5446                                         TIMELIMIT(), false, data,
5447                                         NULL, NULL,
5448                                         NULL) != 0) {
5449                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5450                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5451                 talloc_free(tmp_ctx);
5452                 return -1;
5453         }
5454
5455         data.dptr = (void *)&generation;
5456         data.dsize = sizeof(generation);
5457
5458         /* commit all the changes */
5459         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5460                                         nodes, 0,
5461                                         TIMELIMIT(), false, data,
5462                                         NULL, NULL,
5463                                         NULL) != 0) {
5464                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5465                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5466                 talloc_free(tmp_ctx);
5467                 return -1;
5468         }
5469
5470         /* thaw all nodes */
5471         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5472         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5473                                         nodes, 0,
5474                                         TIMELIMIT(),
5475                                         false, tdb_null,
5476                                         NULL, NULL,
5477                                         NULL) != 0) {
5478                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5479                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5480                 talloc_free(tmp_ctx);
5481                 return -1;
5482         }
5483
5484         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5485
5486         talloc_free(tmp_ctx);
5487         return 0;
5488 }
5489
5490 /*
5491   dump memory usage
5492  */
5493 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5494 {
5495         TDB_DATA data;
5496         int ret;
5497         int32_t res;
5498         char *errmsg;
5499         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5500         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5501                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5502         if (ret != 0 || res != 0) {
5503                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5504                 talloc_free(tmp_ctx);
5505                 return -1;
5506         }
5507         write(1, data.dptr, data.dsize);
5508         talloc_free(tmp_ctx);
5509         return 0;
5510 }
5511
5512 /*
5513   handler for memory dumps
5514 */
5515 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5516                              TDB_DATA data, void *private_data)
5517 {
5518         write(1, data.dptr, data.dsize);
5519         exit(0);
5520 }
5521
5522 /*
5523   dump memory usage on the recovery daemon
5524  */
5525 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5526 {
5527         int ret;
5528         TDB_DATA data;
5529         struct rd_memdump_reply rd;
5530
5531         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5532         if (rd.pnn == -1) {
5533                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
5534                 return -1;
5535         }
5536         rd.srvid = getpid();
5537
5538         /* register a message port for receiveing the reply so that we
5539            can receive the reply
5540         */
5541         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5542
5543
5544         data.dptr = (uint8_t *)&rd;
5545         data.dsize = sizeof(rd);
5546
5547         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5548         if (ret != 0) {
5549                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5550                 return -1;
5551         }
5552
5553         /* this loop will terminate when we have received the reply */
5554         while (1) {     
5555                 event_loop_once(ctdb->ev);
5556         }
5557
5558         return 0;
5559 }
5560
5561 /*
5562   send a message to a srvid
5563  */
5564 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5565 {
5566         unsigned long srvid;
5567         int ret;
5568         TDB_DATA data;
5569
5570         if (argc < 2) {
5571                 usage();
5572         }
5573
5574         srvid      = strtoul(argv[0], NULL, 0);
5575
5576         data.dptr = (uint8_t *)discard_const(argv[1]);
5577         data.dsize= strlen(argv[1]);
5578
5579         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5580         if (ret != 0) {
5581                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5582                 return -1;
5583         }
5584
5585         return 0;
5586 }
5587
5588 /*
5589   handler for msglisten
5590 */
5591 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5592                              TDB_DATA data, void *private_data)
5593 {
5594         int i;
5595
5596         printf("Message received: ");
5597         for (i=0;i<data.dsize;i++) {
5598                 printf("%c", data.dptr[i]);
5599         }
5600         printf("\n");
5601 }
5602
5603 /*
5604   listen for messages on a messageport
5605  */
5606 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5607 {
5608         uint64_t srvid;
5609
5610         srvid = getpid();
5611
5612         /* register a message port and listen for messages
5613         */
5614         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5615         printf("Listening for messages on srvid:%d\n", (int)srvid);
5616
5617         while (1) {     
5618                 event_loop_once(ctdb->ev);
5619         }
5620
5621         return 0;
5622 }
5623
5624 /*
5625   list all nodes in the cluster
5626   we parse the nodes file directly
5627  */
5628 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5629 {
5630         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5631         struct pnn_node *pnn_nodes;
5632         struct pnn_node *pnn_node;
5633
5634         pnn_nodes = read_nodes_file(mem_ctx);
5635         if (pnn_nodes == NULL) {
5636                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5637                 talloc_free(mem_ctx);
5638                 return -1;
5639         }
5640
5641         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5642                 ctdb_sock_addr addr;
5643                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5644                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5645                         talloc_free(mem_ctx);
5646                         return -1;
5647                 }
5648                 if (options.machinereadable){
5649                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5650                 } else {
5651                         printf("%s\n", pnn_node->addr);
5652                 }
5653         }
5654         talloc_free(mem_ctx);
5655
5656         return 0;
5657 }
5658
5659 /*
5660   reload the nodes file on the local node
5661  */
5662 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5663 {
5664         int i, ret;
5665         int mypnn;
5666         struct ctdb_node_map *nodemap=NULL;
5667
5668         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5669         if (mypnn == -1) {
5670                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
5671                 return -1;
5672         }
5673
5674         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5675         if (ret != 0) {
5676                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5677                 return ret;
5678         }
5679
5680         /* reload the nodes file on all remote nodes */
5681         for (i=0;i<nodemap->num;i++) {
5682                 if (nodemap->nodes[i].pnn == mypnn) {
5683                         continue;
5684                 }
5685                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5686                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5687                         nodemap->nodes[i].pnn);
5688                 if (ret != 0) {
5689                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
5690                 }
5691         }
5692
5693         /* reload the nodes file on the local node */
5694         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
5695         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
5696         if (ret != 0) {
5697                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
5698         }
5699
5700         /* initiate a recovery */
5701         control_recover(ctdb, argc, argv);
5702
5703         return 0;
5704 }
5705
5706
5707 static const struct {
5708         const char *name;
5709         int (*fn)(struct ctdb_context *, int, const char **);
5710         bool auto_all;
5711         bool without_daemon; /* can be run without daemon running ? */
5712         const char *msg;
5713         const char *args;
5714 } ctdb_commands[] = {
5715 #ifdef CTDB_VERS
5716         { "version",         control_version,           true,   false,  "show version of ctdb" },
5717 #endif
5718         { "status",          control_status,            true,   false,  "show node status" },
5719         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
5720         { "ping",            control_ping,              true,   false,  "ping all nodes" },
5721         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
5722         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
5723         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
5724         { "statistics",      control_statistics,        false,  false, "show statistics" },
5725         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
5726         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
5727         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
5728         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
5729         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
5730         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
5731         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
5732         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
5733         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
5734         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
5735         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
5736         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
5737         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
5738         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
5739         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
5740         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
5741         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
5742         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
5743         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
5744         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
5745         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
5746         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
5747         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
5748         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
5749         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
5750         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
5751         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
5752         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
5753         { "stop",            control_stop,              true,   false,  "stop a node" },
5754         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
5755         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
5756         { "unban",           control_unban,             true,   false,  "unban a node" },
5757         { "showban",         control_showban,           true,   false,  "show ban information"},
5758         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
5759         { "recover",         control_recover,           true,   false,  "force recovery" },
5760         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
5761         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
5762         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
5763         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
5764         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
5765         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
5766         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5767         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
5768         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
5769
5770         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
5771
5772         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
5773         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
5774         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
5775         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
5776         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
5777         { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
5778         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
5779         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
5780         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
5781         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
5782         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
5783         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
5784         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
5785         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
5786         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
5787         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
5788         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
5789         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
5790         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
5791         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
5792         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
5793         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
5794         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
5795         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
5796         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
5797         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
5798         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
5799         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
5800         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
5801         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
5802         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
5803         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>|<name>"},
5804         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbid>|<name>"},
5805         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
5806         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
5807         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
5808         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
5809         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
5810         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
5811         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
5812         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
5813         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
5814         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
5815         { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
5816         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
5817         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
5818         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
5819         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
5820         { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
5821 };
5822
5823 /*
5824   show usage message
5825  */
5826 static void usage(void)
5827 {
5828         int i;
5829         printf(
5830 "Usage: ctdb [options] <control>\n" \
5831 "Options:\n" \
5832 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
5833 "   -Y                 generate machinereadable output\n"
5834 "   -v                 generate verbose output\n"
5835 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
5836         printf("Controls:\n");
5837         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5838                 printf("  %-15s %-27s  %s\n", 
5839                        ctdb_commands[i].name, 
5840                        ctdb_commands[i].args?ctdb_commands[i].args:"",
5841                        ctdb_commands[i].msg);
5842         }
5843         exit(1);
5844 }
5845
5846
5847 static void ctdb_alarm(int sig)
5848 {
5849         printf("Maximum runtime exceeded - exiting\n");
5850         _exit(ERR_TIMEOUT);
5851 }
5852
5853 /*
5854   main program
5855 */
5856 int main(int argc, const char *argv[])
5857 {
5858         struct ctdb_context *ctdb;
5859         char *nodestring = NULL;
5860         struct poptOption popt_options[] = {
5861                 POPT_AUTOHELP
5862                 POPT_CTDB_CMDLINE
5863                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
5864                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
5865                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
5866                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
5867                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
5868                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
5869                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
5870                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
5871                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
5872                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
5873                 POPT_TABLEEND
5874         };
5875         int opt;
5876         const char **extra_argv;
5877         int extra_argc = 0;
5878         int ret=-1, i;
5879         poptContext pc;
5880         struct event_context *ev;
5881         const char *control;
5882         const char *socket_name;
5883
5884         setlinebuf(stdout);
5885         
5886         /* set some defaults */
5887         options.maxruntime = 0;
5888         options.timelimit = 3;
5889         options.pnn = CTDB_CURRENT_NODE;
5890
5891         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
5892
5893         while ((opt = poptGetNextOpt(pc)) != -1) {
5894                 switch (opt) {
5895                 default:
5896                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
5897                                 poptBadOption(pc, 0), poptStrerror(opt)));
5898                         exit(1);
5899                 }
5900         }
5901
5902         /* setup the remaining options for the main program to use */
5903         extra_argv = poptGetArgs(pc);
5904         if (extra_argv) {
5905                 extra_argv++;
5906                 while (extra_argv[extra_argc]) extra_argc++;
5907         }
5908
5909         if (extra_argc < 1) {
5910                 usage();
5911         }
5912
5913         if (options.maxruntime == 0) {
5914                 const char *ctdb_timeout;
5915                 ctdb_timeout = getenv("CTDB_TIMEOUT");
5916                 if (ctdb_timeout != NULL) {
5917                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
5918                 } else {
5919                         /* default timeout is 120 seconds */
5920                         options.maxruntime = 120;
5921                 }
5922         }
5923
5924         signal(SIGALRM, ctdb_alarm);
5925         alarm(options.maxruntime);
5926
5927         control = extra_argv[0];
5928
5929         ev = event_context_init(NULL);
5930         if (!ev) {
5931                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
5932                 exit(1);
5933         }
5934
5935         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5936                 if (strcmp(control, ctdb_commands[i].name) == 0) {
5937                         break;
5938                 }
5939         }
5940
5941         if (i == ARRAY_SIZE(ctdb_commands)) {
5942                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
5943                 exit(1);
5944         }
5945
5946         if (ctdb_commands[i].without_daemon == true) {
5947                 if (nodestring != NULL) {
5948                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
5949                         exit(1);
5950                 }
5951                 close(2);
5952                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
5953         }
5954
5955         /* initialise ctdb */
5956         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
5957
5958         if (ctdb == NULL) {
5959                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
5960                 exit(1);
5961         }
5962
5963         /* initialize a libctdb connection as well */
5964         socket_name = ctdb_get_socketname(ctdb);
5965         ctdb_connection = ctdb_connect(socket_name,
5966                                        ctdb_log_file, stderr);
5967         if (ctdb_connection == NULL) {
5968                 DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
5969                 exit(1);
5970         }                               
5971
5972         /* setup the node number(s) to contact */
5973         if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
5974                               &options.nodes, &options.pnn)) {
5975                 usage();
5976         }
5977
5978         if (options.pnn == CTDB_CURRENT_NODE) {
5979                 options.pnn = options.nodes[0];
5980         }
5981
5982         if (ctdb_commands[i].auto_all && 
5983             ((options.pnn == CTDB_BROADCAST_ALL) ||
5984              (options.pnn == CTDB_MULTICAST))) {
5985                 int j;
5986
5987                 ret = 0;
5988                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
5989                         options.pnn = options.nodes[j];
5990                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
5991                 }
5992         } else {
5993                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
5994         }
5995
5996         ctdb_disconnect(ctdb_connection);
5997         talloc_free(ctdb);
5998         (void)poptFreeContext(pc);
5999
6000         return ret;
6001
6002 }