ctdb: Print locks latency in machinereadable stats
[samba.git] / ctdb / tools / ctdb.c
1 /*
2    CTDB control tool
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/wait.h"
25 #include "system/dir.h"
26
27 #include <ctype.h>
28 #include <popt.h>
29 #include <talloc.h>
30 #include <tevent.h>
31 #include <tdb.h>
32
33 #include "common/version.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36 #include "lib/util/sys_rw.h"
37
38 #include "common/db_hash.h"
39 #include "common/logging.h"
40 #include "common/path.h"
41 #include "protocol/protocol.h"
42 #include "protocol/protocol_api.h"
43 #include "protocol/protocol_util.h"
44 #include "common/system_socket.h"
45 #include "client/client.h"
46 #include "client/client_sync.h"
47
/*
 * Deadline for a single control call: the current time plus
 * options.timelimit (presumably seconds, with 0 microseconds -- confirm
 * against timeval_current_ofs()).
 */
#define TIMEOUT()       timeval_current_ofs(options.timelimit, 0)

/*
 * Service IDs derived from CTDB_SRVID_TOOL_RANGE.  Their users are not
 * part of this section of the file.
 */
#define SRVID_CTDB_TOOL    (CTDB_SRVID_TOOL_RANGE | 0x0001000000000000LL)
#define SRVID_CTDB_PUSHDB  (CTDB_SRVID_TOOL_RANGE | 0x0002000000000000LL)
52
/*
 * Values of the tool's options.  The code that fills these in is not part
 * of this section of the file; only the fields used below are annotated.
 */
static struct {
        const char *debuglevelstr;
        int timelimit;          /* offset used by TIMEOUT() for each control */
        int pnn;
        int machinereadable;    /* 1 selects machine-readable output */
        const char *sep;        /* field separator in machine-readable output */
        int machineparsable;
        int verbose;
        int maxruntime;
        int printemptyrecords;
        int printdatasize;
        int printlmaster;
        int printhash;
        int printrecordflags;
} options;

/* popt context; presumably set up by the option parsing code -- confirm */
static poptContext pc;
70
/* State shared by the utility and command functions of the tool. */
struct ctdb_context {
        /* Event context and client handle passed to every control call */
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Node map cached by get_nodemap(); NULL until first fetched */
        struct ctdb_node_map *nodemap;
        /*
         * pnn is the node asked for the node map and database information;
         * cmd_pnn is the node that the command functions direct their
         * controls at.
         */
        uint32_t pnn, cmd_pnn;
        /* Counter advanced by next_srvid() */
        uint64_t srvid;
};

/*
 * Defined outside this section of the file.  Called by command functions
 * when they are given unexpected arguments.
 */
static void usage(const char *command);
80
81 /*
82  * Utility Functions
83  */
84
/*
 * Return the time difference tv2 - tv in seconds, as a double.
 * The result is negative when tv2 is earlier than tv.
 */
static double timeval_delta(struct timeval *tv2, struct timeval *tv)
{
        double whole_secs = tv2->tv_sec - tv->tv_sec;
        double frac_secs = (tv2->tv_usec - tv->tv_usec) * 1.0e-6;

        return whole_secs + frac_secs;
}
90
91 static struct ctdb_node_and_flags *get_node_by_pnn(
92                                         struct ctdb_node_map *nodemap,
93                                         uint32_t pnn)
94 {
95         int i;
96
97         for (i=0; i<nodemap->num; i++) {
98                 if (nodemap->node[i].pnn == pnn) {
99                         return &nodemap->node[i];
100                 }
101         }
102         return NULL;
103 }
104
105 static const char *pretty_print_flags(TALLOC_CTX *mem_ctx, uint32_t flags)
106 {
107         static const struct {
108                 uint32_t flag;
109                 const char *name;
110         } flag_names[] = {
111                 { NODE_FLAGS_DISCONNECTED,          "DISCONNECTED" },
112                 { NODE_FLAGS_PERMANENTLY_DISABLED,  "DISABLED" },
113                 { NODE_FLAGS_BANNED,                "BANNED" },
114                 { NODE_FLAGS_UNHEALTHY,             "UNHEALTHY" },
115                 { NODE_FLAGS_DELETED,               "DELETED" },
116                 { NODE_FLAGS_STOPPED,               "STOPPED" },
117                 { NODE_FLAGS_INACTIVE,              "INACTIVE" },
118         };
119         char *flags_str = NULL;
120         int i;
121
122         for (i=0; i<ARRAY_SIZE(flag_names); i++) {
123                 if (flags & flag_names[i].flag) {
124                         if (flags_str == NULL) {
125                                 flags_str = talloc_asprintf(mem_ctx,
126                                                 "%s", flag_names[i].name);
127                         } else {
128                                 flags_str = talloc_asprintf_append(flags_str,
129                                                 "|%s", flag_names[i].name);
130                         }
131                         if (flags_str == NULL) {
132                                 return "OUT-OF-MEMORY";
133                         }
134                 }
135         }
136         if (flags_str == NULL) {
137                 return "OK";
138         }
139
140         return flags_str;
141 }
142
143 static uint64_t next_srvid(struct ctdb_context *ctdb)
144 {
145         ctdb->srvid += 1;
146         return ctdb->srvid;
147 }
148
149 /*
150  * Get consistent nodemap information.
151  *
152  * If nodemap is already cached, use that. If not get it.
153  * If the current node is BANNED, then get nodemap from "better" node.
154  */
155 static struct ctdb_node_map *get_nodemap(struct ctdb_context *ctdb, bool force)
156 {
157         TALLOC_CTX *tmp_ctx;
158         struct ctdb_node_map *nodemap;
159         struct ctdb_node_and_flags *node;
160         uint32_t current_node;
161         int ret;
162
163         if (force) {
164                 TALLOC_FREE(ctdb->nodemap);
165         }
166
167         if (ctdb->nodemap != NULL) {
168                 return ctdb->nodemap;
169         }
170
171         tmp_ctx = talloc_new(ctdb);
172         if (tmp_ctx == NULL) {
173                 return false;
174         }
175
176         current_node = ctdb->pnn;
177 again:
178         ret = ctdb_ctrl_get_nodemap(tmp_ctx, ctdb->ev, ctdb->client,
179                                     current_node, TIMEOUT(), &nodemap);
180         if (ret != 0) {
181                 fprintf(stderr, "Failed to get nodemap from node %u\n",
182                         current_node);
183                 goto failed;
184         }
185
186         node = get_node_by_pnn(nodemap, current_node);
187         if (node->flags & NODE_FLAGS_BANNED) {
188                 /* Pick next node */
189                 do {
190                         current_node = (current_node + 1) % nodemap->num;
191                         node = get_node_by_pnn(nodemap, current_node);
192                         if (! (node->flags &
193                               (NODE_FLAGS_DELETED|NODE_FLAGS_DISCONNECTED))) {
194                                 break;
195                         }
196                 } while (current_node != ctdb->pnn);
197
198                 if (current_node == ctdb->pnn) {
199                         /* Tried all nodes in the cluster */
200                         fprintf(stderr, "Warning: All nodes are banned.\n");
201                         goto failed;
202                 }
203
204                 goto again;
205         }
206
207         ctdb->nodemap = talloc_steal(ctdb, nodemap);
208         return nodemap;
209
210 failed:
211         talloc_free(tmp_ctx);
212         return NULL;
213 }
214
215 static bool verify_pnn(struct ctdb_context *ctdb, int pnn)
216 {
217         struct ctdb_node_map *nodemap;
218         bool found;
219         int i;
220
221         if (pnn == -1) {
222                 return false;
223         }
224
225         nodemap = get_nodemap(ctdb, false);
226         if (nodemap == NULL) {
227                 return false;
228         }
229
230         found = false;
231         for (i=0; i<nodemap->num; i++) {
232                 if (nodemap->node[i].pnn == pnn) {
233                         found = true;
234                         break;
235                 }
236         }
237         if (! found) {
238                 fprintf(stderr, "Node %u does not exist\n", pnn);
239                 return false;
240         }
241
242         if (nodemap->node[i].flags &
243             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
244                 fprintf(stderr, "Node %u has status %s\n", pnn,
245                         pretty_print_flags(ctdb, nodemap->node[i].flags));
246                 return false;
247         }
248
249         return true;
250 }
251
252 static struct ctdb_node_map *talloc_nodemap(TALLOC_CTX *mem_ctx,
253                                             struct ctdb_node_map *nodemap)
254 {
255         struct ctdb_node_map *nodemap2;
256
257         nodemap2 = talloc_zero(mem_ctx, struct ctdb_node_map);
258         if (nodemap2 == NULL) {
259                 return NULL;
260         }
261
262         nodemap2->node = talloc_array(nodemap2, struct ctdb_node_and_flags,
263                                       nodemap->num);
264         if (nodemap2->node == NULL) {
265                 talloc_free(nodemap2);
266                 return NULL;
267         }
268
269         return nodemap2;
270 }
271
272 /*
273  * Get the number and the list of matching nodes
274  *
275  *   nodestring :=  NULL | all | pnn,[pnn,...]
276  *
277  * If nodestring is NULL, use the current node.
278  */
279 static bool parse_nodestring(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
280                              const char *nodestring,
281                              struct ctdb_node_map **out)
282 {
283         struct ctdb_node_map *nodemap, *nodemap2;
284         struct ctdb_node_and_flags *node;
285         int i;
286
287         nodemap = get_nodemap(ctdb, false);
288         if (nodemap == NULL) {
289                 return false;
290         }
291
292         nodemap2 = talloc_nodemap(mem_ctx, nodemap);
293         if (nodemap2 == NULL) {
294                 return false;
295         }
296
297         if (nodestring == NULL) {
298                 for (i=0; i<nodemap->num; i++) {
299                         if (nodemap->node[i].pnn == ctdb->cmd_pnn) {
300                                 nodemap2->node[0] = nodemap->node[i];
301                                 break;
302                         }
303                 }
304                 nodemap2->num = 1;
305
306                 goto done;
307         }
308
309         if (strcmp(nodestring, "all") == 0) {
310                 for (i=0; i<nodemap->num; i++) {
311                         nodemap2->node[i] = nodemap->node[i];
312                 }
313                 nodemap2->num = nodemap->num;
314
315                 goto done;
316         } else {
317                 char *ns, *tok;
318
319                 ns = talloc_strdup(mem_ctx, nodestring);
320                 if (ns == NULL) {
321                         return false;
322                 }
323
324                 tok = strtok(ns, ",");
325                 while (tok != NULL) {
326                         uint32_t pnn;
327                         char *endptr;
328
329                         pnn = (uint32_t)strtoul(tok, &endptr, 0);
330                         if (pnn == 0 && tok == endptr) {
331                                 fprintf(stderr, "Invalid node %s\n", tok);
332                                         return false;
333                         }
334
335                         node = get_node_by_pnn(nodemap, pnn);
336                         if (node == NULL) {
337                                 fprintf(stderr, "Node %u does not exist\n",
338                                         pnn);
339                                 return false;
340                         }
341
342                         nodemap2->node[nodemap2->num] = *node;
343                         nodemap2->num += 1;
344
345                         tok = strtok(NULL, ",");
346                 }
347         }
348
349 done:
350         *out = nodemap2;
351         return true;
352 }
353
354 /* Compare IP address */
355 static bool ctdb_same_ip(ctdb_sock_addr *ip1, ctdb_sock_addr *ip2)
356 {
357         bool ret = false;
358
359         if (ip1->sa.sa_family != ip2->sa.sa_family) {
360                 return false;
361         }
362
363         switch (ip1->sa.sa_family) {
364         case AF_INET:
365                 ret = (memcmp(&ip1->ip.sin_addr, &ip2->ip.sin_addr,
366                               sizeof(struct in_addr)) == 0);
367                 break;
368
369         case AF_INET6:
370                 ret = (memcmp(&ip1->ip6.sin6_addr, &ip2->ip6.sin6_addr,
371                               sizeof(struct in6_addr)) == 0);
372                 break;
373         }
374
375         return ret;
376 }
377
378 /* Append a node to a node map with given address and flags */
379 static bool node_map_add(struct ctdb_node_map *nodemap,
380                          const char *nstr, uint32_t flags)
381 {
382         ctdb_sock_addr addr;
383         uint32_t num;
384         struct ctdb_node_and_flags *n;
385         int ret;
386
387         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
388         if (ret != 0) {
389                 fprintf(stderr, "Invalid IP address %s\n", nstr);
390                 return false;
391         }
392
393         num = nodemap->num;
394         nodemap->node = talloc_realloc(nodemap, nodemap->node,
395                                        struct ctdb_node_and_flags, num+1);
396         if (nodemap->node == NULL) {
397                 return false;
398         }
399
400         n = &nodemap->node[num];
401         n->addr = addr;
402         n->pnn = num;
403         n->flags = flags;
404
405         nodemap->num = num+1;
406         return true;
407 }
408
409 /* Read a nodes file into a node map */
410 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
411                                                   const char *nlist)
412 {
413         char **lines;
414         int nlines;
415         int i;
416         struct ctdb_node_map *nodemap;
417
418         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
419         if (nodemap == NULL) {
420                 return NULL;
421         }
422
423         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
424         if (lines == NULL) {
425                 return NULL;
426         }
427
428         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
429                 nlines--;
430         }
431
432         for (i=0; i<nlines; i++) {
433                 char *node;
434                 uint32_t flags;
435                 size_t len;
436
437                 node = lines[i];
438                 /* strip leading spaces */
439                 while((*node == ' ') || (*node == '\t')) {
440                         node++;
441                 }
442
443                 len = strlen(node);
444
445                 /* strip trailing spaces */
446                 while ((len > 1) &&
447                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
448                 {
449                         node[len-1] = '\0';
450                         len--;
451                 }
452
453                 if (len == 0) {
454                         continue;
455                 }
456                 if (*node == '#') {
457                         /* A "deleted" node is a node that is
458                            commented out in the nodes file.  This is
459                            used instead of removing a line, which
460                            would cause subsequent nodes to change
461                            their PNN. */
462                         flags = NODE_FLAGS_DELETED;
463                         node = discard_const("0.0.0.0");
464                 } else {
465                         flags = 0;
466                 }
467                 if (! node_map_add(nodemap, node, flags)) {
468                         talloc_free(lines);
469                         TALLOC_FREE(nodemap);
470                         return NULL;
471                 }
472         }
473
474         talloc_free(lines);
475         return nodemap;
476 }
477
478 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx, uint32_t pnn)
479 {
480         struct ctdb_node_map *nodemap;
481         const char *nodes_list = NULL;
482
483         const char *basedir = getenv("CTDB_BASE");
484         if (basedir == NULL) {
485                 basedir = CTDB_ETCDIR;
486         }
487         nodes_list = talloc_asprintf(mem_ctx, "%s/nodes", basedir);
488         if (nodes_list == NULL) {
489                 fprintf(stderr, "Memory allocation error\n");
490                 return NULL;
491         }
492
493         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
494         if (nodemap == NULL) {
495                 fprintf(stderr, "Failed to read nodes file \"%s\"\n",
496                         nodes_list);
497                 return NULL;
498         }
499
500         return nodemap;
501 }
502
503 static struct ctdb_dbid *db_find(TALLOC_CTX *mem_ctx,
504                                  struct ctdb_context *ctdb,
505                                  struct ctdb_dbid_map *dbmap,
506                                  const char *db_name)
507 {
508         struct ctdb_dbid *db = NULL;
509         const char *name;
510         int ret, i;
511
512         for (i=0; i<dbmap->num; i++) {
513                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
514                                            ctdb->pnn, TIMEOUT(),
515                                            dbmap->dbs[i].db_id, &name);
516                 if (ret != 0) {
517                         return false;
518                 }
519
520                 if (strcmp(db_name, name) == 0) {
521                         talloc_free(discard_const(name));
522                         db = &dbmap->dbs[i];
523                         break;
524                 }
525         }
526
527         return db;
528 }
529
530 static bool db_exists(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
531                       const char *db_arg, uint32_t *db_id,
532                       const char **db_name, uint8_t *db_flags)
533 {
534         struct ctdb_dbid_map *dbmap;
535         struct ctdb_dbid *db = NULL;
536         uint32_t id = 0;
537         const char *name = NULL;
538         int ret, i;
539
540         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
541                                   ctdb->pnn, TIMEOUT(), &dbmap);
542         if (ret != 0) {
543                 return false;
544         }
545
546         if (strncmp(db_arg, "0x", 2) == 0) {
547                 id = strtoul(db_arg, NULL, 0);
548                 for (i=0; i<dbmap->num; i++) {
549                         if (id == dbmap->dbs[i].db_id) {
550                                 db = &dbmap->dbs[i];
551                                 break;
552                         }
553                 }
554         } else {
555                 name = db_arg;
556                 db = db_find(mem_ctx, ctdb, dbmap, name);
557         }
558
559         if (db == NULL) {
560                 fprintf(stderr, "No database matching '%s' found\n", db_arg);
561                 return false;
562         }
563
564         if (name == NULL) {
565                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
566                                            ctdb->pnn, TIMEOUT(), id, &name);
567                 if (ret != 0) {
568                         return false;
569                 }
570         }
571
572         if (db_id != NULL) {
573                 *db_id = db->db_id;
574         }
575         if (db_name != NULL) {
576                 *db_name = talloc_strdup(mem_ctx, name);
577         }
578         if (db_flags != NULL) {
579                 *db_flags = db->flags;
580         }
581         return true;
582 }
583
/*
 * Convert one hexadecimal digit to its value.
 *
 * 'a'-'f' and 'A'-'F' map to 10-15.  Every other character is returned
 * as its offset from '0', so the input is not validated here.
 */
