tools/ctdb: Remove redundant filtering loop in control_natgwlist()
[obnox/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24 #include "system/network.h"
25 #include "system/locale.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_client.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32 #include "db_wrap.h"
33
34 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
35 #define ERR_NONODE      21      /* node does not exist */
36 #define ERR_DISNODE     22      /* node is disconnected */
37
38 struct ctdb_connection *ctdb_connection;
39
40 static void usage(void);
41
/* Global option values consulted by the command implementations in
 * this file.
 */
42 static struct {
43         int timelimit; /* first argument given to timeval_current_ofs() by TIMELIMIT(); LONGTIMELIMIT() uses ten times this */
44         uint32_t pnn; /* node number that controls such as ctdb_ctrl_statistics() are addressed to */
45         uint32_t *nodes; /* presumably the node list produced by parse_nodestring() -- TODO confirm */
46         int machinereadable; /* non-zero selects the colon-separated output format */
47         int verbose;
48         int maxruntime;
49         int printemptyrecords;
50         int printdatasize;
51         int printlmaster;
52         int printhash;
53         int printrecordflags;
54 } options;
55
/* Timeout helpers built on options.timelimit: TIMELIMIT() hands the
 * configured value to timeval_current_ofs(), LONGTIMELIMIT() hands it
 * ten times that value.  Presumably the result is a deadline that many
 * seconds from now -- confirm against timeval_current_ofs().
 */
56 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
57 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
58
#ifdef CTDB_VERS
/* Two-level stringification: XSTR() lets CTDB_VERS be macro-expanded
 * before STR() turns the result into a string literal.
 */
#define STR(x) #x
#define XSTR(x) STR(x)

/*
  print the version this tool was built as (only when CTDB_VERS is defined)
 */
static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
{
        static const char built_version[] = XSTR(CTDB_VERS);

        printf("CTDB version: %s\n", built_version);
        return 0;
}
#endif
68
/* Abort the process when p evaluates to false (typically a NULL
 * pointer from a failed allocation), after emitting a DEBUG_ALERT
 * message that names the source location.  The do { } while (0)
 * wrapper lets the macro be used as a single statement.
 */
69 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
70                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
71                                    "Out of memory in " __location__ )); \
72                 abort();                                                \
73         }} while (0)
74
75 /* Pretty print the flags to a static buffer in human-readable format.
76  * This never returns NULL!
77  */
78 static const char *pretty_print_flags(uint32_t flags)
79 {
80         int j;
81         static const struct {
82                 uint32_t flag;
83                 const char *name;
84         } flag_names[] = {
85                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
86                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
87                 { NODE_FLAGS_BANNED,                "BANNED" },
88                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
89                 { NODE_FLAGS_DELETED,               "DELETED" },
90                 { NODE_FLAGS_STOPPED,               "STOPPED" },
91                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
92         };
93         static char flags_str[512]; /* Big enough to contain all flag names */
94
95         flags_str[0] = '\0';
96         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
97                 if (flags & flag_names[j].flag) {
98                         if (flags_str[0] == '\0') {
99                                 (void) strcpy(flags_str, flag_names[j].name);
100                         } else {
101                                 (void) strcat(flags_str, "|");
102                                 (void) strcat(flags_str, flag_names[j].name);
103                         }
104                 }
105         }
106         if (flags_str[0] == '\0') {
107                 (void) strcpy(flags_str, "OK");
108         }
109
110         return flags_str;
111 }
112
/* Convert one hexadecimal digit character to its numeric value.
 *
 * 'a'..'f' and 'A'..'F' both map to 10..15, '0'..'9' map to 0..9.
 * Any other character is not validated and yields h - '0', exactly as
 * before.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') return h - 'a' + 10;
        /* upper-case digits must be offset from 'A' (this used to be 'f') */
        if (h >= 'A' && h <= 'F') return h - 'A' + 10;
        return h - '0';
}
119
120 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
121 {
122         int i, len;
123         TDB_DATA key = {NULL, 0};
124
125         len = strlen(str);
126         if (len & 0x01) {
127                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
128                 return key;
129         }
130
131         key.dsize = len>>1;
132         key.dptr  = talloc_size(mem_ctx, key.dsize);
133
134         for (i=0; i < len/2; i++) {
135                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
136         }
137         return key;
138 }
139
140 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
141  * that are disconnected or deleted.  If dd_ok is true those nodes are
142  * included in the output list of nodes.  If dd_ok is false, those
143  * nodes are filtered from the "all" case and cause an error if
144  * explicitly specified.
145  */
146 static bool parse_nodestring(struct ctdb_context *ctdb,
147                              const char * nodestring,
148                              uint32_t current_pnn,
149                              bool dd_ok,
150                              uint32_t **nodes,
151                              uint32_t *pnn_mode)
152 {
153         int n;
154         uint32_t i;
155         struct ctdb_node_map *nodemap;
156        
157         *nodes = NULL;
158
159         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
160                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
161                 exit(10);
162         }
163
164         if (nodestring != NULL) {
165                 *nodes = talloc_array(ctdb, uint32_t, 0);
166                 CTDB_NOMEM_ABORT(*nodes);
167                
168                 n = 0;
169
170                 if (strcmp(nodestring, "all") == 0) {
171                         *pnn_mode = CTDB_BROADCAST_ALL;
172
173                         /* all */
174                         for (i = 0; i < nodemap->num; i++) {
175                                 if ((nodemap->nodes[i].flags & 
176                                      (NODE_FLAGS_DISCONNECTED |
177                                       NODE_FLAGS_DELETED)) && !dd_ok) {
178                                         continue;
179                                 }
180                                 *nodes = talloc_realloc(ctdb, *nodes,
181                                                         uint32_t, n+1);
182                                 CTDB_NOMEM_ABORT(*nodes);
183                                 (*nodes)[n] = i;
184                                 n++;
185                         }
186                 } else {
187                         /* x{,y...} */
188                         char *ns, *tok;
189                        
190                         ns = talloc_strdup(ctdb, nodestring);
191                         tok = strtok(ns, ",");
192                         while (tok != NULL) {
193                                 uint32_t pnn;
194                                 i = (uint32_t)strtoul(tok, NULL, 0);
195                                 if (i >= nodemap->num) {
196                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
197                                         exit(ERR_NONODE);
198                                 }
199                                 if ((nodemap->nodes[i].flags & 
200                                      (NODE_FLAGS_DISCONNECTED |
201                                       NODE_FLAGS_DELETED)) && !dd_ok) {
202                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
203                                         exit(ERR_DISNODE);
204                                 }
205                                 if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
206                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
207                                         exit(10);
208                                 }
209
210                                 *nodes = talloc_realloc(ctdb, *nodes,
211                                                         uint32_t, n+1);
212                                 CTDB_NOMEM_ABORT(*nodes);
213
214                                 (*nodes)[n] = i;
215                                 n++;
216
217                                 tok = strtok(NULL, ",");
218                         }
219                         talloc_free(ns);
220
221                         if (n == 1) {
222                                 *pnn_mode = (*nodes)[0];
223                         } else {
224                                 *pnn_mode = CTDB_MULTICAST;
225                         }
226                 }
227         } else {
228                 /* default - no nodes specified */
229                 *nodes = talloc_array(ctdb, uint32_t, 1);
230                 CTDB_NOMEM_ABORT(*nodes);
231                 *pnn_mode = CTDB_CURRENT_NODE;
232
233                 if (!ctdb_getpnn(ctdb_connection, current_pnn,
234                                  &((*nodes)[0]))) {
235                         return false;
236                 }
237         }
238
239         ctdb_free_nodemap(nodemap);
240
241         return true;
242 }
243
244 /*
245  check if a database exists
246 */
247 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
248 {
249         int i, ret;
250         struct ctdb_dbid_map *dbmap=NULL;
251
252         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
253         if (ret != 0) {
254                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
255                 return -1;
256         }
257
258         for(i=0;i<dbmap->num;i++){
259                 const char *name;
260
261                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
262                 if (!strcmp(name, db_name)) {
263                         if (persistent) {
264                                 *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
265                         }
266                         return 0;
267                 }
268         }
269
270         return -1;
271 }
272
273 /*
274   see if a process exists
275  */
276 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
277 {
278         uint32_t pnn, pid;
279         int ret;
280         if (argc < 1) {
281                 usage();
282         }
283
284         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
285                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
286                 return -1;
287         }
288
289         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
290         if (ret == 0) {
291                 printf("%u:%u exists\n", pnn, pid);
292         } else {
293                 printf("%u:%u does not exist\n", pnn, pid);
294         }
295         return ret;
296 }
297
298 /*
299   display statistics structure
300  */
301 static void show_statistics(struct ctdb_statistics *s, int show_header)
302 {
303         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
304         int i;
305         const char *prefix=NULL;
306         int preflen=0;
307         int tmp, days, hours, minutes, seconds;
308         const struct {
309                 const char *name;
310                 uint32_t offset;
311         } fields[] = {
312 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
313                 STATISTICS_FIELD(num_clients),
314                 STATISTICS_FIELD(frozen),
315                 STATISTICS_FIELD(recovering),
316                 STATISTICS_FIELD(num_recoveries),
317                 STATISTICS_FIELD(client_packets_sent),
318                 STATISTICS_FIELD(client_packets_recv),
319                 STATISTICS_FIELD(node_packets_sent),
320                 STATISTICS_FIELD(node_packets_recv),
321                 STATISTICS_FIELD(keepalive_packets_sent),
322                 STATISTICS_FIELD(keepalive_packets_recv),
323                 STATISTICS_FIELD(node.req_call),
324                 STATISTICS_FIELD(node.reply_call),
325                 STATISTICS_FIELD(node.req_dmaster),
326                 STATISTICS_FIELD(node.reply_dmaster),
327                 STATISTICS_FIELD(node.reply_error),
328                 STATISTICS_FIELD(node.req_message),
329                 STATISTICS_FIELD(node.req_control),
330                 STATISTICS_FIELD(node.reply_control),
331                 STATISTICS_FIELD(client.req_call),
332                 STATISTICS_FIELD(client.req_message),
333                 STATISTICS_FIELD(client.req_control),
334                 STATISTICS_FIELD(timeouts.call),
335                 STATISTICS_FIELD(timeouts.control),
336                 STATISTICS_FIELD(timeouts.traverse),
337                 STATISTICS_FIELD(total_calls),
338                 STATISTICS_FIELD(pending_calls),
339                 STATISTICS_FIELD(lockwait_calls),
340                 STATISTICS_FIELD(pending_lockwait_calls),
341                 STATISTICS_FIELD(childwrite_calls),
342                 STATISTICS_FIELD(pending_childwrite_calls),
343                 STATISTICS_FIELD(memory_used),
344                 STATISTICS_FIELD(max_hop_count),
345                 STATISTICS_FIELD(total_ro_delegations),
346                 STATISTICS_FIELD(total_ro_revokes),
347         };
348         
349         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
350         seconds = tmp%60;
351         tmp    /= 60;
352         minutes = tmp%60;
353         tmp    /= 60;
354         hours   = tmp%24;
355         tmp    /= 24;
356         days    = tmp;
357
358         if (options.machinereadable){
359                 if (show_header) {
360                         printf("CTDB version:");
361                         printf("Current time of statistics:");
362                         printf("Statistics collected since:");
363                         for (i=0;i<ARRAY_SIZE(fields);i++) {
364                                 printf("%s:", fields[i].name);
365                         }
366                         printf("num_reclock_ctdbd_latency:");
367                         printf("min_reclock_ctdbd_latency:");
368                         printf("avg_reclock_ctdbd_latency:");
369                         printf("max_reclock_ctdbd_latency:");
370
371                         printf("num_reclock_recd_latency:");
372                         printf("min_reclock_recd_latency:");
373                         printf("avg_reclock_recd_latency:");
374                         printf("max_reclock_recd_latency:");
375
376                         printf("num_call_latency:");
377                         printf("min_call_latency:");
378                         printf("avg_call_latency:");
379                         printf("max_call_latency:");
380
381                         printf("num_lockwait_latency:");
382                         printf("min_lockwait_latency:");
383                         printf("avg_lockwait_latency:");
384                         printf("max_lockwait_latency:");
385
386                         printf("num_childwrite_latency:");
387                         printf("min_childwrite_latency:");
388                         printf("avg_childwrite_latency:");
389                         printf("max_childwrite_latency:");
390                         printf("\n");
391                 }
392                 printf("%d:", CTDB_VERSION);
393                 printf("%d:", (int)s->statistics_current_time.tv_sec);
394                 printf("%d:", (int)s->statistics_start_time.tv_sec);
395                 for (i=0;i<ARRAY_SIZE(fields);i++) {
396                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
397                 }
398                 printf("%d:", s->reclock.ctdbd.num);
399                 printf("%.6f:", s->reclock.ctdbd.min);
400                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
401                 printf("%.6f:", s->reclock.ctdbd.max);
402
403                 printf("%d:", s->reclock.recd.num);
404                 printf("%.6f:", s->reclock.recd.min);
405                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
406                 printf("%.6f:", s->reclock.recd.max);
407
408                 printf("%d:", s->call_latency.num);
409                 printf("%.6f:", s->call_latency.min);
410                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
411                 printf("%.6f:", s->call_latency.max);
412
413                 printf("%d:", s->lockwait_latency.num);
414                 printf("%.6f:", s->lockwait_latency.min);
415                 printf("%.6f:", s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0);
416                 printf("%.6f:", s->lockwait_latency.max);
417
418                 printf("%d:", s->childwrite_latency.num);
419                 printf("%.6f:", s->childwrite_latency.min);
420                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
421                 printf("%.6f:", s->childwrite_latency.max);
422                 printf("\n");
423         } else {
424                 printf("CTDB version %u\n", CTDB_VERSION);
425                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
426                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
427
428                 for (i=0;i<ARRAY_SIZE(fields);i++) {
429                         if (strchr(fields[i].name, '.')) {
430                                 preflen = strcspn(fields[i].name, ".")+1;
431                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
432                                         prefix = fields[i].name;
433                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
434                                 }
435                         } else {
436                                 preflen = 0;
437                         }
438                         printf(" %*s%-22s%*s%10u\n", 
439                                preflen?4:0, "",
440                                fields[i].name+preflen, 
441                                preflen?0:4, "",
442                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
443                 }
444                 printf(" hop_count_buckets:");
445                 for (i=0;i<MAX_COUNT_BUCKETS;i++) {
446                         printf(" %d", s->hop_count_bucket[i]);
447                 }
448                 printf("\n");
449                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
450
451                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
452
453                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
454                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "lockwait_latency   MIN/AVG/MAX", s->lockwait_latency.min, s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0, s->lockwait_latency.max, s->lockwait_latency.num);
455                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
456         }
457
458         talloc_free(tmp_ctx);
459 }
460
461 /*
462   display remote ctdb statistics combined from all nodes
463  */
464 static int control_statistics_all(struct ctdb_context *ctdb)
465 {
466         int ret, i;
467         struct ctdb_statistics statistics;
468         uint32_t *nodes;
469         uint32_t num_nodes;
470
471         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
472         CTDB_NO_MEMORY(ctdb, nodes);
473         
474         ZERO_STRUCT(statistics);
475
476         for (i=0;i<num_nodes;i++) {
477                 struct ctdb_statistics s1;
478                 int j;
479                 uint32_t *v1 = (uint32_t *)&s1;
480                 uint32_t *v2 = (uint32_t *)&statistics;
481                 uint32_t num_ints = 
482                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
483                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
484                 if (ret != 0) {
485                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
486                         return ret;
487                 }
488                 for (j=0;j<num_ints;j++) {
489                         v2[j] += v1[j];
490                 }
491                 statistics.max_hop_count = 
492                         MAX(statistics.max_hop_count, s1.max_hop_count);
493                 statistics.call_latency.max = 
494                         MAX(statistics.call_latency.max, s1.call_latency.max);
495                 statistics.lockwait_latency.max = 
496                         MAX(statistics.lockwait_latency.max, s1.lockwait_latency.max);
497         }
498         talloc_free(nodes);
499         printf("Gathered statistics for %u nodes\n", num_nodes);
500         show_statistics(&statistics, 1);
501         return 0;
502 }
503
504 /*
505   display remote ctdb statistics
506  */
507 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
508 {
509         int ret;
510         struct ctdb_statistics statistics;
511
512         if (options.pnn == CTDB_BROADCAST_ALL) {
513                 return control_statistics_all(ctdb);
514         }
515
516         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
517         if (ret != 0) {
518                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
519                 return ret;
520         }
521         show_statistics(&statistics, 1);
522         return 0;
523 }
524
525
526 /*
527   reset remote ctdb statistics
528  */
529 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
530 {
531         int ret;
532
533         ret = ctdb_statistics_reset(ctdb, options.pnn);
534         if (ret != 0) {
535                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
536                 return ret;
537         }
538         return 0;
539 }
540
541
542 /*
543   display remote ctdb rolling statistics
544  */
545 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
546 {
547         int ret;
548         struct ctdb_statistics_wire *stats;
549         int i, num_records = -1;
550
551         if (argc ==1) {
552                 num_records = atoi(argv[0]) - 1;
553         }
554
555         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
556         if (ret != 0) {
557                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
558                 return ret;
559         }
560         for (i=0;i<stats->num;i++) {
561                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
562                         continue;
563                 }
564                 show_statistics(&stats->stats[i], i==0);
565                 if (i == num_records) {
566                         break;
567                 }
568         }
569         return 0;
570 }
571
572
573 /*
574   display remote ctdb db statistics
575  */
576 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
577 {
578         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
579         struct ctdb_db_statistics *dbstatistics;
580         struct ctdb_dbid_map *dbmap=NULL;
581         int i, ret;
582
583         if (argc < 1) {
584                 usage();
585         }
586
587         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
588         if (ret != 0) {
589                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
590                 return ret;
591         }
592         for(i=0;i<dbmap->num;i++){
593                 const char *name;
594
595                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
596                 if(!strcmp(argv[0], name)){
597                         talloc_free(discard_const(name));
598                         break;
599                 }
600                 talloc_free(discard_const(name));
601         }
602         if (i == dbmap->num) {
603                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
604                 talloc_free(tmp_ctx);
605                 return -1;
606         }
607
608         if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstatistics)) {
609                 DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
610                 talloc_free(tmp_ctx);
611                 return -1;
612         }
613
614         printf("DB Statistics:\n");
615         printf("RO Delegations: %d\n", dbstatistics->db_ro_delegations);
616         printf("RO Revokes:     %d\n", dbstatistics->db_ro_revokes);
617         printf(" hop_count_buckets:");
618         for (i=0;i<MAX_COUNT_BUCKETS;i++) {
619                 printf(" %d", dbstatistics->hop_count_bucket[i]);
620         }
621         printf("\n");
622         printf("Num Hot Keys:     %d\n", dbstatistics->num_hot_keys);
623         for (i = 0; i < dbstatistics->num_hot_keys; i++) {
624                 int j;
625                 printf("Count:%d Key:", dbstatistics->hot_keys[i].count);
626                 for (j = 0; j < dbstatistics->hot_keys[i].key.dsize; j++) {
627                         printf("%02x", dbstatistics->hot_keys[i].key.dptr[j]&0xff);
628                 }
629                 printf("\n");
630         }
631
632         ctdb_free_dbstat(dbstatistics);
633         return 0;
634 }
635
636 /*
637   display uptime of remote node
638  */
639 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
640 {
641         int ret;
642         struct ctdb_uptime *uptime = NULL;
643         int tmp, days, hours, minutes, seconds;
644
645         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
646         if (ret != 0) {
647                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
648                 return ret;
649         }
650
651         if (options.machinereadable){
652                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
653                 printf(":%u:%u:%u:%lf\n",
654                         (unsigned int)uptime->current_time.tv_sec,
655                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
656                         (unsigned int)uptime->last_recovery_finished.tv_sec,
657                         timeval_delta(&uptime->last_recovery_finished,
658                                       &uptime->last_recovery_started)
659                 );
660                 return 0;
661         }
662
663         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
664
665         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
666         seconds = tmp%60;
667         tmp    /= 60;
668         minutes = tmp%60;
669         tmp    /= 60;
670         hours   = tmp%24;
671         tmp    /= 24;
672         days    = tmp;
673         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
674
675         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
676         seconds = tmp%60;
677         tmp    /= 60;
678         minutes = tmp%60;
679         tmp    /= 60;
680         hours   = tmp%24;
681         tmp    /= 24;
682         days    = tmp;
683         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
684         
685         printf("Duration of last recovery/failover: %lf seconds\n",
686                 timeval_delta(&uptime->last_recovery_finished,
687                               &uptime->last_recovery_started));
688
689         return 0;
690 }
691
692 /*
693   show the PNN of the current node
694  */
695 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
696 {
697         uint32_t mypnn;
698         bool ret;
699
700         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
701         if (!ret) {
702                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
703                 return -1;
704         }
705
706         printf("PNN:%d\n", mypnn);
707         return 0;
708 }
709
710
711 struct pnn_node {
712         struct pnn_node *next;
713         const char *addr;
714         int pnn;
715 };
716
717 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
718 {
719         const char *nodes_list;
720         int nlines;
721         char **lines;
722         int i, pnn;
723         struct pnn_node *pnn_nodes = NULL;
724         struct pnn_node *pnn_node;
725         struct pnn_node *tmp_node;
726
727         /* read the nodes file */
728         nodes_list = getenv("CTDB_NODES");
729         if (nodes_list == NULL) {
730                 nodes_list = "/etc/ctdb/nodes";
731         }
732         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
733         if (lines == NULL) {
734                 return NULL;
735         }
736         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
737                 nlines--;
738         }
739         for (i=0, pnn=0; i<nlines; i++) {
740                 char *node;
741
742                 node = lines[i];
743                 /* strip leading spaces */
744                 while((*node == ' ') || (*node == '\t')) {
745                         node++;
746                 }
747                 if (*node == '#') {
748                         pnn++;
749                         continue;
750                 }
751                 if (strcmp(node, "") == 0) {
752                         continue;
753                 }
754                 pnn_node = talloc(mem_ctx, struct pnn_node);
755                 pnn_node->pnn = pnn++;
756                 pnn_node->addr = talloc_strdup(pnn_node, node);
757                 pnn_node->next = pnn_nodes;
758                 pnn_nodes = pnn_node;
759         }
760
761         /* swap them around so we return them in incrementing order */
762         pnn_node = pnn_nodes;
763         pnn_nodes = NULL;
764         while (pnn_node) {
765                 tmp_node = pnn_node;
766                 pnn_node = pnn_node->next;
767
768                 tmp_node->next = pnn_nodes;
769                 pnn_nodes = tmp_node;
770         }
771
772         return pnn_nodes;
773 }
774
775 /*
776   show the PNN of the current node
777   discover the pnn by loading the nodes file and try to bind to all
778   addresses one at a time until the ip address is found.
779  */
780 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
781 {
782         TALLOC_CTX *mem_ctx = talloc_new(NULL);
783         struct pnn_node *pnn_nodes;
784         struct pnn_node *pnn_node;
785
786         pnn_nodes = read_nodes_file(mem_ctx);
787         if (pnn_nodes == NULL) {
788                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
789                 talloc_free(mem_ctx);
790                 return -1;
791         }
792
793         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
794                 ctdb_sock_addr addr;
795
796                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
797                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
798                         talloc_free(mem_ctx);
799                         return -1;
800                 }
801
802                 if (ctdb_sys_have_ip(&addr)) {
803                         printf("PNN:%d\n", pnn_node->pnn);
804                         talloc_free(mem_ctx);
805                         return 0;
806                 }
807         }
808
809         printf("Failed to detect which PNN this node is\n");
810         talloc_free(mem_ctx);
811         return -1;
812 }
813
814 /* Helpers for ctdb status
815  */
816 static bool is_partially_online(struct ctdb_node_and_flags *node)
817 {
818         int j;
819         bool ret = false;
820
821         if (node->flags == 0) {
822                 struct ctdb_ifaces_list *ifaces;
823
824                 if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
825                         for (j=0; j < ifaces->num; j++) {
826                                 if (ifaces->ifaces[j].link_state != 0) {
827                                         continue;
828                                 }
829                                 ret = true;
830                                 break;
831                         }
832                         ctdb_free_ifaces(ifaces);
833                 }
834         }
835
836         return ret;
837 }
838
/*
  print the column header line of the machine readable node listing
 */
