ctdb-tools: Wait for ctdb daemon to go away in shutdown
[metze/samba/wip.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25 #include "system/dir.h"
26
27 #include <ctype.h>
28 #include <popt.h>
29 #include <talloc.h>
30 #include <tevent.h>
31 #include <tdb.h>
32
33 #include "common/version.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/sys_rw.h"
37
38 #include "common/db_hash.h"
39 #include "common/logging.h"
40 #include "protocol/protocol.h"
41 #include "protocol/protocol_api.h"
42 #include "protocol/protocol_util.h"
43 #include "common/system.h"
44 #include "client/client.h"
45 #include "client/client_sync.h"
46
47 #define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)
48
49 #define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
50 #define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
51
52 static struct {
53         const char *socket;
54         const char *debuglevelstr;
55         int timelimit;
56         int pnn;
57         int machinereadable;
58         const char *sep;
59         int machineparsable;
60         int verbose;
61         int maxruntime;
62         int printemptyrecords;
63         int printdatasize;
64         int printlmaster;
65         int printhash;
66         int printrecordflags;
67 } options;
68
69 static poptContext pc;
70
71 struct ctdb_context {
72         struct tevent_context *ev;
73         struct ctdb_client_context *client;
74         struct ctdb_node_map *nodemap;
75         uint32_t pnn, cmd_pnn;
76         uint64_t srvid;
77 };
78
79 static void usage(const char *command);
80
81 /*
82  * Utility Functions
83  */
84
/*
 * Return the time from *tv to *tv2 in seconds, i.e. tv2 minus tv, as a
 * double.  The result is negative when tv2 is earlier than tv.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_secs = tv2->tv_sec - tv->tv_sec;
        double frac_secs = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return whole_secs + frac_secs;
}
90
91 static struct ctdb_node_and_flags *get_node_by_pnn(
92                                         struct ctdb_node_map *nodemap,
93                                         uint32_t pnn)
94 {
95         int i;
96
97         for (i=0; i<nodemap->num; i++) {
98                 if (nodemap->node[i].pnn == pnn) {
99                         return &nodemap->node[i];
100                 }
101         }
102         return NULL;
103 }
104
105 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
106 {
107         static const struct {
108                 uint32_t flag;
109                 const char *name;
110         } flag_names[] = {
111                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
112                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
113                 { NODE_FLAGS_BANNED,                "BANNED" },
114                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
115                 { NODE_FLAGS_DELETED,               "DELETED" },
116                 { NODE_FLAGS_STOPPED,               "STOPPED" },
117                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
118         };
119         char *flags_str = NULL;
120         int i;
121
122         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
123                 if (flags & flag_names[i].flag) {
124                         if (flags_str == NULL) {
125                                 flags_str = talloc_asprintf(mem_ctx,
126                                                 "%s", flag_names[i].name);
127                         } else {
128                                 flags_str = talloc_asprintf_append(flags_str,
129                                                 "|%s", flag_names[i].name);
130                         }
131                         if (flags_str == NULL) {
132                                 return "OUT-OF-MEMORY";
133                         }
134                 }
135         }
136         if (flags_str == NULL) {
137                 return "OK";
138         }
139
140         return flags_str;
141 }
142
143 static uint64_t next_srvid(struct ctdb_context *ctdb)
144 {
145         ctdb->srvid += 1;
146         return ctdb->srvid;
147 }
148
149 /*
150  * Get consistent nodemap information.
151  *
152  * If nodemap is already cached, use that. If not get it.
153  * If the current node is BANNED, then get nodemap from "better" node.
154  */
155 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
156 {
157         TALLOC_CTX *tmp_ctx;
158         struct ctdb_node_map *nodemap;
159         struct ctdb_node_and_flags *node;
160         uint32_t current_node;
161         int ret;
162
163         if (force) {
164                 TALLOC_FREE(ctdb->nodemap);
165         }
166
167         if (ctdb->nodemap != NULL) {
168                 return ctdb->nodemap;
169         }
170
171         tmp_ctx = talloc_new(ctdb);
172         if (tmp_ctx == NULL) {
173                 return false;
174         }
175
176         current_node = ctdb->pnn;
177 again:
178         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
179                                     current_node, TIMEOUT(), &nodemap);
180         if (ret != 0) {
181                 fprintf(stderr, "Failed to get nodemap from node %u\n",
182                         current_node);
183                 goto failed;
184         }
185
186         node = get_node_by_pnn(nodemap, current_node);
187         if (node->flags & NODE_FLAGS_BANNED) {
188                 /* Pick next node */
189                 do {
190                         current_node = (current_node + 1) % nodemap->num;
191                         node = get_node_by_pnn(nodemap, current_node);
192                         if (! (node->flags &
193                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
194                                 break;
195                         }
196                 } while (current_node != ctdb->pnn);
197
198                 if (current_node == ctdb->pnn) {
199                         /* Tried all nodes in the cluster */
200                         fprintf(stderr, "Warning: All nodes are banned.\n");
201                         goto failed;
202                 }
203
204                 goto again;
205         }
206
207         ctdb->nodemap = talloc_steal(ctdb, nodemap);
208         return nodemap;
209
210 failed:
211         talloc_free(tmp_ctx);
212         return NULL;
213 }
214
215 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
216 {
217         struct ctdb_node_map *nodemap;
218         bool found;
219         int i;
220
221         if (pnn == -1) {
222                 return false;
223         }
224
225         nodemap = get_nodemap(ctdb, false);
226         if (nodemap == NULL) {
227                 return false;
228         }
229
230         found = false;
231         for (i=0; i<nodemap->num; i++) {
232                 if (nodemap->node[i].pnn == pnn) {
233                         found = true;
234                         break;
235                 }
236         }
237         if (! found) {
238                 fprintf(stderr, "Node %u does not exist\n", pnn);
239                 return false;
240         }
241
242         if (nodemap->node[i].flags &
243             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
244                 fprintf(stderr, "Node %u has status %s\n", pnn,
245                         pretty_print_flags(ctdb, nodemap->node[i].flags));
246                 return false;
247         }
248
249         return true;
250 }
251
252 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
253                                             struct ctdb_node_map *nodemap)
254 {
255         struct ctdb_node_map *nodemap2;
256
257         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
258         if (nodemap2 == NULL) {
259                 return NULL;
260         }
261
262         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
263                                       nodemap->num);
264         if (nodemap2->node == NULL) {
265                 talloc_free(nodemap2);
266                 return NULL;
267         }
268
269         return nodemap2;
270 }
271
272 /*
273  * Get the number and the list of matching nodes
274  *
275  *   nodestring :=  NULL | all | pnn,[pnn,...]
276  *
277  * If nodestring is NULL, use the current node.
278  */
279 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
280                              const char *nodestring,
281                              struct ctdb_node_map **out)
282 {
283         struct ctdb_node_map *nodemap, *nodemap2;
284         struct ctdb_node_and_flags *node;
285         int i;
286
287         nodemap = get_nodemap(ctdb, false);
288         if (nodemap == NULL) {
289                 return false;
290         }
291
292         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
293         if (nodemap2 == NULL) {
294                 return false;
295         }
296
297         if (nodestring == NULL) {
298                 for (i=0; i<nodemap->num; i++) {
299                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
300                                 nodemap2->node[0] = nodemap->node[i];
301                                 break;
302                         }
303                 }
304                 nodemap2->num = 1;
305
306                 goto done;
307         }
308
309         if (strcmp(nodestring, "all") == 0) {
310                 for (i=0; i<nodemap->num; i++) {
311                         nodemap2->node[i] = nodemap->node[i];
312                 }
313                 nodemap2->num = nodemap->num;
314
315                 goto done;
316         } else {
317                 char *ns, *tok;
318
319                 ns = talloc_strdup(mem_ctx, nodestring);
320                 if (ns == NULL) {
321                         return false;
322                 }
323
324                 tok = strtok(ns, ",");
325                 while (tok != NULL) {
326                         uint32_t pnn;
327                         char *endptr;
328
329                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
330                         if (pnn == 0 && tok == endptr) {
331                                 fprintf(stderr, "Invalid node %s\n", tok);
332                                         return false;
333                         }
334
335                         node = get_node_by_pnn(nodemap, pnn);
336                         if (node == NULL) {
337                                 fprintf(stderr, "Node %u does not exist\n",
338                                         pnn);
339                                 return false;
340                         }
341
342                         nodemap2->node[nodemap2->num] = *node;
343                         nodemap2->num += 1;
344
345                         tok = strtok(NULL, ",");
346                 }
347         }
348
349 done:
350         *out = nodemap2;
351         return true;
352 }
353
354 /* Compare IP address */
355 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
356 {
357         bool ret = false;
358
359         if (ip1->sa.sa_family != ip2->sa.sa_family) {
360                 return false;
361         }
362
363         switch (ip1->sa.sa_family) {
364         case AF_INET:
365                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
366                               sizeof(struct in_addr)) == 0);
367                 break;
368
369         case AF_INET6:
370                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
371                               sizeof(struct in6_addr)) == 0);
372                 break;
373         }
374
375         return ret;
376 }
377
378 /* Append a node to a node map with given address and flags */
379 static bool node_map_add(struct ctdb_node_map *nodemap,
380                          const char *nstr, uint32_t flags)
381 {
382         ctdb_sock_addr addr;
383         uint32_t num;
384         struct ctdb_node_and_flags *n;
385         int ret;
386
387         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
388         if (ret != 0) {
389                 fprintf(stderr, "Invalid IP address %s\n", nstr);
390                 return false;
391         }
392
393         num = nodemap->num;
394         nodemap->node = talloc_realloc(nodemap, nodemap->node,
395                                        struct ctdb_node_and_flags, num+1);
396         if (nodemap->node == NULL) {
397                 return false;
398         }
399
400         n = &nodemap->node[num];
401         n->addr = addr;
402         n->pnn = num;
403         n->flags = flags;
404
405         nodemap->num = num+1;
406         return true;
407 }
408
409 /* Read a nodes file into a node map */
410 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
411                                                   const char *nlist)
412 {
413         char **lines;
414         int nlines;
415         int i;
416         struct ctdb_node_map *nodemap;
417
418         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
419         if (nodemap == NULL) {
420                 return NULL;
421         }
422
423         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
424         if (lines == NULL) {
425                 return NULL;
426         }
427
428         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
429                 nlines--;
430         }
431
432         for (i=0; i<nlines; i++) {
433                 char *node;
434                 uint32_t flags;
435                 size_t len;
436
437                 node = lines[i];
438                 /* strip leading spaces */
439                 while((*node == ' ') || (*node == '\t')) {
440                         node++;
441                 }
442
443                 len = strlen(node);
444
445                 /* strip trailing spaces */
446                 while ((len > 1) &&
447                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
448                 {
449                         node[len-1] = '\0';
450                         len--;
451                 }
452
453                 if (len == 0) {
454                         continue;
455                 }
456                 if (*node == '#') {
457                         /* A "deleted" node is a node that is
458                            commented out in the nodes file.  This is
459                            used instead of removing a line, which
460                            would cause subsequent nodes to change
461                            their PNN. */
462                         flags = NODE_FLAGS_DELETED;
463                         node = discard_const("0.0.0.0");
464                 } else {
465                         flags = 0;
466                 }
467                 if (! node_map_add(nodemap, node, flags)) {
468                         talloc_free(lines);
469                         TALLOC_FREE(nodemap);
470                         return NULL;
471                 }
472         }
473
474         talloc_free(lines);
475         return nodemap;
476 }
477
478 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
479 {
480         struct ctdb_node_map *nodemap;
481         char *nodepath;
482         const char *nodes_list = NULL;
483
484         if (pnn != CTDB_UNKNOWN_PNN) {
485                 nodepath = talloc_asprintf(mem_ctx, "CTDB_NODES_%u", pnn);
486                 if (nodepath != NULL) {
487                         nodes_list = getenv(nodepath);
488                 }
489         }
490         if (nodes_list == NULL) {
491                 nodes_list = getenv("CTDB_NODES");
492         }
493         if (nodes_list == NULL) {
494                 const char *basedir = getenv("CTDB_BASE");
495                 if (basedir == NULL) {
496                         basedir = CTDB_ETCDIR;
497                 }
498                 nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
499                 if (nodes_list == NULL) {
500                         fprintf(stderr, "Memory allocation error\n");
501                         return NULL;
502                 }
503         }
504
505         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
506         if (nodemap == NULL) {
507                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
508                         nodes_list);
509                 return NULL;
510         }
511
512         return nodemap;
513 }
514
515 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
516                                  struct ctdb_context *ctdb,
517                                  struct ctdb_dbid_map *dbmap,
518                                  const char *db_name)
519 {
520         struct ctdb_dbid *db = NULL;
521         const char *name;
522         int ret, i;
523
524         for (i=0; i<dbmap->num; i++) {
525                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
526                                            ctdb->pnn, TIMEOUT(),
527                                            dbmap->dbs[i].db_id, &name);
528                 if (ret != 0) {
529                         return false;
530                 }
531
532                 if (strcmp(db_name, name) == 0) {
533                         talloc_free(discard_const(name));
534                         db = &dbmap->dbs[i];
535                         break;
536                 }
537         }
538
539         return db;
540 }
541
542 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
543                       const char *db_arg, uint32_t *db_id,
544                       const char **db_name, uint8_t *db_flags)
545 {
546         struct ctdb_dbid_map *dbmap;
547         struct ctdb_dbid *db = NULL;
548         uint32_t id = 0;
549         const char *name = NULL;
550         int ret, i;
551
552         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
553                                   ctdb->pnn, TIMEOUT(), &dbmap);
554         if (ret != 0) {
555                 return false;
556         }
557
558         if (strncmp(db_arg, "0x", 2) == 0) {
559                 id = strtoul(db_arg, NULL, 0);
560                 for (i=0; i<dbmap->num; i++) {
561                         if (id == dbmap->dbs[i].db_id) {
562                                 db = &dbmap->dbs[i];
563                                 break;
564                         }
565                 }
566         } else {
567                 name = db_arg;
568                 db = db_find(mem_ctx, ctdb, dbmap, name);
569         }
570
571         if (db == NULL) {
572                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
573                 return false;
574         }
575
576         if (name == NULL) {
577                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
578                                            ctdb->pnn, TIMEOUT(), id, &name);
579                 if (ret != 0) {
580                         return false;
581                 }
582         }
583
584         if (db_id != NULL) {
585                 *db_id = db->db_id;
586         }
587         if (db_name != NULL) {
588                 *db_name = talloc_strdup(mem_ctx, name);
589         }
590         if (db_flags != NULL) {
591                 *db_flags = db->flags;
592         }
593         return true;
594 }
595
/*
 * Convert one hexadecimal digit to its value.
 *
 * 'a'-'f' and 'A'-'F' map to 10-15 and '0'-'9' map to 0-9.  The
 * character is not validated: for anything else the result is simply
 * the distance from '0', so callers must pass hex digits only.
 */