static int h2i(char h)
{
        if ('a' <= h && h <= 'f') {
                return 10 + (h - 'a');
        }

        if ('A' <= h && h <= 'F') {
                return 10 + (h - 'A');
        }

        return h - '0';
}
594
595 static int hex_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
596                        TDB_DATA *out)
597 {
598         int i;
599         TDB_DATA data;
600
601         if (len & 0x01) {
602                 fprintf(stderr, "Key (%s) contains odd number of hex digits\n",
603                         str);
604                 return EINVAL;
605         }
606
607         data.dsize = len / 2;
608         data.dptr = talloc_size(mem_ctx, data.dsize);
609         if (data.dptr == NULL) {
610                 return ENOMEM;
611         }
612
613         for (i=0; i<data.dsize; i++) {
614                 data.dptr[i] = h2i(str[i*2]) << 4 | h2i(str[i*2+1]);
615         }
616
617         *out = data;
618         return 0;
619 }
620
621 static int str_to_data(const char *str, size_t len, TALLOC_CTX *mem_ctx,
622                        TDB_DATA *out)
623 {
624         TDB_DATA data;
625         int ret = 0;
626
627         if (strncmp(str, "0x", 2) == 0) {
628                 ret = hex_to_data(str+2, len-2, mem_ctx, &data);
629                 if (ret != 0) {
630                         return ret;
631                 }
632         } else {
633                 data.dptr = talloc_memdup(mem_ctx, str, len);
634                 if (data.dptr == NULL) {
635                         return ENOMEM;
636                 }
637                 data.dsize = len;
638         }
639
640         *out = data;
641         return 0;
642 }
643
644 static int run_helper(TALLOC_CTX *mem_ctx, const char *command,
645                       const char *path, int argc, const char **argv)
646 {
647         pid_t pid;
648         int save_errno, status, ret;
649         const char **new_argv;
650         int i;
651
652         new_argv = talloc_array(mem_ctx, const char *, argc + 2);
653         if (new_argv == NULL) {
654                 return ENOMEM;
655         }
656
657         new_argv[0] = path;
658         for (i=0; i<argc; i++) {
659                 new_argv[i+1] = argv[i];
660         }
661         new_argv[argc+1] = NULL;
662
663         pid = fork();
664         if (pid < 0) {
665                 save_errno = errno;
666                 talloc_free(new_argv);
667                 fprintf(stderr, "Failed to fork %s (%s) - %s\n",
668                         command, path, strerror(save_errno));
669                 return save_errno;
670         }
671
672         if (pid == 0) {
673                 ret = execv(path, discard_const(new_argv));
674                 if (ret == -1) {
675                         _exit(64+errno);
676                 }
677                 /* Should not happen */
678                 _exit(64+ENOEXEC);
679         }
680
681         talloc_free(new_argv);
682
683         ret = waitpid(pid, &status, 0);
684         if (ret == -1) {
685                 save_errno = errno;
686                 fprintf(stderr, "waitpid() failed for %s - %s\n",
687                         command, strerror(save_errno));
688                 return save_errno;
689         }
690
691         if (WIFEXITED(status)) {
692                 int pstatus = WEXITSTATUS(status);
693                 if (WIFSIGNALED(status)) {
694                         fprintf(stderr, "%s terminated with signal %d\n",
695                                 command, WTERMSIG(status));
696                         ret = EINTR;
697                 } else if (pstatus >= 64 && pstatus < 255) {
698                         fprintf(stderr, "%s failed with error %d\n",
699                                 command, pstatus-64);
700                         ret = pstatus - 64;
701                 } else {
702                         ret = pstatus;
703                 }
704                 return ret;
705         } else if (WIFSIGNALED(status)) {
706                 fprintf(stderr, "%s terminated with signal %d\n",
707                         command, WTERMSIG(status));
708                 return EINTR;
709         }
710
711         return 0;
712 }
713
714 /*
715  * Command Functions
716  */
717
718 static int control_version(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
719                            int argc, const char **argv)
720 {
721         printf("%s\n", ctdb_version_string);
722         return 0;
723 }
724
725 static bool partially_online(TALLOC_CTX *mem_ctx,
726                              struct ctdb_context *ctdb,
727                              struct ctdb_node_and_flags *node)
728 {
729         struct ctdb_iface_list *iface_list;
730         int ret, i;
731         bool status = false;
732
733         if (node->flags != 0) {
734                 return false;
735         }
736
737         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
738                                    node->pnn, TIMEOUT(), &iface_list);
739         if (ret != 0) {
740                 return false;
741         }
742
743         status = false;
744         for (i=0; i < iface_list->num; i++) {
745                 if (iface_list->iface[i].link_state == 0) {
746                         status = true;
747                         break;
748                 }
749         }
750
751         return status;
752 }
753
754 static void print_nodemap_machine(TALLOC_CTX *mem_ctx,
755                                   struct ctdb_context *ctdb,
756                                   struct ctdb_node_map *nodemap,
757                                   uint32_t mypnn)
758 {
759         struct ctdb_node_and_flags *node;
760         int i;
761
762         printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
763                options.sep,
764                "Node", options.sep,
765                "IP", options.sep,
766                "Disconnected", options.sep,
767                "Banned", options.sep,
768                "Disabled", options.sep,
769                "Unhealthy", options.sep,
770                "Stopped", options.sep,
771                "Inactive", options.sep,
772                "PartiallyOnline", options.sep,
773                "ThisNode", options.sep);
774
775         for (i=0; i<nodemap->num; i++) {
776                 node = &nodemap->node[i];
777                 if (node->flags & NODE_FLAGS_DELETED) {
778                         continue;
779                 }
780
781                 printf("%s%u%s%s%s%d%s%d%s%d%s%d%s%d%s%d%s%d%s%c%s\n",
782                        options.sep,
783                        node->pnn, options.sep,
784                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
785                        options.sep,
786                        !! (node->flags & NODE_FLAGS_DISCONNECTED), options.sep,
787                        !! (node->flags & NODE_FLAGS_BANNED), options.sep,
788                        !! (node->flags & NODE_FLAGS_PERMANENTLY_DISABLED),
789                        options.sep,
790                        !! (node->flags & NODE_FLAGS_UNHEALTHY), options.sep,
791                        !! (node->flags & NODE_FLAGS_STOPPED), options.sep,
792                        !! (node->flags & NODE_FLAGS_INACTIVE), options.sep,
793                        partially_online(mem_ctx, ctdb, node), options.sep,
794                        (node->pnn == mypnn)?'Y':'N', options.sep);
795         }
796
797 }
798
799 static void print_nodemap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
800                           struct ctdb_node_map *nodemap, uint32_t mypnn,
801                           bool print_header)
802 {
803         struct ctdb_node_and_flags *node;
804         int num_deleted_nodes = 0;
805         int i;
806
807         for (i=0; i<nodemap->num; i++) {
808                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
809                         num_deleted_nodes++;
810                 }
811         }
812
813         if (print_header) {
814                 if (num_deleted_nodes == 0) {
815                         printf("Number of nodes:%d\n", nodemap->num);
816                 } else {
817                         printf("Number of nodes:%d "
818                                "(including %d deleted nodes)\n",
819                                nodemap->num, num_deleted_nodes);
820                 }
821         }
822
823         for (i=0; i<nodemap->num; i++) {
824                 node = &nodemap->node[i];
825                 if (node->flags & NODE_FLAGS_DELETED) {
826                         continue;
827                 }
828
829                 printf("pnn:%u %-16s %s%s\n",
830                        node->pnn,
831                        ctdb_sock_addr_to_string(mem_ctx, &node->addr, false),
832                        partially_online(mem_ctx, ctdb, node) ?
833                                 "PARTIALLYONLINE" :
834                                 pretty_print_flags(mem_ctx, node->flags),
835                        node->pnn == mypnn ? " (THIS NODE)" : "");
836         }
837 }
838
839 static void print_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
840                          struct ctdb_node_map *nodemap, uint32_t mypnn,
841                          struct ctdb_vnn_map *vnnmap, int recmode,
842                          uint32_t recmaster)
843 {
844         int i;
845
846         print_nodemap(mem_ctx, ctdb, nodemap, mypnn, true);
847
848         if (vnnmap->generation == INVALID_GENERATION) {
849                 printf("Generation:INVALID\n");
850         } else {
851                 printf("Generation:%u\n", vnnmap->generation);
852         }
853         printf("Size:%d\n", vnnmap->size);
854         for (i=0; i<vnnmap->size; i++) {
855                 printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
856         }
857
858         printf("Recovery mode:%s (%d)\n",
859                recmode == CTDB_RECOVERY_NORMAL ? "NORMAL" : "RECOVERY",
860                recmode);
861         printf("Recovery master:%d\n", recmaster);
862 }
863
864 static int control_status(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
865                           int argc, const char **argv)
866 {
867         struct ctdb_node_map *nodemap;
868         struct ctdb_vnn_map *vnnmap;
869         int recmode;
870         uint32_t recmaster;
871         int ret;
872
873         if (argc != 0) {
874                 usage("status");
875         }
876
877         nodemap = get_nodemap(ctdb, false);
878         if (nodemap == NULL) {
879                 return 1;
880         }
881
882         if (options.machinereadable == 1) {
883                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
884                 return 0;
885         }
886
887         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
888                                   ctdb->cmd_pnn, TIMEOUT(), &vnnmap);
889         if (ret != 0) {
890                 return ret;
891         }
892
893         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
894                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
895         if (ret != 0) {
896                 return ret;
897         }
898
899         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
900                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
901         if (ret != 0) {
902                 return ret;
903         }
904
905         print_status(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, vnnmap,
906                      recmode, recmaster);
907         return 0;
908 }
909
910 static int control_uptime(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
911                           int argc, const char **argv)
912 {
913         struct ctdb_uptime *uptime;
914         int ret, tmp, days, hours, minutes, seconds;
915
916         ret = ctdb_ctrl_uptime(mem_ctx, ctdb->ev, ctdb->client,
917                                ctdb->cmd_pnn, TIMEOUT(), &uptime);
918         if (ret != 0) {
919                 return ret;
920         }
921
922         printf("Current time of node %-4u     :                %s",
923                ctdb->cmd_pnn, ctime(&uptime->current_time.tv_sec));
924
925         tmp = uptime->current_time.tv_sec - uptime->ctdbd_start_time.tv_sec;
926         seconds = tmp % 60; tmp /= 60;
927         minutes = tmp % 60; tmp /= 60;
928         hours = tmp % 24; tmp /= 24;
929         days = tmp;
930
931         printf("Ctdbd start time              : (%03d %02d:%02d:%02d) %s",
932                days, hours, minutes, seconds,
933                ctime(&uptime->ctdbd_start_time.tv_sec));
934
935         tmp = uptime->current_time.tv_sec - uptime->last_recovery_finished.tv_sec;
936         seconds = tmp % 60; tmp /= 60;
937         minutes = tmp % 60; tmp /= 60;
938         hours = tmp % 24; tmp /= 24;
939         days = tmp;
940
941         printf("Time of last recovery/failover: (%03d %02d:%02d:%02d) %s",
942                days, hours, minutes, seconds,
943                ctime(&uptime->last_recovery_finished.tv_sec));
944
945         printf("Duration of last recovery/failover: %lf seconds\n",
946                timeval_delta(&uptime->last_recovery_finished,
947                              &uptime->last_recovery_started));
948
949         return 0;
950 }
951
952 static int control_ping(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
953                         int argc, const char **argv)
954 {
955         struct timeval tv;
956         int ret, num_clients;
957
958         tv = timeval_current();
959         ret = ctdb_ctrl_ping(mem_ctx, ctdb->ev, ctdb->client,
960                              ctdb->cmd_pnn, TIMEOUT(), &num_clients);
961         if (ret != 0) {
962                 return ret;
963         }
964
965         printf("response from %u time=%.6f sec  (%d clients)\n",
966                ctdb->cmd_pnn, timeval_elapsed(&tv), num_clients);
967         return 0;
968 }
969
970 const char *runstate_to_string(enum ctdb_runstate runstate);
971 enum ctdb_runstate runstate_from_string(const char *runstate_str);
972
973 static int control_runstate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
974                             int argc, const char **argv)
975 {
976         enum ctdb_runstate runstate;
977         bool found;
978         int ret, i;
979
980         ret = ctdb_ctrl_get_runstate(mem_ctx, ctdb->ev, ctdb->client,
981                                      ctdb->cmd_pnn, TIMEOUT(), &runstate);
982         if (ret != 0) {
983                 return ret;
984         }
985
986         found = true;
987         for (i=0; i<argc; i++) {
988                 enum ctdb_runstate t;
989
990                 found = false;
991                 t = ctdb_runstate_from_string(argv[i]);
992                 if (t == CTDB_RUNSTATE_UNKNOWN) {
993                         printf("Invalid run state (%s)\n", argv[i]);
994                         return 1;
995                 }
996
997                 if (t == runstate) {
998                         found = true;
999                         break;
1000                 }
1001         }
1002
1003         if (! found) {
1004                 printf("CTDB not in required run state (got %s)\n",
1005                        ctdb_runstate_to_string(runstate));
1006                 return 1;
1007         }
1008
1009         printf("%s\n", ctdb_runstate_to_string(runstate));
1010         return 0;
1011 }
1012
1013 static int control_getvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1014                           int argc, const char **argv)
1015 {
1016         struct ctdb_var_list *tun_var_list;
1017         uint32_t value;
1018         int ret, i;
1019         bool found;
1020
1021         if (argc != 1) {
1022                 usage("getvar");
1023         }
1024
1025         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1026                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1027         if (ret != 0) {
1028                 fprintf(stderr,
1029                         "Failed to get list of variables from node %u\n",
1030                         ctdb->cmd_pnn);
1031                 return ret;
1032         }
1033
1034         found = false;
1035         for (i=0; i<tun_var_list->count; i++) {
1036                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1037                         found = true;
1038                         break;
1039                 }
1040         }
1041
1042         if (! found) {
1043                 printf("No such tunable %s\n", argv[0]);
1044                 return 1;
1045         }
1046
1047         ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
1048                                     ctdb->cmd_pnn, TIMEOUT(), argv[0], &value);
1049         if (ret != 0) {
1050                 return ret;
1051         }
1052
1053         printf("%-26s = %u\n", argv[0], value);
1054         return 0;
1055 }
1056
1057 static int control_setvar(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1058                           int argc, const char **argv)
1059 {
1060         struct ctdb_var_list *tun_var_list;
1061         struct ctdb_tunable tunable;
1062         int ret, i;
1063         bool found;
1064
1065         if (argc != 2) {
1066                 usage("setvar");
1067         }
1068
1069         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1070                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1071         if (ret != 0) {
1072                 fprintf(stderr,
1073                         "Failed to get list of variables from node %u\n",
1074                         ctdb->cmd_pnn);
1075                 return ret;
1076         }
1077
1078         found = false;
1079         for (i=0; i<tun_var_list->count; i++) {
1080                 if (strcasecmp(tun_var_list->var[i], argv[0]) == 0) {
1081                         found = true;
1082                         break;
1083                 }
1084         }
1085
1086         if (! found) {
1087                 printf("No such tunable %s\n", argv[0]);
1088                 return 1;
1089         }
1090
1091         tunable.name = argv[0];
1092         tunable.value = strtoul(argv[1], NULL, 0);
1093
1094         ret = ctdb_ctrl_set_tunable(mem_ctx, ctdb->ev, ctdb->client,
1095                                     ctdb->cmd_pnn, TIMEOUT(), &tunable);
1096         if (ret != 0) {
1097                 if (ret == 1) {
1098                         fprintf(stderr,
1099                                 "Setting obsolete tunable variable '%s'\n",
1100                                tunable.name);
1101                         return 0;
1102                 }
1103         }
1104
1105         return ret;
1106 }
1107
1108 static int control_listvars(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1109                             int argc, const char **argv)
1110 {
1111         struct ctdb_var_list *tun_var_list;
1112         int ret, i;
1113
1114         if (argc != 0) {
1115                 usage("listvars");
1116         }
1117
1118         ret = ctdb_ctrl_list_tunables(mem_ctx, ctdb->ev, ctdb->client,
1119                                       ctdb->cmd_pnn, TIMEOUT(), &tun_var_list);
1120         if (ret != 0) {
1121                 return ret;
1122         }
1123
1124         for (i=0; i<tun_var_list->count; i++) {
1125                 control_getvar(mem_ctx, ctdb, 1, &tun_var_list->var[i]);
1126         }
1127
1128         return 0;
1129 }
1130
1131 const struct {
1132         const char *name;
1133         uint32_t offset;
1134 } stats_fields[] = {
1135 #define STATISTICS_FIELD(n) { #n, offsetof(struct ctdb_statistics, n) }
1136         STATISTICS_FIELD(num_clients),
1137         STATISTICS_FIELD(frozen),
1138         STATISTICS_FIELD(recovering),
1139         STATISTICS_FIELD(num_recoveries),
1140         STATISTICS_FIELD(client_packets_sent),
1141         STATISTICS_FIELD(client_packets_recv),
1142         STATISTICS_FIELD(node_packets_sent),
1143         STATISTICS_FIELD(node_packets_recv),
1144         STATISTICS_FIELD(keepalive_packets_sent),
1145         STATISTICS_FIELD(keepalive_packets_recv),
1146         STATISTICS_FIELD(node.req_call),
1147         STATISTICS_FIELD(node.reply_call),
1148         STATISTICS_FIELD(node.req_dmaster),
1149         STATISTICS_FIELD(node.reply_dmaster),
1150         STATISTICS_FIELD(node.reply_error),
1151         STATISTICS_FIELD(node.req_message),
1152         STATISTICS_FIELD(node.req_control),
1153         STATISTICS_FIELD(node.reply_control),
1154         STATISTICS_FIELD(node.req_tunnel),
1155         STATISTICS_FIELD(client.req_call),
1156         STATISTICS_FIELD(client.req_message),
1157         STATISTICS_FIELD(client.req_control),
1158         STATISTICS_FIELD(client.req_tunnel),
1159         STATISTICS_FIELD(timeouts.call),
1160         STATISTICS_FIELD(timeouts.control),
1161         STATISTICS_FIELD(timeouts.traverse),
1162         STATISTICS_FIELD(locks.num_calls),
1163         STATISTICS_FIELD(locks.num_current),
1164         STATISTICS_FIELD(locks.num_pending),
1165         STATISTICS_FIELD(locks.num_failed),
1166         STATISTICS_FIELD(total_calls),
1167         STATISTICS_FIELD(pending_calls),
1168         STATISTICS_FIELD(childwrite_calls),
1169         STATISTICS_FIELD(pending_childwrite_calls),
1170         STATISTICS_FIELD(memory_used),
1171         STATISTICS_FIELD(max_hop_count),
1172         STATISTICS_FIELD(total_ro_delegations),
1173         STATISTICS_FIELD(total_ro_revokes),
1174 };
1175
1176 #define LATENCY_AVG(v)  ((v).num ? (v).total / (v).num : 0.0 )
1177
1178 static void print_statistics_machine(struct ctdb_statistics *s,
1179                                      bool show_header)
1180 {
1181         int i;
1182
1183         if (show_header) {
1184                 printf("CTDB version%s", options.sep);
1185                 printf("Current time of statistics%s", options.sep);
1186                 printf("Statistics collected since%s", options.sep);
1187                 for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1188                         printf("%s%s", stats_fields[i].name, options.sep);
1189                 }
1190                 printf("num_reclock_ctdbd_latency%s", options.sep);
1191                 printf("min_reclock_ctdbd_latency%s", options.sep);
1192                 printf("avg_reclock_ctdbd_latency%s", options.sep);
1193                 printf("max_reclock_ctdbd_latency%s", options.sep);
1194
1195                 printf("num_reclock_recd_latency%s", options.sep);
1196                 printf("min_reclock_recd_latency%s", options.sep);
1197                 printf("avg_reclock_recd_latency%s", options.sep);
1198                 printf("max_reclock_recd_latency%s", options.sep);
1199
1200                 printf("num_call_latency%s", options.sep);
1201                 printf("min_call_latency%s", options.sep);
1202                 printf("avg_call_latency%s", options.sep);
1203                 printf("max_call_latency%s", options.sep);
1204
1205                 printf("num_lockwait_latency%s", options.sep);
1206                 printf("min_lockwait_latency%s", options.sep);
1207                 printf("avg_lockwait_latency%s", options.sep);
1208                 printf("max_lockwait_latency%s", options.sep);
1209
1210                 printf("num_childwrite_latency%s", options.sep);
1211                 printf("min_childwrite_latency%s", options.sep);
1212                 printf("avg_childwrite_latency%s", options.sep);
1213                 printf("max_childwrite_latency%s", options.sep);
1214                 printf("\n");
1215         }
1216
1217         printf("%u%s", CTDB_PROTOCOL, options.sep);
1218         printf("%u%s", (uint32_t)s->statistics_current_time.tv_sec, options.sep);
1219         printf("%u%s", (uint32_t)s->statistics_start_time.tv_sec, options.sep);
1220         for (i=0;i<ARRAY_SIZE(stats_fields);i++) {
1221                 printf("%u%s",
1222                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s),
1223                        options.sep);
1224         }
1225         printf("%u%s", s->reclock.ctdbd.num, options.sep);
1226         printf("%.6f%s", s->reclock.ctdbd.min, options.sep);
1227         printf("%.6f%s", LATENCY_AVG(s->reclock.ctdbd), options.sep);
1228         printf("%.6f%s", s->reclock.ctdbd.max, options.sep);
1229
1230         printf("%u%s", s->reclock.recd.num, options.sep);
1231         printf("%.6f%s", s->reclock.recd.min, options.sep);
1232         printf("%.6f%s", LATENCY_AVG(s->reclock.recd), options.sep);
1233         printf("%.6f%s", s->reclock.recd.max, options.sep);
1234
1235         printf("%d%s", s->call_latency.num, options.sep);
1236         printf("%.6f%s", s->call_latency.min, options.sep);
1237         printf("%.6f%s", LATENCY_AVG(s->call_latency), options.sep);
1238         printf("%.6f%s", s->call_latency.max, options.sep);
1239
1240         printf("%u%s", s->locks.latency.num, options.sep);
1241         printf("%.6f%s", s->locks.latency.min, options.sep);
1242         printf("%.6f%s", LATENCY_AVG(s->locks.latency), options.sep);
1243         printf("%.6f%s", s->locks.latency.max, options.sep);
1244
1245         printf("%d%s", s->childwrite_latency.num, options.sep);
1246         printf("%.6f%s", s->childwrite_latency.min, options.sep);
1247         printf("%.6f%s", LATENCY_AVG(s->childwrite_latency), options.sep);
1248         printf("%.6f%s", s->childwrite_latency.max, options.sep);
1249         printf("\n");
1250 }
1251
1252 static void print_statistics(struct ctdb_statistics *s)
1253 {
1254         int tmp, days, hours, minutes, seconds;
1255         int i;
1256         const char *prefix = NULL;
1257         int preflen = 0;
1258
1259         tmp = s->statistics_current_time.tv_sec -
1260               s->statistics_start_time.tv_sec;
1261         seconds = tmp % 60; tmp /= 60;
1262         minutes = tmp % 60; tmp /= 60;
1263         hours   = tmp % 24; tmp /= 24;
1264         days    = tmp;
1265
1266         printf("CTDB version %u\n", CTDB_PROTOCOL);
1267         printf("Current time of statistics  :                %s",
1268                ctime(&s->statistics_current_time.tv_sec));
1269         printf("Statistics collected since  : (%03d %02d:%02d:%02d) %s",
1270                days, hours, minutes, seconds,
1271                ctime(&s->statistics_start_time.tv_sec));
1272
1273         for (i=0; i<ARRAY_SIZE(stats_fields); i++) {
1274                 if (strchr(stats_fields[i].name, '.') != NULL) {
1275                         preflen = strcspn(stats_fields[i].name, ".") + 1;
1276                         if (! prefix ||
1277                             strncmp(prefix, stats_fields[i].name, preflen) != 0) {
1278                                 prefix = stats_fields[i].name;
1279                                 printf(" %*.*s\n", preflen-1, preflen-1,
1280                                        stats_fields[i].name);
1281                         }
1282                 } else {
1283                         preflen = 0;
1284                 }
1285                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
1286                        stats_fields[i].name+preflen, preflen ? 0 : 4, "",
1287                        *(uint32_t *)(stats_fields[i].offset+(uint8_t *)s));
1288         }
1289
1290         printf(" hop_count_buckets:");
1291         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1292                 printf(" %d", s->hop_count_bucket[i]);
1293         }
1294         printf("\n");
1295         printf(" lock_buckets:");
1296         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
1297                 printf(" %d", s->locks.buckets[i]);
1298         }
1299         printf("\n");
1300         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1301                "locks_latency      MIN/AVG/MAX",
1302                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
1303                s->locks.latency.max, s->locks.latency.num);
1304
1305         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1306                "reclock_ctdbd      MIN/AVG/MAX",
1307                s->reclock.ctdbd.min, LATENCY_AVG(s->reclock.ctdbd),
1308                s->reclock.ctdbd.max, s->reclock.ctdbd.num);
1309
1310         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1311                "reclock_recd       MIN/AVG/MAX",
1312                s->reclock.recd.min, LATENCY_AVG(s->reclock.recd),
1313                s->reclock.recd.max, s->reclock.recd.num);
1314
1315         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1316                "call_latency       MIN/AVG/MAX",
1317                s->call_latency.min, LATENCY_AVG(s->call_latency),
1318                s->call_latency.max, s->call_latency.num);
1319
1320         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
1321                "childwrite_latency MIN/AVG/MAX",
1322                s->childwrite_latency.min,
1323                LATENCY_AVG(s->childwrite_latency),
1324                s->childwrite_latency.max, s->childwrite_latency.num);
1325 }
1326
1327 static int control_statistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1328                               int argc, const char **argv)
1329 {
1330         struct ctdb_statistics *stats;
1331         int ret;
1332
1333         if (argc != 0) {
1334                 usage("statistics");
1335         }
1336
1337         ret = ctdb_ctrl_statistics(mem_ctx, ctdb->ev, ctdb->client,
1338                                    ctdb->cmd_pnn, TIMEOUT(), &stats);
1339         if (ret != 0) {
1340                 return ret;
1341         }
1342
1343         if (options.machinereadable) {
1344                 print_statistics_machine(stats, true);
1345         } else {
1346                 print_statistics(stats);
1347         }
1348
1349         return 0;
1350 }
1351
1352 static int control_statistics_reset(TALLOC_CTX *mem_ctx,
1353                                     struct ctdb_context *ctdb,
1354                                     int argc, const char **argv)
1355 {
1356         int ret;
1357
1358         if (argc != 0) {
1359                 usage("statisticsreset");
1360         }
1361
1362         ret = ctdb_ctrl_statistics_reset(mem_ctx, ctdb->ev, ctdb->client,
1363                                          ctdb->cmd_pnn, TIMEOUT());
1364         if (ret != 0) {
1365                 return ret;
1366         }
1367
1368         return 0;
1369 }
1370
1371 static int control_stats(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1372                          int argc, const char **argv)
1373 {
1374         struct ctdb_statistics_list *slist;
1375         int ret, count = 0, i;
1376         bool show_header = true;
1377
1378         if (argc > 1) {
1379                 usage("stats");
1380         }
1381
1382         if (argc == 1) {
1383                 count = atoi(argv[0]);
1384         }
1385
1386         ret = ctdb_ctrl_get_stat_history(mem_ctx, ctdb->ev, ctdb->client,
1387                                          ctdb->cmd_pnn, TIMEOUT(), &slist);
1388         if (ret != 0) {
1389                 return ret;
1390         }
1391
1392         for (i=0; i<slist->num; i++) {
1393                 if (slist->stats[i].statistics_start_time.tv_sec == 0) {
1394                         continue;
1395                 }
1396                 if (options.machinereadable == 1) {
1397                         print_statistics_machine(&slist->stats[i],
1398                                                  show_header);
1399                         show_header = false;
1400                 } else {
1401                         print_statistics(&slist->stats[i]);
1402                 }
1403                 if (count > 0 && i == count) {
1404                         break;
1405                 }
1406         }
1407
1408         return 0;
1409 }
1410
1411 static int ctdb_public_ip_cmp(const void *a, const void *b)
1412 {
1413         const struct ctdb_public_ip *ip_a = a;
1414         const struct ctdb_public_ip *ip_b = b;
1415
1416         return ctdb_sock_addr_cmp(&ip_a->addr, &ip_b->addr);
1417 }
1418
1419 static void print_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1420                      struct ctdb_public_ip_list *ips,
1421                      struct ctdb_public_ip_info **ipinfo,
1422                      bool all_nodes)
1423 {
1424         int i, j;
1425         char *conf, *avail, *active;
1426
1427         if (options.machinereadable == 1) {
1428                 printf("%s%s%s%s%s", options.sep,
1429                        "Public IP", options.sep,
1430                        "Node", options.sep);
1431                 if (options.verbose == 1) {
1432                         printf("%s%s%s%s%s%s\n",
1433                                "ActiveInterfaces", options.sep,
1434                                "AvailableInterfaces", options.sep,
1435                                "ConfiguredInterfaces", options.sep);
1436                 } else {
1437                         printf("\n");
1438                 }
1439         } else {
1440                 if (all_nodes) {
1441                         printf("Public IPs on ALL nodes\n");
1442                 } else {
1443                         printf("Public IPs on node %u\n", ctdb->cmd_pnn);
1444                 }
1445         }
1446
1447         for (i = 0; i < ips->num; i++) {
1448
1449                 if (options.machinereadable == 1) {
1450                         printf("%s%s%s%d%s", options.sep,
1451                                ctdb_sock_addr_to_string(
1452                                        mem_ctx, &ips->ip[i].addr, false),
1453                                options.sep,
1454                                (int)ips->ip[i].pnn, options.sep);
1455                 } else {
1456                         printf("%s", ctdb_sock_addr_to_string(
1457                                        mem_ctx, &ips->ip[i].addr, false));
1458                 }
1459
1460                 if (options.verbose == 0) {
1461                         if (options.machinereadable == 1) {
1462                                 printf("\n");
1463                         } else {
1464                                 printf(" %d\n", (int)ips->ip[i].pnn);
1465                         }
1466                         continue;
1467                 }
1468
1469                 conf = NULL;
1470                 avail = NULL;
1471                 active = NULL;
1472
1473                 if (ipinfo[i] == NULL) {
1474                         goto skip_ipinfo;
1475                 }
1476
1477                 for (j=0; j<ipinfo[i]->ifaces->num; j++) {
1478                         struct ctdb_iface *iface;
1479
1480                         iface = &ipinfo[i]->ifaces->iface[j];
1481                         if (conf == NULL) {
1482                                 conf = talloc_strdup(mem_ctx, iface->name);
1483                         } else {
1484                                 conf = talloc_asprintf_append(
1485                                                 conf, ",%s", iface->name);
1486                         }
1487
1488                         if (ipinfo[i]->active_idx == j) {
1489                                 active = iface->name;
1490                         }
1491
1492                         if (iface->link_state == 0) {
1493                                 continue;
1494                         }
1495
1496                         if (avail == NULL) {
1497                                 avail = talloc_strdup(mem_ctx, iface->name);
1498                         } else {
1499                                 avail = talloc_asprintf_append(
1500                                                 avail, ",%s", iface->name);
1501                         }
1502                 }
1503
1504         skip_ipinfo:
1505
1506                 if (options.machinereadable == 1) {
1507                         printf("%s%s%s%s%s%s\n",
1508                                active ? active : "", options.sep,
1509                                avail ? avail : "", options.sep,
1510                                conf ? conf : "", options.sep);
1511                 } else {
1512                         printf(" node[%d] active[%s] available[%s]"
1513                                " configured[%s]\n",
1514                                (int)ips->ip[i].pnn, active ? active : "",
1515                                avail ? avail : "", conf ? conf : "");
1516                 }
1517         }
1518 }
1519
1520 static int collect_ips(uint8_t *keybuf, size_t keylen, uint8_t *databuf,
1521                        size_t datalen, void *private_data)
1522 {
1523         struct ctdb_public_ip_list *ips = talloc_get_type_abort(
1524                 private_data, struct ctdb_public_ip_list);
1525         struct ctdb_public_ip *ip;
1526
1527         ip = (struct ctdb_public_ip *)databuf;
1528         ips->ip[ips->num] = *ip;
1529         ips->num += 1;
1530
1531         return 0;
1532 }
1533
1534 static int get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1535                               struct ctdb_public_ip_list **out)
1536 {
1537         struct ctdb_node_map *nodemap;
1538         struct ctdb_public_ip_list *ips;
1539         struct db_hash_context *ipdb;
1540         uint32_t *pnn_list;
1541         int ret, count, i, j;
1542
1543         nodemap = get_nodemap(ctdb, false);
1544         if (nodemap == NULL) {
1545                 return 1;
1546         }
1547
1548         ret = db_hash_init(mem_ctx, "ips", 101, DB_HASH_COMPLEX, &ipdb);
1549         if (ret != 0) {
1550                 goto failed;
1551         }
1552
1553         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
1554                                      &pnn_list);
1555         if (count <= 0) {
1556                 goto failed;
1557         }
1558
1559         for (i=0; i<count; i++) {
1560                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1561                                                pnn_list[i], TIMEOUT(),
1562                                                false, &ips);
1563                 if (ret != 0) {
1564                         goto failed;
1565                 }
1566
1567                 for (j=0; j<ips->num; j++) {
1568                         struct ctdb_public_ip ip;
1569
1570                         ip.pnn = ips->ip[j].pnn;
1571                         ip.addr = ips->ip[j].addr;
1572
1573                         if (pnn_list[i] == ip.pnn) {
1574                                 /* Node claims IP is hosted on it, so
1575                                  * save that information
1576                                  */
1577                                 ret = db_hash_add(ipdb, (uint8_t *)&ip.addr,
1578                                                   sizeof(ip.addr),
1579                                                   (uint8_t *)&ip, sizeof(ip));
1580                                 if (ret != 0) {
1581                                         goto failed;
1582                                 }
1583                         } else {
1584                                 /* Node thinks IP is hosted elsewhere,
1585                                  * so overwrite with CTDB_UNKNOWN_PNN
1586                                  * if there's no existing entry
1587                                  */
1588                                 ret = db_hash_exists(ipdb, (uint8_t *)&ip.addr,
1589                                                      sizeof(ip.addr));
1590                                 if (ret == ENOENT) {
1591                                         ip.pnn = CTDB_UNKNOWN_PNN;
1592                                         ret = db_hash_add(ipdb,
1593                                                           (uint8_t *)&ip.addr,
1594                                                           sizeof(ip.addr),
1595                                                           (uint8_t *)&ip,
1596                                                           sizeof(ip));
1597                                         if (ret != 0) {
1598                                                 goto failed;
1599                                         }
1600                                 }
1601                         }
1602                 }
1603
1604                 TALLOC_FREE(ips);
1605         }
1606
1607         talloc_free(pnn_list);
1608
1609         ret = db_hash_traverse(ipdb, NULL, NULL, &count);
1610         if (ret != 0) {
1611                 goto failed;
1612         }
1613
1614         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);
1615         if (ips == NULL) {
1616                 goto failed;
1617         }
1618
1619         ips->ip = talloc_array(ips, struct ctdb_public_ip, count);
1620         if (ips->ip == NULL) {
1621                 goto failed;
1622         }
1623
1624         ret = db_hash_traverse(ipdb, collect_ips, ips, &count);
1625         if (ret != 0) {
1626                 goto failed;
1627         }
1628
1629         if (count != ips->num) {
1630                 goto failed;
1631         }
1632
1633         talloc_free(ipdb);
1634
1635         *out = ips;
1636         return 0;
1637
1638 failed:
1639         talloc_free(ipdb);
1640         return 1;
1641 }
1642
1643 static int control_ip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1644                       int argc, const char **argv)
1645 {
1646         struct ctdb_public_ip_list *ips;
1647         struct ctdb_public_ip_info **ipinfo;
1648         int ret, i;
1649         bool do_all = false;
1650
1651         if (argc > 1) {
1652                 usage("ip");
1653         }
1654
1655         if (argc == 1) {
1656                 if (strcmp(argv[0], "all") == 0) {
1657                         do_all = true;
1658                 } else {
1659                         usage("ip");
1660                 }
1661         }
1662
1663         if (do_all) {
1664                 ret = get_all_public_ips(ctdb, mem_ctx, &ips);
1665         } else {
1666                 ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
1667                                                ctdb->cmd_pnn, TIMEOUT(),
1668                                                false, &ips);
1669         }
1670         if (ret != 0) {
1671                 return ret;
1672         }
1673
1674         qsort(ips->ip, ips->num, sizeof(struct ctdb_public_ip),
1675               ctdb_public_ip_cmp);
1676
1677         ipinfo = talloc_array(mem_ctx, struct ctdb_public_ip_info *, ips->num);
1678         if (ipinfo == NULL) {
1679                 return 1;
1680         }
1681
1682         for (i=0; i<ips->num; i++) {
1683                 uint32_t pnn;
1684                 if (do_all) {
1685                         pnn = ips->ip[i].pnn;
1686                 } else {
1687                         pnn = ctdb->cmd_pnn;
1688                 }
1689                 if (pnn == CTDB_UNKNOWN_PNN) {
1690                         ipinfo[i] = NULL;
1691                         continue;
1692                 }
1693                 ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev,
1694                                                    ctdb->client, pnn,
1695                                                    TIMEOUT(), &ips->ip[i].addr,
1696                                                    &ipinfo[i]);
1697                 if (ret != 0) {
1698                         return ret;
1699                 }
1700         }
1701
1702         print_ip(mem_ctx, ctdb, ips, ipinfo, do_all);
1703         return 0;
1704 }
1705
1706 static int control_ipinfo(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1707                           int argc, const char **argv)
1708 {
1709         struct ctdb_public_ip_info *ipinfo;
1710         ctdb_sock_addr addr;
1711         int ret, i;
1712
1713         if (argc != 1) {
1714                 usage("ipinfo");
1715         }
1716
1717         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
1718         if (ret != 0) {
1719                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
1720                 return 1;
1721         }
1722
1723         ret = ctdb_ctrl_get_public_ip_info(mem_ctx, ctdb->ev, ctdb->client,
1724                                            ctdb->cmd_pnn, TIMEOUT(), &addr,
1725                                            &ipinfo);
1726         if (ret != 0) {
1727                 if (ret == -1) {
1728                         printf("Node %u does not know about IP %s\n",
1729                                ctdb->cmd_pnn, argv[0]);
1730                 }
1731                 return ret;
1732         }
1733
1734         printf("Public IP[%s] info on node %u\n",
1735                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1736                                         ctdb->cmd_pnn);
1737
1738         printf("IP:%s\nCurrentNode:%u\nNumInterfaces:%u\n",
1739                ctdb_sock_addr_to_string(mem_ctx, &ipinfo->ip.addr, false),
1740                ipinfo->ip.pnn, ipinfo->ifaces->num);
1741
1742         for (i=0; i<ipinfo->ifaces->num; i++) {
1743                 struct ctdb_iface *iface;
1744
1745                 iface = &ipinfo->ifaces->iface[i];
1746                 iface->name[CTDB_IFACE_SIZE] = '\0';
1747                 printf("Interface[%u]: Name:%s Link:%s References:%u%s\n",
1748                        i+1, iface->name,
1749                        iface->link_state == 0 ? "down" : "up",
1750                        iface->references,
1751                        (i == ipinfo->active_idx) ? " (active)" : "");
1752         }
1753
1754         return 0;
1755 }
1756
1757 static int control_ifaces(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1758                           int argc, const char **argv)
1759 {
1760         struct ctdb_iface_list *ifaces;
1761         int ret, i;
1762
1763         if (argc != 0) {
1764                 usage("ifaces");
1765         }
1766
1767         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1768                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1769         if (ret != 0) {
1770                 return ret;
1771         }
1772
1773         if (ifaces->num == 0) {
1774                 printf("No interfaces configured on node %u\n",
1775                        ctdb->cmd_pnn);
1776                 return 0;
1777         }
1778
1779         if (options.machinereadable) {
1780                 printf("%s%s%s%s%s%s%s\n", options.sep,
1781                        "Name", options.sep,
1782                        "LinkStatus", options.sep,
1783                        "References", options.sep);
1784         } else {
1785                 printf("Interfaces on node %u\n", ctdb->cmd_pnn);
1786         }
1787
1788         for (i=0; i<ifaces->num; i++) {
1789                 if (options.machinereadable) {
1790                         printf("%s%s%s%u%s%u%s\n", options.sep,
1791                                ifaces->iface[i].name, options.sep,
1792                                ifaces->iface[i].link_state, options.sep,
1793                                ifaces->iface[i].references, options.sep);
1794                 } else {
1795                         printf("name:%s link:%s references:%u\n",
1796                                ifaces->iface[i].name,
1797                                ifaces->iface[i].link_state ? "up" : "down",
1798                                ifaces->iface[i].references);
1799                 }
1800         }
1801
1802         return 0;
1803 }
1804
1805 static int control_setifacelink(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1806                                 int argc, const char **argv)
1807 {
1808         struct ctdb_iface_list *ifaces;
1809         struct ctdb_iface *iface;
1810         int ret, i;
1811
1812         if (argc != 2) {
1813                 usage("setifacelink");
1814         }
1815
1816         if (strlen(argv[0]) > CTDB_IFACE_SIZE) {
1817                 fprintf(stderr, "Interface name '%s' too long\n", argv[0]);
1818                 return 1;
1819         }
1820
1821         ret = ctdb_ctrl_get_ifaces(mem_ctx, ctdb->ev, ctdb->client,
1822                                    ctdb->cmd_pnn, TIMEOUT(), &ifaces);
1823         if (ret != 0) {
1824                 fprintf(stderr,
1825                         "Failed to get interface information from node %u\n",
1826                         ctdb->cmd_pnn);
1827                 return ret;
1828         }
1829
1830         iface = NULL;
1831         for (i=0; i<ifaces->num; i++) {
1832                 if (strcmp(ifaces->iface[i].name, argv[0]) == 0) {
1833                         iface = &ifaces->iface[i];
1834                         break;
1835                 }
1836         }
1837
1838         if (iface == NULL) {
1839                 printf("Interface %s not configured on node %u\n",
1840                        argv[0], ctdb->cmd_pnn);
1841                 return 1;
1842         }
1843
1844         if (strcmp(argv[1], "up") == 0) {
1845                 iface->link_state = 1;
1846         } else if (strcmp(argv[1], "down") == 0) {
1847                 iface->link_state = 0;
1848         } else {
1849                 usage("setifacelink");
1850                 return 1;
1851         }
1852
1853         iface->references = 0;
1854
1855         ret = ctdb_ctrl_set_iface_link_state(mem_ctx, ctdb->ev, ctdb->client,
1856                                              ctdb->cmd_pnn, TIMEOUT(), iface);
1857         if (ret != 0) {
1858                 return ret;
1859         }
1860
1861         return 0;
1862 }
1863
1864 static int control_process_exists(TALLOC_CTX *mem_ctx,
1865                                   struct ctdb_context *ctdb,
1866                                   int argc, const char **argv)
1867 {
1868         pid_t pid;
1869         uint64_t srvid = 0;
1870         int ret, status;
1871
1872         if (argc != 1 && argc != 2) {
1873                 usage("process-exists");
1874         }
1875
1876         pid = atoi(argv[0]);
1877         if (argc == 2) {
1878                 srvid = strtoull(argv[1], NULL, 0);
1879         }
1880
1881         if (srvid == 0) {
1882                 ret = ctdb_ctrl_process_exists(mem_ctx, ctdb->ev, ctdb->client,
1883                                        ctdb->cmd_pnn, TIMEOUT(), pid, &status);
1884         } else {
1885                 struct ctdb_pid_srvid pid_srvid;
1886
1887                 pid_srvid.pid = pid;
1888                 pid_srvid.srvid = srvid;
1889
1890                 ret = ctdb_ctrl_check_pid_srvid(mem_ctx, ctdb->ev,
1891                                                 ctdb->client, ctdb->cmd_pnn,
1892                                                 TIMEOUT(), &pid_srvid,
1893                                                 &status);
1894         }
1895
1896         if (ret != 0) {
1897                 return ret;
1898         }
1899
1900         if (srvid == 0) {
1901                 printf("PID %d %s\n", pid,
1902                        (status == 0 ? "exists" : "does not exist"));
1903         } else {
1904                 printf("PID %d with SRVID 0x%"PRIx64" %s\n", pid, srvid,
1905                        (status == 0 ? "exists" : "does not exist"));
1906         }
1907         return status;
1908 }
1909
1910 static int control_getdbmap(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
1911                             int argc, const char **argv)
1912 {
1913         struct ctdb_dbid_map *dbmap;
1914         int ret, i;
1915
1916         if (argc != 0) {
1917                 usage("getdbmap");
1918         }
1919
1920         ret = ctdb_ctrl_get_dbmap(mem_ctx, ctdb->ev, ctdb->client,
1921                                   ctdb->cmd_pnn, TIMEOUT(), &dbmap);
1922         if (ret != 0) {
1923                 return ret;
1924         }
1925
1926         if (options.machinereadable == 1) {
1927                 printf("%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s%s\n",
1928                        options.sep,
1929                        "ID", options.sep,
1930                        "Name", options.sep,
1931                        "Path", options.sep,
1932                        "Persistent", options.sep,
1933                        "Sticky", options.sep,
1934                        "Unhealthy", options.sep,
1935                        "Readonly", options.sep,
1936                        "Replicated", options.sep);
1937         } else {
1938                 printf("Number of databases:%d\n", dbmap->num);
1939         }
1940
1941         for (i=0; i<dbmap->num; i++) {
1942                 const char *name;
1943                 const char *path;
1944                 const char *health;
1945                 bool persistent;
1946                 bool readonly;
1947                 bool sticky;
1948                 bool replicated;
1949                 uint32_t db_id;
1950
1951                 db_id = dbmap->dbs[i].db_id;
1952
1953                 ret = ctdb_ctrl_get_dbname(mem_ctx, ctdb->ev, ctdb->client,
1954                                            ctdb->cmd_pnn, TIMEOUT(), db_id,
1955                                            &name);
1956                 if (ret != 0) {
1957                         return ret;
1958                 }
1959
1960                 ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
1961                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
1962                                           &path);
1963                 if (ret != 0) {
1964                         return ret;
1965                 }
1966
1967                 ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
1968                                               ctdb->cmd_pnn, TIMEOUT(), db_id,
1969                                               &health);
1970                 if (ret != 0) {
1971                         return ret;
1972                 }
1973
1974                 persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1975                 readonly = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
1976                 sticky = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
1977                 replicated = dbmap->dbs[i].flags & CTDB_DB_FLAGS_REPLICATED;
1978
1979                 if (options.machinereadable == 1) {
1980                         printf("%s0x%08X%s%s%s%s%s%d%s%d%s%d%s%d%s%d%s\n",
1981                                options.sep,
1982                                db_id, options.sep,
1983                                name, options.sep,
1984                                path, options.sep,
1985                                !! (persistent), options.sep,
1986                                !! (sticky), options.sep,
1987                                !! (health), options.sep,
1988                                !! (readonly), options.sep,
1989                                !! (replicated), options.sep);
1990                 } else {
1991                         printf("dbid:0x%08x name:%s path:%s%s%s%s%s%s\n",
1992                                db_id, name, path,
1993                                persistent ? " PERSISTENT" : "",
1994                                sticky ? " STICKY" : "",
1995                                readonly ? " READONLY" : "",
1996                                replicated ? " REPLICATED" : "",
1997                                health ? " UNHEALTHY" : "");
1998                 }
1999
2000                 talloc_free(discard_const(name));
2001                 talloc_free(discard_const(path));
2002                 talloc_free(discard_const(health));
2003         }
2004
2005         return 0;
2006 }
2007
2008 static int control_getdbstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2009                                int argc, const char **argv)
2010 {
2011         uint32_t db_id;
2012         const char *db_name, *db_path, *db_health;
2013         uint8_t db_flags;
2014         int ret;
2015
2016         if (argc != 1) {
2017                 usage("getdbstatus");
2018         }
2019
2020         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2021                 return 1;
2022         }
2023
2024         ret = ctdb_ctrl_getdbpath(mem_ctx, ctdb->ev, ctdb->client,
2025                                   ctdb->cmd_pnn, TIMEOUT(), db_id,
2026                                   &db_path);
2027         if (ret != 0) {
2028                 return ret;
2029         }
2030
2031         ret = ctdb_ctrl_db_get_health(mem_ctx, ctdb->ev, ctdb->client,
2032                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
2033                                       &db_health);
2034         if (ret != 0) {
2035                 return ret;
2036         }
2037
2038         printf("dbid: 0x%08x\nname: %s\npath: %s\n", db_id, db_name, db_path);
2039         printf("PERSISTENT: %s\nREPLICATED: %s\nSTICKY: %s\nREADONLY: %s\n",
2040                (db_flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
2041                (db_flags & CTDB_DB_FLAGS_REPLICATED ? "yes" : "no"),
2042                (db_flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
2043                (db_flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"));
2044         printf("HEALTH: %s\n", (db_health ? db_health : "OK"));
2045         return 0;
2046 }
2047
/* State shared with dump_record() while a database is being dumped */
struct dump_record_state {
        /* Incremented once for every record passed to dump_record() */
        uint32_t count;
};
2051
/*
 * True for printable characters other than double quote and backslash.
 * Note that the argument is evaluated twice.
 */
#define ISASCII(x) (isprint(x) && ! strchr("\"\\", (x)))
2053
2054 static void dump_tdb_data(const char *name, TDB_DATA val)
2055 {
2056         int i;
2057
2058         fprintf(stdout, "%s(%zu) = \"", name, val.dsize);
2059         for (i=0; i<val.dsize; i++) {
2060                 if (ISASCII(val.dptr[i])) {
2061                         fprintf(stdout, "%c", val.dptr[i]);
2062                 } else {
2063                         fprintf(stdout, "\\%02X", val.dptr[i]);
2064                 }
2065         }
2066         fprintf(stdout, "\"\n");
2067 }
2068
2069 static void dump_ltdb_header(struct ctdb_ltdb_header *header)
2070 {
2071         fprintf(stdout, "dmaster: %u\n", header->dmaster);
2072         fprintf(stdout, "rsn: %" PRIu64 "\n", header->rsn);
2073         fprintf(stdout, "flags: 0x%08x", header->flags);
2074         if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
2075                 fprintf(stdout, " MIGRATED_WITH_DATA");
2076         }
2077         if (header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) {
2078                 fprintf(stdout, " VACUUM_MIGRATED");
2079         }
2080         if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
2081                 fprintf(stdout, " AUTOMATIC");
2082         }
2083         if (header->flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
2084                 fprintf(stdout, " RO_HAVE_DELEGATIONS");
2085         }
2086         if (header->flags & CTDB_REC_RO_HAVE_READONLY) {
2087                 fprintf(stdout, " RO_HAVE_READONLY");
2088         }
2089         if (header->flags & CTDB_REC_RO_REVOKING_READONLY) {
2090                 fprintf(stdout, " RO_REVOKING_READONLY");
2091         }
2092         if (header->flags & CTDB_REC_RO_REVOKE_COMPLETE) {
2093                 fprintf(stdout, " RO_REVOKE_COMPLETE");
2094         }
2095         fprintf(stdout, "\n");
2096
2097 }
2098
2099 static int dump_record(uint32_t reqid, struct ctdb_ltdb_header *header,
2100                        TDB_DATA key, TDB_DATA data, void *private_data)
2101 {
2102         struct dump_record_state *state =
2103                 (struct dump_record_state *)private_data;
2104
2105         state->count += 1;
2106
2107         dump_tdb_data("key", key);
2108         dump_ltdb_header(header);
2109         dump_tdb_data("data", data);
2110         fprintf(stdout, "\n");
2111
2112         return 0;
2113 }
2114
2115 static int control_catdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2116                          int argc, const char **argv)
2117 {
2118         struct ctdb_db_context *db;
2119         const char *db_name;
2120         uint32_t db_id;
2121         uint8_t db_flags;
2122         struct dump_record_state state;
2123         int ret;
2124
2125         if (argc != 1) {
2126                 usage("catdb");
2127         }
2128
2129         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2130                 return 1;
2131         }
2132
2133         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2134                           db_flags, &db);
2135         if (ret != 0) {
2136                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2137                 return ret;
2138         }
2139
2140         state.count = 0;
2141
2142         ret = ctdb_db_traverse(mem_ctx, ctdb->ev, ctdb->client, db,
2143                                ctdb->cmd_pnn, TIMEOUT(),
2144                                dump_record, &state);
2145
2146         printf("Dumped %u records\n", state.count);
2147
2148         return ret;
2149 }
2150
2151 static int control_cattdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2152                           int argc, const char **argv)
2153 {
2154         struct ctdb_db_context *db;
2155         const char *db_name;
2156         uint32_t db_id;
2157         uint8_t db_flags;
2158         struct dump_record_state state;
2159         int ret;
2160
2161         if (argc != 1) {
2162                 usage("catdb");
2163         }
2164
2165         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
2166                 return 1;
2167         }
2168
2169         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2170                           db_flags, &db);
2171         if (ret != 0) {
2172                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
2173                 return ret;
2174         }
2175
2176         state.count = 0;
2177         ret = ctdb_db_traverse_local(db, true, true, dump_record, &state);
2178
2179         printf("Dumped %u record(s)\n", state.count);
2180
2181         return ret;
2182 }
2183
2184 static int control_getcapabilities(TALLOC_CTX *mem_ctx,
2185                                    struct ctdb_context *ctdb,
2186                                    int argc, const char **argv)
2187 {
2188         uint32_t caps;
2189         int ret;
2190
2191         if (argc != 0) {
2192                 usage("getcapabilities");
2193         }
2194
2195         ret = ctdb_ctrl_get_capabilities(mem_ctx, ctdb->ev, ctdb->client,
2196                                          ctdb->cmd_pnn, TIMEOUT(), &caps);
2197         if (ret != 0) {
2198                 return ret;
2199         }
2200
2201         if (options.machinereadable == 1) {
2202                 printf("%s%s%s%s%s\n",
2203                        options.sep,
2204                        "RECMASTER", options.sep,
2205                        "LMASTER", options.sep);
2206                 printf("%s%d%s%d%s\n", options.sep,
2207                        !! (caps & CTDB_CAP_RECMASTER), options.sep,
2208                        !! (caps & CTDB_CAP_LMASTER), options.sep);
2209         } else {
2210                 printf("RECMASTER: %s\n",
2211                        (caps & CTDB_CAP_RECMASTER) ? "YES" : "NO");
2212                 printf("LMASTER: %s\n",
2213                        (caps & CTDB_CAP_LMASTER) ? "YES" : "NO");
2214         }
2215
2216         return 0;
2217 }
2218
2219 static int control_pnn(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2220                        int argc, const char **argv)
2221 {
2222         printf("%u\n", ctdb_client_pnn(ctdb->client));
2223         return 0;
2224 }
2225
2226 static int control_lvs(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2227                        int argc, const char **argv)
2228 {
2229         char *t, *lvs_helper = NULL;
2230
2231         if (argc != 1) {
2232                 usage("lvs");
2233         }
2234
2235         t = getenv("CTDB_LVS_HELPER");
2236         if (t != NULL) {
2237                 lvs_helper = talloc_strdup(mem_ctx, t);
2238         } else {
2239                 lvs_helper = talloc_asprintf(mem_ctx, "%s/ctdb_lvs",
2240                                              CTDB_HELPER_BINDIR);
2241         }
2242
2243         if (lvs_helper == NULL) {
2244                 fprintf(stderr, "Unable to set LVS helper\n");
2245                 return 1;
2246         }
2247
2248         return run_helper(mem_ctx, "LVS helper", lvs_helper, argc, argv);
2249 }
2250
2251 static int control_setdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2252                             int argc, const char **argv)
2253 {
2254         int log_level;
2255         int ret;
2256         bool found;
2257
2258         if (argc != 1) {
2259                 usage("setdebug");
2260         }
2261
2262         found = debug_level_parse(argv[0], &log_level);
2263         if (! found) {
2264                 fprintf(stderr,
2265                         "Invalid debug level '%s'. Valid levels are:\n",
2266                         argv[0]);
2267                 fprintf(stderr, "\tERROR | WARNING | NOTICE | INFO | DEBUG\n");
2268                 return 1;
2269         }
2270
2271         ret = ctdb_ctrl_setdebug(mem_ctx, ctdb->ev, ctdb->client,
2272                                  ctdb->cmd_pnn, TIMEOUT(), log_level);
2273         if (ret != 0) {
2274                 return ret;
2275         }
2276
2277         return 0;
2278 }
2279
2280 static int control_getdebug(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2281                             int argc, const char **argv)
2282 {
2283         int loglevel;
2284         const char *log_str;
2285         int ret;
2286
2287         if (argc != 0) {
2288                 usage("getdebug");
2289         }
2290
2291         ret = ctdb_ctrl_getdebug(mem_ctx, ctdb->ev, ctdb->client,
2292                                  ctdb->cmd_pnn, TIMEOUT(), &loglevel);
2293         if (ret != 0) {
2294                 return ret;
2295         }
2296
2297         log_str = debug_level_to_string(loglevel);
2298         printf("%s\n", log_str);
2299
2300         return 0;
2301 }
2302
2303 static int control_attach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2304                           int argc, const char **argv)
2305 {
2306         const char *db_name;
2307         uint8_t db_flags = 0;
2308         int ret;
2309
2310         if (argc < 1 || argc > 2) {
2311                 usage("attach");
2312         }
2313
2314         db_name = argv[0];
2315         if (argc == 2) {
2316                 if (strcmp(argv[1], "persistent") == 0) {
2317                         db_flags = CTDB_DB_FLAGS_PERSISTENT;
2318                 } else if (strcmp(argv[1], "readonly") == 0) {
2319                         db_flags = CTDB_DB_FLAGS_READONLY;
2320                 } else if (strcmp(argv[1], "sticky") == 0) {
2321                         db_flags = CTDB_DB_FLAGS_STICKY;
2322                 } else if (strcmp(argv[1], "replicated") == 0) {
2323                         db_flags = CTDB_DB_FLAGS_REPLICATED;
2324                 } else {
2325                         usage("attach");
2326                 }
2327         }
2328
2329         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
2330                           db_flags, NULL);
2331         if (ret != 0) {
2332                 return ret;
2333         }
2334
2335         return 0;
2336 }
2337
2338 static int control_detach(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2339                           int argc, const char **argv)
2340 {
2341         const char *db_name;
2342         uint32_t db_id;
2343         uint8_t db_flags;
2344         struct ctdb_node_map *nodemap;
2345         int recmode;
2346         int ret, ret2, i;
2347
2348         if (argc < 1) {
2349                 usage("detach");
2350         }
2351
2352         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2353                                     ctdb->cmd_pnn, TIMEOUT(), &recmode);
2354         if (ret != 0) {
2355                 return ret;
2356         }
2357
2358         if (recmode == CTDB_RECOVERY_ACTIVE) {
2359                 fprintf(stderr, "Database cannot be detached"
2360                                 " when recovery is active\n");
2361                 return 1;
2362         }
2363
2364         nodemap = get_nodemap(ctdb, false);
2365         if (nodemap == NULL) {
2366                 return 1;
2367         }
2368
2369         for (i=0; i<nodemap->num; i++) {
2370                 uint32_t value;
2371
2372                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
2373                         continue;
2374                 }
2375                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2376                         continue;
2377                 }
2378                 if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2379                         fprintf(stderr, "Database cannot be detached on"
2380                                 " inactive (stopped or banned) node %u\n",
2381                                 nodemap->node[i].pnn);
2382                         return 1;
2383                 }
2384
2385                 ret = ctdb_ctrl_get_tunable(mem_ctx, ctdb->ev, ctdb->client,
2386                                             nodemap->node[i].pnn, TIMEOUT(),
2387                                             "AllowClientDBAttach", &value);
2388                 if (ret != 0) {
2389                         fprintf(stderr,
2390                                 "Unable to get tunable AllowClientDBAttach"
2391                                 " from node %u\n", nodemap->node[i].pnn);
2392                         return ret;
2393                 }
2394
2395                 if (value == 1) {
2396                         fprintf(stderr,
2397                                 "Database access is still active on node %u."
2398                                 " Set AllowclientDBAttach=0 on all nodes.\n",
2399                                 nodemap->node[i].pnn);
2400                         return 1;
2401                 }
2402         }
2403
2404         ret2 = 0;
2405         for (i=0; i<argc; i++) {
2406                 if (! db_exists(mem_ctx, ctdb, argv[i], &db_id, &db_name,
2407                                 &db_flags)) {
2408                         continue;
2409                 }
2410
2411                 if (db_flags &
2412                     (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
2413                         fprintf(stderr,
2414                                 "Only volatile databases can be detached\n");
2415                         return 1;
2416                 }
2417
2418                 ret = ctdb_detach(ctdb->ev, ctdb->client, TIMEOUT(), db_id);
2419                 if (ret != 0) {
2420                         fprintf(stderr, "Database %s detach failed\n", db_name);
2421                         ret2 = ret;
2422                 }
2423         }
2424
2425         return ret2;
2426 }
2427
2428 static int control_dumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2429                               int argc, const char **argv)
2430 {
2431         const char *mem_str;
2432         ssize_t n;
2433         int ret;
2434
2435         ret = ctdb_ctrl_dump_memory(mem_ctx, ctdb->ev, ctdb->client,
2436                                     ctdb->cmd_pnn, TIMEOUT(), &mem_str);
2437         if (ret != 0) {
2438                 return ret;
2439         }
2440
2441         n = write(1, mem_str, strlen(mem_str)+1);
2442         if (n < 0 || n != strlen(mem_str)+1) {
2443                 fprintf(stderr, "Failed to write talloc summary\n");
2444                 return 1;
2445         }
2446
2447         return 0;
2448 }
2449
2450 static void dump_memory(uint64_t srvid, TDB_DATA data, void *private_data)
2451 {
2452         bool *done = (bool *)private_data;
2453         ssize_t n;
2454
2455         n = write(1, data.dptr, data.dsize);
2456         if (n < 0 || n != data.dsize) {
2457                 fprintf(stderr, "Failed to write talloc summary\n");
2458         }
2459
2460         *done = true;
2461 }
2462
2463 static int control_rddumpmemory(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2464                                 int argc, const char **argv)
2465 {
2466         struct ctdb_srvid_message msg = { 0 };
2467         int ret;
2468         bool done = false;
2469
2470         msg.pnn = ctdb->pnn;
2471         msg.srvid = next_srvid(ctdb);
2472
2473         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2474                                               msg.srvid, dump_memory, &done);
2475         if (ret != 0) {
2476                 return ret;
2477         }
2478
2479         ret = ctdb_message_mem_dump(mem_ctx, ctdb->ev, ctdb->client,
2480                                     ctdb->cmd_pnn, &msg);
2481         if (ret != 0) {
2482                 return ret;
2483         }
2484
2485         ctdb_client_wait(ctdb->ev, &done);
2486         return 0;
2487 }
2488
2489 static int control_getpid(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2490                           int argc, const char **argv)
2491 {
2492         pid_t pid;
2493         int ret;
2494
2495         ret = ctdb_ctrl_get_pid(mem_ctx, ctdb->ev, ctdb->client,
2496                                 ctdb->cmd_pnn, TIMEOUT(), &pid);
2497         if (ret != 0) {
2498                 return ret;
2499         }
2500
2501         printf("%u\n", pid);
2502         return 0;
2503 }
2504
2505 static int check_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2506                        const char *desc, uint32_t flag, bool set_flag)
2507 {
2508         struct ctdb_node_map *nodemap;
2509         bool flag_is_set;
2510
2511         nodemap = get_nodemap(ctdb, false);
2512         if (nodemap == NULL) {
2513                 return 1;
2514         }
2515
2516         flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2517         if (set_flag == flag_is_set) {
2518                 if (set_flag) {
2519                         fprintf(stderr, "Node %u is already %s\n",
2520                                 ctdb->cmd_pnn, desc);
2521                 } else {
2522                         fprintf(stderr, "Node %u is not %s\n",
2523                                 ctdb->cmd_pnn, desc);
2524                 }
2525                 return 0;
2526         }
2527
2528         return 1;
2529 }
2530
2531 static void wait_for_flags(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2532                            uint32_t flag, bool set_flag)
2533 {
2534         struct ctdb_node_map *nodemap;
2535         bool flag_is_set;
2536
2537         while (1) {
2538                 nodemap = get_nodemap(ctdb, true);
2539                 if (nodemap == NULL) {
2540                         fprintf(stderr,
2541                                 "Failed to get nodemap, trying again\n");
2542                         sleep(1);
2543                         continue;
2544                 }
2545
2546                 flag_is_set = nodemap->node[ctdb->cmd_pnn].flags & flag;
2547                 if (flag_is_set == set_flag) {
2548                         break;
2549                 }
2550
2551                 sleep(1);
2552         }
2553 }
2554
2555 static int ctdb_ctrl_modflags(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2556                               struct ctdb_client_context *client,
2557                               uint32_t destnode, struct timeval timeout,
2558                               uint32_t set, uint32_t clear)
2559 {
2560         struct ctdb_node_map *nodemap;
2561         struct ctdb_node_flag_change flag_change;
2562         struct ctdb_req_control request;
2563         uint32_t *pnn_list;
2564         int ret, count;
2565
2566         ret = ctdb_ctrl_get_nodemap(mem_ctx, ev, client, destnode,
2567                                     tevent_timeval_zero(), &nodemap);
2568         if (ret != 0) {
2569                 return ret;
2570         }
2571
2572         flag_change.pnn = destnode;
2573         flag_change.old_flags = nodemap->node[destnode].flags;
2574         flag_change.new_flags = flag_change.old_flags | set;
2575         flag_change.new_flags &= ~clear;
2576
2577         count = list_of_connected_nodes(nodemap, -1, mem_ctx, &pnn_list);
2578         if (count == -1) {
2579                 return ENOMEM;
2580         }
2581
2582         ctdb_req_control_modify_flags(&request, &flag_change);
2583         ret = ctdb_client_control_multi(mem_ctx, ev, client, pnn_list, count,
2584                                         tevent_timeval_zero(), &request,
2585                                         NULL, NULL);
2586         return ret;
2587 }
2588
/*
 * Shared between ipreallocate() and ipreallocate_handler(): carries the
 * reply to a takeover-run request back to the waiting caller.
 */
