STATISTICS: add total counts for number of delegations and number of revokes
[obnox/ctdb.git] / tools / ctdb.c
1 /* 
2    ctdb control tool
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "lib/tevent/tevent.h"
23 #include "system/time.h"
24 #include "system/filesys.h"
25 #include "system/network.h"
26 #include "system/locale.h"
27 #include "popt.h"
28 #include "cmdline.h"
29 #include "../include/ctdb.h"
30 #include "../include/ctdb_client.h"
31 #include "../include/ctdb_private.h"
32 #include "../common/rb_tree.h"
33 #include "db_wrap.h"
34
35 #define ERR_TIMEOUT     20      /* timed out trying to reach node */
36 #define ERR_NONODE      21      /* node does not exist */
37 #define ERR_DISNODE     22      /* node is disconnected */
38
39 struct ctdb_connection *ctdb_connection;
40
41 static void usage(void);
42
43 static struct {
44         int timelimit;
45         uint32_t pnn;
46         uint32_t *nodes;
47         int machinereadable;
48         int verbose;
49         int maxruntime;
50         int printemptyrecords;
51         int printdatasize;
52         int printlmaster;
53         int printhash;
54         int printrecordflags;
55 } options;
56
57 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
58 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
59
60 #ifdef CTDB_VERS
61 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
62 {
63 #define STR(x) #x
64 #define XSTR(x) STR(x)
65         printf("CTDB version: %s\n", XSTR(CTDB_VERS));
66         return 0;
67 }
68 #endif
69
70 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                            \
71                 DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
72                                    "Out of memory in " __location__ )); \
73                 abort();                                                \
74         }} while (0)
75
76 /* Pretty print the flags to a static buffer in human-readable format.
77  * This never returns NULL!
78  */
79 static const char *pretty_print_flags(uint32_t flags)
80 {
81         int j;
82         static const struct {
83                 uint32_t flag;
84                 const char *name;
85         } flag_names[] = {
86                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
87                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
88                 { NODE_FLAGS_BANNED,                "BANNED" },
89                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
90                 { NODE_FLAGS_DELETED,               "DELETED" },
91                 { NODE_FLAGS_STOPPED,               "STOPPED" },
92                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
93         };
94         static char flags_str[512]; /* Big enough to contain all flag names */
95
96         flags_str[0] = '\0';
97         for (j=0;j<ARRAY_SIZE(flag_names);j++) {
98                 if (flags & flag_names[j].flag) {
99                         if (flags_str[0] == '\0') {
100                                 (void) strcpy(flags_str, flag_names[j].name);
101                         } else {
102                                 (void) strcat(flags_str, "|");
103                                 (void) strcat(flags_str, flag_names[j].name);
104                         }
105                 }
106         }
107         if (flags_str[0] == '\0') {
108                 (void) strcpy(flags_str, "OK");
109         }
110
111         return flags_str;
112 }
113
114 static int h2i(char h)
115 {
116         if (h >= 'a' && h <= 'f') return h - 'a' + 10;
117         if (h >= 'A' && h <= 'F') return h - 'f' + 10;
118         return h - '0';
119 }
120
121 static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
122 {
123         int i, len;
124         TDB_DATA key = {NULL, 0};
125
126         len = strlen(str);
127         if (len & 0x01) {
128                 DEBUG(DEBUG_ERR,("Key specified with odd number of hexadecimal digits\n"));
129                 return key;
130         }
131
132         key.dsize = len>>1;
133         key.dptr  = talloc_size(mem_ctx, key.dsize);
134
135         for (i=0; i < len/2; i++) {
136                 key.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
137         }
138         return key;
139 }
140
141 /* Parse a nodestring.  Parameter dd_ok controls what happens to nodes
142  * that are disconnected or deleted.  If dd_ok is true those nodes are
143  * included in the output list of nodes.  If dd_ok is false, those
144  * nodes are filtered from the "all" case and cause an error if
145  * explicitly specified.
146  */
147 static bool parse_nodestring(struct ctdb_context *ctdb,
148                              const char * nodestring,
149                              uint32_t current_pnn,
150                              bool dd_ok,
151                              uint32_t **nodes,
152                              uint32_t *pnn_mode)
153 {
154         int n;
155         uint32_t i;
156         struct ctdb_node_map *nodemap;
157        
158         *nodes = NULL;
159
160         if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
161                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
162                 exit(10);
163         }
164
165         if (nodestring != NULL) {
166                 *nodes = talloc_array(ctdb, uint32_t, 0);
167                 CTDB_NOMEM_ABORT(*nodes);
168                
169                 n = 0;
170
171                 if (strcmp(nodestring, "all") == 0) {
172                         *pnn_mode = CTDB_BROADCAST_ALL;
173
174                         /* all */
175                         for (i = 0; i < nodemap->num; i++) {
176                                 if ((nodemap->nodes[i].flags & 
177                                      (NODE_FLAGS_DISCONNECTED |
178                                       NODE_FLAGS_DELETED)) && !dd_ok) {
179                                         continue;
180                                 }
181                                 *nodes = talloc_realloc(ctdb, *nodes,
182                                                         uint32_t, n+1);
183                                 CTDB_NOMEM_ABORT(*nodes);
184                                 (*nodes)[n] = i;
185                                 n++;
186                         }
187                 } else {
188                         /* x{,y...} */
189                         char *ns, *tok;
190                        
191                         ns = talloc_strdup(ctdb, nodestring);
192                         tok = strtok(ns, ",");
193                         while (tok != NULL) {
194                                 uint32_t pnn;
195                                 i = (uint32_t)strtoul(tok, NULL, 0);
196                                 if (i >= nodemap->num) {
197                                         DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
198                                         exit(ERR_NONODE);
199                                 }
200                                 if ((nodemap->nodes[i].flags & 
201                                      (NODE_FLAGS_DISCONNECTED |
202                                       NODE_FLAGS_DELETED)) && !dd_ok) {
203                                         DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
204                                         exit(ERR_DISNODE);
205                                 }
206                                 if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
207                                         DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
208                                         exit(10);
209                                 }
210
211                                 *nodes = talloc_realloc(ctdb, *nodes,
212                                                         uint32_t, n+1);
213                                 CTDB_NOMEM_ABORT(*nodes);
214
215                                 (*nodes)[n] = i;
216                                 n++;
217
218                                 tok = strtok(NULL, ",");
219                         }
220                         talloc_free(ns);
221
222                         if (n == 1) {
223                                 *pnn_mode = (*nodes)[0];
224                         } else {
225                                 *pnn_mode = CTDB_MULTICAST;
226                         }
227                 }
228         } else {
229                 /* default - no nodes specified */
230                 *nodes = talloc_array(ctdb, uint32_t, 1);
231                 CTDB_NOMEM_ABORT(*nodes);
232                 *pnn_mode = CTDB_CURRENT_NODE;
233
234                 if (!ctdb_getpnn(ctdb_connection, current_pnn,
235                                  &((*nodes)[0]))) {
236                         return false;
237                 }
238         }
239
240         ctdb_free_nodemap(nodemap);
241
242         return true;
243 }
244
245 /*
246  check if a database exists
247 */
248 static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
249 {
250         int i, ret;
251         struct ctdb_dbid_map *dbmap=NULL;
252
253         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
254         if (ret != 0) {
255                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
256                 return -1;
257         }
258
259         for(i=0;i<dbmap->num;i++){
260                 const char *name;
261
262                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
263                 if (!strcmp(name, db_name)) {
264                         if (persistent) {
265                                 *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
266                         }
267                         return 0;
268                 }
269         }
270
271         return -1;
272 }
273
274 /*
275   see if a process exists
276  */
277 static int control_process_exists(struct ctdb_context *ctdb, int argc, const char **argv)
278 {
279         uint32_t pnn, pid;
280         int ret;
281         if (argc < 1) {
282                 usage();
283         }
284
285         if (sscanf(argv[0], "%u:%u", &pnn, &pid) != 2) {
286                 DEBUG(DEBUG_ERR, ("Badly formed pnn:pid\n"));
287                 return -1;
288         }
289
290         ret = ctdb_ctrl_process_exists(ctdb, pnn, pid);
291         if (ret == 0) {
292                 printf("%u:%u exists\n", pnn, pid);
293         } else {
294                 printf("%u:%u does not exist\n", pnn, pid);
295         }
296         return ret;
297 }
298
299 /*
300   display statistics structure
301  */
302 static void show_statistics(struct ctdb_statistics *s, int show_header)
303 {
304         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
305         int i;
306         const char *prefix=NULL;
307         int preflen=0;
308         int tmp, days, hours, minutes, seconds;
309         const struct {
310                 const char *name;
311                 uint32_t offset;
312         } fields[] = {
313 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
314                 STATISTICS_FIELD(num_clients),
315                 STATISTICS_FIELD(frozen),
316                 STATISTICS_FIELD(recovering),
317                 STATISTICS_FIELD(num_recoveries),
318                 STATISTICS_FIELD(client_packets_sent),
319                 STATISTICS_FIELD(client_packets_recv),
320                 STATISTICS_FIELD(node_packets_sent),
321                 STATISTICS_FIELD(node_packets_recv),
322                 STATISTICS_FIELD(keepalive_packets_sent),
323                 STATISTICS_FIELD(keepalive_packets_recv),
324                 STATISTICS_FIELD(node.req_call),
325                 STATISTICS_FIELD(node.reply_call),
326                 STATISTICS_FIELD(node.req_dmaster),
327                 STATISTICS_FIELD(node.reply_dmaster),
328                 STATISTICS_FIELD(node.reply_error),
329                 STATISTICS_FIELD(node.req_message),
330                 STATISTICS_FIELD(node.req_control),
331                 STATISTICS_FIELD(node.reply_control),
332                 STATISTICS_FIELD(client.req_call),
333                 STATISTICS_FIELD(client.req_message),
334                 STATISTICS_FIELD(client.req_control),
335                 STATISTICS_FIELD(timeouts.call),
336                 STATISTICS_FIELD(timeouts.control),
337                 STATISTICS_FIELD(timeouts.traverse),
338                 STATISTICS_FIELD(total_calls),
339                 STATISTICS_FIELD(pending_calls),
340                 STATISTICS_FIELD(lockwait_calls),
341                 STATISTICS_FIELD(pending_lockwait_calls),
342                 STATISTICS_FIELD(childwrite_calls),
343                 STATISTICS_FIELD(pending_childwrite_calls),
344                 STATISTICS_FIELD(memory_used),
345                 STATISTICS_FIELD(max_hop_count),
346                 STATISTICS_FIELD(total_ro_delegations),
347                 STATISTICS_FIELD(total_ro_revokes),
348         };
349         tmp = s->statistics_current_time.tv_sec - s->statistics_start_time.tv_sec;
350         seconds = tmp%60;
351         tmp    /= 60;
352         minutes = tmp%60;
353         tmp    /= 60;
354         hours   = tmp%24;
355         tmp    /= 24;
356         days    = tmp;
357
358         if (options.machinereadable){
359                 if (show_header) {
360                         printf("CTDB version:");
361                         printf("Current time of statistics:");
362                         printf("Statistics collected since:");
363                         for (i=0;i<ARRAY_SIZE(fields);i++) {
364                                 printf("%s:", fields[i].name);
365                         }
366                         printf("num_reclock_ctdbd_latency:");
367                         printf("min_reclock_ctdbd_latency:");
368                         printf("avg_reclock_ctdbd_latency:");
369                         printf("max_reclock_ctdbd_latency:");
370
371                         printf("num_reclock_recd_latency:");
372                         printf("min_reclock_recd_latency:");
373                         printf("avg_reclock_recd_latency:");
374                         printf("max_reclock_recd_latency:");
375
376                         printf("num_call_latency:");
377                         printf("min_call_latency:");
378                         printf("avg_call_latency:");
379                         printf("max_call_latency:");
380
381                         printf("num_lockwait_latency:");
382                         printf("min_lockwait_latency:");
383                         printf("avg_lockwait_latency:");
384                         printf("max_lockwait_latency:");
385
386                         printf("num_childwrite_latency:");
387                         printf("min_childwrite_latency:");
388                         printf("avg_childwrite_latency:");
389                         printf("max_childwrite_latency:");
390                         printf("\n");
391                 }
392                 printf("%d:", CTDB_VERSION);
393                 printf("%d:", (int)s->statistics_current_time.tv_sec);
394                 printf("%d:", (int)s->statistics_start_time.tv_sec);
395                 for (i=0;i<ARRAY_SIZE(fields);i++) {
396                         printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
397                 }
398                 printf("%d:", s->reclock.ctdbd.num);
399                 printf("%.6f:", s->reclock.ctdbd.min);
400                 printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
401                 printf("%.6f:", s->reclock.ctdbd.max);
402
403                 printf("%d:", s->reclock.recd.num);
404                 printf("%.6f:", s->reclock.recd.min);
405                 printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
406                 printf("%.6f:", s->reclock.recd.max);
407
408                 printf("%d:", s->call_latency.num);
409                 printf("%.6f:", s->call_latency.min);
410                 printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
411                 printf("%.6f:", s->call_latency.max);
412
413                 printf("%d:", s->lockwait_latency.num);
414                 printf("%.6f:", s->lockwait_latency.min);
415                 printf("%.6f:", s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0);
416                 printf("%.6f:", s->lockwait_latency.max);
417
418                 printf("%d:", s->childwrite_latency.num);
419                 printf("%.6f:", s->childwrite_latency.min);
420                 printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
421                 printf("%.6f:", s->childwrite_latency.max);
422                 printf("\n");
423         } else {
424                 printf("CTDB version %u\n", CTDB_VERSION);
425                 printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
426                 printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&s->statistics_start_time.tv_sec));
427
428                 for (i=0;i<ARRAY_SIZE(fields);i++) {
429                         if (strchr(fields[i].name, '.')) {
430                                 preflen = strcspn(fields[i].name, ".")+1;
431                                 if (!prefix || strncmp(prefix, fields[i].name, preflen) != 0) {
432                                         prefix = fields[i].name;
433                                         printf(" %*.*s\n", preflen-1, preflen-1, fields[i].name);
434                                 }
435                         } else {
436                                 preflen = 0;
437                         }
438                         printf(" %*s%-22s%*s%10u\n", 
439                                preflen?4:0, "",
440                                fields[i].name+preflen, 
441                                preflen?0:4, "",
442                                *(uint32_t *)(fields[i].offset+(uint8_t *)s));
443                 }
444                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
445
446                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
447
448                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
449                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "lockwait_latency   MIN/AVG/MAX", s->lockwait_latency.min, s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0, s->lockwait_latency.max, s->lockwait_latency.num);
450                 printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
451         }
452
453         talloc_free(tmp_ctx);
454 }
455
456 /*
457   display remote ctdb statistics combined from all nodes
458  */
459 static int control_statistics_all(struct ctdb_context *ctdb)
460 {
461         int ret, i;
462         struct ctdb_statistics statistics;
463         uint32_t *nodes;
464         uint32_t num_nodes;
465
466         nodes = ctdb_get_connected_nodes(ctdb, TIMELIMIT(), ctdb, &num_nodes);
467         CTDB_NO_MEMORY(ctdb, nodes);
468         
469         ZERO_STRUCT(statistics);
470
471         for (i=0;i<num_nodes;i++) {
472                 struct ctdb_statistics s1;
473                 int j;
474                 uint32_t *v1 = (uint32_t *)&s1;
475                 uint32_t *v2 = (uint32_t *)&statistics;
476                 uint32_t num_ints = 
477                         offsetof(struct ctdb_statistics, __last_counter) / sizeof(uint32_t);
478                 ret = ctdb_ctrl_statistics(ctdb, nodes[i], &s1);
479                 if (ret != 0) {
480                         DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", nodes[i]));
481                         return ret;
482                 }
483                 for (j=0;j<num_ints;j++) {
484                         v2[j] += v1[j];
485                 }
486                 statistics.max_hop_count = 
487                         MAX(statistics.max_hop_count, s1.max_hop_count);
488                 statistics.call_latency.max = 
489                         MAX(statistics.call_latency.max, s1.call_latency.max);
490                 statistics.lockwait_latency.max = 
491                         MAX(statistics.lockwait_latency.max, s1.lockwait_latency.max);
492         }
493         talloc_free(nodes);
494         printf("Gathered statistics for %u nodes\n", num_nodes);
495         show_statistics(&statistics, 1);
496         return 0;
497 }
498
499 /*
500   display remote ctdb statistics
501  */
502 static int control_statistics(struct ctdb_context *ctdb, int argc, const char **argv)
503 {
504         int ret;
505         struct ctdb_statistics statistics;
506
507         if (options.pnn == CTDB_BROADCAST_ALL) {
508                 return control_statistics_all(ctdb);
509         }
510
511         ret = ctdb_ctrl_statistics(ctdb, options.pnn, &statistics);
512         if (ret != 0) {
513                 DEBUG(DEBUG_ERR, ("Unable to get statistics from node %u\n", options.pnn));
514                 return ret;
515         }
516         show_statistics(&statistics, 1);
517         return 0;
518 }
519
520
521 /*
522   reset remote ctdb statistics
523  */
524 static int control_statistics_reset(struct ctdb_context *ctdb, int argc, const char **argv)
525 {
526         int ret;
527
528         ret = ctdb_statistics_reset(ctdb, options.pnn);
529         if (ret != 0) {
530                 DEBUG(DEBUG_ERR, ("Unable to reset statistics on node %u\n", options.pnn));
531                 return ret;
532         }
533         return 0;
534 }
535
536
537 /*
538   display remote ctdb rolling statistics
539  */
540 static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
541 {
542         int ret;
543         struct ctdb_statistics_wire *stats;
544         int i, num_records = -1;
545
546         if (argc ==1) {
547                 num_records = atoi(argv[0]) - 1;
548         }
549
550         ret = ctdb_ctrl_getstathistory(ctdb, TIMELIMIT(), options.pnn, ctdb, &stats);
551         if (ret != 0) {
552                 DEBUG(DEBUG_ERR, ("Unable to get rolling statistics from node %u\n", options.pnn));
553                 return ret;
554         }
555         for (i=0;i<stats->num;i++) {
556                 if (stats->stats[i].statistics_start_time.tv_sec == 0) {
557                         continue;
558                 }
559                 show_statistics(&stats->stats[i], i==0);
560                 if (i == num_records) {
561                         break;
562                 }
563         }
564         return 0;
565 }
566
567
568 /*
569   display uptime of remote node
570  */
571 static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv)
572 {
573         int ret;
574         struct ctdb_uptime *uptime = NULL;
575         int tmp, days, hours, minutes, seconds;
576
577         ret = ctdb_ctrl_uptime(ctdb, ctdb, TIMELIMIT(), options.pnn, &uptime);
578         if (ret != 0) {
579                 DEBUG(DEBUG_ERR, ("Unable to get uptime from node %u\n", options.pnn));
580                 return ret;
581         }
582
583         if (options.machinereadable){
584                 printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
585                 printf(":%u:%u:%u:%lf\n",
586                         (unsigned int)uptime->current_time.tv_sec,
587                         (unsigned int)uptime->ctdbd_start_time.tv_sec,
588                         (unsigned int)uptime->last_recovery_finished.tv_sec,
589                         timeval_delta(&uptime->last_recovery_finished,
590                                       &uptime->last_recovery_started)
591                 );
592                 return 0;
593         }
594
595         printf("Current time of node          :                %s", ctime(&uptime->current_time.tv_sec));
596
597         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
598         seconds = tmp%60;
599         tmp    /= 60;
600         minutes = tmp%60;
601         tmp    /= 60;
602         hours   = tmp%24;
603         tmp    /= 24;
604         days    = tmp;
605         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->ctdbd_start_time.tv_sec));
606
607         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
608         seconds = tmp%60;
609         tmp    /= 60;
610         minutes = tmp%60;
611         tmp    /= 60;
612         hours   = tmp%24;
613         tmp    /= 24;
614         days    = tmp;
615         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s", days, hours, minutes, seconds, ctime(&uptime->last_recovery_finished.tv_sec));
616         
617         printf("Duration of last recovery/failover: %lf seconds\n",
618                 timeval_delta(&uptime->last_recovery_finished,
619                               &uptime->last_recovery_started));
620
621         return 0;
622 }
623
624 /*
625   show the PNN of the current node
626  */
627 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
628 {
629         uint32_t mypnn;
630         bool ret;
631
632         ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
633         if (!ret) {
634                 DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
635                 return -1;
636         }
637
638         printf("PNN:%d\n", mypnn);
639         return 0;
640 }
641
642
643 struct pnn_node {
644         struct pnn_node *next;
645         const char *addr;
646         int pnn;
647 };
648
649 static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
650 {
651         const char *nodes_list;
652         int nlines;
653         char **lines;
654         int i, pnn;
655         struct pnn_node *pnn_nodes = NULL;
656         struct pnn_node *pnn_node;
657         struct pnn_node *tmp_node;
658
659         /* read the nodes file */
660         nodes_list = getenv("CTDB_NODES");
661         if (nodes_list == NULL) {
662                 nodes_list = "/etc/ctdb/nodes";
663         }
664         lines = file_lines_load(nodes_list, &nlines, mem_ctx);
665         if (lines == NULL) {
666                 return NULL;
667         }
668         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
669                 nlines--;
670         }
671         for (i=0, pnn=0; i<nlines; i++) {
672                 char *node;
673
674                 node = lines[i];
675                 /* strip leading spaces */
676                 while((*node == ' ') || (*node == '\t')) {
677                         node++;
678                 }
679                 if (*node == '#') {
680                         pnn++;
681                         continue;
682                 }
683                 if (strcmp(node, "") == 0) {
684                         continue;
685                 }
686                 pnn_node = talloc(mem_ctx, struct pnn_node);
687                 pnn_node->pnn = pnn++;
688                 pnn_node->addr = talloc_strdup(pnn_node, node);
689                 pnn_node->next = pnn_nodes;
690                 pnn_nodes = pnn_node;
691         }
692
693         /* swap them around so we return them in incrementing order */
694         pnn_node = pnn_nodes;
695         pnn_nodes = NULL;
696         while (pnn_node) {
697                 tmp_node = pnn_node;
698                 pnn_node = pnn_node->next;
699
700                 tmp_node->next = pnn_nodes;
701                 pnn_nodes = tmp_node;
702         }
703
704         return pnn_nodes;
705 }
706
707 /*
708   show the PNN of the current node
709   discover the pnn by loading the nodes file and try to bind to all
710   addresses one at a time until the ip address is found.
711  */
712 static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
713 {
714         TALLOC_CTX *mem_ctx = talloc_new(NULL);
715         struct pnn_node *pnn_nodes;
716         struct pnn_node *pnn_node;
717
718         pnn_nodes = read_nodes_file(mem_ctx);
719         if (pnn_nodes == NULL) {
720                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
721                 talloc_free(mem_ctx);
722                 return -1;
723         }
724
725         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
726                 ctdb_sock_addr addr;
727
728                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
729                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
730                         talloc_free(mem_ctx);
731                         return -1;
732                 }
733
734                 if (ctdb_sys_have_ip(&addr)) {
735                         printf("PNN:%d\n", pnn_node->pnn);
736                         talloc_free(mem_ctx);
737                         return 0;
738                 }
739         }
740
741         printf("Failed to detect which PNN this node is\n");
742         talloc_free(mem_ctx);
743         return -1;
744 }
745
746 /* Helpers for ctdb status
747  */
748 static bool is_partially_online(struct ctdb_node_and_flags *node)
749 {
750         int j;
751         bool ret = false;
752
753         if (node->flags == 0) {
754                 struct ctdb_ifaces_list *ifaces;
755
756                 if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
757                         for (j=0; j < ifaces->num; j++) {
758                                 if (ifaces->ifaces[j].link_state != 0) {
759                                         continue;
760                                 }
761                                 ret = true;
762                                 break;
763                         }
764                         ctdb_free_ifaces(ifaces);
765                 }
766         }
767
768         return ret;
769 }
770
771 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
772 {
773         printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
774                ctdb_addr_to_str(&node->addr),
775                !!(node->flags&NODE_FLAGS_DISCONNECTED),
776                !!(node->flags&NODE_FLAGS_BANNED),
777                !!(node->flags&NODE_FLAGS_PERMANENTLY_DISABLED),
778                !!(node->flags&NODE_FLAGS_UNHEALTHY),
779                !!(node->flags&NODE_FLAGS_STOPPED),
780                !!(node->flags&NODE_FLAGS_INACTIVE),
781                is_partially_online(node) ? 1 : 0,
782                (node->pnn == mypnn)?'Y':'N');
783
784         return node->flags;
785 }
786
787 static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
788 {
789        printf("pnn:%d %-16s %s%s\n", node->pnn,
790               ctdb_addr_to_str(&node->addr),
791               is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
792               node->pnn == mypnn?" (THIS NODE)":"");
793
794        return node->flags;
795 }
796
797 /*
798   display remote ctdb status
799  */
800 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
801 {
802         int i;
803         struct ctdb_vnn_map *vnnmap=NULL;
804         struct ctdb_node_map *nodemap=NULL;
805         uint32_t recmode, recmaster, mypnn;
806
807         if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
808                 return -1;
809         }
810
811         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
812                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
813                 return -1;
814         }
815
816         if (options.machinereadable) {
817                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
818                        ":Inactive:PartiallyOnline:ThisNode:\n");
819                 for (i=0;i<nodemap->num;i++) {
820                         if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
821                                 continue;
822                         }
823                         (void) control_status_1_machine(mypnn,
824                                                         &nodemap->nodes[i]);
825                 }
826                 return 0;
827         }
828
829         printf("Number of nodes:%d\n", nodemap->num);
830         for(i=0;i<nodemap->num;i++){
831                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
832                         continue;
833                 }
834                 (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
835         }
836
837         if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
838                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
839                 return -1;
840         }
841         if (vnnmap->generation == INVALID_GENERATION) {
842                 printf("Generation:INVALID\n");
843         } else {
844                 printf("Generation:%d\n",vnnmap->generation);
845         }
846         printf("Size:%d\n",vnnmap->size);
847         for(i=0;i<vnnmap->size;i++){
848                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
849         }
850         ctdb_free_vnnmap(vnnmap);
851
852         if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
853                 DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
854                 return -1;
855         }
856         printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
857
858         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
859                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
860                 return -1;
861         }
862         printf("Recovery master:%d\n",recmaster);
863
864         return 0;
865 }
866
867 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
868 {
869         int i, ret;
870         struct ctdb_node_map *nodemap=NULL;
871         uint32_t * nodes;
872         uint32_t pnn_mode, mypnn;
873
874         if (argc > 1) {
875                 usage();
876         }
877
878         if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
879                               options.pnn, true, &nodes, &pnn_mode)) {
880                 return -1;
881         }
882
883         if (options.machinereadable) {
884                 printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
885                        ":Inactive:PartiallyOnline:ThisNode:\n");
886         } else if (pnn_mode == CTDB_BROADCAST_ALL) {
887                 printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
888         }
889
890         if (!ctdb_getpnn(ctdb_connection, CTDB_CURRENT_NODE, &mypnn)) {
891                 DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
892                 return -1;
893         }
894
895         if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
896                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
897                 return -1;
898         }
899
900         ret = 0;
901
902         for (i = 0; i < talloc_array_length(nodes); i++) {
903                 if (options.machinereadable) {
904                         ret |= control_status_1_machine(mypnn,
905                                                         &nodemap->nodes[nodes[i]]);
906                 } else {
907                         ret |= control_status_1_human(mypnn,
908                                                       &nodemap->nodes[nodes[i]]);
909                 }
910         }
911         return ret;
912 }
913
914 struct natgw_node {
915         struct natgw_node *next;
916         const char *addr;
917 };
918
919 /*
920   display the list of nodes belonging to this natgw configuration
921  */
922 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
923 {
924         int i, ret;
925         uint32_t capabilities;
926         const char *natgw_list;
927         int nlines;
928         char **lines;
929         struct natgw_node *natgw_nodes = NULL;
930         struct natgw_node *natgw_node;
931         struct ctdb_node_map *nodemap=NULL;
932
933
934         /* read the natgw nodes file into a linked list */
935         natgw_list = getenv("NATGW_NODES");
936         if (natgw_list == NULL) {
937                 natgw_list = "/etc/ctdb/natgw_nodes";
938         }
939         lines = file_lines_load(natgw_list, &nlines, ctdb);
940         if (lines == NULL) {
941                 ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
942                 return -1;
943         }
944         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
945                 nlines--;
946         }
947         for (i=0;i<nlines;i++) {
948                 char *node;
949
950                 node = lines[i];
951                 /* strip leading spaces */
952                 while((*node == ' ') || (*node == '\t')) {
953                         node++;
954                 }
955                 if (*node == '#') {
956                         continue;
957                 }
958                 if (strcmp(node, "") == 0) {
959                         continue;
960                 }
961                 natgw_node = talloc(ctdb, struct natgw_node);
962                 natgw_node->addr = talloc_strdup(natgw_node, node);
963                 CTDB_NO_MEMORY(ctdb, natgw_node->addr);
964                 natgw_node->next = natgw_nodes;
965                 natgw_nodes = natgw_node;
966         }
967
968         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
969         if (ret != 0) {
970                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
971                 return ret;
972         }
973
974         i=0;
975         while(i<nodemap->num) {
976                 for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
977                         if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
978                                 break;
979                         }
980                 }
981
982                 /* this node was not in the natgw so we just remove it from
983                  * the list
984                  */
985                 if ((natgw_node == NULL) 
986                 ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
987                         int j;
988
989                         for (j=i+1; j<nodemap->num; j++) {
990                                 nodemap->nodes[j-1] = nodemap->nodes[j];
991                         }
992                         nodemap->num--;
993                         continue;
994                 }
995
996                 i++;
997         }               
998
999         /* pick a node to be natgwmaster
1000          * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
1001          */
1002         for(i=0;i<nodemap->num;i++){
1003                 if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
1004                         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1005                         if (ret != 0) {
1006                                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1007                                 return ret;
1008                         }
1009                         if (!(capabilities&CTDB_CAP_NATGW)) {
1010                                 continue;
1011                         }
1012                         printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1013                         break;
1014                 }
1015         }
1016         /* we couldnt find any healthy node, try unhealthy ones */
1017         if (i == nodemap->num) {
1018                 for(i=0;i<nodemap->num;i++){
1019                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
1020                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1021                                 if (ret != 0) {
1022                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1023                                         return ret;
1024                                 }
1025                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1026                                         continue;
1027                                 }
1028                                 printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
1029                                 break;
1030                         }
1031                 }
1032         }
1033         /* unless all nodes are STOPPED, when we pick one anyway */
1034         if (i == nodemap->num) {
1035                 for(i=0;i<nodemap->num;i++){
1036                         if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
1037                                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
1038                                 if (ret != 0) {
1039                                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
1040                                         return ret;
1041                                 }
1042                                 if (!(capabilities&CTDB_CAP_NATGW)) {
1043                                         continue;
1044                                 }
1045                                 printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
1046                                 break;
1047                         }
1048                 }
1049                 /* or if we still can not find any */
1050                 if (i == nodemap->num) {
1051                         printf("-1 0.0.0.0\n");
1052                         ret = 2; /* matches ENOENT */
1053                 }
1054         }
1055
1056         /* print the pruned list of nodes belonging to this natgw list */
1057         for(i=0;i<nodemap->num;i++){
1058                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1059                         continue;
1060                 }
1061                 printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
1062                         ctdb_addr_to_str(&nodemap->nodes[i].addr),
1063                        !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
1064                        !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
1065                        !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
1066                        !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
1067                        !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
1068         }
1069
1070         return ret;
1071 }
1072
1073 /*
1074   display the status of the scripts for monitoring (or other events)
1075  */
1076 static int control_one_scriptstatus(struct ctdb_context *ctdb,
1077                                     enum ctdb_eventscript_call type)
1078 {
1079         struct ctdb_scripts_wire *script_status;
1080         int ret, i;
1081
1082         ret = ctdb_ctrl_getscriptstatus(ctdb, TIMELIMIT(), options.pnn, ctdb, type, &script_status);
1083         if (ret != 0) {
1084                 DEBUG(DEBUG_ERR, ("Unable to get script status from node %u\n", options.pnn));
1085                 return ret;
1086         }
1087
1088         if (script_status == NULL) {
1089                 if (!options.machinereadable) {
1090                         printf("%s cycle never run\n",
1091                                ctdb_eventscript_call_names[type]);
1092                 }
1093                 return 0;
1094         }
1095
1096         if (!options.machinereadable) {
1097                 printf("%d scripts were executed last %s cycle\n",
1098                        script_status->num_scripts,
1099                        ctdb_eventscript_call_names[type]);
1100         }
1101         for (i=0; i<script_status->num_scripts; i++) {
1102                 const char *status = NULL;
1103
1104                 switch (script_status->scripts[i].status) {
1105                 case -ETIME:
1106                         status = "TIMEDOUT";
1107                         break;
1108                 case -ENOEXEC:
1109                         status = "DISABLED";
1110                         break;
1111                 case 0:
1112                         status = "OK";
1113                         break;
1114                 default:
1115                         if (script_status->scripts[i].status > 0)
1116                                 status = "ERROR";
1117                         break;
1118                 }
1119                 if (options.machinereadable) {
1120                         printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
1121                                ctdb_eventscript_call_names[type],
1122                                script_status->scripts[i].name,
1123                                script_status->scripts[i].status,
1124                                status,
1125                                (long)script_status->scripts[i].start.tv_sec,
1126                                (long)script_status->scripts[i].start.tv_usec,
1127                                (long)script_status->scripts[i].finished.tv_sec,
1128                                (long)script_status->scripts[i].finished.tv_usec,
1129                                script_status->scripts[i].output);
1130                         continue;
1131                 }
1132                 if (status)
1133                         printf("%-20s Status:%s    ",
1134                                script_status->scripts[i].name, status);
1135                 else
1136                         /* Some other error, eg from stat. */
1137                         printf("%-20s Status:CANNOT RUN (%s)",
1138                                script_status->scripts[i].name,
1139                                strerror(-script_status->scripts[i].status));
1140
1141                 if (script_status->scripts[i].status >= 0) {
1142                         printf("Duration:%.3lf ",
1143                         timeval_delta(&script_status->scripts[i].finished,
1144                               &script_status->scripts[i].start));
1145                 }
1146                 if (script_status->scripts[i].status != -ENOEXEC) {
1147                         printf("%s",
1148                                ctime(&script_status->scripts[i].start.tv_sec));
1149                         if (script_status->scripts[i].status != 0) {
1150                                 printf("   OUTPUT:%s\n",
1151                                        script_status->scripts[i].output);
1152                         }
1153                 } else {
1154                         printf("\n");
1155                 }
1156         }
1157         return 0;
1158 }
1159
1160
1161 static int control_scriptstatus(struct ctdb_context *ctdb,
1162                                 int argc, const char **argv)
1163 {
1164         int ret;
1165         enum ctdb_eventscript_call type, min, max;
1166         const char *arg;
1167
1168         if (argc > 1) {
1169                 DEBUG(DEBUG_ERR, ("Unknown arguments to scriptstatus\n"));
1170                 return -1;
1171         }
1172
1173         if (argc == 0)
1174                 arg = ctdb_eventscript_call_names[CTDB_EVENT_MONITOR];
1175         else
1176                 arg = argv[0];
1177
1178         for (type = 0; type < CTDB_EVENT_MAX; type++) {
1179                 if (strcmp(arg, ctdb_eventscript_call_names[type]) == 0) {
1180                         min = type;
1181                         max = type+1;
1182                         break;
1183                 }
1184         }
1185         if (type == CTDB_EVENT_MAX) {
1186                 if (strcmp(arg, "all") == 0) {
1187                         min = 0;
1188                         max = CTDB_EVENT_MAX;
1189                 } else {
1190                         DEBUG(DEBUG_ERR, ("Unknown event type %s\n", argv[0]));
1191                         return -1;
1192                 }
1193         }
1194
1195         if (options.machinereadable) {
1196                 printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
1197         }
1198
1199         for (type = min; type < max; type++) {
1200                 ret = control_one_scriptstatus(ctdb, type);
1201                 if (ret != 0) {
1202                         return ret;
1203                 }
1204         }
1205
1206         return 0;
1207 }
1208
1209 /*
1210   enable an eventscript
1211  */
1212 static int control_enablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1213 {
1214         int ret;
1215
1216         if (argc < 1) {
1217                 usage();
1218         }
1219
1220         ret = ctdb_ctrl_enablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1221         if (ret != 0) {
1222           DEBUG(DEBUG_ERR, ("Unable to enable script %s on node %u\n", argv[0], options.pnn));
1223                 return ret;
1224         }
1225
1226         return 0;
1227 }
1228
1229 /*
1230   disable an eventscript
1231  */
1232 static int control_disablescript(struct ctdb_context *ctdb, int argc, const char **argv)
1233 {
1234         int ret;
1235
1236         if (argc < 1) {
1237                 usage();
1238         }
1239
1240         ret = ctdb_ctrl_disablescript(ctdb, TIMELIMIT(), options.pnn, argv[0]);
1241         if (ret != 0) {
1242           DEBUG(DEBUG_ERR, ("Unable to disable script %s on node %u\n", argv[0], options.pnn));
1243                 return ret;
1244         }
1245
1246         return 0;
1247 }
1248
1249 /*
1250   display the pnn of the recovery master
1251  */
1252 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
1253 {
1254         uint32_t recmaster;
1255
1256         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1257                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1258                 return -1;
1259         }
1260         printf("%d\n",recmaster);
1261
1262         return 0;
1263 }
1264
1265 /*
1266   add a tickle to a public address
1267  */
1268 static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1269 {
1270         struct ctdb_tcp_connection t;
1271         TDB_DATA data;
1272         int ret;
1273
1274         if (argc < 2) {
1275                 usage();
1276         }
1277
1278         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1279                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1280                 return -1;
1281         }
1282         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1283                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1284                 return -1;
1285         }
1286
1287         data.dptr = (uint8_t *)&t;
1288         data.dsize = sizeof(t);
1289
1290         /* tell all nodes about this tcp connection */
1291         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_ADD_DELAYED_UPDATE,
1292                            0, data, ctdb, NULL, NULL, NULL, NULL);
1293         if (ret != 0) {
1294                 DEBUG(DEBUG_ERR,("Failed to add tickle\n"));
1295                 return -1;
1296         }
1297         
1298         return 0;
1299 }
1300
1301
1302 /*
1303   delete a tickle from a node
1304  */
1305 static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **argv)
1306 {
1307         struct ctdb_tcp_connection t;
1308         TDB_DATA data;
1309         int ret;
1310
1311         if (argc < 2) {
1312                 usage();
1313         }
1314
1315         if (parse_ip_port(argv[0], &t.src_addr) == 0) {
1316                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1317                 return -1;
1318         }
1319         if (parse_ip_port(argv[1], &t.dst_addr) == 0) {
1320                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[1]));
1321                 return -1;
1322         }
1323
1324         data.dptr = (uint8_t *)&t;
1325         data.dsize = sizeof(t);
1326
1327         /* tell all nodes about this tcp connection */
1328         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_TCP_REMOVE,
1329                            0, data, ctdb, NULL, NULL, NULL, NULL);
1330         if (ret != 0) {
1331                 DEBUG(DEBUG_ERR,("Failed to remove tickle\n"));
1332                 return -1;
1333         }
1334         
1335         return 0;
1336 }
1337
1338
1339 /*
1340   get a list of all tickles for this pnn
1341  */
1342 static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char **argv)
1343 {
1344         struct ctdb_control_tcp_tickle_list *list;
1345         ctdb_sock_addr addr;
1346         int i, ret;
1347         unsigned port = 0;
1348
1349         if (argc < 1) {
1350                 usage();
1351         }
1352
1353         if (argc == 2) {
1354                 port = atoi(argv[1]);
1355         }
1356
1357         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1358                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1359                 return -1;
1360         }
1361
1362         ret = ctdb_ctrl_get_tcp_tickles(ctdb, TIMELIMIT(), options.pnn, ctdb, &addr, &list);
1363         if (ret == -1) {
1364                 DEBUG(DEBUG_ERR, ("Unable to list tickles\n"));
1365                 return -1;
1366         }
1367
1368         if (options.machinereadable){
1369                 printf(":source ip:port:destination ip:port:\n");
1370                 for (i=0;i<list->tickles.num;i++) {
1371                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1372                                 continue;
1373                         }
1374                         printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1375                         printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1376                 }
1377         } else {
1378                 printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
1379                 printf("Num tickles:%u\n", list->tickles.num);
1380                 for (i=0;i<list->tickles.num;i++) {
1381                         if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
1382                                 continue;
1383                         }
1384                         printf("SRC: %s:%u   ", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
1385                         printf("DST: %s:%u\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
1386                 }
1387         }
1388
1389         talloc_free(list);
1390         
1391         return 0;
1392 }
1393
1394
1395 static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
1396 {
1397         struct ctdb_all_public_ips *ips;
1398         struct ctdb_public_ip ip;
1399         int i, ret;
1400         uint32_t *nodes;
1401         uint32_t disable_time;
1402         TDB_DATA data;
1403         struct ctdb_node_map *nodemap=NULL;
1404         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1405
1406         disable_time = 30;
1407         data.dptr  = (uint8_t*)&disable_time;
1408         data.dsize = sizeof(disable_time);
1409         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
1410         if (ret != 0) {
1411                 DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
1412                 return -1;
1413         }
1414
1415
1416
1417         /* read the public ip list from the node */
1418         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), pnn, ctdb, &ips);
1419         if (ret != 0) {
1420                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", pnn));
1421                 talloc_free(tmp_ctx);
1422                 return -1;
1423         }
1424
1425         for (i=0;i<ips->num;i++) {
1426                 if (ctdb_same_ip(addr, &ips->ips[i].addr)) {
1427                         break;
1428                 }
1429         }
1430         if (i==ips->num) {
1431                 DEBUG(DEBUG_ERR, ("Node %u can not host ip address '%s'\n",
1432                         pnn, ctdb_addr_to_str(addr)));
1433                 talloc_free(tmp_ctx);
1434                 return -1;
1435         }
1436
1437         ip.pnn  = pnn;
1438         ip.addr = *addr;
1439
1440         data.dptr  = (uint8_t *)&ip;
1441         data.dsize = sizeof(ip);
1442
1443         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1446                 talloc_free(tmp_ctx);
1447                 return ret;
1448         }
1449
1450         nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
1451         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
1452                                         nodes, 0,
1453                                         LONGTIMELIMIT(),
1454                                         false, data,
1455                                         NULL, NULL,
1456                                         NULL);
1457         if (ret != 0) {
1458                 DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
1459                 talloc_free(tmp_ctx);
1460                 return -1;
1461         }
1462
1463         ret = ctdb_ctrl_takeover_ip(ctdb, LONGTIMELIMIT(), pnn, &ip);
1464         if (ret != 0) {
1465                 DEBUG(DEBUG_ERR,("Failed to take over IP on node %d\n", pnn));
1466                 talloc_free(tmp_ctx);
1467                 return -1;
1468         }
1469
1470         /* update the recovery daemon so it now knows to expect the new
1471            node assignment for this ip.
1472         */
1473         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECD_UPDATE_IP, data);
1474         if (ret != 0) {
1475                 DEBUG(DEBUG_ERR,("Failed to send message to update the ip on the recovery master.\n"));
1476                 return -1;
1477         }
1478
1479         talloc_free(tmp_ctx);
1480         return 0;
1481 }
1482
1483 /*
1484   move/failover an ip address to a specific node
1485  */
1486 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
1487 {
1488         uint32_t pnn;
1489         int ret, retries = 0;
1490         ctdb_sock_addr addr;
1491
1492         if (argc < 2) {
1493                 usage();
1494                 return -1;
1495         }
1496
1497         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1498                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1499                 return -1;
1500         }
1501
1502
1503         if (sscanf(argv[1], "%u", &pnn) != 1) {
1504                 DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
1505                 return -1;
1506         }
1507
1508         do {
1509                 ret = move_ip(ctdb, &addr, pnn);
1510                 if (ret != 0) {
1511                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
1512                         sleep(3);
1513                         retries++;
1514                 }
1515         } while (retries < 5 && ret != 0);
1516         if (ret != 0) {
1517                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
1518                 return -1;
1519         }
1520
1521         return 0;
1522 }
1523
1524 static int getips_store_callback(void *param, void *data)
1525 {
1526         struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
1527         struct ctdb_all_public_ips *ips = param;
1528         int i;
1529
1530         i = ips->num++;
1531         ips->ips[i].pnn  = node_ip->pnn;
1532         ips->ips[i].addr = node_ip->addr;
1533         return 0;
1534 }
1535
1536 static int getips_count_callback(void *param, void *data)
1537 {
1538         uint32_t *count = param;
1539
1540         (*count)++;
1541         return 0;
1542 }
1543
1544 #define IP_KEYLEN       4
1545 static uint32_t *ip_key(ctdb_sock_addr *ip)
1546 {
1547         static uint32_t key[IP_KEYLEN];
1548
1549         bzero(key, sizeof(key));
1550
1551         switch (ip->sa.sa_family) {
1552         case AF_INET:
1553                 key[0]  = ip->ip.sin_addr.s_addr;
1554                 break;
1555         case AF_INET6:
1556                 key[0]  = ip->ip6.sin6_addr.s6_addr32[3];
1557                 key[1]  = ip->ip6.sin6_addr.s6_addr32[2];
1558                 key[2]  = ip->ip6.sin6_addr.s6_addr32[1];
1559                 key[3]  = ip->ip6.sin6_addr.s6_addr32[0];
1560                 break;
1561         default:
1562                 DEBUG(DEBUG_ERR, (__location__ " ERROR, unknown family passed :%u\n", ip->sa.sa_family));
1563                 return key;
1564         }
1565
1566         return key;
1567 }
1568
1569 static void *add_ip_callback(void *parm, void *data)
1570 {
1571         return parm;
1572 }
1573
1574 static int
1575 control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
1576 {
1577         struct ctdb_all_public_ips *tmp_ips;
1578         struct ctdb_node_map *nodemap=NULL;
1579         trbt_tree_t *ip_tree;
1580         int i, j, len, ret;
1581         uint32_t count;
1582
1583         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1584         if (ret != 0) {
1585                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1586                 return ret;
1587         }
1588
1589         ip_tree = trbt_create(tmp_ctx, 0);
1590
1591         for(i=0;i<nodemap->num;i++){
1592                 if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
1593                         continue;
1594                 }
1595                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1596                         continue;
1597                 }
1598
1599                 /* read the public ip list from this node */
1600                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
1601                 if (ret != 0) {
1602                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1603                         return -1;
1604                 }
1605         
1606                 for (j=0; j<tmp_ips->num;j++) {
1607                         struct ctdb_public_ip *node_ip;
1608
1609                         node_ip = talloc(tmp_ctx, struct ctdb_public_ip);
1610                         node_ip->pnn  = tmp_ips->ips[j].pnn;
1611                         node_ip->addr = tmp_ips->ips[j].addr;
1612
1613                         trbt_insertarray32_callback(ip_tree,
1614                                 IP_KEYLEN, ip_key(&tmp_ips->ips[j].addr),
1615                                 add_ip_callback,
1616                                 node_ip);
1617                 }
1618                 talloc_free(tmp_ips);
1619         }
1620
1621         /* traverse */
1622         count = 0;
1623         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_count_callback, &count);
1624
1625         len = offsetof(struct ctdb_all_public_ips, ips) + 
1626                 count*sizeof(struct ctdb_public_ip);
1627         tmp_ips = talloc_zero_size(tmp_ctx, len);
1628         trbt_traversearray32(ip_tree, IP_KEYLEN, getips_store_callback, tmp_ips);
1629
1630         *ips = tmp_ips;
1631
1632         return 0;
1633 }
1634
1635
1636 /* 
1637  * scans all other nodes and returns a pnn for another node that can host this 
1638  * ip address or -1
1639  */
1640 static int
1641 find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
1642 {
1643         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1644         struct ctdb_all_public_ips *ips;
1645         struct ctdb_node_map *nodemap=NULL;
1646         int i, j, ret;
1647
1648         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1649         if (ret != 0) {
1650                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
1651                 talloc_free(tmp_ctx);
1652                 return ret;
1653         }
1654
1655         for(i=0;i<nodemap->num;i++){
1656                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1657                         continue;
1658                 }
1659                 if (nodemap->nodes[i].pnn == options.pnn) {
1660                         continue;
1661                 }
1662
1663                 /* read the public ip list from this node */
1664                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
1665                 if (ret != 0) {
1666                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
1667                         return -1;
1668                 }
1669
1670                 for (j=0;j<ips->num;j++) {
1671                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1672                                 talloc_free(tmp_ctx);
1673                                 return nodemap->nodes[i].pnn;
1674                         }
1675                 }
1676                 talloc_free(ips);
1677         }
1678
1679         talloc_free(tmp_ctx);
1680         return -1;
1681 }
1682
1683 static uint32_t ipreallocate_finished;
1684
1685 /*
1686   handler for receiving the response to ipreallocate
1687 */
1688 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1689                              TDB_DATA data, void *private_data)
1690 {
1691         ipreallocate_finished = 1;
1692 }
1693
1694 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1695 {
1696         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1697
1698         event_add_timed(ctdb->ev, ctdb, 
1699                                 timeval_current_ofs(1, 0),
1700                                 ctdb_every_second, ctdb);
1701 }
1702
1703 /*
1704   ask the recovery daemon on the recovery master to perform a ip reallocation
1705  */
1706 static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
1707 {
1708         int i, ret;
1709         TDB_DATA data;
1710         struct takeover_run_reply rd;
1711         uint32_t recmaster;
1712         struct ctdb_node_map *nodemap=NULL;
1713         int retries=0;
1714         struct timeval tv = timeval_current();
1715
1716         /* we need some events to trigger so we can timeout and restart
1717            the loop
1718         */
1719         event_add_timed(ctdb->ev, ctdb, 
1720                                 timeval_current_ofs(1, 0),
1721                                 ctdb_every_second, ctdb);
1722
1723         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1724         if (rd.pnn == -1) {
1725                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
1726                 return -1;
1727         }
1728         rd.srvid = getpid();
1729
1730         /* register a message port for receiveing the reply so that we
1731            can receive the reply
1732         */
1733         ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
1734
1735         data.dptr = (uint8_t *)&rd;
1736         data.dsize = sizeof(rd);
1737
1738 again:
1739         /* check that there are valid nodes available */
1740         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
1741                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1742                 return -1;
1743         }
1744         for (i=0; i<nodemap->num;i++) {
1745                 if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
1746                         break;
1747                 }
1748         }
1749         if (i==nodemap->num) {
1750                 DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
1751                 return 0;
1752         }
1753
1754
1755         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
1756                 DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
1757                 return -1;
1758         }
1759
1760         /* verify the node exists */
1761         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
1762                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
1763                 return -1;
1764         }
1765
1766
1767         /* check tha there are nodes available that can act as a recmaster */
1768         for (i=0; i<nodemap->num; i++) {
1769                 if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1770                         continue;
1771                 }
1772                 break;
1773         }
1774         if (i == nodemap->num) {
1775                 DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
1776                 return 0;
1777         }
1778
1779         /* verify the recovery master is not STOPPED, nor BANNED */
1780         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1781                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1782                 retries++;
1783                 sleep(1);
1784                 goto again;
1785         } 
1786         
1787         /* verify the recovery master is not STOPPED, nor BANNED */
1788         if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1789                 DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
1790                 retries++;
1791                 sleep(1);
1792                 goto again;
1793         } 
1794
1795         ipreallocate_finished = 0;
1796         ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
1799                 return -1;
1800         }
1801
1802         tv = timeval_current();
1803         /* this loop will terminate when we have received the reply */
1804         while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
1805                 event_loop_once(ctdb->ev);
1806         }
1807         if (ipreallocate_finished == 1) {
1808                 return 0;
1809         }
1810
1811         retries++;
1812         sleep(1);
1813         goto again;
1814
1815         return 0;
1816 }
1817
1818
1819 /*
1820   add a public ip address to a node
1821  */
1822 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
1823 {
1824         int i, ret;
1825         int len, retries = 0;
1826         unsigned mask;
1827         ctdb_sock_addr addr;
1828         struct ctdb_control_ip_iface *pub;
1829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1830         struct ctdb_all_public_ips *ips;
1831
1832
1833         if (argc != 2) {
1834                 talloc_free(tmp_ctx);
1835                 usage();
1836         }
1837
1838         if (!parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
1839                 DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
1840                 talloc_free(tmp_ctx);
1841                 return -1;
1842         }
1843
1844         /* read the public ip list from the node */
1845         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
1846         if (ret != 0) {
1847                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", options.pnn));
1848                 talloc_free(tmp_ctx);
1849                 return -1;
1850         }
1851         for (i=0;i<ips->num;i++) {
1852                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
1853                         DEBUG(DEBUG_ERR,("Can not add ip to node. Node already hosts this ip\n"));
1854                         return 0;
1855                 }
1856         }
1857
1858
1859
1860         /* Dont timeout. This command waits for an ip reallocation
1861            which sometimes can take wuite a while if there has
1862            been a recent recovery
1863         */
1864         alarm(0);
1865
1866         len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
1867         pub = talloc_size(tmp_ctx, len); 
1868         CTDB_NO_MEMORY(ctdb, pub);
1869
1870         pub->addr  = addr;
1871         pub->mask  = mask;
1872         pub->len   = strlen(argv[1])+1;
1873         memcpy(&pub->iface[0], argv[1], strlen(argv[1])+1);
1874
1875         do {
1876                 ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
1877                 if (ret != 0) {
1878                         DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Wait 3 seconds and try again.\n", options.pnn));
1879                         sleep(3);
1880                         retries++;
1881                 }
1882         } while (retries < 5 && ret != 0);
1883         if (ret != 0) {
1884                 DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u. Giving up.\n", options.pnn));
1885                 talloc_free(tmp_ctx);
1886                 return ret;
1887         }
1888
1889         do {
1890                 ret = control_ipreallocate(ctdb, argc, argv);
1891                 if (ret != 0) {
1892                         DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u. Wait 3 seconds and try again.\n", options.pnn));
1893                         sleep(3);
1894                         retries++;
1895                 }
1896         } while (retries < 5 && ret != 0);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u. Giving up.\n", options.pnn));
1899                 talloc_free(tmp_ctx);
1900                 return ret;
1901         }
1902
1903         talloc_free(tmp_ctx);
1904         return 0;
1905 }
1906
1907 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
1908
1909 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
1910 {
1911         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1912         struct ctdb_node_map *nodemap=NULL;
1913         struct ctdb_all_public_ips *ips;
1914         int ret, i, j;
1915
1916         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1917         if (ret != 0) {
1918                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from current node\n"));
1919                 return ret;
1920         }
1921
1922         /* remove it from the nodes that are not hosting the ip currently */
1923         for(i=0;i<nodemap->num;i++){
1924                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1925                         continue;
1926                 }
1927                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1928                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1929                         continue;
1930                 }
1931
1932                 for (j=0;j<ips->num;j++) {
1933                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1934                                 break;
1935                         }
1936                 }
1937                 if (j==ips->num) {
1938                         continue;
1939                 }
1940
1941                 if (ips->ips[j].pnn == nodemap->nodes[i].pnn) {
1942                         continue;
1943                 }
1944
1945                 options.pnn = nodemap->nodes[i].pnn;
1946                 control_delip(ctdb, argc, argv);
1947         }
1948
1949
1950         /* remove it from every node (also the one hosting it) */
1951         for(i=0;i<nodemap->num;i++){
1952                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1953                         continue;
1954                 }
1955                 if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
1956                         DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
1957                         continue;
1958                 }
1959
1960                 for (j=0;j<ips->num;j++) {
1961                         if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
1962                                 break;
1963                         }
1964                 }
1965                 if (j==ips->num) {
1966                         continue;
1967                 }
1968
1969                 options.pnn = nodemap->nodes[i].pnn;
1970                 control_delip(ctdb, argc, argv);
1971         }
1972
1973         talloc_free(tmp_ctx);
1974         return 0;
1975 }
1976         
1977 /*
1978   delete a public ip address from a node
1979  */
1980 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
1981 {
1982         int i, ret;
1983         int retries = 0;
1984         ctdb_sock_addr addr;
1985         struct ctdb_control_ip_iface pub;
1986         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1987         struct ctdb_all_public_ips *ips;
1988
1989         if (argc != 1) {
1990                 talloc_free(tmp_ctx);
1991                 usage();
1992         }
1993
1994         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
1995                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
1996                 return -1;
1997         }
1998
1999         if (options.pnn == CTDB_BROADCAST_ALL) {
2000                 return control_delip_all(ctdb, argc, argv, &addr);
2001         }
2002
2003         pub.addr  = addr;
2004         pub.mask  = 0;
2005         pub.len   = 0;
2006
2007         ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2008         if (ret != 0) {
2009                 DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
2010                 talloc_free(tmp_ctx);
2011                 return ret;
2012         }
2013         
2014         for (i=0;i<ips->num;i++) {
2015                 if (ctdb_same_ip(&addr, &ips->ips[i].addr)) {
2016                         break;
2017                 }
2018         }
2019
2020         if (i==ips->num) {
2021                 DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
2022                         ctdb_addr_to_str(&addr)));
2023                 talloc_free(tmp_ctx);
2024                 return -1;
2025         }
2026
2027         if (ips->ips[i].pnn == options.pnn) {
2028                 ret = find_other_host_for_public_ip(ctdb, &addr);
2029                 if (ret != -1) {
2030                         do {
2031                                 ret = move_ip(ctdb, &addr, ret);
2032                                 if (ret != 0) {
2033                                         DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
2034                                         sleep(3);
2035                                         retries++;
2036                                 }
2037                         } while (retries < 5 && ret != 0);
2038                         if (ret != 0) {
2039                                 DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
2040                                 return -1;
2041                         }
2042                 }
2043         }
2044
2045         ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
2046         if (ret != 0) {
2047                 DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
2048                 talloc_free(tmp_ctx);
2049                 return ret;
2050         }
2051
2052         talloc_free(tmp_ctx);
2053         return 0;
2054 }
2055
2056 /*
2057   kill a tcp connection
2058  */
2059 static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2060 {
2061         int ret;
2062         struct ctdb_control_killtcp killtcp;
2063
2064         if (argc < 2) {
2065                 usage();
2066         }
2067
2068         if (!parse_ip_port(argv[0], &killtcp.src_addr)) {
2069                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2070                 return -1;
2071         }
2072
2073         if (!parse_ip_port(argv[1], &killtcp.dst_addr)) {
2074                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2075                 return -1;
2076         }
2077
2078         ret = ctdb_ctrl_killtcp(ctdb, TIMELIMIT(), options.pnn, &killtcp);
2079         if (ret != 0) {
2080                 DEBUG(DEBUG_ERR, ("Unable to killtcp from node %u\n", options.pnn));
2081                 return ret;
2082         }
2083
2084         return 0;
2085 }
2086
2087
2088 /*
2089   send a gratious arp
2090  */
2091 static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char **argv)
2092 {
2093         int ret;
2094         ctdb_sock_addr addr;
2095
2096         if (argc < 2) {
2097                 usage();
2098         }
2099
2100         if (!parse_ip(argv[0], NULL, 0, &addr)) {
2101                 DEBUG(DEBUG_ERR, ("Bad IP '%s'\n", argv[0]));
2102                 return -1;
2103         }
2104
2105         ret = ctdb_ctrl_gratious_arp(ctdb, TIMELIMIT(), options.pnn, &addr, argv[1]);
2106         if (ret != 0) {
2107                 DEBUG(DEBUG_ERR, ("Unable to send gratious_arp from node %u\n", options.pnn));
2108                 return ret;
2109         }
2110
2111         return 0;
2112 }
2113
2114 /*
2115   register a server id
2116  */
2117 static int regsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2118 {
2119         int ret;
2120         struct ctdb_server_id server_id;
2121
2122         if (argc < 3) {
2123                 usage();
2124         }
2125
2126         server_id.pnn       = strtoul(argv[0], NULL, 0);
2127         server_id.type      = strtoul(argv[1], NULL, 0);
2128         server_id.server_id = strtoul(argv[2], NULL, 0);
2129
2130         ret = ctdb_ctrl_register_server_id(ctdb, TIMELIMIT(), &server_id);
2131         if (ret != 0) {
2132                 DEBUG(DEBUG_ERR, ("Unable to register server_id from node %u\n", options.pnn));
2133                 return ret;
2134         }
2135         DEBUG(DEBUG_ERR,("Srvid registered. Sleeping for 999 seconds\n"));
2136         sleep(999);
2137         return -1;
2138 }
2139
2140 /*
2141   unregister a server id
2142  */
2143 static int unregsrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2144 {
2145         int ret;
2146         struct ctdb_server_id server_id;
2147
2148         if (argc < 3) {
2149                 usage();
2150         }
2151
2152         server_id.pnn       = strtoul(argv[0], NULL, 0);
2153         server_id.type      = strtoul(argv[1], NULL, 0);
2154         server_id.server_id = strtoul(argv[2], NULL, 0);
2155
2156         ret = ctdb_ctrl_unregister_server_id(ctdb, TIMELIMIT(), &server_id);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR, ("Unable to unregister server_id from node %u\n", options.pnn));
2159                 return ret;
2160         }
2161         return -1;
2162 }
2163
2164 /*
2165   check if a server id exists
2166  */
2167 static int chksrvid(struct ctdb_context *ctdb, int argc, const char **argv)
2168 {
2169         uint32_t status;
2170         int ret;
2171         struct ctdb_server_id server_id;
2172
2173         if (argc < 3) {
2174                 usage();
2175         }
2176
2177         server_id.pnn       = strtoul(argv[0], NULL, 0);
2178         server_id.type      = strtoul(argv[1], NULL, 0);
2179         server_id.server_id = strtoul(argv[2], NULL, 0);
2180
2181         ret = ctdb_ctrl_check_server_id(ctdb, TIMELIMIT(), options.pnn, &server_id, &status);
2182         if (ret != 0) {
2183                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n", options.pnn));
2184                 return ret;
2185         }
2186
2187         if (status) {
2188                 printf("Server id %d:%d:%d EXISTS\n", server_id.pnn, server_id.type, server_id.server_id);
2189         } else {
2190                 printf("Server id %d:%d:%d does NOT exist\n", server_id.pnn, server_id.type, server_id.server_id);
2191         }
2192         return 0;
2193 }
2194
2195 /*
2196   get a list of all server ids that are registered on a node
2197  */
2198 static int getsrvids(struct ctdb_context *ctdb, int argc, const char **argv)
2199 {
2200         int i, ret;
2201         struct ctdb_server_id_list *server_ids;
2202
2203         ret = ctdb_ctrl_get_server_id_list(ctdb, ctdb, TIMELIMIT(), options.pnn, &server_ids);
2204         if (ret != 0) {
2205                 DEBUG(DEBUG_ERR, ("Unable to get server_id list from node %u\n", options.pnn));
2206                 return ret;
2207         }
2208
2209         for (i=0; i<server_ids->num; i++) {
2210                 printf("Server id %d:%d:%d\n", 
2211                         server_ids->server_ids[i].pnn, 
2212                         server_ids->server_ids[i].type, 
2213                         server_ids->server_ids[i].server_id); 
2214         }
2215
2216         return -1;
2217 }
2218
2219 /*
2220   check if a server id exists
2221  */
2222 static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
2223 {
2224         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2225         uint64_t *ids;
2226         uint8_t *result;
2227         int i;
2228
2229         if (argc < 1) {
2230                 talloc_free(tmp_ctx);
2231                 usage();
2232         }
2233
2234         ids    = talloc_array(tmp_ctx, uint64_t, argc);
2235         result = talloc_array(tmp_ctx, uint8_t, argc);
2236
2237         for (i = 0; i < argc; i++) {
2238                 ids[i] = strtoull(argv[i], NULL, 0);
2239         }
2240
2241         if (!ctdb_check_message_handlers(ctdb_connection,
2242                 options.pnn, argc, ids, result)) {
2243                 DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
2244                                   options.pnn));
2245                 talloc_free(tmp_ctx);
2246                 return -1;
2247         }
2248
2249         for (i=0; i < argc; i++) {
2250                 printf("Server id %d:%llu %s\n", options.pnn, (long long)ids[i],
2251                        result[i] ? "exists" : "does not exist");
2252         }
2253
2254         talloc_free(tmp_ctx);
2255         return 0;
2256 }
2257
2258 /*
2259   send a tcp tickle ack
2260  */
2261 static int tickle_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
2262 {
2263         int ret;
2264         ctdb_sock_addr  src, dst;
2265
2266         if (argc < 2) {
2267                 usage();
2268         }
2269
2270         if (!parse_ip_port(argv[0], &src)) {
2271                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[0]));
2272                 return -1;
2273         }
2274
2275         if (!parse_ip_port(argv[1], &dst)) {
2276                 DEBUG(DEBUG_ERR, ("Bad IP:port '%s'\n", argv[1]));
2277                 return -1;
2278         }
2279
2280         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
2281         if (ret==0) {
2282                 return 0;
2283         }
2284         DEBUG(DEBUG_ERR, ("Error while sending tickle ack\n"));
2285
2286         return -1;
2287 }
2288
2289
2290 /*
2291   display public ip status
2292  */
2293 static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
2294 {
2295         int i, ret;
2296         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2297         struct ctdb_all_public_ips *ips;
2298
2299         if (options.pnn == CTDB_BROADCAST_ALL) {
2300                 /* read the list of public ips from all nodes */
2301                 ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
2302         } else {
2303                 /* read the public ip list from this node */
2304                 ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
2305         }
2306         if (ret != 0) {
2307                 DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
2308                 talloc_free(tmp_ctx);
2309                 return ret;
2310         }
2311
2312         if (options.machinereadable){
2313                 printf(":Public IP:Node:");
2314                 if (options.verbose){
2315                         printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
2316                 }
2317                 printf("\n");
2318         } else {
2319                 if (options.pnn == CTDB_BROADCAST_ALL) {
2320                         printf("Public IPs on ALL nodes\n");
2321                 } else {
2322                         printf("Public IPs on node %u\n", options.pnn);
2323                 }
2324         }
2325
2326         for (i=1;i<=ips->num;i++) {
2327                 struct ctdb_control_public_ip_info *info = NULL;
2328                 int32_t pnn;
2329                 char *aciface = NULL;
2330                 char *avifaces = NULL;
2331                 char *cifaces = NULL;
2332
2333                 if (options.pnn == CTDB_BROADCAST_ALL) {
2334                         pnn = ips->ips[ips->num-i].pnn;
2335                 } else {
2336                         pnn = options.pnn;
2337                 }
2338
2339                 if (pnn != -1) {
2340                         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), pnn, ctdb,
2341                                                    &ips->ips[ips->num-i].addr, &info);
2342                 } else {
2343                         ret = -1;
2344                 }
2345
2346                 if (ret == 0) {
2347                         int j;
2348                         for (j=0; j < info->num; j++) {
2349                                 if (cifaces == NULL) {
2350                                         cifaces = talloc_strdup(info,
2351                                                                 info->ifaces[j].name);
2352                                 } else {
2353                                         cifaces = talloc_asprintf_append(cifaces,
2354                                                                          ",%s",
2355                                                                          info->ifaces[j].name);
2356                                 }
2357
2358                                 if (info->active_idx == j) {
2359                                         aciface = info->ifaces[j].name;
2360                                 }
2361
2362                                 if (info->ifaces[j].link_state == 0) {
2363                                         continue;
2364                                 }
2365
2366                                 if (avifaces == NULL) {
2367                                         avifaces = talloc_strdup(info, info->ifaces[j].name);
2368                                 } else {
2369                                         avifaces = talloc_asprintf_append(avifaces,
2370                                                                           ",%s",
2371                                                                           info->ifaces[j].name);
2372                                 }
2373                         }
2374                 }
2375
2376                 if (options.machinereadable){
2377                         printf(":%s:%d:",
2378                                 ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2379                                 ips->ips[ips->num-i].pnn);
2380                         if (options.verbose){
2381                                 printf("%s:%s:%s:",
2382                                         aciface?aciface:"",
2383                                         avifaces?avifaces:"",
2384                                         cifaces?cifaces:"");
2385                         }
2386                         printf("\n");
2387                 } else {
2388                         if (options.verbose) {
2389                                 printf("%s node[%d] active[%s] available[%s] configured[%s]\n",
2390                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2391                                         ips->ips[ips->num-i].pnn,
2392                                         aciface?aciface:"",
2393                                         avifaces?avifaces:"",
2394                                         cifaces?cifaces:"");
2395                         } else {
2396                                 printf("%s %d\n",
2397                                         ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
2398                                         ips->ips[ips->num-i].pnn);
2399                         }
2400                 }
2401                 talloc_free(info);
2402         }
2403
2404         talloc_free(tmp_ctx);
2405         return 0;
2406 }
2407
2408 /*
2409   public ip info
2410  */
2411 static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv)
2412 {
2413         int i, ret;
2414         ctdb_sock_addr addr;
2415         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2416         struct ctdb_control_public_ip_info *info;
2417
2418         if (argc != 1) {
2419                 talloc_free(tmp_ctx);
2420                 usage();
2421         }
2422
2423         if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
2424                 DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
2425                 return -1;
2426         }
2427
2428         /* read the public ip info from this node */
2429         ret = ctdb_ctrl_get_public_ip_info(ctdb, TIMELIMIT(), options.pnn,
2430                                            tmp_ctx, &addr, &info);
2431         if (ret != 0) {
2432                 DEBUG(DEBUG_ERR, ("Unable to get public ip[%s]info from node %u\n",
2433                                   argv[0], options.pnn));
2434                 talloc_free(tmp_ctx);
2435                 return ret;
2436         }
2437
2438         printf("Public IP[%s] info on node %u\n",
2439                ctdb_addr_to_str(&info->ip.addr),
2440                options.pnn);
2441
2442         printf("IP:%s\nCurrentNode:%d\nNumInterfaces:%u\n",
2443                ctdb_addr_to_str(&info->ip.addr),
2444                info->ip.pnn, info->num);
2445
2446         for (i=0; i<info->num; i++) {
2447                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2448
2449                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
2450                        i+1, info->ifaces[i].name,
2451                        info->ifaces[i].link_state?"up":"down",
2452                        (unsigned int)info->ifaces[i].references,
2453                        (i==info->active_idx)?" (active)":"");
2454         }
2455
2456         talloc_free(tmp_ctx);
2457         return 0;
2458 }
2459
2460 /*
2461   display interfaces status
2462  */
2463 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
2464 {
2465         int i;
2466         struct ctdb_ifaces_list *ifaces;
2467
2468         /* read the public ip list from this node */
2469         if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
2470                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
2471                                   options.pnn));
2472                 return -1;
2473         }
2474
2475         if (options.machinereadable){
2476                 printf(":Name:LinkStatus:References:\n");
2477         } else {
2478                 printf("Interfaces on node %u\n", options.pnn);
2479         }
2480
2481         for (i=0; i<ifaces->num; i++) {
2482                 if (options.machinereadable){
2483                         printf(":%s:%s:%u\n",
2484                                ifaces->ifaces[i].name,
2485                                ifaces->ifaces[i].link_state?"1":"0",
2486                                (unsigned int)ifaces->ifaces[i].references);
2487                 } else {
2488                         printf("name:%s link:%s references:%u\n",
2489                                ifaces->ifaces[i].name,
2490                                ifaces->ifaces[i].link_state?"up":"down",
2491                                (unsigned int)ifaces->ifaces[i].references);
2492                 }
2493         }
2494
2495         ctdb_free_ifaces(ifaces);
2496         return 0;
2497 }
2498
2499
2500 /*
2501   set link status of an interface
2502  */
2503 static int control_setifacelink(struct ctdb_context *ctdb, int argc, const char **argv)
2504 {
2505         int ret;
2506         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2507         struct ctdb_control_iface_info info;
2508
2509         ZERO_STRUCT(info);
2510
2511         if (argc != 2) {
2512                 usage();
2513         }
2514
2515         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
2516                 DEBUG(DEBUG_ERR, ("interfaces name '%s' too long\n",
2517                                   argv[0]));
2518                 talloc_free(tmp_ctx);
2519                 return -1;
2520         }
2521         strcpy(info.name, argv[0]);
2522
2523         if (strcmp(argv[1], "up") == 0) {
2524                 info.link_state = 1;
2525         } else if (strcmp(argv[1], "down") == 0) {
2526                 info.link_state = 0;
2527         } else {
2528                 DEBUG(DEBUG_ERR, ("link state invalid '%s' should be 'up' or 'down'\n",
2529                                   argv[1]));
2530                 talloc_free(tmp_ctx);
2531                 return -1;
2532         }
2533
2534         /* read the public ip list from this node */
2535         ret = ctdb_ctrl_set_iface_link(ctdb, TIMELIMIT(), options.pnn,
2536                                    tmp_ctx, &info);
2537         if (ret != 0) {
2538                 DEBUG(DEBUG_ERR, ("Unable to set link state for interfaces %s node %u\n",
2539                                   argv[0], options.pnn));
2540                 talloc_free(tmp_ctx);
2541                 return ret;
2542         }
2543
2544         talloc_free(tmp_ctx);
2545         return 0;
2546 }
2547
2548 /*
2549   display pid of a ctdb daemon
2550  */
2551 static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv)
2552 {
2553         uint32_t pid;
2554         int ret;
2555
2556         ret = ctdb_ctrl_getpid(ctdb, TIMELIMIT(), options.pnn, &pid);
2557         if (ret != 0) {
2558                 DEBUG(DEBUG_ERR, ("Unable to get daemon pid from node %u\n", options.pnn));
2559                 return ret;
2560         }
2561         printf("Pid:%d\n", pid);
2562
2563         return 0;
2564 }
2565
2566 /*
2567   disable a remote node
2568  */
2569 static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
2570 {
2571         int ret;
2572         struct ctdb_node_map *nodemap=NULL;
2573
2574         /* check if the node is already disabled */
2575         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2576                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2577                 exit(10);
2578         }
2579         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
2580                 DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
2581                 return 0;
2582         }
2583
2584         do {
2585                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2586                 if (ret != 0) {
2587                         DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
2588                         return ret;
2589                 }
2590
2591                 sleep(1);
2592
2593                 /* read the nodemap and verify the change took effect */
2594                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2595                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2596                         exit(10);
2597                 }
2598
2599         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
2600         ret = control_ipreallocate(ctdb, argc, argv);
2601         if (ret != 0) {
2602                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2603                 return ret;
2604         }
2605
2606         return 0;
2607 }
2608
2609 /*
2610   enable a disabled remote node
2611  */
2612 static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
2613 {
2614         int ret;
2615
2616         struct ctdb_node_map *nodemap=NULL;
2617
2618
2619         /* check if the node is already enabled */
2620         if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2621                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2622                 exit(10);
2623         }
2624         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
2625                 DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
2626                 return 0;
2627         }
2628
2629         do {
2630                 ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
2631                 if (ret != 0) {
2632                         DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
2633                         return ret;
2634                 }
2635
2636                 sleep(1);
2637
2638                 /* read the nodemap and verify the change took effect */
2639                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2640                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2641                         exit(10);
2642                 }
2643
2644         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
2645
2646         ret = control_ipreallocate(ctdb, argc, argv);
2647         if (ret != 0) {
2648                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2649                 return ret;
2650         }
2651
2652         return 0;
2653 }
2654
2655 /*
2656   stop a remote node
2657  */
2658 static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
2659 {
2660         int ret;
2661         struct ctdb_node_map *nodemap=NULL;
2662
2663         do {
2664                 ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
2665                 if (ret != 0) {
2666                         DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
2667                 }
2668         
2669                 sleep(1);
2670
2671                 /* read the nodemap and verify the change took effect */
2672                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2673                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2674                         exit(10);
2675                 }
2676
2677         } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
2678         ret = control_ipreallocate(ctdb, argc, argv);
2679         if (ret != 0) {
2680                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2681                 return ret;
2682         }
2683
2684         return 0;
2685 }
2686
2687 /*
2688   restart a stopped remote node
2689  */
2690 static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
2691 {
2692         int ret;
2693
2694         struct ctdb_node_map *nodemap=NULL;
2695
2696         do {
2697                 ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
2698                 if (ret != 0) {
2699                         DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
2700                         return ret;
2701                 }
2702         
2703                 sleep(1);
2704
2705                 /* read the nodemap and verify the change took effect */
2706                 if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
2707                         DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2708                         exit(10);
2709                 }
2710
2711         } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
2712         ret = control_ipreallocate(ctdb, argc, argv);
2713         if (ret != 0) {
2714                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2715                 return ret;
2716         }
2717
2718         return 0;
2719 }
2720
2721 static uint32_t get_generation(struct ctdb_context *ctdb)
2722 {
2723         struct ctdb_vnn_map *vnnmap=NULL;
2724         int ret;
2725
2726         /* wait until the recmaster is not in recovery mode */
2727         while (1) {
2728                 uint32_t recmode, recmaster;
2729                 
2730                 if (vnnmap != NULL) {
2731                         talloc_free(vnnmap);
2732                         vnnmap = NULL;
2733                 }
2734
2735                 /* get the recmaster */
2736                 if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
2737                         DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
2738                         exit(10);
2739                 }
2740
2741                 /* get recovery mode */
2742                 if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
2743                         DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
2744                         exit(10);
2745                 }
2746
2747                 /* get the current generation number */
2748                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
2749                 if (ret != 0) {
2750                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
2751                         exit(10);
2752                 }
2753
2754                 if ((recmode == CTDB_RECOVERY_NORMAL)
2755                 &&  (vnnmap->generation != 1)){
2756                         return vnnmap->generation;
2757                 }
2758                 sleep(1);
2759         }
2760 }
2761
2762 /*
2763   ban a node from the cluster
2764  */
2765 static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
2766 {
2767         int ret;
2768         struct ctdb_node_map *nodemap=NULL;
2769         struct ctdb_ban_time bantime;
2770
2771         if (argc < 1) {
2772                 usage();
2773         }
2774         
2775         /* verify the node exists */
2776         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2777         if (ret != 0) {
2778                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2779                 return ret;
2780         }
2781
2782         if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
2783                 DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
2784                 return -1;
2785         }
2786
2787         bantime.pnn  = options.pnn;
2788         bantime.time = strtoul(argv[0], NULL, 0);
2789
2790         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2791         if (ret != 0) {
2792                 DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
2793                 return -1;
2794         }       
2795
2796         ret = control_ipreallocate(ctdb, argc, argv);
2797         if (ret != 0) {
2798                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2799                 return ret;
2800         }
2801
2802         return 0;
2803 }
2804
2805
2806 /*
2807   unban a node from the cluster
2808  */
2809 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
2810 {
2811         int ret;
2812         struct ctdb_node_map *nodemap=NULL;
2813         struct ctdb_ban_time bantime;
2814
2815         /* verify the node exists */
2816         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2817         if (ret != 0) {
2818                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2819                 return ret;
2820         }
2821
2822         if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
2823                 DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
2824                 return -1;
2825         }
2826
2827         bantime.pnn  = options.pnn;
2828         bantime.time = 0;
2829
2830         ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
2831         if (ret != 0) {
2832                 DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
2833                 return -1;
2834         }       
2835
2836         ret = control_ipreallocate(ctdb, argc, argv);
2837         if (ret != 0) {
2838                 DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
2839                 return ret;
2840         }
2841
2842         return 0;
2843 }
2844
2845
2846 /*
2847   show ban information for a node
2848  */
2849 static int control_showban(struct ctdb_context *ctdb, int argc, const char **argv)
2850 {
2851         int ret;
2852         struct ctdb_node_map *nodemap=NULL;
2853         struct ctdb_ban_time *bantime;
2854
2855         /* verify the node exists */
2856         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
2857         if (ret != 0) {
2858                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
2859                 return ret;
2860         }
2861
2862         ret = ctdb_ctrl_get_ban(ctdb, TIMELIMIT(), options.pnn, ctdb, &bantime);
2863         if (ret != 0) {
2864                 DEBUG(DEBUG_ERR,("Showing ban info for node %d failed.\n", options.pnn));
2865                 return -1;
2866         }       
2867
2868         if (bantime->time == 0) {
2869                 printf("Node %u is not banned\n", bantime->pnn);
2870         } else {
2871                 printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
2872         }
2873
2874         return 0;
2875 }
2876
2877 /*
2878   shutdown a daemon
2879  */
2880 static int control_shutdown(struct ctdb_context *ctdb, int argc, const char **argv)
2881 {
2882         int ret;
2883
2884         ret = ctdb_ctrl_shutdown(ctdb, TIMELIMIT(), options.pnn);
2885         if (ret != 0) {
2886                 DEBUG(DEBUG_ERR, ("Unable to shutdown node %u\n", options.pnn));
2887                 return ret;
2888         }
2889
2890         return 0;
2891 }
2892
2893 /*
2894   trigger a recovery
2895  */
2896 static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
2897 {
2898         int ret;
2899         uint32_t generation, next_generation;
2900
2901         /* record the current generation number */
2902         generation = get_generation(ctdb);
2903
2904         ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
2905         if (ret != 0) {
2906                 DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
2907                 return ret;
2908         }
2909
2910         ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
2911         if (ret != 0) {
2912                 DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
2913                 return ret;
2914         }
2915
2916         /* wait until we are in a new generation */
2917         while (1) {
2918                 next_generation = get_generation(ctdb);
2919                 if (next_generation != generation) {
2920                         return 0;
2921                 }
2922                 sleep(1);
2923         }
2924
2925         return 0;
2926 }
2927
2928
2929 /*
2930   display monitoring mode of a remote node
2931  */
2932 static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **argv)
2933 {
2934         uint32_t monmode;
2935         int ret;
2936
2937         ret = ctdb_ctrl_getmonmode(ctdb, TIMELIMIT(), options.pnn, &monmode);
2938         if (ret != 0) {
2939                 DEBUG(DEBUG_ERR, ("Unable to get monmode from node %u\n", options.pnn));
2940                 return ret;
2941         }
2942         if (!options.machinereadable){
2943                 printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
2944         } else {
2945                 printf(":mode:\n");
2946                 printf(":%d:\n",monmode);
2947         }
2948         return 0;
2949 }
2950
2951
2952 /*
2953   display capabilities of a remote node
2954  */
2955 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
2956 {
2957         uint32_t capabilities;
2958         int ret;
2959
2960         ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
2961         if (ret != 0) {
2962                 DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
2963                 return ret;
2964         }
2965         
2966         if (!options.machinereadable){
2967                 printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
2968                 printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
2969                 printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
2970                 printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
2971         } else {
2972                 printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
2973                 printf(":%d:%d:%d:%d:\n",
2974                         !!(capabilities&CTDB_CAP_RECMASTER),
2975                         !!(capabilities&CTDB_CAP_LMASTER),
2976                         !!(capabilities&CTDB_CAP_LVS),
2977                         !!(capabilities&CTDB_CAP_NATGW));
2978         }
2979         return 0;
2980 }
2981
2982 /*
2983   display lvs configuration
2984  */
2985 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
2986 {
2987         uint32_t *capabilities;
2988         struct ctdb_node_map *nodemap=NULL;
2989         int i, ret;
2990         int healthy_count = 0;
2991
2992         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
2995                 return ret;
2996         }
2997
2998         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
2999         CTDB_NO_MEMORY(ctdb, capabilities);
3000         
3001         /* collect capabilities for all connected nodes */
3002         for (i=0; i<nodemap->num; i++) {
3003                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3004                         continue;
3005                 }
3006                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3007                         continue;
3008                 }
3009         
3010                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3011                 if (ret != 0) {
3012                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3013                         return ret;
3014                 }
3015
3016                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3017                         continue;
3018                 }
3019
3020                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3021                         healthy_count++;
3022                 }
3023         }
3024
3025         /* Print all LVS nodes */
3026         for (i=0; i<nodemap->num; i++) {
3027                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3028                         continue;
3029                 }
3030                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3031                         continue;
3032                 }
3033                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3034                         continue;
3035                 }
3036
3037                 if (healthy_count != 0) {
3038                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3039                                 continue;
3040                         }
3041                 }
3042
3043                 printf("%d:%s\n", i, 
3044                         ctdb_addr_to_str(&nodemap->nodes[i].addr));
3045         }
3046
3047         return 0;
3048 }
3049
3050 /*
3051   display who is the lvs master
3052  */
3053 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3054 {
3055         uint32_t *capabilities;
3056         struct ctdb_node_map *nodemap=NULL;
3057         int i, ret;
3058         int healthy_count = 0;
3059
3060         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
3061         if (ret != 0) {
3062                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
3063                 return ret;
3064         }
3065
3066         capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
3067         CTDB_NO_MEMORY(ctdb, capabilities);
3068         
3069         /* collect capabilities for all connected nodes */
3070         for (i=0; i<nodemap->num; i++) {
3071                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3072                         continue;
3073                 }
3074                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3075                         continue;
3076                 }
3077         
3078                 ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
3079                 if (ret != 0) {
3080                         DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
3081                         return ret;
3082                 }
3083
3084                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3085                         continue;
3086                 }
3087
3088                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
3089                         healthy_count++;
3090                 }
3091         }
3092
3093         /* find and show the lvsmaster */
3094         for (i=0; i<nodemap->num; i++) {
3095                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3096                         continue;
3097                 }
3098                 if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
3099                         continue;
3100                 }
3101                 if (!(capabilities[i] & CTDB_CAP_LVS)) {
3102                         continue;
3103                 }
3104
3105                 if (healthy_count != 0) {
3106                         if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
3107                                 continue;
3108                         }
3109                 }
3110
3111                 if (options.machinereadable){
3112                         printf("%d\n", i);
3113                 } else {
3114                         printf("Node %d is LVS master\n", i);
3115                 }
3116                 return 0;
3117         }
3118
3119         printf("There is no LVS master\n");
3120         return -1;
3121 }
3122
3123 /*
3124   disable monitoring on a  node
3125  */
3126 static int control_disable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3127 {
3128         
3129         int ret;
3130
3131         ret = ctdb_ctrl_disable_monmode(ctdb, TIMELIMIT(), options.pnn);
3132         if (ret != 0) {
3133                 DEBUG(DEBUG_ERR, ("Unable to disable monmode on node %u\n", options.pnn));
3134                 return ret;
3135         }
3136         printf("Monitoring mode:%s\n","DISABLED");
3137
3138         return 0;
3139 }
3140
3141 /*
3142   enable monitoring on a  node
3143  */
3144 static int control_enable_monmode(struct ctdb_context *ctdb, int argc, const char **argv)
3145 {
3146         
3147         int ret;
3148
3149         ret = ctdb_ctrl_enable_monmode(ctdb, TIMELIMIT(), options.pnn);
3150         if (ret != 0) {
3151                 DEBUG(DEBUG_ERR, ("Unable to enable monmode on node %u\n", options.pnn));
3152                 return ret;
3153         }
3154         printf("Monitoring mode:%s\n","ACTIVE");
3155
3156         return 0;
3157 }
3158
3159 /*
3160   display remote list of keys/data for a db
3161  */
3162 static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
3163 {
3164         const char *db_name;
3165         struct ctdb_db_context *ctdb_db;
3166         int ret;
3167         bool persistent;
3168         struct ctdb_dump_db_context c;
3169
3170         if (argc < 1) {
3171                 usage();
3172         }
3173
3174         db_name = argv[0];
3175
3176
3177         if (db_exists(ctdb, db_name, &persistent)) {
3178                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3179                 return -1;
3180         }
3181
3182         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3183
3184         if (ctdb_db == NULL) {
3185                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3186                 return -1;
3187         }
3188
3189         if (options.printlmaster) {
3190                 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn,
3191                                           ctdb, &ctdb->vnn_map);
3192                 if (ret != 0) {
3193                         DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
3194                                           options.pnn));
3195                         return ret;
3196                 }
3197         }
3198
3199         ZERO_STRUCT(c);
3200         c.f = stdout;
3201         c.printemptyrecords = (bool)options.printemptyrecords;
3202         c.printdatasize = (bool)options.printdatasize;
3203         c.printlmaster = (bool)options.printlmaster;
3204         c.printhash = (bool)options.printhash;
3205         c.printrecordflags = (bool)options.printrecordflags;
3206
3207         /* traverse and dump the cluster tdb */
3208         ret = ctdb_dump_db(ctdb_db, &c);
3209         if (ret == -1) {
3210                 DEBUG(DEBUG_ERR, ("Unable to dump database\n"));
3211                 DEBUG(DEBUG_ERR, ("Maybe try 'ctdb getdbstatus %s'"
3212                                   " and 'ctdb getvar AllowUnhealthyDBRead'\n",
3213                                   db_name));
3214                 return -1;
3215         }
3216         talloc_free(ctdb_db);
3217
3218         printf("Dumped %d records\n", ret);
3219         return 0;
3220 }
3221
3222 struct cattdb_data {
3223         struct ctdb_context *ctdb;
3224         uint32_t count;
3225 };
3226
3227 static int cattdb_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
3228 {
3229         struct cattdb_data *d = private_data;
3230         struct ctdb_dump_db_context c;
3231
3232         d->count++;
3233
3234         ZERO_STRUCT(c);
3235         c.f = stdout;
3236         c.printemptyrecords = (bool)options.printemptyrecords;
3237         c.printdatasize = (bool)options.printdatasize;
3238         c.printlmaster = false;
3239         c.printhash = (bool)options.printhash;
3240         c.printrecordflags = true;
3241
3242         return ctdb_dumpdb_record(d->ctdb, key, data, &c);
3243 }
3244
3245 /*
3246   cat the local tdb database using same format as catdb
3247  */
3248 static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv)
3249 {
3250         const char *db_name;
3251         struct ctdb_db_context *ctdb_db;
3252         struct cattdb_data d;
3253         bool persistent;
3254
3255         if (argc < 1) {
3256                 usage();
3257         }
3258
3259         db_name = argv[0];
3260
3261
3262         if (db_exists(ctdb, db_name, &persistent)) {
3263                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3264                 return -1;
3265         }
3266
3267         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, false, 0);
3268
3269         if (ctdb_db == NULL) {
3270                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3271                 return -1;
3272         }
3273
3274         /* traverse the local tdb */
3275         d.count = 0;
3276         d.ctdb  = ctdb;
3277         if (tdb_traverse_read(ctdb_db->ltdb->tdb, cattdb_traverse, &d) == -1) {
3278                 printf("Failed to cattdb data\n");
3279                 exit(10);
3280         }
3281         talloc_free(ctdb_db);
3282
3283         printf("Dumped %d records\n", d.count);
3284         return 0;
3285 }
3286
3287 /*
3288   display the content of a database key
3289  */
3290 static int control_readkey(struct ctdb_context *ctdb, int argc, const char **argv)
3291 {
3292         const char *db_name;
3293         struct ctdb_db_context *ctdb_db;
3294         struct ctdb_record_handle *h;
3295         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3296         TDB_DATA key, data;
3297         bool persistent;
3298
3299         if (argc < 2) {
3300                 usage();
3301         }
3302
3303         db_name = argv[0];
3304
3305         if (db_exists(ctdb, db_name, &persistent)) {
3306                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3307                 return -1;
3308         }
3309
3310         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3311
3312         if (ctdb_db == NULL) {
3313                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3314                 return -1;
3315         }
3316
3317         key.dptr  = discard_const(argv[1]);
3318         key.dsize = strlen((char *)key.dptr);
3319
3320         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3321         if (h == NULL) {
3322                 printf("Failed to fetch record '%s' on node %d\n", 
3323                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3324                 talloc_free(tmp_ctx);
3325                 exit(10);
3326         }
3327
3328         printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
3329
3330         talloc_free(ctdb_db);
3331         talloc_free(tmp_ctx);
3332         return 0;
3333 }
3334
3335 /*
3336   display the content of a database key
3337  */
3338 static int control_writekey(struct ctdb_context *ctdb, int argc, const char **argv)
3339 {
3340         const char *db_name;
3341         struct ctdb_db_context *ctdb_db;
3342         struct ctdb_record_handle *h;
3343         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3344         TDB_DATA key, data;
3345         bool persistent;
3346
3347         if (argc < 3) {
3348                 usage();
3349         }
3350
3351         db_name = argv[0];
3352
3353         if (db_exists(ctdb, db_name, &persistent)) {
3354                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3355                 return -1;
3356         }
3357
3358         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3359
3360         if (ctdb_db == NULL) {
3361                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3362                 return -1;
3363         }
3364
3365         key.dptr  = discard_const(argv[1]);
3366         key.dsize = strlen((char *)key.dptr);
3367
3368         h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
3369         if (h == NULL) {
3370                 printf("Failed to fetch record '%s' on node %d\n", 
3371                         (const char *)key.dptr, ctdb_get_pnn(ctdb));
3372                 talloc_free(tmp_ctx);
3373                 exit(10);
3374         }
3375
3376         data.dptr  = discard_const(argv[2]);
3377         data.dsize = strlen((char *)data.dptr);
3378
3379         if (ctdb_record_store(h, data) != 0) {
3380                 printf("Failed to store record\n");
3381         }
3382
3383         talloc_free(h);
3384         talloc_free(ctdb_db);
3385         talloc_free(tmp_ctx);
3386         return 0;
3387 }
3388
3389 /*
3390   fetch a record from a persistent database
3391  */
3392 static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3393 {
3394         const char *db_name;
3395         struct ctdb_db_context *ctdb_db;
3396         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3397         struct ctdb_transaction_handle *h;
3398         TDB_DATA key, data;
3399         int fd, ret;
3400         bool persistent;
3401
3402         if (argc < 2) {
3403                 talloc_free(tmp_ctx);
3404                 usage();
3405         }
3406
3407         db_name = argv[0];
3408
3409
3410         if (db_exists(ctdb, db_name, &persistent)) {
3411                 DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
3412                 talloc_free(tmp_ctx);
3413                 return -1;
3414         }
3415
3416         if (!persistent) {
3417                 DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
3418                 talloc_free(tmp_ctx);
3419                 return -1;
3420         }
3421
3422         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
3423
3424         if (ctdb_db == NULL) {
3425                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3426                 talloc_free(tmp_ctx);
3427                 return -1;
3428         }
3429
3430         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3431         if (h == NULL) {
3432                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3433                 talloc_free(tmp_ctx);
3434                 return -1;
3435         }
3436
3437         key.dptr  = discard_const(argv[1]);
3438         key.dsize = strlen(argv[1]);
3439         ret = ctdb_transaction_fetch(h, tmp_ctx, key, &data);
3440         if (ret != 0) {
3441                 DEBUG(DEBUG_ERR,("Failed to fetch record\n"));
3442                 talloc_free(tmp_ctx);
3443                 return -1;
3444         }
3445
3446         if (data.dsize == 0 || data.dptr == NULL) {
3447                 DEBUG(DEBUG_ERR,("Record is empty\n"));
3448                 talloc_free(tmp_ctx);
3449                 return -1;
3450         }
3451
3452         if (argc == 3) {
3453           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3454                 if (fd == -1) {
3455                         DEBUG(DEBUG_ERR,("Failed to open output file %s\n", argv[2]));
3456                         talloc_free(tmp_ctx);
3457                         return -1;
3458                 }
3459                 write(fd, data.dptr, data.dsize);
3460                 close(fd);
3461         } else {
3462                 write(1, data.dptr, data.dsize);
3463         }
3464
3465         /* abort the transaction */
3466         talloc_free(h);
3467
3468
3469         talloc_free(tmp_ctx);
3470         return 0;
3471 }
3472
3473 /*
3474   fetch a record from a tdb-file
3475  */
3476 static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv)
3477 {
3478         const char *tdb_file;
3479         TDB_CONTEXT *tdb;
3480         TDB_DATA key, data;
3481         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3482         int fd;
3483
3484         if (argc < 2) {
3485                 usage();
3486         }
3487
3488         tdb_file = argv[0];
3489
3490         tdb = tdb_open(tdb_file, 0, 0, O_RDONLY, 0);
3491         if (tdb == NULL) {
3492                 printf("Failed to open TDB file %s\n", tdb_file);
3493                 return -1;
3494         }
3495
3496         if (!strncmp(argv[1], "0x", 2)) {
3497                 key = hextodata(tmp_ctx, argv[1] + 2);
3498                 if (key.dsize == 0) {
3499                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3500                         return -1;
3501                 }
3502         } else {
3503                 key.dptr  = discard_const(argv[1]);
3504                 key.dsize = strlen(argv[1]);
3505         }
3506
3507         data = tdb_fetch(tdb, key);
3508         if (data.dptr == NULL || data.dsize < sizeof(struct ctdb_ltdb_header)) {
3509                 printf("Failed to read record %s from tdb %s\n", argv[1], tdb_file);
3510                 tdb_close(tdb);
3511                 return -1;
3512         }
3513
3514         tdb_close(tdb);
3515
3516         if (argc == 3) {
3517           fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
3518                 if (fd == -1) {
3519                         printf("Failed to open output file %s\n", argv[2]);
3520                         return -1;
3521                 }
3522                 if (options.verbose){
3523                         write(fd, data.dptr, data.dsize);
3524                 } else {
3525                         write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3526                 }
3527                 close(fd);
3528         } else {
3529                 if (options.verbose){
3530                         write(1, data.dptr, data.dsize);
3531                 } else {
3532                         write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
3533                 }
3534         }
3535
3536         talloc_free(tmp_ctx);
3537         return 0;
3538 }
3539
3540 /*
3541   store a record and header to a tdb-file
3542  */
3543 static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv)
3544 {
3545         const char *tdb_file;
3546         TDB_CONTEXT *tdb;
3547         TDB_DATA key, data;
3548         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3549
3550         if (argc < 3) {
3551                 usage();
3552         }
3553
3554         tdb_file = argv[0];
3555
3556         tdb = tdb_open(tdb_file, 0, 0, O_RDWR, 0);
3557         if (tdb == NULL) {
3558                 printf("Failed to open TDB file %s\n", tdb_file);
3559                 return -1;
3560         }
3561
3562         if (!strncmp(argv[1], "0x", 2)) {
3563                 key = hextodata(tmp_ctx, argv[1] + 2);
3564                 if (key.dsize == 0) {
3565                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[1]);
3566                         return -1;
3567                 }
3568         } else {
3569                 key.dptr  = discard_const(argv[1]);
3570                 key.dsize = strlen(argv[1]);
3571         }
3572
3573         if (!strncmp(argv[2], "0x", 2)) {
3574                 data = hextodata(tmp_ctx, argv[2] + 2);
3575                 if (data.dsize == 0) {
3576                         printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
3577                         return -1;
3578                 }
3579         } else {
3580                 data.dptr  = discard_const(argv[2]);
3581                 data.dsize = strlen(argv[2]);
3582         }
3583
3584         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3585                 printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
3586                 return -1;
3587         }
3588         if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
3589                 printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
3590                 tdb_close(tdb);
3591                 return -1;
3592         }
3593
3594         tdb_close(tdb);
3595
3596         talloc_free(tmp_ctx);
3597         return 0;
3598 }
3599
3600 /*
3601   write a record to a persistent database
3602  */
3603 static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv)
3604 {
3605         const char *db_name;
3606         struct ctdb_db_context *ctdb_db;
3607         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3608         struct ctdb_transaction_handle *h;
3609         struct stat st;
3610         TDB_DATA key, data;
3611         int fd, ret;
3612
3613         if (argc < 3) {
3614                 talloc_free(tmp_ctx);
3615                 usage();
3616         }
3617
3618         fd = open(argv[2], O_RDONLY);
3619         if (fd == -1) {
3620                 DEBUG(DEBUG_ERR,("Failed to open file containing record data : %s  %s\n", argv[2], strerror(errno)));
3621                 talloc_free(tmp_ctx);
3622                 return -1;
3623         }
3624         
3625         ret = fstat(fd, &st);
3626         if (ret == -1) {
3627                 DEBUG(DEBUG_ERR,("fstat of file %s failed: %s\n", argv[2], strerror(errno)));
3628                 close(fd);
3629                 talloc_free(tmp_ctx);
3630                 return -1;
3631         }
3632
3633         if (!S_ISREG(st.st_mode)) {
3634                 DEBUG(DEBUG_ERR,("Not a regular file %s\n", argv[2]));
3635                 close(fd);
3636                 talloc_free(tmp_ctx);
3637                 return -1;
3638         }
3639
3640         data.dsize = st.st_size;
3641         if (data.dsize == 0) {
3642                 data.dptr  = NULL;
3643         } else {
3644                 data.dptr = talloc_size(tmp_ctx, data.dsize);
3645                 if (data.dptr == NULL) {
3646                         DEBUG(DEBUG_ERR,("Failed to talloc %d of memory to store record data\n", (int)data.dsize));
3647                         close(fd);
3648                         talloc_free(tmp_ctx);
3649                         return -1;
3650                 }
3651                 ret = read(fd, data.dptr, data.dsize);
3652                 if (ret != data.dsize) {
3653                         DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
3654                         close(fd);
3655                         talloc_free(tmp_ctx);
3656                         return -1;
3657                 }
3658         }
3659         close(fd);
3660
3661
3662         db_name = argv[0];
3663
3664         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
3665         if (ctdb_db == NULL) {
3666                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
3667                 talloc_free(tmp_ctx);
3668                 return -1;
3669         }
3670
3671         h = ctdb_transaction_start(ctdb_db, tmp_ctx);
3672         if (h == NULL) {
3673                 DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
3674                 talloc_free(tmp_ctx);
3675                 return -1;
3676         }
3677
3678         key.dptr  = discard_const(argv[1]);
3679         key.dsize = strlen(argv[1]);
3680         ret = ctdb_transaction_store(h, key, data);
3681         if (ret != 0) {
3682                 DEBUG(DEBUG_ERR,("Failed to store record\n"));
3683                 talloc_free(tmp_ctx);
3684                 return -1;
3685         }
3686
3687         ret = ctdb_transaction_commit(h);
3688         if (ret != 0) {
3689                 DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
3690                 talloc_free(tmp_ctx);
3691                 return -1;
3692         }
3693
3694
3695         talloc_free(tmp_ctx);
3696         return 0;
3697 }
3698
3699 /*
3700   check if a service is bound to a port or not
3701  */
3702 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
3703 {
3704         int s, ret;
3705         unsigned v;
3706         int port;
3707         struct sockaddr_in sin;
3708
3709         if (argc != 1) {
3710                 printf("Use: ctdb chktcport <port>\n");
3711                 return EINVAL;
3712         }
3713
3714         port = atoi(argv[0]);
3715
3716         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
3717         if (s == -1) {
3718                 printf("Failed to open local socket\n");
3719                 return errno;
3720         }
3721
3722         v = fcntl(s, F_GETFL, 0);
3723         fcntl(s, F_SETFL, v | O_NONBLOCK);
3724
3725         bzero(&sin, sizeof(sin));
3726         sin.sin_family = PF_INET;
3727         sin.sin_port   = htons(port);
3728         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
3729         close(s);
3730         if (ret == -1) {
3731                 printf("Failed to bind to local socket: %d %s\n", errno, strerror(errno));
3732                 return errno;
3733         }
3734
3735         return 0;
3736 }
3737
3738
3739
3740 static void log_handler(struct ctdb_context *ctdb, uint64_t srvid, 
3741                              TDB_DATA data, void *private_data)
3742 {
3743         DEBUG(DEBUG_ERR,("Log data received\n"));
3744         if (data.dsize > 0) {
3745                 printf("%s", data.dptr);
3746         }
3747
3748         exit(0);
3749 }
3750
3751 /*
3752   display a list of log messages from the in memory ringbuffer
3753  */
3754 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
3755 {
3756         int ret;
3757         int32_t res;
3758         struct ctdb_get_log_addr log_addr;
3759         TDB_DATA data;
3760         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3761         char *errmsg;
3762         struct timeval tv;
3763
3764         if (argc != 1) {
3765                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
3766                 talloc_free(tmp_ctx);
3767                 return -1;
3768         }
3769
3770         log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
3771         log_addr.srvid = getpid();
3772         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
3773                 log_addr.level = get_debug_by_desc(argv[0]);
3774         } else {
3775                 log_addr.level = strtol(argv[0], NULL, 0);
3776         }
3777
3778
3779         data.dptr = (unsigned char *)&log_addr;
3780         data.dsize = sizeof(log_addr);
3781
3782         DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
3783
3784         ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
3785         sleep(1);
3786
3787         DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
3788
3789         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
3790                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
3791         if (ret != 0 || res != 0) {
3792                 DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
3793                 talloc_free(tmp_ctx);
3794                 return -1;
3795         }
3796
3797
3798         tv = timeval_current();
3799         /* this loop will terminate when we have received the reply */
3800         while (timeval_elapsed(&tv) < 3.0) {    
3801                 event_loop_once(ctdb->ev);
3802         }
3803
3804         DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
3805
3806         talloc_free(tmp_ctx);
3807         return 0;
3808 }
3809
3810 /*
3811   clear the in memory log area
3812  */
3813 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
3814 {
3815         int ret;
3816         int32_t res;
3817         char *errmsg;
3818         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3819
3820         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
3821                            0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
3822         if (ret != 0 || res != 0) {
3823                 DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
3824                 talloc_free(tmp_ctx);
3825                 return -1;
3826         }
3827
3828         talloc_free(tmp_ctx);
3829         return 0;
3830 }
3831
3832
3833
3834 /*
3835   display a list of the databases on a remote ctdb
3836  */
3837 static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **argv)
3838 {
3839         int i, ret;
3840         struct ctdb_dbid_map *dbmap=NULL;
3841
3842         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3843         if (ret != 0) {
3844                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3845                 return ret;
3846         }
3847
3848         if(options.machinereadable){
3849                 printf(":ID:Name:Path:Persistent:Unhealthy:ReadOnly:\n");
3850                 for(i=0;i<dbmap->num;i++){
3851                         const char *path;
3852                         const char *name;
3853                         const char *health;
3854                         bool persistent;
3855                         bool readonly;
3856
3857                         ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
3858                                             dbmap->dbs[i].dbid, ctdb, &path);
3859                         ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
3860                                             dbmap->dbs[i].dbid, ctdb, &name);
3861                         ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
3862                                               dbmap->dbs[i].dbid, ctdb, &health);
3863                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
3864                         readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
3865                         printf(":0x%08X:%s:%s:%d:%d:%d:\n",
3866                                dbmap->dbs[i].dbid, name, path,
3867                                !!(persistent), !!(health), !!(readonly));
3868                 }
3869                 return 0;
3870         }
3871
3872         printf("Number of databases:%d\n", dbmap->num);
3873         for(i=0;i<dbmap->num;i++){
3874                 const char *path;
3875                 const char *name;
3876                 const char *health;
3877                 bool persistent;
3878                 bool readonly;
3879
3880                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3881                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3882                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3883                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
3884                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
3885                 printf("dbid:0x%08x name:%s path:%s%s%s%s\n",
3886                        dbmap->dbs[i].dbid, name, path,
3887                        persistent?" PERSISTENT":"",
3888                        readonly?" READONLY":"",
3889                        health?" UNHEALTHY":"");
3890         }
3891
3892         return 0;
3893 }
3894
3895 /*
3896   display the status of a database on a remote ctdb
3897  */
3898 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
3899 {
3900         int i, ret;
3901         struct ctdb_dbid_map *dbmap=NULL;
3902         const char *db_name;
3903
3904         if (argc < 1) {
3905                 usage();
3906         }
3907
3908         db_name = argv[0];
3909
3910         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
3911         if (ret != 0) {
3912                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
3913                 return ret;
3914         }
3915
3916         for(i=0;i<dbmap->num;i++){
3917                 const char *path;
3918                 const char *name;
3919                 const char *health;
3920                 bool persistent;
3921                 bool readonly;
3922
3923                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
3924                 if (strcmp(name, db_name) != 0) {
3925                         continue;
3926                 }
3927
3928                 ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
3929                 ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
3930                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
3931                 readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
3932                 printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nREADONLY: %s\nHEALTH: %s\n",
3933                        dbmap->dbs[i].dbid, name, path,
3934                        persistent?"yes":"no",
3935                        readonly?"yes":"no",
3936                        health?health:"OK");
3937                 return 0;
3938         }
3939
3940         DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
3941         return 0;
3942 }
3943
3944 /*
3945   check if the local node is recmaster or not
3946   it will return 1 if this node is the recmaster and 0 if it is not
3947   or if the local ctdb daemon could not be contacted
3948  */
3949 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
3950 {
3951         uint32_t mypnn, recmaster;
3952
3953         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
3954         if (mypnn == -1) {
3955                 printf("Failed to get pnn of node\n");
3956                 return 1;
3957         }
3958
3959         if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
3960                 printf("Failed to get the recmaster\n");
3961                 return 1;
3962         }
3963
3964         if (recmaster != mypnn) {
3965                 printf("this node is not the recmaster\n");
3966                 return 1;
3967         }
3968
3969         printf("this node is the recmaster\n");
3970         return 0;
3971 }
3972
3973 /*
3974   ping a node
3975  */
3976 static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
3977 {
3978         int ret;
3979         struct timeval tv = timeval_current();
3980         ret = ctdb_ctrl_ping(ctdb, options.pnn);
3981         if (ret == -1) {
3982                 printf("Unable to get ping response from node %u\n", options.pnn);
3983                 return -1;
3984         } else {
3985                 printf("response from %u time=%.6f sec  (%d clients)\n", 
3986                        options.pnn, timeval_elapsed(&tv), ret);
3987         }
3988         return 0;
3989 }
3990
3991
3992 /*
3993   get a tunable
3994  */
3995 static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv)
3996 {
3997         const char *name;
3998         uint32_t value;
3999         int ret;
4000
4001         if (argc < 1) {
4002                 usage();
4003         }
4004
4005         name = argv[0];
4006         ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
4007         if (ret == -1) {
4008                 DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
4009                 return -1;
4010         }
4011
4012         printf("%-23s = %u\n", name, value);
4013         return 0;
4014 }
4015
4016 /*
4017   set a tunable
4018  */
4019 static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv)
4020 {
4021         const char *name;
4022         uint32_t value;
4023         int ret;
4024
4025         if (argc < 2) {
4026                 usage();
4027         }
4028
4029         name = argv[0];
4030         value = strtoul(argv[1], NULL, 0);
4031
4032         ret = ctdb_ctrl_set_tunable(ctdb, TIMELIMIT(), options.pnn, name, value);
4033         if (ret == -1) {
4034                 DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
4035                 return -1;
4036         }
4037         return 0;
4038 }
4039
4040 /*
4041   list all tunables
4042  */
4043 static int control_listvars(struct ctdb_context *ctdb, int argc, const char **argv)
4044 {
4045         uint32_t count;
4046         const char **list;
4047         int ret, i;
4048
4049         ret = ctdb_ctrl_list_tunables(ctdb, TIMELIMIT(), options.pnn, ctdb, &list, &count);
4050         if (ret == -1) {
4051                 DEBUG(DEBUG_ERR, ("Unable to list tunable variables\n"));
4052                 return -1;
4053         }
4054
4055         for (i=0;i<count;i++) {
4056                 control_getvar(ctdb, 1, &list[i]);
4057         }
4058
4059         talloc_free(list);
4060         
4061         return 0;
4062 }
4063
4064 /*
4065   display debug level on a node
4066  */
4067 static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4068 {
4069         int ret;
4070         int32_t level;
4071
4072         ret = ctdb_ctrl_get_debuglevel(ctdb, options.pnn, &level);
4073         if (ret != 0) {
4074                 DEBUG(DEBUG_ERR, ("Unable to get debuglevel response from node %u\n", options.pnn));
4075                 return ret;
4076         } else {
4077                 if (options.machinereadable){
4078                         printf(":Name:Level:\n");
4079                         printf(":%s:%d:\n",get_debug_by_level(level),level);
4080                 } else {
4081                         printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
4082                 }
4083         }
4084         return 0;
4085 }
4086
4087 /*
4088   display reclock file of a node
4089  */
4090 static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4091 {
4092         int ret;
4093         const char *reclock;
4094
4095         ret = ctdb_ctrl_getreclock(ctdb, TIMELIMIT(), options.pnn, ctdb, &reclock);
4096         if (ret != 0) {
4097                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4098                 return ret;
4099         } else {
4100                 if (options.machinereadable){
4101                         if (reclock != NULL) {
4102                                 printf("%s", reclock);
4103                         }
4104                 } else {
4105                         if (reclock == NULL) {
4106                                 printf("No reclock file used.\n");
4107                         } else {
4108                                 printf("Reclock file:%s\n", reclock);
4109                         }
4110                 }
4111         }
4112         return 0;
4113 }
4114
4115 /*
4116   set the reclock file of a node
4117  */
4118 static int control_setreclock(struct ctdb_context *ctdb, int argc, const char **argv)
4119 {
4120         int ret;
4121         const char *reclock;
4122
4123         if (argc == 0) {
4124                 reclock = NULL;
4125         } else if (argc == 1) {
4126                 reclock = argv[0];
4127         } else {
4128                 usage();
4129         }
4130
4131         ret = ctdb_ctrl_setreclock(ctdb, TIMELIMIT(), options.pnn, reclock);
4132         if (ret != 0) {
4133                 DEBUG(DEBUG_ERR, ("Unable to get reclock file from node %u\n", options.pnn));
4134                 return ret;
4135         }
4136         return 0;
4137 }
4138
4139 /*
4140   set the natgw state on/off
4141  */
4142 static int control_setnatgwstate(struct ctdb_context *ctdb, int argc, const char **argv)
4143 {
4144         int ret;
4145         uint32_t natgwstate;
4146
4147         if (argc == 0) {
4148                 usage();
4149         }
4150
4151         if (!strcmp(argv[0], "on")) {
4152                 natgwstate = 1;
4153         } else if (!strcmp(argv[0], "off")) {
4154                 natgwstate = 0;
4155         } else {
4156                 usage();
4157         }
4158
4159         ret = ctdb_ctrl_setnatgwstate(ctdb, TIMELIMIT(), options.pnn, natgwstate);
4160         if (ret != 0) {
4161                 DEBUG(DEBUG_ERR, ("Unable to set the natgw state for node %u\n", options.pnn));
4162                 return ret;
4163         }
4164
4165         return 0;
4166 }
4167
4168 /*
4169   set the lmaster role on/off
4170  */
4171 static int control_setlmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4172 {
4173         int ret;
4174         uint32_t lmasterrole;
4175
4176         if (argc == 0) {
4177                 usage();
4178         }
4179
4180         if (!strcmp(argv[0], "on")) {
4181                 lmasterrole = 1;
4182         } else if (!strcmp(argv[0], "off")) {
4183                 lmasterrole = 0;
4184         } else {
4185                 usage();
4186         }
4187
4188         ret = ctdb_ctrl_setlmasterrole(ctdb, TIMELIMIT(), options.pnn, lmasterrole);
4189         if (ret != 0) {
4190                 DEBUG(DEBUG_ERR, ("Unable to set the lmaster role for node %u\n", options.pnn));
4191                 return ret;
4192         }
4193
4194         return 0;
4195 }
4196
4197 /*
4198   set the recmaster role on/off
4199  */
4200 static int control_setrecmasterrole(struct ctdb_context *ctdb, int argc, const char **argv)
4201 {
4202         int ret;
4203         uint32_t recmasterrole;
4204
4205         if (argc == 0) {
4206                 usage();
4207         }
4208
4209         if (!strcmp(argv[0], "on")) {
4210                 recmasterrole = 1;
4211         } else if (!strcmp(argv[0], "off")) {
4212                 recmasterrole = 0;
4213         } else {
4214                 usage();
4215         }
4216
4217         ret = ctdb_ctrl_setrecmasterrole(ctdb, TIMELIMIT(), options.pnn, recmasterrole);
4218         if (ret != 0) {
4219                 DEBUG(DEBUG_ERR, ("Unable to set the recmaster role for node %u\n", options.pnn));
4220                 return ret;
4221         }
4222
4223         return 0;
4224 }
4225
4226 /*
4227   set debug level on a node or all nodes
4228  */
4229 static int control_setdebug(struct ctdb_context *ctdb, int argc, const char **argv)
4230 {
4231         int i, ret;
4232         int32_t level;
4233
4234         if (argc == 0) {
4235                 printf("You must specify the debug level. Valid levels are:\n");
4236                 for (i=0; debug_levels[i].description != NULL; i++) {
4237                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4238                 }
4239
4240                 return 0;
4241         }
4242
4243         if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
4244                 level = get_debug_by_desc(argv[0]);
4245         } else {
4246                 level = strtol(argv[0], NULL, 0);
4247         }
4248
4249         for (i=0; debug_levels[i].description != NULL; i++) {
4250                 if (level == debug_levels[i].level) {
4251                         break;
4252                 }
4253         }
4254         if (debug_levels[i].description == NULL) {
4255                 printf("Invalid debug level, must be one of\n");
4256                 for (i=0; debug_levels[i].description != NULL; i++) {
4257                         printf("%s (%d)\n", debug_levels[i].description, debug_levels[i].level);
4258                 }
4259                 return -1;
4260         }
4261
4262         ret = ctdb_ctrl_set_debuglevel(ctdb, options.pnn, level);
4263         if (ret != 0) {
4264                 DEBUG(DEBUG_ERR, ("Unable to set debug level on node %u\n", options.pnn));
4265         }
4266         return 0;
4267 }
4268
4269
4270 /*
4271   thaw a node
4272  */
4273 static int control_thaw(struct ctdb_context *ctdb, int argc, const char **argv)
4274 {
4275         int ret;
4276         uint32_t priority;
4277         
4278         if (argc == 1) {
4279                 priority = strtol(argv[0], NULL, 0);
4280         } else {
4281                 priority = 0;
4282         }
4283         DEBUG(DEBUG_ERR,("Thaw by priority %u\n", priority));
4284
4285         ret = ctdb_ctrl_thaw_priority(ctdb, TIMELIMIT(), options.pnn, priority);
4286         if (ret != 0) {
4287                 DEBUG(DEBUG_ERR, ("Unable to thaw node %u\n", options.pnn));
4288         }               
4289         return 0;
4290 }
4291
4292
4293 /*
4294   attach to a database
4295  */
4296 static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv)
4297 {
4298         const char *db_name;
4299         struct ctdb_db_context *ctdb_db;
4300         bool persistent = false;
4301
4302         if (argc < 1) {
4303                 usage();
4304         }
4305         db_name = argv[0];
4306         if (argc > 2) {
4307                 usage();
4308         }
4309         if (argc == 2) {
4310                 if (strcmp(argv[1], "persistent") != 0) {
4311                         usage();
4312                 }
4313                 persistent = true;
4314         }
4315
4316         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
4317         if (ctdb_db == NULL) {
4318                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
4319                 return -1;
4320         }
4321
4322         return 0;
4323 }
4324
4325 /*
4326   set db priority
4327  */
4328 static int control_setdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4329 {
4330         struct ctdb_db_priority db_prio;
4331         int ret;
4332
4333         if (argc < 2) {
4334                 usage();
4335         }
4336
4337         db_prio.db_id    = strtoul(argv[0], NULL, 0);
4338         db_prio.priority = strtoul(argv[1], NULL, 0);
4339
4340         ret = ctdb_ctrl_set_db_priority(ctdb, TIMELIMIT(), options.pnn, &db_prio);
4341         if (ret != 0) {
4342                 DEBUG(DEBUG_ERR,("Unable to set db prio\n"));
4343                 return -1;
4344         }
4345
4346         return 0;
4347 }
4348
4349 /*
4350   get db priority
4351  */
4352 static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **argv)
4353 {
4354         uint32_t db_id, priority;
4355         int ret;
4356
4357         if (argc < 1) {
4358                 usage();
4359         }
4360
4361         db_id = strtoul(argv[0], NULL, 0);
4362
4363         ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
4364         if (ret != 0) {
4365                 DEBUG(DEBUG_ERR,("Unable to get db prio\n"));
4366                 return -1;
4367         }
4368
4369         DEBUG(DEBUG_ERR,("Priority:%u\n", priority));
4370
4371         return 0;
4372 }
4373
4374 /*
4375   set the readonly capability for a database
4376  */
4377 static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char **argv)
4378 {
4379         uint32_t db_id;
4380         int ret;
4381
4382         if (argc < 1) {
4383                 usage();
4384         }
4385
4386         db_id = strtoul(argv[0], NULL, 0);
4387
4388         ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
4389         if (ret != 0) {
4390                 DEBUG(DEBUG_ERR,("Unable to set db to support readonly\n"));
4391                 return -1;
4392         }
4393
4394         return 0;
4395 }
4396
4397 /*
4398   get db seqnum
4399  */
4400 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
4401 {
4402         bool ret;
4403         uint32_t db_id;
4404         uint64_t seqnum;
4405
4406         if (argc < 1) {
4407                 usage();
4408         }
4409
4410         db_id = strtoul(argv[0], NULL, 0);
4411
4412         ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
4413         if (!ret) {
4414                 DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
4415                 return -1;
4416         }
4417
4418         printf("Sequence number:%lld\n", (long long)seqnum);
4419
4420         return 0;
4421 }
4422
4423 /*
4424   run an eventscript on a node
4425  */
4426 static int control_eventscript(struct ctdb_context *ctdb, int argc, const char **argv)
4427 {
4428         TDB_DATA data;
4429         int ret;
4430         int32_t res;
4431         char *errmsg;
4432         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4433
4434         if (argc != 1) {
4435                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4436                 return -1;
4437         }
4438
4439         data.dptr = (unsigned char *)discard_const(argv[0]);
4440         data.dsize = strlen((char *)data.dptr) + 1;
4441
4442         DEBUG(DEBUG_ERR, ("Running eventscripts with arguments \"%s\" on node %u\n", data.dptr, options.pnn));
4443
4444         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RUN_EVENTSCRIPTS,
4445                            0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
4446         if (ret != 0 || res != 0) {
4447                 DEBUG(DEBUG_ERR,("Failed to run eventscripts - %s\n", errmsg));
4448                 talloc_free(tmp_ctx);
4449                 return -1;
4450         }
4451         talloc_free(tmp_ctx);
4452         return 0;
4453 }
4454
4455 #define DB_VERSION 1
4456 #define MAX_DB_NAME 64
4457 struct db_file_header {
4458         unsigned long version;
4459         time_t timestamp;
4460         unsigned long persistent;
4461         unsigned long size;
4462         const char name[MAX_DB_NAME];
4463 };
4464
4465 struct backup_data {
4466         struct ctdb_marshall_buffer *records;
4467         uint32_t len;
4468         uint32_t total;
4469         bool traverse_error;
4470 };
4471
4472 static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
4473 {
4474         struct backup_data *bd = talloc_get_type(private, struct backup_data);
4475         struct ctdb_rec_data *rec;
4476
4477         /* add the record */
4478         rec = ctdb_marshall_record(bd->records, 0, key, NULL, data);
4479         if (rec == NULL) {
4480                 bd->traverse_error = true;
4481                 DEBUG(DEBUG_ERR,("Failed to marshall record\n"));
4482                 return -1;
4483         }
4484         bd->records = talloc_realloc_size(NULL, bd->records, rec->length + bd->len);
4485         if (bd->records == NULL) {
4486                 DEBUG(DEBUG_ERR,("Failed to expand marshalling buffer\n"));
4487                 bd->traverse_error = true;
4488                 return -1;
4489         }
4490         bd->records->count++;
4491         memcpy(bd->len+(uint8_t *)bd->records, rec, rec->length);
4492         bd->len += rec->length;
4493         talloc_free(rec);
4494
4495         bd->total++;
4496         return 0;
4497 }
4498
4499 /*
4500  * backup a database to a file 
4501  */
4502 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
4503 {
4504         int i, ret;
4505         struct ctdb_dbid_map *dbmap=NULL;
4506         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4507         struct db_file_header dbhdr;
4508         struct ctdb_db_context *ctdb_db;
4509         struct backup_data *bd;
4510         int fh = -1;
4511         int status = -1;
4512         const char *reason = NULL;
4513
4514         if (argc != 2) {
4515                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4516                 return -1;
4517         }
4518
4519         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
4520         if (ret != 0) {
4521                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
4522                 return ret;
4523         }
4524
4525         for(i=0;i<dbmap->num;i++){
4526                 const char *name;
4527
4528                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
4529                 if(!strcmp(argv[0], name)){
4530                         talloc_free(discard_const(name));
4531                         break;
4532                 }
4533                 talloc_free(discard_const(name));
4534         }
4535         if (i == dbmap->num) {
4536                 DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
4537                 talloc_free(tmp_ctx);
4538                 return -1;
4539         }
4540
4541         ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
4542                                     dbmap->dbs[i].dbid, tmp_ctx, &reason);
4543         if (ret != 0) {
4544                 DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
4545                                  argv[0]));
4546                 talloc_free(tmp_ctx);
4547                 return -1;
4548         }
4549         if (reason) {
4550                 uint32_t allow_unhealthy = 0;
4551
4552                 ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn,
4553                                       "AllowUnhealthyDBRead",
4554                                       &allow_unhealthy);
4555
4556                 if (allow_unhealthy != 1) {
4557                         DEBUG(DEBUG_ERR,("database '%s' is unhealthy: %s\n",
4558                                          argv[0], reason));
4559
4560                         DEBUG(DEBUG_ERR,("disallow backup : tunable AllowUnhealthyDBRead = %u\n",
4561                                          allow_unhealthy));
4562                         talloc_free(tmp_ctx);
4563                         return -1;
4564                 }
4565
4566                 DEBUG(DEBUG_WARNING,("WARNING database '%s' is unhealthy - see 'ctdb getdbstatus %s'\n",
4567                                      argv[0], argv[0]));
4568                 DEBUG(DEBUG_WARNING,("WARNING! allow backup of unhealthy database: "
4569                                      "tunnable AllowUnhealthyDBRead = %u\n",
4570                                      allow_unhealthy));
4571         }
4572
4573         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
4574         if (ctdb_db == NULL) {
4575                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
4576                 talloc_free(tmp_ctx);
4577                 return -1;
4578         }
4579
4580
4581         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
4582         if (ret == -1) {
4583                 DEBUG(DEBUG_ERR,("Failed to start transaction\n"));
4584                 talloc_free(tmp_ctx);
4585                 return -1;
4586         }
4587
4588
4589         bd = talloc_zero(tmp_ctx, struct backup_data);
4590         if (bd == NULL) {
4591                 DEBUG(DEBUG_ERR,("Failed to allocate backup_data\n"));
4592                 talloc_free(tmp_ctx);
4593                 return -1;
4594         }
4595
4596         bd->records = talloc_zero(bd, struct ctdb_marshall_buffer);
4597         if (bd->records == NULL) {
4598                 DEBUG(DEBUG_ERR,("Failed to allocate ctdb_marshall_buffer\n"));
4599                 talloc_free(tmp_ctx);
4600                 return -1;
4601         }
4602
4603         bd->len = offsetof(struct ctdb_marshall_buffer, data);
4604         bd->records->db_id = ctdb_db->db_id;
4605         /* traverse the database collecting all records */
4606         if (tdb_traverse_read(ctdb_db->ltdb->tdb, backup_traverse, bd) == -1 ||
4607             bd->traverse_error) {
4608                 DEBUG(DEBUG_ERR,("Traverse error\n"));
4609                 talloc_free(tmp_ctx);
4610                 return -1;              
4611         }
4612
4613         tdb_transaction_cancel(ctdb_db->ltdb->tdb);
4614
4615
4616         fh = open(argv[1], O_RDWR|O_CREAT, 0600);
4617         if (fh == -1) {
4618                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[1]));
4619                 talloc_free(tmp_ctx);
4620                 return -1;
4621         }
4622
4623         dbhdr.version = DB_VERSION;
4624         dbhdr.timestamp = time(NULL);
4625         dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
4626         dbhdr.size = bd->len;
4627         if (strlen(argv[0]) >= MAX_DB_NAME) {
4628                 DEBUG(DEBUG_ERR,("Too long dbname\n"));
4629                 goto done;
4630         }
4631         strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
4632         ret = write(fh, &dbhdr, sizeof(dbhdr));
4633         if (ret == -1) {
4634                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4635                 goto done;
4636         }
4637         ret = write(fh, bd->records, bd->len);
4638         if (ret == -1) {
4639                 DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
4640                 goto done;
4641         }
4642
4643         status = 0;
4644 done:
4645         if (fh != -1) {
4646                 ret = close(fh);
4647                 if (ret == -1) {
4648                         DEBUG(DEBUG_ERR,("close failed: %s\n", strerror(errno)));
4649                 }
4650         }
4651
4652         DEBUG(DEBUG_ERR,("Database backed up to %s\n", argv[1]));
4653
4654         talloc_free(tmp_ctx);
4655         return status;
4656 }
4657
4658 /*
4659  * restore a database from a file 
4660  */
4661 static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **argv)
4662 {
4663         int ret;
4664         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4665         TDB_DATA outdata;
4666         TDB_DATA data;
4667         struct db_file_header dbhdr;
4668         struct ctdb_db_context *ctdb_db;
4669         struct ctdb_node_map *nodemap=NULL;
4670         struct ctdb_vnn_map *vnnmap=NULL;
4671         int i, fh;
4672         struct ctdb_control_wipe_database w;
4673         uint32_t *nodes;
4674         uint32_t generation;
4675         struct tm *tm;
4676         char tbuf[100];
4677         char *dbname;
4678
4679         if (argc < 1 || argc > 2) {
4680                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4681                 return -1;
4682         }
4683
4684         fh = open(argv[0], O_RDONLY);
4685         if (fh == -1) {
4686                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4687                 talloc_free(tmp_ctx);
4688                 return -1;
4689         }
4690
4691         read(fh, &dbhdr, sizeof(dbhdr));
4692         if (dbhdr.version != DB_VERSION) {
4693                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4694                 talloc_free(tmp_ctx);
4695                 return -1;
4696         }
4697
4698         dbname = discard_const(dbhdr.name);
4699         if (argc == 2) {
4700                 dbname = discard_const(argv[1]);
4701         }
4702
4703         outdata.dsize = dbhdr.size;
4704         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4705         if (outdata.dptr == NULL) {
4706                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4707                 close(fh);
4708                 talloc_free(tmp_ctx);
4709                 return -1;
4710         }               
4711         read(fh, outdata.dptr, outdata.dsize);
4712         close(fh);
4713
4714         tm = localtime(&dbhdr.timestamp);
4715         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4716         printf("Restoring database '%s' from backup @ %s\n",
4717                 dbname, tbuf);
4718
4719
4720         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), dbname, dbhdr.persistent, 0);
4721         if (ctdb_db == NULL) {
4722                 DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", dbname));
4723                 talloc_free(tmp_ctx);
4724                 return -1;
4725         }
4726
4727         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
4728         if (ret != 0) {
4729                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
4730                 talloc_free(tmp_ctx);
4731                 return ret;
4732         }
4733
4734
4735         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
4736         if (ret != 0) {
4737                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
4738                 talloc_free(tmp_ctx);
4739                 return ret;
4740         }
4741
4742         /* freeze all nodes */
4743         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4744         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
4745                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
4746                                         nodes, i,
4747                                         TIMELIMIT(),
4748                                         false, tdb_null,
4749                                         NULL, NULL,
4750                                         NULL) != 0) {
4751                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
4752                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4753                         talloc_free(tmp_ctx);
4754                         return -1;
4755                 }
4756         }
4757
4758         generation = vnnmap->generation;
4759         data.dptr = (void *)&generation;
4760         data.dsize = sizeof(generation);
4761
4762         /* start a cluster wide transaction */
4763         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4764         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
4765                                         nodes, 0,
4766                                         TIMELIMIT(), false, data,
4767                                         NULL, NULL,
4768                                         NULL) != 0) {
4769                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide transactions.\n"));
4770                 return -1;
4771         }
4772
4773
4774         w.db_id = ctdb_db->db_id;
4775         w.transaction_id = generation;
4776
4777         data.dptr = (void *)&w;
4778         data.dsize = sizeof(w);
4779
4780         /* wipe all the remote databases. */
4781         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4782         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
4783                                         nodes, 0,
4784                                         TIMELIMIT(), false, data,
4785                                         NULL, NULL,
4786                                         NULL) != 0) {
4787                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
4788                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4789                 talloc_free(tmp_ctx);
4790                 return -1;
4791         }
4792         
4793         /* push the database */
4794         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4795         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
4796                                         nodes, 0,
4797                                         TIMELIMIT(), false, outdata,
4798                                         NULL, NULL,
4799                                         NULL) != 0) {
4800                 DEBUG(DEBUG_ERR, ("Failed to push database.\n"));
4801                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4802                 talloc_free(tmp_ctx);
4803                 return -1;
4804         }
4805
4806         data.dptr = (void *)&ctdb_db->db_id;
4807         data.dsize = sizeof(ctdb_db->db_id);
4808
4809         /* mark the database as healthy */
4810         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4811         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
4812                                         nodes, 0,
4813                                         TIMELIMIT(), false, data,
4814                                         NULL, NULL,
4815                                         NULL) != 0) {
4816                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
4817                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4818                 talloc_free(tmp_ctx);
4819                 return -1;
4820         }
4821
4822         data.dptr = (void *)&generation;
4823         data.dsize = sizeof(generation);
4824
4825         /* commit all the changes */
4826         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
4827                                         nodes, 0,
4828                                         TIMELIMIT(), false, data,
4829                                         NULL, NULL,
4830                                         NULL) != 0) {
4831                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
4832                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4833                 talloc_free(tmp_ctx);
4834                 return -1;
4835         }
4836
4837
4838         /* thaw all nodes */
4839         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
4840         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
4841                                         nodes, 0,
4842                                         TIMELIMIT(),
4843                                         false, tdb_null,
4844                                         NULL, NULL,
4845                                         NULL) != 0) {
4846                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
4847                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
4848                 talloc_free(tmp_ctx);
4849                 return -1;
4850         }
4851
4852
4853         talloc_free(tmp_ctx);
4854         return 0;
4855 }
4856
4857 /*
4858  * dump a database backup from a file
4859  */
4860 static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char **argv)
4861 {
4862         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4863         TDB_DATA outdata;
4864         struct db_file_header dbhdr;
4865         int i, fh;
4866         struct tm *tm;
4867         char tbuf[100];
4868         struct ctdb_rec_data *rec = NULL;
4869         struct ctdb_marshall_buffer *m;
4870         struct ctdb_dump_db_context c;
4871
4872         if (argc != 1) {
4873                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4874                 return -1;
4875         }
4876
4877         fh = open(argv[0], O_RDONLY);
4878         if (fh == -1) {
4879                 DEBUG(DEBUG_ERR,("Failed to open file '%s'\n", argv[0]));
4880                 talloc_free(tmp_ctx);
4881                 return -1;
4882         }
4883
4884         read(fh, &dbhdr, sizeof(dbhdr));
4885         if (dbhdr.version != DB_VERSION) {
4886                 DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
4887                 talloc_free(tmp_ctx);
4888                 return -1;
4889         }
4890
4891         outdata.dsize = dbhdr.size;
4892         outdata.dptr = talloc_size(tmp_ctx, outdata.dsize);
4893         if (outdata.dptr == NULL) {
4894                 DEBUG(DEBUG_ERR,("Failed to allocate data of size '%lu'\n", dbhdr.size));
4895                 close(fh);
4896                 talloc_free(tmp_ctx);
4897                 return -1;
4898         }
4899         read(fh, outdata.dptr, outdata.dsize);
4900         close(fh);
4901         m = (struct ctdb_marshall_buffer *)outdata.dptr;
4902
4903         tm = localtime(&dbhdr.timestamp);
4904         strftime(tbuf,sizeof(tbuf)-1,"%Y/%m/%d %H:%M:%S", tm);
4905         printf("Backup of database name:'%s' dbid:0x%x08x from @ %s\n",
4906                 dbhdr.name, m->db_id, tbuf);
4907
4908         ZERO_STRUCT(c);
4909         c.f = stdout;
4910         c.printemptyrecords = (bool)options.printemptyrecords;
4911         c.printdatasize = (bool)options.printdatasize;
4912         c.printlmaster = false;
4913         c.printhash = (bool)options.printhash;
4914         c.printrecordflags = (bool)options.printrecordflags;
4915
4916         for (i=0; i < m->count; i++) {
4917                 uint32_t reqid = 0;
4918                 TDB_DATA key, data;
4919
4920                 /* we do not want the header splitted, so we pass NULL*/
4921                 rec = ctdb_marshall_loop_next(m, rec, &reqid,
4922                                               NULL, &key, &data);
4923
4924                 ctdb_dumpdb_record(ctdb, key, data, &c);
4925         }
4926
4927         printf("Dumped %d records\n", i);
4928         talloc_free(tmp_ctx);
4929         return 0;
4930 }
4931
4932 /*
4933  * wipe a database from a file
4934  */
4935 static int control_wipedb(struct ctdb_context *ctdb, int argc,
4936                           const char **argv)
4937 {
4938         int ret;
4939         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
4940         TDB_DATA data;
4941         struct ctdb_db_context *ctdb_db;
4942         struct ctdb_node_map *nodemap = NULL;
4943         struct ctdb_vnn_map *vnnmap = NULL;
4944         int i;
4945         struct ctdb_control_wipe_database w;
4946         uint32_t *nodes;
4947         uint32_t generation;
4948         struct ctdb_dbid_map *dbmap = NULL;
4949
4950         if (argc != 1) {
4951                 DEBUG(DEBUG_ERR,("Invalid arguments\n"));
4952                 return -1;
4953         }
4954
4955         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4956                                  &dbmap);
4957         if (ret != 0) {
4958                 DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
4959                                   options.pnn));
4960                 return ret;
4961         }
4962
4963         for(i=0;i<dbmap->num;i++){
4964                 const char *name;
4965
4966                 ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
4967                                     dbmap->dbs[i].dbid, tmp_ctx, &name);
4968                 if(!strcmp(argv[0], name)){
4969                         talloc_free(discard_const(name));
4970                         break;
4971                 }
4972                 talloc_free(discard_const(name));
4973         }
4974         if (i == dbmap->num) {
4975                 DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
4976                                   argv[0]));
4977                 talloc_free(tmp_ctx);
4978                 return -1;
4979         }
4980
4981         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
4982         if (ctdb_db == NULL) {
4983                 DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
4984                                   argv[0]));
4985                 talloc_free(tmp_ctx);
4986                 return -1;
4987         }
4988
4989         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb,
4990                                    &nodemap);
4991         if (ret != 0) {
4992                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
4993                                   options.pnn));
4994                 talloc_free(tmp_ctx);
4995                 return ret;
4996         }
4997
4998         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
4999                                   &vnnmap);
5000         if (ret != 0) {
5001                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n",
5002                                   options.pnn));
5003                 talloc_free(tmp_ctx);
5004                 return ret;
5005         }
5006
5007         /* freeze all nodes */
5008         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5009         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
5010                 ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
5011                                                 nodes, i,
5012                                                 TIMELIMIT(),
5013                                                 false, tdb_null,
5014                                                 NULL, NULL,
5015                                                 NULL);
5016                 if (ret != 0) {
5017                         DEBUG(DEBUG_ERR, ("Unable to freeze nodes.\n"));
5018                         ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn,
5019                                              CTDB_RECOVERY_ACTIVE);
5020                         talloc_free(tmp_ctx);
5021                         return -1;
5022                 }
5023         }
5024
5025         generation = vnnmap->generation;
5026         data.dptr = (void *)&generation;
5027         data.dsize = sizeof(generation);
5028
5029         /* start a cluster wide transaction */
5030         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5031         ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
5032                                         nodes, 0,
5033                                         TIMELIMIT(), false, data,
5034                                         NULL, NULL,
5035                                         NULL);
5036         if (ret!= 0) {
5037                 DEBUG(DEBUG_ERR, ("Unable to start cluster wide "
5038                                   "transactions.\n"));
5039                 return -1;
5040         }
5041
5042         w.db_id = ctdb_db->db_id;
5043         w.transaction_id = generation;
5044
5045         data.dptr = (void *)&w;
5046         data.dsize = sizeof(w);
5047
5048         /* wipe all the remote databases. */
5049         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5050         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
5051                                         nodes, 0,
5052                                         TIMELIMIT(), false, data,
5053                                         NULL, NULL,
5054                                         NULL) != 0) {
5055                 DEBUG(DEBUG_ERR, ("Unable to wipe database.\n"));
5056                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5057                 talloc_free(tmp_ctx);
5058                 return -1;
5059         }
5060
5061         data.dptr = (void *)&ctdb_db->db_id;
5062         data.dsize = sizeof(ctdb_db->db_id);
5063
5064         /* mark the database as healthy */
5065         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5066         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DB_SET_HEALTHY,
5067                                         nodes, 0,
5068                                         TIMELIMIT(), false, data,
5069                                         NULL, NULL,
5070                                         NULL) != 0) {
5071                 DEBUG(DEBUG_ERR, ("Failed to mark database as healthy.\n"));
5072                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5073                 talloc_free(tmp_ctx);
5074                 return -1;
5075         }
5076
5077         data.dptr = (void *)&generation;
5078         data.dsize = sizeof(generation);
5079
5080         /* commit all the changes */
5081         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
5082                                         nodes, 0,
5083                                         TIMELIMIT(), false, data,
5084                                         NULL, NULL,
5085                                         NULL) != 0) {
5086                 DEBUG(DEBUG_ERR, ("Unable to commit databases.\n"));
5087                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5088                 talloc_free(tmp_ctx);
5089                 return -1;
5090         }
5091
5092         /* thaw all nodes */
5093         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
5094         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
5095                                         nodes, 0,
5096                                         TIMELIMIT(),
5097                                         false, tdb_null,
5098                                         NULL, NULL,
5099                                         NULL) != 0) {
5100                 DEBUG(DEBUG_ERR, ("Unable to thaw nodes.\n"));
5101                 ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
5102                 talloc_free(tmp_ctx);
5103                 return -1;
5104         }
5105
5106         DEBUG(DEBUG_ERR, ("Database wiped.\n"));
5107
5108         talloc_free(tmp_ctx);
5109         return 0;
5110 }
5111
5112 /*
5113   dump memory usage
5114  */
5115 static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5116 {
5117         TDB_DATA data;
5118         int ret;
5119         int32_t res;
5120         char *errmsg;
5121         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
5122         ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_DUMP_MEMORY,
5123                            0, tdb_null, tmp_ctx, &data, &res, NULL, &errmsg);
5124         if (ret != 0 || res != 0) {
5125                 DEBUG(DEBUG_ERR,("Failed to dump memory - %s\n", errmsg));
5126                 talloc_free(tmp_ctx);
5127                 return -1;
5128         }
5129         write(1, data.dptr, data.dsize);
5130         talloc_free(tmp_ctx);
5131         return 0;
5132 }
5133
5134 /*
5135   handler for memory dumps
5136 */
5137 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5138                              TDB_DATA data, void *private_data)
5139 {
5140         write(1, data.dptr, data.dsize);
5141         exit(0);
5142 }
5143
5144 /*
5145   dump memory usage on the recovery daemon
5146  */
5147 static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char **argv)
5148 {
5149         int ret;
5150         TDB_DATA data;
5151         struct rd_memdump_reply rd;
5152
5153         rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5154         if (rd.pnn == -1) {
5155                 DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
5156                 return -1;
5157         }
5158         rd.srvid = getpid();
5159
5160         /* register a message port for receiveing the reply so that we
5161            can receive the reply
5162         */
5163         ctdb_client_set_message_handler(ctdb, rd.srvid, mem_dump_handler, NULL);
5164
5165
5166         data.dptr = (uint8_t *)&rd;
5167         data.dsize = sizeof(rd);
5168
5169         ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_MEM_DUMP, data);
5170         if (ret != 0) {
5171                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5172                 return -1;
5173         }
5174
5175         /* this loop will terminate when we have received the reply */
5176         while (1) {     
5177                 event_loop_once(ctdb->ev);
5178         }
5179
5180         return 0;
5181 }
5182
5183 /*
5184   send a message to a srvid
5185  */
5186 static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
5187 {
5188         unsigned long srvid;
5189         int ret;
5190         TDB_DATA data;
5191
5192         if (argc < 2) {
5193                 usage();
5194         }
5195
5196         srvid      = strtoul(argv[0], NULL, 0);
5197
5198         data.dptr = (uint8_t *)discard_const(argv[1]);
5199         data.dsize= strlen(argv[1]);
5200
5201         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
5202         if (ret != 0) {
5203                 DEBUG(DEBUG_ERR,("Failed to send memdump request message to %u\n", options.pnn));
5204                 return -1;
5205         }
5206
5207         return 0;
5208 }
5209
5210 /*
5211   handler for msglisten
5212 */
5213 static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid, 
5214                              TDB_DATA data, void *private_data)
5215 {
5216         int i;
5217
5218         printf("Message received: ");
5219         for (i=0;i<data.dsize;i++) {
5220                 printf("%c", data.dptr[i]);
5221         }
5222         printf("\n");
5223 }
5224
5225 /*
5226   listen for messages on a messageport
5227  */
5228 static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
5229 {
5230         uint64_t srvid;
5231
5232         srvid = getpid();
5233
5234         /* register a message port and listen for messages
5235         */
5236         ctdb_client_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
5237         printf("Listening for messages on srvid:%d\n", (int)srvid);
5238
5239         while (1) {     
5240                 event_loop_once(ctdb->ev);
5241         }
5242
5243         return 0;
5244 }
5245
5246 /*
5247   list all nodes in the cluster
5248   we parse the nodes file directly
5249  */
5250 static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **argv)
5251 {
5252         TALLOC_CTX *mem_ctx = talloc_new(NULL);
5253         struct pnn_node *pnn_nodes;
5254         struct pnn_node *pnn_node;
5255
5256         pnn_nodes = read_nodes_file(mem_ctx);
5257         if (pnn_nodes == NULL) {
5258                 DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
5259                 talloc_free(mem_ctx);
5260                 return -1;
5261         }
5262
5263         for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
5264                 ctdb_sock_addr addr;
5265                 if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
5266                         DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
5267                         talloc_free(mem_ctx);
5268                         return -1;
5269                 }
5270                 if (options.machinereadable){
5271                         printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
5272                 } else {
5273                         printf("%s\n", pnn_node->addr);
5274                 }
5275         }
5276         talloc_free(mem_ctx);
5277
5278         return 0;
5279 }
5280
5281 /*
5282   reload the nodes file on the local node
5283  */
5284 static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const char **argv)
5285 {
5286         int i, ret;
5287         int mypnn;
5288         struct ctdb_node_map *nodemap=NULL;
5289
5290         mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
5291         if (mypnn == -1) {
5292                 DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
5293                 return -1;
5294         }
5295
5296         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
5297         if (ret != 0) {
5298                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
5299                 return ret;
5300         }
5301
5302         /* reload the nodes file on all remote nodes */
5303         for (i=0;i<nodemap->num;i++) {
5304                 if (nodemap->nodes[i].pnn == mypnn) {
5305                         continue;
5306                 }
5307                 DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", nodemap->nodes[i].pnn));
5308                 ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(),
5309                         nodemap->nodes[i].pnn);
5310                 if (ret != 0) {
5311                         DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", nodemap->nodes[i].pnn));
5312                 }
5313         }
5314
5315         /* reload the nodes file on the local node */
5316         DEBUG(DEBUG_NOTICE, ("Reloading nodes file on node %u\n", mypnn));
5317         ret = ctdb_ctrl_reload_nodes_file(ctdb, TIMELIMIT(), mypnn);
5318         if (ret != 0) {
5319                 DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
5320         }
5321
5322         /* initiate a recovery */
5323         control_recover(ctdb, argc, argv);
5324
5325         return 0;
5326 }
5327
5328
5329 static const struct {
5330         const char *name;
5331         int (*fn)(struct ctdb_context *, int, const char **);
5332         bool auto_all;
5333         bool without_daemon; /* can be run without daemon running ? */
5334         const char *msg;
5335         const char *args;
5336 } ctdb_commands[] = {
5337 #ifdef CTDB_VERS
5338         { "version",         control_version,           true,   false,  "show version of ctdb" },
5339 #endif
5340         { "status",          control_status,            true,   false,  "show node status" },
5341         { "uptime",          control_uptime,            true,   false,  "show node uptime" },
5342         { "ping",            control_ping,              true,   false,  "ping all nodes" },
5343         { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
5344         { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
5345         { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
5346         { "statistics",      control_statistics,        false,  false, "show statistics" },
5347         { "statisticsreset", control_statistics_reset,  true,   false,  "reset statistics"},
5348         { "stats",           control_stats,             false,  false,  "show rolling statistics", "[number of history records]" },
5349         { "ip",              control_ip,                false,  false,  "show which public ip's that ctdb manages" },
5350         { "ipinfo",          control_ipinfo,            true,   false,  "show details about a public ip that ctdb manages", "<ip>" },
5351         { "ifaces",          control_ifaces,            true,   false,  "show which interfaces that ctdb manages" },
5352         { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
5353         { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
5354         { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
5355         { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
5356         { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
5357         { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
5358         { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
5359         { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
5360         { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
5361         { "lvs",             control_lvs,               true,   false,  "show lvs configuration" },
5362         { "lvsmaster",       control_lvsmaster,         true,   false,  "show which node is the lvs master" },
5363         { "disablemonitor",      control_disable_monmode,true,  false,  "set monitoring mode to DISABLE" },
5364         { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
5365         { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
5366         { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
5367         { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
5368         { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
5369         { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
5370         { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
5371         { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
5372         { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
5373         { "disable",         control_disable,           true,   false,  "disable a nodes public IP" },
5374         { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
5375         { "stop",            control_stop,              true,   false,  "stop a node" },
5376         { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
5377         { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
5378         { "unban",           control_unban,             true,   false,  "unban a node" },
5379         { "showban",         control_showban,           true,   false,  "show ban information"},
5380         { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
5381         { "recover",         control_recover,           true,   false,  "force recovery" },
5382         { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
5383         { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
5384         { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
5385         { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
5386         { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
5387         { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
5388         { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5389         { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
5390         { "addtickle",       control_add_tickle,        false,  false, "add a tickle for this ip", "<ip>:<port> <ip>:<port>" },
5391
5392         { "deltickle",       control_del_tickle,        false,  false, "delete a tickle from this ip", "<ip>:<port> <ip>:<port>" },
5393
5394         { "regsrvid",        regsrvid,                  false,  false, "register a server id", "<pnn> <type> <id>" },
5395         { "unregsrvid",      unregsrvid,                false,  false, "unregister a server id", "<pnn> <type> <id>" },
5396         { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
5397         { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
5398         { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
5399         { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
5400         { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
5401         { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
5402         { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
5403         { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
5404         { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
5405         { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
5406         { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
5407         { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
5408         { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
5409         { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
5410         { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
5411         { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
5412         { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
5413         { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
5414         { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
5415         { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
5416         { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
5417         { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
5418         { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
5419         { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
5420         { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
5421         { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
5422         { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
5423         { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
5424         { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>"},
5425         { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
5426         { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
5427         { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
5428         { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
5429         { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
5430         { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
5431         { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
5432         { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
5433         { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
5434         { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
5435         { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
5436         { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
5437 };
5438
5439 /*
5440   show usage message
5441  */
5442 static void usage(void)
5443 {
5444         int i;
5445         printf(
5446 "Usage: ctdb [options] <control>\n" \
5447 "Options:\n" \
5448 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
5449 "   -Y                 generate machinereadable output\n"
5450 "   -v                 generate verbose output\n"
5451 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
5452         printf("Controls:\n");
5453         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5454                 printf("  %-15s %-27s  %s\n", 
5455                        ctdb_commands[i].name, 
5456                        ctdb_commands[i].args?ctdb_commands[i].args:"",
5457                        ctdb_commands[i].msg);
5458         }
5459         exit(1);
5460 }
5461
5462
5463 static void ctdb_alarm(int sig)
5464 {
5465         printf("Maximum runtime exceeded - exiting\n");
5466         _exit(ERR_TIMEOUT);
5467 }
5468
5469 /*
5470   main program
5471 */
5472 int main(int argc, const char *argv[])
5473 {
5474         struct ctdb_context *ctdb;
5475         char *nodestring = NULL;
5476         struct poptOption popt_options[] = {
5477                 POPT_AUTOHELP
5478                 POPT_CTDB_CMDLINE
5479                 { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
5480                 { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
5481                 { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
5482                 { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
5483                 { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
5484                 { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
5485                 { "print-datasize", 0, POPT_ARG_NONE, &options.printdatasize, 0, "do not print record data when dumping databases, only the data size", NULL },
5486                 { "print-lmaster", 0, POPT_ARG_NONE, &options.printlmaster, 0, "print the record's lmaster in catdb", NULL },
5487                 { "print-hash", 0, POPT_ARG_NONE, &options.printhash, 0, "print the record's hash when dumping databases", NULL },
5488                 { "print-recordflags", 0, POPT_ARG_NONE, &options.printrecordflags, 0, "print the record flags in catdb and dumpdbbackup", NULL },
5489                 POPT_TABLEEND
5490         };
5491         int opt;
5492         const char **extra_argv;
5493         int extra_argc = 0;
5494         int ret=-1, i;
5495         poptContext pc;
5496         struct event_context *ev;
5497         const char *control;
5498         const char *socket_name;
5499
5500         setlinebuf(stdout);
5501         
5502         /* set some defaults */
5503         options.maxruntime = 0;
5504         options.timelimit = 3;
5505         options.pnn = CTDB_CURRENT_NODE;
5506
5507         pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
5508
5509         while ((opt = poptGetNextOpt(pc)) != -1) {
5510                 switch (opt) {
5511                 default:
5512                         DEBUG(DEBUG_ERR, ("Invalid option %s: %s\n", 
5513                                 poptBadOption(pc, 0), poptStrerror(opt)));
5514                         exit(1);
5515                 }
5516         }
5517
5518         /* setup the remaining options for the main program to use */
5519         extra_argv = poptGetArgs(pc);
5520         if (extra_argv) {
5521                 extra_argv++;
5522                 while (extra_argv[extra_argc]) extra_argc++;
5523         }
5524
5525         if (extra_argc < 1) {
5526                 usage();
5527         }
5528
5529         if (options.maxruntime == 0) {
5530                 const char *ctdb_timeout;
5531                 ctdb_timeout = getenv("CTDB_TIMEOUT");
5532                 if (ctdb_timeout != NULL) {
5533                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
5534                 } else {
5535                         /* default timeout is 120 seconds */
5536                         options.maxruntime = 120;
5537                 }
5538         }
5539
5540         signal(SIGALRM, ctdb_alarm);
5541         alarm(options.maxruntime);
5542
5543         control = extra_argv[0];
5544
5545         ev = event_context_init(NULL);
5546         if (!ev) {
5547                 DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
5548                 exit(1);
5549         }
5550         tevent_loop_allow_nesting(ev);
5551
5552         for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
5553                 if (strcmp(control, ctdb_commands[i].name) == 0) {
5554                         break;
5555                 }
5556         }
5557
5558         if (i == ARRAY_SIZE(ctdb_commands)) {
5559                 DEBUG(DEBUG_ERR, ("Unknown control '%s'\n", control));
5560                 exit(1);
5561         }
5562
5563         if (ctdb_commands[i].without_daemon == true) {
5564                 if (nodestring != NULL) {
5565                         DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
5566                         exit(1);
5567                 }
5568                 close(2);
5569                 return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
5570         }
5571
5572         /* initialise ctdb */
5573         ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
5574
5575         if (ctdb == NULL) {
5576                 DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
5577                 exit(1);
5578         }
5579
5580         /* initialize a libctdb connection as well */
5581         socket_name = ctdb_get_socketname(ctdb);
5582         ctdb_connection = ctdb_connect(socket_name,
5583                                        ctdb_log_file, stderr);
5584         if (ctdb_connection == NULL) {
5585                 DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
5586                 exit(1);
5587         }                               
5588
5589         /* setup the node number(s) to contact */
5590         if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
5591                               &options.nodes, &options.pnn)) {
5592                 usage();
5593         }
5594
5595         if (options.pnn == CTDB_CURRENT_NODE) {
5596                 options.pnn = options.nodes[0];
5597         }
5598
5599         if (ctdb_commands[i].auto_all && 
5600             ((options.pnn == CTDB_BROADCAST_ALL) ||
5601              (options.pnn == CTDB_MULTICAST))) {
5602                 int j;
5603
5604                 ret = 0;
5605                 for (j = 0; j < talloc_array_length(options.nodes); j++) {
5606                         options.pnn = options.nodes[j];
5607                         ret |= ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
5608                 }
5609         } else {
5610                 ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
5611         }
5612
5613         ctdb_disconnect(ctdb_connection);
5614         talloc_free(ctdb);
5615         (void)poptFreeContext(pc);
5616
5617         return ret;
5618
5619 }