static int h2i(char h)
{
        if (h >= 'a' && h <= 'f') {
                return h - 'a' + 10;
        }
        if (h >= 'A' && h <= 'F') {
                /* Offset from 'A', not 'f', so that 'A' yields 10 */
                return h - 'A' + 10;
        }
        return h - '0';
}
606
607 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
608                        TDB_DATA *out)
609 {
610         int i;
611         TDB_DATA data;
612
613         if (len & 0x01) {
614                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
615                         str);
616                 return EINVAL;
617         }
618
619         data.dsize = len / 2;
620         data.dptr = talloc_size(mem_ctx, data.dsize);
621         if (data.dptr == NULL) {
622                 return ENOMEM;
623         }
624
625         for (i=0; i<data.dsize; i++) {
626                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
627         }
628
629         *out = data;
630         return 0;
631 }
632
633 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
634                        TDB_DATA *out)
635 {
636         TDB_DATA data;
637         int ret = 0;
638
639         if (strncmp(str, "0x", 2) == 0) {
640                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
641         } else {
642                 data.dptr = talloc_memdup(mem_ctx, str, len);
643                 if (data.dptr == NULL) {
644                         return ENOMEM;
645                 }
646                 data.dsize = len;
647         }
648
649         *out = data;
650         return ret;
651 }
652
653 static int run_helper(TALLOC_CTX *mem_ctx, const char *command,
654                       const char *path, int argc, const char **argv)
655 {
656         pid_t pid;
657         int save_errno, status, ret;
658         const char **new_argv;
659         int i;
660
661         new_argv = talloc_array(mem_ctx, const char *, argc + 2);
662         if (new_argv == NULL) {
663                 return ENOMEM;
664         }
665
666         new_argv[0] = path;
667         for (i=0; i<argc; i++) {
668                 new_argv[i+1] = argv[i];
669         }
670         new_argv[argc+1] = NULL;
671
672         pid = fork();
673         if (pid < 0) {
674                 save_errno = errno;
675                 talloc_free(new_argv);
676                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
677                         command, path, strerror(save_errno));
678                 return save_errno;
679         }
680
681         if (pid == 0) {
682                 ret = execv(path, discard_const(new_argv));
683                 if (ret == -1) {
684                         _exit(64+errno);
685                 }
686                 /* Should not happen */
687                 _exit(64+ENOEXEC);
688         }
689
690         talloc_free(new_argv);
691
692         ret = waitpid(pid, &status, 0);
693         if (ret == -1) {
694                 save_errno = errno;
695                 fprintf(stderr, "waitpid() failed for %s - %s\n",
696                         command, strerror(save_errno));
697                 return save_errno;
698         }
699
700         if (WIFEXITED(status)) {
701                 int pstatus = WEXITSTATUS(status);
702                 if (WIFSIGNALED(status)) {
703                         fprintf(stderr, "%s terminated with signal %d\n",
704                                 command, WTERMSIG(status));
705                         ret = EINTR;
706                 } else if (pstatus >= 64 && pstatus < 255) {
707                         fprintf(stderr, "%s failed with error %d\n",
708                                 command, pstatus-64);
709                         ret = pstatus - 64;
710                 } else {
711                         ret = pstatus;
712                 }
713                 return ret;
714         } else if (WIFSIGNALED(status)) {
715                 fprintf(stderr, "%s terminated with signal %d\n",
716                         command, WTERMSIG(status));
717                 return EINTR;
718         }
719
720         return 0;
721 }
722
723 /*
724  * Command Functions
725  */
726
727 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
728                            int argc, const char **argv)
729 {
730         printf("%s\n", ctdb_version_string);
731         return 0;
732 }
733
734 static bool partially_online(TALLOC_CTX *mem_ctx,
735                              struct ctdb_context *ctdb,
736                              struct ctdb_node_and_flags *node)
737 {
738         struct ctdb_iface_list *iface_list;
739         int ret, i;
740         bool status = false;
741
742         if (node->flags != 0) {
743                 return false;
744         }
745
746         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
747                                    node->pnn, TIMEOUT(), &iface_list);
748         if (ret != 0) {
749                 return false;
750         }
751
752         status = false;
753         for (i=0; i < iface_list->num; i++) {
754                 if (iface_list->iface[i].link_state == 0) {
755                         status = true;
756                         break;
757                 }
758         }
759
760         return status;
761 }
762
763 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
764                                   struct ctdb_context *ctdb,
765                                   struct ctdb_node_map *nodemap,
766                                   uint32_t mypnn)
767 {
768         struct ctdb_node_and_flags *node;
769         int i;
770
771         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
772                options.sep,
773                "Node", options.sep,
774                "IP", options.sep,
775                "Disconnected", options.sep,
776                "Banned", options.sep,
777                "Disabled", options.sep,
778                "Unhealthy", options.sep,
779                "Stopped", options.sep,
780                "Inactive", options.sep,
781                "PartiallyOnline", options.sep,
782                "ThisNode", options.sep);
783
784         for (i=0; i<nodemap->num; i++) {
785                 node = &nodemap->node[i];
786                 if (node->flags & NODE_FLAGS_DELETED) {
787                         continue;
788                 }
789
790                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
791                        options.sep,
792                        node->pnn, options.sep,
793                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
794                        options.sep,
795                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
796                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
797                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
798                        options.sep,
799                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
800                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
801                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
802                        partially_online(mem_ctx, ctdb, node), options.sep,
803                        (node->pnn == mypnn)?'Y':'N', options.sep);
804         }
805
806 }
807
808 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
809                           struct ctdb_node_map *nodemap, uint32_t mypnn,
810                           bool print_header)
811 {
812         struct ctdb_node_and_flags *node;
813         int num_deleted_nodes = 0;
814         int i;
815
816         for (i=0; i<nodemap->num; i++) {
817                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
818                         num_deleted_nodes++;
819                 }
820         }
821
822         if (print_header) {
823                 if (num_deleted_nodes == 0) {
824                         printf("Number of nodes:%d\n", nodemap->num);
825                 } else {
826                         printf("Number of nodes:%d "
827                                "(including %d deleted nodes)\n",
828                                nodemap->num, num_deleted_nodes);
829                 }
830         }
831
832         for (i=0; i<nodemap->num; i++) {
833                 node = &nodemap->node[i];
834                 if (node->flags & NODE_FLAGS_DELETED) {
835                         continue;
836                 }
837
838                 printf("pnn:%u %-16s %s%s\n",
839                        node->pnn,
840                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
841                        partially_online(mem_ctx, ctdb, node) ?
842                                 "PARTIALLYONLINE" :
843                                 pretty_print_flags(mem_ctx, node->flags),
844                        node->pnn == mypnn ? " (THIS NODE)" : "");
845         }
846 }
847
848 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
849                          struct ctdb_node_map *nodemap, uint32_t mypnn,
850                          struct ctdb_vnn_map *vnnmap, int recmode,
851                          uint32_t recmaster)
852 {
853         int i;
854
855         print_nodemap(mem_ctx, ctdb, nodemap, mypnn, true);
856
857         if (vnnmap->generation == INVALID_GENERATION) {
858                 printf("Generation:INVALID\n");
859         } else {
860                 printf("Generation:%u\n", vnnmap->generation);
861         }
862         printf("Size:%d\n", vnnmap->size);
863         for (i=0; i<vnnmap->size; i++) {
864                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
865         }
866
867         printf("Recovery mode:%s (%d)\n",
868                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
869                recmode);
870         printf("Recovery master:%d\n", recmaster);
871 }
872
873 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
874                           int argc, const char **argv)
875 {
876         struct ctdb_node_map *nodemap;
877         struct ctdb_vnn_map *vnnmap;
878         int recmode;
879         uint32_t recmaster;
880         int ret;
881
882         if (argc != 0) {
883                 usage("status");
884         }
885
886         nodemap = get_nodemap(ctdb, false);
887         if (nodemap == NULL) {
888                 return 1;
889         }
890
891         if (options.machinereadable == 1) {
892                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
893                 return 0;
894         }
895
896         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
897                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
898         if (ret != 0) {
899                 return ret;
900         }
901
902         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
903                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
904         if (ret != 0) {
905                 return ret;
906         }
907
908         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
909                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
910         if (ret != 0) {
911                 return ret;
912         }
913
914         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
915                      recmode, recmaster);
916         return 0;
917 }
918
919 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
920                           int argc, const char **argv)
921 {
922         struct ctdb_uptime *uptime;
923         int ret, tmp, days, hours, minutes, seconds;
924
925         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
926                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
927         if (ret != 0) {
928                 return ret;
929         }
930
931         printf("Current time of node %-4u     :                %s",
932                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
933
934         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
935         seconds = tmp % 60; tmp /= 60;
936         minutes = tmp % 60; tmp /= 60;
937         hours = tmp % 24; tmp /= 24;
938         days = tmp;
939
940         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
941                days, hours, minutes, seconds,
942                ctime(&uptime->ctdbd_start_time.tv_sec));
943
944         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
945         seconds = tmp % 60; tmp /= 60;
946         minutes = tmp % 60; tmp /= 60;
947         hours = tmp % 24; tmp /= 24;
948         days = tmp;
949
950         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
951                days, hours, minutes, seconds,
952                ctime(&uptime->last_recovery_finished.tv_sec));
953
954         printf("Duration of last recovery/failover: %lf seconds\n",
955                timeval_delta(&uptime->last_recovery_finished,
956                              &uptime->last_recovery_started));
957
958         return 0;
959 }
960
961 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
962                         int argc, const char **argv)
963 {
964         struct timeval tv;
965         int ret, num_clients;
966
967         tv = timeval_current();
968         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
969                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
970         if (ret != 0) {
971                 return ret;
972         }
973
974         printf("response from %u time=%.6f sec  (%d clients)\n",
975                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
976         return 0;
977 }
978
979 const char *runstate_to_string(enum ctdb_runstate runstate);
980 enum ctdb_runstate runstate_from_string(const char *runstate_str);
981
982 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
983                             int argc, const char **argv)
984 {
985         enum ctdb_runstate runstate;
986         bool found;
987         int ret, i;
988
989         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
990                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
991         if (ret != 0) {
992                 return ret;
993         }
994
995         found = true;
996         for (i=0; i<argc; i++) {
997                 enum ctdb_runstate t;
998
999                 found = false;
1000                 t = ctdb_runstate_from_string(argv[i]);
1001                 if (t == CTDB_RUNSTATE_UNKNOWN) {
1002                         printf("Invalid run state (%s)\n", argv[i]);
1003                         return 1;
1004                 }
1005
1006                 if (t == runstate) {
1007                         found = true;
1008                         break;
1009                 }
1010         }
1011
1012         if (! found) {
1013                 printf("CTDB not in required run state (got %s)\n",
1014                        ctdb_runstate_to_string(runstate));
1015                 return 1;
1016         }
1017
1018         printf("%s\n", ctdb_runstate_to_string(runstate));
1019         return 0;
1020 }
1021
1022 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1023                           int argc, const char **argv)
1024 {
1025         struct ctdb_var_list *tun_var_list;
1026         uint32_t value;
1027         int ret, i;
1028         bool found;
1029
1030         if (argc != 1) {
1031                 usage("getvar");
1032         }
1033
1034         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1035                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1036         if (ret != 0) {
1037                 fprintf(stderr,
1038                         "Failed to get list of variables from node %u\n",
1039                         ctdb->cmd_pnn);
1040                 return ret;
1041         }
1042
1043         found = false;
1044         for (i=0; i<tun_var_list->count; i++) {
1045                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1046                         found = true;
1047                         break;
1048                 }
1049         }
1050
1051         if (! found) {
1052                 printf("No such tunable %s\n", argv[0]);
1053                 return 1;
1054         }
1055
1056         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1057                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1058         if (ret != 0) {
1059                 return ret;
1060         }
1061
1062         printf("%-26s = %u\n", argv[0], value);
1063         return 0;
1064 }
1065
1066 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1067                           int argc, const char **argv)
1068 {
1069         struct ctdb_var_list *tun_var_list;
1070         struct ctdb_tunable tunable;
1071         int ret, i;
1072         bool found;
1073
1074         if (argc != 2) {
1075                 usage("setvar");
1076         }
1077
1078         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1079                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1080         if (ret != 0) {
1081                 fprintf(stderr,
1082                         "Failed to get list of variables from node %u\n",
1083                         ctdb->cmd_pnn);
1084                 return ret;
1085         }
1086
1087         found = false;
1088         for (i=0; i<tun_var_list->count; i++) {
1089                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1090                         found = true;
1091                         break;
1092                 }
1093         }
1094
1095         if (! found) {
1096                 printf("No such tunable %s\n", argv[0]);
1097                 return 1;
1098         }
1099
1100         tunable.name = argv[0];
1101         tunable.value = strtoul(argv[1], NULL, 0);
1102
1103         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1104                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1105         if (ret != 0) {
1106                 if (ret == 1) {
1107                         fprintf(stderr,
1108                                 "Setting obsolete tunable variable '%s'\n",
1109                                tunable.name);
1110                         return 0;
1111                 }
1112         }
1113
1114         return ret;
1115 }
1116
1117 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1118                             int argc, const char **argv)
1119 {
1120         struct ctdb_var_list *tun_var_list;
1121         int ret, i;
1122
1123         if (argc != 0) {
1124                 usage("listvars");
1125         }
1126
1127         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1128                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1129         if (ret != 0) {
1130                 return ret;
1131         }
1132
1133         for (i=0; i<tun_var_list->count; i++) {
1134                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1135         }
1136
1137         return 0;
1138 }
1139
1140 const struct {
1141         const char *name;
1142         uint32_t offset;
1143 } stats_fields[] = {
1144 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1145         STATISTICS_FIELD(num_clients),
1146         STATISTICS_FIELD(frozen),
1147         STATISTICS_FIELD(recovering),
1148         STATISTICS_FIELD(num_recoveries),
1149         STATISTICS_FIELD(client_packets_sent),
1150         STATISTICS_FIELD(client_packets_recv),
1151         STATISTICS_FIELD(node_packets_sent),
1152         STATISTICS_FIELD(node_packets_recv),
1153         STATISTICS_FIELD(keepalive_packets_sent),
1154         STATISTICS_FIELD(keepalive_packets_recv),
1155         STATISTICS_FIELD(node.req_call),
1156         STATISTICS_FIELD(node.reply_call),
1157         STATISTICS_FIELD(node.req_dmaster),
1158         STATISTICS_FIELD(node.reply_dmaster),
1159         STATISTICS_FIELD(node.reply_error),
1160         STATISTICS_FIELD(node.req_message),
1161         STATISTICS_FIELD(node.req_control),
1162         STATISTICS_FIELD(node.reply_control),
1163         STATISTICS_FIELD(node.req_tunnel),
1164         STATISTICS_FIELD(client.req_call),
1165         STATISTICS_FIELD(client.req_message),
1166         STATISTICS_FIELD(client.req_control),
1167         STATISTICS_FIELD(client.req_tunnel),
1168         STATISTICS_FIELD(timeouts.call),
1169         STATISTICS_FIELD(timeouts.control),
1170         STATISTICS_FIELD(timeouts.traverse),
1171         STATISTICS_FIELD(locks.num_calls),
1172         STATISTICS_FIELD(locks.num_current),
1173         STATISTICS_FIELD(locks.num_pending),
1174         STATISTICS_FIELD(locks.num_failed),
1175         STATISTICS_FIELD(total_calls),
1176         STATISTICS_FIELD(pending_calls),
1177         STATISTICS_FIELD(childwrite_calls),
1178         STATISTICS_FIELD(pending_childwrite_calls),
1179         STATISTICS_FIELD(memory_used),
1180         STATISTICS_FIELD(max_hop_count),
1181         STATISTICS_FIELD(total_ro_delegations),
1182         STATISTICS_FIELD(total_ro_revokes),
1183 };
1184
/*
 * Mean of a latency counter: total divided by the number of samples,
 * or 0.0 when no samples have been recorded.  The argument is
 * evaluated more than once, so it must not have side effects.
 */