struct ipreallocate_state {
        int status;     /* integer payload copied from the reply message */
        bool done;      /* set to true once a reply has been accepted */
};
2593
2594 static void ipreallocate_handler(uint64_t srvid, TDB_DATA data,
2595                                  void *private_data)
2596 {
2597         struct ipreallocate_state *state =
2598                 (struct ipreallocate_state *)private_data;
2599
2600         if (data.dsize != sizeof(int)) {
2601                 /* Ignore packet */
2602                 return;
2603         }
2604
2605         state->status = *(int *)data.dptr;
2606         state->done = true;
2607 }
2608
2609 static int ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb)
2610 {
2611         struct ctdb_srvid_message msg = { 0 };
2612         struct ipreallocate_state state;
2613         int ret;
2614
2615         msg.pnn = ctdb->pnn;
2616         msg.srvid = next_srvid(ctdb);
2617
2618         state.done = false;
2619         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
2620                                               msg.srvid,
2621                                               ipreallocate_handler, &state);
2622         if (ret != 0) {
2623                 return ret;
2624         }
2625
2626         while (true) {
2627                 ret = ctdb_message_takeover_run(mem_ctx, ctdb->ev,
2628                                                 ctdb->client,
2629                                                 CTDB_BROADCAST_CONNECTED,
2630                                                 &msg);
2631                 if (ret != 0) {
2632                         goto fail;
2633                 }
2634
2635                 ret = ctdb_client_wait_timeout(ctdb->ev, &state.done,
2636                                                TIMEOUT());
2637                 if (ret != 0) {
2638                         continue;
2639                 }
2640
2641                 if (state.status >= 0) {
2642                         ret = 0;
2643                 } else {
2644                         ret = state.status;
2645                 }
2646                 break;
2647         }
2648
2649 fail:
2650         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
2651                                            msg.srvid, &state);
2652         return ret;
2653 }
2654
2655 static int control_disable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2656                            int argc, const char **argv)
2657 {
2658         int ret;
2659
2660         if (argc != 0) {
2661                 usage("disable");
2662         }
2663
2664         ret = check_flags(mem_ctx, ctdb, "disabled",
2665                           NODE_FLAGS_PERMANENTLY_DISABLED, true);
2666         if (ret == 0) {
2667                 return 0;
2668         }
2669
2670         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2671                                  ctdb->cmd_pnn, TIMEOUT(),
2672                                  NODE_FLAGS_PERMANENTLY_DISABLED, 0);
2673         if (ret != 0) {
2674                 fprintf(stderr,
2675                         "Failed to set DISABLED flag on node %u\n",
2676                         ctdb->cmd_pnn);
2677                 return ret;
2678         }
2679
2680         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, true);
2681         return ipreallocate(mem_ctx, ctdb);
2682 }
2683
2684 static int control_enable(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2685                           int argc, const char **argv)
2686 {
2687         int ret;
2688
2689         if (argc != 0) {
2690                 usage("enable");
2691         }
2692
2693         ret = check_flags(mem_ctx, ctdb, "disabled",
2694                           NODE_FLAGS_PERMANENTLY_DISABLED, false);
2695         if (ret == 0) {
2696                 return 0;
2697         }
2698
2699         ret = ctdb_ctrl_modflags(mem_ctx, ctdb->ev, ctdb->client,
2700                                  ctdb->cmd_pnn, TIMEOUT(),
2701                                  0, NODE_FLAGS_PERMANENTLY_DISABLED);
2702         if (ret != 0) {
2703                 fprintf(stderr, "Failed to reset DISABLED flag on node %u\n",
2704                         ctdb->cmd_pnn);
2705                 return ret;
2706         }
2707
2708         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_PERMANENTLY_DISABLED, false);
2709         return ipreallocate(mem_ctx, ctdb);
2710 }
2711
2712 static int control_stop(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2713                         int argc, const char **argv)
2714 {
2715         int ret;
2716
2717         if (argc != 0) {
2718                 usage("stop");
2719         }
2720
2721         ret = check_flags(mem_ctx, ctdb, "stopped",
2722                           NODE_FLAGS_STOPPED, true);
2723         if (ret == 0) {
2724                 return 0;
2725         }
2726
2727         ret = ctdb_ctrl_stop_node(mem_ctx, ctdb->ev, ctdb->client,
2728                                   ctdb->cmd_pnn, TIMEOUT());
2729         if (ret != 0) {
2730                 fprintf(stderr, "Failed to stop node %u\n", ctdb->cmd_pnn);
2731                 return ret;
2732         }
2733
2734         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, true);
2735         return ipreallocate(mem_ctx, ctdb);
2736 }
2737
2738 static int control_continue(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2739                             int argc, const char **argv)
2740 {
2741         int ret;
2742
2743         if (argc != 0) {
2744                 usage("continue");
2745         }
2746
2747         ret = check_flags(mem_ctx, ctdb, "stopped",
2748                           NODE_FLAGS_STOPPED, false);
2749         if (ret == 0) {
2750                 return 0;
2751         }
2752
2753         ret = ctdb_ctrl_continue_node(mem_ctx, ctdb->ev, ctdb->client,
2754                                       ctdb->cmd_pnn, TIMEOUT());
2755         if (ret != 0) {
2756                 fprintf(stderr, "Failed to continue stopped node %u\n",
2757                         ctdb->cmd_pnn);
2758                 return ret;
2759         }
2760
2761         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_STOPPED, false);
2762         return ipreallocate(mem_ctx, ctdb);
2763 }
2764
2765 static int control_ban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2766                        int argc, const char **argv)
2767 {
2768         struct ctdb_ban_state ban_state;
2769         int ret;
2770
2771         if (argc != 1) {
2772                 usage("ban");
2773         }
2774
2775         ret = check_flags(mem_ctx, ctdb, "banned",
2776                           NODE_FLAGS_BANNED, true);
2777         if (ret == 0) {
2778                 return 0;
2779         }
2780
2781         ban_state.pnn = ctdb->cmd_pnn;
2782         ban_state.time = strtoul(argv[0], NULL, 0);
2783
2784         if (ban_state.time == 0) {
2785                 fprintf(stderr, "Ban time cannot be zero\n");
2786                 return EINVAL;
2787         }
2788
2789         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2790                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2791         if (ret != 0) {
2792                 fprintf(stderr, "Failed to ban node %u\n", ctdb->cmd_pnn);
2793                 return ret;
2794         }
2795
2796         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, true);
2797         return ipreallocate(mem_ctx, ctdb);
2798
2799 }
2800
2801 static int control_unban(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2802                          int argc, const char **argv)
2803 {
2804         struct ctdb_ban_state ban_state;
2805         int ret;
2806
2807         if (argc != 0) {
2808                 usage("unban");
2809         }
2810
2811         ret = check_flags(mem_ctx, ctdb, "banned",
2812                           NODE_FLAGS_BANNED, false);
2813         if (ret == 0) {
2814                 return 0;
2815         }
2816
2817         ban_state.pnn = ctdb->cmd_pnn;
2818         ban_state.time = 0;
2819
2820         ret = ctdb_ctrl_set_ban_state(mem_ctx, ctdb->ev, ctdb->client,
2821                                       ctdb->cmd_pnn, TIMEOUT(), &ban_state);
2822         if (ret != 0) {
2823                 fprintf(stderr, "Failed to unban node %u\n", ctdb->cmd_pnn);
2824                 return ret;
2825         }
2826
2827         wait_for_flags(mem_ctx, ctdb, NODE_FLAGS_BANNED, false);
2828         return ipreallocate(mem_ctx, ctdb);
2829
2830 }
2831
/*
 * Callback that records completion: private_data points at a bool, which
 * is set to true.
 */