static void control_status_header_machine(void)
{
        static const char header[] =
                ":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped:Inactive:PartiallyOnline:ThisNode:\n";

        printf("%s", header);
}
844
845 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
846 {
847         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
848                ctdb_addr_to_str(&node->addr),
849                !!(node->flags&NODE_FLAGS_DISCONNECTED),
850                !!(node->flags&NODE_FLAGS_BANNED),
851                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
852                !!(node->flags&NODE_FLAGS_UNHEALTHY),
853                !!(node->flags&NODE_FLAGS_STOPPED),
854                !!(node->flags&NODE_FLAGS_INACTIVE),
855                is_partially_online(node) ? 1 : 0,
856                (node->pnn == mypnn)?'Y':'N');
857
858         return node->flags;
859 }
860
861 static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
862 {
863        printf("pnn:%d %-16s %s%s\n", node->pnn,
864               ctdb_addr_to_str(&node->addr),
865               is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
866               node->pnn == mypnn?" (THIS NODE)":"");
867
868        return node->flags;
869 }
870
871 /*
872   display remote ctdb status
873  */
874 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
875 {
876         int i;
877         struct ctdb_vnn_map *vnnmap=NULL;
878         struct ctdb_node_map *nodemap=NULL;
879         uint32_t recmode, recmaster, mypnn;
880
881         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
882                 return -1;
883         }
884
885         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
886                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
887                 return -1;
888         }
889
890         if (options.machinereadable) {
891                 control_status_header_machine();
892                 for (i=0;i<nodemap->num;i++) {
893                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
894                                 continue;
895                         }
896                         (void) control_status_1_machine(mypnn,
897                                                         &nodemap->nodes[i]);
898                 }
899                 return 0;
900         }
901
902         printf("Number of nodes:%d\n", nodemap->num);
903         for(i=0;i<nodemap->num;i++){
904                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
905                         continue;
906                 }
907                 (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
908         }
909
910         if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
911                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
912                 return -1;
913         }
914         if (vnnmap->generation == INVALID_GENERATION) {
915                 printf("Generation:INVALID\n");
916         } else {
917                 printf("Generation:%d\n",vnnmap->generation);
918         }
919         printf("Size:%d\n",vnnmap->size);
920         for(i=0;i<vnnmap->size;i++){
921                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
922         }
923         ctdb_free_vnnmap(vnnmap);
924
925         if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
926                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
927                 return -1;
928         }
929         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
930
931         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
932                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
933                 return -1;
934         }
935         printf("Recovery master:%d\n",recmaster);
936
937         return 0;
938 }
939
940 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
941 {
942         int i, ret;
943         struct ctdb_node_map *nodemap=NULL;
944         uint32_t * nodes;
945         uint32_t pnn_mode, mypnn;
946
947         if (argc > 1) {
948                 usage();
949         }
950
951         if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
952                               options.pnn, true, &nodes, &pnn_mode)) {
953                 return -1;
954         }
955
956         if (options.machinereadable) {
957                 control_status_header_machine();
958         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
959                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
960         }
961
962         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
963                 DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
964                 return -1;
965         }
966
967         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
968                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
969                 return -1;
970         }
971
972         ret = 0;
973
974         for (i = 0; i < talloc_array_length(nodes); i++) {
975                 if (options.machinereadable) {
976                         ret |= control_status_1_machine(mypnn,
977                                                         &nodemap->nodes[nodes[i]]);
978                 } else {
979                         ret |= control_status_1_human(mypnn,
980                                                       &nodemap->nodes[nodes[i]]);
981                 }
982         }
983         return ret;
984 }
985
986 struct natgw_node {
987         struct natgw_node *next;
988         const char *addr;
989 };
990
991 /*
992   display the list of nodes belonging to this natgw configuration
993  */
994 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
995 {
996         int i, ret;
997         uint32_t capabilities;
998         const char *natgw_list;
999         int nlines;
1000         char **lines;
1001         struct natgw_node *natgw_nodes = NULL;
1002         struct natgw_node *natgw_node;
1003         struct ctdb_node_map *nodemap=NULL;
1004         uint32_t mypnn;
1005         const char *fmt;
1006
1007
1008         /* read the natgw nodes file into a linked list */
1009         natgw_list = getenv("CTDB_NATGW_NODES");
1010         if (natgw_list == NULL) {
1011                 natgw_list = "/etc/ctdb/natgw_nodes";
1012         }
1013         lines = file_lines_load(natgw_list, &nlines, ctdb);
1014         if (lines == NULL) {
1015                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
1016                 return -1;
1017         }
1018         for (i=0;i<nlines;i++) {
1019                 char *node;
1020
1021                 node = lines[i];
1022                 /* strip leading spaces */
1023                 while((*node == ' ') || (*node == '\t')) {
1024                         node++;
1025                 }
1026                 if (*node == '#') {
1027                         continue;
1028                 }
1029                 if (strcmp(node, "") == 0) {
1030                         continue;
1031                 }
1032                 natgw_node = talloc(ctdb, struct natgw_node);
1033                 natgw_node->addr = talloc_strdup(natgw_node, node);
1034                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
1035                 natgw_node->next = natgw_nodes;
1036                 natgw_nodes = natgw_node;
1037         }
1038
1039         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
1040         if (ret != 0) {
1041                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
1042                 return ret;
1043         }
1044
1045         i=0;
1046         while(i<nodemap->num) {
1047                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
1048                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
1049                                 break;
1050                         }
1051                 }
1052
1053                 /* this node was not in the natgw so we just remove it from
1054                  * the list
1055                  */
1056                 if ((natgw_node == NULL) 
1057                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
1058                         int j;
1059
1060                         for (j=i+1; j<nodemap->num; j++) {
1061                                 nodemap->nodes[j-1] = nodemap->nodes[j];
1062                         }
1063                         nodemap->num--;
1064                         continue;
1065                 }
1066
1067                 i++;
1068         }
1069
1070         if (options.machinereadable) {
1071                 printf(":Node:IP:\n");
1072                 fmt = ":%d:%s:\n";
1073         } else {
1074                 fmt = "%d %s\n";
1075         }
1076
1077         /* pick a node to be natgwmaster
1078          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
1079          */
1080         for(i=0;i<nodemap->num;i++){
1081                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
1082                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1083                         if (ret != 0) {
1084                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1085                                 return ret;
1086                         }
1087                         if (!(capabilities&CTDB_CAP_NATGW)) {
1088                                 continue;
1089                         }
1090                         printf(fmt, nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1091                         break;
1092                 }
1093         }
1094         /* we couldnt find any healthy node, try unhealthy ones */
1095         if (i == nodemap->num) {
1096                 for(i=0;i<nodemap->num;i++){
1097                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
1098                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1099                                 if (ret != 0) {
1100                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1101                                         return ret;
1102                                 }
1103                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1104                                         continue;
1105                                 }
1106                                 printf(fmt, nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1107                                 break;
1108                         }
1109                 }
1110         }
1111         /* unless all nodes are STOPPED, when we pick one anyway */
1112         if (i == nodemap->num) {
1113                 for(i=0;i<nodemap->num;i++){
1114                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
1115                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1116                                 if (ret != 0) {
1117                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1118                                         return ret;
1119                                 }
1120                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1121                                         continue;
1122                                 }
1123                                 printf(fmt, nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
1124                                 break;
1125                         }
1126                 }
1127                 /* or if we still can not find any */
1128                 if (i == nodemap->num) {
1129                         printf(fmt, -1, "0.0.0.0");
1130                         ret = 2; /* matches ENOENT */
1131                 }
1132         }
1133
1134         /* print the pruned list of nodes belonging to this natgw list */
1135         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
1136                 DEBUG(DEBUG_ERR, ("Unable to get PNN from node %u\n", options.pnn));
1137                 return -1;
1138         }
1139         if (options.machinereadable) {
1140                 control_status_header_machine();
1141         } else {
1142                 printf("Number of nodes:%d\n", nodemap->num);
1143         }
1144         for(i=0;i<nodemap->num;i++){
1145                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1146                         continue;
1147                 }
1148                 if (options.machinereadable) {
1149                         control_status_1_machine(mypnn, &(nodemap->nodes[i]));
1150                 } else {
1151                         control_status_1_human(mypnn, &(nodemap->nodes[i]));
1152                 }
1153         }
1154
1155         return ret;
1156 }
1157
1158 /*
1159   display the status of the scripts for monitoring (or other events)
1160  */
1161 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1162                                     enum ctdb_eventscript_call type)
1163 {
1164         struct ctdb_scripts_wire *script_status;
1165         int ret, i;
1166
1167         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1168         if (ret != 0) {
1169                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1170                 return ret;
1171         }
1172
1173         if (script_status == NULL) {
1174                 if (!options.machinereadable) {
1175                         printf("%s cycle never run\n",
1176                                ctdb_eventscript_call_names[type]);
1177                 }
1178                 return 0;
1179         }
1180
1181         if (!options.machinereadable) {
1182                 printf("%d scripts were executed last %s cycle\n",
1183                        script_status->num_scripts,
1184                        ctdb_eventscript_call_names[type]);
1185         }
1186         for (i=0; i<script_status->num_scripts; i++) {
1187                 const char *status = NULL;
1188
1189                 switch (script_status->scripts[i].status) {
1190                 case -ETIME:
1191                         status = "TIMEDOUT";
1192                         break;
1193                 case -ENOEXEC:
1194                         status = "DISABLED";
1195                         break;
1196                 case 0:
1197                         status = "OK";
1198                         break;
1199                 default:
1200                         if (script_status->scripts[i].status > 0)
1201                                 status = "ERROR";
1202                         break;
1203                 }
1204                 if (options.machinereadable) {
1205                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1206                                ctdb_eventscript_call_names[type],
1207                                script_status->scripts[i].name,
1208                                script_status->scripts[i].status,
1209                                status,
1210                                (long)script_status->scripts[i].start.tv_sec,
1211                                (long)script_status->scripts[i].start.tv_usec,
1212                                (long)script_status->scripts[i].finished.tv_sec,
1213                                (long)script_status->scripts[i].finished.tv_usec,
1214                                script_status->scripts[i].output);
1215                         continue;
1216                 }
1217                 if (status)
1218                         printf("%-20s Status:%s    ",
1219                                script_status->scripts[i].name, status);
1220                 else
1221                         /* Some other error, eg from stat. */
1222                         printf("%-20s Status:CANNOT RUN (%s)",
1223                                script_status->scripts[i].name,
1224                                strerror(-script_status->scripts[i].status));
1225
1226                 if (script_status->scripts[i].status >= 0) {
1227                         printf("Duration:%.3lf ",
1228                         timeval_delta(&script_status->scripts[i].finished,
1229                               &script_status->scripts[i].start));
1230                 }
1231                 if (script_status->scripts[i].status != -ENOEXEC) {
1232                         printf("%s",
1233                                ctime(&script_status->scripts[i].start.tv_sec));
1234                         if (script_status->scripts[i].status != 0) {
1235                                 printf("   OUTPUT:%s\n",
1236                                        script_status->scripts[i].output);
1237                         }
1238                 } else {
1239                         printf("\n");
1240                 }
1241         }
1242         return 0;
1243 }
1244
1245
1246 static int control_scriptstatus(struct ctdb_context *ctdb,
1247                                 int argc, const char **argv)
1248 {
1249         int ret;
1250         enum ctdb_eventscript_call type, min, max;
1251         const char *arg;
1252
1253         if (argc > 1) {
1254                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1255                 return -1;
1256         }
1257
1258         if (argc == 0)
1259                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1260         else
1261                 arg = argv[0];
1262
1263         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1264                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1265                         min = type;
1266                         max = type+1;
1267                         break;
1268                 }
1269         }
1270         if (type == CTDB_EVENT_MAX) {
1271                 if (strcmp(arg, "all") == 0) {
1272                         min = 0;
1273                         max = CTDB_EVENT_MAX;
1274                 } else {
1275                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1276                         return -1;
1277                 }
1278         }
1279
1280         if (options.machinereadable) {
1281                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1282         }
1283
1284         for (type = min; type < max; type++) {
1285                 ret = control_one_scriptstatus(ctdb, type);
1286                 if (ret != 0) {
1287                         return ret;
1288                 }
1289         }
1290
1291         return 0;
1292 }
1293
1294 /*
1295   enable an eventscript
1296  */
1297 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1298 {
1299         int ret;
1300
1301         if (argc < 1) {
1302                 usage();
1303         }
1304
1305         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1306         if (ret != 0) {
1307           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1308                 return ret;
1309         }
1310
1311         return 0;
1312 }
1313
1314 /*
1315   disable an eventscript
1316  */
1317 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1318 {
1319         int ret;
1320
1321         if (argc < 1) {
1322                 usage();
1323         }
1324
1325         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1326         if (ret != 0) {
1327           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1328                 return ret;
1329         }
1330
1331         return 0;
1332 }
1333
1334 /*
1335   display the pnn of the recovery master
1336  */
1337 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1338 {
1339         uint32_t recmaster;
1340
1341         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1342                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1343                 return -1;
1344         }
1345         printf("%d\n",recmaster);
1346
1347         return 0;
1348 }
1349
1350 /*
1351   add a tickle to a public address
1352  */
1353 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1354 {
1355         struct ctdb_tcp_connection t;
1356         TDB_DATA data;
1357         int ret;
1358
1359         if (argc < 2) {
1360                 usage();
1361         }
1362
1363         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1364                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1365                 return -1;
1366         }
1367         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1368                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1369                 return -1;
1370         }
1371
1372         data.dptr = (uint8_t *)&t;
1373         data.dsize = sizeof(t);
1374
1375         /* tell all nodes about this tcp connection */
1376         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1377                            0, data, ctdb, NULL, NULL, NULL, NULL);
1378         if (ret != 0) {
1379                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1380                 return -1;
1381         }
1382         
1383         return 0;
1384 }
1385
1386
1387 /*
1388   delete a tickle from a node
1389  */
1390 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1391 {
1392         struct ctdb_tcp_connection t;
1393         TDB_DATA data;
1394         int ret;
1395
1396         if (argc < 2) {
1397                 usage();
1398         }
1399
1400         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1401                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1402                 return -1;
1403         }
1404         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1405                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1406                 return -1;
1407         }
1408
1409         data.dptr = (uint8_t *)&t;
1410         data.dsize = sizeof(t);
1411
1412         /* tell all nodes about this tcp connection */
1413         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1414                            0, data, ctdb, NULL, NULL, NULL, NULL);
1415         if (ret != 0) {
1416                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1417                 return -1;
1418         }
1419         
1420         return 0;
1421 }
1422
1423
1424 /*
1425   get a list of all tickles for this pnn
1426  */
1427 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1428 {
1429         struct ctdb_control_tcp_tickle_list *list;
1430         ctdb_sock_addr addr;
1431         int i, ret;
1432         unsigned port = 0;
1433
1434         if (argc < 1) {
1435                 usage();
1436         }
1437
1438         if (argc == 2) {
1439                 port = atoi(argv[1]);
1440         }
1441
1442         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1443                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1444                 return -1;
1445         }
1446
1447         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1448         if (ret == -1) {
1449                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1450                 return -1;
1451         }
1452
1453         if (options.machinereadable){
1454                 printf(":source ip:port:destination ip:port:\n");
1455                 for (i=0;i<list->tickles.num;i++) {
1456                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1457                                 continue;
1458                         }
1459                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1460                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1461                 }
1462         } else {
1463                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1464                 printf("Num tickles:%u\n", list->tickles.num);
1465                 for (i=0;i<list->tickles.num;i++) {
1466                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1467                                 continue;
1468                         }
1469                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1470                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1471                 }
1472         }
1473
1474         talloc_free(list);
1475         
1476         return 0;
1477 }
1478
1479
1480 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1481 {
1482         struct ctdb_all_public_ips *ips;
1483         struct ctdb_public_ip ip;
1484         int i, ret;
1485         uint32_t *nodes;
1486         uint32_t disable_time;
1487         TDB_DATA data;
1488         struct ctdb_node_map *nodemap=NULL;
1489         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1490
1491         disable_time = 30;
1492         data.dptr  = (uint8_t*)&disable_time;
1493         data.dsize = sizeof(disable_time);
1494         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1495         if (ret != 0) {
1496                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1497                 return -1;
1498         }
1499
1500
1501
1502         /* read the public ip list from the node */
1503         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1504         if (ret != 0) {
1505                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1506                 talloc_free(tmp_ctx);
1507                 return -1;
1508         }
1509
1510         for (i=0;i<ips->num;i++) {
1511                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1512                         break;
1513                 }
1514         }
1515         if (i==ips->num) {
1516                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1517                         pnn, ctdb_addr_to_str(addr)));
1518                 talloc_free(tmp_ctx);
1519                 return -1;
1520         }
1521
1522         ip.pnn  = pnn;
1523         ip.addr = *addr;
1524
1525         data.dptr  = (uint8_t *)&ip;
1526         data.dsize = sizeof(ip);
1527
1528         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1529         if (ret != 0) {
1530                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1531                 talloc_free(tmp_ctx);
1532                 return ret;
1533         }
1534
1535         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1536         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1537                                         nodes, 0,
1538                                         LONGTIMELIMIT(),
1539                                         false, data,
1540                                         NULL, NULL,
1541                                         NULL);
1542         if (ret != 0) {
1543                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1544                 talloc_free(tmp_ctx);
1545                 return -1;
1546         }
1547
1548         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1549         if (ret != 0) {
1550                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1551                 talloc_free(tmp_ctx);
1552                 return -1;
1553         }
1554
1555         /* update the recovery daemon so it now knows to expect the new
1556            node assignment for this ip.
1557         */
1558         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1559         if (ret != 0) {
1560                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1561                 return -1;
1562         }
1563
1564         talloc_free(tmp_ctx);
1565         return 0;
1566 }
1567
1568 /*
1569   move/failover an ip address to a specific node
1570  */
1571 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1572 {
1573         uint32_t pnn;
1574         int ret, retries = 0;
1575         ctdb_sock_addr addr;
1576
1577         if (argc < 2) {
1578                 usage();
1579                 return -1;
1580         }
1581
1582         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1583                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1584                 return -1;
1585         }
1586
1587
1588         if (sscanf(argv[1], "%u", &pnn) != 1) {
1589                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1590                 return -1;
1591         }
1592
1593         do {
1594                 ret = move_ip(ctdb, &addr, pnn);
1595                 if (ret != 0) {
1596                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
1597                         sleep(3);
1598                         retries++;
1599                 }
1600         } while (retries < 5 && ret != 0);
1601         if (ret != 0) {
1602                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
1603                 return -1;
1604         }
1605
1606         return 0;
1607 }
1608
1609 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
1610 {
1611         uint32_t recmaster;
1612         TDB_DATA data;
1613
1614         if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
1615                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
1616                 return -1;
1617         }
1618
1619         data.dptr  = (uint8_t *)&pnn;
1620         data.dsize = sizeof(uint32_t);
1621         if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
1622                 DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
1623                 return -1;
1624         }
1625
1626         return 0;
1627 }
1628
1629
1630 /*
1631   rebalance a node by setting it to allow failback and triggering a
1632   takeover run
1633  */
1634 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
1635 {
1636         switch (options.pnn) {
1637         case CTDB_BROADCAST_ALL:
1638         case CTDB_CURRENT_NODE:
1639                 DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
1640                 return -1;
1641         }
1642
1643         return rebalance_node(ctdb, options.pnn);
1644 }
1645
1646
1647 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1648 {
1649         struct ctdb_public_ip ip;
1650         int ret;
1651         uint32_t *nodes;
1652         uint32_t disable_time;
1653         TDB_DATA data;
1654         struct ctdb_node_map *nodemap=NULL;
1655         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1656
1657         disable_time = 30;
1658         data.dptr  = (uint8_t*)&disable_time;
1659         data.dsize = sizeof(disable_time);
1660         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1661         if (ret != 0) {
1662                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1663                 return -1;
1664         }
1665
1666         ip.pnn  = -1;
1667         ip.addr = *addr;
1668
1669         data.dptr  = (uint8_t *)&ip;
1670         data.dsize = sizeof(ip);
1671
1672         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1673         if (ret != 0) {
1674                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1675                 talloc_free(tmp_ctx);
1676                 return ret;
1677         }
1678
1679         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1680         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1681                                         nodes, 0,
1682                                         LONGTIMELIMIT(),
1683                                         false, data,
1684                                         NULL, NULL,
1685                                         NULL);
1686         if (ret != 0) {
1687                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1688                 talloc_free(tmp_ctx);
1689                 return -1;
1690         }
1691
1692         talloc_free(tmp_ctx);
1693         return 0;
1694 }
1695
1696 /*
1697   release an ip form all nodes and have it re-assigned by recd
1698  */
1699 static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
1700 {
1701         ctdb_sock_addr addr;
1702
1703         if (argc < 1) {
1704                 usage();
1705                 return -1;
1706         }
1707
1708         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1709                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1710                 return -1;
1711         }
1712
1713         if (rebalance_ip(ctdb, &addr) != 0) {
1714                 DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
1715                 return -1;
1716         }
1717
1718         return 0;
1719 }
1720
1721 static int getips_store_callback(void *param, void *data)
1722 {
1723         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1724         struct ctdb_all_public_ips *ips = param;
1725         int i;
1726
1727         i = ips->num++;
1728         ips->ips[i].pnn  = node_ip->pnn;
1729         ips->ips[i].addr = node_ip->addr;
1730         return 0;
1731 }
1732
/*
  Tree traversal callback: add one to the uint32_t counter that
  'param' points at.  'data' is not looked at.  Always returns 0.
 */