#define LATENCY_AVG(v)	((v).num != 0 ? (v).total / (v).num : 0.0)
1186
1187 static void print_statistics_machine(struct ctdb_statistics *s,
1188                                      bool show_header)
1189 {
1190         int i;
1191
1192         if (show_header) {
1193                 printf("CTDB version%s", options.sep);
1194                 printf("Current time of statistics%s", options.sep);
1195                 printf("Statistics collected since%s", options.sep);
1196                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1197                         printf("%s%s", stats_fields[i].name, options.sep);
1198                 }
1199                 printf("num_reclock_ctdbd_latency%s", options.sep);
1200                 printf("min_reclock_ctdbd_latency%s", options.sep);
1201                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1202                 printf("max_reclock_ctdbd_latency%s", options.sep);
1203
1204                 printf("num_reclock_recd_latency%s", options.sep);
1205                 printf("min_reclock_recd_latency%s", options.sep);
1206                 printf("avg_reclock_recd_latency%s", options.sep);
1207                 printf("max_reclock_recd_latency%s", options.sep);
1208
1209                 printf("num_call_latency%s", options.sep);
1210                 printf("min_call_latency%s", options.sep);
1211                 printf("avg_call_latency%s", options.sep);
1212                 printf("max_call_latency%s", options.sep);
1213
1214                 printf("num_lockwait_latency%s", options.sep);
1215                 printf("min_lockwait_latency%s", options.sep);
1216                 printf("avg_lockwait_latency%s", options.sep);
1217                 printf("max_lockwait_latency%s", options.sep);
1218
1219                 printf("num_childwrite_latency%s", options.sep);
1220                 printf("min_childwrite_latency%s", options.sep);
1221                 printf("avg_childwrite_latency%s", options.sep);
1222                 printf("max_childwrite_latency%s", options.sep);
1223                 printf("\n");
1224         }
1225
1226         printf("%u%s", CTDB_PROTOCOL, options.sep);
1227         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1228         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1229         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1230                 printf("%u%s",
1231                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1232                        options.sep);
1233         }
1234         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1235         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1236         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1237         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1238
1239         printf("%u%s", s->reclock.recd.num, options.sep);
1240         printf("%.6f%s", s->reclock.recd.min, options.sep);
1241         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1242         printf("%.6f%s", s->reclock.recd.max, options.sep);
1243
1244         printf("%d%s", s->call_latency.num, options.sep);
1245         printf("%.6f%s", s->call_latency.min, options.sep);
1246         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1247         printf("%.6f%s", s->call_latency.max, options.sep);
1248
1249         printf("%d%s", s->childwrite_latency.num, options.sep);
1250         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1251         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1252         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1253         printf("\n");
1254 }
1255
1256 static void print_statistics(struct ctdb_statistics *s)
1257 {
1258         int tmp, days, hours, minutes, seconds;
1259         int i;
1260         const char *prefix = NULL;
1261         int preflen = 0;
1262
1263         tmp = s->statistics_current_time.tv_sec -
1264               s->statistics_start_time.tv_sec;
1265         seconds = tmp % 60; tmp /= 60;
1266         minutes = tmp % 60; tmp /= 60;
1267         hours   = tmp % 24; tmp /= 24;
1268         days    = tmp;
1269
1270         printf("CTDB version %u\n", CTDB_PROTOCOL);
1271         printf("Current time of statistics  :                %s",
1272                ctime(&s->statistics_current_time.tv_sec));
1273         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1274                days, hours, minutes, seconds,
1275                ctime(&s->statistics_start_time.tv_sec));
1276
1277         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1278                 if (strchr(stats_fields[i].name, '.') != NULL) {
1279                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1280                         if (! prefix ||
1281                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1282                                 prefix = stats_fields[i].name;
1283                                 printf(" %*.*s\n", preflen-1, preflen-1,
1284                                        stats_fields[i].name);
1285                         }
1286                 } else {
1287                         preflen = 0;
1288                 }
1289                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1290                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1291                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1292         }
1293
1294         printf(" hop_count_buckets:");
1295         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1296                 printf(" %d", s->hop_count_bucket[i]);
1297         }
1298         printf("\n");
1299         printf(" lock_buckets:");
1300         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1301                 printf(" %d", s->locks.buckets[i]);
1302         }
1303         printf("\n");
1304         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1305                "locks_latency      MIN/AVG/MAX",
1306                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1307                s->locks.latency.max, s->locks.latency.num);
1308
1309         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1310                "reclock_ctdbd      MIN/AVG/MAX",
1311                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1312                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1313
1314         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1315                "reclock_recd       MIN/AVG/MAX",
1316                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1317                s->reclock.recd.max, s->reclock.recd.num);
1318
1319         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1320                "call_latency       MIN/AVG/MAX",
1321                s->call_latency.min, LATENCY_AVG(s->call_latency),
1322                s->call_latency.max, s->call_latency.num);
1323
1324         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1325                "childwrite_latency MIN/AVG/MAX",
1326                s->childwrite_latency.min,
1327                LATENCY_AVG(s->childwrite_latency),
1328                s->childwrite_latency.max, s->childwrite_latency.num);
1329 }
1330
1331 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1332                               int argc, const char **argv)
1333 {
1334         struct ctdb_statistics *stats;
1335         int ret;
1336
1337         if (argc != 0) {
1338                 usage("statistics");
1339         }
1340
1341         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1342                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1343         if (ret != 0) {
1344                 return ret;
1345         }
1346
1347         if (options.machinereadable) {
1348                 print_statistics_machine(stats, true);
1349         } else {
1350                 print_statistics(stats);
1351         }
1352
1353         return 0;
1354 }
1355
1356 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1357                                     struct ctdb_context *ctdb,
1358                                     int argc, const char **argv)
1359 {
1360         int ret;
1361
1362         if (argc != 0) {
1363                 usage("statisticsreset");
1364         }
1365
1366         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1367                                          ctdb->cmd_pnn, TIMEOUT());
1368         if (ret != 0) {
1369                 return ret;
1370         }
1371
1372         return 0;
1373 }
1374
1375 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1376                          int argc, const char **argv)
1377 {
1378         struct ctdb_statistics_list *slist;
1379         int ret, count = 0, i;
1380         bool show_header = true;
1381
1382         if (argc > 1) {
1383                 usage("stats");
1384         }
1385
1386         if (argc == 1) {
1387                 count = atoi(argv[0]);
1388         }
1389
1390         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1391                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1392         if (ret != 0) {
1393                 return ret;
1394         }
1395
1396         for (i=0; i<slist->num; i++) {
1397                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1398                         continue;
1399                 }
1400                 if (options.machinereadable == 1) {
1401                         print_statistics_machine(&slist->stats[i],
1402                                                  show_header);
1403                         show_header = false;
1404                 } else {
1405                         print_statistics(&slist->stats[i]);
1406                 }
1407                 if (count > 0 && i == count) {
1408                         break;
1409                 }
1410         }
1411
1412         return 0;
1413 }
1414
1415 static int ctdb_public_ip_cmp(const void *a, const void *b)
1416 {
1417         const struct ctdb_public_ip *ip_a = a;
1418         const struct ctdb_public_ip *ip_b = b;
1419
1420         return ctdb_sock_addr_cmp(&ip_a->addr, &ip_b->addr);
1421 }
1422
1423 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1424                      struct ctdb_public_ip_list *ips,
1425                      struct ctdb_public_ip_info **ipinfo,
1426                      bool all_nodes)
1427 {
1428         int i, j;
1429         char *conf, *avail, *active;
1430
1431         if (options.machinereadable == 1) {
1432                 printf("%s%s%s%s%s", options.sep,
1433                        "Public IP", options.sep,
1434                        "Node", options.sep);
1435                 if (options.verbose == 1) {
1436                         printf("%s%s%s%s%s%s\n",
1437                                "ActiveInterfaces", options.sep,
1438                                "AvailableInterfaces", options.sep,
1439                                "ConfiguredInterfaces", options.sep);
1440                 } else {
1441                         printf("\n");
1442                 }
1443         } else {
1444                 if (all_nodes) {
1445                         printf("Public IPs on ALL nodes\n");
1446                 } else {
1447                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1448                 }
1449         }
1450
1451         for (i = 0; i < ips->num; i++) {
1452
1453                 if (options.machinereadable == 1) {
1454                         printf("%s%s%s%d%s", options.sep,
1455                                ctdb_sock_addr_to_string(
1456                                        mem_ctx, &ips->ip[i].addr, false),
1457                                options.sep,
1458                                (int)ips->ip[i].pnn, options.sep);
1459                 } else {
1460                         printf("%s", ctdb_sock_addr_to_string(
1461                                        mem_ctx, &ips->ip[i].addr, false));
1462                 }
1463
1464                 if (options.verbose == 0) {
1465                         if (options.machinereadable == 1) {
1466                                 printf("\n");
1467                         } else {
1468                                 printf(" %d\n", (int)ips->ip[i].pnn);
1469                         }
1470                         continue;
1471                 }
1472
1473                 conf = NULL;
1474                 avail = NULL;
1475                 active = NULL;
1476
1477                 if (ipinfo[i] == NULL) {
1478                         goto skip_ipinfo;
1479                 }
1480
1481                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1482                         struct ctdb_iface *iface;
1483
1484                         iface = &ipinfo[i]->ifaces->iface[j];
1485                         if (conf == NULL) {
1486                                 conf = talloc_strdup(mem_ctx, iface->name);
1487                         } else {
1488                                 conf = talloc_asprintf_append(
1489                                                 conf, ",%s", iface->name);
1490                         }
1491
1492                         if (ipinfo[i]->active_idx == j) {
1493                                 active = iface->name;
1494                         }
1495
1496                         if (iface->link_state == 0) {
1497                                 continue;
1498                         }
1499
1500                         if (avail == NULL) {
1501                                 avail = talloc_strdup(mem_ctx, iface->name);
1502                         } else {
1503                                 avail = talloc_asprintf_append(
1504                                                 avail, ",%s", iface->name);
1505                         }
1506                 }
1507
1508         skip_ipinfo:
1509
1510                 if (options.machinereadable == 1) {
1511                         printf("%s%s%s%s%s%s\n",
1512                                active ? active : "", options.sep,
1513                                avail ? avail : "", options.sep,
1514                                conf ? conf : "", options.sep);
1515                 } else {
1516                         printf(" node[%d] active[%s] available[%s]"
1517                                " configured[%s]\n",
1518                                (int)ips->ip[i].pnn, active ? active : "",
1519                                avail ? avail : "", conf ? conf : "");
1520                 }
1521         }
1522 }
1523
1524 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1525                        size_t datalen, void *private_data)
1526 {
1527         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1528                 private_data, struct ctdb_public_ip_list);
1529         struct ctdb_public_ip *ip;
1530
1531         ip = (struct ctdb_public_ip *)databuf;
1532         ips->ip[ips->num] = *ip;
1533         ips->num += 1;
1534
1535         return 0;
1536 }
1537
1538 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1539                               struct ctdb_public_ip_list **out)
1540 {
1541         struct ctdb_node_map *nodemap;
1542         struct ctdb_public_ip_list *ips;
1543         struct db_hash_context *ipdb;
1544         uint32_t *pnn_list;
1545         int ret, count, i, j;
1546
1547         nodemap = get_nodemap(ctdb, false);
1548         if (nodemap == NULL) {
1549                 return 1;
1550         }
1551
1552         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1553         if (ret != 0) {
1554                 goto failed;
1555         }
1556
1557         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1558                                      &pnn_list);
1559         if (count <= 0) {
1560                 goto failed;
1561         }
1562
1563         for (i=0; i<count; i++) {
1564                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1565                                                pnn_list[i], TIMEOUT(),
1566                                                false, &ips);
1567                 if (ret != 0) {
1568                         goto failed;
1569                 }
1570
1571                 for (j=0; j<ips->num; j++) {
1572                         struct ctdb_public_ip ip;
1573
1574                         ip.pnn = ips->ip[j].pnn;
1575                         ip.addr = ips->ip[j].addr;
1576
1577                         if (pnn_list[i] == ip.pnn) {
1578                                 /* Node claims IP is hosted on it, so
1579                                  * save that information
1580                                  */
1581                                 ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1582                                                   sizeof(ip.addr),
1583                                                   (uint8_t *)&ip, sizeof(ip));
1584                                 if (ret != 0) {
1585                                         goto failed;
1586                                 }
1587                         } else {
1588                                 /* Node thinks IP is hosted elsewhere,
1589                                  * so overwrite with CTDB_UNKNOWN_PNN
1590                                  * if there's no existing entry
1591                                  */
1592                                 ret = db_hash_exists(ipdb, (uint8_t *)&ip.addr,
1593                                                      sizeof(ip.addr));
1594                                 if (ret == ENOENT) {
1595                                         ip.pnn = CTDB_UNKNOWN_PNN;
1596                                         ret = db_hash_add(ipdb,
1597                                                           (uint8_t *)&ip.addr,
1598                                                           sizeof(ip.addr),
1599                                                           (uint8_t *)&ip,
1600                                                           sizeof(ip));
1601                                         if (ret != 0) {
1602                                                 goto failed;
1603                                         }
1604                                 }
1605                         }
1606                 }
1607
1608                 TALLOC_FREE(ips);
1609         }
1610
1611         talloc_free(pnn_list);
1612
1613         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1614         if (ret != 0) {
1615                 goto failed;
1616         }
1617
1618         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1619         if (ips == NULL) {
1620                 goto failed;
1621         }
1622
1623         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1624         if (ips->ip == NULL) {
1625                 goto failed;
1626         }
1627
1628         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1629         if (ret != 0) {
1630                 goto failed;
1631         }
1632
1633         if (count != ips->num) {
1634                 goto failed;
1635         }
1636
1637         talloc_free(ipdb);
1638
1639         *out = ips;
1640         return 0;
1641
1642 failed:
1643         talloc_free(ipdb);
1644         return 1;
1645 }
1646
1647 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1648                       int argc, const char **argv)
1649 {
1650         struct ctdb_public_ip_list *ips;
1651         struct ctdb_public_ip_info **ipinfo;
1652         int ret, i;
1653         bool do_all = false;
1654
1655         if (argc > 1) {
1656                 usage("ip");
1657         }
1658
1659         if (argc == 1) {
1660                 if (strcmp(argv[0], "all") == 0) {
1661                         do_all = true;
1662                 } else {
1663                         usage("ip");
1664                 }
1665         }
1666
1667         if (do_all) {
1668                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1669         } else {
1670                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1671                                                ctdb->cmd_pnn, TIMEOUT(),
1672                                                false, &ips);
1673         }
1674         if (ret != 0) {
1675                 return ret;
1676         }
1677
1678         qsort(ips->ip, ips->num, sizeof(struct ctdb_public_ip),
1679               ctdb_public_ip_cmp);
1680
1681         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1682         if (ipinfo == NULL) {
1683                 return 1;
1684         }
1685
1686         for (i=0; i<ips->num; i++) {
1687                 uint32_t pnn;
1688                 if (do_all) {
1689                         pnn = ips->ip[i].pnn;
1690                 } else {
1691                         pnn = ctdb->cmd_pnn;
1692                 }
1693                 if (pnn == CTDB_UNKNOWN_PNN) {
1694                         ipinfo[i] = NULL;
1695                         continue;
1696                 }
1697                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1698                                                    ctdb->client, pnn,
1699                                                    TIMEOUT(), &ips->ip[i].addr,
1700                                                    &ipinfo[i]);
1701                 if (ret != 0) {
1702                         return ret;
1703                 }
1704         }
1705
1706         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1707         return 0;
1708 }
1709
1710 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1711                           int argc, const char **argv)
1712 {
1713         struct ctdb_public_ip_info *ipinfo;
1714         ctdb_sock_addr addr;
1715         int ret, i;
1716
1717         if (argc != 1) {
1718                 usage("ipinfo");
1719         }
1720
1721         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
1722         if (ret != 0) {
1723                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1724                 return 1;
1725         }
1726
1727         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1728                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1729                                            &ipinfo);
1730         if (ret != 0) {
1731                 if (ret == -1) {
1732                         printf("Node %u does not know about IP %s\n",
1733                                ctdb->cmd_pnn, argv[0]);
1734                 }
1735                 return ret;
1736         }
1737
1738         printf("Public IP[%s] info on node %u\n",
1739                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1740                                         ctdb->cmd_pnn);
1741
1742         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1743                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1744                ipinfo->ip.pnn, ipinfo->ifaces->num);
1745
1746         for (i=0; i<ipinfo->ifaces->num; i++) {
1747                 struct ctdb_iface *iface;
1748
1749                 iface = &ipinfo->ifaces->iface[i];
1750                 iface->name[CTDB_IFACE_SIZE] = '\0';
1751                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1752                        i+1, iface->name,
1753                        iface->link_state == 0 ? "down" : "up",
1754                        iface->references,
1755                        (i == ipinfo->active_idx) ? " (active)" : "");
1756         }
1757
1758         return 0;
1759 }
1760
1761 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1762                           int argc, const char **argv)
1763 {
1764         struct ctdb_iface_list *ifaces;
1765         int ret, i;
1766
1767         if (argc != 0) {
1768                 usage("ifaces");
1769         }
1770
1771         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1772                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1773         if (ret != 0) {
1774                 return ret;
1775         }
1776
1777         if (ifaces->num == 0) {
1778                 printf("No interfaces configured on node %u\n",
1779                        ctdb->cmd_pnn);
1780                 return 0;
1781         }
1782
1783         if (options.machinereadable) {
1784                 printf("%s%s%s%s%s%s%s\n", options.sep,
1785                        "Name", options.sep,
1786                        "LinkStatus", options.sep,
1787                        "References", options.sep);
1788         } else {
1789                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1790         }
1791
1792         for (i=0; i<ifaces->num; i++) {
1793                 if (options.machinereadable) {
1794                         printf("%s%s%s%u%s%u%s\n", options.sep,
1795                                ifaces->iface[i].name, options.sep,
1796                                ifaces->iface[i].link_state, options.sep,
1797                                ifaces->iface[i].references, options.sep);
1798                 } else {
1799                         printf("name:%s link:%s references:%u\n",
1800                                ifaces->iface[i].name,
1801                                ifaces->iface[i].link_state ? "up" : "down",
1802                                ifaces->iface[i].references);
1803                 }
1804         }
1805
1806         return 0;
1807 }
1808
1809 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1810                                 int argc, const char **argv)
1811 {
1812         struct ctdb_iface_list *ifaces;
1813         struct ctdb_iface *iface;
1814         int ret, i;
1815
1816         if (argc != 2) {
1817                 usage("setifacelink");
1818         }
1819
1820         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1821                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1822                 return 1;
1823         }
1824
1825         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1826                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1827         if (ret != 0) {
1828                 fprintf(stderr,
1829                         "Failed to get interface information from node %u\n",
1830                         ctdb->cmd_pnn);
1831                 return ret;
1832         }
1833
1834         iface = NULL;
1835         for (i=0; i<ifaces->num; i++) {
1836                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1837                         iface = &ifaces->iface[i];
1838                         break;
1839                 }
1840         }
1841
1842         if (iface == NULL) {
1843                 printf("Interface %s not configured on node %u\n",
1844                        argv[0], ctdb->cmd_pnn);
1845                 return 1;
1846         }
1847
1848         if (strcmp(argv[1], "up") == 0) {
1849                 iface->link_state = 1;
1850         } else if (strcmp(argv[1], "down") == 0) {
1851                 iface->link_state = 0;
1852         } else {
1853                 usage("setifacelink");
1854                 return 1;
1855         }
1856
1857         iface->references = 0;
1858
1859         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1860                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1861         if (ret != 0) {
1862                 return ret;
1863         }
1864
1865         return 0;
1866 }
1867
1868 static int control_process_exists(TALLOC_CTX *mem_ctx,
1869                                   struct ctdb_context *ctdb,
1870                                   int argc, const char **argv)
1871 {
1872         pid_t pid;
1873         uint64_t srvid = 0;
1874         int ret, status;
1875
1876         if (argc != 1 && argc != 2) {
1877                 usage("process-exists");
1878         }
1879
1880         pid = atoi(argv[0]);
1881         if (argc == 2) {
1882                 srvid = strtoull(argv[1], NULL, 0);
1883         }
1884
1885         if (srvid == 0) {
1886                 ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1887                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1888         } else {
1889                 struct ctdb_pid_srvid pid_srvid;
1890
1891                 pid_srvid.pid = pid;
1892                 pid_srvid.srvid = srvid;
1893
1894                 ret = ctdb_ctrl_check_pid_srvid(mem_ctx, ctdb->ev,
1895                                                 ctdb->client, ctdb->cmd_pnn,
1896                                                 TIMEOUT(), &pid_srvid,
1897                                                 &status);
1898         }
1899
1900         if (ret != 0) {
1901                 return ret;
1902         }
1903
1904         if (srvid == 0) {
1905                 printf("PID %d %s\n", pid,
1906                        (status == 0 ? "exists" : "does not exist"));
1907         } else {
1908                 printf("PID %d with SRVID 0x%"PRIx64" %s\n", pid, srvid,
1909                        (status == 0 ? "exists" : "does not exist"));
1910         }
1911         return status;
1912 }
1913
1914 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1915                             int argc, const char **argv)
1916 {
1917         struct ctdb_dbid_map *dbmap;
1918         int ret, i;
1919
1920         if (argc != 0) {
1921                 usage("getdbmap");
1922         }
1923
1924         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1925                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1926         if (ret != 0) {
1927                 return ret;
1928         }
1929
1930         if (options.machinereadable == 1) {
1931                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1932                        options.sep,
1933                        "ID", options.sep,
1934                        "Name", options.sep,
1935                        "Path", options.sep,
1936                        "Persistent", options.sep,
1937                        "Sticky", options.sep,
1938                        "Unhealthy", options.sep,
1939                        "Readonly", options.sep,
1940                        "Replicated", options.sep);
1941         } else {
1942                 printf("Number of databases:%d\n", dbmap->num);
1943         }
1944
1945         for (i=0; i<dbmap->num; i++) {
1946                 const char *name;
1947                 const char *path;
1948                 const char *health;
1949                 bool persistent;
1950                 bool readonly;
1951                 bool sticky;
1952                 bool replicated;
1953                 uint32_t db_id;
1954
1955                 db_id = dbmap->dbs[i].db_id;
1956
1957                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1958                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1959                                            &name);
1960                 if (ret != 0) {
1961                         return ret;
1962                 }
1963
1964                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1965                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1966                                           &path);
1967                 if (ret != 0) {
1968                         return ret;
1969                 }
1970
1971                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1972                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1973                                               &health);
1974                 if (ret != 0) {
1975                         return ret;
1976                 }
1977
1978                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1979                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1980                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1981                 replicated = dbmap->dbs[i].flags & CTDB_DB_FLAGS_REPLICATED;
1982
1983                 if (options.machinereadable == 1) {
1984                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s%d%s\n",
1985                                options.sep,
1986                                db_id, options.sep,
1987                                name, options.sep,
1988                                path, options.sep,
1989                                !! (persistent), options.sep,
1990                                !! (sticky), options.sep,
1991                                !! (health), options.sep,
1992                                !! (readonly), options.sep,
1993                                !! (replicated), options.sep);
1994                 } else {
1995                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s%s\n",
1996                                db_id, name, path,
1997                                persistent ? " PERSISTENT" : "",
1998                                sticky ? " STICKY" : "",
1999                                readonly ? " READONLY" : "",
2000                                replicated ? " REPLICATED" : "",
2001                                health ? " UNHEALTHY" : "");
2002                 }
2003
2004                 talloc_free(discard_const(name));
2005                 talloc_free(discard_const(path));
2006                 talloc_free(discard_const(health));
2007         }
2008
2009         return 0;
2010 }
2011
2012 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2013                                int argc, const char **argv)
2014 {
2015         uint32_t db_id;
2016         const char *db_name, *db_path, *db_health;
2017         uint8_t db_flags;
2018         int ret;
2019
2020         if (argc != 1) {
2021                 usage("getdbstatus");
2022         }
2023
2024         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2025                 return 1;
2026         }
2027
2028         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
2029                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
2030                                   &db_path);
2031         if (ret != 0) {
2032                 return ret;
2033         }
2034
2035         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
2036                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
2037                                       &db_health);
2038         if (ret != 0) {
2039                 return ret;
2040         }
2041
2042         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
2043         printf("PERSISTENT: %s\nREPLICATED: %s\nSTICKY: %s\nREADONLY: %s\n",
2044                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
2045                (db_flags & CTDB_DB_FLAGS_REPLICATED ? "yes" : "no"),
2046                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
2047                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"));
2048         printf("HEALTH: %s\n", (db_health ? db_health : "OK"));
2049         return 0;
2050 }
2051
/* State handed to dump_record() through its private_data pointer. */
struct dump_record_state {
        uint32_t count; /* incremented by dump_record() for each record */
};
2055
/*
 * True when x is printable according to isprint() and is neither a
 * double quote nor a backslash.  Note that x is evaluated twice.
 */