static void wait_for_shutdown(void *private_data)
{
        *(bool *)private_data = true;
}
2838
2839 static int control_shutdown(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2840                             int argc, const char **argv)
2841 {
2842         int ret;
2843         bool done = false;
2844
2845         if (argc != 0) {
2846                 usage("shutdown");
2847         }
2848
2849         if (ctdb->pnn == ctdb->cmd_pnn) {
2850                 ctdb_client_set_disconnect_callback(ctdb->client,
2851                                                     wait_for_shutdown,
2852                                                     &done);
2853         }
2854
2855         ret = ctdb_ctrl_shutdown(mem_ctx, ctdb->ev, ctdb->client,
2856                                  ctdb->cmd_pnn, TIMEOUT());
2857         if (ret != 0) {
2858                 fprintf(stderr, "Unable to shutdown node %u\n", ctdb->cmd_pnn);
2859                 return ret;
2860         }
2861
2862         if (ctdb->pnn == ctdb->cmd_pnn) {
2863                 ctdb_client_wait(ctdb->ev, &done);
2864         }
2865
2866         return 0;
2867 }
2868
2869 static int get_generation(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2870                           uint32_t *generation)
2871 {
2872         uint32_t recmaster;
2873         int recmode;
2874         struct ctdb_vnn_map *vnnmap;
2875         int ret;
2876
2877 again:
2878         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2879                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
2880         if (ret != 0) {
2881                 fprintf(stderr, "Failed to find recovery master\n");
2882                 return ret;
2883         }
2884
2885         ret = ctdb_ctrl_get_recmode(mem_ctx, ctdb->ev, ctdb->client,
2886                                     recmaster, TIMEOUT(), &recmode);
2887         if (ret != 0) {
2888                 fprintf(stderr, "Failed to get recovery mode from node %u\n",
2889                         recmaster);
2890                 return ret;
2891         }
2892
2893         if (recmode == CTDB_RECOVERY_ACTIVE) {
2894                 sleep(1);
2895                 goto again;
2896         }
2897
2898         ret = ctdb_ctrl_getvnnmap(mem_ctx, ctdb->ev, ctdb->client,
2899                                   recmaster, TIMEOUT(), &vnnmap);
2900         if (ret != 0) {
2901                 fprintf(stderr, "Failed to get generation from node %u\n",
2902                         recmaster);
2903                 return ret;
2904         }
2905
2906         if (vnnmap->generation == INVALID_GENERATION) {
2907                 talloc_free(vnnmap);
2908                 sleep(1);
2909                 goto again;
2910         }
2911
2912         *generation = vnnmap->generation;
2913         talloc_free(vnnmap);
2914         return 0;
2915 }
2916
2917
2918 static int control_recover(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2919                            int argc, const char **argv)
2920 {
2921         uint32_t generation, next_generation;
2922         int ret;
2923
2924         if (argc != 0) {
2925                 usage("recover");
2926         }
2927
2928         ret = get_generation(mem_ctx, ctdb, &generation);
2929         if (ret != 0) {
2930                 return ret;
2931         }
2932
2933         ret = ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
2934                                     ctdb->cmd_pnn, TIMEOUT(),
2935                                     CTDB_RECOVERY_ACTIVE);
2936         if (ret != 0) {
2937                 fprintf(stderr, "Failed to set recovery mode active\n");
2938                 return ret;
2939         }
2940
2941         while (1) {
2942                 ret = get_generation(mem_ctx, ctdb, &next_generation);
2943                 if (ret != 0) {
2944                         fprintf(stderr,
2945                                 "Failed to confirm end of recovery\n");
2946                         return ret;
2947                 }
2948
2949                 if (next_generation != generation) {
2950                         break;
2951                 }
2952
2953                 sleep (1);
2954         }
2955
2956         return 0;
2957 }
2958
2959 static int control_ipreallocate(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2960                                 int argc, const char **argv)
2961 {
2962         if (argc != 0) {
2963                 usage("ipreallocate");
2964         }
2965
2966         return ipreallocate(mem_ctx, ctdb);
2967 }
2968
2969 static int control_isnotrecmaster(TALLOC_CTX *mem_ctx,
2970                                   struct ctdb_context *ctdb,
2971                                   int argc, const char **argv)
2972 {
2973         uint32_t recmaster;
2974         int ret;
2975
2976         if (argc != 0) {
2977                 usage("isnotrecmaster");
2978         }
2979
2980         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
2981                                       ctdb->pnn, TIMEOUT(), &recmaster);
2982         if (ret != 0) {
2983                 fprintf(stderr, "Failed to get recmaster\n");
2984                 return ret;
2985         }
2986
2987         if (recmaster != ctdb->pnn) {
2988                 printf("this node is not the recmaster\n");
2989                 return 1;
2990         }
2991
2992         printf("this node is the recmaster\n");
2993         return 0;
2994 }
2995
2996 static int control_gratarp(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
2997                            int argc, const char **argv)
2998 {
2999         struct ctdb_addr_info addr_info;
3000         int ret;
3001
3002         if (argc != 2) {
3003                 usage("gratarp");
3004         }
3005
3006         ret = ctdb_sock_addr_from_string(argv[0], &addr_info.addr, false);
3007         if (ret != 0) {
3008                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3009                 return 1;
3010         }
3011         addr_info.iface = argv[1];
3012
3013         ret = ctdb_ctrl_send_gratuitous_arp(mem_ctx, ctdb->ev, ctdb->client,
3014                                             ctdb->cmd_pnn, TIMEOUT(),
3015                                             &addr_info);
3016         if (ret != 0) {
3017                 fprintf(stderr, "Unable to send gratuitous arp from node %u\n",
3018                         ctdb->cmd_pnn);
3019                 return ret;
3020         }
3021
3022         return 0;
3023 }
3024
3025 static int control_tickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3026                            int argc, const char **argv)
3027 {
3028         ctdb_sock_addr src, dst;
3029         int ret;
3030
3031         if (argc != 0 && argc != 2) {
3032                 usage("tickle");
3033         }
3034
3035         if (argc == 0) {
3036                 struct ctdb_connection_list *clist;
3037                 int i;
3038                 unsigned int num_failed;
3039
3040                 /* Client first but the src/dst logic is confused */
3041                 ret = ctdb_connection_list_read(mem_ctx, 0, false, &clist);
3042                 if (ret != 0) {
3043                         return ret;
3044                 }
3045
3046                 num_failed = 0;
3047                 for (i = 0; i < clist->num; i++) {
3048                         ret = ctdb_sys_send_tcp(&clist->conn[i].src,
3049                                                 &clist->conn[i].dst,
3050                                                 0, 0, 0);
3051                         if (ret != 0) {
3052                                 num_failed += 1;
3053                         }
3054                 }
3055
3056                 TALLOC_FREE(clist);
3057
3058                 if (num_failed > 0) {
3059                         fprintf(stderr, "Failed to send %d tickles\n",
3060                                 num_failed);
3061                         return 1;
3062                 }
3063
3064                 return 0;
3065         }
3066
3067
3068         ret = ctdb_sock_addr_from_string(argv[0], &src, true);
3069         if (ret != 0) {
3070                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3071                 return 1;
3072         }
3073
3074         ret = ctdb_sock_addr_from_string(argv[1], &dst, true);
3075         if (ret != 0) {
3076                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3077                 return 1;
3078         }
3079
3080         ret = ctdb_sys_send_tcp(&src, &dst, 0, 0, 0);
3081         if (ret != 0) {
3082                 fprintf(stderr, "Failed to send tickle ack\n");
3083                 return ret;
3084         }
3085
3086         return 0;
3087 }
3088
3089 static int control_gettickles(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3090                               int argc, const char **argv)
3091 {
3092         ctdb_sock_addr addr;
3093         struct ctdb_tickle_list *tickles;
3094         unsigned port = 0;
3095         int ret, i;
3096
3097         if (argc < 1 || argc > 2) {
3098                 usage("gettickles");
3099         }
3100
3101         if (argc == 2) {
3102                 port = strtoul(argv[1], NULL, 10);
3103         }
3104
3105         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3106         if (ret != 0) {
3107                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3108                 return 1;
3109         }
3110         ctdb_sock_addr_set_port(&addr, port);
3111
3112         ret = ctdb_ctrl_get_tcp_tickle_list(mem_ctx, ctdb->ev, ctdb->client,
3113                                             ctdb->cmd_pnn, TIMEOUT(), &addr,
3114                                             &tickles);
3115         if (ret != 0) {
3116                 fprintf(stderr, "Failed to get list of connections\n");
3117                 return ret;
3118         }
3119
3120         if (options.machinereadable) {
3121                 printf("%s%s%s%s%s%s%s%s%s\n",
3122                        options.sep,
3123                        "Source IP", options.sep,
3124                        "Port", options.sep,
3125                        "Destiation IP", options.sep,
3126                        "Port", options.sep);
3127                 for (i=0; i<tickles->num; i++) {
3128                         printf("%s%s%s%u%s%s%s%u%s\n", options.sep,
3129                                ctdb_sock_addr_to_string(
3130                                        mem_ctx, &tickles->conn[i].src, false),
3131                                options.sep,
3132                                ntohs(tickles->conn[i].src.ip.sin_port),
3133                                options.sep,
3134                                ctdb_sock_addr_to_string(
3135                                        mem_ctx, &tickles->conn[i].dst, false),
3136                                options.sep,
3137                                ntohs(tickles->conn[i].dst.ip.sin_port),
3138                                options.sep);
3139                 }
3140         } else {
3141                 printf("Connections for IP: %s\n",
3142                        ctdb_sock_addr_to_string(mem_ctx,
3143                                                 &tickles->addr, false));
3144                 printf("Num connections: %u\n", tickles->num);
3145                 for (i=0; i<tickles->num; i++) {
3146                         printf("SRC: %s   DST: %s\n",
3147                                ctdb_sock_addr_to_string(
3148                                        mem_ctx, &tickles->conn[i].src, true),
3149                                ctdb_sock_addr_to_string(
3150                                        mem_ctx, &tickles->conn[i].dst, true));
3151                 }
3152         }
3153
3154         talloc_free(tickles);
3155         return 0;
3156 }
3157
/*
 * Callback types used by process_clist_send().
 *
 * clist_request_func fills in a control request for one connection.
 */
typedef void (*clist_request_func)(struct ctdb_req_control *request,
                                   struct ctdb_connection *conn);

/* clist_reply_func examines one control reply; non-zero counts as failure */
typedef int (*clist_reply_func)(struct ctdb_reply_control *reply);
3162
/*
 * State of a process_clist_send() request: one control is sent per entry
 * of a connection list and the outcomes are tallied here.
 */