static int getips_count_callback(void *param, void *data)
{
        uint32_t *counter = param;

        *counter += 1;

        return 0;
}
1740
1741 #define IP_KEYLEN       4
1742 static uint32_t *ip_key(ctdb_sock_addr *ip)
1743 {
1744         static uint32_t key[IP_KEYLEN];
1745
1746         bzero(key, sizeof(key));
1747
1748         switch (ip->sa.sa_family) {
1749         case AF_INET:
1750                 key[0]  = ip->ip.sin_addr.s_addr;
1751                 break;
1752         case AF_INET6: {
1753                 uint32_t *s6_a32 = (uint32_t *)&(ip->ip6.sin6_addr.s6_addr);
1754                 key[0]  = s6_a32[3];
1755                 key[1]  = s6_a32[2];
1756                 key[2]  = s6_a32[1];
1757                 key[3]  = s6_a32[0];
1758                 break;
1759         }
1760         default:
1761                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1762                 return key;
1763         }
1764
1765         return key;
1766 }
1767
/*
  Insert callback for the ip tree: hands back 'parm' unchanged and
  ignores 'data'.
 */
static void *add_ip_callback(void *parm, void *data)
{
        void *kept = parm;

        return kept;
}
1772
1773 static int
1774 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1775 {
1776         struct ctdb_all_public_ips *tmp_ips;
1777         struct ctdb_node_map *nodemap=NULL;
1778         trbt_tree_t *ip_tree;
1779         int i, j, len, ret;
1780         uint32_t count;
1781
1782         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1783         if (ret != 0) {
1784                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1785                 return ret;
1786         }
1787
1788         ip_tree = trbt_create(tmp_ctx, 0);
1789
1790         for(i=0;i<nodemap->num;i++){
1791                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1792                         continue;
1793                 }
1794                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1795                         continue;
1796                 }
1797
1798                 /* read the public ip list from this node */
1799                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1800                 if (ret != 0) {
1801                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1802                         return -1;
1803                 }
1804         
1805                 for (j=0; j<tmp_ips->num;j++) {
1806                         struct ctdb_public_ip *node_ip;
1807
1808                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1809                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1810                         node_ip->addr = tmp_ips->ips[j].addr;
1811
1812                         trbt_insertarray32_callback(ip_tree,
1813                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1814                                 add_ip_callback,
1815                                 node_ip);
1816                 }
1817                 talloc_free(tmp_ips);
1818         }
1819
1820         /* traverse */
1821         count = 0;
1822         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1823
1824         len = offsetof(struct ctdb_all_public_ips, ips) + 
1825                 count*sizeof(struct ctdb_public_ip);
1826         tmp_ips = talloc_zero_size(tmp_ctx, len);
1827         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1828
1829         *ips = tmp_ips;
1830
1831         return 0;
1832 }
1833
1834
1835 /* 
1836  * scans all other nodes and returns a pnn for another node that can host this 
1837  * ip address or -1
1838  */
1839 static int
1840 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1841 {
1842         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1843         struct ctdb_all_public_ips *ips;
1844         struct ctdb_node_map *nodemap=NULL;
1845         int i, j, ret;
1846
1847         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1848         if (ret != 0) {
1849                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1850                 talloc_free(tmp_ctx);
1851                 return ret;
1852         }
1853
1854         for(i=0;i<nodemap->num;i++){
1855                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1856                         continue;
1857                 }
1858                 if (nodemap->nodes[i].pnn == options.pnn) {
1859                         continue;
1860                 }
1861
1862                 /* read the public ip list from this node */
1863                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1864                 if (ret != 0) {
1865                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1866                         return -1;
1867                 }
1868
1869                 for (j=0;j<ips->num;j++) {
1870                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1871                                 talloc_free(tmp_ctx);
1872                                 return nodemap->nodes[i].pnn;
1873                         }
1874                 }
1875                 talloc_free(ips);
1876         }
1877
1878         talloc_free(tmp_ctx);
1879         return -1;
1880 }
1881
1882 static uint32_t ipreallocate_finished;
1883
1884 /*
1885   handler for receiving the response to ipreallocate
1886 */
1887 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1888                              TDB_DATA data, void *private_data)
1889 {
1890         ipreallocate_finished = 1;
1891 }
1892
1893 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1894 {
1895         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1896
1897         event_add_timed(ctdb->ev, ctdb, 
1898                                 timeval_current_ofs(1, 0),
1899                                 ctdb_every_second, ctdb);
1900 }
1901
1902 /*
1903   ask the recovery daemon on the recovery master to perform a ip reallocation
1904  */
1905 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1906 {
1907         int i, ret;
1908         TDB_DATA data;
1909         struct takeover_run_reply rd;
1910         uint32_t recmaster;
1911         struct ctdb_node_map *nodemap=NULL;
1912         int retries=0;
1913         struct timeval tv = timeval_current();
1914
1915         /* we need some events to trigger so we can timeout and restart
1916            the loop
1917         */
1918         event_add_timed(ctdb->ev, ctdb, 
1919                                 timeval_current_ofs(1, 0),
1920                                 ctdb_every_second, ctdb);
1921
1922         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1923         if (rd.pnn == -1) {
1924                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1925                 return -1;
1926         }
1927         rd.srvid = getpid();
1928
1929         /* register a message port for receiveing the reply so that we
1930            can receive the reply
1931         */
1932         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1933
1934         data.dptr = (uint8_t *)&rd;
1935         data.dsize = sizeof(rd);
1936
1937 again:
1938         /* check that there are valid nodes available */
1939         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1940                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1941                 return -1;
1942         }
1943         for (i=0; i<nodemap->num;i++) {
1944                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1945                         break;
1946                 }
1947         }
1948         if (i==nodemap->num) {
1949                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1950                 return 0;
1951         }
1952
1953
1954         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1955                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1956                 return -1;
1957         }
1958
1959         /* verify the node exists */
1960         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1961                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1962                 return -1;
1963         }
1964
1965
1966         /* check tha there are nodes available that can act as a recmaster */
1967         for (i=0; i<nodemap->num; i++) {
1968                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1969                         continue;
1970                 }
1971                 break;
1972         }
1973         if (i == nodemap->num) {
1974                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1975                 return 0;
1976         }
1977
1978         /* verify the recovery master is not STOPPED, nor BANNED */
1979         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1980                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1981                 retries++;
1982                 sleep(1);
1983                 goto again;
1984         } 
1985         
1986         /* verify the recovery master is not STOPPED, nor BANNED */
1987         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1988                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1989                 retries++;
1990                 sleep(1);
1991                 goto again;
1992         } 
1993
1994         ipreallocate_finished = 0;
1995         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1996         if (ret != 0) {
1997                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1998                 return -1;
1999         }
2000
2001         tv = timeval_current();
2002         /* this loop will terminate when we have received the reply */
2003         while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
2004                 event_loop_once(ctdb->ev);
2005         }
2006         if (ipreallocate_finished == 1) {
2007                 return 0;
2008         }
2009
2010         retries++;
2011         sleep(1);
2012         goto again;
2013
2014         return 0;
2015 }
2016
2017
2018 /*
2019   add a public ip address to a node
2020  */
2021 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
2022 {
2023         int i, ret;
2024         int len, retries = 0;
2025         unsigned mask;
2026         ctdb_sock_addr addr;
2027         struct ctdb_control_ip_iface *pub;
2028         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2029         struct ctdb_all_public_ips *ips;
2030
2031
2032         if (argc != 2) {
2033                 talloc_free(tmp_ctx);
2034                 usage();
2035         }
2036
2037         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
2038                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
2039                 talloc_free(tmp_ctx);
2040                 return -1;
2041         }
2042
2043         /* read the public ip list from the node */
2044         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2045         if (ret != 0) {
2046                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
2047                 talloc_free(tmp_ctx);
2048                 return -1;
2049         }
2050         for (i=0;i<ips->num;i++) {
2051                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2052                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
2053                         return 0;
2054                 }
2055         }
2056
2057
2058
2059         /* Dont timeout. This command waits for an ip reallocation
2060            which sometimes can take wuite a while if there has
2061            been a recent recovery
2062         */
2063         alarm(0);
2064
2065         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
2066         pub = talloc_size(tmp_ctx, len); 
2067         CTDB_NO_MEMORY(ctdb, pub);
2068
2069         pub->addr  = addr;
2070         pub->mask  = mask;
2071         pub->len   = strlen(argv[1])+1;
2072         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
2073
2074         do {
2075                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
2076                 if (ret != 0) {
2077                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
2078                         sleep(3);
2079                         retries++;
2080                 }
2081         } while (retries < 5 && ret != 0);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
2084                 talloc_free(tmp_ctx);
2085                 return ret;
2086         }
2087
2088         if (rebalance_node(ctdb, options.pnn) != 0) {
2089                 DEBUG(DEBUG_ERR,("Error when trying to rebalance node\n"));
2090                 return ret;
2091         }
2092
2093         talloc_free(tmp_ctx);
2094         return 0;
2095 }
2096
2097 /*
2098   add a public ip address to a node
2099  */
2100 static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
2101 {
2102         ctdb_sock_addr addr;
2103
2104         if (argc != 1) {
2105                 usage();
2106         }
2107
2108         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2109                 printf("Badly formed ip : %s\n", argv[0]);
2110                 return -1;
2111         }
2112
2113         printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
2114
2115         return 0;
2116 }
2117
2118 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
2119
2120 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
2121 {
2122         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2123         struct ctdb_node_map *nodemap=NULL;
2124         struct ctdb_all_public_ips *ips;
2125         int ret, i, j;
2126
2127         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2128         if (ret != 0) {
2129                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
2130                 return ret;
2131         }
2132
2133         /* remove it from the nodes that are not hosting the ip currently */
2134         for(i=0;i<nodemap->num;i++){
2135                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2136                         continue;
2137                 }
2138                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2139                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2140                         continue;
2141                 }
2142
2143                 for (j=0;j<ips->num;j++) {
2144                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2145                                 break;
2146                         }
2147                 }
2148                 if (j==ips->num) {
2149                         continue;
2150                 }
2151
2152                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
2153                         continue;
2154                 }
2155
2156                 options.pnn = nodemap->nodes[i].pnn;
2157                 control_delip(ctdb, argc, argv);
2158         }
2159
2160
2161         /* remove it from every node (also the one hosting it) */
2162         for(i=0;i<nodemap->num;i++){
2163                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2164                         continue;
2165                 }
2166                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
2167                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
2168                         continue;
2169                 }
2170
2171                 for (j=0;j<ips->num;j++) {
2172                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
2173                                 break;
2174                         }
2175                 }
2176                 if (j==ips->num) {
2177                         continue;
2178                 }
2179
2180                 options.pnn = nodemap->nodes[i].pnn;
2181                 control_delip(ctdb, argc, argv);
2182         }
2183
2184         talloc_free(tmp_ctx);
2185         return 0;
2186 }
2187         
2188 /*
2189   delete a public ip address from a node
2190  */
2191 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
2192 {
2193         int i, ret;
2194         int retries = 0;
2195         ctdb_sock_addr addr;
2196         struct ctdb_control_ip_iface pub;
2197         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2198         struct ctdb_all_public_ips *ips;
2199
2200         if (argc != 1) {
2201                 talloc_free(tmp_ctx);
2202                 usage();
2203         }
2204
2205         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2206                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2207                 return -1;
2208         }
2209
2210         if (options.pnn == CTDB_BROADCAST_ALL) {
2211                 return control_delip_all(ctdb, argc, argv, &addr);
2212         }
2213
2214         pub.addr  = addr;
2215         pub.mask  = 0;
2216         pub.len   = 0;
2217
2218         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2219         if (ret != 0) {
2220                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2221                 talloc_free(tmp_ctx);
2222                 return ret;
2223         }
2224         
2225         for (i=0;i<ips->num;i++) {
2226                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2227                         break;
2228                 }
2229         }
2230
2231         if (i==ips->num) {
2232                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2233                         ctdb_addr_to_str(&addr)));
2234                 talloc_free(tmp_ctx);
2235                 return -1;
2236         }
2237
2238         if (ips->ips[i].pnn == options.pnn) {
2239                 ret = find_other_host_for_public_ip(ctdb, &addr);
2240                 if (ret != -1) {
2241                         do {
2242                                 ret = move_ip(ctdb, &addr, ret);
2243                                 if (ret != 0) {
2244                                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
2245                                         sleep(3);
2246                                         retries++;
2247                                 }
2248                         } while (retries < 5 && ret != 0);
2249                         if (ret != 0) {
2250                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
2251                                 return -1;
2252                         }
2253                 }
2254         }
2255
2256         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2257         if (ret != 0) {
2258                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2259                 talloc_free(tmp_ctx);
2260                 return ret;
2261         }
2262
2263         talloc_free(tmp_ctx);
2264         return 0;
2265 }
2266
2267 /*
2268   kill a tcp connection
2269  */
2270 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2271 {
2272         int ret;
2273         struct ctdb_control_killtcp killtcp;
2274
2275         if (argc < 2) {
2276                 usage();
2277         }
2278
2279         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2280                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2281                 return -1;
2282         }
2283
2284         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2285                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2286                 return -1;
2287         }
2288
2289         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2290         if (ret != 0) {
2291                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2292                 return ret;
2293         }
2294
2295         return 0;
2296 }
2297
2298
2299 /*
2300   send a gratious arp
2301  */
2302 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2303 {
2304         int ret;
2305         ctdb_sock_addr addr;
2306
2307         if (argc < 2) {
2308                 usage();
2309         }
2310
2311         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2312                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2313                 return -1;
2314         }
2315
2316         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2317         if (ret != 0) {
2318                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2319                 return ret;
2320         }
2321
2322         return 0;
2323 }
2324
2325 /*
2326   register a server id
2327  */
2328 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2329 {
2330         int ret;
2331         struct ctdb_server_id server_id;
2332
2333         if (argc < 3) {
2334                 usage();
2335         }
2336
2337         server_id.pnn       = strtoul(argv[0], NULL, 0);
2338         server_id.type      = strtoul(argv[1], NULL, 0);
2339         server_id.server_id = strtoul(argv[2], NULL, 0);
2340
2341         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2342         if (ret != 0) {
2343                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2344                 return ret;
2345         }
2346         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2347         sleep(999);
2348         return -1;
2349 }
2350
2351 /*
2352   unregister a server id
2353  */
2354 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2355 {
2356         int ret;
2357         struct ctdb_server_id server_id;
2358
2359         if (argc < 3) {
2360                 usage();
2361         }
2362
2363         server_id.pnn       = strtoul(argv[0], NULL, 0);
2364         server_id.type      = strtoul(argv[1], NULL, 0);
2365         server_id.server_id = strtoul(argv[2], NULL, 0);
2366
2367         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2368         if (ret != 0) {
2369                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2370                 return ret;
2371         }
2372         return -1;
2373 }
2374
2375 /*
2376   check if a server id exists
2377  */
2378 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2379 {
2380         uint32_t status;
2381         int ret;
2382         struct ctdb_server_id server_id;
2383
2384         if (argc < 3) {
2385                 usage();
2386         }
2387
2388         server_id.pnn       = strtoul(argv[0], NULL, 0);
2389         server_id.type      = strtoul(argv[1], NULL, 0);
2390         server_id.server_id = strtoul(argv[2], NULL, 0);
2391
2392         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2393         if (ret != 0) {
2394                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2395                 return ret;
2396         }
2397
2398         if (status) {
2399                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2400         } else {
2401                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2402         }
2403         return 0;
2404 }
2405
2406 /*
2407   get a list of all server ids that are registered on a node
2408  */
2409 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2410 {
2411         int i, ret;
2412         struct ctdb_server_id_list *server_ids;
2413
2414         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2415         if (ret != 0) {
2416                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2417                 return ret;
2418         }
2419
2420         for (i=0; i<server_ids->num; i++) {
2421                 printf("Server id %d:%d:%d\n", 
2422                         server_ids->server_ids[i].pnn, 
2423                         server_ids->server_ids[i].type, 
2424                         server_ids->server_ids[i].server_id); 
2425         }
2426
2427         return -1;
2428 }
2429
2430 /*
2431   check if a server id exists
2432  */
2433 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2434 {
2435         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2436         uint64_t *ids;
2437         uint8_t *result;
2438         int i;
2439
2440         if (argc < 1) {
2441                 talloc_free(tmp_ctx);
2442                 usage();
2443         }
2444
2445         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2446         result = talloc_array(tmp_ctx, uint8_t, argc);
2447
2448         for (i = 0; i < argc; i++) {
2449                 ids[i] = strtoull(argv[i], NULL, 0);
2450         }
2451
2452         if (!ctdb_check_message_handlers(ctdb_connection,
2453                 options.pnn, argc, ids, result)) {
2454                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2455                                   options.pnn));
2456                 talloc_free(tmp_ctx);
2457                 return -1;
2458         }
2459
2460         for (i=0; i < argc; i++) {
2461                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2462                        result[i] ? "exists" : "does not exist");
2463         }
2464
2465         talloc_free(tmp_ctx);
2466         return 0;
2467 }
2468
2469 /*
2470   send a tcp tickle ack
2471  */
2472 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2473 {
2474         int ret;
2475         ctdb_sock_addr  src, dst;
2476
2477         if (argc < 2) {
2478                 usage();
2479         }
2480
2481         if (!parse_ip_port(argv[0], &src)) {
2482                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2483                 return -1;
2484         }
2485
2486         if (!parse_ip_port(argv[1], &dst)) {
2487                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2488                 return -1;
2489         }
2490
2491         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2492         if (ret==0) {
2493                 return 0;
2494         }
2495         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2496
2497         return -1;
2498 }
2499
2500
2501 /*
2502   display public ip status
2503  */
2504 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2505 {
2506         int i, ret;
2507         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2508         struct ctdb_all_public_ips *ips;
2509
2510         if (options.pnn == CTDB_BROADCAST_ALL) {
2511                 /* read the list of public ips from all nodes */
2512                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2513         } else {
2514                 /* read the public ip list from this node */
2515                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2516         }
2517         if (ret != 0) {
2518                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2519                 talloc_free(tmp_ctx);
2520                 return ret;
2521         }
2522
2523         if (options.machinereadable){
2524                 printf(":Public IP:Node:");
2525                 if (options.verbose){
2526                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2527                 }
2528                 printf("\n");
2529         } else {
2530                 if (options.pnn == CTDB_BROADCAST_ALL) {
2531                         printf("Public IPs on ALL nodes\n");
2532                 } else {
2533                         printf("Public IPs on node %u\n", options.pnn);
2534                 }
2535         }
2536
2537         for (i=1;i<=ips->num;i++) {
2538                 struct ctdb_control_public_ip_info *info = NULL;
2539                 int32_t pnn;
2540                 char *aciface = NULL;
2541                 char *avifaces = NULL;
2542                 char *cifaces = NULL;
2543
2544                 if (options.pnn == CTDB_BROADCAST_ALL) {
2545                         pnn = ips->ips[ips->num-i].pnn;
2546                 } else {
2547                         pnn = options.pnn;
2548                 }
2549
2550                 if (pnn != -1) {
2551                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2552                                                    &ips->ips[ips->num-i].addr, &info);
2553                 } else {
2554                         ret = -1;
2555                 }
2556
2557                 if (ret == 0) {
2558                         int j;
2559                         for (j=0; j < info->num; j++) {
2560                                 if (cifaces == NULL) {
2561                                         cifaces = talloc_strdup(info,
2562                                                                 info->ifaces[j].name);
2563                                 } else {
2564                                         cifaces = talloc_asprintf_append(cifaces,
2565                                                                          ",%s",
2566                                                                          info->ifaces[j].name);
2567                                 }
2568
2569                                 if (info->active_idx == j) {
2570                                         aciface = info->ifaces[j].name;
2571                                 }
2572
2573                                 if (info->ifaces[j].link_state == 0) {
2574                                         continue;
2575                                 }
2576
2577                                 if (avifaces == NULL) {
2578                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2579                                 } else {
2580                                         avifaces = talloc_asprintf_append(avifaces,
2581                                                                           ",%s",
2582                                                                           info->ifaces[j].name);
2583                                 }
2584                         }
2585                 }
2586
2587                 if (options.machinereadable){
2588                         printf(":%s:%d:",
2589                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2590                                 ips->ips[ips->num-i].pnn);
2591                         if (options.verbose){
2592                                 printf("%s:%s:%s:",
2593                                         aciface?aciface:"",
2594                                         avifaces?avifaces:"",
2595                                         cifaces?cifaces:"");
2596                         }
2597                         printf("\n");
2598                 } else {
2599                         if (options.verbose) {
2600                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2601                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2602                                         ips->ips[ips->num-i].pnn,
2603                                         aciface?aciface:"",
2604                                         avifaces?avifaces:"",
2605                                         cifaces?cifaces:"");
2606                         } else {
2607                                 printf("%s %d\n",
2608                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2609                                         ips->ips[ips->num-i].pnn);
2610                         }
2611                 }
2612                 talloc_free(info);
2613         }
2614
2615         talloc_free(tmp_ctx);
2616         return 0;
2617 }
2618
2619 /*
2620   public ip info
2621  */
2622 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2623 {
2624         int i, ret;
2625         ctdb_sock_addr addr;
2626         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2627         struct ctdb_control_public_ip_info *info;
2628
2629         if (argc != 1) {
2630                 talloc_free(tmp_ctx);
2631                 usage();
2632         }
2633
2634         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2635                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2636                 return -1;
2637         }
2638
2639         /* read the public ip info from this node */
2640         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2641                                            tmp_ctx, &addr, &info);
2642         if (ret != 0) {
2643                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2644                                   argv[0], options.pnn));
2645                 talloc_free(tmp_ctx);
2646                 return ret;
2647         }
2648
2649         printf("Public IP[%s] info on node %u\n",
2650                ctdb_addr_to_str(&info->ip.addr),
2651                options.pnn);
2652
2653         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2654                ctdb_addr_to_str(&info->ip.addr),
2655                info->ip.pnn, info->num);
2656
2657         for (i=0; i<info->num; i++) {
2658                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2659
2660                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2661                        i+1, info->ifaces[i].name,
2662                        info->ifaces[i].link_state?"up":"down",
2663                        (unsigned int)info->ifaces[i].references,
2664                        (i==info->active_idx)?" (active)":"");
2665         }
2666
2667         talloc_free(tmp_ctx);
2668         return 0;
2669 }
2670
2671 /*
2672   display interfaces status
2673  */
2674 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2675 {
2676         int i;
2677         struct ctdb_ifaces_list *ifaces;
2678
2679         /* read the public ip list from this node */
2680         if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
2681                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2682                                   options.pnn));
2683                 return -1;
2684         }
2685
2686         if (options.machinereadable){
2687                 printf(":Name:LinkStatus:References:\n");
2688         } else {
2689                 printf("Interfaces on node %u\n", options.pnn);
2690         }
2691
2692         for (i=0; i<ifaces->num; i++) {
2693                 if (options.machinereadable){
2694                         printf(":%s:%s:%u\n",
2695                                ifaces->ifaces[i].name,
2696                                ifaces->ifaces[i].link_state?"1":"0",
2697                                (unsigned int)ifaces->ifaces[i].references);
2698                 } else {
2699                         printf("name:%s link:%s references:%u\n",
2700                                ifaces->ifaces[i].name,
2701                                ifaces->ifaces[i].link_state?"up":"down",
2702                                (unsigned int)ifaces->ifaces[i].references);
2703                 }
2704         }
2705
2706         ctdb_free_ifaces(ifaces);
2707         return 0;
2708 }
2709
2710
2711 /*
2712   set link status of an interface
2713  */
2714 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2715 {
2716         int ret;
2717         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2718         struct ctdb_control_iface_info info;
2719
2720         ZERO_STRUCT(info);
2721
2722         if (argc != 2) {
2723                 usage();
2724         }
2725
2726         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2727                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2728                                   argv[0]));
2729                 talloc_free(tmp_ctx);
2730                 return -1;
2731         }
2732         strcpy(info.name, argv[0]);
2733
2734         if (strcmp(argv[1], "up") == 0) {
2735                 info.link_state = 1;
2736         } else if (strcmp(argv[1], "down") == 0) {
2737                 info.link_state = 0;
2738         } else {
2739                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2740                                   argv[1]));
2741                 talloc_free(tmp_ctx);
2742                 return -1;
2743         }
2744
2745         /* read the public ip list from this node */
2746         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2747                                    tmp_ctx, &info);
2748         if (ret != 0) {
2749                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2750                                   argv[0], options.pnn));
2751                 talloc_free(tmp_ctx);
2752                 return ret;
2753         }
2754
2755         talloc_free(tmp_ctx);
2756         return 0;
2757 }
2758
2759 /*
2760   display pid of a ctdb daemon
2761  */
2762 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2763 {
2764         uint32_t pid;
2765         int ret;
2766
2767         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2768         if (ret != 0) {
2769                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2770                 return ret;
2771         }
2772         printf("Pid:%d\n", pid);
2773
2774         return 0;
2775 }
2776
2777 /*
2778   disable a remote node
2779  */
2780 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2781 {
2782         int ret;
2783         struct ctdb_node_map *nodemap=NULL;
2784
2785         /* check if the node is already disabled */
2786         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2787                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2788                 exit(10);
2789         }
2790         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2791                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2792                 return 0;
2793         }
2794
2795         do {
2796                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2797                 if (ret != 0) {
2798                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2799                         return ret;
2800                 }
2801
2802                 sleep(1);
2803
2804                 /* read the nodemap and verify the change took effect */
2805                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2806                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2807                         exit(10);
2808                 }
2809
2810         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2811         ret = control_ipreallocate(ctdb, argc, argv);
2812         if (ret != 0) {
2813                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2814                 return ret;
2815         }
2816
2817         return 0;
2818 }
2819
2820 /*
2821   enable a disabled remote node
2822  */
2823 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2824 {
2825         int ret;
2826
2827         struct ctdb_node_map *nodemap=NULL;
2828
2829
2830         /* check if the node is already enabled */
2831         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2832                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2833                 exit(10);
2834         }
2835         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2836                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2837                 return 0;
2838         }
2839
2840         do {
2841                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2842                 if (ret != 0) {
2843                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2844                         return ret;
2845                 }
2846
2847                 sleep(1);
2848
2849                 /* read the nodemap and verify the change took effect */
2850                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2851                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2852                         exit(10);
2853                 }
2854
2855         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2856
2857         ret = control_ipreallocate(ctdb, argc, argv);
2858         if (ret != 0) {
2859                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2860                 return ret;
2861         }
2862
2863         return 0;
2864 }
2865
2866 /*
2867   stop a remote node
2868  */
2869 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2870 {
2871         int ret;
2872         struct ctdb_node_map *nodemap=NULL;
2873
2874         do {
2875                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2876                 if (ret != 0) {
2877                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2878                 }
2879         
2880                 sleep(1);
2881
2882                 /* read the nodemap and verify the change took effect */
2883                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2884                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2885                         exit(10);
2886                 }
2887
2888         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2889         ret = control_ipreallocate(ctdb, argc, argv);
2890         if (ret != 0) {
2891                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2892                 return ret;
2893         }
2894
2895         return 0;
2896 }
2897
2898 /*
2899   restart a stopped remote node
2900  */
2901 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2902 {
2903         int ret;
2904
2905         struct ctdb_node_map *nodemap=NULL;
2906
2907         do {
2908                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2909                 if (ret != 0) {
2910                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2911                         return ret;
2912                 }
2913         
2914                 sleep(1);
2915
2916                 /* read the nodemap and verify the change took effect */
2917                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2918                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2919                         exit(10);
2920                 }
2921
2922         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2923         ret = control_ipreallocate(ctdb, argc, argv);
2924         if (ret != 0) {
2925                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2926                 return ret;
2927         }
2928
2929         return 0;
2930 }
2931
2932 static uint32_t get_generation(struct ctdb_context *ctdb)
2933 {
2934         struct ctdb_vnn_map *vnnmap=NULL;
2935         int ret;
2936
2937         /* wait until the recmaster is not in recovery mode */
2938         while (1) {
2939                 uint32_t recmode, recmaster;
2940                 
2941                 if (vnnmap != NULL) {
2942                         talloc_free(vnnmap);
2943                         vnnmap = NULL;
2944                 }
2945
2946                 /* get the recmaster */
2947                 if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
2948                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2949                         exit(10);
2950                 }
2951
2952                 /* get recovery mode */
2953                 if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
2954                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2955                         exit(10);
2956                 }
2957
2958                 /* get the current generation number */
2959                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2960                 if (ret != 0) {
2961                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2962                         exit(10);
2963                 }
2964
2965                 if ((recmode == CTDB_RECOVERY_NORMAL)
2966                 &&  (vnnmap->generation != 1)){
2967                         return vnnmap->generation;
2968                 }
2969                 sleep(1);
2970         }
2971 }
2972
2973 /*
2974   ban a node from the cluster
2975  */
2976 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2977 {
2978         int ret;
2979         struct ctdb_node_map *nodemap=NULL;
2980         struct ctdb_ban_time bantime;
2981
2982         if (argc < 1) {
2983                 usage();
2984         }
2985         
2986         /* verify the node exists */
2987         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2988         if (ret != 0) {
2989                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2990                 return ret;
2991         }
2992
2993         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2994                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2995                 return -1;
2996         }
2997
2998         bantime.pnn  = options.pnn;
2999         bantime.time = strtoul(argv[0], NULL, 0);
3000
3001         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
3002         if (ret != 0) {
3003                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
3004                 return -1;
3005         }       
3006
3007         ret = control_ipreallocate(ctdb, argc, argv);
3008         if (ret != 0) {
3009                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
3010                 return ret;
3011         }
3012
3013         return 0;
3014 }
3015
3016
3017 /*
3018   unban a node from the cluster
3019  */
3020 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
3021 {
3022         int ret;
3023         struct ctdb_node_map *nodemap=NULL;
3024         struct ctdb_ban_time bantime;
3025
3026         /* verify the node exists */
3027         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3028         if (ret != 0) {
3029                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3030                 return ret;
3031         }
3032
3033         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
3034                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
3035                 return -1;
3036         }
3037
3038         bantime.pnn  = options.pnn;
3039         bantime.time = 0;
3040
3041         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
3042         if (ret != 0) {
3043                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
3044                 return -1;
3045         }       
3046
3047         ret = control_ipreallocate(ctdb, argc, argv);
3048         if (ret != 0) {
3049                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
3050                 return ret;
3051         }
3052
3053         return 0;
3054 }
3055
3056
3057 /*
3058   show ban information for a node
3059  */
3060 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
3061 {
3062         int ret;
3063         struct ctdb_node_map *nodemap=NULL;
3064         struct ctdb_ban_time *bantime;
3065
3066         /* verify the node exists */
3067         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
3068         if (ret != 0) {
3069                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
3070                 return ret;
3071         }
3072
3073         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
3074         if (ret != 0) {
3075                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
3076                 return -1;
3077         }       
3078
3079         if (bantime->time == 0) {
3080                 printf("Node %u is not banned\n", bantime->pnn);
3081         } else {
3082                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
3083         }
3084
3085         return 0;
3086 }
3087
3088 /*
3089   shutdown a daemon
3090  */
3091 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
3092 {
3093         int ret;
3094
3095         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
3096         if (ret != 0) {
3097                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
3098                 return ret;
3099         }
3100
3101         return 0;
3102 }
3103
3104 /*
3105   trigger a recovery
3106  */
3107 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
3108 {
3109         int ret;
3110         uint32_t generation, next_generation;
3111
3112         /* record the current generation number */
3113         generation = get_generation(ctdb);
3114
3115         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
3116         if (ret != 0) {
3117                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
3118                 return ret;
3119         }
3120
3121         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
3122         if (ret != 0) {
3123                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
3124                 return ret;
3125         }
3126
3127         /* wait until we are in a new generation */
3128         while (1) {
3129                 next_generation = get_generation(ctdb);
3130                 if (next_generation != generation) {
3131                         return 0;
3132                 }
3133                 sleep(1);
3134         }
3135
3136         return 0;
3137 }
3138
3139
3140 /*
3141   display monitoring mode of a remote node
3142  */
3143 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
3144 {
3145         uint32_t monmode;
3146         int ret;
3147
3148         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
3149         if (ret != 0) {
3150                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
3151                 return ret;
3152         }
3153         if (!options.machinereadable){
3154                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
3155         } else {
3156                 printf(":mode:\n");
3157                 printf(":%d:\n",monmode);
3158         }
3159         return 0;
3160 }
3161
3162
3163 /*
3164   display capabilities of a remote node
3165  */
3166 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
3167 {
3168         uint32_t capabilities;
3169         int ret;
3170
3171         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
3172         if (ret != 0) {
3173                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
3174                 return ret;
3175         }
3176         
3177         if (!options.machinereadable){
3178                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
3179                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
3180                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
3181                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
3182         } else {
3183                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
3184                 printf(":%d:%d:%d:%d:\n",
3185                         !!(capabilities&CTDB_CAP_RECMASTER),
3186                         !!(capabilities&CTDB_CAP_LMASTER),
3187                         !!(capabilities&CTDB_CAP_LVS),
3188                         !!(capabilities&CTDB_CAP_NATGW));
3189         }
3190         return 0;
3191 }
3192
3193 /*
3194   display lvs configuration
3195  */
3196 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
3197 {
3198         uint32_t *capabilities;
3199         struct ctdb_node_map *nodemap=NULL;
3200         int i, ret;
3201         int healthy_count = 0;
3202
3203         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3204         if (ret != 0) {
3205                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3206                 return ret;
3207         }
3208
3209         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3210         CTDB_NO_MEMORY(ctdb, capabilities);
3211         
3212         /* collect capabilities for all connected nodes */
3213         for (i=0; i<nodemap->num; i++) {
3214                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3215                         continue;
3216                 }
3217                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3218                         continue;
3219                 }
3220         
3221                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3222                 if (ret != 0) {
3223                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3224                         return ret;
3225                 }
3226
3227                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3228                         continue;
3229                 }
3230
3231                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3232                         healthy_count++;
3233                 }
3234         }
3235
3236         /* Print all LVS nodes */
3237         for (i=0; i<nodemap->num; i++) {
3238                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3239                         continue;
3240                 }
3241                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3242                         continue;
3243                 }
3244                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3245                         continue;
3246                 }
3247
3248                 if (healthy_count != 0) {
3249                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3250                                 continue;
3251                         }
3252                 }
3253
3254                 printf("%d:%s\n", i, 
3255                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3256         }
3257
3258         return 0;
3259 }
3260
3261 /*
3262   display who is the lvs master
3263  */
3264 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3265 {
3266         uint32_t *capabilities;
3267         struct ctdb_node_map *nodemap=NULL;
3268         int i, ret;
3269         int healthy_count = 0;
3270
3271         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3272         if (ret != 0) {
3273                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3274                 return ret;
3275         }
3276
3277         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3278         CTDB_NO_MEMORY(ctdb, capabilities);
3279         
3280         /* collect capabilities for all connected nodes */
3281         for (i=0; i<nodemap->num; i++) {
3282                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3283                         continue;
3284                 }
3285                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3286                         continue;
3287                 }
3288         
3289                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3290                 if (ret != 0) {
3291                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3292                         return ret;
3293                 }
3294
3295                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3296                         continue;
3297                 }
3298
3299                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3300                         healthy_count++;
3301                 }
3302         }
3303
3304         /* find and show the lvsmaster */
3305         for (i=0; i<nodemap->num; i++) {
3306                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3307                         continue;
3308                 }
3309                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3310                         continue;
3311                 }
3312                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3313                         continue;
3314                 }
3315
3316                 if (healthy_count != 0) {
3317                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3318                                 continue;
3319                         }
3320                 }
3321
3322                 if (options.machinereadable){
3323                         printf("%d\n", i);
3324                 } else {
3325                         printf("Node %d is LVS master\n", i);
3326                 }
3327                 return 0;
3328         }
3329
3330         printf("There is no LVS master\n");
3331         return -1;
3332 }
3333
3334 /*
3335   disable monitoring on a  node
3336  */
3337 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3338 {
3339         
3340         int ret;
3341
3342         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3343         if (ret != 0) {
3344                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3345                 return ret;
3346         }
3347         printf("Monitoring mode:%s\n","DISABLED");
3348
3349         return 0;
3350 }
3351
3352 /*
3353   enable monitoring on a  node
3354  */
3355 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3356 {
3357         
3358         int ret;
3359
3360         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3361         if (ret != 0) {
3362                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3363                 return ret;
3364         }
3365         printf("Monitoring mode:%s\n","ACTIVE");
3366
3367         return 0;
3368 }
3369
3370 /*
3371   display remote list of keys/data for a db
3372  */
3373 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3374 {
3375         const char *db_name;
3376         struct ctdb_db_context *ctdb_db;
3377         int ret;
3378         bool persistent;
3379         struct ctdb_dump_db_context c;
3380
3381         if (argc < 1) {
3382                 usage();
3383         }
3384
3385         db_name = argv[0];
3386
3387
3388         if (db_exists(ctdb, db_name, &persistent)) {
3389                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3390                 return -1;
3391         }
3392
3393         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3394
3395         if (ctdb_db == NULL) {
3396                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3397                 return -1;
3398         }
3399
3400         if (options.printlmaster) {
3401                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3402                                           ctdb, &ctdb->vnn_map);
3403                 if (ret != 0) {
3404                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3405                                           options.pnn));
3406                         return ret;
3407                 }
3408         }
3409
3410         ZERO_STRUCT(c);
3411         c.f = stdout;
3412         c.printemptyrecords = (bool)options.printemptyrecords;
3413         c.printdatasize = (bool)options.printdatasize;
3414         c.printlmaster = (bool)options.printlmaster;
3415         c.printhash = (bool)options.printhash;
3416         c.printrecordflags = (bool)options.printrecordflags;
3417
3418         /* traverse and dump the cluster tdb */
3419         ret = ctdb_dump_db(ctdb_db, &c);
3420         if (ret == -1) {
3421                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3422                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3423                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3424                                   db_name));
3425                 return -1;
3426         }
3427         talloc_free(ctdb_db);
3428
3429         printf("Dumped %d records\n", ret);
3430         return 0;
3431 }
3432
3433 struct cattdb_data {
3434         struct ctdb_context *ctdb;
3435         uint32_t count;
3436 };
3437
3438 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3439 {
3440         struct cattdb_data *d = private_data;
3441         struct ctdb_dump_db_context c;
3442
3443         d->count++;
3444
3445         ZERO_STRUCT(c);
3446         c.f = stdout;
3447         c.printemptyrecords = (bool)options.printemptyrecords;
3448         c.printdatasize = (bool)options.printdatasize;
3449         c.printlmaster = false;
3450         c.printhash = (bool)options.printhash;
3451         c.printrecordflags = true;
3452
3453         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3454 }
3455
3456 /*
3457   cat the local tdb database using same format as catdb
3458  */
3459 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3460 {
3461         const char *db_name;
3462         struct ctdb_db_context *ctdb_db;
3463         struct cattdb_data d;
3464         bool persistent;
3465
3466         if (argc < 1) {
3467                 usage();
3468         }
3469
3470         db_name = argv[0];
3471
3472
3473         if (db_exists(ctdb, db_name, &persistent)) {
3474                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3475                 return -1;
3476         }
3477
3478         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3479
3480         if (ctdb_db == NULL) {
3481                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3482                 return -1;
3483         }
3484
3485         /* traverse the local tdb */
3486         d.count = 0;
3487         d.ctdb  = ctdb;
3488         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3489                 printf("Failed to cattdb data\n");
3490                 exit(10);
3491         }
3492         talloc_free(ctdb_db);
3493
3494         printf("Dumped %d records\n", d.count);
3495         return 0;
3496 }
3497
3498 /*
3499   display the content of a database key
3500  */
3501 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3502 {
3503         const char *db_name;
3504         struct ctdb_db_context *ctdb_db;
3505         struct ctdb_record_handle *h;
3506         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3507         TDB_DATA key, data;
3508         bool persistent;
3509
3510         if (argc < 2) {
3511                 usage();
3512         }
3513
3514         db_name = argv[0];
3515
3516         if (db_exists(ctdb, db_name, &persistent)) {
3517                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3518                 return -1;
3519         }
3520
3521         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3522
3523         if (ctdb_db == NULL) {
3524                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3525                 return -1;
3526         }
3527
3528         key.dptr  = discard_const(argv[1]);
3529         key.dsize = strlen((char *)key.dptr);
3530
3531         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3532         if (h == NULL) {
3533                 printf("Failed to fetch record '%s' on node %d\n", 
3534                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3535                 talloc_free(tmp_ctx);
3536                 exit(10);
3537         }
3538
3539         printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
3540
3541         talloc_free(ctdb_db);
3542         talloc_free(tmp_ctx);
3543         return 0;
3544 }
3545
3546 /*
3547   display the content of a database key
3548  */
3549 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3550 {
3551         const char *db_name;
3552         struct ctdb_db_context *ctdb_db;
3553         struct ctdb_record_handle *h;
3554         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3555         TDB_DATA key, data;
3556         bool persistent;
3557
3558         if (argc < 3) {
3559                 usage();
3560         }
3561
3562         db_name = argv[0];
3563
3564         if (db_exists(ctdb, db_name, &persistent)) {
3565                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3566                 return -1;
3567         }
3568
3569         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3570
3571         if (ctdb_db == NULL) {
3572                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3573                 return -1;
3574         }
3575
3576         key.dptr  = discard_const(argv[1]);
3577         key.dsize = strlen((char *)key.dptr);
3578
3579         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3580         if (h == NULL) {
3581                 printf("Failed to fetch record '%s' on node %d\n", 
3582                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3583                 talloc_free(tmp_ctx);
3584                 exit(10);
3585         }
3586
3587         data.dptr  = discard_const(argv[2]);
3588         data.dsize = strlen((char *)data.dptr);
3589
3590         if (ctdb_record_store(h, data) != 0) {
3591                 printf("Failed to store record\n");
3592         }
3593
3594         talloc_free(h);
3595         talloc_free(ctdb_db);
3596         talloc_free(tmp_ctx);
3597         return 0;
3598 }
3599
3600 /*
3601   fetch a record from a persistent database
3602  */
3603 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3604 {
3605         const char *db_name;
3606         struct ctdb_db_context *ctdb_db;
3607         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3608         struct ctdb_transaction_handle *h;
3609         TDB_DATA key, data;
3610         int fd, ret;
3611         bool persistent;
3612
3613         if (argc < 2) {
3614                 talloc_free(tmp_ctx);
3615                 usage();
3616         }
3617
3618         db_name = argv[0];
3619
3620
3621         if (db_exists(ctdb, db_name, &persistent)) {
3622                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3623                 talloc_free(tmp_ctx);
3624                 return -1;
3625         }
3626
3627         if (!persistent) {
3628                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3629                 talloc_free(tmp_ctx);
3630                 return -1;
3631         }
3632
3633         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3634
3635         if (ctdb_db == NULL) {
3636                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3637                 talloc_free(tmp_ctx);
3638                 return -1;
3639         }
3640
3641         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3642         if (h == NULL) {
3643                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3644                 talloc_free(tmp_ctx);
3645                 return -1;
3646         }
3647
3648         key.dptr  = discard_const(argv[1]);
3649         key.dsize = strlen(argv[1]);
3650         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3651         if (ret != 0) {
3652                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3653                 talloc_free(tmp_ctx);
3654                 return -1;
3655         }
3656
3657         if (data.dsize == 0 || data.dptr == NULL) {
3658                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3659                 talloc_free(tmp_ctx);
3660                 return -1;
3661         }
3662
3663         if (argc == 3) {
3664           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3665                 if (fd == -1) {
3666                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3667                         talloc_free(tmp_ctx);
3668                         return -1;
3669                 }
3670                 write(fd, data.dptr, data.dsize);
3671                 close(fd);
3672         } else {
3673                 write(1, data.dptr, data.dsize);
3674         }
3675
3676         /* abort the transaction */
3677         talloc_free(h);
3678
3679
3680         talloc_free(tmp_ctx);
3681         return 0;
3682 }
3683
3684 /*
3685   fetch a record from a tdb-file
3686  */
3687 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3688 {
3689         const char *tdb_file;
3690         TDB_CONTEXT *tdb;
3691         TDB_DATA key, data;
3692         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3693         int fd;
3694
3695         if (argc < 2) {
3696                 usage();
3697         }
3698
3699         tdb_file = argv[0];
3700
3701         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3702         if (tdb == NULL) {
3703                 printf("Failed to open TDB file %s\n", tdb_file);
3704                 return -1;
3705         }
3706
3707         if (!strncmp(argv[1], "0x", 2)) {
3708                 key = hextodata(tmp_ctx, argv[1] + 2);
3709                 if (key.dsize == 0) {
3710                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3711                         return -1;
3712                 }
3713         } else {
3714                 key.dptr  = discard_const(argv[1]);
3715                 key.dsize = strlen(argv[1]);
3716         }
3717
3718         data = tdb_fetch(tdb, key);
3719         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3720                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
3721                 tdb_close(tdb);
3722                 return -1;
3723         }
3724
3725         tdb_close(tdb);
3726
3727         if (argc == 3) {
3728           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3729                 if (fd == -1) {
3730                         printf("Failed to open output file %s\n", argv[2]);
3731                         return -1;
3732                 }
3733                 if (options.verbose){
3734                         write(fd, data.dptr, data.dsize);
3735                 } else {
3736                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3737                 }
3738                 close(fd);
3739         } else {
3740                 if (options.verbose){
3741                         write(1, data.dptr, data.dsize);
3742                 } else {
3743                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3744                 }
3745         }
3746
3747         talloc_free(tmp_ctx);
3748         return 0;
3749 }
3750
3751 /*
3752   store a record and header to a tdb-file
3753  */
3754 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
3755 {
3756         const char *tdb_file;
3757         TDB_CONTEXT *tdb;
3758         TDB_DATA key, data;
3759         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3760
3761         if (argc < 3) {
3762                 usage();
3763         }
3764
3765         tdb_file = argv[0];
3766
3767         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
3768         if (tdb == NULL) {
3769                 printf("Failed to open TDB file %s\n", tdb_file);
3770                 return -1;
3771         }
3772
3773         if (!strncmp(argv[1], "0x", 2)) {
3774                 key = hextodata(tmp_ctx, argv[1] + 2);
3775                 if (key.dsize == 0) {
3776                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3777                         return -1;
3778                 }
3779         } else {
3780                 key.dptr  = discard_const(argv[1]);
3781                 key.dsize = strlen(argv[1]);
3782         }
3783
3784         if (!strncmp(argv[2], "0x", 2)) {
3785                 data = hextodata(tmp_ctx, argv[2] + 2);
3786                 if (data.dsize == 0) {
3787                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
3788                         return -1;
3789                 }
3790         } else {
3791                 data.dptr  = discard_const(argv[2]);
3792                 data.dsize = strlen(argv[2]);
3793         }
3794
3795         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3796                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
3797                 return -1;
3798         }
3799         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
3800                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
3801                 tdb_close(tdb);
3802                 return -1;
3803         }
3804
3805         tdb_close(tdb);
3806
3807         talloc_free(tmp_ctx);
3808         return 0;
3809 }
3810
3811 /*
3812   write a record to a persistent database
3813  */
3814 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3815 {
3816         const char *db_name;
3817         struct ctdb_db_context *ctdb_db;
3818         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3819         struct ctdb_transaction_handle *h;
3820         struct stat st;
3821         TDB_DATA key, data;
3822         int fd, ret;
3823
3824         if (argc < 3) {
3825                 talloc_free(tmp_ctx);
3826                 usage();
3827         }
3828
3829         fd = open(argv[2], O_RDONLY);
3830         if (fd == -1) {
3831                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3832                 talloc_free(tmp_ctx);
3833                 return -1;
3834         }
3835         
3836         ret = fstat(fd, &st);
3837         if (ret == -1) {
3838                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3839                 close(fd);
3840                 talloc_free(tmp_ctx);
3841                 return -1;
3842         }
3843
3844         if (!S_ISREG(st.st_mode)) {
3845                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3846                 close(fd);
3847                 talloc_free(tmp_ctx);
3848                 return -1;
3849         }
3850
3851         data.dsize = st.st_size;
3852         if (data.dsize == 0) {
3853                 data.dptr  = NULL;
3854         } else {
3855                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3856                 if (data.dptr == NULL) {
3857                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3858                         close(fd);
3859                         talloc_free(tmp_ctx);
3860                         return -1;
3861                 }
3862                 ret = read(fd, data.dptr, data.dsize);
3863                 if (ret != data.dsize) {
3864                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3865                         close(fd);
3866                         talloc_free(tmp_ctx);
3867                         return -1;
3868                 }
3869         }
3870         close(fd);
3871
3872
3873         db_name = argv[0];
3874
3875         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
3876         if (ctdb_db == NULL) {
3877                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3878                 talloc_free(tmp_ctx);
3879                 return -1;
3880         }
3881
3882         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3883         if (h == NULL) {
3884                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3885                 talloc_free(tmp_ctx);
3886                 return -1;
3887         }
3888
3889         key.dptr  = discard_const(argv[1]);
3890         key.dsize = strlen(argv[1]);
3891         ret = ctdb_transaction_store(h, key, data);
3892         if (ret != 0) {
3893                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3894                 talloc_free(tmp_ctx);
3895                 return -1;
3896         }
3897
3898         ret = ctdb_transaction_commit(h);
3899         if (ret != 0) {
3900                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3901                 talloc_free(tmp_ctx);
3902                 return -1;
3903         }
3904
3905
3906         talloc_free(tmp_ctx);
3907         return 0;
3908 }
3909
/*
  check if a service is bound to a port or not

  argv[0] is the TCP port number.  The check tries to bind a fresh
  IPv4 socket to that port on the wildcard address.

  Returns 0 when the bind succeeded, EINVAL for a wrong argument count,
  otherwise the errno of the failing socket()/bind() call.
 */