#define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
2057
2058 static void dump_tdb_data(const char *name, TDB_DATA val)
2059 {
2060         int i;
2061
2062         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
2063         for (i=0; i<val.dsize; i++) {
2064                 if (ISASCII(val.dptr[i])) {
2065                         fprintf(stdout, "%c", val.dptr[i]);
2066                 } else {
2067                         fprintf(stdout, "\\%02X", val.dptr[i]);
2068                 }
2069         }
2070         fprintf(stdout, "\"\n");
2071 }
2072
2073 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
2074 {
2075         fprintf(stdout, "dmaster: %u\n", header->dmaster);
2076         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
2077         fprintf(stdout, "flags: 0x%08x", header->flags);
2078         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
2079                 fprintf(stdout, " MIGRATED_WITH_DATA");
2080         }
2081         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
2082                 fprintf(stdout, " VACUUM_MIGRATED");
2083         }
2084         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
2085                 fprintf(stdout, " AUTOMATIC");
2086         }
2087         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
2088                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
2089         }
2090         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
2091                 fprintf(stdout, " RO_HAVE_READONLY");
2092         }
2093         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
2094                 fprintf(stdout, " RO_REVOKING_READONLY");
2095         }
2096         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
2097                 fprintf(stdout, " RO_REVOKE_COMPLETE");
2098         }
2099         fprintf(stdout, "\n");
2100
2101 }
2102
2103 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
2104                        TDB_DATA key, TDB_DATA data, void *private_data)
2105 {
2106         struct dump_record_state *state =
2107                 (struct dump_record_state *)private_data;
2108
2109         state->count += 1;
2110
2111         dump_tdb_data("key", key);
2112         dump_ltdb_header(header);
2113         dump_tdb_data("data", data);
2114         fprintf(stdout, "\n");
2115
2116         return 0;
2117 }
2118
2119 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2120                          int argc, const char **argv)
2121 {
2122         struct ctdb_db_context *db;
2123         const char *db_name;
2124         uint32_t db_id;
2125         uint8_t db_flags;
2126         struct dump_record_state state;
2127         int ret;
2128
2129         if (argc != 1) {
2130                 usage("catdb");
2131         }
2132
2133         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2134                 return 1;
2135         }
2136
2137         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2138                           db_flags, &db);
2139         if (ret != 0) {
2140                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2141                 return ret;
2142         }
2143
2144         state.count = 0;
2145
2146         ret = ctdb_db_traverse(mem_ctx, ctdb->ev, ctdb->client, db,
2147                                ctdb->cmd_pnn, TIMEOUT(),
2148                                dump_record, &state);
2149
2150         printf("Dumped %u records\n", state.count);
2151
2152         return ret;
2153 }
2154
2155 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2156                           int argc, const char **argv)
2157 {
2158         struct ctdb_db_context *db;
2159         const char *db_name;
2160         uint32_t db_id;
2161         uint8_t db_flags;
2162         struct dump_record_state state;
2163         int ret;
2164
2165         if (argc != 1) {
2166                 usage("catdb");
2167         }
2168
2169         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2170                 return 1;
2171         }
2172
2173         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2174                           db_flags, &db);
2175         if (ret != 0) {
2176                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2177                 return ret;
2178         }
2179
2180         state.count = 0;
2181         ret = ctdb_db_traverse_local(db, true, true, dump_record, &state);
2182
2183         printf("Dumped %u record(s)\n", state.count);
2184
2185         return ret;
2186 }
2187
2188 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2189                                    struct ctdb_context *ctdb,
2190                                    int argc, const char **argv)
2191 {
2192         uint32_t caps;
2193         int ret;
2194
2195         if (argc != 0) {
2196                 usage("getcapabilities");
2197         }
2198
2199         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2200                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2201         if (ret != 0) {
2202                 return ret;
2203         }
2204
2205         if (options.machinereadable == 1) {
2206                 printf("%s%s%s%s%s\n",
2207                        options.sep,
2208                        "RECMASTER", options.sep,
2209                        "LMASTER", options.sep);
2210                 printf("%s%d%s%d%s\n", options.sep,
2211                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2212                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2213         } else {
2214                 printf("RECMASTER: %s\n",
2215                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2216                 printf("LMASTER: %s\n",
2217                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2218         }
2219
2220         return 0;
2221 }
2222
2223 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2224                        int argc, const char **argv)
2225 {
2226         printf("%u\n", ctdb_client_pnn(ctdb->client));
2227         return 0;
2228 }
2229
2230 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2231                        int argc, const char **argv)
2232 {
2233         char *t, *lvs_helper = NULL;
2234
2235         if (argc != 1) {
2236                 usage("lvs");
2237         }
2238
2239         t = getenv("CTDB_LVS_HELPER");
2240         if (t != NULL) {
2241                 lvs_helper = talloc_strdup(mem_ctx, t);
2242         } else {
2243                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2244                                              CTDB_HELPER_BINDIR);
2245         }
2246
2247         if (lvs_helper == NULL) {
2248                 fprintf(stderr, "Unable to set LVS helper\n");
2249                 return 1;
2250         }
2251
2252         return run_helper(mem_ctx, "LVS helper", lvs_helper, argc, argv);
2253 }
2254
2255 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2256                             int argc, const char **argv)
2257 {
2258         int log_level;
2259         int ret;
2260         bool found;
2261
2262         if (argc != 1) {
2263                 usage("setdebug");
2264         }
2265
2266         found = debug_level_parse(argv[0], &log_level);
2267         if (! found) {
2268                 fprintf(stderr,
2269                         "Invalid debug level '%s'. Valid levels are:\n",
2270                         argv[0]);
2271                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2272                 return 1;
2273         }
2274
2275         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2276                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2277         if (ret != 0) {
2278                 return ret;
2279         }
2280
2281         return 0;
2282 }
2283
2284 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2285                             int argc, const char **argv)
2286 {
2287         int loglevel;
2288         const char *log_str;
2289         int ret;
2290
2291         if (argc != 0) {
2292                 usage("getdebug");
2293         }
2294
2295         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2296                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2297         if (ret != 0) {
2298                 return ret;
2299         }
2300
2301         log_str = debug_level_to_string(loglevel);
2302         printf("%s\n", log_str);
2303
2304         return 0;
2305 }
2306
2307 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2308                           int argc, const char **argv)
2309 {
2310         const char *db_name;
2311         uint8_t db_flags = 0;
2312         int ret;
2313
2314         if (argc < 1 || argc > 2) {
2315                 usage("attach");
2316         }
2317
2318         db_name = argv[0];
2319         if (argc == 2) {
2320                 if (strcmp(argv[1], "persistent") == 0) {
2321                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2322                 } else if (strcmp(argv[1], "readonly") == 0) {
2323                         db_flags = CTDB_DB_FLAGS_READONLY;
2324                 } else if (strcmp(argv[1], "sticky") == 0) {
2325                         db_flags = CTDB_DB_FLAGS_STICKY;
2326                 } else if (strcmp(argv[1], "replicated") == 0) {
2327                         db_flags = CTDB_DB_FLAGS_REPLICATED;
2328                 } else {
2329                         usage("attach");
2330                 }
2331         }
2332
2333         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2334                           db_flags, NULL);
2335         if (ret != 0) {
2336                 return ret;
2337         }
2338
2339         return 0;
2340 }
2341
2342 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2343                           int argc, const char **argv)
2344 {
2345         const char *db_name;
2346         uint32_t db_id;
2347         uint8_t db_flags;
2348         struct ctdb_node_map *nodemap;
2349         int recmode;
2350         int ret, ret2, i;
2351
2352         if (argc < 1) {
2353                 usage("detach");
2354         }
2355
2356         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2357                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2358         if (ret != 0) {
2359                 return ret;
2360         }
2361
2362         if (recmode == CTDB_RECOVERY_ACTIVE) {
2363                 fprintf(stderr, "Database cannot be detached"
2364                                 " when recovery is active\n");
2365                 return 1;
2366         }
2367
2368         nodemap = get_nodemap(ctdb, false);
2369         if (nodemap == NULL) {
2370                 return 1;
2371         }
2372
2373         for (i=0; i<nodemap->num; i++) {
2374                 uint32_t value;
2375
2376                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2377                         continue;
2378                 }
2379                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2380                         continue;
2381                 }
2382                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2383                         fprintf(stderr, "Database cannot be detached on"
2384                                 " inactive (stopped or banned) node %u\n",
2385                                 nodemap->node[i].pnn);
2386                         return 1;
2387                 }
2388
2389                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2390                                             nodemap->node[i].pnn, TIMEOUT(),
2391                                             "AllowClientDBAttach", &value);
2392                 if (ret != 0) {
2393                         fprintf(stderr,
2394                                 "Unable to get tunable AllowClientDBAttach"
2395                                 " from node %u\n", nodemap->node[i].pnn);
2396                         return ret;
2397                 }
2398
2399                 if (value == 1) {
2400                         fprintf(stderr,
2401                                 "Database access is still active on node %u."
2402                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2403                                 nodemap->node[i].pnn);
2404                         return 1;
2405                 }
2406         }
2407
2408         ret2 = 0;
2409         for (i=0; i<argc; i++) {
2410                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2411                                 &db_flags)) {
2412                         continue;
2413                 }
2414
2415                 if (db_flags &
2416                     (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
2417                         fprintf(stderr,
2418                                 "Only volatile databases can be detached\n");
2419                         return 1;
2420                 }
2421
2422                 ret = ctdb_detach(ctdb->ev, ctdb->client, TIMEOUT(), db_id);
2423                 if (ret != 0) {
2424                         fprintf(stderr, "Database %s detach failed\n", db_name);
2425                         ret2 = ret;
2426                 }
2427         }
2428
2429         return ret2;
2430 }
2431
2432 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2433                               int argc, const char **argv)
2434 {
2435         const char *mem_str;
2436         ssize_t n;
2437         int ret;
2438
2439         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2440                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2441         if (ret != 0) {
2442                 return ret;
2443         }
2444
2445         n = write(1, mem_str, strlen(mem_str)+1);
2446         if (n < 0 || n != strlen(mem_str)+1) {
2447                 fprintf(stderr, "Failed to write talloc summary\n");
2448                 return 1;
2449         }
2450
2451         return 0;
2452 }
2453
2454 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2455 {
2456         bool *done = (bool *)private_data;
2457         ssize_t n;
2458
2459         n = write(1, data.dptr, data.dsize);
2460         if (n < 0 || n != data.dsize) {
2461                 fprintf(stderr, "Failed to write talloc summary\n");
2462         }
2463
2464         *done = true;
2465 }
2466
2467 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2468                                 int argc, const char **argv)
2469 {
2470         struct ctdb_srvid_message msg = { 0 };
2471         int ret;
2472         bool done = false;
2473
2474         msg.pnn = ctdb->pnn;
2475         msg.srvid = next_srvid(ctdb);
2476
2477         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2478                                               msg.srvid, dump_memory, &done);
2479         if (ret != 0) {
2480                 return ret;
2481         }
2482
2483         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2484                                     ctdb->cmd_pnn, &msg);
2485         if (ret != 0) {
2486                 return ret;
2487         }
2488
2489         ctdb_client_wait(ctdb->ev, &done);
2490         return 0;
2491 }
2492
2493 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2494                           int argc, const char **argv)
2495 {
2496         pid_t pid;
2497         int ret;
2498
2499         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2500                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2501         if (ret != 0) {
2502                 return ret;
2503         }
2504
2505         printf("%u\n", pid);
2506         return 0;
2507 }
2508
2509 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2510                        const char *desc, uint32_t flag, bool set_flag)
2511 {
2512         struct ctdb_node_map *nodemap;
2513         bool flag_is_set;
2514
2515         nodemap = get_nodemap(ctdb, false);
2516         if (nodemap == NULL) {
2517                 return 1;
2518         }
2519
2520         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2521         if (set_flag == flag_is_set) {
2522                 if (set_flag) {
2523                         fprintf(stderr, "Node %u is already %s\n",
2524                                 ctdb->cmd_pnn, desc);
2525                 } else {
2526                         fprintf(stderr, "Node %u is not %s\n",
2527                                 ctdb->cmd_pnn, desc);
2528                 }
2529                 return 0;
2530         }
2531
2532         return 1;
2533 }
2534
2535 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2536                            uint32_t flag, bool set_flag)
2537 {
2538         struct ctdb_node_map *nodemap;
2539         bool flag_is_set;
2540
2541         while (1) {
2542                 nodemap = get_nodemap(ctdb, true);
2543                 if (nodemap == NULL) {
2544                         fprintf(stderr,
2545                                 "Failed to get nodemap, trying again\n");
2546                         sleep(1);
2547                         continue;
2548                 }
2549
2550                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2551                 if (flag_is_set == set_flag) {
2552                         break;
2553                 }
2554
2555                 sleep(1);
2556         }
2557 }
2558
2559 static int ctdb_ctrl_modflags(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2560                               struct ctdb_client_context *client,
2561                               uint32_t destnode, struct timeval timeout,
2562                               uint32_t set, uint32_t clear)
2563 {
2564         struct ctdb_node_map *nodemap;
2565         struct ctdb_node_flag_change flag_change;
2566         struct ctdb_req_control request;
2567         uint32_t *pnn_list;
2568         int ret, count;
2569
2570         ret = ctdb_ctrl_get_nodemap(mem_ctx, ev, client, destnode,
2571                                     tevent_timeval_zero(), &nodemap);
2572         if (ret != 0) {
2573                 return ret;
2574         }
2575
2576         flag_change.pnn = destnode;
2577         flag_change.old_flags = nodemap->node[destnode].flags;
2578         flag_change.new_flags = flag_change.old_flags | set;
2579         flag_change.new_flags &= ~clear;
2580
2581         count = list_of_connected_nodes(nodemap, -1, mem_ctx, &pnn_list);
2582         if (count == -1) {
2583                 return ENOMEM;
2584         }
2585
2586         ctdb_req_control_modify_flags(&request, &flag_change);
2587         ret = ctdb_client_control_multi(mem_ctx, ev, client, pnn_list, count,
2588                                         tevent_timeval_zero(), &request,
2589                                         NULL, NULL);
2590         return ret;
2591 }
2592
/* Result slot shared between ipreallocate() and ipreallocate_handler() */
struct ipreallocate_state {
        int status;     /* value copied out of the reply message */
        bool done;      /* becomes true once a reply has been stored */
};
2597
2598 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2599                                  void *private_data)
2600 {
2601         struct ipreallocate_state *state =
2602                 (struct ipreallocate_state *)private_data;
2603
2604         if (data.dsize != sizeof(int)) {
2605                 /* Ignore packet */
2606                 return;
2607         }
2608
2609         state->status = *(int *)data.dptr;
2610         state->done = true;
2611 }
2612
2613 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2614 {
2615         struct ctdb_srvid_message msg = { 0 };
2616         struct ipreallocate_state state;
2617         int ret;
2618
2619         msg.pnn = ctdb->pnn;
2620         msg.srvid = next_srvid(ctdb);
2621
2622         state.done = false;
2623         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2624                                               msg.srvid,
2625                                               ipreallocate_handler, &state);
2626         if (ret != 0) {
2627                 return ret;
2628         }
2629
2630         while (true) {
2631                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2632                                                 ctdb->client,
2633                                                 CTDB_BROADCAST_CONNECTED,
2634                                                 &msg);
2635                 if (ret != 0) {
2636                         goto fail;
2637                 }
2638
2639                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2640                                                TIMEOUT());
2641                 if (ret != 0) {
2642                         continue;
2643                 }
2644
2645                 if (state.status >= 0) {
2646                         ret = 0;
2647                 } else {
2648                         ret = state.status;
2649                 }
2650                 break;
2651         }
2652
2653 fail:
2654         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2655                                            msg.srvid, &state);
2656         return ret;
2657 }
2658
2659 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2660                            int argc, const char **argv)
2661 {
2662         int ret;
2663
2664         if (argc != 0) {
2665                 usage("disable");
2666         }
2667
2668         ret = check_flags(mem_ctx, ctdb, "disabled",
2669                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2670         if (ret == 0) {
2671                 return 0;
2672         }
2673
2674         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2675                                  ctdb->cmd_pnn, TIMEOUT(),
2676                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2677         if (ret != 0) {
2678                 fprintf(stderr,
2679                         "Failed to set DISABLED flag on node %u\n",
2680                         ctdb->cmd_pnn);
2681                 return ret;
2682         }
2683
2684         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2685         return ipreallocate(mem_ctx, ctdb);
2686 }
2687
2688 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2689                           int argc, const char **argv)
2690 {
2691         int ret;
2692
2693         if (argc != 0) {
2694                 usage("enable");
2695         }
2696
2697         ret = check_flags(mem_ctx, ctdb, "disabled",
2698                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2699         if (ret == 0) {
2700                 return 0;
2701         }
2702
2703         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2704                                  ctdb->cmd_pnn, TIMEOUT(),
2705                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2706         if (ret != 0) {
2707                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2708                         ctdb->cmd_pnn);
2709                 return ret;
2710         }
2711
2712         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2713         return ipreallocate(mem_ctx, ctdb);
2714 }
2715
2716 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2717                         int argc, const char **argv)
2718 {
2719         int ret;
2720
2721         if (argc != 0) {
2722                 usage("stop");
2723         }
2724
2725         ret = check_flags(mem_ctx, ctdb, "stopped",
2726                           NODE_FLAGS_STOPPED, true);
2727         if (ret == 0) {
2728                 return 0;
2729         }
2730
2731         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2732                                   ctdb->cmd_pnn, TIMEOUT());
2733         if (ret != 0) {
2734                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2735                 return ret;
2736         }
2737
2738         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2739         return ipreallocate(mem_ctx, ctdb);
2740 }
2741
2742 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2743                             int argc, const char **argv)
2744 {
2745         int ret;
2746
2747         if (argc != 0) {
2748                 usage("continue");
2749         }
2750
2751         ret = check_flags(mem_ctx, ctdb, "stopped",
2752                           NODE_FLAGS_STOPPED, false);
2753         if (ret == 0) {
2754                 return 0;
2755         }
2756
2757         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2758                                       ctdb->cmd_pnn, TIMEOUT());
2759         if (ret != 0) {
2760                 fprintf(stderr, "Failed to continue stopped node %u\n",
2761                         ctdb->cmd_pnn);
2762                 return ret;
2763         }
2764
2765         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2766         return ipreallocate(mem_ctx, ctdb);
2767 }
2768
2769 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2770                        int argc, const char **argv)
2771 {
2772         struct ctdb_ban_state ban_state;
2773         int ret;
2774
2775         if (argc != 1) {
2776                 usage("ban");
2777         }
2778
2779         ret = check_flags(mem_ctx, ctdb, "banned",
2780                           NODE_FLAGS_BANNED, true);
2781         if (ret == 0) {
2782                 return 0;
2783         }
2784
2785         ban_state.pnn = ctdb->cmd_pnn;
2786         ban_state.time = strtoul(argv[0], NULL, 0);
2787
2788         if (ban_state.time == 0) {
2789                 fprintf(stderr, "Ban time cannot be zero\n");
2790                 return EINVAL;
2791         }
2792
2793         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2794                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2795         if (ret != 0) {
2796                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2797                 return ret;
2798         }
2799
2800         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2801         return ipreallocate(mem_ctx, ctdb);
2802
2803 }
2804
2805 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2806                          int argc, const char **argv)
2807 {
2808         struct ctdb_ban_state ban_state;
2809         int ret;
2810
2811         if (argc != 0) {
2812                 usage("unban");
2813         }
2814
2815         ret = check_flags(mem_ctx, ctdb, "banned",
2816                           NODE_FLAGS_BANNED, false);
2817         if (ret == 0) {
2818                 return 0;
2819         }
2820
2821         ban_state.pnn = ctdb->cmd_pnn;
2822         ban_state.time = 0;
2823
2824         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2825                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2826         if (ret != 0) {
2827                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2828                 return ret;
2829         }
2830
2831         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2832         return ipreallocate(mem_ctx, ctdb);
2833
2834 }
2835
/*
 * Disconnect callback used by control_shutdown().
 *
 * private_data points at a bool, which is set to true.
 */
static void wait_for_shutdown(void *private_data)
{
        *(bool *)private_data = true;
}
2842
2843 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2844                             int argc, const char **argv)
2845 {
2846         int ret;
2847         bool done = false;
2848
2849         if (argc != 0) {
2850                 usage("shutdown");
2851         }
2852
2853         if (ctdb->pnn == ctdb->cmd_pnn) {
2854                 ctdb_client_set_disconnect_callback(ctdb->client,
2855                                                     wait_for_shutdown,
2856                                                     &done);
2857         }
2858
2859         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2860                                  ctdb->cmd_pnn, TIMEOUT());
2861         if (ret != 0) {
2862                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2863                 return ret;
2864         }
2865
2866         if (ctdb->pnn == ctdb->cmd_pnn) {
2867                 ctdb_client_wait(ctdb->ev, &done);
2868         }
2869
2870         return 0;
2871 }
2872
2873 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2874                           uint32_t *generation)
2875 {
2876         uint32_t recmaster;
2877         int recmode;
2878         struct ctdb_vnn_map *vnnmap;
2879         int ret;
2880
2881 again:
2882         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2883                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2884         if (ret != 0) {
2885                 fprintf(stderr, "Failed to find recovery master\n");
2886                 return ret;
2887         }
2888
2889         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2890                                     recmaster, TIMEOUT(), &recmode);
2891         if (ret != 0) {
2892                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2893                         recmaster);
2894                 return ret;
2895         }
2896
2897         if (recmode == CTDB_RECOVERY_ACTIVE) {
2898                 sleep(1);
2899                 goto again;
2900         }
2901
2902         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2903                                   recmaster, TIMEOUT(), &vnnmap);
2904         if (ret != 0) {
2905                 fprintf(stderr, "Failed to get generation from node %u\n",
2906                         recmaster);
2907                 return ret;
2908         }
2909
2910         if (vnnmap->generation == INVALID_GENERATION) {
2911                 talloc_free(vnnmap);
2912                 sleep(1);
2913                 goto again;
2914         }
2915
2916         *generation = vnnmap->generation;
2917         talloc_free(vnnmap);
2918         return 0;
2919 }
2920
2921
2922 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2923                            int argc, const char **argv)
2924 {
2925         uint32_t generation, next_generation;
2926         int ret;
2927
2928         if (argc != 0) {
2929                 usage("recover");
2930         }
2931
2932         ret = get_generation(mem_ctx, ctdb, &generation);
2933         if (ret != 0) {
2934                 return ret;
2935         }
2936
2937         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2938                                     ctdb->cmd_pnn, TIMEOUT(),
2939                                     CTDB_RECOVERY_ACTIVE);
2940         if (ret != 0) {
2941                 fprintf(stderr, "Failed to set recovery mode active\n");
2942                 return ret;
2943         }
2944
2945         while (1) {
2946                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2947                 if (ret != 0) {
2948                         fprintf(stderr,
2949                                 "Failed to confirm end of recovery\n");
2950                         return ret;
2951                 }
2952
2953                 if (next_generation != generation) {
2954                         break;
2955                 }
2956
2957                 sleep (1);
2958         }
2959
2960         return 0;
2961 }
2962
2963 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2964                                 int argc, const char **argv)
2965 {
2966         if (argc != 0) {
2967                 usage("ipreallocate");
2968         }
2969
2970         return ipreallocate(mem_ctx, ctdb);
2971 }
2972
2973 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2974                                   struct ctdb_context *ctdb,
2975                                   int argc, const char **argv)
2976 {
2977         uint32_t recmaster;
2978         int ret;
2979
2980         if (argc != 0) {
2981                 usage("isnotrecmaster");
2982         }
2983
2984         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2985                                       ctdb->pnn, TIMEOUT(), &recmaster);
2986         if (ret != 0) {
2987                 fprintf(stderr, "Failed to get recmaster\n");
2988                 return ret;
2989         }
2990
2991         if (recmaster != ctdb->pnn) {
2992                 printf("this node is not the recmaster\n");
2993                 return 1;
2994         }
2995
2996         printf("this node is the recmaster\n");
2997         return 0;
2998 }
2999
3000 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3001                            int argc, const char **argv)
3002 {
3003         struct ctdb_addr_info addr_info;
3004         int ret;
3005
3006         if (argc != 2) {
3007                 usage("gratarp");
3008         }
3009
3010         ret = ctdb_sock_addr_from_string(argv[0], &addr_info.addr, false);
3011         if (ret != 0) {
3012                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3013                 return 1;
3014         }
3015         addr_info.iface = argv[1];
3016
3017         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
3018                                             ctdb->cmd_pnn, TIMEOUT(),
3019                                             &addr_info);
3020         if (ret != 0) {
3021                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
3022                         ctdb->cmd_pnn);
3023                 return ret;
3024         }
3025
3026         return 0;
3027 }
3028
3029 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3030                            int argc, const char **argv)
3031 {
3032         ctdb_sock_addr src, dst;
3033         int ret;
3034
3035         if (argc != 0 && argc != 2) {
3036                 usage("tickle");
3037         }
3038
3039         if (argc == 0) {
3040                 struct ctdb_connection_list *clist;
3041                 int i, num_failed;
3042
3043                 /* Client first but the src/dst logic is confused */
3044                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3045                 if (ret != 0) {
3046                         return ret;
3047                 }
3048
3049                 num_failed = 0;
3050                 for (i = 0; i < clist->num; i++) {
3051                         ret = ctdb_sys_send_tcp(&clist->conn[i].src,
3052                                                 &clist->conn[i].dst,
3053                                                 0, 0, 0);
3054                         if (ret != 0) {
3055                                 num_failed += 1;
3056                         }
3057                 }
3058
3059                 TALLOC_FREE(clist);
3060
3061                 if (num_failed > 0) {
3062                         fprintf(stderr, "Failed to send %d tickles\n",
3063                                 num_failed);
3064                         return 1;
3065                 }
3066
3067                 return 0;
3068         }
3069
3070
3071         ret = ctdb_sock_addr_from_string(argv[0], &src, true);
3072         if (ret != 0) {
3073                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3074                 return 1;
3075         }
3076
3077         ret = ctdb_sock_addr_from_string(argv[1], &dst, true);
3078         if (ret != 0) {
3079                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3080                 return 1;
3081         }
3082
3083         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3084         if (ret != 0) {
3085                 fprintf(stderr, "Failed to send tickle ack\n");
3086                 return ret;
3087         }
3088
3089         return 0;
3090 }
3091
3092 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3093                               int argc, const char **argv)
3094 {
3095         ctdb_sock_addr addr;
3096         struct ctdb_tickle_list *tickles;
3097         unsigned port = 0;
3098         int ret, i;
3099
3100         if (argc < 1 || argc > 2) {
3101                 usage("gettickles");
3102         }
3103
3104         if (argc == 2) {
3105                 port = strtoul(argv[1], NULL, 10);
3106         }
3107
3108         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3109         if (ret != 0) {
3110                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3111                 return 1;
3112         }
3113         ctdb_sock_addr_set_port(&addr, port);
3114
3115         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3116                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3117                                             &tickles);
3118         if (ret != 0) {
3119                 fprintf(stderr, "Failed to get list of connections\n");
3120                 return ret;
3121         }
3122
3123         if (options.machinereadable) {
3124                 printf("%s%s%s%s%s%s%s%s%s\n",
3125                        options.sep,
3126                        "Source IP", options.sep,
3127                        "Port", options.sep,
3128                        "Destiation IP", options.sep,
3129                        "Port", options.sep);
3130                 for (i=0; i<tickles->num; i++) {
3131                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3132                                ctdb_sock_addr_to_string(
3133                                        mem_ctx, &tickles->conn[i].src, false),
3134                                options.sep,
3135                                ntohs(tickles->conn[i].src.ip.sin_port),
3136                                options.sep,
3137                                ctdb_sock_addr_to_string(
3138                                        mem_ctx, &tickles->conn[i].dst, false),
3139                                options.sep,
3140                                ntohs(tickles->conn[i].dst.ip.sin_port),
3141                                options.sep);
3142                 }
3143         } else {
3144                 printf("Connections for IP: %s\n",
3145                        ctdb_sock_addr_to_string(mem_ctx,
3146                                                 &tickles->addr, false));
3147                 printf("Num connections: %u\n", tickles->num);
3148                 for (i=0; i<tickles->num; i++) {
3149                         printf("SRC: %s   DST: %s\n",
3150                                ctdb_sock_addr_to_string(
3151                                        mem_ctx, &tickles->conn[i].src, true),
3152                                ctdb_sock_addr_to_string(
3153                                        mem_ctx, &tickles->conn[i].dst, true));
3154                 }
3155         }
3156
3157         talloc_free(tickles);
3158         return 0;
3159 }
3160
/*
 * Callback types used by process_clist_send().
 *
 * clist_request_func - fills in 'request' for one connection
 * clist_reply_func   - examines one reply; process_clist_done() counts
 *                      a non-zero return as a failure
 */
typedef void (*clist_request_func)(struct ctdb_req_control *request,
                                   struct ctdb_connection *conn);
typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3165
3166 struct process_clist_state {
3167         struct ctdb_connection_list *clist;
3168         int count;
3169         int num_failed, num_total;
3170         clist_reply_func reply_func;
3171 };
3172
3173 static void process_clist_done(struct tevent_req *subreq);
3174
3175 static struct tevent_req *process_clist_send(
3176                                         TALLOC_CTX *mem_ctx,
3177                                         struct ctdb_context *ctdb,
3178                                         struct ctdb_connection_list *clist,
3179                                         clist_request_func request_func,
3180                                         clist_reply_func reply_func)
3181 {
3182         struct tevent_req *req, *subreq;
3183         struct process_clist_state *state;
3184         struct ctdb_req_control request;
3185         int i;
3186
3187         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3188         if (req == NULL) {
3189                 return NULL;
3190         }
3191
3192         state->clist = clist;
3193         state->reply_func = reply_func;
3194
3195         for (i = 0; i < clist->num; i++) {
3196                 request_func(&request, &clist->conn[i]);
3197                 subreq = ctdb_client_control_send(state, ctdb->ev,
3198                                                   ctdb->client, ctdb->cmd_pnn,
3199                                                   TIMEOUT(), &request);
3200                 if (tevent_req_nomem(subreq, req)) {
3201                         return tevent_req_post(req, ctdb->ev);
3202                 }
3203                 tevent_req_set_callback(subreq, process_clist_done, req);
3204         }
3205
3206         return req;
3207 }
3208
3209 static void process_clist_done(struct tevent_req *subreq)
3210 {
3211         struct tevent_req *req = tevent_req_callback_data(
3212                 subreq, struct tevent_req);
3213         struct process_clist_state *state = tevent_req_data(
3214                 req, struct process_clist_state);
3215         struct ctdb_reply_control *reply;
3216         int ret;
3217         bool status;
3218
3219         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3220         TALLOC_FREE(subreq);
3221         if (! status) {
3222                 state->num_failed += 1;
3223                 goto done;
3224         }
3225
3226         ret = state->reply_func(reply);
3227         if (ret != 0) {
3228                 state->num_failed += 1;
3229                 goto done;
3230         }
3231
3232 done:
3233         state->num_total += 1;
3234         if (state->num_total == state->clist->num) {
3235                 tevent_req_done(req);
3236         }
3237 }
3238
3239 static int process_clist_recv(struct tevent_req *req)
3240 {
3241         struct process_clist_state *state = tevent_req_data(
3242                 req, struct process_clist_state);
3243
3244         return state->num_failed;
3245 }
3246
3247 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3248                              int argc, const char **argv)
3249 {
3250         struct ctdb_connection conn;
3251         int ret;
3252
3253         if (argc != 0 && argc != 2) {
3254                 usage("addtickle");
3255         }
3256
3257         if (argc == 0) {
3258                 struct ctdb_connection_list *clist;
3259                 struct tevent_req *req;
3260
3261                 /* Client first but the src/dst logic is confused */
3262                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3263                 if (ret != 0) {
3264                         return ret;
3265                 }
3266                 if (clist->num == 0) {
3267                         return 0;
3268                 }
3269
3270                 req = process_clist_send(mem_ctx, ctdb, clist,
3271                                  ctdb_req_control_tcp_add_delayed_update,
3272                                  ctdb_reply_control_tcp_add_delayed_update);
3273                 if (req == NULL) {
3274                         talloc_free(clist);
3275                         return ENOMEM;
3276                 }
3277
3278                 tevent_req_poll(req, ctdb->ev);
3279                 talloc_free(clist);
3280
3281                 ret = process_clist_recv(req);
3282                 if (ret != 0) {
3283                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3284                         return 1;
3285                 }
3286
3287                 return 0;
3288         }
3289
3290         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3291         if (ret != 0) {
3292                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3293                 return 1;
3294         }
3295         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3296         if (ret != 0) {
3297                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3298                 return 1;
3299         }
3300
3301         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3302                                                ctdb->client, ctdb->cmd_pnn,
3303                                                TIMEOUT(), &conn);
3304         if (ret != 0) {
3305                 fprintf(stderr, "Failed to register connection\n");
3306                 return ret;
3307         }
3308
3309         return 0;
3310 }
3311
3312 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3313                              int argc, const char **argv)
3314 {
3315         struct ctdb_connection conn;
3316         int ret;
3317
3318         if (argc != 0 && argc != 2) {
3319                 usage("deltickle");
3320         }
3321
3322         if (argc == 0) {
3323                 struct ctdb_connection_list *clist;
3324                 struct tevent_req *req;
3325
3326                 /* Client first but the src/dst logic is confused */
3327                 ret = ctdb_connection_list_read(mem_ctx, false, &clist);
3328                 if (ret != 0) {
3329                         return ret;
3330                 }
3331                 if (clist->num == 0) {
3332                         return 0;
3333                 }
3334
3335                 req = process_clist_send(mem_ctx, ctdb, clist,
3336                                          ctdb_req_control_tcp_remove,
3337                                          ctdb_reply_control_tcp_remove);
3338                 if (req == NULL) {
3339                         talloc_free(clist);
3340                         return ENOMEM;
3341                 }
3342
3343                 tevent_req_poll(req, ctdb->ev);
3344                 talloc_free(clist);
3345
3346                 ret = process_clist_recv(req);
3347                 if (ret != 0) {
3348                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3349                         return 1;
3350                 }
3351
3352                 return 0;
3353         }
3354
3355         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3356         if (ret != 0) {
3357                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3358                 return 1;
3359         }
3360         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3361         if (ret != 0) {
3362                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3363                 return 1;
3364         }
3365
3366         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3367                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3368         if (ret != 0) {
3369                 fprintf(stderr, "Failed to unregister connection\n");
3370                 return ret;
3371         }
3372
3373         return 0;
3374 }
3375
3376 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3377                              int argc, const char **argv)
3378 {
3379         struct ctdb_node_map *nodemap;
3380         int i;
3381
3382         if (argc != 0) {
3383                 usage("listnodes");
3384         }
3385
3386         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3387         if (nodemap == NULL) {
3388                 return 1;
3389         }
3390
3391         for (i=0; i<nodemap->num; i++) {
3392                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3393                         continue;
3394                 }
3395
3396                 if (options.machinereadable) {
3397                         printf("%s%u%s%s%s\n", options.sep,
3398                                nodemap->node[i].pnn, options.sep,
3399                                ctdb_sock_addr_to_string(
3400                                        mem_ctx, &nodemap->node[i].addr, false),
3401                                options.sep);
3402                 } else {
3403                         printf("%s\n",
3404                                ctdb_sock_addr_to_string(
3405                                        mem_ctx, &nodemap->node[i].addr, false));
3406                 }
3407         }
3408
3409         return 0;
3410 }
3411
3412 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3413                               struct ctdb_node_map *nodemap2)
3414 {
3415         int i;
3416
3417         if (nodemap1->num != nodemap2->num) {
3418                 return false;
3419         }
3420
3421         for (i=0; i<nodemap1->num; i++) {
3422                 struct ctdb_node_and_flags *n1, *n2;
3423
3424                 n1 = &nodemap1->node[i];
3425                 n2 = &nodemap2->node[i];
3426
3427                 if ((n1->pnn != n2->pnn) ||
3428                     (n1->flags != n2->flags) ||
3429                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3430                         return false;
3431                 }
3432         }
3433
3434         return true;
3435 }
3436
3437 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3438                                    struct ctdb_node_map *nm,
3439                                    struct ctdb_node_map *fnm,
3440                                    bool *reload)
3441 {
3442         int i;
3443         bool check_failed = false;
3444
3445         *reload = false;
3446
3447         for (i=0; i<nm->num; i++) {
3448                 if (i >= fnm->num) {
3449                         fprintf(stderr,
3450                                 "Node %u (%s) missing from nodes file\n",
3451                                 nm->node[i].pnn,
3452                                 ctdb_sock_addr_to_string(
3453                                         mem_ctx, &nm->node[i].addr, false));
3454                         check_failed = true;
3455                         continue;
3456                 }
3457                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3458                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3459                         /* Node remains deleted */
3460                         continue;
3461                 }
3462
3463                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3464                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3465                         /* Node not newly nor previously deleted */
3466                         if (! ctdb_same_ip(&nm->node[i].addr,
3467                                            &fnm->node[i].addr)) {
3468                                 fprintf(stderr,
3469                                         "Node %u has changed IP address"
3470                                         " (was %s, now %s)\n",
3471                                         nm->node[i].pnn,
3472                                         ctdb_sock_addr_to_string(
3473                                                 mem_ctx,
3474                                                 &nm->node[i].addr, false),
3475                                         ctdb_sock_addr_to_string(
3476                                                 mem_ctx,
3477                                                 &fnm->node[i].addr, false));
3478                                 check_failed = true;
3479                         } else {
3480                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3481                                         fprintf(stderr,
3482                                                 "WARNING: Node %u is disconnected."
3483                                                 " You MUST fix this node manually!\n",
3484                                                 nm->node[i].pnn);
3485                                 }
3486                         }
3487                         continue;
3488                 }
3489
3490                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3491                         /* Node is being deleted */
3492                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3493                         *reload = true;
3494                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3495                                 fprintf(stderr,
3496                                         "ERROR: Node %u is still connected\n",
3497                                         nm->node[i].pnn);
3498                                 check_failed = true;
3499                         }
3500                         continue;
3501                 }
3502
3503                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3504                         /* Node was previously deleted */
3505                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3506                         *reload = true;
3507                 }
3508         }
3509
3510         if (check_failed) {
3511                 fprintf(stderr,
3512                         "ERROR: Nodes will not be reloaded due to previous error\n");
3513                 return 1;
3514         }
3515
3516         /* Leftover nodes in file are NEW */
3517         for (; i < fnm->num; i++) {
3518                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3519                 *reload = true;
3520         }
3521
3522         return 0;
3523 }
3524
/*
 * Bookkeeping shared between disable_recoveries() and its message
 * handler while replies are being collected.
 */
struct disable_recoveries_state {
        /* PNNs the disable message was sent to */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and in reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value taken from an error reply, 0 otherwise */
        int status;
        /* Set on an error reply or once every node has answered */
        bool done;
};
3532
3533 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3534                                        void *private_data)
3535 {
3536         struct disable_recoveries_state *state =
3537                 (struct disable_recoveries_state *)private_data;
3538         int ret, i;
3539
3540         if (data.dsize != sizeof(int)) {
3541                 /* Ignore packet */
3542                 return;
3543         }
3544
3545         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3546         ret = *(int *)data.dptr;
3547         if (ret < 0) {
3548                 state->status = ret;
3549                 state->done = true;
3550                 return;
3551         }
3552         for (i=0; i<state->node_count; i++) {
3553                 if (state->pnn_list[i] == ret) {
3554                         state->reply[i] = true;
3555                         break;
3556                 }
3557         }
3558
3559         state->done = true;
3560         for (i=0; i<state->node_count; i++) {
3561                 if (! state->reply[i]) {
3562                         state->done = false;
3563                         break;
3564                 }
3565         }
3566 }
3567
3568 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3569                               uint32_t timeout, uint32_t *pnn_list, int count)
3570 {
3571         struct ctdb_disable_message disable = { 0 };
3572         struct disable_recoveries_state state;
3573         int ret, i;
3574
3575         disable.pnn = ctdb->pnn;
3576         disable.srvid = next_srvid(ctdb);
3577         disable.timeout = timeout;
3578
3579         state.pnn_list = pnn_list;
3580         state.node_count = count;
3581         state.done = false;
3582         state.status = 0;
3583         state.reply = talloc_zero_array(mem_ctx, bool, count);
3584         if (state.reply == NULL) {
3585                 return ENOMEM;
3586         }
3587
3588         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3589                                               disable.srvid,
3590                                               disable_recoveries_handler,
3591                                               &state);
3592         if (ret != 0) {
3593                 return ret;
3594         }
3595
3596         for (i=0; i<count; i++) {
3597                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3598                                                       ctdb->client,
3599                                                       pnn_list[i],
3600                                                       &disable);
3601                 if (ret != 0) {
3602                         goto fail;
3603                 }
3604         }
3605
3606         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3607         if (ret == ETIME) {
3608                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3609         } else {
3610                 ret = (state.status >= 0 ? 0 : 1);
3611         }
3612
3613 fail:
3614         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3615                                            disable.srvid, &state);
3616         return ret;
3617 }
3618
3619 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3620                                int argc, const char **argv)
3621 {
3622         struct ctdb_node_map *nodemap = NULL;
3623         struct ctdb_node_map *file_nodemap;
3624         struct ctdb_node_map *remote_nodemap;
3625         struct ctdb_req_control request;
3626         struct ctdb_reply_control **reply;
3627         bool reload;
3628         int ret, i;
3629         uint32_t *pnn_list;
3630         int count;
3631
3632         nodemap = get_nodemap(ctdb, false);
3633         if (nodemap == NULL) {
3634                 return 1;
3635         }
3636
3637         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3638         if (file_nodemap == NULL) {
3639                 return 1;
3640         }
3641
3642         for (i=0; i<nodemap->num; i++) {
3643                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3644                         continue;
3645                 }
3646
3647                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3648                                                nodemap->node[i].pnn, TIMEOUT(),
3649                                                &remote_nodemap);
3650                 if (ret != 0) {
3651                         fprintf(stderr,
3652                                 "ERROR: Failed to get nodes file from node %u\n",
3653                                 nodemap->node[i].pnn);
3654                         return ret;
3655                 }
3656
3657                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3658                         fprintf(stderr,
3659                                 "ERROR: Nodes file on node %u differs"
3660                                  " from current node (%u)\n",
3661                                  nodemap->node[i].pnn, ctdb->pnn);
3662                         return 1;
3663                 }
3664         }
3665
3666         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3667         if (ret != 0) {
3668                 return ret;
3669         }
3670
3671         if (! reload) {
3672                 fprintf(stderr, "No change in nodes file,"
3673                                 " skipping unnecessary reload\n");
3674                 return 0;
3675         }
3676
3677         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3678                                         mem_ctx, &pnn_list);
3679         if (count <= 0) {
3680                 fprintf(stderr, "Memory allocation error\n");
3681                 return 1;
3682         }
3683
3684         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3685                                  pnn_list, count);
3686         if (ret != 0) {
3687                 fprintf(stderr, "Failed to disable recoveries\n");
3688                 return ret;
3689         }
3690
3691         ctdb_req_control_reload_nodes_file(&request);
3692         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3693                                         pnn_list, count, TIMEOUT(),
3694                                         &request, NULL, &reply);
3695         if (ret != 0) {
3696                 bool failed = false;
3697
3698                 for (i=0; i<count; i++) {
3699                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3700                         if (ret != 0) {
3701                                 fprintf(stderr,
3702                                         "Node %u failed to reload nodes\n",
3703                                         pnn_list[i]);
3704                                 failed = true;
3705                         }
3706                 }
3707                 if (failed) {
3708                         fprintf(stderr,
3709                                 "You MUST fix failed nodes manually!\n");
3710                 }
3711         }
3712
3713         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3714         if (ret != 0) {
3715                 fprintf(stderr, "Failed to enable recoveries\n");
3716                 return ret;
3717         }
3718
3719         return 0;
3720 }
3721
3722 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3723                   ctdb_sock_addr *addr, uint32_t pnn)
3724 {
3725         struct ctdb_public_ip_list *pubip_list;
3726         struct ctdb_public_ip pubip;
3727         struct ctdb_node_map *nodemap;
3728         struct ctdb_req_control request;
3729         uint32_t *pnn_list;
3730         int ret, i, count;
3731
3732         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3733                                             CTDB_BROADCAST_CONNECTED,
3734                                             2*options.timelimit);
3735         if (ret != 0) {
3736                 fprintf(stderr, "Failed to disable IP check\n");
3737                 return ret;
3738         }
3739
3740         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3741                                        pnn, TIMEOUT(), false, &pubip_list);
3742         if (ret != 0) {
3743                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3744                         pnn);
3745                 return ret;
3746         }
3747
3748         for (i=0; i<pubip_list->num; i++) {
3749                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3750                         break;
3751                 }
3752         }
3753
3754         if (i == pubip_list->num) {
3755                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3756                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr, false));
3757                 return 1;
3758         }
3759
3760         nodemap = get_nodemap(ctdb, false);
3761         if (nodemap == NULL) {
3762                 return 1;
3763         }
3764
3765         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3766         if (count <= 0) {
3767                 fprintf(stderr, "Memory allocation error\n");
3768                 return 1;
3769         }
3770
3771         pubip.pnn = pnn;
3772         pubip.addr = *addr;
3773         ctdb_req_control_release_ip(&request, &pubip);
3774
3775         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3776                                         pnn_list, count, TIMEOUT(),
3777                                         &request, NULL, NULL);
3778         if (ret != 0) {
3779                 fprintf(stderr, "Failed to release IP on nodes\n");
3780                 return ret;
3781         }
3782
3783         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3784                                     pnn, TIMEOUT(), &pubip);
3785         if (ret != 0) {
3786                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3787                 return ret;
3788         }
3789
3790         return 0;
3791 }
3792
3793 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3794                           int argc, const char **argv)
3795 {
3796         ctdb_sock_addr addr;
3797         uint32_t pnn;
3798         int ret, retries = 0;
3799
3800         if (argc != 2) {
3801                 usage("moveip");
3802         }
3803
3804         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3805         if (ret != 0) {
3806                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3807                 return 1;
3808         }
3809
3810         pnn = strtoul(argv[1], NULL, 10);
3811         if (pnn == CTDB_UNKNOWN_PNN) {
3812                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3813                 return 1;
3814         }
3815
3816         while (retries < 5) {
3817                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3818                 if (ret == 0) {
3819                         break;
3820                 }
3821
3822                 sleep(3);
3823                 retries++;
3824         }
3825
3826         if (ret != 0) {
3827                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3828                         argv[0], pnn);
3829                 return ret;
3830         }
3831
3832         return 0;
3833 }
3834
3835 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3836                          uint32_t pnn)
3837 {
3838         int ret;
3839
3840         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3841                                           CTDB_BROADCAST_CONNECTED, pnn);
3842         if (ret != 0) {
3843                 fprintf(stderr,
3844                         "Failed to ask recovery master to distribute IPs\n");
3845                 return ret;
3846         }
3847
3848         return 0;
3849 }
3850
3851 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3852                          int argc, const char **argv)
3853 {
3854         ctdb_sock_addr addr;
3855         struct ctdb_public_ip_list *pubip_list;
3856         struct ctdb_addr_info addr_info;
3857         unsigned int mask;
3858         int ret, i, retries = 0;
3859
3860         if (argc != 2) {
3861                 usage("addip");
3862         }
3863
3864         if (! parse_ip_mask(argv[0], argv[1], &addr, &mask)) {
3865                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3866                 return 1;
3867         }
3868
3869         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3870                                        ctdb->cmd_pnn, TIMEOUT(),
3871                                        false, &pubip_list);
3872         if (ret != 0) {
3873                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3874                         ctdb->cmd_pnn);
3875                 return 1;
3876         }
3877
3878         for (i=0; i<pubip_list->num; i++) {
3879                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3880                         fprintf(stderr, "Node already knows about IP %s\n",
3881                                 ctdb_sock_addr_to_string(mem_ctx,
3882                                                          &addr, false));
3883                         return 0;
3884                 }
3885         }
3886
3887         addr_info.addr = addr;
3888         addr_info.mask = mask;
3889         addr_info.iface = argv[1];
3890
3891         while (retries < 5) {
3892                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3893                                               ctdb->cmd_pnn, TIMEOUT(),
3894                                               &addr_info);
3895                 if (ret == 0) {
3896                         break;
3897                 }
3898
3899                 sleep(3);
3900                 retries++;
3901         }
3902
3903         if (ret != 0) {
3904                 fprintf(stderr, "Failed to add public IP to node %u."
3905                                 " Giving up\n", ctdb->cmd_pnn);
3906                 return ret;
3907         }
3908
3909         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3910         if (ret != 0) {
3911                 return ret;
3912         }
3913
3914         return 0;
3915 }
3916
3917 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3918                          int argc, const char **argv)
3919 {
3920         ctdb_sock_addr addr;
3921         struct ctdb_public_ip_list *pubip_list;
3922         struct ctdb_addr_info addr_info;
3923         int ret, i;
3924
3925         if (argc != 1) {
3926                 usage("delip");
3927         }
3928
3929         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3930         if (ret != 0) {
3931                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3932                 return 1;
3933         }
3934
3935         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3936                                        ctdb->cmd_pnn, TIMEOUT(),
3937                                        false, &pubip_list);
3938         if (ret != 0) {
3939                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3940                         ctdb->cmd_pnn);
3941                 return 1;
3942         }
3943
3944         for (i=0; i<pubip_list->num; i++) {
3945                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3946                         break;
3947                 }
3948         }
3949
3950         if (i == pubip_list->num) {
3951                 fprintf(stderr, "Node does not know about IP address %s\n",
3952                         ctdb_sock_addr_to_string(mem_ctx, &addr, false));
3953                 return 0;
3954         }
3955
3956         addr_info.addr = addr;
3957         addr_info.mask = 0;
3958         addr_info.iface = NULL;
3959
3960         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3961                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3962         if (ret != 0) {
3963                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3964                         ctdb->cmd_pnn);
3965                 return ret;
3966         }
3967
3968         return 0;
3969 }
3970
/* Version stamped into backup files and required when restoring */
#define DB_VERSION      3
/* Size of the database name field in the backup header */
#define MAX_DB_NAME     64
/* Record buffer length at which backup_handler() flushes to the file */
#define MAX_REC_BUFFER_SIZE     (100*1000)

/*
 * Header stored at the start of a backup file.  It is written and read
 * as a raw struct, so the field order and types must not change.
 */