struct process_clist_state {
        struct ctdb_connection_list *clist;     /* list being processed (not owned) */
        int count;                              /* not used by process_clist_send/done/recv */
        int num_failed, num_total;              /* failed / completed controls so far */
        clist_reply_func reply_func;            /* validates each control reply */
};
3169
/* Completion callback for each control sent by process_clist_send() */
static void process_clist_done(struct tevent_req *subreq);
3171
3172 static struct tevent_req *process_clist_send(
3173                                         TALLOC_CTX *mem_ctx,
3174                                         struct ctdb_context *ctdb,
3175                                         struct ctdb_connection_list *clist,
3176                                         clist_request_func request_func,
3177                                         clist_reply_func reply_func)
3178 {
3179         struct tevent_req *req, *subreq;
3180         struct process_clist_state *state;
3181         struct ctdb_req_control request;
3182         int i;
3183
3184         req = tevent_req_create(mem_ctx, &state, struct process_clist_state);
3185         if (req == NULL) {
3186                 return NULL;
3187         }
3188
3189         state->clist = clist;
3190         state->reply_func = reply_func;
3191
3192         for (i = 0; i < clist->num; i++) {
3193                 request_func(&request, &clist->conn[i]);
3194                 subreq = ctdb_client_control_send(state, ctdb->ev,
3195                                                   ctdb->client, ctdb->cmd_pnn,
3196                                                   TIMEOUT(), &request);
3197                 if (tevent_req_nomem(subreq, req)) {
3198                         return tevent_req_post(req, ctdb->ev);
3199                 }
3200                 tevent_req_set_callback(subreq, process_clist_done, req);
3201         }
3202
3203         return req;
3204 }
3205
3206 static void process_clist_done(struct tevent_req *subreq)
3207 {
3208         struct tevent_req *req = tevent_req_callback_data(
3209                 subreq, struct tevent_req);
3210         struct process_clist_state *state = tevent_req_data(
3211                 req, struct process_clist_state);
3212         struct ctdb_reply_control *reply;
3213         int ret;
3214         bool status;
3215
3216         status = ctdb_client_control_recv(subreq, NULL, state, &reply);
3217         TALLOC_FREE(subreq);
3218         if (! status) {
3219                 state->num_failed += 1;
3220                 goto done;
3221         }
3222
3223         ret = state->reply_func(reply);
3224         if (ret != 0) {
3225                 state->num_failed += 1;
3226                 goto done;
3227         }
3228
3229 done:
3230         state->num_total += 1;
3231         if (state->num_total == state->clist->num) {
3232                 tevent_req_done(req);
3233         }
3234 }
3235
3236 static int process_clist_recv(struct tevent_req *req)
3237 {
3238         struct process_clist_state *state = tevent_req_data(
3239                 req, struct process_clist_state);
3240
3241         return state->num_failed;
3242 }
3243
3244 static int control_addtickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3245                              int argc, const char **argv)
3246 {
3247         struct ctdb_connection conn;
3248         int ret;
3249
3250         if (argc != 0 && argc != 2) {
3251                 usage("addtickle");
3252         }
3253
3254         if (argc == 0) {
3255                 struct ctdb_connection_list *clist;
3256                 struct tevent_req *req;
3257
3258                 /* Client first but the src/dst logic is confused */
3259                 ret = ctdb_connection_list_read(mem_ctx, 0, false, &clist);
3260                 if (ret != 0) {
3261                         return ret;
3262                 }
3263                 if (clist->num == 0) {
3264                         return 0;
3265                 }
3266
3267                 req = process_clist_send(mem_ctx, ctdb, clist,
3268                                  ctdb_req_control_tcp_add_delayed_update,
3269                                  ctdb_reply_control_tcp_add_delayed_update);
3270                 if (req == NULL) {
3271                         talloc_free(clist);
3272                         return ENOMEM;
3273                 }
3274
3275                 tevent_req_poll(req, ctdb->ev);
3276                 talloc_free(clist);
3277
3278                 ret = process_clist_recv(req);
3279                 if (ret != 0) {
3280                         fprintf(stderr, "Failed to add %d tickles\n", ret);
3281                         return 1;
3282                 }
3283
3284                 return 0;
3285         }
3286
3287         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3288         if (ret != 0) {
3289                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3290                 return 1;
3291         }
3292         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3293         if (ret != 0) {
3294                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3295                 return 1;
3296         }
3297
3298         ret = ctdb_ctrl_tcp_add_delayed_update(mem_ctx, ctdb->ev,
3299                                                ctdb->client, ctdb->cmd_pnn,
3300                                                TIMEOUT(), &conn);
3301         if (ret != 0) {
3302                 fprintf(stderr, "Failed to register connection\n");
3303                 return ret;
3304         }
3305
3306         return 0;
3307 }
3308
3309 static int control_deltickle(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3310                              int argc, const char **argv)
3311 {
3312         struct ctdb_connection conn;
3313         int ret;
3314
3315         if (argc != 0 && argc != 2) {
3316                 usage("deltickle");
3317         }
3318
3319         if (argc == 0) {
3320                 struct ctdb_connection_list *clist;
3321                 struct tevent_req *req;
3322
3323                 /* Client first but the src/dst logic is confused */
3324                 ret = ctdb_connection_list_read(mem_ctx, 0, false, &clist);
3325                 if (ret != 0) {
3326                         return ret;
3327                 }
3328                 if (clist->num == 0) {
3329                         return 0;
3330                 }
3331
3332                 req = process_clist_send(mem_ctx, ctdb, clist,
3333                                          ctdb_req_control_tcp_remove,
3334                                          ctdb_reply_control_tcp_remove);
3335                 if (req == NULL) {
3336                         talloc_free(clist);
3337                         return ENOMEM;
3338                 }
3339
3340                 tevent_req_poll(req, ctdb->ev);
3341                 talloc_free(clist);
3342
3343                 ret = process_clist_recv(req);
3344                 if (ret != 0) {
3345                         fprintf(stderr, "Failed to remove %d tickles\n", ret);
3346                         return 1;
3347                 }
3348
3349                 return 0;
3350         }
3351
3352         ret = ctdb_sock_addr_from_string(argv[0], &conn.src, true);
3353         if (ret != 0) {
3354                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3355                 return 1;
3356         }
3357         ret = ctdb_sock_addr_from_string(argv[1], &conn.dst, true);
3358         if (ret != 0) {
3359                 fprintf(stderr, "Invalid IP address %s\n", argv[1]);
3360                 return 1;
3361         }
3362
3363         ret = ctdb_ctrl_tcp_remove(mem_ctx, ctdb->ev, ctdb->client,
3364                                    ctdb->cmd_pnn, TIMEOUT(), &conn);
3365         if (ret != 0) {
3366                 fprintf(stderr, "Failed to unregister connection\n");
3367                 return ret;
3368         }
3369
3370         return 0;
3371 }
3372
3373 static int control_listnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3374                              int argc, const char **argv)
3375 {
3376         struct ctdb_node_map *nodemap;
3377         int i;
3378
3379         if (argc != 0) {
3380                 usage("listnodes");
3381         }
3382
3383         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
3384         if (nodemap == NULL) {
3385                 return 1;
3386         }
3387
3388         for (i=0; i<nodemap->num; i++) {
3389                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
3390                         continue;
3391                 }
3392
3393                 if (options.machinereadable) {
3394                         printf("%s%u%s%s%s\n", options.sep,
3395                                nodemap->node[i].pnn, options.sep,
3396                                ctdb_sock_addr_to_string(
3397                                        mem_ctx, &nodemap->node[i].addr, false),
3398                                options.sep);
3399                 } else {
3400                         printf("%s\n",
3401                                ctdb_sock_addr_to_string(
3402                                        mem_ctx, &nodemap->node[i].addr, false));
3403                 }
3404         }
3405
3406         return 0;
3407 }
3408
3409 static bool nodemap_identical(struct ctdb_node_map *nodemap1,
3410                               struct ctdb_node_map *nodemap2)
3411 {
3412         int i;
3413
3414         if (nodemap1->num != nodemap2->num) {
3415                 return false;
3416         }
3417
3418         for (i=0; i<nodemap1->num; i++) {
3419                 struct ctdb_node_and_flags *n1, *n2;
3420
3421                 n1 = &nodemap1->node[i];
3422                 n2 = &nodemap2->node[i];
3423
3424                 if ((n1->pnn != n2->pnn) ||
3425                     (n1->flags != n2->flags) ||
3426                     ! ctdb_sock_addr_same_ip(&n1->addr, &n2->addr)) {
3427                         return false;
3428                 }
3429         }
3430
3431         return true;
3432 }
3433
3434 static int check_node_file_changes(TALLOC_CTX *mem_ctx,
3435                                    struct ctdb_node_map *nm,
3436                                    struct ctdb_node_map *fnm,
3437                                    bool *reload)
3438 {
3439         int i;
3440         bool check_failed = false;
3441
3442         *reload = false;
3443
3444         for (i=0; i<nm->num; i++) {
3445                 if (i >= fnm->num) {
3446                         fprintf(stderr,
3447                                 "Node %u (%s) missing from nodes file\n",
3448                                 nm->node[i].pnn,
3449                                 ctdb_sock_addr_to_string(
3450                                         mem_ctx, &nm->node[i].addr, false));
3451                         check_failed = true;
3452                         continue;
3453                 }
3454                 if (nm->node[i].flags & NODE_FLAGS_DELETED &&
3455                     fnm->node[i].flags & NODE_FLAGS_DELETED) {
3456                         /* Node remains deleted */
3457                         continue;
3458                 }
3459
3460                 if (! (nm->node[i].flags & NODE_FLAGS_DELETED) &&
3461                     ! (fnm->node[i].flags & NODE_FLAGS_DELETED)) {
3462                         /* Node not newly nor previously deleted */
3463                         if (! ctdb_same_ip(&nm->node[i].addr,
3464                                            &fnm->node[i].addr)) {
3465                                 fprintf(stderr,
3466                                         "Node %u has changed IP address"
3467                                         " (was %s, now %s)\n",
3468                                         nm->node[i].pnn,
3469                                         ctdb_sock_addr_to_string(
3470                                                 mem_ctx,
3471                                                 &nm->node[i].addr, false),
3472                                         ctdb_sock_addr_to_string(
3473                                                 mem_ctx,
3474                                                 &fnm->node[i].addr, false));
3475                                 check_failed = true;
3476                         } else {
3477                                 if (nm->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3478                                         fprintf(stderr,
3479                                                 "WARNING: Node %u is disconnected."
3480                                                 " You MUST fix this node manually!\n",
3481                                                 nm->node[i].pnn);
3482                                 }
3483                         }
3484                         continue;
3485                 }
3486
3487                 if (fnm->node[i].flags & NODE_FLAGS_DELETED) {
3488                         /* Node is being deleted */
3489                         printf("Node %u is DELETED\n", nm->node[i].pnn);
3490                         *reload = true;
3491                         if (! (nm->node[i].flags & NODE_FLAGS_DISCONNECTED)) {
3492                                 fprintf(stderr,
3493                                         "ERROR: Node %u is still connected\n",
3494                                         nm->node[i].pnn);
3495                                 check_failed = true;
3496                         }
3497                         continue;
3498                 }
3499
3500                 if (nm->node[i].flags & NODE_FLAGS_DELETED) {
3501                         /* Node was previously deleted */
3502                         printf("Node %u is UNDELETED\n", nm->node[i].pnn);
3503                         *reload = true;
3504                 }
3505         }
3506
3507         if (check_failed) {
3508                 fprintf(stderr,
3509                         "ERROR: Nodes will not be reloaded due to previous error\n");
3510                 return 1;
3511         }
3512
3513         /* Leftover nodes in file are NEW */
3514         for (; i < fnm->num; i++) {
3515                 printf("Node %u is NEW\n", fnm->node[i].pnn);
3516                 *reload = true;
3517         }
3518
3519         return 0;
3520 }
3521
/*
 * State shared between disable_recoveries() and its message handler
 * while waiting for nodes to answer a disable-recoveries request.
 */
struct disable_recoveries_state {
        /* PNNs of the nodes the request was sent to */
        uint32_t *pnn_list;
        /* Number of entries in pnn_list and in reply */
        int node_count;
        /* reply[i] becomes true once pnn_list[i] has answered */
        bool *reply;
        /* Negative value received from a node, otherwise 0 */
        int status;
        /* Set on error or once every node has answered */
        bool done;
};
3529
3530 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
3531                                        void *private_data)
3532 {
3533         struct disable_recoveries_state *state =
3534                 (struct disable_recoveries_state *)private_data;
3535         int ret, i;
3536
3537         if (data.dsize != sizeof(int)) {
3538                 /* Ignore packet */
3539                 return;
3540         }
3541
3542         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
3543         ret = *(int *)data.dptr;
3544         if (ret < 0) {
3545                 state->status = ret;
3546                 state->done = true;
3547                 return;
3548         }
3549         for (i=0; i<state->node_count; i++) {
3550                 if (state->pnn_list[i] == ret) {
3551                         state->reply[i] = true;
3552                         break;
3553                 }
3554         }
3555
3556         state->done = true;
3557         for (i=0; i<state->node_count; i++) {
3558                 if (! state->reply[i]) {
3559                         state->done = false;
3560                         break;
3561                 }
3562         }
3563 }
3564
3565 static int disable_recoveries(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3566                               uint32_t timeout, uint32_t *pnn_list, int count)
3567 {
3568         struct ctdb_disable_message disable = { 0 };
3569         struct disable_recoveries_state state;
3570         int ret, i;
3571
3572         disable.pnn = ctdb->pnn;
3573         disable.srvid = next_srvid(ctdb);
3574         disable.timeout = timeout;
3575
3576         state.pnn_list = pnn_list;
3577         state.node_count = count;
3578         state.done = false;
3579         state.status = 0;
3580         state.reply = talloc_zero_array(mem_ctx, bool, count);
3581         if (state.reply == NULL) {
3582                 return ENOMEM;
3583         }
3584
3585         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
3586                                               disable.srvid,
3587                                               disable_recoveries_handler,
3588                                               &state);
3589         if (ret != 0) {
3590                 return ret;
3591         }
3592
3593         for (i=0; i<count; i++) {
3594                 ret = ctdb_message_disable_recoveries(mem_ctx, ctdb->ev,
3595                                                       ctdb->client,
3596                                                       pnn_list[i],
3597                                                       &disable);
3598                 if (ret != 0) {
3599                         goto fail;
3600                 }
3601         }
3602
3603         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
3604         if (ret == ETIME) {
3605                 fprintf(stderr, "Timed out waiting to disable recoveries\n");
3606         } else {
3607                 ret = (state.status >= 0 ? 0 : 1);
3608         }
3609
3610 fail:
3611         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
3612                                            disable.srvid, &state);
3613         return ret;
3614 }
3615
3616 static int control_reloadnodes(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3617                                int argc, const char **argv)
3618 {
3619         struct ctdb_node_map *nodemap = NULL;
3620         struct ctdb_node_map *file_nodemap;
3621         struct ctdb_node_map *remote_nodemap;
3622         struct ctdb_req_control request;
3623         struct ctdb_reply_control **reply;
3624         bool reload;
3625         int ret, i;
3626         uint32_t *pnn_list;
3627         int count;
3628
3629         nodemap = get_nodemap(ctdb, false);
3630         if (nodemap == NULL) {
3631                 return 1;
3632         }
3633
3634         file_nodemap = read_nodes_file(mem_ctx, ctdb->pnn);
3635         if (file_nodemap == NULL) {
3636                 return 1;
3637         }
3638
3639         for (i=0; i<nodemap->num; i++) {
3640                 if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) {
3641                         continue;
3642                 }
3643
3644                 ret = ctdb_ctrl_get_nodes_file(mem_ctx, ctdb->ev, ctdb->client,
3645                                                nodemap->node[i].pnn, TIMEOUT(),
3646                                                &remote_nodemap);
3647                 if (ret != 0) {
3648                         fprintf(stderr,
3649                                 "ERROR: Failed to get nodes file from node %u\n",
3650                                 nodemap->node[i].pnn);
3651                         return ret;
3652                 }
3653
3654                 if (! nodemap_identical(file_nodemap, remote_nodemap)) {
3655                         fprintf(stderr,
3656                                 "ERROR: Nodes file on node %u differs"
3657                                  " from current node (%u)\n",
3658                                  nodemap->node[i].pnn, ctdb->pnn);
3659                         return 1;
3660                 }
3661         }
3662
3663         ret = check_node_file_changes(mem_ctx, nodemap, file_nodemap, &reload);
3664         if (ret != 0) {
3665                 return ret;
3666         }
3667
3668         if (! reload) {
3669                 fprintf(stderr, "No change in nodes file,"
3670                                 " skipping unnecessary reload\n");
3671                 return 0;
3672         }
3673
3674         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
3675                                         mem_ctx, &pnn_list);
3676         if (count <= 0) {
3677                 fprintf(stderr, "Memory allocation error\n");
3678                 return 1;
3679         }
3680
3681         ret = disable_recoveries(mem_ctx, ctdb, 2*options.timelimit,
3682                                  pnn_list, count);
3683         if (ret != 0) {
3684                 fprintf(stderr, "Failed to disable recoveries\n");
3685                 return ret;
3686         }
3687
3688         ctdb_req_control_reload_nodes_file(&request);
3689         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3690                                         pnn_list, count, TIMEOUT(),
3691                                         &request, NULL, &reply);
3692         if (ret != 0) {
3693                 bool failed = false;
3694
3695                 for (i=0; i<count; i++) {
3696                         ret = ctdb_reply_control_reload_nodes_file(reply[i]);
3697                         if (ret != 0) {
3698                                 fprintf(stderr,
3699                                         "Node %u failed to reload nodes\n",
3700                                         pnn_list[i]);
3701                                 failed = true;
3702                         }
3703                 }
3704                 if (failed) {
3705                         fprintf(stderr,
3706                                 "You MUST fix failed nodes manually!\n");
3707                 }
3708         }
3709
3710         ret = disable_recoveries(mem_ctx, ctdb, 0, pnn_list, count);
3711         if (ret != 0) {
3712                 fprintf(stderr, "Failed to enable recoveries\n");
3713                 return ret;
3714         }
3715
3716         return 0;
3717 }
3718
3719 static int moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3720                   ctdb_sock_addr *addr, uint32_t pnn)
3721 {
3722         struct ctdb_public_ip_list *pubip_list;
3723         struct ctdb_public_ip pubip;
3724         struct ctdb_node_map *nodemap;
3725         struct ctdb_req_control request;
3726         uint32_t *pnn_list;
3727         int ret, i, count;
3728
3729         ret = ctdb_message_disable_ip_check(mem_ctx, ctdb->ev, ctdb->client,
3730                                             CTDB_BROADCAST_CONNECTED,
3731                                             2*options.timelimit);
3732         if (ret != 0) {
3733                 fprintf(stderr, "Failed to disable IP check\n");
3734                 return ret;
3735         }
3736
3737         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3738                                        pnn, TIMEOUT(), false, &pubip_list);
3739         if (ret != 0) {
3740                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3741                         pnn);
3742                 return ret;
3743         }
3744
3745         for (i=0; i<pubip_list->num; i++) {
3746                 if (ctdb_same_ip(addr, &pubip_list->ip[i].addr)) {
3747                         break;
3748                 }
3749         }
3750
3751         if (i == pubip_list->num) {
3752                 fprintf(stderr, "Node %u CANNOT host IP address %s\n",
3753                         pnn, ctdb_sock_addr_to_string(mem_ctx, addr, false));
3754                 return 1;
3755         }
3756
3757         nodemap = get_nodemap(ctdb, false);
3758         if (nodemap == NULL) {
3759                 return 1;
3760         }
3761
3762         count = list_of_active_nodes(nodemap, pnn, mem_ctx, &pnn_list);
3763         if (count <= 0) {
3764                 fprintf(stderr, "Memory allocation error\n");
3765                 return 1;
3766         }
3767
3768         pubip.pnn = pnn;
3769         pubip.addr = *addr;
3770         ctdb_req_control_release_ip(&request, &pubip);
3771
3772         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
3773                                         pnn_list, count, TIMEOUT(),
3774                                         &request, NULL, NULL);
3775         if (ret != 0) {
3776                 fprintf(stderr, "Failed to release IP on nodes\n");
3777                 return ret;
3778         }
3779
3780         ret = ctdb_ctrl_takeover_ip(mem_ctx, ctdb->ev, ctdb->client,
3781                                     pnn, TIMEOUT(), &pubip);
3782         if (ret != 0) {
3783                 fprintf(stderr, "Failed to takeover IP on node %u\n", pnn);
3784                 return ret;
3785         }
3786
3787         return 0;
3788 }
3789
3790 static int control_moveip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3791                           int argc, const char **argv)
3792 {
3793         ctdb_sock_addr addr;
3794         uint32_t pnn;
3795         int ret, retries = 0;
3796
3797         if (argc != 2) {
3798                 usage("moveip");
3799         }
3800
3801         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3802         if (ret != 0) {
3803                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3804                 return 1;
3805         }
3806
3807         pnn = strtoul(argv[1], NULL, 10);
3808         if (pnn == CTDB_UNKNOWN_PNN) {
3809                 fprintf(stderr, "Invalid PNN %s\n", argv[1]);
3810                 return 1;
3811         }
3812
3813         while (retries < 5) {
3814                 ret = moveip(mem_ctx, ctdb, &addr, pnn);
3815                 if (ret == 0) {
3816                         break;
3817                 }
3818
3819                 sleep(3);
3820                 retries++;
3821         }
3822
3823         if (ret != 0) {
3824                 fprintf(stderr, "Failed to move IP %s to node %u\n",
3825                         argv[0], pnn);
3826                 return ret;
3827         }
3828
3829         return 0;
3830 }
3831
3832 static int rebalancenode(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3833                          uint32_t pnn)
3834 {
3835         int ret;
3836
3837         ret = ctdb_message_rebalance_node(mem_ctx, ctdb->ev, ctdb->client,
3838                                           CTDB_BROADCAST_CONNECTED, pnn);
3839         if (ret != 0) {
3840                 fprintf(stderr,
3841                         "Failed to ask recovery master to distribute IPs\n");
3842                 return ret;
3843         }
3844
3845         return 0;
3846 }
3847
3848 static int control_addip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3849                          int argc, const char **argv)
3850 {
3851         ctdb_sock_addr addr;
3852         struct ctdb_public_ip_list *pubip_list;
3853         struct ctdb_addr_info addr_info;
3854         unsigned int mask;
3855         int ret, i, retries = 0;
3856
3857         if (argc != 2) {
3858                 usage("addip");
3859         }
3860
3861         ret = ctdb_sock_addr_mask_from_string(argv[0], &addr, &mask);
3862         if (ret != 0) {
3863                 fprintf(stderr, "Invalid IP/Mask %s\n", argv[0]);
3864                 return 1;
3865         }
3866
3867         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3868                                        ctdb->cmd_pnn, TIMEOUT(),
3869                                        false, &pubip_list);
3870         if (ret != 0) {
3871                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3872                         ctdb->cmd_pnn);
3873                 return 1;
3874         }
3875
3876         for (i=0; i<pubip_list->num; i++) {
3877                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3878                         fprintf(stderr, "Node already knows about IP %s\n",
3879                                 ctdb_sock_addr_to_string(mem_ctx,
3880                                                          &addr, false));
3881                         return 0;
3882                 }
3883         }
3884
3885         addr_info.addr = addr;
3886         addr_info.mask = mask;
3887         addr_info.iface = argv[1];
3888
3889         while (retries < 5) {
3890                 ret = ctdb_ctrl_add_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3891                                               ctdb->cmd_pnn, TIMEOUT(),
3892                                               &addr_info);
3893                 if (ret == 0) {
3894                         break;
3895                 }
3896
3897                 sleep(3);
3898                 retries++;
3899         }
3900
3901         if (ret != 0) {
3902                 fprintf(stderr, "Failed to add public IP to node %u."
3903                                 " Giving up\n", ctdb->cmd_pnn);
3904                 return ret;
3905         }
3906
3907         ret = rebalancenode(mem_ctx, ctdb, ctdb->cmd_pnn);
3908         if (ret != 0) {
3909                 return ret;
3910         }
3911
3912         return 0;
3913 }
3914
3915 static int control_delip(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
3916                          int argc, const char **argv)
3917 {
3918         ctdb_sock_addr addr;
3919         struct ctdb_public_ip_list *pubip_list;
3920         struct ctdb_addr_info addr_info;
3921         int ret, i;
3922
3923         if (argc != 1) {
3924                 usage("delip");
3925         }
3926
3927         ret = ctdb_sock_addr_from_string(argv[0], &addr, false);
3928         if (ret != 0) {
3929                 fprintf(stderr, "Invalid IP address %s\n", argv[0]);
3930                 return 1;
3931         }
3932
3933         ret = ctdb_ctrl_get_public_ips(mem_ctx, ctdb->ev, ctdb->client,
3934                                        ctdb->cmd_pnn, TIMEOUT(),
3935                                        false, &pubip_list);
3936         if (ret != 0) {
3937                 fprintf(stderr, "Failed to get Public IPs from node %u\n",
3938                         ctdb->cmd_pnn);
3939                 return 1;
3940         }
3941
3942         for (i=0; i<pubip_list->num; i++) {
3943                 if (ctdb_same_ip(&addr, &pubip_list->ip[i].addr)) {
3944                         break;
3945                 }
3946         }
3947
3948         if (i == pubip_list->num) {
3949                 fprintf(stderr, "Node does not know about IP address %s\n",
3950                         ctdb_sock_addr_to_string(mem_ctx, &addr, false));
3951                 return 0;
3952         }
3953
3954         addr_info.addr = addr;
3955         addr_info.mask = 0;
3956         addr_info.iface = NULL;
3957
3958         ret = ctdb_ctrl_del_public_ip(mem_ctx, ctdb->ev, ctdb->client,
3959                                       ctdb->cmd_pnn, TIMEOUT(), &addr_info);
3960         if (ret != 0) {
3961                 fprintf(stderr, "Failed to delete public IP from node %u\n",
3962                         ctdb->cmd_pnn);
3963                 return ret;
3964         }
3965
3966         return 0;
3967 }
3968
/* Version stored in the backup file header; restoredb refuses any other */
#define DB_VERSION      3
/* Size of the database name field in the backup file header */
#define MAX_DB_NAME     64
/* Record buffer length at which backupdb flushes the buffer to the file */
#define MAX_REC_BUFFER_SIZE     (100*1000)
3972
/*
 * Header at the start of a database backup file.  control_backupdb()
 * writes it and control_restoredb() reads it back, both as the raw
 * bytes of this structure.
 */