static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
{
	int s, ret;
	int saved_errno;
	unsigned v;
	int port;
	struct sockaddr_in sin;

	if (argc != 1) {
		printf("Use: ctdb chktcpport <port>\n");
		return EINVAL;
	}

	port = atoi(argv[0]);

	s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
	if (s == -1) {
		/* printf() may overwrite errno, so capture it first */
		saved_errno = errno;
		printf("Failed to open local socket\n");
		return saved_errno;
	}

	v = fcntl(s, F_GETFL, 0);
	fcntl(s, F_SETFL, v | O_NONBLOCK);

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port   = htons(port);
	ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
	/* close() may overwrite errno, so remember why bind() failed
	   before the socket is released */
	saved_errno = errno;
	close(s);
	if (ret == -1) {
		printf("Failed to bind to local socket: %d %s\n", saved_errno, strerror(saved_errno));
		return saved_errno;
	}

	return 0;
}
3948
3949
3950
3951 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3952                              TDB_DATA data, void *private_data)
3953 {
3954         DEBUG(DEBUG_ERR,("Log data received\n"));
3955         if (data.dsize > 0) {
3956                 printf("%s", data.dptr);
3957         }
3958
3959         exit(0);
3960 }
3961
3962 /*
3963   display a list of log messages from the in memory ringbuffer
3964  */
3965 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3966 {
3967         int ret;
3968         int32_t res;
3969         struct ctdb_get_log_addr log_addr;
3970         TDB_DATA data;
3971         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3972         char *errmsg;
3973         struct timeval tv;
3974
3975         if (argc != 1) {
3976                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3977                 talloc_free(tmp_ctx);
3978                 return -1;
3979         }
3980
3981         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3982         log_addr.srvid = getpid();
3983         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3984                 log_addr.level = get_debug_by_desc(argv[0]);
3985         } else {
3986                 log_addr.level = strtol(argv[0], NULL, 0);
3987         }
3988
3989
3990         data.dptr = (unsigned char *)&log_addr;
3991         data.dsize = sizeof(log_addr);
3992
3993         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3994
3995         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3996         sleep(1);
3997
3998         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3999
4000         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
4001                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4002         if (ret != 0 || res != 0) {
4003                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
4004                 talloc_free(tmp_ctx);
4005                 return -1;
4006         }
4007
4008
4009         tv = timeval_current();
4010         /* this loop will terminate when we have received the reply */
4011         while (timeval_elapsed(&tv) < 3.0) {    
4012                 event_loop_once(ctdb->ev);
4013         }
4014
4015         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
4016
4017         talloc_free(tmp_ctx);
4018         return 0;
4019 }
4020
4021 /*
4022   clear the in memory log area
4023  */
4024 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
4025 {
4026         int ret;
4027         int32_t res;
4028         char *errmsg;
4029         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4030
4031         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
4032                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4033         if (ret != 0 || res != 0) {
4034                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
4035                 talloc_free(tmp_ctx);
4036                 return -1;
4037         }
4038
4039         talloc_free(tmp_ctx);
4040         return 0;
4041 }
4042
4043
4044 static uint32_t reloadips_finished;
4045
4046 static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
4047                              TDB_DATA data, void *private_data)
4048 {
4049         reloadips_finished = 1;
4050 }
4051
4052 static int reloadips_all(struct ctdb_context *ctdb)
4053 {
4054         struct reloadips_all_reply rips;
4055         struct ctdb_node_map *nodemap=NULL;
4056         TDB_DATA data;
4057         uint32_t recmaster;
4058         int ret, i;
4059
4060         /* check that there are valid nodes available */
4061         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
4062                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
4063                 return 1;
4064         }
4065         for (i=0; i<nodemap->num;i++) {
4066                 if (nodemap->nodes[i].flags != 0) {
4067                         DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
4068                         return 1;
4069                 }
4070         }
4071
4072
4073         rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
4074         if (rips.pnn == -1) {
4075                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
4076                 return 1;
4077         }
4078         rips.srvid = getpid();
4079
4080
4081         /* register a message port for receiveing the reply so that we
4082            can receive the reply
4083         */
4084         ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
4085
4086         if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
4087                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
4088                 return -1;
4089         }
4090
4091
4092         data.dptr = (uint8_t *)&rips;
4093         data.dsize = sizeof(rips);
4094
4095         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
4096         if (ret != 0) {
4097                 DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
4098                 return 1;
4099         }
4100
4101         reloadips_finished = 0;
4102         while (reloadips_finished == 0) {
4103                 event_loop_once(ctdb->ev);
4104         }
4105
4106         return 0;
4107 }
4108
4109 /*
4110   reload public ips on a specific node
4111  */
4112 static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
4113 {
4114         int ret;
4115         int32_t res;
4116         char *errmsg;
4117         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4118
4119         if (options.pnn == CTDB_BROADCAST_ALL) {
4120                 return reloadips_all(ctdb);
4121         }
4122
4123         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
4124                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
4125         if (ret != 0 || res != 0) {
4126                 DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
4127                 talloc_free(tmp_ctx);
4128                 return -1;
4129         }
4130
4131         talloc_free(tmp_ctx);
4132         return 0;
4133 }
4134
4135 /*
4136   display a list of the databases on a remote ctdb
4137  */
4138 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
4139 {
4140         int i, ret;
4141         struct ctdb_dbid_map *dbmap=NULL;
4142
4143         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4144         if (ret != 0) {
4145                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4146                 return ret;
4147         }
4148
4149         if(options.machinereadable){
4150                 printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
4151                 for(i=0;i<dbmap->num;i++){
4152                         const char *path;
4153                         const char *name;
4154                         const char *health;
4155                         bool persistent;
4156                         bool readonly;
4157                         bool sticky;
4158
4159                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
4160                                             dbmap->dbs[i].dbid, ctdb, &path);
4161                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4162                                             dbmap->dbs[i].dbid, ctdb, &name);
4163                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4164                                               dbmap->dbs[i].dbid, ctdb, &health);
4165                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4166                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4167                         sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4168                         printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
4169                                dbmap->dbs[i].dbid, name, path,
4170                                !!(persistent), !!(sticky),
4171                                !!(health), !!(readonly));
4172                 }
4173                 return 0;
4174         }
4175
4176         printf("Number of databases:%d\n", dbmap->num);
4177         for(i=0;i<dbmap->num;i++){
4178                 const char *path;
4179                 const char *name;
4180                 const char *health;
4181                 bool persistent;
4182                 bool readonly;
4183                 bool sticky;
4184
4185                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4186                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4187                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4188                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4189                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4190                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4191                 printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
4192                        dbmap->dbs[i].dbid, name, path,
4193                        persistent?" PERSISTENT":"",
4194                        sticky?" STICKY":"",
4195                        readonly?" READONLY":"",
4196                        health?" UNHEALTHY":"");
4197         }
4198
4199         return 0;
4200 }
4201
4202 /*
4203   display the status of a database on a remote ctdb
4204  */
4205 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
4206 {
4207         int i, ret;
4208         struct ctdb_dbid_map *dbmap=NULL;
4209         const char *db_name;
4210
4211         if (argc < 1) {
4212                 usage();
4213         }
4214
4215         db_name = argv[0];
4216
4217         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
4218         if (ret != 0) {
4219                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4220                 return ret;
4221         }
4222
4223         for(i=0;i<dbmap->num;i++){
4224                 const char *path;
4225                 const char *name;
4226                 const char *health;
4227                 bool persistent;
4228                 bool readonly;
4229                 bool sticky;
4230
4231                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
4232                 if (strcmp(name, db_name) != 0) {
4233                         continue;
4234                 }
4235
4236                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
4237                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
4238                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4239                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
4240                 sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
4241                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
4242                        dbmap->dbs[i].dbid, name, path,
4243                        persistent?"yes":"no",
4244                        sticky?"yes":"no",
4245                        readonly?"yes":"no",
4246                        health?health:"OK");
4247                 return 0;
4248         }
4249
4250         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
4251         return 0;
4252 }
4253
4254 /*
4255   check if the local node is recmaster or not
4256   it will return 1 if this node is the recmaster and 0 if it is not
4257   or if the local ctdb daemon could not be contacted
4258  */
4259 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
4260 {
4261         uint32_t mypnn, recmaster;
4262
4263         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
4264         if (mypnn == -1) {
4265                 printf("Failed to get pnn of node\n");
4266                 return 1;
4267         }
4268
4269         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
4270                 printf("Failed to get the recmaster\n");
4271                 return 1;
4272         }
4273
4274         if (recmaster != mypnn) {
4275                 printf("this node is not the recmaster\n");
4276                 return 1;
4277         }
4278
4279         printf("this node is the recmaster\n");
4280         return 0;
4281 }
4282
4283 /*
4284   ping a node
4285  */
4286 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
4287 {
4288         int ret;
4289         struct timeval tv = timeval_current();
4290         ret = ctdb_ctrl_ping(ctdb, options.pnn);
4291         if (ret == -1) {
4292                 printf("Unable to get ping response from node %u\n", options.pnn);
4293                 return -1;
4294         } else {
4295                 printf("response from %u time=%.6f sec  (%d clients)\n", 
4296                        options.pnn, timeval_elapsed(&tv), ret);
4297         }
4298         return 0;
4299 }
4300
4301
4302 /*
4303   get a tunable
4304  */
4305 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
4306 {
4307         const char *name;
4308         uint32_t value;
4309         int ret;
4310
4311         if (argc < 1) {
4312                 usage();
4313         }
4314
4315         name = argv[0];
4316         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4317         if (ret == -1) {
4318                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4319                 return -1;
4320         }
4321
4322         printf("%-23s = %u\n", name, value);
4323         return 0;
4324 }
4325
4326 /*
4327   set a tunable
4328  */
4329 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4330 {
4331         const char *name;
4332         uint32_t value;
4333         int ret;
4334
4335         if (argc < 2) {
4336                 usage();
4337         }
4338
4339         name = argv[0];
4340         value = strtoul(argv[1], NULL, 0);
4341
4342         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4343         if (ret == -1) {
4344                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4345                 return -1;
4346         }
4347         return 0;
4348 }
4349
4350 /*
4351   list all tunables
4352  */
4353 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4354 {
4355         uint32_t count;
4356         const char **list;
4357         int ret, i;
4358
4359         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4360         if (ret == -1) {
4361                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4362                 return -1;
4363         }
4364
4365         for (i=0;i<count;i++) {
4366                 control_getvar(ctdb, 1, &list[i]);
4367         }
4368
4369         talloc_free(list);
4370         
4371         return 0;
4372 }
4373
4374 /*
4375   display debug level on a node
4376  */
4377 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4378 {
4379         int ret;
4380         int32_t level;
4381
4382         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4383         if (ret != 0) {
4384                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4385                 return ret;
4386         } else {
4387                 if (options.machinereadable){
4388                         printf(":Name:Level:\n");
4389                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4390                 } else {
4391                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4392                 }
4393         }
4394         return 0;
4395 }
4396
4397 /*
4398   display reclock file of a node
4399  */
4400 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4401 {
4402         int ret;
4403         const char *reclock;
4404
4405         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4406         if (ret != 0) {
4407                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4408                 return ret;
4409         } else {
4410                 if (options.machinereadable){
4411                         if (reclock != NULL) {
4412                                 printf("%s", reclock);
4413                         }
4414                 } else {
4415                         if (reclock == NULL) {
4416                                 printf("No reclock file used.\n");
4417                         } else {
4418                                 printf("Reclock file:%s\n", reclock);
4419                         }
4420                 }
4421         }
4422         return 0;
4423 }
4424
4425 /*
4426   set the reclock file of a node
4427  */
4428 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4429 {
4430         int ret;
4431         const char *reclock;
4432
4433         if (argc == 0) {
4434                 reclock = NULL;
4435         } else if (argc == 1) {
4436                 reclock = argv[0];
4437         } else {
4438                 usage();
4439         }
4440
4441         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4442         if (ret != 0) {
4443                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4444                 return ret;
4445         }
4446         return 0;
4447 }
4448
4449 /*
4450   set the natgw state on/off
4451  */
4452 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4453 {
4454         int ret;
4455         uint32_t natgwstate;
4456
4457         if (argc == 0) {
4458                 usage();
4459         }
4460
4461         if (!strcmp(argv[0], "on")) {
4462                 natgwstate = 1;
4463         } else if (!strcmp(argv[0], "off")) {
4464                 natgwstate = 0;
4465         } else {
4466                 usage();
4467         }
4468
4469         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4470         if (ret != 0) {
4471                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4472                 return ret;
4473         }
4474
4475         return 0;
4476 }
4477
4478 /*
4479   set the lmaster role on/off
4480  */
4481 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4482 {
4483         int ret;
4484         uint32_t lmasterrole;
4485
4486         if (argc == 0) {
4487                 usage();
4488         }
4489
4490         if (!strcmp(argv[0], "on")) {
4491                 lmasterrole = 1;
4492         } else if (!strcmp(argv[0], "off")) {
4493                 lmasterrole = 0;
4494         } else {
4495                 usage();
4496         }
4497
4498         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4499         if (ret != 0) {
4500                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4501                 return ret;
4502         }
4503
4504         return 0;
4505 }
4506
4507 /*
4508   set the recmaster role on/off
4509  */
4510 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4511 {
4512         int ret;
4513         uint32_t recmasterrole;
4514
4515         if (argc == 0) {
4516                 usage();
4517         }
4518
4519         if (!strcmp(argv[0], "on")) {
4520                 recmasterrole = 1;
4521         } else if (!strcmp(argv[0], "off")) {
4522                 recmasterrole = 0;
4523         } else {
4524                 usage();
4525         }
4526
4527         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4528         if (ret != 0) {
4529                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4530                 return ret;
4531         }
4532
4533         return 0;
4534 }
4535
4536 /*
4537   set debug level on a node or all nodes
4538  */
4539 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4540 {
4541         int i, ret;
4542         int32_t level;
4543
4544         if (argc == 0) {
4545                 printf("You must specify the debug level. Valid levels are:\n");
4546                 for (i=0; debug_levels[i].description != NULL; i++) {
4547                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4548                 }
4549
4550                 return 0;
4551         }
4552
4553         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4554                 level = get_debug_by_desc(argv[0]);
4555         } else {
4556                 level = strtol(argv[0], NULL, 0);
4557         }
4558
4559         for (i=0; debug_levels[i].description != NULL; i++) {
4560                 if (level == debug_levels[i].level) {
4561                         break;
4562                 }
4563         }
4564         if (debug_levels[i].description == NULL) {
4565                 printf("Invalid debug level, must be one of\n");
4566                 for (i=0; debug_levels[i].description != NULL; i++) {
4567                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4568                 }
4569                 return -1;
4570         }
4571
4572         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4573         if (ret != 0) {
4574                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4575         }
4576         return 0;
4577 }
4578
4579
4580 /*
4581   thaw a node
4582  */
4583 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4584 {
4585         int ret;
4586         uint32_t priority;
4587         
4588         if (argc == 1) {
4589                 priority = strtol(argv[0], NULL, 0);
4590         } else {
4591                 priority = 0;
4592         }
4593         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4594
4595         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4596         if (ret != 0) {
4597                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4598         }               
4599         return 0;
4600 }
4601
4602
4603 /*
4604   attach to a database
4605  */
4606 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4607 {
4608         const char *db_name;
4609         struct ctdb_db_context *ctdb_db;
4610         bool persistent = false;
4611
4612         if (argc < 1) {
4613                 usage();
4614         }
4615         db_name = argv[0];
4616         if (argc > 2) {
4617                 usage();
4618         }
4619         if (argc == 2) {
4620                 if (strcmp(argv[1], "persistent") != 0) {
4621                         usage();
4622                 }
4623                 persistent = true;
4624         }
4625
4626         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4627         if (ctdb_db == NULL) {
4628                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4629                 return -1;
4630         }
4631
4632         return 0;
4633 }
4634
4635 /*
4636   set db priority
4637  */
4638 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4639 {
4640         struct ctdb_db_priority db_prio;
4641         int ret;
4642
4643         if (argc < 2) {
4644                 usage();
4645         }
4646
4647         db_prio.db_id    = strtoul(argv[0], NULL, 0);
4648         db_prio.priority = strtoul(argv[1], NULL, 0);
4649
4650         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
4651         if (ret != 0) {
4652                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
4653                 return -1;
4654         }
4655
4656         return 0;
4657 }
4658
4659 /*
4660   get db priority
4661  */
4662 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4663 {
4664         uint32_t db_id, priority;
4665         int ret;
4666
4667         if (argc < 1) {
4668                 usage();
4669         }
4670
4671         db_id = strtoul(argv[0], NULL, 0);
4672
4673         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
4674         if (ret != 0) {
4675                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
4676                 return -1;
4677         }
4678
4679         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
4680
4681         return 0;
4682 }
4683
4684 /*
4685   set the sticky records capability for a database
4686  */
4687 static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
4688 {
4689         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4690         uint32_t db_id;
4691         struct ctdb_dbid_map *dbmap=NULL;
4692         int i, ret;
4693
4694         if (argc < 1) {
4695                 usage();
4696         }
4697
4698         if (!strncmp(argv[0], "0x", 2)) {
4699                 db_id = strtoul(argv[0] + 2, NULL, 0);
4700         } else {
4701                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4702                 if (ret != 0) {
4703                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4704                         talloc_free(tmp_ctx);
4705                         return ret;
4706                 }
4707                 for(i=0;i<dbmap->num;i++){
4708                         const char *name;
4709
4710                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4711                         if(!strcmp(argv[0], name)){
4712                                 talloc_free(discard_const(name));
4713                                 break;
4714                         }
4715                         talloc_free(discard_const(name));
4716                 }
4717                 if (i == dbmap->num) {
4718                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4719                         talloc_free(tmp_ctx);
4720                         return -1;
4721                 }
4722                 db_id = dbmap->dbs[i].dbid;
4723         }
4724
4725         ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
4726         if (ret != 0) {
4727                 DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
4728                 talloc_free(tmp_ctx);
4729                 return -1;
4730         }
4731
4732         talloc_free(tmp_ctx);
4733         return 0;
4734 }
4735
4736 /*
4737   set the readonly capability for a database
4738  */
4739 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
4740 {
4741         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4742         uint32_t db_id;
4743         struct ctdb_dbid_map *dbmap=NULL;
4744         int i, ret;
4745
4746         if (argc < 1) {
4747                 usage();
4748         }
4749
4750         if (!strncmp(argv[0], "0x", 2)) {
4751                 db_id = strtoul(argv[0] + 2, NULL, 0);
4752         } else {
4753                 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4754                 if (ret != 0) {
4755                         DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4756                         talloc_free(tmp_ctx);
4757                         return ret;
4758                 }
4759                 for(i=0;i<dbmap->num;i++){
4760                         const char *name;
4761
4762                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4763                         if(!strcmp(argv[0], name)){
4764                                 talloc_free(discard_const(name));
4765                                 break;
4766                         }
4767                         talloc_free(discard_const(name));
4768                 }
4769                 if (i == dbmap->num) {
4770                         DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4771                         talloc_free(tmp_ctx);
4772                         return -1;
4773                 }
4774                 db_id = dbmap->dbs[i].dbid;
4775         }
4776
4777         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
4778         if (ret != 0) {
4779                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
4780                 talloc_free(tmp_ctx);
4781                 return -1;
4782         }
4783
4784         talloc_free(tmp_ctx);
4785         return 0;
4786 }
4787
4788 /*
4789   get db seqnum
4790  */
4791 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4792 {
4793         bool ret;
4794         uint32_t db_id;
4795         uint64_t seqnum;
4796
4797         if (argc < 1) {
4798                 usage();
4799         }
4800
4801         db_id = strtoul(argv[0], NULL, 0);
4802
4803         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
4804         if (!ret) {
4805                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4806                 return -1;
4807         }
4808
4809         printf("Sequence number:%lld\n", (long long)seqnum);
4810
4811         return 0;
4812 }
4813
4814 /*
4815   run an eventscript on a node
4816  */
4817 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
4818 {
4819         TDB_DATA data;
4820         int ret;
4821         int32_t res;
4822         char *errmsg;
4823         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4824
4825         if (argc != 1) {
4826                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4827                 return -1;
4828         }
4829
4830         data.dptr = (unsigned char *)discard_const(argv[0]);
4831         data.dsize = strlen((char *)data.dptr) + 1;
4832
4833         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
4834
4835         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
4836                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4837         if (ret != 0 || res != 0) {
4838                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
4839                 talloc_free(tmp_ctx);
4840                 return -1;
4841         }
4842         talloc_free(tmp_ctx);
4843         return 0;
4844 }
4845
/* version stamped into backup files and required when restoring */
#define DB_VERSION 1
/* size of the name field in the backup header, including the NUL */
#define MAX_DB_NAME 64