struct db_header {
        /* DB_VERSION at the time the backup was written */
        unsigned long version;
        /* Time the backup was completed */
        time_t timestamp;
        /* Database flags passed to ctdb_attach() */
        unsigned long flags;
        /* Number of record buffers following the header */
        unsigned long nbuf;
        /* Total number of records in those buffers */
        unsigned long nrec;
        /* Name of the database that was backed up */
        char name[MAX_DB_NAME];
};
3983
3984 struct backup_state {
3985         TALLOC_CTX *mem_ctx;
3986         struct ctdb_rec_buffer *recbuf;
3987         uint32_t db_id;
3988         int fd;
3989         unsigned int nbuf, nrec;
3990 };
3991
3992 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
3993                           TDB_DATA key, TDB_DATA data, void *private_data)
3994 {
3995         struct backup_state *state = (struct backup_state *)private_data;
3996         size_t len;
3997         int ret;
3998
3999         if (state->recbuf == NULL) {
4000                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
4001                                                      state->db_id);
4002                 if (state->recbuf == NULL) {
4003                         return ENOMEM;
4004                 }
4005         }
4006
4007         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4008                                   header, key, data);
4009         if (ret != 0) {
4010                 return ret;
4011         }
4012
4013         len = ctdb_rec_buffer_len(state->recbuf);
4014         if (len < MAX_REC_BUFFER_SIZE) {
4015                 return 0;
4016         }
4017
4018         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4019         if (ret != 0) {
4020                 fprintf(stderr, "Failed to write records to backup file\n");
4021                 return ret;
4022         }
4023
4024         state->nbuf += 1;
4025         state->nrec += state->recbuf->count;
4026         TALLOC_FREE(state->recbuf);
4027
4028         return 0;
4029 }
4030
4031 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4032                             int argc, const char **argv)
4033 {
4034         const char *db_name;
4035         struct ctdb_db_context *db;
4036         uint32_t db_id;
4037         uint8_t db_flags;
4038         struct backup_state state;
4039         struct db_header db_hdr;
4040         int fd, ret;
4041
4042         if (argc != 2) {
4043                 usage("backupdb");
4044         }
4045
4046         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4047                 return 1;
4048         }
4049
4050         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4051                           db_flags, &db);
4052         if (ret != 0) {
4053                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4054                 return ret;
4055         }
4056
4057         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4058         if (fd == -1) {
4059                 ret = errno;
4060                 fprintf(stderr, "Failed to open file %s for writing\n",
4061                         argv[1]);
4062                 return ret;
4063         }
4064
4065         /* Write empty header first */
4066         ZERO_STRUCT(db_hdr);
4067         ret = write(fd, &db_hdr, sizeof(struct db_header));
4068         if (ret == -1) {
4069                 ret = errno;
4070                 close(fd);
4071                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4072                 return ret;
4073         }
4074
4075         state.mem_ctx = mem_ctx;
4076         state.recbuf = NULL;
4077         state.fd = fd;
4078         state.nbuf = 0;
4079         state.nrec = 0;
4080
4081         ret = ctdb_db_traverse_local(db, true, false, backup_handler, &state);
4082         if (ret != 0) {
4083                 fprintf(stderr, "Failed to collect records from DB %s\n",
4084                         db_name);
4085                 close(fd);
4086                 return ret;
4087         }
4088
4089         if (state.recbuf != NULL) {
4090                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4091                 if (ret != 0) {
4092                         fprintf(stderr,
4093                                 "Failed to write records to backup file\n");
4094                         close(fd);
4095                         return ret;
4096                 }
4097
4098                 state.nbuf += 1;
4099                 state.nrec += state.recbuf->count;
4100                 TALLOC_FREE(state.recbuf);
4101         }
4102
4103         db_hdr.version = DB_VERSION;
4104         db_hdr.timestamp = time(NULL);
4105         db_hdr.flags = db_flags;
4106         db_hdr.nbuf = state.nbuf;
4107         db_hdr.nrec = state.nrec;
4108         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4109
4110         lseek(fd, 0, SEEK_SET);
4111         ret = write(fd, &db_hdr, sizeof(struct db_header));
4112         if (ret == -1) {
4113                 ret = errno;
4114                 close(fd);
4115                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4116                 return ret;
4117         }
4118
4119         close(fd);
4120         printf("Database backed up to %s\n", argv[1]);
4121         return 0;
4122 }
4123
4124 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4125                              int argc, const char **argv)
4126 {
4127         const char *db_name = NULL;
4128         struct ctdb_db_context *db;
4129         struct db_header db_hdr;
4130         struct ctdb_node_map *nodemap;
4131         struct ctdb_req_control request;
4132         struct ctdb_reply_control **reply;
4133         struct ctdb_transdb wipedb;
4134         struct ctdb_pulldb_ext pulldb;
4135         struct ctdb_rec_buffer *recbuf;
4136         uint32_t generation;
4137         uint32_t *pnn_list;
4138         char timebuf[128];
4139         ssize_t n;
4140         int fd, i;
4141         int count, ret;
4142         uint8_t db_flags;
4143
4144         if (argc < 1 || argc > 2) {
4145                 usage("restoredb");
4146         }
4147
4148         fd = open(argv[0], O_RDONLY, 0600);
4149         if (fd == -1) {
4150                 ret = errno;
4151                 fprintf(stderr, "Failed to open file %s for reading\n",
4152                         argv[0]);
4153                 return ret;
4154         }
4155
4156         if (argc == 2) {
4157                 db_name = argv[1];
4158         }
4159
4160         n = read(fd, &db_hdr, sizeof(struct db_header));
4161         if (n == -1) {
4162                 ret = errno;
4163                 close(fd);
4164                 fprintf(stderr, "Failed to read db header from file %s\n",
4165                         argv[0]);
4166                 return ret;
4167         }
4168         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4169
4170         if (db_hdr.version != DB_VERSION) {
4171                 fprintf(stderr,
4172                         "Wrong version of backup file, expected %u, got %lu\n",
4173                         DB_VERSION, db_hdr.version);
4174                 close(fd);
4175                 return EINVAL;
4176         }
4177
4178         if (db_name == NULL) {
4179                 db_name = db_hdr.name;
4180         }
4181
4182         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4183                  localtime(&db_hdr.timestamp));
4184         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4185
4186         db_flags = db_hdr.flags & 0xff;
4187         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4188                           db_flags, &db);
4189         if (ret != 0) {
4190                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4191                 close(fd);
4192                 return ret;
4193         }
4194
4195         nodemap = get_nodemap(ctdb, false);
4196         if (nodemap == NULL) {
4197                 fprintf(stderr, "Failed to get nodemap\n");
4198                 close(fd);
4199                 return ENOMEM;
4200         }
4201
4202         ret = get_generation(mem_ctx, ctdb, &generation);
4203         if (ret != 0) {
4204                 fprintf(stderr, "Failed to get current generation\n");
4205                 close(fd);
4206                 return ret;
4207         }
4208
4209         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4210                                      &pnn_list);
4211         if (count <= 0) {
4212                 close(fd);
4213                 return ENOMEM;
4214         }
4215
4216         wipedb.db_id = ctdb_db_id(db);
4217         wipedb.tid = generation;
4218
4219         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4220         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4221                                         ctdb->client, pnn_list, count,
4222                                         TIMEOUT(), &request, NULL, NULL);
4223         if (ret != 0) {
4224                 goto failed;
4225         }
4226
4227
4228         ctdb_req_control_db_transaction_start(&request, &wipedb);
4229         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4230                                         pnn_list, count, TIMEOUT(),
4231                                         &request, NULL, NULL);
4232         if (ret != 0) {
4233                 goto failed;
4234         }
4235
4236         ctdb_req_control_wipe_database(&request, &wipedb);
4237         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4238                                         pnn_list, count, TIMEOUT(),
4239                                         &request, NULL, NULL);
4240         if (ret != 0) {
4241                 goto failed;
4242         }
4243
4244         pulldb.db_id = ctdb_db_id(db);
4245         pulldb.lmaster = 0;
4246         pulldb.srvid = SRVID_CTDB_PUSHDB;
4247
4248         ctdb_req_control_db_push_start(&request, &pulldb);
4249         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4250                                         pnn_list, count, TIMEOUT(),
4251                                         &request, NULL, NULL);
4252         if (ret != 0) {
4253                 goto failed;
4254         }
4255
4256         for (i=0; i<db_hdr.nbuf; i++) {
4257                 struct ctdb_req_message message;
4258                 TDB_DATA data;
4259                 size_t np;
4260
4261                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4262                 if (ret != 0) {
4263                         goto failed;
4264                 }
4265
4266                 data.dsize = ctdb_rec_buffer_len(recbuf);
4267                 data.dptr = talloc_size(mem_ctx, data.dsize);
4268                 if (data.dptr == NULL) {
4269                         goto failed;
4270                 }
4271
4272                 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
4273
4274                 message.srvid = pulldb.srvid;
4275                 message.data.data = data;
4276
4277                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4278                                                 ctdb->client,
4279                                                 pnn_list, count,
4280                                                 &message, NULL);
4281                 if (ret != 0) {
4282                         goto failed;
4283                 }
4284
4285                 talloc_free(recbuf);
4286                 talloc_free(data.dptr);
4287         }
4288
4289         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4290         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4291                                         pnn_list, count, TIMEOUT(),
4292                                         &request, NULL, &reply);
4293         if (ret != 0) {
4294                 goto failed;
4295         }
4296
4297         for (i=0; i<count; i++) {
4298                 uint32_t num_records;
4299
4300                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4301                                                          &num_records);
4302                 if (ret != 0) {
4303                         fprintf(stderr, "Invalid response from node %u\n",
4304                                 pnn_list[i]);
4305                         goto failed;
4306                 }
4307
4308                 if (num_records != db_hdr.nrec) {
4309                         fprintf(stderr, "Node %u received %u of %lu records\n",
4310                                 pnn_list[i], num_records, db_hdr.nrec);
4311                         goto failed;
4312                 }
4313         }
4314
4315         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4316         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4317                                         pnn_list, count, TIMEOUT(),
4318                                         &request, NULL, NULL);
4319         if (ret != 0) {
4320                 goto failed;
4321         }
4322
4323         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4324         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4325                                         pnn_list, count, TIMEOUT(),
4326                                         &request, NULL, NULL);
4327         if (ret != 0) {
4328                 goto failed;
4329         }
4330
4331         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4332         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4333                                         ctdb->client, pnn_list, count,
4334                                         TIMEOUT(), &request, NULL, NULL);
4335         if (ret != 0) {
4336                 goto failed;
4337         }
4338
4339         printf("Database %s restored\n", db_name);
4340         close(fd);
4341         return 0;
4342
4343
4344 failed:
4345         close(fd);
4346         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4347                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4348         return ret;
4349 }
4350
4351 struct dumpdbbackup_state {
4352         ctdb_rec_parser_func_t parser;
4353         struct dump_record_state sub_state;
4354 };
4355
4356 static int dumpdbbackup_handler(uint32_t reqid,
4357                                 struct ctdb_ltdb_header *header,
4358                                 TDB_DATA key, TDB_DATA data,
4359                                 void *private_data)
4360 {
4361         struct dumpdbbackup_state *state =
4362                 (struct dumpdbbackup_state *)private_data;
4363         struct ctdb_ltdb_header hdr;
4364         int ret;
4365
4366         ret = ctdb_ltdb_header_extract(&data, &hdr);
4367         if (ret != 0) {
4368                 return ret;
4369         }
4370
4371         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4372 }
4373
4374 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4375                                 int argc, const char **argv)
4376 {
4377         struct db_header db_hdr;
4378         char timebuf[128];
4379         struct dumpdbbackup_state state;
4380         ssize_t n;
4381         int fd, ret, i;
4382
4383         if (argc != 1) {
4384                 usage("dumpbackup");
4385         }
4386
4387         fd = open(argv[0], O_RDONLY, 0600);
4388         if (fd == -1) {
4389                 ret = errno;
4390                 fprintf(stderr, "Failed to open file %s for reading\n",
4391                         argv[0]);
4392                 return ret;
4393         }
4394
4395         n = read(fd, &db_hdr, sizeof(struct db_header));
4396         if (n == -1) {
4397                 ret = errno;
4398                 close(fd);
4399                 fprintf(stderr, "Failed to read db header from file %s\n",
4400                         argv[0]);
4401                 return ret;
4402         }
4403         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4404
4405         if (db_hdr.version != DB_VERSION) {
4406                 fprintf(stderr,
4407                         "Wrong version of backup file, expected %u, got %lu\n",
4408                         DB_VERSION, db_hdr.version);
4409                 close(fd);
4410                 return EINVAL;
4411         }
4412
4413         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4414                  localtime(&db_hdr.timestamp));
4415         printf("Dumping database %s from backup @ %s\n",
4416                db_hdr.name, timebuf);
4417
4418         state.parser = dump_record;
4419         state.sub_state.count = 0;
4420
4421         for (i=0; i<db_hdr.nbuf; i++) {
4422                 struct ctdb_rec_buffer *recbuf;
4423
4424                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4425                 if (ret != 0) {
4426                         fprintf(stderr, "Failed to read records\n");
4427                         close(fd);
4428                         return ret;
4429                 }
4430
4431                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4432                                                &state);
4433                 if (ret != 0) {
4434                         fprintf(stderr, "Failed to dump records\n");
4435                         close(fd);
4436                         return ret;
4437                 }
4438         }
4439
4440         close(fd);
4441         printf("Dumped %u record(s)\n", state.sub_state.count);
4442         return 0;
4443 }
4444
4445 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4446                           int argc, const char **argv)
4447 {
4448         const char *db_name;
4449         struct ctdb_db_context *db;
4450         uint32_t db_id;
4451         uint8_t db_flags;
4452         struct ctdb_node_map *nodemap;
4453         struct ctdb_req_control request;
4454         struct ctdb_transdb wipedb;
4455         uint32_t generation;
4456         uint32_t *pnn_list;
4457         int count, ret;
4458
4459         if (argc != 1) {
4460                 usage("wipedb");
4461         }
4462
4463         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4464                 return 1;
4465         }
4466
4467         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4468                           db_flags, &db);
4469         if (ret != 0) {
4470                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4471                 return ret;
4472         }
4473
4474         nodemap = get_nodemap(ctdb, false);
4475         if (nodemap == NULL) {
4476                 fprintf(stderr, "Failed to get nodemap\n");
4477                 return ENOMEM;
4478         }
4479
4480         ret = get_generation(mem_ctx, ctdb, &generation);
4481         if (ret != 0) {
4482                 fprintf(stderr, "Failed to get current generation\n");
4483                 return ret;
4484         }
4485
4486         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4487                                      &pnn_list);
4488         if (count <= 0) {
4489                 return ENOMEM;
4490         }
4491
4492         ctdb_req_control_db_freeze(&request, db_id);
4493         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4494                                         ctdb->client, pnn_list, count,
4495                                         TIMEOUT(), &request, NULL, NULL);
4496         if (ret != 0) {
4497                 goto failed;
4498         }
4499
4500         wipedb.db_id = db_id;
4501         wipedb.tid = generation;
4502
4503         ctdb_req_control_db_transaction_start(&request, &wipedb);
4504         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4505                                         pnn_list, count, TIMEOUT(),
4506                                         &request, NULL, NULL);
4507         if (ret != 0) {
4508                 goto failed;
4509         }
4510
4511         ctdb_req_control_wipe_database(&request, &wipedb);
4512         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4513                                         pnn_list, count, TIMEOUT(),
4514                                         &request, NULL, NULL);
4515         if (ret != 0) {
4516                 goto failed;
4517         }
4518
4519         ctdb_req_control_db_set_healthy(&request, db_id);
4520         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4521                                         pnn_list, count, TIMEOUT(),
4522                                         &request, NULL, NULL);
4523         if (ret != 0) {
4524                 goto failed;
4525         }
4526
4527         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4528         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4529                                         pnn_list, count, TIMEOUT(),
4530                                         &request, NULL, NULL);
4531         if (ret != 0) {
4532                 goto failed;
4533         }
4534
4535         ctdb_req_control_db_thaw(&request, db_id);
4536         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4537                                         ctdb->client, pnn_list, count,
4538                                         TIMEOUT(), &request, NULL, NULL);
4539         if (ret != 0) {
4540                 goto failed;
4541         }
4542
4543         printf("Database %s wiped\n", db_name);
4544         return 0;
4545
4546
4547 failed:
4548         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4549                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4550         return ret;
4551 }
4552
4553 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4554                              int argc, const char **argv)
4555 {
4556         uint32_t recmaster;
4557         int ret;
4558
4559         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4560                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4561         if (ret != 0) {
4562                 return ret;
4563         }
4564
4565         printf("%u\n", recmaster);
4566         return 0;
4567 }
4568
4569 static int control_event(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4570                          int argc, const char **argv)
4571 {
4572         char *t, *event_helper = NULL;
4573         char *eventd_socket = NULL;
4574         const char **new_argv;
4575         int i;
4576
4577         t = getenv("CTDB_EVENT_HELPER");
4578         if (t != NULL) {
4579                 event_helper = talloc_strdup(mem_ctx, t);
4580         } else {
4581                 event_helper = talloc_asprintf(mem_ctx, "%s/ctdb_event",
4582                                                CTDB_HELPER_BINDIR);
4583         }
4584
4585         if (event_helper == NULL) {
4586                 fprintf(stderr, "Unable to set event daemon helper\n");
4587                 return 1;
4588         }
4589
4590         t = getenv("CTDB_SOCKET");
4591         if (t != NULL) {
4592                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4593                                                 dirname(t));
4594         } else {
4595                 eventd_socket = talloc_asprintf(mem_ctx, "%s/eventd.sock",
4596                                                 CTDB_RUNDIR);
4597         }
4598
4599         if (eventd_socket == NULL) {
4600                 fprintf(stderr, "Unable to set event daemon socket\n");
4601                 return 1;
4602         }
4603
4604         new_argv = talloc_array(mem_ctx, const char *, argc + 1);
4605         if (new_argv == NULL) {
4606                 fprintf(stderr, "Memory allocation error\n");
4607                 return 1;
4608         }
4609
4610         new_argv[0] = eventd_socket;
4611         for (i=0; i<argc; i++) {
4612                 new_argv[i+1] = argv[i];
4613         }
4614
4615         return run_helper(mem_ctx, "event daemon helper", event_helper,
4616                           argc+1, new_argv);
4617 }
4618
4619 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4620                                 int argc, const char **argv)
4621 {
4622         const char *new_argv[3];
4623
4624         if (argc > 1) {
4625                 usage("scriptstatus");
4626         }
4627
4628         new_argv[0] = "status";
4629         new_argv[1] = (argc == 0) ? "monitor" : argv[0];
4630         new_argv[2] = NULL;
4631
4632         (void) control_event(mem_ctx, ctdb, 2, new_argv);
4633         return 0;
4634 }
4635
4636 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4637                          int argc, const char **argv)
4638 {
4639         char *t, *natgw_helper = NULL;
4640
4641         if (argc != 1) {
4642                 usage("natgw");
4643         }
4644
4645         t = getenv("CTDB_NATGW_HELPER");
4646         if (t != NULL) {
4647                 natgw_helper = talloc_strdup(mem_ctx, t);
4648         } else {
4649                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4650                                                CTDB_HELPER_BINDIR);
4651         }
4652
4653         if (natgw_helper == NULL) {
4654                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4655                 return 1;
4656         }
4657
4658         return run_helper(mem_ctx, "NAT gateway helper", natgw_helper,
4659                           argc, argv);
4660 }
4661
4662 /*
4663  * Find the PNN of the current node
4664  * discover the pnn by loading the nodes file and try to bind
4665  * to all addresses one at a time until the ip address is found.
4666  */
4667 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4668 {
4669         struct ctdb_node_map *nodemap;
4670         int i;
4671
4672         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4673         if (nodemap == NULL) {
4674                 return false;
4675         }
4676
4677         for (i=0; i<nodemap->num; i++) {
4678                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4679                         continue;
4680                 }
4681                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4682                         if (pnn != NULL) {
4683                                 *pnn = nodemap->node[i].pnn;
4684                         }
4685                         talloc_free(nodemap);
4686                         return true;
4687                 }
4688         }
4689
4690         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4691         talloc_free(nodemap);
4692         return false;
4693 }
4694
4695 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4696                               int argc, const char **argv)
4697 {
4698         const char *reclock;
4699         int ret;
4700
4701         if (argc != 0) {
4702                 usage("getreclock");
4703         }
4704
4705         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4706                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4707         if (ret != 0) {
4708                 return ret;
4709         }
4710
4711         if (reclock != NULL) {
4712                 printf("%s\n", reclock);
4713         }
4714
4715         return 0;
4716 }
4717
4718 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4719                                   struct ctdb_context *ctdb,
4720                                   int argc, const char **argv)
4721 {
4722         uint32_t lmasterrole = 0;
4723         int ret;
4724
4725         if (argc != 1) {
4726                 usage("setlmasterrole");
4727         }
4728
4729         if (strcmp(argv[0], "on") == 0) {
4730                 lmasterrole = 1;
4731         } else if (strcmp(argv[0], "off") == 0) {
4732                 lmasterrole = 0;
4733         } else {
4734                 usage("setlmasterrole");
4735         }
4736
4737         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4738                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4739         if (ret != 0) {
4740                 return ret;
4741         }
4742
4743         return 0;
4744 }
4745
4746 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4747                                     struct ctdb_context *ctdb,
4748                                     int argc, const char **argv)
4749 {
4750         uint32_t recmasterrole = 0;
4751         int ret;
4752
4753         if (argc != 1) {
4754                 usage("setrecmasterrole");
4755         }
4756
4757         if (strcmp(argv[0], "on") == 0) {
4758                 recmasterrole = 1;
4759         } else if (strcmp(argv[0], "off") == 0) {
4760                 recmasterrole = 0;
4761         } else {
4762                 usage("setrecmasterrole");
4763         }
4764
4765         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4766                                           ctdb->cmd_pnn, TIMEOUT(),
4767                                           recmasterrole);
4768         if (ret != 0) {
4769                 return ret;
4770         }
4771
4772         return 0;
4773 }
4774
4775 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4776                                  struct ctdb_context *ctdb,
4777                                  int argc, const char **argv)
4778 {
4779         uint32_t db_id;
4780         uint8_t db_flags;
4781         int ret;
4782
4783         if (argc != 1) {
4784                 usage("setdbreadonly");
4785         }
4786
4787         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4788                 return 1;
4789         }
4790
4791         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4792                 fprintf(stderr, "READONLY can be set only on volatile DB\n");
4793                 return 1;
4794         }
4795
4796         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
4797                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
4798         if (ret != 0) {
4799                 return ret;
4800         }
4801
4802         return 0;
4803 }
4804
4805 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4806                                int argc, const char **argv)
4807 {
4808         uint32_t db_id;
4809         uint8_t db_flags;
4810         int ret;
4811
4812         if (argc != 1) {
4813                 usage("setdbsticky");
4814         }
4815
4816         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4817                 return 1;
4818         }
4819
4820         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4821                 fprintf(stderr, "STICKY can be set only on volatile DB\n");
4822                 return 1;
4823         }
4824
4825         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
4826                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
4827         if (ret != 0) {
4828                 return ret;
4829         }
4830
4831         return 0;
4832 }
4833
4834 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4835                           int argc, const char **argv)
4836 {
4837         const char *db_name;
4838         struct ctdb_db_context *db;
4839         struct ctdb_transaction_handle *h;
4840         uint8_t db_flags;
4841         TDB_DATA key, data;
4842         int ret;
4843
4844         if (argc < 2 || argc > 3) {
4845                 usage("pfetch");
4846         }
4847
4848         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4849                 return 1;
4850         }
4851
4852         if (! (db_flags &
4853                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4854                 fprintf(stderr, "Transactions not supported on DB %s\n",
4855                         db_name);
4856                 return 1;
4857         }
4858
4859         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4860                           db_flags, &db);
4861         if (ret != 0) {
4862                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4863                 return ret;
4864         }
4865
4866         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4867         if (ret != 0) {
4868                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4869                 return ret;
4870         }
4871
4872         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4873                                      TIMEOUT(), db, true, &h);
4874         if (ret != 0) {
4875                 fprintf(stderr, "Failed to start transaction on db %s\n",
4876                         db_name);
4877                 return ret;
4878         }
4879
4880         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
4881         if (ret != 0) {
4882                 fprintf(stderr, "Failed to read record for key %s\n",
4883                         argv[1]);
4884                 ctdb_transaction_cancel(h);
4885                 return ret;
4886         }
4887
4888         printf("%.*s\n", (int)data.dsize, data.dptr);
4889
4890         ctdb_transaction_cancel(h);
4891         return 0;
4892 }
4893
4894 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4895                           int argc, const char **argv)
4896 {
4897         const char *db_name;
4898         struct ctdb_db_context *db;
4899         struct ctdb_transaction_handle *h;
4900         uint8_t db_flags;
4901         TDB_DATA key, data;
4902         int ret;
4903
4904         if (argc != 3) {
4905                 usage("pstore");
4906         }
4907
4908         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4909                 return 1;
4910         }
4911
4912         if (! (db_flags &
4913                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4914                 fprintf(stderr, "Transactions not supported on DB %s\n",
4915                         db_name);
4916                 return 1;
4917         }
4918
4919         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4920                           db_flags, &db);
4921         if (ret != 0) {
4922                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4923                 return ret;
4924         }
4925
4926         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4927         if (ret != 0) {
4928                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4929                 return ret;
4930         }
4931
4932         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
4933         if (ret != 0) {
4934                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
4935                 return ret;
4936         }
4937
4938         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4939                                      TIMEOUT(), db, false, &h);
4940         if (ret != 0) {
4941                 fprintf(stderr, "Failed to start transaction on db %s\n",
4942                         db_name);
4943                 return ret;
4944         }
4945
4946         ret = ctdb_transaction_store_record(h, key, data);
4947         if (ret != 0) {
4948                 fprintf(stderr, "Failed to store record for key %s\n",
4949                         argv[1]);
4950                 ctdb_transaction_cancel(h);
4951                 return ret;
4952         }
4953
4954         ret = ctdb_transaction_commit(h);
4955         if (ret != 0) {
4956                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4957                         db_name);
4958                 ctdb_transaction_cancel(h);
4959                 return ret;
4960         }
4961
4962         return 0;
4963 }
4964
4965 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4966                            int argc, const char **argv)
4967 {
4968         const char *db_name;
4969         struct ctdb_db_context *db;
4970         struct ctdb_transaction_handle *h;
4971         uint8_t db_flags;
4972         TDB_DATA key;
4973         int ret;
4974
4975         if (argc != 2) {
4976                 usage("pdelete");
4977         }
4978
4979         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4980                 return 1;
4981         }
4982
4983         if (! (db_flags &
4984                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4985                 fprintf(stderr, "Transactions not supported on DB %s\n",
4986                         db_name);
4987                 return 1;
4988         }
4989
4990         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4991                           db_flags, &db);
4992         if (ret != 0) {
4993                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4994                 return ret;
4995         }
4996
4997         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4998         if (ret != 0) {
4999                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5000                 return ret;
5001         }
5002
5003         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5004                                      TIMEOUT(), db, false, &h);
5005         if (ret != 0) {
5006                 fprintf(stderr, "Failed to start transaction on db %s\n",
5007                         db_name);
5008                 return ret;
5009         }
5010
5011         ret = ctdb_transaction_delete_record(h, key);
5012         if (ret != 0) {
5013                 fprintf(stderr, "Failed to delete record for key %s\n",
5014                         argv[1]);
5015                 ctdb_transaction_cancel(h);
5016                 return ret;
5017         }
5018
5019         ret = ctdb_transaction_commit(h);
5020         if (ret != 0) {
5021                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5022                         db_name);
5023                 ctdb_transaction_cancel(h);
5024                 return ret;
5025         }
5026
5027         return 0;
5028 }
5029
5030 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5031 {
5032         const char *t;
5033         size_t n;
5034         int ret;
5035
5036         *data = tdb_null;
5037
5038         /* Skip whitespace */
5039         n = strspn(*ptr, " \t");
5040         t = *ptr + n;
5041
5042         if (t[0] == '"') {
5043                 /* Quoted ASCII string - no wide characters! */
5044                 t++;
5045                 n = strcspn(t, "\"");
5046                 if (t[n] == '"') {
5047                         if (n > 0) {
5048                                 ret = str_to_data(t, n, mem_ctx, data);
5049                                 if (ret != 0) {
5050                                         return ret;
5051                                 }
5052                         }
5053                         *ptr = t + n + 1;
5054                 } else {
5055                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5056                         return 1;
5057                 }
5058         } else {
5059                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5060                 return 1;
5061         }
5062
5063         return 0;
5064 }
5065
5066 #define MAX_LINE_SIZE   1024
5067
5068 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5069                                  TDB_DATA *key, TDB_DATA *value)
5070 {
5071         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5072         const char *ptr;
5073         int ret;
5074
5075         ptr = fgets(line, MAX_LINE_SIZE, file);
5076         if (ptr == NULL) {
5077                 return false;
5078         }
5079
5080         /* Get key */
5081         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5082         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5083                 /* Line Ignored but not EOF */
5084                 *key = tdb_null;
5085                 return true;
5086         }
5087
5088         /* Get value */
5089         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5090         if (ret != 0) {
5091                 /* Line Ignored but not EOF */
5092                 talloc_free(key->dptr);
5093                 *key = tdb_null;
5094                 return true;
5095         }
5096
5097         return true;
5098 }
5099
5100 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5101                           int argc, const char **argv)
5102 {
5103         const char *db_name;
5104         struct ctdb_db_context *db;
5105         struct ctdb_transaction_handle *h;
5106         uint8_t db_flags;
5107         FILE *file;
5108         TDB_DATA key, value;
5109         int ret;
5110
5111         if (argc < 1 || argc > 2) {
5112                 usage("ptrans");
5113         }
5114
5115         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5116                 return 1;
5117         }
5118
5119         if (! (db_flags &
5120                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
5121                 fprintf(stderr, "Transactions not supported on DB %s\n",
5122                         db_name);
5123                 return 1;
5124         }
5125
5126         if (argc == 2) {
5127                 file = fopen(argv[1], "r");
5128                 if (file == NULL) {
5129                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5130                         return 1;
5131                 }
5132         } else {
5133                 file = stdin;
5134         }
5135
5136         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5137                           db_flags, &db);
5138         if (ret != 0) {
5139                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5140                 goto done;
5141         }
5142
5143         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5144                                      TIMEOUT(), db, false, &h);
5145         if (ret != 0) {
5146                 fprintf(stderr, "Failed to start transaction on db %s\n",
5147                         db_name);
5148                 goto done;
5149         }
5150
5151         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5152                 if (key.dsize != 0) {
5153                         ret = ctdb_transaction_store_record(h, key, value);
5154                         if (ret != 0) {
5155                                 fprintf(stderr, "Failed to store record\n");
5156                                 ctdb_transaction_cancel(h);
5157                                 goto done;
5158                         }
5159                         talloc_free(key.dptr);
5160                         talloc_free(value.dptr);
5161                 }
5162         }
5163
5164         ret = ctdb_transaction_commit(h);
5165         if (ret != 0) {
5166                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5167                         db_name);
5168                 ctdb_transaction_cancel(h);
5169         }
5170
5171 done:
5172         if (file != stdin) {
5173                 fclose(file);
5174         }
5175         return ret;
5176 }
5177
5178 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5179                           int argc, const char **argv)
5180 {
5181         struct tdb_context *tdb;
5182         TDB_DATA key, data;
5183         struct ctdb_ltdb_header header;
5184         int ret;
5185
5186         if (argc < 2 || argc > 3) {
5187                 usage("tfetch");
5188         }
5189
5190         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5191         if (tdb == NULL) {
5192                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5193                 return 1;
5194         }
5195
5196         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5197         if (ret != 0) {
5198                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5199                 tdb_close(tdb);
5200                 return ret;
5201         }
5202
5203         data = tdb_fetch(tdb, key);
5204         if (data.dptr == NULL) {
5205                 fprintf(stderr, "No record for key %s\n", argv[1]);
5206                 tdb_close(tdb);
5207                 return 1;
5208         }
5209
5210         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5211                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5212                 tdb_close(tdb);
5213                 return 1;
5214         }
5215
5216         tdb_close(tdb);
5217
5218         if (argc == 3) {
5219                 int fd;
5220                 ssize_t nwritten;
5221
5222                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5223                 if (fd == -1) {
5224                         fprintf(stderr, "Failed to open output file %s\n",
5225                                 argv[2]);
5226                         goto fail;
5227                 }
5228
5229                 nwritten = sys_write(fd, data.dptr, data.dsize);
5230                 if (nwritten != data.dsize) {
5231                         fprintf(stderr, "Failed to write record to file\n");
5232                         close(fd);
5233                         goto fail;
5234                 }
5235
5236                 close(fd);
5237         }
5238
5239 fail:
5240         ret = ctdb_ltdb_header_extract(&data, &header);
5241         if (ret != 0) {
5242                 fprintf(stderr, "Failed to parse header from data\n");
5243                 return 1;
5244         }
5245
5246         dump_ltdb_header(&header);
5247         dump_tdb_data("data", data);
5248
5249         return 0;
5250 }
5251
5252 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5253                           int argc, const char **argv)
5254 {
5255         struct tdb_context *tdb;
5256         TDB_DATA key, data[2], value;
5257         struct ctdb_ltdb_header header;
5258         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5259         size_t np;
5260         int ret;
5261
5262         if (argc < 3 || argc > 5) {
5263                 usage("tstore");
5264         }
5265
5266         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5267         if (tdb == NULL) {
5268                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5269                 return 1;
5270         }
5271
5272         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5273         if (ret != 0) {
5274                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5275                 tdb_close(tdb);
5276                 return ret;
5277         }
5278
5279         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5280         if (ret != 0) {
5281                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5282                 tdb_close(tdb);
5283                 return ret;
5284         }
5285
5286         ZERO_STRUCT(header);
5287
5288         if (argc > 3) {
5289                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5290         }
5291         if (argc > 4) {
5292                 header.dmaster = (uint32_t)atol(argv[4]);
5293         }
5294         if (argc > 5) {
5295                 header.flags = (uint32_t)atol(argv[5]);
5296         }
5297
5298         ctdb_ltdb_header_push(&header, header_buf, &np);
5299
5300         data[0].dsize = np;
5301         data[0].dptr = header_buf;
5302
5303         data[1].dsize = value.dsize;
5304         data[1].dptr = value.dptr;
5305
5306         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5307         if (ret != 0) {
5308                 fprintf(stderr, "Failed to write record %s to file %s\n",
5309                         argv[1], argv[0]);
5310         }
5311
5312         tdb_close(tdb);
5313
5314         return ret;
5315 }
5316
5317 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5318                            int argc, const char **argv)
5319 {
5320         const char *db_name;
5321         struct ctdb_db_context *db;
5322         struct ctdb_record_handle *h;
5323         uint8_t db_flags;
5324         TDB_DATA key, data;
5325         bool readonly = false;
5326         int ret;
5327
5328         if (argc < 2 || argc > 3) {
5329                 usage("readkey");
5330         }
5331
5332         if (argc == 3) {
5333                 if (strcmp(argv[2], "readonly") == 0) {
5334                         readonly = true;
5335                 } else {
5336                         usage("readkey");
5337                 }
5338         }
5339
5340         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5341                 return 1;
5342         }
5343
5344         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5345                 fprintf(stderr, "DB %s is not a volatile database\n",
5346                         db_name);
5347                 return 1;
5348         }
5349
5350         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5351                           db_flags, &db);
5352         if (ret != 0) {
5353                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5354                 return ret;
5355         }
5356
5357         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5358         if (ret != 0) {
5359                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5360                 return ret;
5361         }
5362
5363         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5364                               db, key, readonly, &h, NULL, &data);
5365         if (ret != 0) {
5366                 fprintf(stderr, "Failed to read record for key %s\n",
5367                         argv[1]);
5368         } else {
5369                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5370                        (int)data.dsize, data.dptr);
5371         }
5372
5373         talloc_free(h);
5374         return ret;
5375 }
5376
5377 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5378                             int argc, const char **argv)
5379 {
5380         const char *db_name;
5381         struct ctdb_db_context *db;
5382         struct ctdb_record_handle *h;
5383         uint8_t db_flags;
5384         TDB_DATA key, data;
5385         int ret;
5386
5387         if (argc != 3) {
5388                 usage("writekey");
5389         }
5390
5391         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5392                 return 1;
5393         }
5394
5395         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5396                 fprintf(stderr, "DB %s is not a volatile database\n",
5397                         db_name);
5398                 return 1;
5399         }
5400
5401         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5402                           db_flags, &db);
5403         if (ret != 0) {
5404                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5405                 return ret;
5406         }
5407
5408         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5409         if (ret != 0) {
5410                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5411                 return ret;
5412         }
5413
5414         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5415         if (ret != 0) {
5416                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5417                 return ret;
5418         }
5419
5420         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5421                               db, key, false, &h, NULL, NULL);
5422         if (ret != 0) {
5423                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5424                 return ret;
5425         }
5426
5427         ret = ctdb_store_record(h, data);
5428         if (ret != 0) {
5429                 fprintf(stderr, "Failed to store record for key %s\n",
5430                         argv[1]);
5431         }
5432
5433         talloc_free(h);
5434         return ret;
5435 }
5436
5437 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5438                              int argc, const char **argv)
5439 {
5440         const char *db_name;
5441         struct ctdb_db_context *db;
5442         struct ctdb_record_handle *h;
5443         uint8_t db_flags;
5444         TDB_DATA key, data;
5445         int ret;
5446
5447         if (argc != 2) {
5448                 usage("deletekey");
5449         }
5450
5451         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5452                 return 1;
5453         }
5454
5455         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5456                 fprintf(stderr, "DB %s is not a volatile database\n",
5457                         db_name);
5458                 return 1;
5459         }
5460
5461         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5462                           db_flags, &db);
5463         if (ret != 0) {
5464                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5465                 return ret;
5466         }
5467
5468         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5469         if (ret != 0) {
5470                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5471                 return ret;
5472         }
5473
5474         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5475                               db, key, false, &h, NULL, &data);
5476         if (ret != 0) {
5477                 fprintf(stderr, "Failed to fetch record for key %s\n",
5478                         argv[1]);
5479                 return ret;
5480         }
5481
5482         ret = ctdb_delete_record(h);
5483         if (ret != 0) {
5484                 fprintf(stderr, "Failed to delete record for key %s\n",
5485                         argv[1]);
5486         }
5487
5488         talloc_free(h);
5489         return ret;
5490 }
5491
5492 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5493                                 int argc, const char **argv)
5494 {
5495         struct sockaddr_in sin;
5496         unsigned int port;
5497         int s, v;
5498         int ret;
5499
5500         if (argc != 1) {
5501                 usage("chktcpport");
5502         }
5503
5504         port = atoi(argv[0]);
5505
5506         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5507         if (s == -1) {
5508                 fprintf(stderr, "Failed to open local socket\n");
5509                 return errno;
5510         }
5511
5512         v = fcntl(s, F_GETFL, 0);
5513         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5514                 fprintf(stderr, "Unable to set socket non-blocking\n");
5515                 close(s);
5516                 return errno;
5517         }
5518
5519         bzero(&sin, sizeof(sin));
5520         sin.sin_family = AF_INET;
5521         sin.sin_port = htons(port);
5522         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5523         close(s);
5524         if (ret == -1) {
5525                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5526                 return errno;
5527         }
5528
5529         return 0;
5530 }
5531
5532 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5533                                int argc, const char **argv)
5534 {
5535         uint32_t db_id;
5536         const char *db_name;
5537         uint64_t seqnum;
5538         int ret;
5539
5540         if (argc != 1) {
5541                 usage("getdbseqnum");
5542         }
5543
5544         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5545                 return 1;
5546         }
5547
5548         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5549                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5550                                       &seqnum);
5551         if (ret != 0) {
5552                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5553                         db_name);
5554                 return ret;
5555         }
5556
5557         printf("0x%"PRIx64"\n", seqnum);
5558         return 0;
5559 }
5560
5561 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5562                               int argc, const char **argv)
5563 {
5564         const char *nodestring = NULL;
5565         struct ctdb_node_map *nodemap;
5566         int ret, i;
5567         bool print_hdr = false;
5568
5569         if (argc > 1) {
5570                 usage("nodestatus");
5571         }
5572
5573         if (argc == 1) {
5574                 nodestring = argv[0];
5575                 if (strcmp(nodestring, "all") == 0) {
5576                         print_hdr = true;
5577                 }
5578         }
5579
5580         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5581                 return 1;
5582         }
5583
5584         if (options.machinereadable) {
5585                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5586         } else {
5587                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, print_hdr);
5588         }
5589
5590         ret = 0;
5591         for (i=0; i<nodemap->num; i++) {
5592                 ret |= nodemap->node[i].flags;
5593         }
5594
5595         return ret;
5596 }
5597
5598 const struct {
5599         const char *name;
5600         uint32_t offset;
5601 } db_stats_fields[] = {
5602 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5603         DBSTATISTICS_FIELD(db_ro_delegations),
5604         DBSTATISTICS_FIELD(db_ro_revokes),
5605         DBSTATISTICS_FIELD(locks.num_calls),
5606         DBSTATISTICS_FIELD(locks.num_current),
5607         DBSTATISTICS_FIELD(locks.num_pending),
5608         DBSTATISTICS_FIELD(locks.num_failed),
5609 };
5610
5611 static void print_dbstatistics(const char *db_name,
5612                                struct ctdb_db_statistics *s)
5613 {
5614         int i;
5615         const char *prefix = NULL;
5616         int preflen = 0;
5617
5618         printf("DB Statistics %s\n", db_name);
5619
5620         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5621                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5622                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5623                         if (! prefix ||
5624                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5625                                 prefix = db_stats_fields[i].name;
5626                                 printf(" %*.*s\n", preflen-1, preflen-1,
5627                                        db_stats_fields[i].name);
5628                         }
5629                 } else {
5630                         preflen = 0;
5631                 }
5632                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5633                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5634                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5635         }
5636
5637         printf(" hop_count_buckets:");
5638         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5639                 printf(" %d", s->hop_count_bucket[i]);
5640         }
5641         printf("\n");
5642
5643         printf(" lock_buckets:");
5644         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5645                 printf(" %d", s->locks.buckets[i]);
5646         }
5647         printf("\n");
5648
5649         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5650                "locks_latency      MIN/AVG/MAX",
5651                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5652                s->locks.latency.max, s->locks.latency.num);
5653
5654         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5655                "vacuum_latency     MIN/AVG/MAX",
5656                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5657                s->vacuum.latency.max, s->vacuum.latency.num);
5658
5659         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5660         for (i=0; i<s->num_hot_keys; i++) {
5661                 int j;
5662                 printf("     Count:%d Key:", s->hot_keys[i].count);
5663                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5664                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5665                 }
5666                 printf("\n");
5667         }
5668 }
5669
5670 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5671                                 int argc, const char **argv)
5672 {
5673         uint32_t db_id;
5674         const char *db_name;
5675         struct ctdb_db_statistics *dbstats;
5676         int ret;
5677
5678         if (argc != 1) {
5679                 usage("dbstatistics");
5680         }
5681
5682         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5683                 return 1;
5684         }
5685
5686         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5687                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5688                                           &dbstats);
5689         if (ret != 0) {
5690                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5691                         db_name);
5692                 return ret;
5693         }
5694
5695         print_dbstatistics(db_name, dbstats);
5696         return 0;
5697 }
5698
/*
 * Bookkeeping shared between disable_takeover_runs() and its message
 * handler while replies from the nodes are collected.
 */