struct db_header {
        /* DB_VERSION at the time the backup was written */
        unsigned long version;
        /* time(NULL) when the backup was completed */
        time_t timestamp;
        /* Flags of the database that was backed up */
        unsigned long flags;
        /* Number of record buffers that follow the header */
        unsigned long nbuf;
        /* Total number of records in those buffers */
        unsigned long nrec;
        /* Name of the database, NUL-terminated */
        char name[MAX_DB_NAME];
};
3981
/*
 * State passed to backup_handler() while a database is traversed for
 * backup.
 */
struct backup_state {
        /* talloc context that record buffers are allocated from */
        TALLOC_CTX *mem_ctx;
        /* Buffer currently being filled; NULL when none is pending */
        struct ctdb_rec_buffer *recbuf;
        /* Database ID given to ctdb_rec_buffer_init() for new buffers */
        uint32_t db_id;
        /* Descriptor of the backup file */
        int fd;
        /* Number of buffers / records written to the file so far */
        unsigned int nbuf, nrec;
};
3989
3990 static int backup_handler(uint32_t reqid, struct ctdb_ltdb_header *header,
3991                           TDB_DATA key, TDB_DATA data, void *private_data)
3992 {
3993         struct backup_state *state = (struct backup_state *)private_data;
3994         size_t len;
3995         int ret;
3996
3997         if (state->recbuf == NULL) {
3998                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
3999                                                      state->db_id);
4000                 if (state->recbuf == NULL) {
4001                         return ENOMEM;
4002                 }
4003         }
4004
4005         ret = ctdb_rec_buffer_add(state->recbuf, state->recbuf, reqid,
4006                                   header, key, data);
4007         if (ret != 0) {
4008                 return ret;
4009         }
4010
4011         len = ctdb_rec_buffer_len(state->recbuf);
4012         if (len < MAX_REC_BUFFER_SIZE) {
4013                 return 0;
4014         }
4015
4016         ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
4017         if (ret != 0) {
4018                 fprintf(stderr, "Failed to write records to backup file\n");
4019                 return ret;
4020         }
4021
4022         state->nbuf += 1;
4023         state->nrec += state->recbuf->count;
4024         TALLOC_FREE(state->recbuf);
4025
4026         return 0;
4027 }
4028
4029 static int control_backupdb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4030                             int argc, const char **argv)
4031 {
4032         const char *db_name;
4033         struct ctdb_db_context *db;
4034         uint32_t db_id;
4035         uint8_t db_flags;
4036         struct backup_state state;
4037         struct db_header db_hdr;
4038         int fd, ret;
4039
4040         if (argc != 2) {
4041                 usage("backupdb");
4042         }
4043
4044         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4045                 return 1;
4046         }
4047
4048         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4049                           db_flags, &db);
4050         if (ret != 0) {
4051                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4052                 return ret;
4053         }
4054
4055         fd = open(argv[1], O_RDWR|O_CREAT, 0600);
4056         if (fd == -1) {
4057                 ret = errno;
4058                 fprintf(stderr, "Failed to open file %s for writing\n",
4059                         argv[1]);
4060                 return ret;
4061         }
4062
4063         /* Write empty header first */
4064         ZERO_STRUCT(db_hdr);
4065         ret = write(fd, &db_hdr, sizeof(struct db_header));
4066         if (ret == -1) {
4067                 ret = errno;
4068                 close(fd);
4069                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4070                 return ret;
4071         }
4072
4073         state.mem_ctx = mem_ctx;
4074         state.recbuf = NULL;
4075         state.fd = fd;
4076         state.nbuf = 0;
4077         state.nrec = 0;
4078
4079         ret = ctdb_db_traverse_local(db, true, false, backup_handler, &state);
4080         if (ret != 0) {
4081                 fprintf(stderr, "Failed to collect records from DB %s\n",
4082                         db_name);
4083                 close(fd);
4084                 return ret;
4085         }
4086
4087         if (state.recbuf != NULL) {
4088                 ret = ctdb_rec_buffer_write(state.recbuf, state.fd);
4089                 if (ret != 0) {
4090                         fprintf(stderr,
4091                                 "Failed to write records to backup file\n");
4092                         close(fd);
4093                         return ret;
4094                 }
4095
4096                 state.nbuf += 1;
4097                 state.nrec += state.recbuf->count;
4098                 TALLOC_FREE(state.recbuf);
4099         }
4100
4101         db_hdr.version = DB_VERSION;
4102         db_hdr.timestamp = time(NULL);
4103         db_hdr.flags = db_flags;
4104         db_hdr.nbuf = state.nbuf;
4105         db_hdr.nrec = state.nrec;
4106         strncpy(db_hdr.name, db_name, MAX_DB_NAME-1);
4107
4108         lseek(fd, 0, SEEK_SET);
4109         ret = write(fd, &db_hdr, sizeof(struct db_header));
4110         if (ret == -1) {
4111                 ret = errno;
4112                 close(fd);
4113                 fprintf(stderr, "Failed to write header to file %s\n", argv[1]);
4114                 return ret;
4115         }
4116
4117         close(fd);
4118         printf("Database backed up to %s\n", argv[1]);
4119         return 0;
4120 }
4121
4122 static int control_restoredb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4123                              int argc, const char **argv)
4124 {
4125         const char *db_name = NULL;
4126         struct ctdb_db_context *db;
4127         struct db_header db_hdr;
4128         struct ctdb_node_map *nodemap;
4129         struct ctdb_req_control request;
4130         struct ctdb_reply_control **reply;
4131         struct ctdb_transdb wipedb;
4132         struct ctdb_pulldb_ext pulldb;
4133         struct ctdb_rec_buffer *recbuf;
4134         uint32_t generation;
4135         uint32_t *pnn_list;
4136         char timebuf[128];
4137         ssize_t n;
4138         int fd, i;
4139         int count, ret;
4140         uint8_t db_flags;
4141
4142         if (argc < 1 || argc > 2) {
4143                 usage("restoredb");
4144         }
4145
4146         fd = open(argv[0], O_RDONLY, 0600);
4147         if (fd == -1) {
4148                 ret = errno;
4149                 fprintf(stderr, "Failed to open file %s for reading\n",
4150                         argv[0]);
4151                 return ret;
4152         }
4153
4154         if (argc == 2) {
4155                 db_name = argv[1];
4156         }
4157
4158         n = read(fd, &db_hdr, sizeof(struct db_header));
4159         if (n == -1) {
4160                 ret = errno;
4161                 close(fd);
4162                 fprintf(stderr, "Failed to read db header from file %s\n",
4163                         argv[0]);
4164                 return ret;
4165         }
4166         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4167
4168         if (db_hdr.version != DB_VERSION) {
4169                 fprintf(stderr,
4170                         "Wrong version of backup file, expected %u, got %lu\n",
4171                         DB_VERSION, db_hdr.version);
4172                 close(fd);
4173                 return EINVAL;
4174         }
4175
4176         if (db_name == NULL) {
4177                 db_name = db_hdr.name;
4178         }
4179
4180         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4181                  localtime(&db_hdr.timestamp));
4182         printf("Restoring database %s from backup @ %s\n", db_name, timebuf);
4183
4184         db_flags = db_hdr.flags & 0xff;
4185         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4186                           db_flags, &db);
4187         if (ret != 0) {
4188                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4189                 close(fd);
4190                 return ret;
4191         }
4192
4193         nodemap = get_nodemap(ctdb, false);
4194         if (nodemap == NULL) {
4195                 fprintf(stderr, "Failed to get nodemap\n");
4196                 close(fd);
4197                 return ENOMEM;
4198         }
4199
4200         ret = get_generation(mem_ctx, ctdb, &generation);
4201         if (ret != 0) {
4202                 fprintf(stderr, "Failed to get current generation\n");
4203                 close(fd);
4204                 return ret;
4205         }
4206
4207         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4208                                      &pnn_list);
4209         if (count <= 0) {
4210                 close(fd);
4211                 return ENOMEM;
4212         }
4213
4214         wipedb.db_id = ctdb_db_id(db);
4215         wipedb.tid = generation;
4216
4217         ctdb_req_control_db_freeze(&request, wipedb.db_id);
4218         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4219                                         ctdb->client, pnn_list, count,
4220                                         TIMEOUT(), &request, NULL, NULL);
4221         if (ret != 0) {
4222                 goto failed;
4223         }
4224
4225
4226         ctdb_req_control_db_transaction_start(&request, &wipedb);
4227         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4228                                         pnn_list, count, TIMEOUT(),
4229                                         &request, NULL, NULL);
4230         if (ret != 0) {
4231                 goto failed;
4232         }
4233
4234         ctdb_req_control_wipe_database(&request, &wipedb);
4235         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4236                                         pnn_list, count, TIMEOUT(),
4237                                         &request, NULL, NULL);
4238         if (ret != 0) {
4239                 goto failed;
4240         }
4241
4242         pulldb.db_id = ctdb_db_id(db);
4243         pulldb.lmaster = 0;
4244         pulldb.srvid = SRVID_CTDB_PUSHDB;
4245
4246         ctdb_req_control_db_push_start(&request, &pulldb);
4247         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4248                                         pnn_list, count, TIMEOUT(),
4249                                         &request, NULL, NULL);
4250         if (ret != 0) {
4251                 goto failed;
4252         }
4253
4254         for (i=0; i<db_hdr.nbuf; i++) {
4255                 struct ctdb_req_message message;
4256                 TDB_DATA data;
4257                 size_t np;
4258
4259                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4260                 if (ret != 0) {
4261                         goto failed;
4262                 }
4263
4264                 data.dsize = ctdb_rec_buffer_len(recbuf);
4265                 data.dptr = talloc_size(mem_ctx, data.dsize);
4266                 if (data.dptr == NULL) {
4267                         goto failed;
4268                 }
4269
4270                 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
4271
4272                 message.srvid = pulldb.srvid;
4273                 message.data.data = data;
4274
4275                 ret = ctdb_client_message_multi(mem_ctx, ctdb->ev,
4276                                                 ctdb->client,
4277                                                 pnn_list, count,
4278                                                 &message, NULL);
4279                 if (ret != 0) {
4280                         goto failed;
4281                 }
4282
4283                 talloc_free(recbuf);
4284                 talloc_free(data.dptr);
4285         }
4286
4287         ctdb_req_control_db_push_confirm(&request, pulldb.db_id);
4288         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4289                                         pnn_list, count, TIMEOUT(),
4290                                         &request, NULL, &reply);
4291         if (ret != 0) {
4292                 goto failed;
4293         }
4294
4295         for (i=0; i<count; i++) {
4296                 uint32_t num_records;
4297
4298                 ret = ctdb_reply_control_db_push_confirm(reply[i],
4299                                                          &num_records);
4300                 if (ret != 0) {
4301                         fprintf(stderr, "Invalid response from node %u\n",
4302                                 pnn_list[i]);
4303                         goto failed;
4304                 }
4305
4306                 if (num_records != db_hdr.nrec) {
4307                         fprintf(stderr, "Node %u received %u of %lu records\n",
4308                                 pnn_list[i], num_records, db_hdr.nrec);
4309                         goto failed;
4310                 }
4311         }
4312
4313         ctdb_req_control_db_set_healthy(&request, wipedb.db_id);
4314         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4315                                         pnn_list, count, TIMEOUT(),
4316                                         &request, NULL, NULL);
4317         if (ret != 0) {
4318                 goto failed;
4319         }
4320
4321         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4322         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4323                                         pnn_list, count, TIMEOUT(),
4324                                         &request, NULL, NULL);
4325         if (ret != 0) {
4326                 goto failed;
4327         }
4328
4329         ctdb_req_control_db_thaw(&request, wipedb.db_id);
4330         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4331                                         ctdb->client, pnn_list, count,
4332                                         TIMEOUT(), &request, NULL, NULL);
4333         if (ret != 0) {
4334                 goto failed;
4335         }
4336
4337         printf("Database %s restored\n", db_name);
4338         close(fd);
4339         return 0;
4340
4341
4342 failed:
4343         close(fd);
4344         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4345                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4346         return ret;
4347 }
4348
4349 struct dumpdbbackup_state {
4350         ctdb_rec_parser_func_t parser;
4351         struct dump_record_state sub_state;
4352 };
4353
4354 static int dumpdbbackup_handler(uint32_t reqid,
4355                                 struct ctdb_ltdb_header *header,
4356                                 TDB_DATA key, TDB_DATA data,
4357                                 void *private_data)
4358 {
4359         struct dumpdbbackup_state *state =
4360                 (struct dumpdbbackup_state *)private_data;
4361         struct ctdb_ltdb_header hdr;
4362         int ret;
4363
4364         ret = ctdb_ltdb_header_extract(&data, &hdr);
4365         if (ret != 0) {
4366                 return ret;
4367         }
4368
4369         return state->parser(reqid, &hdr, key, data, &state->sub_state);
4370 }
4371
4372 static int control_dumpdbbackup(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4373                                 int argc, const char **argv)
4374 {
4375         struct db_header db_hdr;
4376         char timebuf[128];
4377         struct dumpdbbackup_state state;
4378         ssize_t n;
4379         int fd, ret, i;
4380
4381         if (argc != 1) {
4382                 usage("dumpbackup");
4383         }
4384
4385         fd = open(argv[0], O_RDONLY, 0600);
4386         if (fd == -1) {
4387                 ret = errno;
4388                 fprintf(stderr, "Failed to open file %s for reading\n",
4389                         argv[0]);
4390                 return ret;
4391         }
4392
4393         n = read(fd, &db_hdr, sizeof(struct db_header));
4394         if (n == -1) {
4395                 ret = errno;
4396                 close(fd);
4397                 fprintf(stderr, "Failed to read db header from file %s\n",
4398                         argv[0]);
4399                 return ret;
4400         }
4401         db_hdr.name[sizeof(db_hdr.name)-1] = '\0';
4402
4403         if (db_hdr.version != DB_VERSION) {
4404                 fprintf(stderr,
4405                         "Wrong version of backup file, expected %u, got %lu\n",
4406                         DB_VERSION, db_hdr.version);
4407                 close(fd);
4408                 return EINVAL;
4409         }
4410
4411         strftime(timebuf, sizeof(timebuf)-1, "%Y/%m/%d %H:%M:%S",
4412                  localtime(&db_hdr.timestamp));
4413         printf("Dumping database %s from backup @ %s\n",
4414                db_hdr.name, timebuf);
4415
4416         state.parser = dump_record;
4417         state.sub_state.count = 0;
4418
4419         for (i=0; i<db_hdr.nbuf; i++) {
4420                 struct ctdb_rec_buffer *recbuf;
4421
4422                 ret = ctdb_rec_buffer_read(fd, mem_ctx, &recbuf);
4423                 if (ret != 0) {
4424                         fprintf(stderr, "Failed to read records\n");
4425                         close(fd);
4426                         return ret;
4427                 }
4428
4429                 ret = ctdb_rec_buffer_traverse(recbuf, dumpdbbackup_handler,
4430                                                &state);
4431                 if (ret != 0) {
4432                         fprintf(stderr, "Failed to dump records\n");
4433                         close(fd);
4434                         return ret;
4435                 }
4436         }
4437
4438         close(fd);
4439         printf("Dumped %u record(s)\n", state.sub_state.count);
4440         return 0;
4441 }
4442
4443 static int control_wipedb(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4444                           int argc, const char **argv)
4445 {
4446         const char *db_name;
4447         struct ctdb_db_context *db;
4448         uint32_t db_id;
4449         uint8_t db_flags;
4450         struct ctdb_node_map *nodemap;
4451         struct ctdb_req_control request;
4452         struct ctdb_transdb wipedb;
4453         uint32_t generation;
4454         uint32_t *pnn_list;
4455         int count, ret;
4456
4457         if (argc != 1) {
4458                 usage("wipedb");
4459         }
4460
4461         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, &db_flags)) {
4462                 return 1;
4463         }
4464
4465         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4466                           db_flags, &db);
4467         if (ret != 0) {
4468                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4469                 return ret;
4470         }
4471
4472         nodemap = get_nodemap(ctdb, false);
4473         if (nodemap == NULL) {
4474                 fprintf(stderr, "Failed to get nodemap\n");
4475                 return ENOMEM;
4476         }
4477
4478         ret = get_generation(mem_ctx, ctdb, &generation);
4479         if (ret != 0) {
4480                 fprintf(stderr, "Failed to get current generation\n");
4481                 return ret;
4482         }
4483
4484         count = list_of_active_nodes(nodemap, CTDB_UNKNOWN_PNN, mem_ctx,
4485                                      &pnn_list);
4486         if (count <= 0) {
4487                 return ENOMEM;
4488         }
4489
4490         ctdb_req_control_db_freeze(&request, db_id);
4491         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4492                                         ctdb->client, pnn_list, count,
4493                                         TIMEOUT(), &request, NULL, NULL);
4494         if (ret != 0) {
4495                 goto failed;
4496         }
4497
4498         wipedb.db_id = db_id;
4499         wipedb.tid = generation;
4500
4501         ctdb_req_control_db_transaction_start(&request, &wipedb);
4502         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4503                                         pnn_list, count, TIMEOUT(),
4504                                         &request, NULL, NULL);
4505         if (ret != 0) {
4506                 goto failed;
4507         }
4508
4509         ctdb_req_control_wipe_database(&request, &wipedb);
4510         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4511                                         pnn_list, count, TIMEOUT(),
4512                                         &request, NULL, NULL);
4513         if (ret != 0) {
4514                 goto failed;
4515         }
4516
4517         ctdb_req_control_db_set_healthy(&request, db_id);
4518         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4519                                         pnn_list, count, TIMEOUT(),
4520                                         &request, NULL, NULL);
4521         if (ret != 0) {
4522                 goto failed;
4523         }
4524
4525         ctdb_req_control_db_transaction_commit(&request, &wipedb);
4526         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
4527                                         pnn_list, count, TIMEOUT(),
4528                                         &request, NULL, NULL);
4529         if (ret != 0) {
4530                 goto failed;
4531         }
4532
4533         ctdb_req_control_db_thaw(&request, db_id);
4534         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev,
4535                                         ctdb->client, pnn_list, count,
4536                                         TIMEOUT(), &request, NULL, NULL);
4537         if (ret != 0) {
4538                 goto failed;
4539         }
4540
4541         printf("Database %s wiped\n", db_name);
4542         return 0;
4543
4544
4545 failed:
4546         ctdb_ctrl_set_recmode(mem_ctx, ctdb->ev, ctdb->client,
4547                               ctdb->pnn, TIMEOUT(), CTDB_RECOVERY_ACTIVE);
4548         return ret;
4549 }
4550
4551 static int control_recmaster(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4552                              int argc, const char **argv)
4553 {
4554         uint32_t recmaster;
4555         int ret;
4556
4557         ret = ctdb_ctrl_get_recmaster(mem_ctx, ctdb->ev, ctdb->client,
4558                                       ctdb->cmd_pnn, TIMEOUT(), &recmaster);
4559         if (ret != 0) {
4560                 return ret;
4561         }
4562
4563         printf("%u\n", recmaster);
4564         return 0;
4565 }
4566
4567 static int control_event(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4568                          int argc, const char **argv)
4569 {
4570         char *t, *event_helper = NULL;
4571
4572         t = getenv("CTDB_EVENT_HELPER");
4573         if (t != NULL) {
4574                 event_helper = talloc_strdup(mem_ctx, t);
4575         } else {
4576                 event_helper = talloc_asprintf(mem_ctx, "%s/ctdb-event",
4577                                                CTDB_HELPER_BINDIR);
4578         }
4579
4580         if (event_helper == NULL) {
4581                 fprintf(stderr, "Unable to set event daemon helper\n");
4582                 return 1;
4583         }
4584
4585         return run_helper(mem_ctx, "event daemon helper", event_helper,
4586                           argc, argv);
4587 }
4588
4589 static int control_scriptstatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4590                                 int argc, const char **argv)
4591 {
4592         const char *new_argv[4];
4593
4594         if (argc > 1) {
4595                 usage("scriptstatus");
4596         }
4597
4598         new_argv[0] = "status";
4599         new_argv[1] = "legacy";
4600         new_argv[2] = (argc == 0) ? "monitor" : argv[0];
4601         new_argv[3] = NULL;
4602
4603         (void) control_event(mem_ctx, ctdb, 3, new_argv);
4604         return 0;
4605 }
4606
4607 static int control_natgw(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4608                          int argc, const char **argv)
4609 {
4610         char *t, *natgw_helper = NULL;
4611
4612         if (argc != 1) {
4613                 usage("natgw");
4614         }
4615
4616         t = getenv("CTDB_NATGW_HELPER");
4617         if (t != NULL) {
4618                 natgw_helper = talloc_strdup(mem_ctx, t);
4619         } else {
4620                 natgw_helper = talloc_asprintf(mem_ctx, "%s/ctdb_natgw",
4621                                                CTDB_HELPER_BINDIR);
4622         }
4623
4624         if (natgw_helper == NULL) {
4625                 fprintf(stderr, "Unable to set NAT gateway helper\n");
4626                 return 1;
4627         }
4628
4629         return run_helper(mem_ctx, "NAT gateway helper", natgw_helper,
4630                           argc, argv);
4631 }
4632
4633 /*
4634  * Find the PNN of the current node
4635  * discover the pnn by loading the nodes file and try to bind
4636  * to all addresses one at a time until the ip address is found.
4637  */
4638 static bool find_node_xpnn(TALLOC_CTX *mem_ctx, uint32_t *pnn)
4639 {
4640         struct ctdb_node_map *nodemap;
4641         int i;
4642
4643         nodemap = read_nodes_file(mem_ctx, CTDB_UNKNOWN_PNN);
4644         if (nodemap == NULL) {
4645                 return false;
4646         }
4647
4648         for (i=0; i<nodemap->num; i++) {
4649                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
4650                         continue;
4651                 }
4652                 if (ctdb_sys_have_ip(&nodemap->node[i].addr)) {
4653                         if (pnn != NULL) {
4654                                 *pnn = nodemap->node[i].pnn;
4655                         }
4656                         talloc_free(nodemap);
4657                         return true;
4658                 }
4659         }
4660
4661         fprintf(stderr, "Failed to detect PNN of the current node.\n");
4662         talloc_free(nodemap);
4663         return false;
4664 }
4665
4666 static int control_getreclock(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4667                               int argc, const char **argv)
4668 {
4669         const char *reclock;
4670         int ret;
4671
4672         if (argc != 0) {
4673                 usage("getreclock");
4674         }
4675
4676         ret = ctdb_ctrl_get_reclock_file(mem_ctx, ctdb->ev, ctdb->client,
4677                                          ctdb->cmd_pnn, TIMEOUT(), &reclock);
4678         if (ret != 0) {
4679                 return ret;
4680         }
4681
4682         if (reclock != NULL) {
4683                 printf("%s\n", reclock);
4684         }
4685
4686         return 0;
4687 }
4688
4689 static int control_setlmasterrole(TALLOC_CTX *mem_ctx,
4690                                   struct ctdb_context *ctdb,
4691                                   int argc, const char **argv)
4692 {
4693         uint32_t lmasterrole = 0;
4694         int ret;
4695
4696         if (argc != 1) {
4697                 usage("setlmasterrole");
4698         }
4699
4700         if (strcmp(argv[0], "on") == 0) {
4701                 lmasterrole = 1;
4702         } else if (strcmp(argv[0], "off") == 0) {
4703                 lmasterrole = 0;
4704         } else {
4705                 usage("setlmasterrole");
4706         }
4707
4708         ret = ctdb_ctrl_set_lmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4709                                         ctdb->cmd_pnn, TIMEOUT(), lmasterrole);
4710         if (ret != 0) {
4711                 return ret;
4712         }
4713
4714         return 0;
4715 }
4716
4717 static int control_setrecmasterrole(TALLOC_CTX *mem_ctx,
4718                                     struct ctdb_context *ctdb,
4719                                     int argc, const char **argv)
4720 {
4721         uint32_t recmasterrole = 0;
4722         int ret;
4723
4724         if (argc != 1) {
4725                 usage("setrecmasterrole");
4726         }
4727
4728         if (strcmp(argv[0], "on") == 0) {
4729                 recmasterrole = 1;
4730         } else if (strcmp(argv[0], "off") == 0) {
4731                 recmasterrole = 0;
4732         } else {
4733                 usage("setrecmasterrole");
4734         }
4735
4736         ret = ctdb_ctrl_set_recmasterrole(mem_ctx, ctdb->ev, ctdb->client,
4737                                           ctdb->cmd_pnn, TIMEOUT(),
4738                                           recmasterrole);
4739         if (ret != 0) {
4740                 return ret;
4741         }
4742
4743         return 0;
4744 }
4745
4746 static int control_setdbreadonly(TALLOC_CTX *mem_ctx,
4747                                  struct ctdb_context *ctdb,
4748                                  int argc, const char **argv)
4749 {
4750         uint32_t db_id;
4751         uint8_t db_flags;
4752         int ret;
4753
4754         if (argc != 1) {
4755                 usage("setdbreadonly");
4756         }
4757
4758         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4759                 return 1;
4760         }
4761
4762         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4763                 fprintf(stderr, "READONLY can be set only on volatile DB\n");
4764                 return 1;
4765         }
4766
4767         ret = ctdb_ctrl_set_db_readonly(mem_ctx, ctdb->ev, ctdb->client,
4768                                         ctdb->cmd_pnn, TIMEOUT(), db_id);
4769         if (ret != 0) {
4770                 return ret;
4771         }
4772
4773         return 0;
4774 }
4775
4776 static int control_setdbsticky(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4777                                int argc, const char **argv)
4778 {
4779         uint32_t db_id;
4780         uint8_t db_flags;
4781         int ret;
4782
4783         if (argc != 1) {
4784                 usage("setdbsticky");
4785         }
4786
4787         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, NULL, &db_flags)) {
4788                 return 1;
4789         }
4790
4791         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
4792                 fprintf(stderr, "STICKY can be set only on volatile DB\n");
4793                 return 1;
4794         }
4795
4796         ret = ctdb_ctrl_set_db_sticky(mem_ctx, ctdb->ev, ctdb->client,
4797                                       ctdb->cmd_pnn, TIMEOUT(), db_id);
4798         if (ret != 0) {
4799                 return ret;
4800         }
4801
4802         return 0;
4803 }
4804
4805 static int control_pfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4806                           int argc, const char **argv)
4807 {
4808         const char *db_name;
4809         struct ctdb_db_context *db;
4810         struct ctdb_transaction_handle *h;
4811         uint8_t db_flags;
4812         TDB_DATA key, data;
4813         int ret;
4814
4815         if (argc != 2) {
4816                 usage("pfetch");
4817         }
4818
4819         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4820                 return 1;
4821         }
4822
4823         if (! (db_flags &
4824                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4825                 fprintf(stderr, "Transactions not supported on DB %s\n",
4826                         db_name);
4827                 return 1;
4828         }
4829
4830         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4831                           db_flags, &db);
4832         if (ret != 0) {
4833                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4834                 return ret;
4835         }
4836
4837         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4838         if (ret != 0) {
4839                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4840                 return ret;
4841         }
4842
4843         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4844                                      TIMEOUT(), db, true, &h);
4845         if (ret != 0) {
4846                 fprintf(stderr, "Failed to start transaction on db %s\n",
4847                         db_name);
4848                 return ret;
4849         }
4850
4851         ret = ctdb_transaction_fetch_record(h, key, mem_ctx, &data);
4852         if (ret != 0) {
4853                 fprintf(stderr, "Failed to read record for key %s\n",
4854                         argv[1]);
4855                 ctdb_transaction_cancel(h);
4856                 return ret;
4857         }
4858
4859         printf("%.*s\n", (int)data.dsize, data.dptr);
4860
4861         ctdb_transaction_cancel(h);
4862         return 0;
4863 }
4864
4865 static int control_pstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4866                           int argc, const char **argv)
4867 {
4868         const char *db_name;
4869         struct ctdb_db_context *db;
4870         struct ctdb_transaction_handle *h;
4871         uint8_t db_flags;
4872         TDB_DATA key, data;
4873         int ret;
4874
4875         if (argc != 3) {
4876                 usage("pstore");
4877         }
4878
4879         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4880                 return 1;
4881         }
4882
4883         if (! (db_flags &
4884                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4885                 fprintf(stderr, "Transactions not supported on DB %s\n",
4886                         db_name);
4887                 return 1;
4888         }
4889
4890         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4891                           db_flags, &db);
4892         if (ret != 0) {
4893                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4894                 return ret;
4895         }
4896
4897         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4898         if (ret != 0) {
4899                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4900                 return ret;
4901         }
4902
4903         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
4904         if (ret != 0) {
4905                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
4906                 return ret;
4907         }
4908
4909         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4910                                      TIMEOUT(), db, false, &h);
4911         if (ret != 0) {
4912                 fprintf(stderr, "Failed to start transaction on db %s\n",
4913                         db_name);
4914                 return ret;
4915         }
4916
4917         ret = ctdb_transaction_store_record(h, key, data);
4918         if (ret != 0) {
4919                 fprintf(stderr, "Failed to store record for key %s\n",
4920                         argv[1]);
4921                 ctdb_transaction_cancel(h);
4922                 return ret;
4923         }
4924
4925         ret = ctdb_transaction_commit(h);
4926         if (ret != 0) {
4927                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4928                         db_name);
4929                 ctdb_transaction_cancel(h);
4930                 return ret;
4931         }
4932
4933         return 0;
4934 }
4935
4936 static int control_pdelete(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
4937                            int argc, const char **argv)
4938 {
4939         const char *db_name;
4940         struct ctdb_db_context *db;
4941         struct ctdb_transaction_handle *h;
4942         uint8_t db_flags;
4943         TDB_DATA key;
4944         int ret;
4945
4946         if (argc != 2) {
4947                 usage("pdelete");
4948         }
4949
4950         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
4951                 return 1;
4952         }
4953
4954         if (! (db_flags &
4955                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
4956                 fprintf(stderr, "Transactions not supported on DB %s\n",
4957                         db_name);
4958                 return 1;
4959         }
4960
4961         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
4962                           db_flags, &db);
4963         if (ret != 0) {
4964                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
4965                 return ret;
4966         }
4967
4968         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
4969         if (ret != 0) {
4970                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
4971                 return ret;
4972         }
4973
4974         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
4975                                      TIMEOUT(), db, false, &h);
4976         if (ret != 0) {
4977                 fprintf(stderr, "Failed to start transaction on db %s\n",
4978                         db_name);
4979                 return ret;
4980         }
4981
4982         ret = ctdb_transaction_delete_record(h, key);
4983         if (ret != 0) {
4984                 fprintf(stderr, "Failed to delete record for key %s\n",
4985                         argv[1]);
4986                 ctdb_transaction_cancel(h);
4987                 return ret;
4988         }
4989
4990         ret = ctdb_transaction_commit(h);
4991         if (ret != 0) {
4992                 fprintf(stderr, "Failed to commit transaction on db %s\n",
4993                         db_name);
4994                 ctdb_transaction_cancel(h);
4995                 return ret;
4996         }
4997
4998         return 0;
4999 }
5000
5001 static int ptrans_parse_string(TALLOC_CTX *mem_ctx, const char **ptr, TDB_DATA *data)
5002 {
5003         const char *t;
5004         size_t n;
5005         int ret;
5006
5007         *data = tdb_null;
5008
5009         /* Skip whitespace */
5010         n = strspn(*ptr, " \t");
5011         t = *ptr + n;
5012
5013         if (t[0] == '"') {
5014                 /* Quoted ASCII string - no wide characters! */
5015                 t++;
5016                 n = strcspn(t, "\"");
5017                 if (t[n] == '"') {
5018                         if (n > 0) {
5019                                 ret = str_to_data(t, n, mem_ctx, data);
5020                                 if (ret != 0) {
5021                                         return ret;
5022                                 }
5023                         }
5024                         *ptr = t + n + 1;
5025                 } else {
5026                         fprintf(stderr, "Unmatched \" in input %s\n", *ptr);
5027                         return 1;
5028                 }
5029         } else {
5030                 fprintf(stderr, "Unsupported input format in %s\n", *ptr);
5031                 return 1;
5032         }
5033
5034         return 0;
5035 }
5036
5037 #define MAX_LINE_SIZE   1024
5038
5039 static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
5040                                  TDB_DATA *key, TDB_DATA *value)
5041 {
5042         char line [MAX_LINE_SIZE]; /* FIXME: make this more flexible? */
5043         const char *ptr;
5044         int ret;
5045
5046         ptr = fgets(line, MAX_LINE_SIZE, file);
5047         if (ptr == NULL) {
5048                 return false;
5049         }
5050
5051         /* Get key */
5052         ret = ptrans_parse_string(mem_ctx, &ptr, key);
5053         if (ret != 0 || ptr == NULL || key->dptr == NULL) {
5054                 /* Line Ignored but not EOF */
5055                 *key = tdb_null;
5056                 return true;
5057         }
5058
5059         /* Get value */
5060         ret = ptrans_parse_string(mem_ctx, &ptr, value);
5061         if (ret != 0) {
5062                 /* Line Ignored but not EOF */
5063                 talloc_free(key->dptr);
5064                 *key = tdb_null;
5065                 return true;
5066         }
5067
5068         return true;
5069 }
5070
5071 static int control_ptrans(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5072                           int argc, const char **argv)
5073 {
5074         const char *db_name;
5075         struct ctdb_db_context *db;
5076         struct ctdb_transaction_handle *h;
5077         uint8_t db_flags;
5078         FILE *file;
5079         TDB_DATA key, value;
5080         int ret;
5081
5082         if (argc < 1 || argc > 2) {
5083                 usage("ptrans");
5084         }
5085
5086         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5087                 return 1;
5088         }
5089
5090         if (! (db_flags &
5091                (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED))) {
5092                 fprintf(stderr, "Transactions not supported on DB %s\n",
5093                         db_name);
5094                 return 1;
5095         }
5096
5097         if (argc == 2) {
5098                 file = fopen(argv[1], "r");
5099                 if (file == NULL) {
5100                         fprintf(stderr, "Failed to open file %s\n", argv[1]);
5101                         return 1;
5102                 }
5103         } else {
5104                 file = stdin;
5105         }
5106
5107         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5108                           db_flags, &db);
5109         if (ret != 0) {
5110                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5111                 goto done;
5112         }
5113
5114         ret = ctdb_transaction_start(mem_ctx, ctdb->ev, ctdb->client,
5115                                      TIMEOUT(), db, false, &h);
5116         if (ret != 0) {
5117                 fprintf(stderr, "Failed to start transaction on db %s\n",
5118                         db_name);
5119                 goto done;
5120         }
5121
5122         while (ptrans_get_key_value(mem_ctx, file, &key, &value)) {
5123                 if (key.dsize != 0) {
5124                         ret = ctdb_transaction_store_record(h, key, value);
5125                         if (ret != 0) {
5126                                 fprintf(stderr, "Failed to store record\n");
5127                                 ctdb_transaction_cancel(h);
5128                                 goto done;
5129                         }
5130                         talloc_free(key.dptr);
5131                         talloc_free(value.dptr);
5132                 }
5133         }
5134
5135         ret = ctdb_transaction_commit(h);
5136         if (ret != 0) {
5137                 fprintf(stderr, "Failed to commit transaction on db %s\n",
5138                         db_name);
5139                 ctdb_transaction_cancel(h);
5140         }
5141
5142 done:
5143         if (file != stdin) {
5144                 fclose(file);
5145         }
5146         return ret;
5147 }
5148
5149 static int control_tfetch(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5150                           int argc, const char **argv)
5151 {
5152         struct tdb_context *tdb;
5153         TDB_DATA key, data;
5154         struct ctdb_ltdb_header header;
5155         int ret;
5156
5157         if (argc < 2 || argc > 3) {
5158                 usage("tfetch");
5159         }
5160
5161         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5162         if (tdb == NULL) {
5163                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5164                 return 1;
5165         }
5166
5167         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5168         if (ret != 0) {
5169                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5170                 tdb_close(tdb);
5171                 return ret;
5172         }
5173
5174         data = tdb_fetch(tdb, key);
5175         if (data.dptr == NULL) {
5176                 fprintf(stderr, "No record for key %s\n", argv[1]);
5177                 tdb_close(tdb);
5178                 return 1;
5179         }
5180
5181         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
5182                 fprintf(stderr, "Invalid record for key %s\n", argv[1]);
5183                 tdb_close(tdb);
5184                 return 1;
5185         }
5186
5187         tdb_close(tdb);
5188
5189         if (argc == 3) {
5190                 int fd;
5191                 ssize_t nwritten;
5192
5193                 fd = open(argv[2], O_WRONLY|O_CREAT|O_TRUNC, 0600);
5194                 if (fd == -1) {
5195                         fprintf(stderr, "Failed to open output file %s\n",
5196                                 argv[2]);
5197                         goto fail;
5198                 }
5199
5200                 nwritten = sys_write(fd, data.dptr, data.dsize);
5201                 if (nwritten != data.dsize) {
5202                         fprintf(stderr, "Failed to write record to file\n");
5203                         close(fd);
5204                         goto fail;
5205                 }
5206
5207                 close(fd);
5208         }
5209
5210 fail:
5211         ret = ctdb_ltdb_header_extract(&data, &header);
5212         if (ret != 0) {
5213                 fprintf(stderr, "Failed to parse header from data\n");
5214                 return 1;
5215         }
5216
5217         dump_ltdb_header(&header);
5218         dump_tdb_data("data", data);
5219
5220         return 0;
5221 }
5222
5223 static int control_tstore(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5224                           int argc, const char **argv)
5225 {
5226         struct tdb_context *tdb;
5227         TDB_DATA key, data[2], value;
5228         struct ctdb_ltdb_header header;
5229         uint8_t header_buf[sizeof(struct ctdb_ltdb_header)];
5230         size_t np;
5231         int ret;
5232
5233         if (argc < 3 || argc > 5) {
5234                 usage("tstore");
5235         }
5236
5237         tdb = tdb_open(argv[0], 0, 0, O_RDWR, 0);
5238         if (tdb == NULL) {
5239                 fprintf(stderr, "Failed to open TDB file %s\n", argv[0]);
5240                 return 1;
5241         }
5242
5243         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5244         if (ret != 0) {
5245                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5246                 tdb_close(tdb);
5247                 return ret;
5248         }
5249
5250         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &value);
5251         if (ret != 0) {
5252                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5253                 tdb_close(tdb);
5254                 return ret;
5255         }
5256
5257         ZERO_STRUCT(header);
5258
5259         if (argc > 3) {
5260                 header.rsn = (uint64_t)strtoull(argv[3], NULL, 0);
5261         }
5262         if (argc > 4) {
5263                 header.dmaster = (uint32_t)atol(argv[4]);
5264         }
5265         if (argc > 5) {
5266                 header.flags = (uint32_t)atol(argv[5]);
5267         }
5268
5269         ctdb_ltdb_header_push(&header, header_buf, &np);
5270
5271         data[0].dsize = np;
5272         data[0].dptr = header_buf;
5273
5274         data[1].dsize = value.dsize;
5275         data[1].dptr = value.dptr;
5276
5277         ret = tdb_storev(tdb, key, data, 2, TDB_REPLACE);
5278         if (ret != 0) {
5279                 fprintf(stderr, "Failed to write record %s to file %s\n",
5280                         argv[1], argv[0]);
5281         }
5282
5283         tdb_close(tdb);
5284
5285         return ret;
5286 }
5287
5288 static int control_readkey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5289                            int argc, const char **argv)
5290 {
5291         const char *db_name;
5292         struct ctdb_db_context *db;
5293         struct ctdb_record_handle *h;
5294         uint8_t db_flags;
5295         TDB_DATA key, data;
5296         bool readonly = false;
5297         int ret;
5298
5299         if (argc < 2 || argc > 3) {
5300                 usage("readkey");
5301         }
5302
5303         if (argc == 3) {
5304                 if (strcmp(argv[2], "readonly") == 0) {
5305                         readonly = true;
5306                 } else {
5307                         usage("readkey");
5308                 }
5309         }
5310
5311         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5312                 return 1;
5313         }
5314
5315         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5316                 fprintf(stderr, "DB %s is not a volatile database\n",
5317                         db_name);
5318                 return 1;
5319         }
5320
5321         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5322                           db_flags, &db);
5323         if (ret != 0) {
5324                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5325                 return ret;
5326         }
5327
5328         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5329         if (ret != 0) {
5330                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5331                 return ret;
5332         }
5333
5334         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5335                               db, key, readonly, &h, NULL, &data);
5336         if (ret != 0) {
5337                 fprintf(stderr, "Failed to read record for key %s\n",
5338                         argv[1]);
5339         } else {
5340                 printf("Data: size:%zu ptr:[%.*s]\n", data.dsize,
5341                        (int)data.dsize, data.dptr);
5342         }
5343
5344         talloc_free(h);
5345         return ret;
5346 }
5347
5348 static int control_writekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5349                             int argc, const char **argv)
5350 {
5351         const char *db_name;
5352         struct ctdb_db_context *db;
5353         struct ctdb_record_handle *h;
5354         uint8_t db_flags;
5355         TDB_DATA key, data;
5356         int ret;
5357
5358         if (argc != 3) {
5359                 usage("writekey");
5360         }
5361
5362         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5363                 return 1;
5364         }
5365
5366         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5367                 fprintf(stderr, "DB %s is not a volatile database\n",
5368                         db_name);
5369                 return 1;
5370         }
5371
5372         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5373                           db_flags, &db);
5374         if (ret != 0) {
5375                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5376                 return ret;
5377         }
5378
5379         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5380         if (ret != 0) {
5381                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5382                 return ret;
5383         }
5384
5385         ret = str_to_data(argv[2], strlen(argv[2]), mem_ctx, &data);
5386         if (ret != 0) {
5387                 fprintf(stderr, "Failed to parse value %s\n", argv[2]);
5388                 return ret;
5389         }
5390
5391         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5392                               db, key, false, &h, NULL, NULL);
5393         if (ret != 0) {
5394                 fprintf(stderr, "Failed to lock record for key %s\n", argv[0]);
5395                 return ret;
5396         }
5397
5398         ret = ctdb_store_record(h, data);
5399         if (ret != 0) {
5400                 fprintf(stderr, "Failed to store record for key %s\n",
5401                         argv[1]);
5402         }
5403
5404         talloc_free(h);
5405         return ret;
5406 }
5407
5408 static int control_deletekey(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5409                              int argc, const char **argv)
5410 {
5411         const char *db_name;
5412         struct ctdb_db_context *db;
5413         struct ctdb_record_handle *h;
5414         uint8_t db_flags;
5415         TDB_DATA key, data;
5416         int ret;
5417
5418         if (argc != 2) {
5419                 usage("deletekey");
5420         }
5421
5422         if (! db_exists(mem_ctx, ctdb, argv[0], NULL, &db_name, &db_flags)) {
5423                 return 1;
5424         }
5425
5426         if (db_flags & (CTDB_DB_FLAGS_PERSISTENT | CTDB_DB_FLAGS_REPLICATED)) {
5427                 fprintf(stderr, "DB %s is not a volatile database\n",
5428                         db_name);
5429                 return 1;
5430         }
5431
5432         ret = ctdb_attach(ctdb->ev, ctdb->client, TIMEOUT(), db_name,
5433                           db_flags, &db);
5434         if (ret != 0) {
5435                 fprintf(stderr, "Failed to attach to DB %s\n", db_name);
5436                 return ret;
5437         }
5438
5439         ret = str_to_data(argv[1], strlen(argv[1]), mem_ctx, &key);
5440         if (ret != 0) {
5441                 fprintf(stderr, "Failed to parse key %s\n", argv[1]);
5442                 return ret;
5443         }
5444
5445         ret = ctdb_fetch_lock(mem_ctx, ctdb->ev, ctdb->client,
5446                               db, key, false, &h, NULL, &data);
5447         if (ret != 0) {
5448                 fprintf(stderr, "Failed to fetch record for key %s\n",
5449                         argv[1]);
5450                 return ret;
5451         }
5452
5453         ret = ctdb_delete_record(h);
5454         if (ret != 0) {
5455                 fprintf(stderr, "Failed to delete record for key %s\n",
5456                         argv[1]);
5457         }
5458
5459         talloc_free(h);
5460         return ret;
5461 }
5462
5463 static int control_checktcpport(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5464                                 int argc, const char **argv)
5465 {
5466         struct sockaddr_in sin;
5467         unsigned int port;
5468         int s, v;
5469         int ret;
5470
5471         if (argc != 1) {
5472                 usage("chktcpport");
5473         }
5474
5475         port = atoi(argv[0]);
5476
5477         s = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
5478         if (s == -1) {
5479                 fprintf(stderr, "Failed to open local socket\n");
5480                 return errno;
5481         }
5482
5483         v = fcntl(s, F_GETFL, 0);
5484         if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK)) {
5485                 fprintf(stderr, "Unable to set socket non-blocking\n");
5486                 close(s);
5487                 return errno;
5488         }
5489
5490         bzero(&sin, sizeof(sin));
5491         sin.sin_family = AF_INET;
5492         sin.sin_port = htons(port);
5493         ret = bind(s, (struct sockaddr *)&sin, sizeof(sin));
5494         close(s);
5495         if (ret == -1) {
5496                 fprintf(stderr, "Failed to bind to TCP port %u\n", port);
5497                 return errno;
5498         }
5499
5500         return 0;
5501 }
5502
5503 static int control_getdbseqnum(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5504                                int argc, const char **argv)
5505 {
5506         uint32_t db_id;
5507         const char *db_name;
5508         uint64_t seqnum;
5509         int ret;
5510
5511         if (argc != 1) {
5512                 usage("getdbseqnum");
5513         }
5514
5515         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5516                 return 1;
5517         }
5518
5519         ret = ctdb_ctrl_get_db_seqnum(mem_ctx, ctdb->ev, ctdb->client,
5520                                       ctdb->cmd_pnn, TIMEOUT(), db_id,
5521                                       &seqnum);
5522         if (ret != 0) {
5523                 fprintf(stderr, "Failed to get sequence number for DB %s\n",
5524                         db_name);
5525                 return ret;
5526         }
5527
5528         printf("0x%"PRIx64"\n", seqnum);
5529         return 0;
5530 }
5531
5532 static int control_nodestatus(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5533                               int argc, const char **argv)
5534 {
5535         const char *nodestring = NULL;
5536         struct ctdb_node_map *nodemap;
5537         int ret, i;
5538         bool print_hdr = false;
5539
5540         if (argc > 1) {
5541                 usage("nodestatus");
5542         }
5543
5544         if (argc == 1) {
5545                 nodestring = argv[0];
5546                 if (strcmp(nodestring, "all") == 0) {
5547                         print_hdr = true;
5548                 }
5549         }
5550
5551         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap)) {
5552                 return 1;
5553         }
5554
5555         if (options.machinereadable) {
5556                 print_nodemap_machine(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn);
5557         } else {
5558                 print_nodemap(mem_ctx, ctdb, nodemap, ctdb->cmd_pnn, print_hdr);
5559         }
5560
5561         ret = 0;
5562         for (i=0; i<nodemap->num; i++) {
5563                 ret |= nodemap->node[i].flags;
5564         }
5565
5566         return ret;
5567 }
5568
5569 const struct {
5570         const char *name;
5571         uint32_t offset;
5572 } db_stats_fields[] = {
5573 #define DBSTATISTICS_FIELD(n) { #n, offsetof(struct ctdb_db_statistics, n) }
5574         DBSTATISTICS_FIELD(db_ro_delegations),
5575         DBSTATISTICS_FIELD(db_ro_revokes),
5576         DBSTATISTICS_FIELD(locks.num_calls),
5577         DBSTATISTICS_FIELD(locks.num_current),
5578         DBSTATISTICS_FIELD(locks.num_pending),
5579         DBSTATISTICS_FIELD(locks.num_failed),
5580 };
5581
5582 static void print_dbstatistics(const char *db_name,
5583                                struct ctdb_db_statistics *s)
5584 {
5585         int i;
5586         const char *prefix = NULL;
5587         int preflen = 0;
5588
5589         printf("DB Statistics %s\n", db_name);
5590
5591         for (i=0; i<ARRAY_SIZE(db_stats_fields); i++) {
5592                 if (strchr(db_stats_fields[i].name, '.') != NULL) {
5593                         preflen = strcspn(db_stats_fields[i].name, ".") + 1;
5594                         if (! prefix ||
5595                             strncmp(prefix, db_stats_fields[i].name, preflen) != 0) {
5596                                 prefix = db_stats_fields[i].name;
5597                                 printf(" %*.*s\n", preflen-1, preflen-1,
5598                                        db_stats_fields[i].name);
5599                         }
5600                 } else {
5601                         preflen = 0;
5602                 }
5603                 printf(" %*s%-22s%*s%10u\n", preflen ? 4 : 0, "",
5604                        db_stats_fields[i].name+preflen, preflen ? 0 : 4, "",
5605                        *(uint32_t *)(db_stats_fields[i].offset+(uint8_t *)s));
5606         }
5607
5608         printf(" hop_count_buckets:");
5609         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5610                 printf(" %d", s->hop_count_bucket[i]);
5611         }
5612         printf("\n");
5613
5614         printf(" lock_buckets:");
5615         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
5616                 printf(" %d", s->locks.buckets[i]);
5617         }
5618         printf("\n");
5619
5620         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5621                "locks_latency      MIN/AVG/MAX",
5622                s->locks.latency.min, LATENCY_AVG(s->locks.latency),
5623                s->locks.latency.max, s->locks.latency.num);
5624
5625         printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
5626                "vacuum_latency     MIN/AVG/MAX",
5627                s->vacuum.latency.min, LATENCY_AVG(s->vacuum.latency),
5628                s->vacuum.latency.max, s->vacuum.latency.num);
5629
5630         printf(" Num Hot Keys:     %d\n", s->num_hot_keys);
5631         for (i=0; i<s->num_hot_keys; i++) {
5632                 int j;
5633                 printf("     Count:%d Key:", s->hot_keys[i].count);
5634                 for (j=0; j<s->hot_keys[i].key.dsize; j++) {
5635                         printf("%02x", s->hot_keys[i].key.dptr[j] & 0xff);
5636                 }
5637                 printf("\n");
5638         }
5639 }
5640
5641 static int control_dbstatistics(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5642                                 int argc, const char **argv)
5643 {
5644         uint32_t db_id;
5645         const char *db_name;
5646         struct ctdb_db_statistics *dbstats;
5647         int ret;
5648
5649         if (argc != 1) {
5650                 usage("dbstatistics");
5651         }
5652
5653         if (! db_exists(mem_ctx, ctdb, argv[0], &db_id, &db_name, NULL)) {
5654                 return 1;
5655         }
5656
5657         ret = ctdb_ctrl_get_db_statistics(mem_ctx, ctdb->ev, ctdb->client,
5658                                           ctdb->cmd_pnn, TIMEOUT(), db_id,
5659                                           &dbstats);
5660         if (ret != 0) {
5661                 fprintf(stderr, "Failed to get statistics for DB %s\n",
5662                         db_name);
5663                 return ret;
5664         }
5665
5666         print_dbstatistics(db_name, dbstats);
5667         return 0;
5668 }
5669
/*
 * Bookkeeping shared between disable_takeover_runs() and its message
 * handler while replies from the contacted nodes are being collected.
 */