/*
 * Header at the start of a database backup file. The struct is written
 * and read back as raw bytes, so field order and types must not change.
 *
 * name is deliberately not const: the header is filled in with
 * strncpy() when backing up and with read() when restoring, and
 * modifying a const-qualified member is undefined behaviour.
 */
struct db_file_header {
	unsigned long version;		/* DB_VERSION of the writer */
	time_t timestamp;		/* time the backup was taken */
	unsigned long persistent;	/* non-zero for a persistent database */
	unsigned long size;		/* byte length of the record data after the header */
	char name[MAX_DB_NAME];		/* NUL terminated database name */
};
4855
/* state shared between control_backupdb() and its traverse callback */
struct backup_data {
	struct ctdb_marshall_buffer *records;	/* marshalled records, grown per record */
	uint32_t len;				/* bytes of records used so far */
	uint32_t total;				/* number of records collected */
	bool traverse_error;			/* set when the callback fails */
};
4862
4863 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
4864 {
4865         struct backup_data *bd = talloc_get_type(private, struct backup_data);
4866         struct ctdb_rec_data *rec;
4867
4868         /* add the record */
4869         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
4870         if (rec == NULL) {
4871                 bd->traverse_error = true;
4872                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
4873                 return -1;
4874         }
4875         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
4876         if (bd->records == NULL) {
4877                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
4878                 bd->traverse_error = true;
4879                 return -1;
4880         }
4881         bd->records->count++;
4882         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
4883         bd->len += rec->length;
4884         talloc_free(rec);
4885
4886         bd->total++;
4887         return 0;
4888 }
4889
4890 /*
4891  * backup a database to a file 
4892  */
4893 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
4894 {
4895         int i, ret;
4896         struct ctdb_dbid_map *dbmap=NULL;
4897         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4898         struct db_file_header dbhdr;
4899         struct ctdb_db_context *ctdb_db;
4900         struct backup_data *bd;
4901         int fh = -1;
4902         int status = -1;
4903         const char *reason = NULL;
4904
4905         if (argc != 2) {
4906                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4907                 return -1;
4908         }
4909
4910         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4911         if (ret != 0) {
4912                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4913                 return ret;
4914         }
4915
4916         for(i=0;i<dbmap->num;i++){
4917                 const char *name;
4918
4919                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4920                 if(!strcmp(argv[0], name)){
4921                         talloc_free(discard_const(name));
4922                         break;
4923                 }
4924                 talloc_free(discard_const(name));
4925         }
4926         if (i == dbmap->num) {
4927                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4928                 talloc_free(tmp_ctx);
4929                 return -1;
4930         }
4931
4932         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4933                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
4934         if (ret != 0) {
4935                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
4936                                  argv[0]));
4937                 talloc_free(tmp_ctx);
4938                 return -1;
4939         }
4940         if (reason) {
4941                 uint32_t allow_unhealthy = 0;
4942
4943                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
4944                                       "AllowUnhealthyDBRead",
4945                                       &allow_unhealthy);
4946
4947                 if (allow_unhealthy != 1) {
4948                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
4949                                          argv[0], reason));
4950
4951                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
4952                                          allow_unhealthy));
4953                         talloc_free(tmp_ctx);
4954                         return -1;
4955                 }
4956
4957                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
4958                                      argv[0], argv[0]));
4959                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
4960                                      "tunnable AllowUnhealthyDBRead = %u\n",
4961                                      allow_unhealthy));
4962         }
4963
4964         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
4965         if (ctdb_db == NULL) {
4966                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
4967                 talloc_free(tmp_ctx);
4968                 return -1;
4969         }
4970
4971
4972         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
4973         if (ret == -1) {
4974                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
4975                 talloc_free(tmp_ctx);
4976                 return -1;
4977         }
4978
4979
4980         bd = talloc_zero(tmp_ctx, struct backup_data);
4981         if (bd == NULL) {
4982                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
4983                 talloc_free(tmp_ctx);
4984                 return -1;
4985         }
4986
4987         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
4988         if (bd->records == NULL) {
4989                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
4990                 talloc_free(tmp_ctx);
4991                 return -1;
4992         }
4993
4994         bd->len = offsetof(struct ctdb_marshall_buffer, data);
4995         bd->records->db_id = ctdb_db->db_id;
4996         /* traverse the database collecting all records */
4997         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
4998             bd->traverse_error) {
4999                 DEBUG(DEBUG_ERR,("Traverse error\n"));
5000                 talloc_free(tmp_ctx);
5001                 return -1;              
5002         }
5003
5004         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
5005
5006
5007         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
5008         if (fh == -1) {
5009                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
5010                 talloc_free(tmp_ctx);
5011                 return -1;
5012         }
5013
5014         dbhdr.version = DB_VERSION;
5015         dbhdr.timestamp = time(NULL);
5016         dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
5017         dbhdr.size = bd->len;
5018         if (strlen(argv[0]) >= MAX_DB_NAME) {
5019                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
5020                 goto done;
5021         }
5022         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
5023         ret = write(fh, &dbhdr, sizeof(dbhdr));
5024         if (ret == -1) {
5025                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5026                 goto done;
5027         }
5028         ret = write(fh, bd->records, bd->len);
5029         if (ret == -1) {
5030                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
5031                 goto done;
5032         }
5033
5034         status = 0;
5035 done:
5036         if (fh != -1) {
5037                 ret = close(fh);
5038                 if (ret == -1) {
5039                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
5040                 }
5041         }
5042
5043         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
5044
5045         talloc_free(tmp_ctx);
5046         return status;
5047 }
5048
5049 /*
5050  * restore a database from a file 
5051  */
5052 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
5053 {
5054         int ret;
5055         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5056         TDB_DATA outdata;
5057         TDB_DATA data;
5058         struct db_file_header dbhdr;
5059         struct ctdb_db_context *ctdb_db;
5060         struct ctdb_node_map *nodemap=NULL;
5061         struct ctdb_vnn_map *vnnmap=NULL;
5062         int i, fh;
5063         struct ctdb_control_wipe_database w;
5064         uint32_t *nodes;
5065         uint32_t generation;
5066         struct tm *tm;
5067         char tbuf[100];
5068         char *dbname;
5069
5070         if (argc < 1 || argc > 2) {
5071                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5072                 return -1;
5073         }
5074
5075         fh = open(argv[0], O_RDONLY);
5076         if (fh == -1) {
5077                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5078                 talloc_free(tmp_ctx);
5079                 return -1;
5080         }
5081
5082         read(fh, &dbhdr, sizeof(dbhdr));
5083         if (dbhdr.version != DB_VERSION) {
5084                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5085                 talloc_free(tmp_ctx);
5086                 return -1;
5087         }
5088
5089         dbname = discard_const(dbhdr.name);
5090         if (argc == 2) {
5091                 dbname = discard_const(argv[1]);
5092         }
5093
5094         outdata.dsize = dbhdr.size;
5095         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5096         if (outdata.dptr == NULL) {
5097                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5098                 close(fh);
5099                 talloc_free(tmp_ctx);
5100                 return -1;
5101         }               
5102         read(fh, outdata.dptr, outdata.dsize);
5103         close(fh);
5104
5105         tm = localtime(&dbhdr.timestamp);
5106         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5107         printf("Restoring database '%s' from backup @ %s\n",
5108                 dbname, tbuf);
5109
5110
5111         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
5112         if (ctdb_db == NULL) {
5113                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
5114                 talloc_free(tmp_ctx);
5115                 return -1;
5116         }
5117
5118         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
5119         if (ret != 0) {
5120                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
5121                 talloc_free(tmp_ctx);
5122                 return ret;
5123         }
5124
5125
5126         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
5127         if (ret != 0) {
5128                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
5129                 talloc_free(tmp_ctx);
5130                 return ret;
5131         }
5132
5133         /* freeze all nodes */
5134         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5135         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5136                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5137                                         nodes, i,
5138                                         TIMELIMIT(),
5139                                         false, tdb_null,
5140                                         NULL, NULL,
5141                                         NULL) != 0) {
5142                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5143                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5144                         talloc_free(tmp_ctx);
5145                         return -1;
5146                 }
5147         }
5148
5149         generation = vnnmap->generation;
5150         data.dptr = (void *)&generation;
5151         data.dsize = sizeof(generation);
5152
5153         /* start a cluster wide transaction */
5154         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5155         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5156                                         nodes, 0,
5157                                         TIMELIMIT(), false, data,
5158                                         NULL, NULL,
5159                                         NULL) != 0) {
5160                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
5161                 return -1;
5162         }
5163
5164
5165         w.db_id = ctdb_db->db_id;
5166         w.transaction_id = generation;
5167
5168         data.dptr = (void *)&w;
5169         data.dsize = sizeof(w);
5170
5171         /* wipe all the remote databases. */
5172         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5173         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5174                                         nodes, 0,
5175                                         TIMELIMIT(), false, data,
5176                                         NULL, NULL,
5177                                         NULL) != 0) {
5178                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5179                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5180                 talloc_free(tmp_ctx);
5181                 return -1;
5182         }
5183         
5184         /* push the database */
5185         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5186         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
5187                                         nodes, 0,
5188                                         TIMELIMIT(), false, outdata,
5189                                         NULL, NULL,
5190                                         NULL) != 0) {
5191                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
5192                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5193                 talloc_free(tmp_ctx);
5194                 return -1;
5195         }
5196
5197         data.dptr = (void *)&ctdb_db->db_id;
5198         data.dsize = sizeof(ctdb_db->db_id);
5199
5200         /* mark the database as healthy */
5201         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5202         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5203                                         nodes, 0,
5204                                         TIMELIMIT(), false, data,
5205                                         NULL, NULL,
5206                                         NULL) != 0) {
5207                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5208                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5209                 talloc_free(tmp_ctx);
5210                 return -1;
5211         }
5212
5213         data.dptr = (void *)&generation;
5214         data.dsize = sizeof(generation);
5215
5216         /* commit all the changes */
5217         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5218                                         nodes, 0,
5219                                         TIMELIMIT(), false, data,
5220                                         NULL, NULL,
5221                                         NULL) != 0) {
5222                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5223                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5224                 talloc_free(tmp_ctx);
5225                 return -1;
5226         }
5227
5228
5229         /* thaw all nodes */
5230         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5231         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5232                                         nodes, 0,
5233                                         TIMELIMIT(),
5234                                         false, tdb_null,
5235                                         NULL, NULL,
5236                                         NULL) != 0) {
5237                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5238                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5239                 talloc_free(tmp_ctx);
5240                 return -1;
5241         }
5242
5243
5244         talloc_free(tmp_ctx);
5245         return 0;
5246 }
5247
5248 /*
5249  * dump a database backup from a file
5250  */
5251 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
5252 {
5253         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5254         TDB_DATA outdata;
5255         struct db_file_header dbhdr;
5256         int i, fh;
5257         struct tm *tm;
5258         char tbuf[100];
5259         struct ctdb_rec_data *rec = NULL;
5260         struct ctdb_marshall_buffer *m;
5261         struct ctdb_dump_db_context c;
5262
5263         if (argc != 1) {
5264                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5265                 return -1;
5266         }
5267
5268         fh = open(argv[0], O_RDONLY);
5269         if (fh == -1) {
5270                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
5271                 talloc_free(tmp_ctx);
5272                 return -1;
5273         }
5274
5275         read(fh, &dbhdr, sizeof(dbhdr));
5276         if (dbhdr.version != DB_VERSION) {
5277                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
5278                 talloc_free(tmp_ctx);
5279                 return -1;
5280         }
5281
5282         outdata.dsize = dbhdr.size;
5283         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
5284         if (outdata.dptr == NULL) {
5285                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
5286                 close(fh);
5287                 talloc_free(tmp_ctx);
5288                 return -1;
5289         }
5290         read(fh, outdata.dptr, outdata.dsize);
5291         close(fh);
5292         m = (struct ctdb_marshall_buffer *)outdata.dptr;
5293
5294         tm = localtime(&dbhdr.timestamp);
5295         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
5296         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
5297                 dbhdr.name, m->db_id, tbuf);
5298
5299         ZERO_STRUCT(c);
5300         c.f = stdout;
5301         c.printemptyrecords = (bool)options.printemptyrecords;
5302         c.printdatasize = (bool)options.printdatasize;
5303         c.printlmaster = false;
5304         c.printhash = (bool)options.printhash;
5305         c.printrecordflags = (bool)options.printrecordflags;
5306
5307         for (i=0; i < m->count; i++) {
5308                 uint32_t reqid = 0;
5309                 TDB_DATA key, data;
5310
5311                 /* we do not want the header splitted, so we pass NULL*/
5312                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
5313                                               NULL, &key, &data);
5314
5315                 ctdb_dumpdb_record(ctdb, key, data, &c);
5316         }
5317
5318         printf("Dumped %d records\n", i);
5319         talloc_free(tmp_ctx);
5320         return 0;
5321 }
5322
5323 /*
5324  * wipe a database from a file
5325  */
5326 static int control_wipedb(struct ctdb_context *ctdb, int argc,
5327                           const char **argv)
5328 {
5329         int ret;
5330         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5331         TDB_DATA data;
5332         struct ctdb_db_context *ctdb_db;
5333         struct ctdb_node_map *nodemap = NULL;
5334         struct ctdb_vnn_map *vnnmap = NULL;
5335         int i;
5336         struct ctdb_control_wipe_database w;
5337         uint32_t *nodes;
5338         uint32_t generation;
5339         struct ctdb_dbid_map *dbmap = NULL;
5340
5341         if (argc != 1) {
5342                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
5343                 return -1;
5344         }
5345
5346         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5347                                  &dbmap);
5348         if (ret != 0) {
5349                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
5350                                   options.pnn));
5351                 return ret;
5352         }
5353
5354         for(i=0;i<dbmap->num;i++){
5355                 const char *name;
5356
5357                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
5358                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
5359                 if(!strcmp(argv[0], name)){
5360                         talloc_free(discard_const(name));
5361                         break;
5362                 }
5363                 talloc_free(discard_const(name));
5364         }
5365         if (i == dbmap->num) {
5366                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
5367                                   argv[0]));
5368                 talloc_free(tmp_ctx);
5369                 return -1;
5370         }
5371
5372         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
5373         if (ctdb_db == NULL) {
5374                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
5375                                   argv[0]));
5376                 talloc_free(tmp_ctx);
5377                 return -1;
5378         }
5379
5380         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
5381                                    &nodemap);
5382         if (ret != 0) {
5383                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
5384                                   options.pnn));
5385                 talloc_free(tmp_ctx);
5386                 return ret;
5387         }
5388
5389         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
5390                                   &vnnmap);
5391         if (ret != 0) {
5392                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5393                                   options.pnn));
5394                 talloc_free(tmp_ctx);
5395                 return ret;
5396         }
5397
5398         /* freeze all nodes */
5399         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5400         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5401                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5402                                                 nodes, i,
5403                                                 TIMELIMIT(),
5404                                                 false, tdb_null,
5405                                                 NULL, NULL,
5406                                                 NULL);
5407                 if (ret != 0) {
5408                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5409                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5410                                              CTDB_RECOVERY_ACTIVE);
5411                         talloc_free(tmp_ctx);
5412                         return -1;
5413                 }
5414         }
5415
5416         generation = vnnmap->generation;
5417         data.dptr = (void *)&generation;
5418         data.dsize = sizeof(generation);
5419
5420         /* start a cluster wide transaction */
5421         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5422         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5423                                         nodes, 0,
5424                                         TIMELIMIT(), false, data,
5425                                         NULL, NULL,
5426                                         NULL);
5427         if (ret!= 0) {
5428                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5429                                   "transactions.\n"));
5430                 return -1;
5431         }
5432
5433         w.db_id = ctdb_db->db_id;
5434         w.transaction_id = generation;
5435
5436         data.dptr = (void *)&w;
5437         data.dsize = sizeof(w);
5438
5439         /* wipe all the remote databases. */
5440         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5441         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5442                                         nodes, 0,
5443                                         TIMELIMIT(), false, data,
5444                                         NULL, NULL,
5445                                         NULL) != 0) {
5446                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5447                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5448                 talloc_free(tmp_ctx);
5449                 return -1;
5450         }
5451
5452         data.dptr = (void *)&ctdb_db->db_id;
5453         data.dsize = sizeof(ctdb_db->db_id);
5454
5455         /* mark the database as healthy */
5456         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5457         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5458                                         nodes, 0,
5459                                         TIMELIMIT(), false, data,
5460                                         NULL, NULL,
5461                                         NULL) != 0) {
5462                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5463                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5464                 talloc_free(tmp_ctx);
5465                 return -1;
5466         }
5467
5468         data.dptr = (void *)&generation;
5469         data.dsize = sizeof(generation);
5470
5471         /* commit all the changes */
5472         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5473                                         nodes, 0,
5474                                         TIMELIMIT(), false, data,
5475                                         NULL, NULL,
5476                                         NULL) != 0) {
5477                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5478                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5479                 talloc_free(tmp_ctx);
5480                 return -1;
5481         }
5482
5483         /* thaw all nodes */
5484         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5485         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5486                                         nodes, 0,
5487                                         TIMELIMIT(),
5488                                         false, tdb_null,
5489                                         NULL, NULL,
5490                                         NULL) != 0) {
5491                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5492                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5493                 talloc_free(tmp_ctx);
5494                 return -1;
5495         }
5496
5497         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5498
5499         talloc_free(tmp_ctx);
5500         return 0;
5501 }
5502
5503 /*
5504   dump memory usage
5505  */
5506 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5507 {
5508         TDB_DATA data;
5509         int ret;
5510         int32_t res;
5511         char *errmsg;
5512         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5513         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5514                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5515         if (ret != 0 || res != 0) {
5516                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5517                 talloc_free(tmp_ctx);
5518                 return -1;
5519         }
5520         write(1, data.dptr, data.dsize);
5521         talloc_free(tmp_ctx);
5522         return 0;
5523 }
5524
5525 /*
5526   handler for memory dumps
5527 */
5528 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5529                              TDB_DATA data, void *private_data)
5530 {
5531         write(1, data.dptr, data.dsize);
5532         exit(0);
5533 }
5534
5535 /*
5536   dump memory usage on the recovery daemon
5537  */
5538 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5539 {
5540         int ret;
5541         TDB_DATA data;
5542         struct rd_memdump_reply rd;
5543
5544         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5545         if (rd.pnn == -1) {
5546                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
5547                 return -1;
5548         }
5549         rd.srvid = getpid();
5550
5551         /* register a message port for receiveing the reply so that we
5552            can receive the reply
5553         */
5554         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5555
5556
5557         data.dptr = (uint8_t *)&rd;
5558         data.dsize = sizeof(rd);
5559
5560         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5561         if (ret != 0) {
5562                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5563                 return -1;
5564         }
5565
5566         /* this loop will terminate when we have received the reply */
5567         while (1) {     
5568                 event_loop_once(ctdb->ev);
5569         }
5570
5571         return 0;
5572 }
5573
5574 /*
5575   send a message to a srvid
5576  */
5577 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5578 {
5579         unsigned long srvid;
5580         int ret;
5581         TDB_DATA data;
5582
5583         if (argc < 2) {
5584                 usage();
5585         }
5586
5587         srvid      = strtoul(argv[0], NULL, 0);
5588
5589         data.dptr = (uint8_t *)discard_const(argv[1]);
5590         data.dsize= strlen(argv[1]);
5591
5592         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5593         if (ret != 0) {
5594                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5595                 return -1;
5596         }
5597
5598         return 0;
5599 }
5600
5601 /*
5602   handler for msglisten
5603 */
5604 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5605                              TDB_DATA data, void *private_data)
5606 {
5607         int i;
5608
5609         printf("Message received: ");
5610         for (i=0;i<data.dsize;i++) {
5611                 printf("%c", data.dptr[i]);
5612         }
5613         printf("\n");
5614 }
5615
5616 /*
5617   listen for messages on a messageport
5618  */
5619 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5620 {
5621         uint64_t srvid;
5622
5623         srvid = getpid();
5624
5625         /* register a message port and listen for messages
5626         */
5627         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5628         printf("Listening for messages on srvid:%d\n", (int)srvid);
5629
5630         while (1) {     
5631                 event_loop_once(ctdb->ev);
5632         }
5633
5634         return 0;
5635 }
5636
5637 /*
5638   list all nodes in the cluster
5639   we parse the nodes file directly
5640  */
5641 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5642 {
5643         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5644         struct pnn_node *pnn_nodes;
5645         struct pnn_node *pnn_node;
5646
5647         pnn_nodes = read_nodes_file(mem_ctx);
5648         if (pnn_nodes == NULL) {
5649                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5650                 talloc_free(mem_ctx);
5651                 return -1;
5652         }
5653
5654         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5655                 ctdb_sock_addr addr;
5656                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5657                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5658                         talloc_free(mem_ctx);
5659                         return -1;
5660                 }
5661                 if (options.machinereadable){
5662                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5663                 } else {
5664                         printf("%s\n", pnn_node->addr);
5665                 }
5666         }
5667         talloc_free(mem_ctx);
5668
5669         return 0;
5670 }
5671
5672 /*
5673   reload the nodes file on the local node
5674  */
5675 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5676 {
5677         int i, ret;
5678         int mypnn;
5679         struct ctdb_node_map *nodemap=NULL;
5680
5681         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5682         if (mypnn == -1) {
5683                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
5684                 return -1;
5685         }
5686
5687         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5688         if (ret != 0) {
5689                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5690                 return ret;
5691         }
5692
5693         /* reload the nodes file on all remote nodes */
5694         for (i=0;i<nodemap->num;i++) {
5695                 if (nodemap->nodes[i].pnn == mypnn) {
5696                         continue;
5697                 }
5698                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5699                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5700                         nodemap->nodes[i].pnn);
5701                 if (ret != 0) {
5702                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
5703                 }
5704         }
5705
5706         /* reload the nodes file on the local node */
5707         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
5708         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
5709         if (ret != 0) {
5710                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
5711         }
5712
5713         /* initiate a recovery */
5714         control_recover(ctdb, argc, argv);
5715
5716         return 0;
5717 }
5718
5719
5720 static const struct {
5721         const char *name;
5722         int (*fn)(struct ctdb_context *, int, const char **);
5723         bool auto_all;
5724         bool without_daemon; /* can be run without daemon running ? */
5725         const char *msg;
5726         const char *args;
5727 } ctdb_commands[] = {
5728 #ifdef CTDB_VERS
5729         { "version",         control_version,           true,   false,  "show version of ctdb" },
5730 #endif
5731         { "status",          control_status,            true,   false,  "show node status" },
5732         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
5733         { "ping",            control_ping,              true,   false,  "ping all nodes" },
5734         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
5735         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
5736         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
5737         { "statistics",      control_statistics,        false,  false, "show statistics" },
5738         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
5739         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
5740         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
5741         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
5742         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
5743         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
5744         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
5745         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
5746         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
5747         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
5748         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
5749         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
5750         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
5751         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
5752         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
5753         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
5754         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
5755         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
5756         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
5757         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
5758         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
5759         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
5760         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
5761         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
5762         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
5763         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
5764         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
5765         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
5766         { "stop",            control_stop,              true,   false,  "stop a node" },
5767         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
5768         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
5769         { "unban",           control_unban,             true,   false,  "unban a node" },
5770         { "showban",         control_showban,           true,   false,  "show ban information"},
5771         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
5772         { "recover",         control_recover,           true,   false,  "force recovery" },
5773         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
5774         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
5775         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
5776         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
5777         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
5778         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
5779         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5780         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
5781         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
5782
5783         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
5784
5785         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
5786         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
5787         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
5788         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
5789         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
5790         { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
5791         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
5792         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
5793         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
5794         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
5795         { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
5796         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
5797         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
5798         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
5799         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
5800         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
5801         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
5802         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
5803         { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
5804         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
5805         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
5806         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
5807         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
5808         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
5809         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
5810         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
5811         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
5812         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
5813         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
5814         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
5815         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
5816         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>|<name>"},
5817         { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbid>|<name>"},
5818         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
5819         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
5820         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
5821         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
5822         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
5823         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
5824         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
5825         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
5826         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
5827         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
5828         { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
5829         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
5830         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
5831         { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
5832         { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
5833         { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
5834 };
5835
5836 /*
5837   show usage message
5838  */
5839 static void usage(void)
5840 {
5841         int i;
5842         printf(
5843 "Usage: ctdb [options] <control>\n" \
5844 "Options:\n" \
5845 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
5846 "   -Y                 generate machinereadable output\n"
5847 "   -v                 generate verbose output\n"
5848 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
5849         printf("Controls:\n");
5850         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5851                 printf("  %-15s %-27s  %s\n", 
5852                        ctdb_commands[i].name, 
5853                        ctdb_commands[i].args?ctdb_commands[i].args:"",
5854                        ctdb_commands[i].msg);
5855         }
5856         exit(1);
5857 }
5858
5859
5860 static void ctdb_alarm(int sig)
5861 {
5862         printf("Maximum runtime exceeded - exiting\n");
5863         _exit(ERR_TIMEOUT);
5864 }
5865
5866 /*
5867   main program
5868 */
5869 int main(int argc, const char *argv[])
5870 {
5871         struct ctdb_context *ctdb;
5872         char *nodestring = NULL;
5873         struct poptOption popt_options[] = {
5874                 POPT_AUTOHELP
5875                 POPT_CTDB_CMDLINE
5876                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
5877                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
5878                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
5879                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
5880                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
5881                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
5882                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
5883                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
5884                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
5885                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
5886                 POPT_TABLEEND
5887         };
5888         int opt;
5889         const char **extra_argv;
5890         int extra_argc = 0;
5891         int ret=-1, i;
5892         poptContext pc;
5893         struct event_context *ev;
5894         const char *control;
5895         const char *socket_name;
5896
5897         setlinebuf(stdout);
5898         
5899         /* set some defaults */
5900         options.maxruntime = 0;
5901         options.timelimit = 3;
5902         options.pnn = CTDB_CURRENT_NODE;
5903
5904         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
5905
5906         while ((opt = poptGetNextOpt(pc)) != -1) {
5907                 switch (opt) {
5908                 default:
5909                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
5910                                 poptBadOption(pc, 0), poptStrerror(opt)));
5911                         exit(1);
5912                 }
5913         }
5914
5915         /* setup the remaining options for the main program to use */
5916         extra_argv = poptGetArgs(pc);
5917         if (extra_argv) {
5918                 extra_argv++;
5919                 while (extra_argv[extra_argc]) extra_argc++;
5920         }
5921
5922         if (extra_argc < 1) {
5923                 usage();
5924         }
5925
5926         if (options.maxruntime == 0) {
5927                 const char *ctdb_timeout;
5928                 ctdb_timeout = getenv("CTDB_TIMEOUT");
5929                 if (ctdb_timeout != NULL) {
5930                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
5931                 } else {
5932                         /* default timeout is 120 seconds */
5933                         options.maxruntime = 120;
5934                 }
5935         }
5936
5937         signal(SIGALRM, ctdb_alarm);
5938         alarm(options.maxruntime);
5939
5940         control = extra_argv[0];
5941
5942         ev = event_context_init(NULL);
5943         if (!ev) {
5944                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
5945                 exit(1);
5946         }
5947
5948         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5949                 if (strcmp(control, ctdb_commands[i].name) == 0) {
5950                         break;
5951                 }
5952         }
5953
5954         if (i == ARRAY_SIZE(ctdb_commands)) {
5955                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
5956                 exit(1);
5957         }
5958
5959         if (ctdb_commands[i].without_daemon == true) {
5960                 if (nodestring != NULL) {
5961                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
5962                         exit(1);
5963                 }
5964                 close(2);
5965                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
5966         }
5967
5968         /* initialise ctdb */
5969         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
5970
5971         if (ctdb == NULL) {
5972                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
5973                 exit(1);
5974         }
5975
5976         /* initialize a libctdb connection as well */
5977         socket_name = ctdb_get_socketname(ctdb);
5978         ctdb_connection = ctdb_connect(socket_name,
5979                                        ctdb_log_file, stderr);
5980         if (ctdb_connection == NULL) {
5981                 DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
5982                 exit(1);
5983         }                               
5984
5985         /* setup the node number(s) to contact */
5986         if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
5987                               &options.nodes, &options.pnn)) {
5988                 usage();
5989         }
5990
5991         if (options.pnn == CTDB_CURRENT_NODE) {
5992                 options.pnn = options.nodes[0];
5993         }
5994
5995         if (ctdb_commands[i].auto_all && 
5996             ((options.pnn == CTDB_BROADCAST_ALL) ||
5997              (options.pnn == CTDB_MULTICAST))) {
5998                 int j;
5999
6000                 ret = 0;
6001                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
6002                         options.pnn = options.nodes[j];
6003                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6004                 }
6005         } else {
6006                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
6007         }
6008
6009         ctdb_disconnect(ctdb_connection);
6010         talloc_free(ctdb);
6011         (void)poptFreeContext(pc);
6012
6013         return ret;
6014
6015 }