struct disable_takeover_runs_state {
        /* PNNs of the nodes a reply is expected from */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and in reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value taken from a reply on error, otherwise 0 */
        int status;
        /* Set when an error reply arrived or every node has answered */
        bool done;
};
5706
5707 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5708                                          void *private_data)
5709 {
5710         struct disable_takeover_runs_state *state =
5711                 (struct disable_takeover_runs_state *)private_data;
5712         int ret, i;
5713
5714         if (data.dsize != sizeof(int)) {
5715                 /* Ignore packet */
5716                 return;
5717         }
5718
5719         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5720         ret = *(int *)data.dptr;
5721         if (ret < 0) {
5722                 state->status = ret;
5723                 state->done = true;
5724                 return;
5725         }
5726         for (i=0; i<state->node_count; i++) {
5727                 if (state->pnn_list[i] == ret) {
5728                         state->reply[i] = true;
5729                         break;
5730                 }
5731         }
5732
5733         state->done = true;
5734         for (i=0; i<state->node_count; i++) {
5735                 if (! state->reply[i]) {
5736                         state->done = false;
5737                         break;
5738                 }
5739         }
5740 }
5741
5742 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5743                                  struct ctdb_context *ctdb, uint32_t timeout,
5744                                  uint32_t *pnn_list, int count)
5745 {
5746         struct ctdb_disable_message disable = { 0 };
5747         struct disable_takeover_runs_state state;
5748         int ret, i;
5749
5750         disable.pnn = ctdb->pnn;
5751         disable.srvid = next_srvid(ctdb);
5752         disable.timeout = timeout;
5753
5754         state.pnn_list = pnn_list;
5755         state.node_count = count;
5756         state.done = false;
5757         state.status = 0;
5758         state.reply = talloc_zero_array(mem_ctx, bool, count);
5759         if (state.reply == NULL) {
5760                 return ENOMEM;
5761         }
5762
5763         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5764                                               disable.srvid,
5765                                               disable_takeover_run_handler,
5766                                               &state);
5767         if (ret != 0) {
5768                 return ret;
5769         }
5770
5771         for (i=0; i<count; i++) {
5772                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5773                                                          ctdb->client,
5774                                                          pnn_list[i],
5775                                                          &disable);
5776                 if (ret != 0) {
5777                         goto fail;
5778                 }
5779         }
5780
5781         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5782         if (ret == ETIME) {
5783                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5784         } else {
5785                 ret = (state.status >= 0 ? 0 : 1);
5786         }
5787
5788 fail:
5789         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5790                                            disable.srvid, &state);
5791         return ret;
5792 }
5793
5794 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5795                              int argc, const char **argv)
5796 {
5797         const char *nodestring = NULL;
5798         struct ctdb_node_map *nodemap, *nodemap2;
5799         struct ctdb_req_control request;
5800         uint32_t *pnn_list, *pnn_list2;
5801         int ret, count, count2;
5802
5803         if (argc > 1) {
5804                 usage("reloadips");
5805         }
5806
5807         if (argc == 1) {
5808                 nodestring = argv[0];
5809         }
5810
5811         nodemap = get_nodemap(ctdb, false);
5812         if (nodemap == NULL) {
5813                 return 1;
5814         }
5815
5816         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
5817                 return 1;
5818         }
5819
5820         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
5821                                         mem_ctx, &pnn_list);
5822         if (count <= 0) {
5823                 fprintf(stderr, "Memory allocation error\n");
5824                 return 1;
5825         }
5826
5827         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
5828                                       mem_ctx, &pnn_list2);
5829         if (count2 <= 0) {
5830                 fprintf(stderr, "Memory allocation error\n");
5831                 return 1;
5832         }
5833
5834         /* Disable takeover runs on all connected nodes.  A reply
5835          * indicating success is needed from each node so all nodes
5836          * will need to be active.
5837          *
5838          * A check could be added to not allow reloading of IPs when
5839          * there are disconnected nodes.  However, this should
5840          * probably be left up to the administrator.
5841          */
5842         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
5843                                     pnn_list, count);
5844         if (ret != 0) {
5845                 fprintf(stderr, "Failed to disable takeover runs\n");
5846                 return ret;
5847         }
5848
5849         /* Now tell all the desired nodes to reload their public IPs.
5850          * Keep trying this until it succeeds.  This assumes all
5851          * failures are transient, which might not be true...
5852          */
5853         ctdb_req_control_reload_public_ips(&request);
5854         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
5855                                         pnn_list2, count2, TIMEOUT(),
5856                                         &request, NULL, NULL);
5857         if (ret != 0) {
5858                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
5859         }
5860
5861         /* It isn't strictly necessary to wait until takeover runs are
5862          * re-enabled but doing so can't hurt.
5863          */
5864         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
5865         if (ret != 0) {
5866                 fprintf(stderr, "Failed to enable takeover runs\n");
5867                 return ret;
5868         }
5869
5870         return ipreallocate(mem_ctx, ctdb);
5871 }
5872
5873 static int control_ipiface(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5874                            int argc, const char **argv)
5875 {
5876         ctdb_sock_addr addr;
5877         char *iface;
5878         int ret;
5879
5880         if (argc != 1) {
5881                 usage("ipiface");
5882         }
5883
5884         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
5885         if (ret != 0) {
5886                 fprintf(stderr, "Failed to Parse IP %s\n", argv[0]);
5887                 return 1;
5888         }
5889
5890         iface = ctdb_sys_find_ifname(&addr);
5891         if (iface == NULL) {
5892                 fprintf(stderr, "Failed to find interface for IP %s\n",
5893                         argv[0]);
5894                 return 1;
5895         }
5896         free(iface);
5897
5898         return 0;
5899 }
5900
5901
5902 static const struct ctdb_cmd {
5903         const char *name;
5904         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
5905         bool without_daemon; /* can be run without daemon running ? */
5906         bool remote; /* can be run on remote nodes */
5907         const char *msg;
5908         const char *args;
5909 } ctdb_commands[] = {
5910         { "version", control_version, true, false,
5911                 "show version of ctdb", NULL },
5912         { "status", control_status, false, true,
5913                 "show node status", NULL },
5914         { "uptime", control_uptime, false, true,
5915                 "show node uptime", NULL },
5916         { "ping", control_ping, false, true,
5917                 "ping all nodes", NULL },
5918         { "runstate", control_runstate, false, true,
5919                 "get/check runstate of a node",
5920                 "[setup|first_recovery|startup|running]" },
5921         { "getvar", control_getvar, false, true,
5922                 "get a tunable variable", "<name>" },
5923         { "setvar", control_setvar, false, true,
5924                 "set a tunable variable", "<name> <value>" },
5925         { "listvars", control_listvars, false, true,
5926                 "list tunable variables", NULL },
5927         { "statistics", control_statistics, false, true,
5928                 "show ctdb statistics", NULL },
5929         { "statisticsreset", control_statistics_reset, false, true,
5930                 "reset ctdb statistics", NULL },
5931         { "stats", control_stats, false, true,
5932                 "show rolling statistics", "[count]" },
5933         { "ip", control_ip, false, true,
5934                 "show public ips", "[all]" },
5935         { "ipinfo", control_ipinfo, false, true,
5936                 "show public ip details", "<ip>" },
5937         { "ifaces", control_ifaces, false, true,
5938                 "show interfaces", NULL },
5939         { "setifacelink", control_setifacelink, false, true,
5940                 "set interface link status", "<iface> up|down" },
5941         { "process-exists", control_process_exists, false, true,
5942                 "check if a process exists on a node",  "<pid> [<srvid>]" },
5943         { "getdbmap", control_getdbmap, false, true,
5944                 "show attached databases", NULL },
5945         { "getdbstatus", control_getdbstatus, false, true,
5946                 "show database status", "<dbname|dbid>" },
5947         { "catdb", control_catdb, false, false,
5948                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
5949         { "cattdb", control_cattdb, false, false,
5950                 "dump local ctdb database", "<dbname|dbid>" },
5951         { "getcapabilities", control_getcapabilities, false, true,
5952                 "show node capabilities", NULL },
5953         { "pnn", control_pnn, false, false,
5954                 "show the pnn of the currnet node", NULL },
5955         { "lvs", control_lvs, false, false,
5956                 "show lvs configuration", "master|list|status" },
5957         { "setdebug", control_setdebug, false, true,
5958                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
5959         { "getdebug", control_getdebug, false, true,
5960                 "get debug level", NULL },
5961         { "attach", control_attach, false, false,
5962                 "attach a database", "<dbname> [persistent|replicated]" },
5963         { "detach", control_detach, false, false,
5964                 "detach database(s)", "<dbname|dbid> ..." },
5965         { "dumpmemory", control_dumpmemory, false, true,
5966                 "dump ctdbd memory map", NULL },
5967         { "rddumpmemory", control_rddumpmemory, false, true,
5968                 "dump recoverd memory map", NULL },
5969         { "getpid", control_getpid, false, true,
5970                 "get ctdbd process ID", NULL },
5971         { "disable", control_disable, false, true,
5972                 "disable a node", NULL },
5973         { "enable", control_enable, false, true,
5974                 "enable a node", NULL },
5975         { "stop", control_stop, false, true,
5976                 "stop a node", NULL },
5977         { "continue", control_continue, false, true,
5978                 "continue a stopped node", NULL },
5979         { "ban", control_ban, false, true,
5980                 "ban a node", "<bantime>"},
5981         { "unban", control_unban, false, true,
5982                 "unban a node", NULL },
5983         { "shutdown", control_shutdown, false, true,
5984                 "shutdown ctdb daemon", NULL },
5985         { "recover", control_recover, false, true,
5986                 "force recovery", NULL },
5987         { "sync", control_ipreallocate, false, true,
5988                 "run ip reallocation (deprecated)", NULL },
5989         { "ipreallocate", control_ipreallocate, false, true,
5990                 "run ip reallocation", NULL },
5991         { "isnotrecmaster", control_isnotrecmaster, false, false,
5992                 "check if local node is the recmaster", NULL },
5993         { "gratarp", control_gratarp, false, true,
5994                 "send a gratuitous arp", "<ip> <interface>" },
5995         { "tickle", control_tickle, true, false,
5996                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5997         { "gettickles", control_gettickles, false, true,
5998                 "get the list of tickles", "<ip> [<port>]" },
5999         { "addtickle", control_addtickle, false, true,
6000                 "add a tickle", "<ip>:<port> <ip>:<port>" },
6001         { "deltickle", control_deltickle, false, true,
6002                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
6003         { "listnodes", control_listnodes, true, true,
6004                 "list nodes in the cluster", NULL },
6005         { "reloadnodes", control_reloadnodes, false, false,
6006                 "reload the nodes file all nodes", NULL },
6007         { "moveip", control_moveip, false, false,
6008                 "move an ip address to another node", "<ip> <node>" },
6009         { "addip", control_addip, false, true,
6010                 "add an ip address to a node", "<ip/mask> <iface>" },
6011         { "delip", control_delip, false, true,
6012                 "delete an ip address from a node", "<ip>" },
6013         { "backupdb", control_backupdb, false, false,
6014                 "backup a database into a file", "<dbname|dbid> <file>" },
6015         { "restoredb", control_restoredb, false, false,
6016                 "restore a database from a file", "<file> [dbname]" },
6017         { "dumpdbbackup", control_dumpdbbackup, true, false,
6018                 "dump database from a backup file", "<file>" },
6019         { "wipedb", control_wipedb, false, false,
6020                 "wipe the contents of a database.", "<dbname|dbid>"},
6021         { "recmaster", control_recmaster, false, true,
6022                 "show the pnn for the recovery master", NULL },
6023         { "event", control_event, true, false,
6024                 "event and event script commands", NULL },
6025         { "scriptstatus", control_scriptstatus, true, false,
6026                 "show event script status",
6027                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
6028         { "natgw", control_natgw, false, false,
6029                 "show natgw configuration", "master|list|status" },
6030         { "getreclock", control_getreclock, false, true,
6031                 "get recovery lock file", NULL },
6032         { "setlmasterrole", control_setlmasterrole, false, true,
6033                 "set LMASTER role", "on|off" },
6034         { "setrecmasterrole", control_setrecmasterrole, false, true,
6035                 "set RECMASTER role", "on|off"},
6036         { "setdbreadonly", control_setdbreadonly, false, true,
6037                 "enable readonly records", "<dbname|dbid>" },
6038         { "setdbsticky", control_setdbsticky, false, true,
6039                 "enable sticky records", "<dbname|dbid>"},
6040         { "pfetch", control_pfetch, false, false,
6041                 "fetch record from persistent database", "<dbname|dbid> <key> [<file>]" },
6042         { "pstore", control_pstore, false, false,
6043                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
6044         { "pdelete", control_pdelete, false, false,
6045                 "delete record from persistent database", "<dbname|dbid> <key>" },
6046         { "ptrans", control_ptrans, false, false,
6047                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
6048         { "tfetch", control_tfetch, false, true,
6049                 "fetch a record", "<tdb-file> <key> [<file>]" },
6050         { "tstore", control_tstore, false, true,
6051                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
6052         { "readkey", control_readkey, false, false,
6053                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
6054         { "writekey", control_writekey, false, false,
6055                 "write value for a database key", "<dbname|dbid> <key> <value>" },
6056         { "deletekey", control_deletekey, false, false,
6057                 "delete a database key", "<dbname|dbid> <key>" },
6058         { "checktcpport", control_checktcpport, true, false,
6059                 "check if a service is bound to a specific tcp port or not", "<port>" },
6060         { "getdbseqnum", control_getdbseqnum, false, false,
6061                 "get database sequence number", "<dbname|dbid>" },
6062         { "nodestatus", control_nodestatus, false, true,
6063                 "show and return node status", "[all|<pnn-list>]" },
6064         { "dbstatistics", control_dbstatistics, false, true,
6065                 "show database statistics", "<dbname|dbid>" },
6066         { "reloadips", control_reloadips, false, false,
6067                 "reload the public addresses file", "[all|<pnn-list>]" },
6068         { "ipiface", control_ipiface, true, false,
6069                 "Find the interface an ip address is hosted on", "<ip>" },
6070 };
6071
6072 static const struct ctdb_cmd *match_command(const char *command)
6073 {
6074         const struct ctdb_cmd *cmd;
6075         int i;
6076
6077         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6078                 cmd = &ctdb_commands[i];
6079                 if (strlen(command) == strlen(cmd->name) &&
6080                     strncmp(command, cmd->name, strlen(command)) == 0) {
6081                         return cmd;
6082                 }
6083         }
6084
6085         return NULL;
6086 }
6087
6088
6089 /**
6090  * Show usage message
6091  */
6092 static void usage_full(void)
6093 {
6094         int i;
6095
6096         poptPrintHelp(pc, stdout, 0);
6097         printf("\nCommands:\n");
6098         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6099                 printf("  %-15s %-27s  %s\n",
6100                        ctdb_commands[i].name,
6101                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6102                        ctdb_commands[i].msg);
6103         }
6104 }
6105
6106 static void usage(const char *command)
6107 {
6108         const struct ctdb_cmd *cmd;
6109
6110         if (command == NULL) {
6111                 usage_full();
6112                 exit(1);
6113         }
6114
6115         cmd = match_command(command);
6116         if (cmd == NULL) {
6117                 usage_full();
6118         } else {
6119                 poptPrintUsage(pc, stdout, 0);
6120                 printf("\nCommands:\n");
6121                 printf("  %-15s %-27s  %s\n",
6122                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6123         }
6124
6125         exit(1);
6126 }
6127
6128 struct poptOption cmdline_options[] = {
6129         POPT_AUTOHELP
6130         { "socket", 's', POPT_ARG_STRING, &options.socket, 0,
6131                 "CTDB socket path", "filename" },
6132         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6133                 "debug level"},
6134         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6135                 "timelimit (in seconds)" },
6136         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6137                 "node specification - integer" },
6138         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6139                 "enable machine readable output", NULL },
6140         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6141                 "specify separator for machine readable output", "CHAR" },
6142         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6143                 "enable machine parsable output with separator |", NULL },
6144         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6145                 "enable verbose output", NULL },
6146         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6147                 "die if runtime exceeds this limit (in seconds)" },
6148         POPT_TABLEEND
6149 };
6150
6151 static int process_command(const struct ctdb_cmd *cmd, int argc,
6152                            const char **argv)
6153 {
6154         TALLOC_CTX *tmp_ctx;
6155         struct ctdb_context *ctdb;
6156         int ret;
6157         bool status;
6158         uint64_t srvid_offset;
6159
6160         tmp_ctx = talloc_new(NULL);
6161         if (tmp_ctx == NULL) {
6162                 fprintf(stderr, "Memory allocation error\n");
6163                 goto fail;
6164         }
6165
6166         if (cmd->without_daemon) {
6167                 if (options.pnn != -1) {
6168                         fprintf(stderr,
6169                                 "Cannot specify node for command %s\n",
6170                                 cmd->name);
6171                         goto fail;
6172                 }
6173
6174                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6175                 talloc_free(tmp_ctx);
6176                 return ret;
6177         }
6178
6179         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6180         if (ctdb == NULL) {
6181                 fprintf(stderr, "Memory allocation error\n");
6182                 goto fail;
6183         }
6184
6185         ctdb->ev = tevent_context_init(ctdb);
6186         if (ctdb->ev == NULL) {
6187                 fprintf(stderr, "Failed to initialize tevent\n");
6188                 goto fail;
6189         }
6190
6191         ret = ctdb_client_init(ctdb, ctdb->ev, options.socket, &ctdb->client);
6192         if (ret != 0) {
6193                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6194                         options.socket);
6195
6196                 if (!find_node_xpnn(ctdb, NULL)) {
6197                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6198                 }
6199                 goto fail;
6200         }
6201
6202         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6203         srvid_offset = getpid() & 0xFFFF;
6204         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6205
6206         if (options.pnn != -1) {
6207                 status = verify_pnn(ctdb, options.pnn);
6208                 if (! status) {
6209                         goto fail;
6210                 }
6211
6212                 ctdb->cmd_pnn = options.pnn;
6213         } else {
6214                 ctdb->cmd_pnn = ctdb->pnn;
6215         }
6216
6217         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6218                 fprintf(stderr, "Node cannot be specified for command %s\n",
6219                         cmd->name);
6220                 goto fail;
6221         }
6222
6223         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6224         talloc_free(tmp_ctx);
6225         return ret;
6226
6227 fail:
6228         talloc_free(tmp_ctx);
6229         return 1;
6230 }
6231
/*
 * SIGTERM handler installed by the maximum-runtime alarm handler.
 *
 * It only reports that the runtime limit was hit.  The message is
 * emitted with a single write(2) on the stderr descriptor rather than
 * through stdio, because stdio functions such as fprintf() are not
 * async-signal-safe and must not be called from a signal handler.
 */