struct disable_takeover_runs_state {
        uint32_t *pnn_list;     /* PNNs of the nodes a reply is expected from */
        int node_count;         /* number of entries in pnn_list and reply */
        bool *reply;            /* reply[i] is true once pnn_list[i] has replied */
        int status;             /* negative value taken from a reply, else 0 */
        bool done;              /* set on a negative reply or when all replied */
};
5677
5678 static void disable_takeover_run_handler(uint64_t srvid, TDB_DATA data,
5679                                          void *private_data)
5680 {
5681         struct disable_takeover_runs_state *state =
5682                 (struct disable_takeover_runs_state *)private_data;
5683         int ret, i;
5684
5685         if (data.dsize != sizeof(int)) {
5686                 /* Ignore packet */
5687                 return;
5688         }
5689
5690         /* ret will be a PNN (i.e. >=0) on success, or negative on error */
5691         ret = *(int *)data.dptr;
5692         if (ret < 0) {
5693                 state->status = ret;
5694                 state->done = true;
5695                 return;
5696         }
5697         for (i=0; i<state->node_count; i++) {
5698                 if (state->pnn_list[i] == ret) {
5699                         state->reply[i] = true;
5700                         break;
5701                 }
5702         }
5703
5704         state->done = true;
5705         for (i=0; i<state->node_count; i++) {
5706                 if (! state->reply[i]) {
5707                         state->done = false;
5708                         break;
5709                 }
5710         }
5711 }
5712
5713 static int disable_takeover_runs(TALLOC_CTX *mem_ctx,
5714                                  struct ctdb_context *ctdb, uint32_t timeout,
5715                                  uint32_t *pnn_list, int count)
5716 {
5717         struct ctdb_disable_message disable = { 0 };
5718         struct disable_takeover_runs_state state;
5719         int ret, i;
5720
5721         disable.pnn = ctdb->pnn;
5722         disable.srvid = next_srvid(ctdb);
5723         disable.timeout = timeout;
5724
5725         state.pnn_list = pnn_list;
5726         state.node_count = count;
5727         state.done = false;
5728         state.status = 0;
5729         state.reply = talloc_zero_array(mem_ctx, bool, count);
5730         if (state.reply == NULL) {
5731                 return ENOMEM;
5732         }
5733
5734         ret = ctdb_client_set_message_handler(ctdb->ev, ctdb->client,
5735                                               disable.srvid,
5736                                               disable_takeover_run_handler,
5737                                               &state);
5738         if (ret != 0) {
5739                 return ret;
5740         }
5741
5742         for (i=0; i<count; i++) {
5743                 ret = ctdb_message_disable_takeover_runs(mem_ctx, ctdb->ev,
5744                                                          ctdb->client,
5745                                                          pnn_list[i],
5746                                                          &disable);
5747                 if (ret != 0) {
5748                         goto fail;
5749                 }
5750         }
5751
5752         ret = ctdb_client_wait_timeout(ctdb->ev, &state.done, TIMEOUT());
5753         if (ret == ETIME) {
5754                 fprintf(stderr, "Timed out waiting to disable takeover runs\n");
5755         } else {
5756                 ret = (state.status >= 0 ? 0 : 1);
5757         }
5758
5759 fail:
5760         ctdb_client_remove_message_handler(ctdb->ev, ctdb->client,
5761                                            disable.srvid, &state);
5762         return ret;
5763 }
5764
5765 static int control_reloadips(TALLOC_CTX *mem_ctx, struct ctdb_context *ctdb,
5766                              int argc, const char **argv)
5767 {
5768         const char *nodestring = NULL;
5769         struct ctdb_node_map *nodemap, *nodemap2;
5770         struct ctdb_req_control request;
5771         uint32_t *pnn_list, *pnn_list2;
5772         int ret, count, count2;
5773
5774         if (argc > 1) {
5775                 usage("reloadips");
5776         }
5777
5778         if (argc == 1) {
5779                 nodestring = argv[0];
5780         }
5781
5782         nodemap = get_nodemap(ctdb, false);
5783         if (nodemap == NULL) {
5784                 return 1;
5785         }
5786
5787         if (! parse_nodestring(mem_ctx, ctdb, nodestring, &nodemap2)) {
5788                 return 1;
5789         }
5790
5791         count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
5792                                         mem_ctx, &pnn_list);
5793         if (count <= 0) {
5794                 fprintf(stderr, "Memory allocation error\n");
5795                 return 1;
5796         }
5797
5798         count2 = list_of_active_nodes(nodemap2, CTDB_UNKNOWN_PNN,
5799                                       mem_ctx, &pnn_list2);
5800         if (count2 <= 0) {
5801                 fprintf(stderr, "Memory allocation error\n");
5802                 return 1;
5803         }
5804
5805         /* Disable takeover runs on all connected nodes.  A reply
5806          * indicating success is needed from each node so all nodes
5807          * will need to be active.
5808          *
5809          * A check could be added to not allow reloading of IPs when
5810          * there are disconnected nodes.  However, this should
5811          * probably be left up to the administrator.
5812          */
5813         ret = disable_takeover_runs(mem_ctx, ctdb, 2*options.timelimit,
5814                                     pnn_list, count);
5815         if (ret != 0) {
5816                 fprintf(stderr, "Failed to disable takeover runs\n");
5817                 return ret;
5818         }
5819
5820         /* Now tell all the desired nodes to reload their public IPs.
5821          * Keep trying this until it succeeds.  This assumes all
5822          * failures are transient, which might not be true...
5823          */
5824         ctdb_req_control_reload_public_ips(&request);
5825         ret = ctdb_client_control_multi(mem_ctx, ctdb->ev, ctdb->client,
5826                                         pnn_list2, count2, TIMEOUT(),
5827                                         &request, NULL, NULL);
5828         if (ret != 0) {
5829                 fprintf(stderr, "Failed to reload IPs on some nodes.\n");
5830         }
5831
5832         /* It isn't strictly necessary to wait until takeover runs are
5833          * re-enabled but doing so can't hurt.
5834          */
5835         ret = disable_takeover_runs(mem_ctx, ctdb, 0, pnn_list, count);
5836         if (ret != 0) {
5837                 fprintf(stderr, "Failed to enable takeover runs\n");
5838                 return ret;
5839         }
5840
5841         return ipreallocate(mem_ctx, ctdb);
5842 }
5843
5844
5845 static const struct ctdb_cmd {
5846         const char *name;
5847         int (*fn)(TALLOC_CTX *, struct ctdb_context *, int, const char **);
5848         bool without_daemon; /* can be run without daemon running ? */
5849         bool remote; /* can be run on remote nodes */
5850         const char *msg;
5851         const char *args;
5852 } ctdb_commands[] = {
5853         { "version", control_version, true, false,
5854                 "show version of ctdb", NULL },
5855         { "status", control_status, false, true,
5856                 "show node status", NULL },
5857         { "uptime", control_uptime, false, true,
5858                 "show node uptime", NULL },
5859         { "ping", control_ping, false, true,
5860                 "ping a node", NULL },
5861         { "runstate", control_runstate, false, true,
5862                 "get/check runstate of a node",
5863                 "[setup|first_recovery|startup|running]" },
5864         { "getvar", control_getvar, false, true,
5865                 "get a tunable variable", "<name>" },
5866         { "setvar", control_setvar, false, true,
5867                 "set a tunable variable", "<name> <value>" },
5868         { "listvars", control_listvars, false, true,
5869                 "list tunable variables", NULL },
5870         { "statistics", control_statistics, false, true,
5871                 "show ctdb statistics", NULL },
5872         { "statisticsreset", control_statistics_reset, false, true,
5873                 "reset ctdb statistics", NULL },
5874         { "stats", control_stats, false, true,
5875                 "show rolling statistics", "[count]" },
5876         { "ip", control_ip, false, true,
5877                 "show public ips", "[all]" },
5878         { "ipinfo", control_ipinfo, false, true,
5879                 "show public ip details", "<ip>" },
5880         { "ifaces", control_ifaces, false, true,
5881                 "show interfaces", NULL },
5882         { "setifacelink", control_setifacelink, false, true,
5883                 "set interface link status", "<iface> up|down" },
5884         { "process-exists", control_process_exists, false, true,
5885                 "check if a process exists on a node",  "<pid> [<srvid>]" },
5886         { "getdbmap", control_getdbmap, false, true,
5887                 "show attached databases", NULL },
5888         { "getdbstatus", control_getdbstatus, false, true,
5889                 "show database status", "<dbname|dbid>" },
5890         { "catdb", control_catdb, false, false,
5891                 "dump cluster-wide ctdb database", "<dbname|dbid>" },
5892         { "cattdb", control_cattdb, false, false,
5893                 "dump local ctdb database", "<dbname|dbid>" },
5894         { "getcapabilities", control_getcapabilities, false, true,
5895                 "show node capabilities", NULL },
5896         { "pnn", control_pnn, false, false,
5897                 "show the pnn of the currnet node", NULL },
5898         { "lvs", control_lvs, false, false,
5899                 "show lvs configuration", "master|list|status" },
5900         { "setdebug", control_setdebug, false, true,
5901                 "set debug level", "ERROR|WARNING|NOTICE|INFO|DEBUG" },
5902         { "getdebug", control_getdebug, false, true,
5903                 "get debug level", NULL },
5904         { "attach", control_attach, false, false,
5905                 "attach a database", "<dbname> [persistent|replicated]" },
5906         { "detach", control_detach, false, false,
5907                 "detach database(s)", "<dbname|dbid> ..." },
5908         { "dumpmemory", control_dumpmemory, false, true,
5909                 "dump ctdbd memory map", NULL },
5910         { "rddumpmemory", control_rddumpmemory, false, true,
5911                 "dump recoverd memory map", NULL },
5912         { "getpid", control_getpid, false, true,
5913                 "get ctdbd process ID", NULL },
5914         { "disable", control_disable, false, true,
5915                 "disable a node", NULL },
5916         { "enable", control_enable, false, true,
5917                 "enable a node", NULL },
5918         { "stop", control_stop, false, true,
5919                 "stop a node", NULL },
5920         { "continue", control_continue, false, true,
5921                 "continue a stopped node", NULL },
5922         { "ban", control_ban, false, true,
5923                 "ban a node", "<bantime>"},
5924         { "unban", control_unban, false, true,
5925                 "unban a node", NULL },
5926         { "shutdown", control_shutdown, false, true,
5927                 "shutdown ctdb daemon", NULL },
5928         { "recover", control_recover, false, true,
5929                 "force recovery", NULL },
5930         { "sync", control_ipreallocate, false, true,
5931                 "run ip reallocation (deprecated)", NULL },
5932         { "ipreallocate", control_ipreallocate, false, true,
5933                 "run ip reallocation", NULL },
5934         { "isnotrecmaster", control_isnotrecmaster, false, false,
5935                 "check if local node is the recmaster", NULL },
5936         { "gratarp", control_gratarp, false, true,
5937                 "send a gratuitous arp", "<ip> <interface>" },
5938         { "tickle", control_tickle, true, false,
5939                 "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
5940         { "gettickles", control_gettickles, false, true,
5941                 "get the list of tickles", "<ip> [<port>]" },
5942         { "addtickle", control_addtickle, false, true,
5943                 "add a tickle", "<ip>:<port> <ip>:<port>" },
5944         { "deltickle", control_deltickle, false, true,
5945                 "delete a tickle", "<ip>:<port> <ip>:<port>" },
5946         { "listnodes", control_listnodes, true, true,
5947                 "list nodes in the cluster", NULL },
5948         { "reloadnodes", control_reloadnodes, false, false,
5949                 "reload the nodes file all nodes", NULL },
5950         { "moveip", control_moveip, false, false,
5951                 "move an ip address to another node", "<ip> <node>" },
5952         { "addip", control_addip, false, true,
5953                 "add an ip address to a node", "<ip/mask> <iface>" },
5954         { "delip", control_delip, false, true,
5955                 "delete an ip address from a node", "<ip>" },
5956         { "backupdb", control_backupdb, false, false,
5957                 "backup a database into a file", "<dbname|dbid> <file>" },
5958         { "restoredb", control_restoredb, false, false,
5959                 "restore a database from a file", "<file> [dbname]" },
5960         { "dumpdbbackup", control_dumpdbbackup, true, false,
5961                 "dump database from a backup file", "<file>" },
5962         { "wipedb", control_wipedb, false, false,
5963                 "wipe the contents of a database.", "<dbname|dbid>"},
5964         { "recmaster", control_recmaster, false, true,
5965                 "show the pnn for the recovery master", NULL },
5966         { "event", control_event, true, false,
5967                 "event and event script commands", NULL },
5968         { "scriptstatus", control_scriptstatus, true, false,
5969                 "show event script status",
5970                 "[init|setup|startup|monitor|takeip|releaseip|ipreallocated]" },
5971         { "natgw", control_natgw, false, false,
5972                 "show natgw configuration", "master|list|status" },
5973         { "getreclock", control_getreclock, false, true,
5974                 "get recovery lock file", NULL },
5975         { "setlmasterrole", control_setlmasterrole, false, true,
5976                 "set LMASTER role", "on|off" },
5977         { "setrecmasterrole", control_setrecmasterrole, false, true,
5978                 "set RECMASTER role", "on|off"},
5979         { "setdbreadonly", control_setdbreadonly, false, true,
5980                 "enable readonly records", "<dbname|dbid>" },
5981         { "setdbsticky", control_setdbsticky, false, true,
5982                 "enable sticky records", "<dbname|dbid>"},
5983         { "pfetch", control_pfetch, false, false,
5984                 "fetch record from persistent database", "<dbname|dbid> <key>" },
5985         { "pstore", control_pstore, false, false,
5986                 "write record to persistent database", "<dbname|dbid> <key> <value>" },
5987         { "pdelete", control_pdelete, false, false,
5988                 "delete record from persistent database", "<dbname|dbid> <key>" },
5989         { "ptrans", control_ptrans, false, false,
5990                 "update a persistent database (from file or stdin)", "<dbname|dbid> [<file>]" },
5991         { "tfetch", control_tfetch, false, true,
5992                 "fetch a record", "<tdb-file> <key> [<file>]" },
5993         { "tstore", control_tstore, false, true,
5994                 "store a record", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
5995         { "readkey", control_readkey, false, false,
5996                 "read value of a database key", "<dbname|dbid> <key> [readonly]" },
5997         { "writekey", control_writekey, false, false,
5998                 "write value for a database key", "<dbname|dbid> <key> <value>" },
5999         { "deletekey", control_deletekey, false, false,
6000                 "delete a database key", "<dbname|dbid> <key>" },
6001         { "checktcpport", control_checktcpport, true, false,
6002                 "check if a service is bound to a specific tcp port or not", "<port>" },
6003         { "getdbseqnum", control_getdbseqnum, false, false,
6004                 "get database sequence number", "<dbname|dbid>" },
6005         { "nodestatus", control_nodestatus, false, true,
6006                 "show and return node status", "[all|<pnn-list>]" },
6007         { "dbstatistics", control_dbstatistics, false, true,
6008                 "show database statistics", "<dbname|dbid>" },
6009         { "reloadips", control_reloadips, false, false,
6010                 "reload the public addresses file", "[all|<pnn-list>]" },
6011 };
6012
6013 static const struct ctdb_cmd *match_command(const char *command)
6014 {
6015         const struct ctdb_cmd *cmd;
6016         int i;
6017
6018         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6019                 cmd = &ctdb_commands[i];
6020                 if (strlen(command) == strlen(cmd->name) &&
6021                     strncmp(command, cmd->name, strlen(command)) == 0) {
6022                         return cmd;
6023                 }
6024         }
6025
6026         return NULL;
6027 }
6028
6029
6030 /**
6031  * Show usage message
6032  */
6033 static void usage_full(void)
6034 {
6035         int i;
6036
6037         poptPrintHelp(pc, stdout, 0);
6038         printf("\nCommands:\n");
6039         for (i=0; i<ARRAY_SIZE(ctdb_commands); i++) {
6040                 printf("  %-15s %-27s  %s\n",
6041                        ctdb_commands[i].name,
6042                        ctdb_commands[i].args ? ctdb_commands[i].args : "",
6043                        ctdb_commands[i].msg);
6044         }
6045 }
6046
6047 static void usage(const char *command)
6048 {
6049         const struct ctdb_cmd *cmd;
6050
6051         if (command == NULL) {
6052                 usage_full();
6053                 exit(1);
6054         }
6055
6056         cmd = match_command(command);
6057         if (cmd == NULL) {
6058                 usage_full();
6059         } else {
6060                 poptPrintUsage(pc, stdout, 0);
6061                 printf("\nCommands:\n");
6062                 printf("  %-15s %-27s  %s\n",
6063                        cmd->name, cmd->args ? cmd->args : "", cmd->msg);
6064         }
6065
6066         exit(1);
6067 }
6068
6069 struct poptOption cmdline_options[] = {
6070         POPT_AUTOHELP
6071         { "debug", 'd', POPT_ARG_STRING, &options.debuglevelstr, 0,
6072                 "debug level"},
6073         { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0,
6074                 "timelimit (in seconds)" },
6075         { "node", 'n', POPT_ARG_INT, &options.pnn, 0,
6076                 "node specification - integer" },
6077         { NULL, 'Y', POPT_ARG_NONE, &options.machinereadable, 0,
6078                 "enable machine readable output", NULL },
6079         { "separator", 'x', POPT_ARG_STRING, &options.sep, 0,
6080                 "specify separator for machine readable output", "CHAR" },
6081         { NULL, 'X', POPT_ARG_NONE, &options.machineparsable, 0,
6082                 "enable machine parsable output with separator |", NULL },
6083         { "verbose", 'v', POPT_ARG_NONE, &options.verbose, 0,
6084                 "enable verbose output", NULL },
6085         { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0,
6086                 "die if runtime exceeds this limit (in seconds)" },
6087         POPT_TABLEEND
6088 };
6089
6090 static int process_command(const struct ctdb_cmd *cmd, int argc,
6091                            const char **argv)
6092 {
6093         TALLOC_CTX *tmp_ctx;
6094         struct ctdb_context *ctdb;
6095         const char *ctdb_socket;
6096         int ret;
6097         bool status;
6098         uint64_t srvid_offset;
6099
6100         tmp_ctx = talloc_new(NULL);
6101         if (tmp_ctx == NULL) {
6102                 fprintf(stderr, "Memory allocation error\n");
6103                 goto fail;
6104         }
6105
6106         if (cmd->without_daemon) {
6107                 if (options.pnn != -1) {
6108                         fprintf(stderr,
6109                                 "Cannot specify node for command %s\n",
6110                                 cmd->name);
6111                         goto fail;
6112                 }
6113
6114                 ret = cmd->fn(tmp_ctx, NULL, argc-1, argv+1);
6115                 talloc_free(tmp_ctx);
6116                 return ret;
6117         }
6118
6119         ctdb = talloc_zero(tmp_ctx, struct ctdb_context);
6120         if (ctdb == NULL) {
6121                 fprintf(stderr, "Memory allocation error\n");
6122                 goto fail;
6123         }
6124
6125         ctdb->ev = tevent_context_init(ctdb);
6126         if (ctdb->ev == NULL) {
6127                 fprintf(stderr, "Failed to initialize tevent\n");
6128                 goto fail;
6129         }
6130
6131         ctdb_socket = path_socket(ctdb, "ctdbd");
6132         if (ctdb_socket == NULL) {
6133                 fprintf(stderr, "Memory allocation error\n");
6134                 goto fail;
6135         }
6136
6137         ret = ctdb_client_init(ctdb, ctdb->ev, ctdb_socket, &ctdb->client);
6138         if (ret != 0) {
6139                 fprintf(stderr, "Failed to connect to CTDB daemon (%s)\n",
6140                         ctdb_socket);
6141
6142                 if (!find_node_xpnn(ctdb, NULL)) {
6143                         fprintf(stderr, "Is this node part of CTDB cluster?\n");
6144                 }
6145                 goto fail;
6146         }
6147
6148         ctdb->pnn = ctdb_client_pnn(ctdb->client);
6149         srvid_offset = getpid() & 0xFFFF;
6150         ctdb->srvid = SRVID_CTDB_TOOL | (srvid_offset << 16);
6151
6152         if (options.pnn != -1) {
6153                 status = verify_pnn(ctdb, options.pnn);
6154                 if (! status) {
6155                         goto fail;
6156                 }
6157
6158                 ctdb->cmd_pnn = options.pnn;
6159         } else {
6160                 ctdb->cmd_pnn = ctdb->pnn;
6161         }
6162
6163         if (! cmd->remote && ctdb->pnn != ctdb->cmd_pnn) {
6164                 fprintf(stderr, "Node cannot be specified for command %s\n",
6165                         cmd->name);
6166                 goto fail;
6167         }
6168
6169         ret = cmd->fn(tmp_ctx, ctdb, argc-1, argv+1);
6170         talloc_free(tmp_ctx);
6171         return ret;
6172
6173 fail:
6174         talloc_free(tmp_ctx);
6175         return 1;
6176 }
6177
/**
 * SIGTERM handler installed by alarm_handler()
 *
 * Reports on stderr that the maximum runtime has been exceeded.  The
 * signal number is not used.
 */
static void signal_handler(int sig)
{
        static const char msg[] = "Maximum runtime exceeded - exiting\n";
        ssize_t nwritten;

        (void)sig;

        /*
         * This runs in signal context, where stdio functions such as
         * fprintf() are not async-signal-safe.  write() is, and it
         * emits exactly the same bytes on stderr.
         */
        nwritten = write(STDERR_FILENO, msg, sizeof(msg) - 1);

        /* Nothing useful can be done here if the write fails */
        (void)nwritten;
}
6182
6183 static void alarm_handler(int sig)
6184 {
6185         /* Kill any child processes */
6186         signal(SIGTERM, signal_handler);
6187         kill(0, SIGTERM);
6188
6189         _exit(1);
6190 }
6191
6192 int main(int argc, const char *argv[])
6193 {
6194         int opt;
6195         const char **extra_argv;
6196         int extra_argc;
6197         const struct ctdb_cmd *cmd;
6198         int loglevel;
6199         bool ok;
6200         int ret;
6201
6202         setlinebuf(stdout);
6203
6204         /* Set default options */
6205         options.debuglevelstr = NULL;
6206         options.timelimit = 10;
6207         options.sep = "|";
6208         options.maxruntime = 0;
6209         options.pnn = -1;
6210
6211         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
6212                             POPT_CONTEXT_KEEP_FIRST);
6213         while ((opt = poptGetNextOpt(pc)) != -1) {
6214                 fprintf(stderr, "Invalid option %s: %s\n",
6215                         poptBadOption(pc, 0), poptStrerror(opt));
6216                 exit(1);
6217         }
6218
6219         if (options.maxruntime == 0) {
6220                 const char *ctdb_timeout;
6221
6222                 ctdb_timeout = getenv("CTDB_TIMEOUT");
6223                 if (ctdb_timeout != NULL) {
6224                         options.maxruntime = strtoul(ctdb_timeout, NULL, 0);
6225                 } else {
6226                         options.maxruntime = 120;
6227                 }
6228         }
6229         if (options.maxruntime <= 120) {
6230                 /* default timeout is 120 seconds */
6231                 options.maxruntime = 120;
6232         }
6233
6234         if (options.machineparsable) {
6235                 options.machinereadable = 1;
6236         }
6237
6238         /* setup the remaining options for the commands */
6239         extra_argc = 0;
6240         extra_argv = poptGetArgs(pc);
6241         if (extra_argv) {
6242                 extra_argv++;
6243                 while (extra_argv[extra_argc]) extra_argc++;
6244         }
6245
6246         if (extra_argc < 1) {
6247                 usage(NULL);
6248         }
6249
6250         cmd = match_command(extra_argv[0]);
6251         if (cmd == NULL) {
6252                 fprintf(stderr, "Unknown command '%s'\n", extra_argv[0]);
6253                 exit(1);
6254         }
6255
6256         /* Enable logging */
6257         setup_logging("ctdb", DEBUG_STDERR);
6258         ok = debug_level_parse(options.debuglevelstr, &loglevel);
6259         if (!ok) {
6260                 loglevel = DEBUG_ERR;
6261         }
6262         debuglevel_set(loglevel);
6263
6264         signal(SIGALRM, alarm_handler);
6265         alarm(options.maxruntime);
6266
6267         ret = process_command(cmd, extra_argc, extra_argv);
6268         if (ret == -1) {
6269                 ret = 1;
6270         }
6271
6272         (void)poptFreeContext(pc);
6273
6274         return ret;
6275 }