static void signal_handler(int sig)
{
        static const char msg[] = "Maximum runtime exceeded - exiting\n";

        (void)sig;

        /* Best effort: nothing sensible can be done if the write fails */
        if (write(STDERR_FILENO, msg, sizeof(msg) - 1) < 0) {
                return;
        }
}
6236
6237 static void alarm_handler(int sig)
6238 {
6239         /* Kill any child processes */
6240         signal(SIGTERM, signal_handler);
6241         kill(0, SIGTERM);
6242
6243         _exit(1);
6244 }
6245
6246 int main(int argc, const char *argv[])
6247 {
6248         int opt;
6249         const char **extra_argv;
6250         int extra_argc;
6251         const struct ctdb_cmd *cmd;
6252         const char *ctdb_socket;
6253         int loglevel;
6254         int ret;
6255
6256         setlinebuf(stdout);
6257
6258         /* Set default options */
6259         options.socket = CTDB_SOCKET;
6260         options.debuglevelstr = NULL;
6261         options.timelimit = 10;
6262         options.sep = "|";
6263         options.maxruntime = 0;
6264         options.pnn = -1;
6265
6266         ctdb_socket = getenv("CTDB_SOCKET");
6267         if (ctdb_socket != NULL) {
6268                 options.socket = ctdb_socket;
6269         }
6270
6271         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6272                             POPT_CONTEXT_KEEP_FIRST);
6273         while ((opt = poptGetNextOpt(pc)) != -1) {
6274                 fprintf(stderr, "Invalid option %s: %s\n",
6275                         poptBadOption(pc, 0), poptStrerror(opt));
6276                 exit(1);
6277         }
6278
6279         if (options.maxruntime == 0) {
6280                 const char *ctdb_timeout;
6281
6282                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6283                 if (ctdb_timeout != NULL) {
6284                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6285                 } else {
6286                         options.maxruntime = 120;
6287                 }
6288         }
6289         if (options.maxruntime <= 120) {
6290                 /* default timeout is 120 seconds */
6291                 options.maxruntime = 120;
6292         }
6293
6294         if (options.machineparsable) {
6295                 options.machinereadable = 1;
6296         }
6297
6298         /* setup the remaining options for the commands */
6299         extra_argc = 0;
6300         extra_argv = poptGetArgs(pc);
6301         if (extra_argv) {
6302                 extra_argv++;
6303                 while (extra_argv[extra_argc]) extra_argc++;
6304         }
6305
6306         if (extra_argc < 1) {
6307                 usage(NULL);
6308         }
6309
6310         cmd = match_command(extra_argv[0]);
6311         if (cmd == NULL) {
6312                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6313                 exit(1);
6314         }
6315
6316         /* Enable logging */
6317         setup_logging("ctdb", DEBUG_STDERR);
6318         if (debug_level_parse(options.debuglevelstr, &loglevel)) {
6319                 DEBUGLEVEL = loglevel;
6320         } else {
6321                 DEBUGLEVEL = DEBUG_ERR;
6322         }
6323
6324         signal(SIGALRM, alarm_handler);
6325         alarm(options.maxruntime);
6326
6327         ret = process_command(cmd, extra_argc, extra_argv);
6328         if (ret == -1) {
6329                 ret = 1;
6330         }
6331
6332         (void)poptFreeContext(pc);
6333
6334         return ret;
6335 }