ctdb-tests: Implement database attach control in fake_ctdbd
[metze/samba/wip.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45
46 #include "ipalloc_read_known_ips.h"
47
48
49 #define CTDB_PORT 4379
50
51 /* A fake flag that is only supported by some functions */
52 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
53
54 struct node {
55         ctdb_sock_addr addr;
56         uint32_t pnn;
57         uint32_t flags;
58         uint32_t capabilities;
59         bool recovery_disabled;
60         void *recovery_substate;
61 };
62
63 struct node_map {
64         uint32_t num_nodes;
65         struct node *node;
66         uint32_t pnn;
67         uint32_t recmaster;
68 };
69
70 struct interface {
71         const char *name;
72         bool link_up;
73         uint32_t references;
74 };
75
76 struct interface_map {
77         int num;
78         struct interface *iface;
79 };
80
81 struct vnn_map {
82         uint32_t recmode;
83         uint32_t generation;
84         uint32_t size;
85         uint32_t *map;
86 };
87
88 struct database {
89         struct database *prev, *next;
90         const char *name;
91         const char *path;
92         struct tdb_context *tdb;
93         uint32_t id;
94         uint8_t flags;
95         uint64_t seq_num;
96 };
97
98 struct database_map {
99         struct database *db;
100         const char *dbdir;
101 };
102
103 struct fake_control_failure {
104         struct fake_control_failure  *prev, *next;
105         enum ctdb_controls opcode;
106         uint32_t pnn;
107         const char *error;
108         const char *comment;
109 };
110
111 struct ctdb_client {
112         struct ctdb_client *prev, *next;
113         struct ctdbd_context *ctdb;
114         pid_t pid;
115         void *state;
116 };
117
118 struct ctdbd_context {
119         struct node_map *node_map;
120         struct interface_map *iface_map;
121         struct vnn_map *vnn_map;
122         struct database_map *db_map;
123         struct srvid_context *srv;
124         int num_clients;
125         struct timeval start_time;
126         struct timeval recovery_start_time;
127         struct timeval recovery_end_time;
128         bool takeover_disabled;
129         int log_level;
130         enum ctdb_runstate runstate;
131         struct ctdb_tunable_list tun_list;
132         char *reclock;
133         struct ctdb_public_ip_list *known_ips;
134         struct fake_control_failure *control_failures;
135         struct ctdb_client *client_list;
136 };
137
138 /*
139  * Parse routines
140  */
141
142 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
143 {
144         struct node_map *node_map;
145
146         node_map = talloc_zero(mem_ctx, struct node_map);
147         if (node_map == NULL) {
148                 return NULL;
149         }
150
151         node_map->pnn = CTDB_UNKNOWN_PNN;
152         node_map->recmaster = CTDB_UNKNOWN_PNN;
153
154         return node_map;
155 }
156
157 /* Read a nodemap from stdin.  Each line looks like:
158  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
159  * EOF or a blank line terminates input.
160  *
161  * By default, capablities for each node are
162  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
163  * capabilities can be faked off by adding, for example,
164  * -CTDB_CAP_RECMASTER.
165  */
166
167 static bool nodemap_parse(struct node_map *node_map)
168 {
169         char line[1024];
170
171         while ((fgets(line, sizeof(line), stdin) != NULL)) {
172                 uint32_t pnn, flags, capabilities;
173                 char *tok, *t;
174                 char *ip;
175                 ctdb_sock_addr saddr;
176                 struct node *node;
177                 int ret;
178
179                 if (line[0] == '\n') {
180                         break;
181                 }
182
183                 /* Get rid of pesky newline */
184                 if ((t = strchr(line, '\n')) != NULL) {
185                         *t = '\0';
186                 }
187
188                 /* Get PNN */
189                 tok = strtok(line, " \t");
190                 if (tok == NULL) {
191                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
192                         continue;
193                 }
194                 pnn = (uint32_t)strtoul(tok, NULL, 0);
195
196                 /* Get IP */
197                 tok = strtok(NULL, " \t");
198                 if (tok == NULL) {
199                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
200                         continue;
201                 }
202                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
203                 if (ret != 0) {
204                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
205                         continue;
206                 }
207                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
208                 ip = talloc_strdup(node_map, tok);
209                 if (ip == NULL) {
210                         goto fail;
211                 }
212
213                 /* Get flags */
214                 tok = strtok(NULL, " \t");
215                 if (tok == NULL) {
216                         fprintf(stderr, "bad line (%s) - missing flags\n",
217                                 line);
218                         continue;
219                 }
220                 flags = (uint32_t)strtoul(tok, NULL, 0);
221                 /* Handle deleted nodes */
222                 if (flags & NODE_FLAGS_DELETED) {
223                         talloc_free(ip);
224                         ip = talloc_strdup(node_map, "0.0.0.0");
225                         if (ip == NULL) {
226                                 goto fail;
227                         }
228                 }
229                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
230
231                 tok = strtok(NULL, " \t");
232                 while (tok != NULL) {
233                         if (strcmp(tok, "CURRENT") == 0) {
234                                 node_map->pnn = pnn;
235                         } else if (strcmp(tok, "RECMASTER") == 0) {
236                                 node_map->recmaster = pnn;
237                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
238                                 capabilities &= ~CTDB_CAP_RECMASTER;
239                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
240                                 capabilities &= ~CTDB_CAP_LMASTER;
241                         } else if (strcmp(tok, "TIMEOUT") == 0) {
242                                 /* This can be done with just a flag
243                                  * value but it is probably clearer
244                                  * and less error-prone to fake this
245                                  * with an explicit token */
246                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
247                         }
248                         tok = strtok(NULL, " \t");
249                 }
250
251                 node_map->node = talloc_realloc(node_map, node_map->node,
252                                                 struct node,
253                                                 node_map->num_nodes + 1);
254                 if (node_map->node == NULL) {
255                         goto fail;
256                 }
257                 node = &node_map->node[node_map->num_nodes];
258
259                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
260                 if (ret != 0) {
261                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
262                         continue;
263                 }
264                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
265                 node->pnn = pnn;
266                 node->flags = flags;
267                 node->capabilities = capabilities;
268                 node->recovery_disabled = false;
269                 node->recovery_substate = NULL;
270
271                 node_map->num_nodes += 1;
272         }
273
274         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
275         return true;
276
277 fail:
278         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
279         return false;
280
281 }
282
283 /* Append a node to a node map with given address and flags */
284 static bool node_map_add(struct ctdb_node_map *nodemap,
285                          const char *nstr, uint32_t flags)
286 {
287         ctdb_sock_addr addr;
288         uint32_t num;
289         struct ctdb_node_and_flags *n;
290         int ret;
291
292         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
293         if (ret != 0) {
294                 fprintf(stderr, "Invalid IP address %s\n", nstr);
295                 return false;
296         }
297         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
298
299         num = nodemap->num;
300         nodemap->node = talloc_realloc(nodemap, nodemap->node,
301                                        struct ctdb_node_and_flags, num+1);
302         if (nodemap->node == NULL) {
303                 return false;
304         }
305
306         n = &nodemap->node[num];
307         n->addr = addr;
308         n->pnn = num;
309         n->flags = flags;
310
311         nodemap->num = num+1;
312         return true;
313 }
314
315 /* Read a nodes file into a node map */
316 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
317                                                   const char *nlist)
318 {
319         char **lines;
320         int nlines;
321         int i;
322         struct ctdb_node_map *nodemap;
323
324         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
325         if (nodemap == NULL) {
326                 return NULL;
327         }
328
329         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
330         if (lines == NULL) {
331                 return NULL;
332         }
333
334         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
335                 nlines--;
336         }
337
338         for (i=0; i<nlines; i++) {
339                 char *node;
340                 uint32_t flags;
341                 size_t len;
342
343                 node = lines[i];
344                 /* strip leading spaces */
345                 while((*node == ' ') || (*node == '\t')) {
346                         node++;
347                 }
348
349                 len = strlen(node);
350
351                 /* strip trailing spaces */
352                 while ((len > 1) &&
353                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
354                 {
355                         node[len-1] = '\0';
356                         len--;
357                 }
358
359                 if (len == 0) {
360                         continue;
361                 }
362                 if (*node == '#') {
363                         /* A "deleted" node is a node that is
364                            commented out in the nodes file.  This is
365                            used instead of removing a line, which
366                            would cause subsequent nodes to change
367                            their PNN. */
368                         flags = NODE_FLAGS_DELETED;
369                         node = discard_const("0.0.0.0");
370                 } else {
371                         flags = 0;
372                 }
373                 if (! node_map_add(nodemap, node, flags)) {
374                         talloc_free(lines);
375                         TALLOC_FREE(nodemap);
376                         return NULL;
377                 }
378         }
379
380         talloc_free(lines);
381         return nodemap;
382 }
383
384 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
385                                              uint32_t pnn)
386 {
387         struct ctdb_node_map *nodemap;
388         char nodes_list[PATH_MAX];
389         const char *ctdb_base;
390         int num;
391
392         ctdb_base = getenv("CTDB_BASE");
393         if (ctdb_base == NULL) {
394                 D_ERR("CTDB_BASE is not set\n");
395                 return NULL;
396         }
397
398         /* read optional node-specific nodes file */
399         num = snprintf(nodes_list, sizeof(nodes_list),
400                        "%s/nodes.%d", ctdb_base, pnn);
401         if (num == sizeof(nodes_list)) {
402                 D_ERR("nodes file path too long\n");
403                 return NULL;
404         }
405         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
406         if (nodemap != NULL) {
407                 /* Fake a load failure for an empty nodemap */
408                 if (nodemap->num == 0) {
409                         talloc_free(nodemap);
410
411                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
412                         return NULL;
413                 }
414
415                 return nodemap;
416         }
417
418         /* read normal nodes file */
419         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
420         if (num == sizeof(nodes_list)) {
421                 D_ERR("nodes file path too long\n");
422                 return NULL;
423         }
424         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
425         if (nodemap != NULL) {
426                 return nodemap;
427         }
428
429         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
430         return NULL;
431 }
432
433 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
434 {
435         struct interface_map *iface_map;
436
437         iface_map = talloc_zero(mem_ctx, struct interface_map);
438         if (iface_map == NULL) {
439                 return NULL;
440         }
441
442         return iface_map;
443 }
444
445 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
446  * output:
447  *   :Name:LinkStatus:References:
448  *   :eth2:1:4294967294
449  *   :eth1:1:4294967292
450  */
451
452 static bool interfaces_parse(struct interface_map *iface_map)
453 {
454         char line[1024];
455
456         while ((fgets(line, sizeof(line), stdin) != NULL)) {
457                 uint16_t link_state;
458                 uint32_t references;
459                 char *tok, *t, *name;
460                 struct interface *iface;
461
462                 if (line[0] == '\n') {
463                         break;
464                 }
465
466                 /* Get rid of pesky newline */
467                 if ((t = strchr(line, '\n')) != NULL) {
468                         *t = '\0';
469                 }
470
471                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
472                         continue;
473                 }
474
475                 /* Leading colon... */
476                 // tok = strtok(line, ":");
477
478                 /* name */
479                 tok = strtok(line, ":");
480                 if (tok == NULL) {
481                         fprintf(stderr, "bad line (%s) - missing name\n", line);
482                         continue;
483                 }
484                 name = tok;
485
486                 /* link_state */
487                 tok = strtok(NULL, ":");
488                 if (tok == NULL) {
489                         fprintf(stderr, "bad line (%s) - missing link state\n",
490                                 line);
491                         continue;
492                 }
493                 link_state = (uint16_t)strtoul(tok, NULL, 0);
494
495                 /* references... */
496                 tok = strtok(NULL, ":");
497                 if (tok == NULL) {
498                         fprintf(stderr, "bad line (%s) - missing references\n",
499                                 line);
500                         continue;
501                 }
502                 references = (uint32_t)strtoul(tok, NULL, 0);
503
504                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
505                                                   struct interface,
506                                                   iface_map->num + 1);
507                 if (iface_map->iface == NULL) {
508                         goto fail;
509                 }
510
511                 iface = &iface_map->iface[iface_map->num];
512
513                 iface->name = talloc_strdup(iface_map, name);
514                 if (iface->name == NULL) {
515                         goto fail;
516                 }
517                 iface->link_up = link_state;
518                 iface->references = references;
519
520                 iface_map->num += 1;
521         }
522
523         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
524         return true;
525
526 fail:
527         fprintf(stderr, "Parsing interfaces failed\n");
528         return false;
529 }
530
531 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
532 {
533         struct vnn_map *vnn_map;
534
535         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
536         if (vnn_map == NULL) {
537                 fprintf(stderr, "Memory error\n");
538                 return NULL;
539         }
540         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
541         vnn_map->generation = INVALID_GENERATION;
542
543         return vnn_map;
544 }
545
546 /* Read vnn map.
547  * output:
548  *   <GENERATION>
549  *   <LMASTER0>
550  *   <LMASTER1>
551  *   ...
552  */
553
554 static bool vnnmap_parse(struct vnn_map *vnn_map)
555 {
556         char line[1024];
557
558         while (fgets(line, sizeof(line), stdin) != NULL) {
559                 uint32_t n;
560                 char *t;
561
562                 if (line[0] == '\n') {
563                         break;
564                 }
565
566                 /* Get rid of pesky newline */
567                 if ((t = strchr(line, '\n')) != NULL) {
568                         *t = '\0';
569                 }
570
571                 n = (uint32_t) strtol(line, NULL, 0);
572
573                 /* generation */
574                 if (vnn_map->generation == INVALID_GENERATION) {
575                         vnn_map->generation = n;
576                         continue;
577                 }
578
579                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
580                                               vnn_map->size + 1);
581                 if (vnn_map->map == NULL) {
582                         fprintf(stderr, "Memory error\n");
583                         goto fail;
584                 }
585
586                 vnn_map->map[vnn_map->size] = n;
587                 vnn_map->size += 1;
588         }
589
590         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
591         return true;
592
593 fail:
594         fprintf(stderr, "Parsing vnnmap failed\n");
595         return false;
596 }
597
598 static bool reclock_parse(struct ctdbd_context *ctdb)
599 {
600         char line[1024];
601         char *t;
602
603         if (fgets(line, sizeof(line), stdin) == NULL) {
604                 goto fail;
605         }
606
607         if (line[0] == '\n') {
608                 /* Recovery lock remains unset */
609                 goto ok;
610         }
611
612         /* Get rid of pesky newline */
613         if ((t = strchr(line, '\n')) != NULL) {
614                 *t = '\0';
615         }
616
617         ctdb->reclock = talloc_strdup(ctdb, line);
618         if (ctdb->reclock == NULL) {
619                 goto fail;
620         }
621 ok:
622         /* Swallow possible blank line following section.  Picky
623          * compiler settings don't allow the return value to be
624          * ignored, so make the compiler happy.
625          */
626         if (fgets(line, sizeof(line), stdin) == NULL) {
627                 ;
628         }
629         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
630         return true;
631
632 fail:
633         fprintf(stderr, "Parsing reclock failed\n");
634         return false;
635 }
636
637 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
638                                        const char *dbdir)
639 {
640         struct database_map *db_map;
641
642         db_map = talloc_zero(mem_ctx, struct database_map);
643         if (db_map == NULL) {
644                 return NULL;
645         }
646
647         db_map->dbdir = talloc_strdup(db_map, dbdir);
648         if (db_map->dbdir == NULL) {
649                 talloc_free(db_map);
650                 return NULL;
651         }
652
653         return db_map;
654 }
655
656 /* Read a database map from stdin.  Each line looks like:
657  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
658  * EOF or a blank line terminates input.
659  *
660  * By default, flags and seq_num are 0
661  */
662
663 static bool dbmap_parse(struct database_map *db_map)
664 {
665         char line[1024];
666
667         while ((fgets(line, sizeof(line), stdin) != NULL)) {
668                 uint32_t id;
669                 uint8_t flags = 0;
670                 uint32_t seq_num = 0;
671                 char *tok, *t;
672                 char *name;
673                 struct database *db;
674
675                 if (line[0] == '\n') {
676                         break;
677                 }
678
679                 /* Get rid of pesky newline */
680                 if ((t = strchr(line, '\n')) != NULL) {
681                         *t = '\0';
682                 }
683
684                 /* Get ID */
685                 tok = strtok(line, " \t");
686                 if (tok == NULL) {
687                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
688                         continue;
689                 }
690                 id = (uint32_t)strtoul(tok, NULL, 0);
691
692                 /* Get NAME */
693                 tok = strtok(NULL, " \t");
694                 if (tok == NULL) {
695                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
696                         continue;
697                 }
698                 name = talloc_strdup(db_map, tok);
699                 if (name == NULL) {
700                         goto fail;
701                 }
702
703                 /* Get flags */
704                 tok = strtok(NULL, " \t");
705                 while (tok != NULL) {
706                         if (strcmp(tok, "PERSISTENT") == 0) {
707                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
708                         } else if (strcmp(tok, "STICKY") == 0) {
709                                 flags |= CTDB_DB_FLAGS_STICKY;
710                         } else if (strcmp(tok, "READONLY") == 0) {
711                                 flags |= CTDB_DB_FLAGS_READONLY;
712                         } else if (strcmp(tok, "REPLICATED") == 0) {
713                                 flags |= CTDB_DB_FLAGS_REPLICATED;
714                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
715                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
716                                              CTDB_DB_FLAGS_REPLICATED;
717
718                                 if ((flags & nv) == 0) {
719                                         fprintf(stderr,
720                                                 "seq_num for volatile db\n");
721                                         goto fail;
722                                 }
723                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
724                         }
725
726                         tok = strtok(NULL, " \t");
727                 }
728
729                 db = talloc_zero(db_map, struct database);
730                 if (db == NULL) {
731                         goto fail;
732                 }
733
734                 db->id = id;
735                 db->name = talloc_steal(db, name);
736                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
737                 if (db->path == NULL) {
738                         talloc_free(db);
739                         goto fail;
740                 }
741                 db->flags = flags;
742                 db->seq_num = seq_num;
743
744                 DLIST_ADD_END(db_map->db, db);
745         }
746
747         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
748         return true;
749
750 fail:
751         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
752         return false;
753
754 }
755
756 static struct database *database_find(struct database_map *db_map,
757                                       uint32_t db_id)
758 {
759         struct database *db;
760
761         for (db = db_map->db; db != NULL; db = db->next) {
762                 if (db->id == db_id) {
763                         return db;
764                 }
765         }
766
767         return NULL;
768 }
769
770 static int database_count(struct database_map *db_map)
771 {
772         struct database *db;
773         int count = 0;
774
775         for (db = db_map->db; db != NULL; db = db->next) {
776                 count += 1;
777         }
778
779         return count;
780 }
781
782 static int database_flags(uint8_t db_flags)
783 {
784         int tdb_flags = 0;
785
786         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
787                 tdb_flags = TDB_DEFAULT;
788         } else {
789                 /* volatile and replicated use the same flags */
790                 tdb_flags = TDB_NOSYNC |
791                             TDB_CLEAR_IF_FIRST |
792                             TDB_INCOMPATIBLE_HASH;
793         }
794
795         tdb_flags |= TDB_DISALLOW_NESTING;
796
797         return tdb_flags;
798 }
799
800 static struct database *database_new(struct database_map *db_map,
801                                      const char *name, uint8_t flags)
802 {
803         struct database *db;
804         TDB_DATA key;
805         int tdb_flags;
806
807         db = talloc_zero(db_map, struct database);
808         if (db == NULL) {
809                 return NULL;
810         }
811
812         db->name = talloc_strdup(db, name);
813         if (db->name == NULL) {
814                 goto fail;
815         }
816
817         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
818         if (db->path == NULL) {
819                 goto fail;
820         }
821
822         key.dsize = strlen(db->name) + 1;
823         key.dptr = discard_const(db->name);
824
825         db->id = tdb_jenkins_hash(&key);
826         db->flags = flags;
827
828         tdb_flags = database_flags(flags);
829
830         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
831         if (db->tdb == NULL) {
832                 DBG_ERR("tdb_open\n");
833                 goto fail;
834         }
835
836         DLIST_ADD_END(db_map->db, db);
837         return db;
838
839 fail:
840         DBG_ERR("Memory error\n");
841         talloc_free(db);
842         return NULL;
843
844 }
845
846 static bool public_ips_parse(struct ctdbd_context *ctdb,
847                              uint32_t numnodes)
848 {
849         bool status;
850
851         if (numnodes == 0) {
852                 D_ERR("Must initialise nodemap before public IPs\n");
853                 return false;
854         }
855
856         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
857
858         status = (ctdb->known_ips != NULL);
859
860         if (status) {
861                 D_INFO("Parsing public IPs done\n");
862         } else {
863                 D_INFO("Parsing public IPs failed\n");
864         }
865
866         return status;
867 }
868
869 /* Read information about controls to fail.  Format is:
870  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
871  */
872 static bool control_failures_parse(struct ctdbd_context *ctdb)
873 {
874         char line[1024];
875
876         while ((fgets(line, sizeof(line), stdin) != NULL)) {
877                 char *tok, *t;
878                 enum ctdb_controls opcode;
879                 uint32_t pnn;
880                 const char *error;
881                 const char *comment;
882                 struct fake_control_failure *failure = NULL;
883
884                 if (line[0] == '\n') {
885                         break;
886                 }
887
888                 /* Get rid of pesky newline */
889                 if ((t = strchr(line, '\n')) != NULL) {
890                         *t = '\0';
891                 }
892
893                 /* Get opcode */
894                 tok = strtok(line, " \t");
895                 if (tok == NULL) {
896                         D_ERR("bad line (%s) - missing opcode\n", line);
897                         continue;
898                 }
899                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
900
901                 /* Get PNN */
902                 tok = strtok(NULL, " \t");
903                 if (tok == NULL) {
904                         D_ERR("bad line (%s) - missing PNN\n", line);
905                         continue;
906                 }
907                 pnn = (uint32_t)strtoul(tok, NULL, 0);
908
909                 /* Get error */
910                 tok = strtok(NULL, " \t");
911                 if (tok == NULL) {
912                         D_ERR("bad line (%s) - missing errno\n", line);
913                         continue;
914                 }
915                 error = talloc_strdup(ctdb, tok);
916                 if (error == NULL) {
917                         goto fail;
918                 }
919                 if (strcmp(error, "ERROR") != 0 &&
920                     strcmp(error, "TIMEOUT") != 0) {
921                         D_ERR("bad line (%s) "
922                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
923                               line);
924                         goto fail;
925                 }
926
927                 /* Get comment */
928                 tok = strtok(NULL, "\n"); /* rest of line */
929                 if (tok == NULL) {
930                         D_ERR("bad line (%s) - missing comment\n", line);
931                         continue;
932                 }
933                 comment = talloc_strdup(ctdb, tok);
934                 if (comment == NULL) {
935                         goto fail;
936                 }
937
938                 failure = talloc_zero(ctdb, struct fake_control_failure);
939                 if (failure == NULL) {
940                         goto fail;
941                 }
942
943                 failure->opcode = opcode;
944                 failure->pnn = pnn;
945                 failure->error = error;
946                 failure->comment = comment;
947
948                 DLIST_ADD(ctdb->control_failures, failure);
949         }
950
951         D_INFO("Parsing fake control failures done\n");
952         return true;
953
954 fail:
955         D_INFO("Parsing fake control failures failed\n");
956         return false;
957 }
958
959 /*
960  * Manage clients
961  */
962
963 static int ctdb_client_destructor(struct ctdb_client *client)
964 {
965         DLIST_REMOVE(client->ctdb->client_list, client);
966         return 0;
967 }
968
969 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
970                       void *client_state)
971 {
972         struct ctdb_client *client;
973
974         client = talloc_zero(client_state, struct ctdb_client);
975         if (client == NULL) {
976                 return ENOMEM;
977         }
978
979         client->ctdb = ctdb;
980         client->pid = client_pid;
981         client->state = client_state;
982
983         DLIST_ADD(ctdb->client_list, client);
984         talloc_set_destructor(client, ctdb_client_destructor);
985         return 0;
986 }
987
988 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
989 {
990         struct ctdb_client *client;
991
992         for (client=ctdb->client_list; client != NULL; client=client->next) {
993                 if (client->pid == client_pid) {
994                         return client->state;
995                 }
996         }
997
998         return NULL;
999 }
1000
1001 /*
1002  * CTDB context setup
1003  */
1004
1005 static uint32_t new_generation(uint32_t old_generation)
1006 {
1007         uint32_t generation;
1008
1009         while (1) {
1010                 generation = random();
1011                 if (generation != INVALID_GENERATION &&
1012                     generation != old_generation) {
1013                         break;
1014                 }
1015         }
1016
1017         return generation;
1018 }
1019
1020 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1021                                          const char *dbdir)
1022 {
1023         struct ctdbd_context *ctdb;
1024         char line[1024];
1025         bool status;
1026         int ret;
1027
1028         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1029         if (ctdb == NULL) {
1030                 return NULL;
1031         }
1032
1033         ctdb->node_map = nodemap_init(ctdb);
1034         if (ctdb->node_map == NULL) {
1035                 goto fail;
1036         }
1037
1038         ctdb->iface_map = interfaces_init(ctdb);
1039         if (ctdb->iface_map == NULL) {
1040                 goto fail;
1041         }
1042
1043         ctdb->vnn_map = vnnmap_init(ctdb);
1044         if (ctdb->vnn_map == NULL) {
1045                 goto fail;
1046         }
1047
1048         ctdb->db_map = dbmap_init(ctdb, dbdir);
1049         if (ctdb->db_map == NULL) {
1050                 goto fail;
1051         }
1052
1053         ret = srvid_init(ctdb, &ctdb->srv);
1054         if (ret != 0) {
1055                 goto fail;
1056         }
1057
1058         while (fgets(line, sizeof(line), stdin) != NULL) {
1059                 char *t;
1060
1061                 if ((t = strchr(line, '\n')) != NULL) {
1062                         *t = '\0';
1063                 }
1064
1065                 if (strcmp(line, "NODEMAP") == 0) {
1066                         status = nodemap_parse(ctdb->node_map);
1067                 } else if (strcmp(line, "IFACES") == 0) {
1068                         status = interfaces_parse(ctdb->iface_map);
1069                 } else if (strcmp(line, "VNNMAP") == 0) {
1070                         status = vnnmap_parse(ctdb->vnn_map);
1071                 } else if (strcmp(line, "DBMAP") == 0) {
1072                         status = dbmap_parse(ctdb->db_map);
1073                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1074                         status = public_ips_parse(ctdb,
1075                                                   ctdb->node_map->num_nodes);
1076                 } else if (strcmp(line, "RECLOCK") == 0) {
1077                         status = reclock_parse(ctdb);
1078                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1079                         status = control_failures_parse(ctdb);
1080                 } else {
1081                         fprintf(stderr, "Unknown line %s\n", line);
1082                         status = false;
1083                 }
1084
1085                 if (! status) {
1086                         goto fail;
1087                 }
1088         }
1089
1090         ctdb->start_time = tevent_timeval_current();
1091         ctdb->recovery_start_time = tevent_timeval_current();
1092         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1093         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1094                 ctdb->vnn_map->generation =
1095                         new_generation(ctdb->vnn_map->generation);
1096         }
1097         ctdb->recovery_end_time = tevent_timeval_current();
1098
1099         ctdb->log_level = DEBUG_ERR;
1100         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1101
1102         ctdb_tunable_set_defaults(&ctdb->tun_list);
1103
1104         return ctdb;
1105
1106 fail:
1107         TALLOC_FREE(ctdb);
1108         return NULL;
1109 }
1110
1111 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1112 {
1113         struct node *node;
1114         int i;
1115
1116         if (ctdb->node_map->num_nodes == 0) {
1117                 return true;
1118         }
1119
1120         /* Make sure all the nodes are in order */
1121         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1122                 node = &ctdb->node_map->node[i];
1123                 if (node->pnn != i) {
1124                         fprintf(stderr, "Expected node %u, found %u\n",
1125                                 i, node->pnn);
1126                         return false;
1127                 }
1128         }
1129
1130         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1131         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1132                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1133                 exit(0);
1134         }
1135
1136         return true;
1137 }
1138
1139 /*
1140  * Doing a recovery
1141  */
1142
1143 struct recover_state {
1144         struct tevent_context *ev;
1145         struct ctdbd_context *ctdb;
1146 };
1147
1148 static int recover_check(struct tevent_req *req);
1149 static void recover_wait_done(struct tevent_req *subreq);
1150 static void recover_done(struct tevent_req *subreq);
1151
1152 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1153                                        struct tevent_context *ev,
1154                                        struct ctdbd_context *ctdb)
1155 {
1156         struct tevent_req *req;
1157         struct recover_state *state;
1158         int ret;
1159
1160         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1161         if (req == NULL) {
1162                 return NULL;
1163         }
1164
1165         state->ev = ev;
1166         state->ctdb = ctdb;
1167
1168         ret = recover_check(req);
1169         if (ret != 0) {
1170                 tevent_req_error(req, ret);
1171                 return tevent_req_post(req, ev);
1172         }
1173
1174         return req;
1175 }
1176
1177 static int recover_check(struct tevent_req *req)
1178 {
1179         struct recover_state *state = tevent_req_data(
1180                 req, struct recover_state);
1181         struct ctdbd_context *ctdb = state->ctdb;
1182         struct tevent_req *subreq;
1183         bool recovery_disabled;
1184         int i;
1185
1186         recovery_disabled = false;
1187         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1188                 if (ctdb->node_map->node[i].recovery_disabled) {
1189                         recovery_disabled = true;
1190                         break;
1191                 }
1192         }
1193
1194         subreq = tevent_wakeup_send(state, state->ev,
1195                                     tevent_timeval_current_ofs(1, 0));
1196         if (subreq == NULL) {
1197                 return ENOMEM;
1198         }
1199
1200         if (recovery_disabled) {
1201                 tevent_req_set_callback(subreq, recover_wait_done, req);
1202         } else {
1203                 ctdb->recovery_start_time = tevent_timeval_current();
1204                 tevent_req_set_callback(subreq, recover_done, req);
1205         }
1206
1207         return 0;
1208 }
1209
1210 static void recover_wait_done(struct tevent_req *subreq)
1211 {
1212         struct tevent_req *req = tevent_req_callback_data(
1213                 subreq, struct tevent_req);
1214         int ret;
1215         bool status;
1216
1217         status = tevent_wakeup_recv(subreq);
1218         TALLOC_FREE(subreq);
1219         if (! status) {
1220                 tevent_req_error(req, EIO);
1221                 return;
1222         }
1223
1224         ret = recover_check(req);
1225         if (ret != 0) {
1226                 tevent_req_error(req, ret);
1227         }
1228 }
1229
1230 static void recover_done(struct tevent_req *subreq)
1231 {
1232         struct tevent_req *req = tevent_req_callback_data(
1233                 subreq, struct tevent_req);
1234         struct recover_state *state = tevent_req_data(
1235                 req, struct recover_state);
1236         struct ctdbd_context *ctdb = state->ctdb;
1237         bool status;
1238
1239         status = tevent_wakeup_recv(subreq);
1240         TALLOC_FREE(subreq);
1241         if (! status) {
1242                 tevent_req_error(req, EIO);
1243                 return;
1244         }
1245
1246         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1247         ctdb->recovery_end_time = tevent_timeval_current();
1248         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1249
1250         tevent_req_done(req);
1251 }
1252
1253 static bool recover_recv(struct tevent_req *req, int *perr)
1254 {
1255         int err;
1256
1257         if (tevent_req_is_unix_error(req, &err)) {
1258                 if (perr != NULL) {
1259                         *perr = err;
1260                 }
1261                 return false;
1262         }
1263
1264         return true;
1265 }
1266
1267 /*
1268  * Routines for ctdb_req_header
1269  */
1270
1271 static void header_fix_pnn(struct ctdb_req_header *header,
1272                            struct ctdbd_context *ctdb)
1273 {
1274         if (header->srcnode == CTDB_CURRENT_NODE) {
1275                 header->srcnode = ctdb->node_map->pnn;
1276         }
1277
1278         if (header->destnode == CTDB_CURRENT_NODE) {
1279                 header->destnode = ctdb->node_map->pnn;
1280         }
1281 }
1282
1283 static struct ctdb_req_header header_reply_control(
1284                                         struct ctdb_req_header *header,
1285                                         struct ctdbd_context *ctdb)
1286 {
1287         struct ctdb_req_header reply_header;
1288
1289         reply_header = (struct ctdb_req_header) {
1290                 .ctdb_magic = CTDB_MAGIC,
1291                 .ctdb_version = CTDB_PROTOCOL,
1292                 .generation = ctdb->vnn_map->generation,
1293                 .operation = CTDB_REPLY_CONTROL,
1294                 .destnode = header->srcnode,
1295                 .srcnode = header->destnode,
1296                 .reqid = header->reqid,
1297         };
1298
1299         return reply_header;
1300 }
1301
1302 static struct ctdb_req_header header_reply_message(
1303                                         struct ctdb_req_header *header,
1304                                         struct ctdbd_context *ctdb)
1305 {
1306         struct ctdb_req_header reply_header;
1307
1308         reply_header = (struct ctdb_req_header) {
1309                 .ctdb_magic = CTDB_MAGIC,
1310                 .ctdb_version = CTDB_PROTOCOL,
1311                 .generation = ctdb->vnn_map->generation,
1312                 .operation = CTDB_REQ_MESSAGE,
1313                 .destnode = header->srcnode,
1314                 .srcnode = header->destnode,
1315                 .reqid = 0,
1316         };
1317
1318         return reply_header;
1319 }
1320
1321 /*
1322  * Client state
1323  */
1324
1325 struct client_state {
1326         struct tevent_context *ev;
1327         int fd;
1328         struct ctdbd_context *ctdb;
1329         int pnn;
1330         pid_t pid;
1331         struct comm_context *comm;
1332         struct srvid_register_state *rstate;
1333         int status;
1334 };
1335
1336 /*
1337  * Send replies to controls and messages
1338  */
1339
1340 static void client_reply_done(struct tevent_req *subreq);
1341
1342 static void client_send_message(struct tevent_req *req,
1343                                 struct ctdb_req_header *header,
1344                                 struct ctdb_req_message_data *message)
1345 {
1346         struct client_state *state = tevent_req_data(
1347                 req, struct client_state);
1348         struct ctdbd_context *ctdb = state->ctdb;
1349         struct tevent_req *subreq;
1350         struct ctdb_req_header reply_header;
1351         uint8_t *buf;
1352         size_t datalen, buflen;
1353         int ret;
1354
1355         reply_header = header_reply_message(header, ctdb);
1356
1357         datalen = ctdb_req_message_data_len(&reply_header, message);
1358         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1359         if (ret != 0) {
1360                 tevent_req_error(req, ret);
1361                 return;
1362         }
1363
1364         ret = ctdb_req_message_data_push(&reply_header, message,
1365                                          buf, &buflen);
1366         if (ret != 0) {
1367                 tevent_req_error(req, ret);
1368                 return;
1369         }
1370
1371         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1372
1373         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1374         if (tevent_req_nomem(subreq, req)) {
1375                 return;
1376         }
1377         tevent_req_set_callback(subreq, client_reply_done, req);
1378
1379         talloc_steal(subreq, buf);
1380 }
1381
1382 static void client_send_control(struct tevent_req *req,
1383                                 struct ctdb_req_header *header,
1384                                 struct ctdb_reply_control *reply)
1385 {
1386         struct client_state *state = tevent_req_data(
1387                 req, struct client_state);
1388         struct ctdbd_context *ctdb = state->ctdb;
1389         struct tevent_req *subreq;
1390         struct ctdb_req_header reply_header;
1391         uint8_t *buf;
1392         size_t datalen, buflen;
1393         int ret;
1394
1395         reply_header = header_reply_control(header, ctdb);
1396
1397         datalen = ctdb_reply_control_len(&reply_header, reply);
1398         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1399         if (ret != 0) {
1400                 tevent_req_error(req, ret);
1401                 return;
1402         }
1403
1404         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1405         if (ret != 0) {
1406                 tevent_req_error(req, ret);
1407                 return;
1408         }
1409
1410         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1411
1412         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1413         if (tevent_req_nomem(subreq, req)) {
1414                 return;
1415         }
1416         tevent_req_set_callback(subreq, client_reply_done, req);
1417
1418         talloc_steal(subreq, buf);
1419 }
1420
1421 static void client_reply_done(struct tevent_req *subreq)
1422 {
1423         struct tevent_req *req = tevent_req_callback_data(
1424                 subreq, struct tevent_req);
1425         int ret;
1426         bool status;
1427
1428         status = comm_write_recv(subreq, &ret);
1429         TALLOC_FREE(subreq);
1430         if (! status) {
1431                 tevent_req_error(req, ret);
1432         }
1433 }
1434
1435 /*
1436  * Handling protocol - controls
1437  */
1438
1439 static void control_process_exists(TALLOC_CTX *mem_ctx,
1440                                    struct tevent_req *req,
1441                                    struct ctdb_req_header *header,
1442                                    struct ctdb_req_control *request)
1443 {
1444         struct client_state *state = tevent_req_data(
1445                 req, struct client_state);
1446         struct ctdbd_context *ctdb = state->ctdb;
1447         struct client_state *cstate;
1448         struct ctdb_reply_control reply;
1449
1450         reply.rdata.opcode = request->opcode;
1451
1452         cstate = client_find(ctdb, request->rdata.data.pid);
1453         if (cstate == NULL) {
1454                 reply.status = -1;
1455                 reply.errmsg = "No client for PID";
1456         } else {
1457                 reply.status = kill(request->rdata.data.pid, 0);
1458                 reply.errmsg = NULL;
1459         }
1460
1461         client_send_control(req, header, &reply);
1462 }
1463
1464 static void control_ping(TALLOC_CTX *mem_ctx,
1465                          struct tevent_req *req,
1466                          struct ctdb_req_header *header,
1467                          struct ctdb_req_control *request)
1468 {
1469         struct client_state *state = tevent_req_data(
1470                 req, struct client_state);
1471         struct ctdbd_context *ctdb = state->ctdb;
1472         struct ctdb_reply_control reply;
1473
1474         reply.rdata.opcode = request->opcode;
1475         reply.status = ctdb->num_clients;
1476         reply.errmsg = NULL;
1477
1478         client_send_control(req, header, &reply);
1479 }
1480
1481 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1482                               struct tevent_req *req,
1483                               struct ctdb_req_header *header,
1484                               struct ctdb_req_control *request)
1485 {
1486         struct client_state *state = tevent_req_data(
1487                 req, struct client_state);
1488         struct ctdbd_context *ctdb = state->ctdb;
1489         struct ctdb_reply_control reply;
1490         struct database *db;
1491
1492         reply.rdata.opcode = request->opcode;
1493
1494         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1495         if (db == NULL) {
1496                 reply.status = ENOENT;
1497                 reply.errmsg = "Database not found";
1498         } else {
1499                 reply.rdata.data.db_path =
1500                         talloc_strdup(mem_ctx, db->path);
1501                 if (reply.rdata.data.db_path == NULL) {
1502                         reply.status = ENOMEM;
1503                         reply.errmsg = "Memory error";
1504                 } else {
1505                         reply.status = 0;
1506                         reply.errmsg = NULL;
1507                 }
1508         }
1509
1510         client_send_control(req, header, &reply);
1511 }
1512
1513 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1514                               struct tevent_req *req,
1515                               struct ctdb_req_header *header,
1516                               struct ctdb_req_control *request)
1517 {
1518         struct client_state *state = tevent_req_data(
1519                 req, struct client_state);
1520         struct ctdbd_context *ctdb = state->ctdb;
1521         struct ctdb_reply_control reply;
1522         struct ctdb_vnn_map *vnnmap;
1523
1524         reply.rdata.opcode = request->opcode;
1525
1526         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1527         if (vnnmap == NULL) {
1528                 reply.status = ENOMEM;
1529                 reply.errmsg = "Memory error";
1530         } else {
1531                 vnnmap->generation = ctdb->vnn_map->generation;
1532                 vnnmap->size = ctdb->vnn_map->size;
1533                 vnnmap->map = ctdb->vnn_map->map;
1534
1535                 reply.rdata.data.vnnmap = vnnmap;
1536                 reply.status = 0;
1537                 reply.errmsg = NULL;
1538         }
1539
1540         client_send_control(req, header, &reply);
1541 }
1542
1543 static void control_get_debug(TALLOC_CTX *mem_ctx,
1544                               struct tevent_req *req,
1545                               struct ctdb_req_header *header,
1546                               struct ctdb_req_control *request)
1547 {
1548         struct client_state *state = tevent_req_data(
1549                 req, struct client_state);
1550         struct ctdbd_context *ctdb = state->ctdb;
1551         struct ctdb_reply_control reply;
1552
1553         reply.rdata.opcode = request->opcode;
1554         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1555         reply.status = 0;
1556         reply.errmsg = NULL;
1557
1558         client_send_control(req, header, &reply);
1559 }
1560
1561 static void control_set_debug(TALLOC_CTX *mem_ctx,
1562                               struct tevent_req *req,
1563                               struct ctdb_req_header *header,
1564                               struct ctdb_req_control *request)
1565 {
1566         struct client_state *state = tevent_req_data(
1567                 req, struct client_state);
1568         struct ctdbd_context *ctdb = state->ctdb;
1569         struct ctdb_reply_control reply;
1570
1571         ctdb->log_level = (int)request->rdata.data.loglevel;
1572
1573         reply.rdata.opcode = request->opcode;
1574         reply.status = 0;
1575         reply.errmsg = NULL;
1576
1577         client_send_control(req, header, &reply);
1578 }
1579
1580 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1581                               struct tevent_req *req,
1582                                struct ctdb_req_header *header,
1583                               struct ctdb_req_control *request)
1584 {
1585         struct client_state *state = tevent_req_data(
1586                 req, struct client_state);
1587         struct ctdbd_context *ctdb = state->ctdb;
1588         struct ctdb_reply_control reply;
1589         struct ctdb_dbid_map *dbmap;
1590         struct database *db;
1591         int i;
1592
1593         reply.rdata.opcode = request->opcode;
1594
1595         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1596         if (dbmap == NULL) {
1597                 goto fail;
1598         }
1599
1600         dbmap->num = database_count(ctdb->db_map);
1601         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1602         if (dbmap->dbs == NULL) {
1603                 goto fail;
1604         }
1605
1606         db = ctdb->db_map->db;
1607         for (i = 0; i < dbmap->num; i++) {
1608                 dbmap->dbs[i] = (struct ctdb_dbid) {
1609                         .db_id = db->id,
1610                         .flags = db->flags,
1611                 };
1612
1613                 db = db->next;
1614         }
1615
1616         reply.rdata.data.dbmap = dbmap;
1617         reply.status = 0;
1618         reply.errmsg = NULL;
1619         client_send_control(req, header, &reply);
1620         return;
1621
1622 fail:
1623         reply.status = -1;
1624         reply.errmsg = "Memory error";
1625         client_send_control(req, header, &reply);
1626 }
1627
1628 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1629                                 struct tevent_req *req,
1630                                 struct ctdb_req_header *header,
1631                                 struct ctdb_req_control *request)
1632 {
1633         struct client_state *state = tevent_req_data(
1634                 req, struct client_state);
1635         struct ctdbd_context *ctdb = state->ctdb;
1636         struct ctdb_reply_control reply;
1637
1638         reply.rdata.opcode = request->opcode;
1639         reply.status = ctdb->vnn_map->recmode;
1640         reply.errmsg = NULL;
1641
1642         client_send_control(req, header, &reply);
1643 }
1644
1645 struct set_recmode_state {
1646         struct tevent_req *req;
1647         struct ctdbd_context *ctdb;
1648         struct ctdb_req_header header;
1649         struct ctdb_reply_control reply;
1650 };
1651
1652 static void set_recmode_callback(struct tevent_req *subreq)
1653 {
1654         struct set_recmode_state *substate = tevent_req_callback_data(
1655                 subreq, struct set_recmode_state);
1656         bool status;
1657         int ret;
1658
1659         status = recover_recv(subreq, &ret);
1660         TALLOC_FREE(subreq);
1661         if (! status) {
1662                 substate->reply.status = ret;
1663                 substate->reply.errmsg = "recovery failed";
1664         } else {
1665                 substate->reply.status = 0;
1666                 substate->reply.errmsg = NULL;
1667         }
1668
1669         client_send_control(substate->req, &substate->header, &substate->reply);
1670         talloc_free(substate);
1671 }
1672
1673 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1674                                 struct tevent_req *req,
1675                                 struct ctdb_req_header *header,
1676                                 struct ctdb_req_control *request)
1677 {
1678         struct client_state *state = tevent_req_data(
1679                 req, struct client_state);
1680         struct tevent_req *subreq;
1681         struct ctdbd_context *ctdb = state->ctdb;
1682         struct set_recmode_state *substate;
1683         struct ctdb_reply_control reply;
1684
1685         reply.rdata.opcode = request->opcode;
1686
1687         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1688                 reply.status = -1;
1689                 reply.errmsg = "Client cannot set recmode to NORMAL";
1690                 goto fail;
1691         }
1692
1693         substate = talloc_zero(ctdb, struct set_recmode_state);
1694         if (substate == NULL) {
1695                 reply.status = -1;
1696                 reply.errmsg = "Memory error";
1697                 goto fail;
1698         }
1699
1700         substate->req = req;
1701         substate->ctdb = ctdb;
1702         substate->header = *header;
1703         substate->reply.rdata.opcode = request->opcode;
1704
1705         subreq = recover_send(substate, state->ev, state->ctdb);
1706         if (subreq == NULL) {
1707                 talloc_free(substate);
1708                 goto fail;
1709         }
1710         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1711
1712         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1713         return;
1714
1715 fail:
1716         client_send_control(req, header, &reply);
1717
1718 }
1719
1720 static void control_db_attach(TALLOC_CTX *mem_ctx,
1721                               struct tevent_req *req,
1722                               struct ctdb_req_header *header,
1723                               struct ctdb_req_control *request)
1724 {
1725         struct client_state *state = tevent_req_data(
1726                 req, struct client_state);
1727         struct ctdbd_context *ctdb = state->ctdb;
1728         struct ctdb_reply_control reply;
1729         struct database *db;
1730
1731         reply.rdata.opcode = request->opcode;
1732
1733         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
1734                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
1735                         goto done;
1736                 }
1737         }
1738
1739         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
1740         if (db == NULL) {
1741                 reply.status = -1;
1742                 reply.errmsg = "Failed to attach database";
1743                 client_send_control(req, header, &reply);
1744                 return;
1745         }
1746
1747 done:
1748         reply.rdata.data.db_id = db->id;
1749         reply.status = 0;
1750         reply.errmsg = NULL;
1751         client_send_control(req, header, &reply);
1752 }
1753
1754 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1755 {
1756         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
1757 }
1758
1759 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1760                                    struct tevent_req *req,
1761                                    struct ctdb_req_header *header,
1762                                    struct ctdb_req_control *request)
1763 {
1764         struct client_state *state = tevent_req_data(
1765                 req, struct client_state);
1766         struct ctdbd_context *ctdb = state->ctdb;
1767         struct ctdb_reply_control reply;
1768         int ret;
1769
1770         reply.rdata.opcode = request->opcode;
1771
1772         ret = srvid_register(ctdb->srv, state, request->srvid,
1773                              srvid_handler, state);
1774         if (ret != 0) {
1775                 reply.status = -1;
1776                 reply.errmsg = "Memory error";
1777                 goto fail;
1778         }
1779
1780         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
1781
1782         reply.status = 0;
1783         reply.errmsg = NULL;
1784
1785 fail:
1786         client_send_control(req, header, &reply);
1787 }
1788
1789 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1790                                      struct tevent_req *req,
1791                                      struct ctdb_req_header *header,
1792                                      struct ctdb_req_control *request)
1793 {
1794         struct client_state *state = tevent_req_data(
1795                 req, struct client_state);
1796         struct ctdbd_context *ctdb = state->ctdb;
1797         struct ctdb_reply_control reply;
1798         int ret;
1799
1800         reply.rdata.opcode = request->opcode;
1801
1802         ret = srvid_deregister(ctdb->srv, request->srvid, state);
1803         if (ret != 0) {
1804                 reply.status = -1;
1805                 reply.errmsg = "srvid not registered";
1806                 goto fail;
1807         }
1808
1809         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
1810
1811         reply.status = 0;
1812         reply.errmsg = NULL;
1813
1814 fail:
1815         client_send_control(req, header, &reply);
1816 }
1817
1818 static void control_get_dbname(TALLOC_CTX *mem_ctx,
1819                                struct tevent_req *req,
1820                                struct ctdb_req_header *header,
1821                                struct ctdb_req_control *request)
1822 {
1823         struct client_state *state = tevent_req_data(
1824                 req, struct client_state);
1825         struct ctdbd_context *ctdb = state->ctdb;
1826         struct ctdb_reply_control reply;
1827         struct database *db;
1828
1829         reply.rdata.opcode = request->opcode;
1830
1831         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1832         if (db == NULL) {
1833                 reply.status = ENOENT;
1834                 reply.errmsg = "Database not found";
1835         } else {
1836                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
1837                 if (reply.rdata.data.db_name == NULL) {
1838                         reply.status = ENOMEM;
1839                         reply.errmsg = "Memory error";
1840                 } else {
1841                         reply.status = 0;
1842                         reply.errmsg = NULL;
1843                 }
1844         }
1845
1846         client_send_control(req, header, &reply);
1847 }
1848
1849 static void control_get_pid(TALLOC_CTX *mem_ctx,
1850                             struct tevent_req *req,
1851                             struct ctdb_req_header *header,
1852                             struct ctdb_req_control *request)
1853 {
1854         struct ctdb_reply_control reply;
1855
1856         reply.rdata.opcode = request->opcode;
1857         reply.status = getpid();
1858         reply.errmsg = NULL;
1859
1860         client_send_control(req, header, &reply);
1861 }
1862
1863 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1864                                   struct tevent_req *req,
1865                                   struct ctdb_req_header *header,
1866                                   struct ctdb_req_control *request)
1867 {
1868         struct client_state *state = tevent_req_data(
1869                 req, struct client_state);
1870         struct ctdbd_context *ctdb = state->ctdb;
1871         struct ctdb_reply_control reply;
1872
1873         reply.rdata.opcode = request->opcode;
1874         reply.status = ctdb->node_map->recmaster;
1875         reply.errmsg = NULL;
1876
1877         client_send_control(req, header, &reply);
1878 }
1879
1880 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1881                             struct tevent_req *req,
1882                             struct ctdb_req_header *header,
1883                             struct ctdb_req_control *request)
1884 {
1885         struct ctdb_reply_control reply;
1886
1887         reply.rdata.opcode = request->opcode;
1888         reply.status = header->destnode;
1889         reply.errmsg = NULL;
1890
1891         client_send_control(req, header, &reply);
1892 }
1893
1894 static void control_shutdown(TALLOC_CTX *mem_ctx,
1895                              struct tevent_req *req,
1896                              struct ctdb_req_header *hdr,
1897                              struct ctdb_req_control *request)
1898 {
1899         struct client_state *state = tevent_req_data(
1900                 req, struct client_state);
1901
1902         state->status = 99;
1903 }
1904
1905 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1906                                 struct tevent_req *req,
1907                                 struct ctdb_req_header *header,
1908                                 struct ctdb_req_control *request)
1909 {
1910         struct client_state *state = tevent_req_data(
1911                 req, struct client_state);
1912         struct ctdbd_context *ctdb = state->ctdb;
1913         struct ctdb_reply_control reply;
1914         bool ret, obsolete;
1915
1916         reply.rdata.opcode = request->opcode;
1917         reply.errmsg = NULL;
1918
1919         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1920                                      request->rdata.data.tunable->name,
1921                                      request->rdata.data.tunable->value,
1922                                      &obsolete);
1923         if (! ret) {
1924                 reply.status = -1;
1925         } else if (obsolete) {
1926                 reply.status = 1;
1927         } else {
1928                 reply.status = 0;
1929         }
1930
1931         client_send_control(req, header, &reply);
1932 }
1933
1934 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1935                                 struct tevent_req *req,
1936                                 struct ctdb_req_header *header,
1937                                 struct ctdb_req_control *request)
1938 {
1939         struct client_state *state = tevent_req_data(
1940                 req, struct client_state);
1941         struct ctdbd_context *ctdb = state->ctdb;
1942         struct ctdb_reply_control reply;
1943         uint32_t value;
1944         bool ret;
1945
1946         reply.rdata.opcode = request->opcode;
1947         reply.errmsg = NULL;
1948
1949         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1950                                      request->rdata.data.tun_var, &value);
1951         if (! ret) {
1952                 reply.status = -1;
1953         } else {
1954                 reply.rdata.data.tun_value = value;
1955                 reply.status = 0;
1956         }
1957
1958         client_send_control(req, header, &reply);
1959 }
1960
1961 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1962                                   struct tevent_req *req,
1963                                   struct ctdb_req_header *header,
1964                                   struct ctdb_req_control *request)
1965 {
1966         struct ctdb_reply_control reply;
1967         struct ctdb_var_list *var_list;
1968
1969         reply.rdata.opcode = request->opcode;
1970         reply.errmsg = NULL;
1971
1972         var_list = ctdb_tunable_names(mem_ctx);
1973         if (var_list == NULL) {
1974                 reply.status = -1;
1975         } else {
1976                 reply.rdata.data.tun_var_list = var_list;
1977                 reply.status = 0;
1978         }
1979
1980         client_send_control(req, header, &reply);
1981 }
1982
1983 static void control_modify_flags(TALLOC_CTX *mem_ctx,
1984                                  struct tevent_req *req,
1985                                  struct ctdb_req_header *header,
1986                                  struct ctdb_req_control *request)
1987 {
1988         struct client_state *state = tevent_req_data(
1989                 req, struct client_state);
1990         struct ctdbd_context *ctdb = state->ctdb;
1991         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
1992         struct ctdb_reply_control reply;
1993         struct node *node;
1994
1995         reply.rdata.opcode = request->opcode;
1996
1997         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
1998             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1999                 DEBUG(DEBUG_INFO,
2000                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2001                 reply.status = EINVAL;
2002                 reply.errmsg = "Failed to MODIFY_FLAGS";
2003                 client_send_control(req, header, &reply);
2004                 return;
2005         }
2006
2007         /* There's all sorts of broadcast weirdness here.  Only change
2008          * the specified node, not the destination node of the
2009          * control. */
2010         node = &ctdb->node_map->node[change->pnn];
2011
2012         if ((node->flags &
2013              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2014             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2015                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2016                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2017                 goto done;
2018         }
2019
2020         if ((node->flags &
2021              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2022             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2023                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2024                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2025                 goto done;
2026         }
2027
2028         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2029
2030 done:
2031         reply.status = 0;
2032         reply.errmsg = NULL;
2033         client_send_control(req, header, &reply);
2034 }
2035
2036 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2037                                      struct tevent_req *req,
2038                                      struct ctdb_req_header *header,
2039                                      struct ctdb_req_control *request)
2040 {
2041         struct client_state *state = tevent_req_data(
2042                 req, struct client_state);
2043         struct ctdbd_context *ctdb = state->ctdb;
2044         struct ctdb_reply_control reply;
2045
2046         reply.rdata.opcode = request->opcode;
2047         reply.rdata.data.tun_list = &ctdb->tun_list;
2048         reply.status = 0;
2049         reply.errmsg = NULL;
2050
2051         client_send_control(req, header, &reply);
2052 }
2053
2054 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2055                                          struct tevent_req *req,
2056                                          struct ctdb_req_header *header,
2057                                          struct ctdb_req_control *request)
2058 {
2059         struct client_state *state = tevent_req_data(
2060                 req, struct client_state);
2061         struct ctdbd_context *ctdb = state->ctdb;
2062         struct ctdb_reply_control reply;
2063         struct database *db;
2064
2065         reply.rdata.opcode = request->opcode;
2066
2067         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2068                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2069                         goto done;
2070                 }
2071         }
2072
2073         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2074                           CTDB_DB_FLAGS_PERSISTENT);
2075         if (db == NULL) {
2076                 reply.status = -1;
2077                 reply.errmsg = "Failed to attach database";
2078                 client_send_control(req, header, &reply);
2079                 return;
2080         }
2081
2082 done:
2083         reply.rdata.data.db_id = db->id;
2084         reply.status = 0;
2085         reply.errmsg = NULL;
2086         client_send_control(req, header, &reply);
2087 }
2088
2089 static void control_uptime(TALLOC_CTX *mem_ctx,
2090                            struct tevent_req *req,
2091                            struct ctdb_req_header *header,
2092                            struct ctdb_req_control *request)
2093 {
2094         struct client_state *state = tevent_req_data(
2095                 req, struct client_state);
2096         struct ctdbd_context *ctdb = state->ctdb;
2097         struct ctdb_reply_control reply;
2098         struct ctdb_uptime *uptime;;
2099
2100         reply.rdata.opcode = request->opcode;
2101
2102         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2103         if (uptime == NULL) {
2104                 goto fail;
2105         }
2106
2107         uptime->current_time = tevent_timeval_current();
2108         uptime->ctdbd_start_time = ctdb->start_time;
2109         uptime->last_recovery_started = ctdb->recovery_start_time;
2110         uptime->last_recovery_finished = ctdb->recovery_end_time;
2111
2112         reply.rdata.data.uptime = uptime;
2113         reply.status = 0;
2114         reply.errmsg = NULL;
2115         client_send_control(req, header, &reply);
2116         return;
2117
2118 fail:
2119         reply.status = -1;
2120         reply.errmsg = "Memory error";
2121         client_send_control(req, header, &reply);
2122 }
2123
2124 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2125                                       struct tevent_req *req,
2126                                       struct ctdb_req_header *header,
2127                                       struct ctdb_req_control *request)
2128 {
2129         struct client_state *state = tevent_req_data(
2130                 req, struct client_state);
2131         struct ctdbd_context *ctdb = state->ctdb;
2132         struct ctdb_reply_control reply;
2133         struct ctdb_node_map *nodemap;
2134         struct node_map *node_map = ctdb->node_map;
2135         int i;
2136
2137         reply.rdata.opcode = request->opcode;
2138
2139         nodemap = read_nodes_file(mem_ctx, header->destnode);
2140         if (nodemap == NULL) {
2141                 goto fail;
2142         }
2143
2144         for (i=0; i<nodemap->num; i++) {
2145                 struct node *node;
2146
2147                 if (i < node_map->num_nodes &&
2148                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2149                                         &node_map->node[i].addr)) {
2150                         continue;
2151                 }
2152
2153                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2154                         int ret;
2155
2156                         node = &node_map->node[i];
2157
2158                         node->flags |= NODE_FLAGS_DELETED;
2159                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2160                                                          false);
2161                         if (ret != 0) {
2162                                 /* Can't happen, but Coverity... */
2163                                 goto fail;
2164                         }
2165
2166                         continue;
2167                 }
2168
2169                 if (i < node_map->num_nodes &&
2170                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2171                         node = &node_map->node[i];
2172
2173                         node->flags &= ~NODE_FLAGS_DELETED;
2174                         node->addr = nodemap->node[i].addr;
2175
2176                         continue;
2177                 }
2178
2179                 node_map->node = talloc_realloc(node_map, node_map->node,
2180                                                 struct node,
2181                                                 node_map->num_nodes+1);
2182                 if (node_map->node == NULL) {
2183                         goto fail;
2184                 }
2185                 node = &node_map->node[node_map->num_nodes];
2186
2187                 node->addr = nodemap->node[i].addr;
2188                 node->pnn = nodemap->node[i].pnn;
2189                 node->flags = 0;
2190                 node->capabilities = CTDB_CAP_DEFAULT;
2191                 node->recovery_disabled = false;
2192                 node->recovery_substate = NULL;
2193
2194                 node_map->num_nodes += 1;
2195         }
2196
2197         talloc_free(nodemap);
2198
2199         reply.status = 0;
2200         reply.errmsg = NULL;
2201         client_send_control(req, header, &reply);
2202         return;
2203
2204 fail:
2205         reply.status = -1;
2206         reply.errmsg = "Memory error";
2207         client_send_control(req, header, &reply);
2208 }
2209
2210 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2211                                      struct tevent_req *req,
2212                                      struct ctdb_req_header *header,
2213                                      struct ctdb_req_control *request)
2214 {
2215         struct client_state *state = tevent_req_data(
2216                 req, struct client_state);
2217         struct ctdbd_context *ctdb = state->ctdb;
2218         struct ctdb_reply_control reply;
2219         struct node *node;
2220         uint32_t caps = 0;
2221
2222         reply.rdata.opcode = request->opcode;
2223
2224         node = &ctdb->node_map->node[header->destnode];
2225         caps = node->capabilities;
2226
2227         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2228                 /* Don't send reply */
2229                 return;
2230         }
2231
2232         reply.rdata.data.caps = caps;
2233         reply.status = 0;
2234         reply.errmsg = NULL;
2235
2236         client_send_control(req, header, &reply);
2237 }
2238
2239 static void control_release_ip(TALLOC_CTX *mem_ctx,
2240                                struct tevent_req *req,
2241                                struct ctdb_req_header *header,
2242                                struct ctdb_req_control *request)
2243 {
2244         struct client_state *state = tevent_req_data(
2245                 req, struct client_state);
2246         struct ctdbd_context *ctdb = state->ctdb;
2247         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2248         struct ctdb_reply_control reply;
2249         struct ctdb_public_ip_list *ips = NULL;
2250         struct ctdb_public_ip *t = NULL;
2251         int i;
2252
2253         reply.rdata.opcode = request->opcode;
2254
2255         if (ctdb->known_ips == NULL) {
2256                 D_INFO("RELEASE_IP %s - not a public IP\n",
2257                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2258                 goto done;
2259         }
2260
2261         ips = &ctdb->known_ips[header->destnode];
2262
2263         t = NULL;
2264         for (i = 0; i < ips->num; i++) {
2265                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2266                         t = &ips->ip[i];
2267                         break;
2268                 }
2269         }
2270         if (t == NULL) {
2271                 D_INFO("RELEASE_IP %s - not a public IP\n",
2272                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2273                 goto done;
2274         }
2275
2276         if (t->pnn != header->destnode) {
2277                 if (header->destnode == ip->pnn) {
2278                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2279                               ctdb_sock_addr_to_string(mem_ctx,
2280                                                        &ip->addr, false),
2281                               ip->pnn);
2282                         reply.status = -1;
2283                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2284                         client_send_control(req, header, &reply);
2285                         return;
2286                 }
2287
2288                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2289                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2290                        ip->pnn);
2291                 t->pnn = ip->pnn;
2292         } else {
2293                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2294                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2295                           ip->pnn);
2296                 t->pnn = ip->pnn;
2297         }
2298
2299 done:
2300         reply.status = 0;
2301         reply.errmsg = NULL;
2302         client_send_control(req, header, &reply);
2303 }
2304
2305 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2306                                 struct tevent_req *req,
2307                                 struct ctdb_req_header *header,
2308                                 struct ctdb_req_control *request)
2309 {
2310         struct client_state *state = tevent_req_data(
2311                 req, struct client_state);
2312         struct ctdbd_context *ctdb = state->ctdb;
2313         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2314         struct ctdb_reply_control reply;
2315         struct ctdb_public_ip_list *ips = NULL;
2316         struct ctdb_public_ip *t = NULL;
2317         int i;
2318
2319         reply.rdata.opcode = request->opcode;
2320
2321         if (ctdb->known_ips == NULL) {
2322                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2323                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2324                 goto done;
2325         }
2326
2327         ips = &ctdb->known_ips[header->destnode];
2328
2329         t = NULL;
2330         for (i = 0; i < ips->num; i++) {
2331                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2332                         t = &ips->ip[i];
2333                         break;
2334                 }
2335         }
2336         if (t == NULL) {
2337                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2338                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2339                 goto done;
2340         }
2341
2342         if (t->pnn == header->destnode) {
2343                 D_INFO("TAKEOVER_IP %s - redundant\n",
2344                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2345         } else {
2346                 D_NOTICE("TAKEOVER_IP %s\n",
2347                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2348                 t->pnn = ip->pnn;
2349         }
2350
2351 done:
2352         reply.status = 0;
2353         reply.errmsg = NULL;
2354         client_send_control(req, header, &reply);
2355 }
2356
2357 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2358                                    struct tevent_req *req,
2359                                    struct ctdb_req_header *header,
2360                                    struct ctdb_req_control *request)
2361 {
2362         struct client_state *state = tevent_req_data(
2363                 req, struct client_state);
2364         struct ctdbd_context *ctdb = state->ctdb;
2365         struct ctdb_reply_control reply;
2366         struct ctdb_public_ip_list *ips = NULL;
2367
2368         reply.rdata.opcode = request->opcode;
2369
2370         if (ctdb->known_ips == NULL) {
2371                 /* No IPs defined so create a dummy empty struct and ship it */
2372                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2373                 if (ips == NULL) {
2374                         reply.status = ENOMEM;
2375                         reply.errmsg = "Memory error";
2376                         goto done;
2377                 }
2378                 goto ok;
2379         }
2380
2381         ips = &ctdb->known_ips[header->destnode];
2382
2383         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2384                 /* If runstate is not RUNNING or a node is then return
2385                  * no available IPs.  Don't worry about interface
2386                  * states here - we're not faking down to that level.
2387                  */
2388                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2389                         /* No available IPs: return dummy empty struct */
2390                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2391                         if (ips == NULL) {
2392                                 reply.status = ENOMEM;
2393                                 reply.errmsg = "Memory error";
2394                                 goto done;
2395                         }
2396                 }
2397         }
2398
2399 ok:
2400         reply.rdata.data.pubip_list = ips;
2401         reply.status = 0;
2402         reply.errmsg = NULL;
2403
2404 done:
2405         client_send_control(req, header, &reply);
2406 }
2407
2408 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2409                                 struct tevent_req *req,
2410                                 struct ctdb_req_header *header,
2411                                 struct ctdb_req_control *request)
2412 {
2413         struct client_state *state = tevent_req_data(
2414                 req, struct client_state);
2415         struct ctdbd_context *ctdb = state->ctdb;
2416         struct ctdb_reply_control reply;
2417         struct ctdb_node_map *nodemap;
2418         struct node *node;
2419         int i;
2420
2421         reply.rdata.opcode = request->opcode;
2422
2423         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2424         if (nodemap == NULL) {
2425                 goto fail;
2426         }
2427
2428         nodemap->num = ctdb->node_map->num_nodes;
2429         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2430                                      nodemap->num);
2431         if (nodemap->node == NULL) {
2432                 goto fail;
2433         }
2434
2435         for (i=0; i<nodemap->num; i++) {
2436                 node = &ctdb->node_map->node[i];
2437                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2438                         .pnn = node->pnn,
2439                         .flags = node->flags,
2440                         .addr = node->addr,
2441                 };
2442         }
2443
2444         reply.rdata.data.nodemap = nodemap;
2445         reply.status = 0;
2446         reply.errmsg = NULL;
2447         client_send_control(req, header, &reply);
2448         return;
2449
2450 fail:
2451         reply.status = -1;
2452         reply.errmsg = "Memory error";
2453         client_send_control(req, header, &reply);
2454 }
2455
2456 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2457                                      struct tevent_req *req,
2458                                      struct ctdb_req_header *header,
2459                                      struct ctdb_req_control *request)
2460 {
2461         struct client_state *state = tevent_req_data(
2462                 req, struct client_state);
2463         struct ctdbd_context *ctdb = state->ctdb;
2464         struct ctdb_reply_control reply;
2465
2466         reply.rdata.opcode = request->opcode;
2467
2468         if (ctdb->reclock != NULL) {
2469                 reply.rdata.data.reclock_file =
2470                         talloc_strdup(mem_ctx, ctdb->reclock);
2471                 if (reply.rdata.data.reclock_file == NULL) {
2472                         reply.status = ENOMEM;
2473                         reply.errmsg = "Memory error";
2474                         goto done;
2475                 }
2476         } else {
2477                 reply.rdata.data.reclock_file = NULL;
2478         }
2479
2480         reply.status = 0;
2481         reply.errmsg = NULL;
2482
2483 done:
2484         client_send_control(req, header, &reply);
2485 }
2486
2487 static void control_stop_node(TALLOC_CTX *mem_ctx,
2488                               struct tevent_req *req,
2489                               struct ctdb_req_header *header,
2490                               struct ctdb_req_control *request)
2491 {
2492         struct client_state *state = tevent_req_data(
2493                 req, struct client_state);
2494         struct ctdbd_context *ctdb = state->ctdb;
2495         struct ctdb_reply_control reply;
2496
2497         reply.rdata.opcode = request->opcode;
2498
2499         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2500         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2501
2502         reply.status = 0;
2503         reply.errmsg = NULL;
2504
2505         client_send_control(req, header, &reply);
2506         return;
2507 }
2508
2509 static void control_continue_node(TALLOC_CTX *mem_ctx,
2510                                   struct tevent_req *req,
2511                                   struct ctdb_req_header *header,
2512                                   struct ctdb_req_control *request)
2513 {
2514         struct client_state *state = tevent_req_data(
2515                 req, struct client_state);
2516         struct ctdbd_context *ctdb = state->ctdb;
2517         struct ctdb_reply_control reply;
2518
2519         reply.rdata.opcode = request->opcode;
2520
2521         DEBUG(DEBUG_INFO, ("Continue node\n"));
2522         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2523
2524         reply.status = 0;
2525         reply.errmsg = NULL;
2526
2527         client_send_control(req, header, &reply);
2528         return;
2529 }
2530
2531 static void set_ban_state_callback(struct tevent_req *subreq)
2532 {
2533         struct node *node = tevent_req_callback_data(
2534                 subreq, struct node);
2535         bool status;
2536
2537         status = tevent_wakeup_recv(subreq);
2538         TALLOC_FREE(subreq);
2539         if (! status) {
2540                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2541         }
2542
2543         node->flags &= ~NODE_FLAGS_BANNED;
2544 }
2545
2546 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2547                                   struct tevent_req *req,
2548                                   struct ctdb_req_header *header,
2549                                   struct ctdb_req_control *request)
2550 {
2551         struct client_state *state = tevent_req_data(
2552                 req, struct client_state);
2553         struct tevent_req *subreq;
2554         struct ctdbd_context *ctdb = state->ctdb;
2555         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2556         struct ctdb_reply_control reply;
2557         struct node *node;
2558
2559         reply.rdata.opcode = request->opcode;
2560
2561         if (ban->pnn != header->destnode) {
2562                 DEBUG(DEBUG_INFO,
2563                       ("SET_BAN_STATE control for PNN %d rejected\n",
2564                        ban->pnn));
2565                 reply.status = EINVAL;
2566                 goto fail;
2567         }
2568
2569         node = &ctdb->node_map->node[header->destnode];
2570
2571         if (ban->time == 0) {
2572                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2573                 node->flags &= ~NODE_FLAGS_BANNED;
2574                 goto done;
2575         }
2576
2577         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2578                                     tevent_timeval_current_ofs(
2579                                             ban->time, 0));
2580         if (subreq == NULL) {
2581                 reply.status = ENOMEM;
2582                 goto fail;
2583         }
2584         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2585
2586         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2587         node->flags |= NODE_FLAGS_BANNED;
2588         ctdb->vnn_map->generation = INVALID_GENERATION;
2589
2590 done:
2591         reply.status = 0;
2592         reply.errmsg = NULL;
2593
2594         client_send_control(req, header, &reply);
2595         return;
2596
2597 fail:
2598         reply.errmsg = "Failed to ban node";
2599 }
2600
2601 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2602                                struct tevent_req *req,
2603                                struct ctdb_req_header *header,
2604                                struct ctdb_req_control *request)
2605 {
2606         struct client_state *state = tevent_req_data(
2607                 req, struct client_state);
2608         struct ctdbd_context *ctdb = state->ctdb;
2609         struct ctdb_reply_control reply;
2610         struct database *db;
2611
2612         reply.rdata.opcode = request->opcode;
2613
2614         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2615         if (db == NULL) {
2616                 reply.status = ENOENT;
2617                 reply.errmsg = "Database not found";
2618         } else {
2619                 reply.rdata.data.seqnum = db->seq_num;
2620                 reply.status = 0;
2621                 reply.errmsg = NULL;
2622         }
2623
2624         client_send_control(req, header, &reply);
2625 }
2626
2627 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2628                                   struct tevent_req *req,
2629                                   struct ctdb_req_header *header,
2630                                   struct ctdb_req_control *request)
2631 {
2632         struct client_state *state = tevent_req_data(
2633                 req, struct client_state);
2634         struct ctdbd_context *ctdb = state->ctdb;
2635         struct ctdb_reply_control reply;
2636         struct database *db;
2637
2638         reply.rdata.opcode = request->opcode;
2639
2640         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2641         if (db == NULL) {
2642                 reply.status = ENOENT;
2643                 reply.errmsg = "Database not found";
2644         } else {
2645                 reply.rdata.data.reason = NULL;
2646                 reply.status = 0;
2647                 reply.errmsg = NULL;
2648         }
2649
2650         client_send_control(req, header, &reply);
2651 }
2652
2653 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2654                                                    struct ctdbd_context *ctdb)
2655 {
2656         struct ctdb_iface_list *iface_list;
2657         struct interface *iface;
2658         int i;
2659
2660         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2661         if (iface_list == NULL) {
2662                 goto done;
2663         }
2664
2665         iface_list->num = ctdb->iface_map->num;
2666         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2667                                          iface_list->num);
2668         if (iface_list->iface == NULL) {
2669                 TALLOC_FREE(iface_list);
2670                 goto done;
2671         }
2672
2673         for (i=0; i<iface_list->num; i++) {
2674                 iface = &ctdb->iface_map->iface[i];
2675                 iface_list->iface[i] = (struct ctdb_iface) {
2676                         .link_state = iface->link_up,
2677                         .references = iface->references,
2678                 };
2679                 strlcpy(iface_list->iface[i].name, iface->name,
2680                         sizeof(iface_list->iface[i].name));
2681         }
2682
2683 done:
2684         return iface_list;
2685 }
2686
2687 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2688                                        struct tevent_req *req,
2689                                        struct ctdb_req_header *header,
2690                                        struct ctdb_req_control *request)
2691 {
2692         struct client_state *state = tevent_req_data(
2693                 req, struct client_state);
2694         struct ctdbd_context *ctdb = state->ctdb;
2695         struct ctdb_reply_control reply;
2696         ctdb_sock_addr *addr = request->rdata.data.addr;
2697         struct ctdb_public_ip_list *known = NULL;
2698         struct ctdb_public_ip_info *info = NULL;
2699         unsigned i;
2700
2701         reply.rdata.opcode = request->opcode;
2702
2703         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
2704         if (info == NULL) {
2705                 reply.status = ENOMEM;
2706                 reply.errmsg = "Memory error";
2707                 goto done;
2708         }
2709
2710         reply.rdata.data.ipinfo = info;
2711
2712         if (ctdb->known_ips != NULL) {
2713                 known = &ctdb->known_ips[header->destnode];
2714         } else {
2715                 /* No IPs defined so create a dummy empty struct and
2716                  * fall through.  The given IP won't be matched
2717                  * below...
2718                  */
2719                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2720                 if (known == NULL) {
2721                         reply.status = ENOMEM;
2722                         reply.errmsg = "Memory error";
2723                         goto done;
2724                 }
2725         }
2726
2727         for (i = 0; i < known->num; i++) {
2728                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
2729                                            addr)) {
2730                         break;
2731                 }
2732         }
2733
2734         if (i == known->num) {
2735                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
2736                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
2737                 reply.status = -1;
2738                 reply.errmsg = "Unknown address";
2739                 goto done;
2740         }
2741
2742         info->ip = known->ip[i];
2743
2744         /* The fake PUBLICIPS stanza and resulting known_ips data
2745          * don't know anything about interfaces, so completely fake
2746          * this.
2747          */
2748         info->active_idx = 0;
2749
2750         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
2751         if (info->ifaces == NULL) {
2752                 reply.status = ENOMEM;
2753                 reply.errmsg = "Memory error";
2754                 goto done;
2755         }
2756
2757         reply.status = 0;
2758         reply.errmsg = NULL;
2759
2760 done:
2761         client_send_control(req, header, &reply);
2762 }
2763
2764 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2765                                struct tevent_req *req,
2766                                struct ctdb_req_header *header,
2767                                struct ctdb_req_control *request)
2768 {
2769         struct client_state *state = tevent_req_data(
2770                 req, struct client_state);
2771         struct ctdbd_context *ctdb = state->ctdb;
2772         struct ctdb_reply_control reply;
2773         struct ctdb_iface_list *iface_list;
2774
2775         reply.rdata.opcode = request->opcode;
2776
2777         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
2778         if (iface_list == NULL) {
2779                 goto fail;
2780         }
2781
2782         reply.rdata.data.iface_list = iface_list;
2783         reply.status = 0;
2784         reply.errmsg = NULL;
2785         client_send_control(req, header, &reply);
2786         return;
2787
2788 fail:
2789         reply.status = -1;
2790         reply.errmsg = "Memory error";
2791         client_send_control(req, header, &reply);
2792 }
2793
2794 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2795                                          struct tevent_req *req,
2796                                          struct ctdb_req_header *header,
2797                                          struct ctdb_req_control *request)
2798 {
2799         struct client_state *state = tevent_req_data(
2800                 req, struct client_state);
2801         struct ctdbd_context *ctdb = state->ctdb;
2802         struct ctdb_reply_control reply;
2803         struct ctdb_iface *in_iface;
2804         struct interface *iface = NULL;
2805         bool link_up = false;
2806         int i;
2807
2808         reply.rdata.opcode = request->opcode;
2809
2810         in_iface = request->rdata.data.iface;
2811
2812         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
2813                 reply.errmsg = "interface name not terminated";
2814                 goto fail;
2815         }
2816
2817         switch (in_iface->link_state) {
2818                 case 0:
2819                         link_up = false;
2820                         break;
2821
2822                 case 1:
2823                         link_up = true;
2824                         break;
2825
2826                 default:
2827                         reply.errmsg = "invalid link state";
2828                         goto fail;
2829         }
2830
2831         if (in_iface->references != 0) {
2832                 reply.errmsg = "references should be 0";
2833                 goto fail;
2834         }
2835
2836         for (i=0; i<ctdb->iface_map->num; i++) {
2837                 if (strcmp(ctdb->iface_map->iface[i].name,
2838                            in_iface->name) == 0) {
2839                         iface = &ctdb->iface_map->iface[i];
2840                         break;
2841                 }
2842         }
2843
2844         if (iface == NULL) {
2845                 reply.errmsg = "interface not found";
2846                 goto fail;
2847         }
2848
2849         iface->link_up = link_up;
2850
2851         reply.status = 0;
2852         reply.errmsg = NULL;
2853         client_send_control(req, header, &reply);
2854         return;
2855
2856 fail:
2857         reply.status = -1;
2858         client_send_control(req, header, &reply);
2859 }
2860
2861 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
2862                                     struct tevent_req *req,
2863                                     struct ctdb_req_header *header,
2864                                     struct ctdb_req_control *request)
2865 {
2866         struct client_state *state = tevent_req_data(
2867                 req, struct client_state);
2868         struct ctdbd_context *ctdb = state->ctdb;
2869         struct ctdb_reply_control reply;
2870         struct database *db;
2871
2872         reply.rdata.opcode = request->opcode;
2873
2874         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2875         if (db == NULL) {
2876                 reply.status = ENOENT;
2877                 reply.errmsg = "Database not found";
2878                 goto done;
2879         }
2880
2881         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2882                 reply.status = EINVAL;
2883                 reply.errmsg = "Can not set READONLY on persistent db";
2884                 goto done;
2885         }
2886
2887         db->flags |= CTDB_DB_FLAGS_READONLY;
2888         reply.status = 0;
2889         reply.errmsg = NULL;
2890
2891 done:
2892         client_send_control(req, header, &reply);
2893 }
2894
2895 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
2896                                     struct tevent_req *req,
2897                                     struct ctdb_req_header *header,
2898                                     struct ctdb_req_control *request)
2899 {
2900         struct client_state *state = tevent_req_data(
2901                 req, struct client_state);
2902         struct ctdbd_context *ctdb = state->ctdb;
2903         struct ctdb_reply_control reply;
2904         struct database *db;
2905
2906         reply.rdata.opcode = request->opcode;
2907
2908         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2909         if (db == NULL) {
2910                 reply.status = ENOENT;
2911                 reply.errmsg = "Database not found";
2912                 goto done;
2913         }
2914
2915         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2916                 reply.status = EINVAL;
2917                 reply.errmsg = "Can not set STICKY on persistent db";
2918                 goto done;
2919         }
2920
2921         db->flags |= CTDB_DB_FLAGS_STICKY;
2922         reply.status = 0;
2923         reply.errmsg = NULL;
2924
2925 done:
2926         client_send_control(req, header, &reply);
2927 }
2928
2929 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
2930                                   struct tevent_req *req,
2931                                   struct ctdb_req_header *header,
2932                                   struct ctdb_req_control *request)
2933 {
2934         struct ctdb_reply_control reply;
2935
2936         /* Always succeed */
2937         reply.rdata.opcode = request->opcode;
2938         reply.status = 0;
2939         reply.errmsg = NULL;
2940
2941         client_send_control(req, header, &reply);
2942 }
2943
2944 static void control_get_runstate(TALLOC_CTX *mem_ctx,
2945                                  struct tevent_req *req,
2946                                  struct ctdb_req_header *header,
2947                                  struct ctdb_req_control *request)
2948 {
2949         struct client_state *state = tevent_req_data(
2950                 req, struct client_state);
2951         struct ctdbd_context *ctdb = state->ctdb;
2952         struct ctdb_reply_control reply;
2953
2954         reply.rdata.opcode = request->opcode;
2955         reply.rdata.data.runstate = ctdb->runstate;
2956         reply.status = 0;
2957         reply.errmsg = NULL;
2958
2959         client_send_control(req, header, &reply);
2960 }
2961
2962 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
2963                                    struct tevent_req *req,
2964                                    struct ctdb_req_header *header,
2965                                    struct ctdb_req_control *request)
2966 {
2967         struct ctdb_reply_control reply;
2968         struct ctdb_node_map *nodemap;
2969
2970         reply.rdata.opcode = request->opcode;
2971
2972         nodemap = read_nodes_file(mem_ctx, header->destnode);
2973         if (nodemap == NULL) {
2974                 goto fail;
2975         }
2976
2977         reply.rdata.data.nodemap = nodemap;
2978         reply.status = 0;
2979         reply.errmsg = NULL;
2980         client_send_control(req, header, &reply);
2981         return;
2982
2983 fail:
2984         reply.status = -1;
2985         reply.errmsg = "Failed to read nodes file";
2986         client_send_control(req, header, &reply);
2987 }
2988
2989 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
2990                                   struct tevent_req *req,
2991                                   struct ctdb_req_header *header,
2992                                   struct ctdb_req_control *request)
2993 {
2994         struct client_state *state = tevent_req_data(
2995                 req, struct client_state);
2996         struct ctdbd_context *ctdb = state->ctdb;
2997         struct ctdb_reply_control reply;
2998         struct database *db;
2999
3000         reply.rdata.opcode = request->opcode;
3001
3002         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3003         if (db == NULL) {
3004                 reply.status = ENOENT;
3005                 reply.errmsg = "Database not found";
3006         } else {
3007                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3008                 reply.status = 0;
3009                 reply.errmsg = NULL;
3010         }
3011
3012         client_send_control(req, header, &reply);
3013 }
3014
3015 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3016                                          struct tevent_req *req,
3017                                          struct ctdb_req_header *header,
3018                                          struct ctdb_req_control *request)
3019 {
3020         struct client_state *state = tevent_req_data(
3021                 req, struct client_state);
3022         struct ctdbd_context *ctdb = state->ctdb;
3023         struct ctdb_reply_control reply;
3024         struct database *db;
3025
3026         reply.rdata.opcode = request->opcode;
3027
3028         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3029                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3030                         goto done;
3031                 }
3032         }
3033
3034         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3035                           CTDB_DB_FLAGS_REPLICATED);
3036         if (db == NULL) {
3037                 reply.status = -1;
3038                 reply.errmsg = "Failed to attach database";
3039                 client_send_control(req, header, &reply);
3040                 return;
3041         }
3042
3043 done:
3044         reply.rdata.data.db_id = db->id;
3045         reply.status = 0;
3046         reply.errmsg = NULL;
3047         client_send_control(req, header, &reply);
3048 }
3049
3050 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3051                                     struct tevent_req *req,
3052                                     struct ctdb_req_header *header,
3053                                     struct ctdb_req_control *request)
3054 {
3055         struct client_state *state = tevent_req_data(
3056                 req, struct client_state);
3057         struct ctdbd_context *ctdb = state->ctdb;
3058         struct ctdb_client *client;
3059         struct client_state *cstate;
3060         struct ctdb_reply_control reply;
3061         bool pid_found, srvid_found;
3062         int ret;
3063
3064         reply.rdata.opcode = request->opcode;
3065
3066         pid_found = false;
3067         srvid_found = false;
3068
3069         for (client=ctdb->client_list; client != NULL; client=client->next) {
3070                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3071                         pid_found = true;
3072                         cstate = (struct client_state *)client->state;
3073                         ret = srvid_exists(ctdb->srv,
3074                                            request->rdata.data.pid_srvid->srvid,
3075                                            cstate);
3076                         if (ret == 0) {
3077                                 srvid_found = true;
3078                                 ret = kill(cstate->pid, 0);
3079                                 if (ret != 0) {
3080                                         reply.status = ret;
3081                                         reply.errmsg = strerror(errno);
3082                                 } else {
3083                                         reply.status = 0;
3084                                         reply.errmsg = NULL;
3085                                 }
3086                         }
3087                 }
3088         }
3089
3090         if (! pid_found) {
3091                 reply.status = -1;
3092                 reply.errmsg = "No client for PID";
3093         } else if (! srvid_found) {
3094                 reply.status = -1;
3095                 reply.errmsg = "No client for PID and SRVID";
3096         }
3097
3098         client_send_control(req, header, &reply);
3099 }
3100
3101 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3102                                  struct tevent_req *req,
3103                                  struct ctdb_req_header *header,
3104                                  struct ctdb_req_control *request)
3105 {
3106         struct client_state *state = tevent_req_data(
3107                 req, struct client_state);
3108         struct ctdbd_context *ctdb = state->ctdb;
3109         struct ctdb_reply_control reply;
3110         struct fake_control_failure *f = NULL;
3111
3112         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3113                 request->opcode, header->destnode);
3114         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3115                 if (f->opcode == request->opcode &&
3116                     (f->pnn == header->destnode ||
3117                      f->pnn == CTDB_UNKNOWN_PNN)) {
3118
3119                         reply.rdata.opcode = request->opcode;
3120                         if (strcmp(f->error, "TIMEOUT") == 0) {
3121                                 /* Causes no reply */
3122                                 D_ERR("Control %u fake timeout on node %u\n",
3123                                       request->opcode, header->destnode);
3124                                 return true;
3125                         } else if (strcmp(f->error, "ERROR") == 0) {
3126                                 D_ERR("Control %u fake error on node %u\n",
3127                                       request->opcode, header->destnode);
3128                                 reply.status = -1;
3129                                 reply.errmsg = f->comment;
3130                                 client_send_control(req, header, &reply);
3131                                 return true;
3132                         }
3133                 }
3134         }
3135
3136         return false;
3137 }
3138
3139 static void control_error(TALLOC_CTX *mem_ctx,
3140                           struct tevent_req *req,
3141                           struct ctdb_req_header *header,
3142                           struct ctdb_req_control *request)
3143 {
3144         struct ctdb_reply_control reply;
3145
3146         reply.rdata.opcode = request->opcode;
3147         reply.status = -1;
3148         reply.errmsg = "Not implemented";
3149
3150         client_send_control(req, header, &reply);
3151 }
3152
3153 /*
3154  * Handling protocol - messages
3155  */
3156
3157 struct disable_recoveries_state {
3158         struct node *node;
3159 };
3160
3161 static void disable_recoveries_callback(struct tevent_req *subreq)
3162 {
3163         struct disable_recoveries_state *substate = tevent_req_callback_data(
3164                 subreq, struct disable_recoveries_state);
3165         bool status;
3166
3167         status = tevent_wakeup_recv(subreq);
3168         TALLOC_FREE(subreq);
3169         if (! status) {
3170                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3171         }
3172
3173         substate->node->recovery_disabled = false;
3174         TALLOC_FREE(substate->node->recovery_substate);
3175 }
3176
3177 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3178                                        struct tevent_req *req,
3179                                        struct ctdb_req_header *header,
3180                                        struct ctdb_req_message *request)
3181 {
3182         struct client_state *state = tevent_req_data(
3183                 req, struct client_state);
3184         struct tevent_req *subreq;
3185         struct ctdbd_context *ctdb = state->ctdb;
3186         struct disable_recoveries_state *substate;
3187         struct ctdb_disable_message *disable = request->data.disable;
3188         struct ctdb_req_message_data reply;
3189         struct node *node;
3190         int ret = -1;
3191         TDB_DATA data;
3192
3193         node = &ctdb->node_map->node[header->destnode];
3194
3195         if (disable->timeout == 0) {
3196                 TALLOC_FREE(node->recovery_substate);
3197                 node->recovery_disabled = false;
3198                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3199                                    header->destnode));
3200                 goto done;
3201         }
3202
3203         substate = talloc_zero(ctdb->node_map,
3204                                struct disable_recoveries_state);
3205         if (substate == NULL) {
3206                 goto fail;
3207         }
3208
3209         substate->node = node;
3210
3211         subreq = tevent_wakeup_send(substate, state->ev,
3212                                     tevent_timeval_current_ofs(
3213                                             disable->timeout, 0));
3214         if (subreq == NULL) {
3215                 talloc_free(substate);
3216                 goto fail;
3217         }
3218         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3219
3220         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3221                            disable->timeout, header->destnode));
3222         node->recovery_substate = substate;
3223         node->recovery_disabled = true;
3224
3225 done:
3226         ret = header->destnode;
3227
3228 fail:
3229         reply.srvid = disable->srvid;
3230         data.dptr = (uint8_t *)&ret;
3231         data.dsize = sizeof(int);
3232         reply.data = data;
3233
3234         client_send_message(req, header, &reply);
3235 }
3236
3237 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3238                                  struct tevent_req *req,
3239                                  struct ctdb_req_header *header,
3240                                  struct ctdb_req_message *request)
3241 {
3242         struct client_state *state = tevent_req_data(
3243                 req, struct client_state);
3244         struct ctdbd_context *ctdb = state->ctdb;
3245         struct ctdb_srvid_message *srvid = request->data.msg;
3246         struct ctdb_req_message_data reply;
3247         int ret = -1;
3248         TDB_DATA data;
3249
3250         if (header->destnode != ctdb->node_map->recmaster) {
3251                 /* No reply! Only recmaster replies... */
3252                 return;
3253         }
3254
3255         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3256                            header->destnode));
3257         ret = header->destnode;
3258
3259         reply.srvid = srvid->srvid;
3260         data.dptr = (uint8_t *)&ret;
3261         data.dsize = sizeof(int);
3262         reply.data = data;
3263
3264         client_send_message(req, header, &reply);
3265 }
3266
3267 /*
3268  * Handle a single client
3269  */
3270
3271 static void client_read_handler(uint8_t *buf, size_t buflen,
3272                                 void *private_data);
3273 static void client_dead_handler(void *private_data);
3274 static void client_process_packet(struct tevent_req *req,
3275                                   uint8_t *buf, size_t buflen);
3276 static void client_process_message(struct tevent_req *req,
3277                                    uint8_t *buf, size_t buflen);
3278 static void client_process_control(struct tevent_req *req,
3279                                    uint8_t *buf, size_t buflen);
3280 static void client_reply_done(struct tevent_req *subreq);
3281
3282 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3283                                       struct tevent_context *ev,
3284                                       int fd, struct ctdbd_context *ctdb,
3285                                       int pnn)
3286 {
3287         struct tevent_req *req;
3288         struct client_state *state;
3289         struct ucred cr;
3290         socklen_t crl = sizeof(struct ucred);
3291         int ret;
3292
3293         req = tevent_req_create(mem_ctx, &state, struct client_state);
3294         if (req == NULL) {
3295                 return NULL;
3296         }
3297
3298         state->ev = ev;
3299         state->fd = fd;
3300         state->ctdb = ctdb;
3301         state->pnn = pnn;
3302
3303         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3304         if (ret != 0) {
3305                 tevent_req_error(req, ret);
3306                 return tevent_req_post(req, ev);
3307         }
3308         state->pid = cr.pid;
3309
3310         ret = comm_setup(state, ev, fd, client_read_handler, req,
3311                          client_dead_handler, req, &state->comm);
3312         if (ret != 0) {
3313                 tevent_req_error(req, ret);
3314                 return tevent_req_post(req, ev);
3315         }
3316
3317         ret = client_add(ctdb, state->pid, state);
3318         if (ret != 0) {
3319                 tevent_req_error(req, ret);
3320                 return tevent_req_post(req, ev);
3321         }
3322
3323         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3324
3325         return req;
3326 }
3327
3328 static void client_read_handler(uint8_t *buf, size_t buflen,
3329                                 void *private_data)
3330 {
3331         struct tevent_req *req = talloc_get_type_abort(
3332                 private_data, struct tevent_req);
3333         struct client_state *state = tevent_req_data(
3334                 req, struct client_state);
3335         struct ctdbd_context *ctdb = state->ctdb;
3336         struct ctdb_req_header header;
3337         size_t np;
3338         int ret, i;
3339
3340         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3341         if (ret != 0) {
3342                 return;
3343         }
3344
3345         if (buflen != header.length) {
3346                 return;
3347         }
3348
3349         ret = ctdb_req_header_verify(&header, 0);
3350         if (ret != 0) {
3351                 return;
3352         }
3353
3354         header_fix_pnn(&header, ctdb);
3355
3356         if (header.destnode == CTDB_BROADCAST_ALL) {
3357                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3358                         header.destnode = i;
3359
3360                         ctdb_req_header_push(&header, buf, &np);
3361                         client_process_packet(req, buf, buflen);
3362                 }
3363                 return;
3364         }
3365
3366         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3367                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3368                         if (ctdb->node_map->node[i].flags &
3369                             NODE_FLAGS_DISCONNECTED) {
3370                                 continue;
3371                         }
3372
3373                         header.destnode = i;
3374
3375                         ctdb_req_header_push(&header, buf, &np);
3376                         client_process_packet(req, buf, buflen);
3377                 }
3378                 return;
3379         }
3380
3381         if (header.destnode > ctdb->node_map->num_nodes) {
3382                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3383                         header.destnode);
3384                 return;
3385         }
3386
3387
3388         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3389                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3390                         header.destnode);
3391                 return;
3392         }
3393
3394         ctdb_req_header_push(&header, buf, &np);
3395         client_process_packet(req, buf, buflen);
3396 }
3397
3398 static void client_dead_handler(void *private_data)
3399 {
3400         struct tevent_req *req = talloc_get_type_abort(
3401                 private_data, struct tevent_req);
3402
3403         tevent_req_done(req);
3404 }
3405
3406 static void client_process_packet(struct tevent_req *req,
3407                                   uint8_t *buf, size_t buflen)
3408 {
3409         struct ctdb_req_header header;
3410         size_t np;
3411         int ret;
3412
3413         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3414         if (ret != 0) {
3415                 return;
3416         }
3417
3418         switch (header.operation) {
3419         case CTDB_REQ_MESSAGE:
3420                 client_process_message(req, buf, buflen);
3421                 break;
3422
3423         case CTDB_REQ_CONTROL:
3424                 client_process_control(req, buf, buflen);
3425                 break;
3426
3427         default:
3428                 break;
3429         }
3430 }
3431
3432 static void client_process_message(struct tevent_req *req,
3433                                    uint8_t *buf, size_t buflen)
3434 {
3435         struct client_state *state = tevent_req_data(
3436                 req, struct client_state);
3437         struct ctdbd_context *ctdb = state->ctdb;
3438         TALLOC_CTX *mem_ctx;
3439         struct ctdb_req_header header;
3440         struct ctdb_req_message request;
3441         uint64_t srvid;
3442         int ret;
3443
3444         mem_ctx = talloc_new(state);
3445         if (tevent_req_nomem(mem_ctx, req)) {
3446                 return;
3447         }
3448
3449         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3450         if (ret != 0) {
3451                 talloc_free(mem_ctx);
3452                 tevent_req_error(req, ret);
3453                 return;
3454         }
3455
3456         header_fix_pnn(&header, ctdb);
3457
3458         if (header.destnode >= ctdb->node_map->num_nodes) {
3459                 /* Many messages are not replied to, so just behave as
3460                  * though this message was not received */
3461                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3462                 talloc_free(mem_ctx);
3463                 return;
3464         }
3465
3466         srvid = request.srvid;
3467         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3468
3469         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3470                 message_disable_recoveries(mem_ctx, req, &header, &request);
3471         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3472                 message_takeover_run(mem_ctx, req, &header, &request);
3473         }
3474
3475         /* check srvid */
3476         talloc_free(mem_ctx);
3477 }
3478
3479 static void client_process_control(struct tevent_req *req,
3480                                    uint8_t *buf, size_t buflen)
3481 {
3482         struct client_state *state = tevent_req_data(
3483                 req, struct client_state);
3484         struct ctdbd_context *ctdb = state->ctdb;
3485         TALLOC_CTX *mem_ctx;
3486         struct ctdb_req_header header;
3487         struct ctdb_req_control request;
3488         int ret;
3489
3490         mem_ctx = talloc_new(state);
3491         if (tevent_req_nomem(mem_ctx, req)) {
3492                 return;
3493         }
3494
3495         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3496         if (ret != 0) {
3497                 talloc_free(mem_ctx);
3498                 tevent_req_error(req, ret);
3499                 return;
3500         }
3501
3502         header_fix_pnn(&header, ctdb);
3503
3504         if (header.destnode >= ctdb->node_map->num_nodes) {
3505                 struct ctdb_reply_control reply;
3506
3507                 reply.rdata.opcode = request.opcode;
3508                 reply.errmsg = "Invalid node";
3509                 reply.status = -1;
3510                 client_send_control(req, &header, &reply);
3511                 return;
3512         }
3513
3514         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
3515                            request.opcode, header.reqid));
3516
3517         if (fake_control_failure(mem_ctx, req, &header, &request)) {
3518                 goto done;
3519         }
3520
3521         switch (request.opcode) {
3522         case CTDB_CONTROL_PROCESS_EXISTS:
3523                 control_process_exists(mem_ctx, req, &header, &request);
3524                 break;
3525
3526         case CTDB_CONTROL_PING:
3527                 control_ping(mem_ctx, req, &header, &request);
3528                 break;
3529
3530         case CTDB_CONTROL_GETDBPATH:
3531                 control_getdbpath(mem_ctx, req, &header, &request);
3532                 break;
3533
3534         case CTDB_CONTROL_GETVNNMAP:
3535                 control_getvnnmap(mem_ctx, req, &header, &request);
3536                 break;
3537
3538         case CTDB_CONTROL_GET_DEBUG:
3539                 control_get_debug(mem_ctx, req, &header, &request);
3540                 break;
3541
3542         case CTDB_CONTROL_SET_DEBUG:
3543                 control_set_debug(mem_ctx, req, &header, &request);
3544                 break;
3545
3546         case CTDB_CONTROL_GET_DBMAP:
3547                 control_get_dbmap(mem_ctx, req, &header, &request);
3548                 break;
3549
3550         case CTDB_CONTROL_GET_RECMODE:
3551                 control_get_recmode(mem_ctx, req, &header, &request);
3552                 break;
3553
3554         case CTDB_CONTROL_SET_RECMODE:
3555                 control_set_recmode(mem_ctx, req, &header, &request);
3556                 break;
3557
3558         case CTDB_CONTROL_DB_ATTACH:
3559                 control_db_attach(mem_ctx, req, &header, &request);
3560                 break;
3561
3562         case CTDB_CONTROL_REGISTER_SRVID:
3563                 control_register_srvid(mem_ctx, req, &header, &request);
3564                 break;
3565
3566         case CTDB_CONTROL_DEREGISTER_SRVID:
3567                 control_deregister_srvid(mem_ctx, req, &header, &request);
3568                 break;
3569
3570         case CTDB_CONTROL_GET_DBNAME:
3571                 control_get_dbname(mem_ctx, req, &header, &request);
3572                 break;
3573
3574         case CTDB_CONTROL_GET_PID:
3575                 control_get_pid(mem_ctx, req, &header, &request);
3576                 break;
3577
3578         case CTDB_CONTROL_GET_RECMASTER:
3579                 control_get_recmaster(mem_ctx, req, &header, &request);
3580                 break;
3581
3582         case CTDB_CONTROL_GET_PNN:
3583                 control_get_pnn(mem_ctx, req, &header, &request);
3584                 break;
3585
3586         case CTDB_CONTROL_SHUTDOWN:
3587                 control_shutdown(mem_ctx, req, &header, &request);
3588                 break;
3589
3590         case CTDB_CONTROL_SET_TUNABLE:
3591                 control_set_tunable(mem_ctx, req, &header, &request);
3592                 break;
3593
3594         case CTDB_CONTROL_GET_TUNABLE:
3595                 control_get_tunable(mem_ctx, req, &header, &request);
3596                 break;
3597
3598         case CTDB_CONTROL_LIST_TUNABLES:
3599                 control_list_tunables(mem_ctx, req, &header, &request);
3600                 break;
3601
3602         case CTDB_CONTROL_MODIFY_FLAGS:
3603                 control_modify_flags(mem_ctx, req, &header, &request);
3604                 break;
3605
3606         case CTDB_CONTROL_GET_ALL_TUNABLES:
3607                 control_get_all_tunables(mem_ctx, req, &header, &request);
3608                 break;
3609
3610         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
3611                 control_db_attach_persistent(mem_ctx, req, &header, &request);
3612                 break;
3613
3614         case CTDB_CONTROL_UPTIME:
3615                 control_uptime(mem_ctx, req, &header, &request);
3616                 break;
3617
3618         case CTDB_CONTROL_RELOAD_NODES_FILE:
3619                 control_reload_nodes_file(mem_ctx, req, &header, &request);
3620                 break;
3621
3622         case CTDB_CONTROL_GET_CAPABILITIES:
3623                 control_get_capabilities(mem_ctx, req, &header, &request);
3624                 break;
3625
3626         case CTDB_CONTROL_RELEASE_IP:
3627                 control_release_ip(mem_ctx, req, &header, &request);
3628                 break;
3629
3630         case CTDB_CONTROL_TAKEOVER_IP:
3631                 control_takeover_ip(mem_ctx, req, &header, &request);
3632                 break;
3633
3634         case CTDB_CONTROL_GET_PUBLIC_IPS:
3635                 control_get_public_ips(mem_ctx, req, &header, &request);
3636                 break;
3637
3638         case CTDB_CONTROL_GET_NODEMAP:
3639                 control_get_nodemap(mem_ctx, req, &header, &request);
3640                 break;
3641
3642         case CTDB_CONTROL_GET_RECLOCK_FILE:
3643                 control_get_reclock_file(mem_ctx, req, &header, &request);
3644                 break;
3645
3646         case CTDB_CONTROL_STOP_NODE:
3647                 control_stop_node(mem_ctx, req, &header, &request);
3648                 break;
3649
3650         case CTDB_CONTROL_CONTINUE_NODE:
3651                 control_continue_node(mem_ctx, req, &header, &request);
3652                 break;
3653
3654         case CTDB_CONTROL_SET_BAN_STATE:
3655                 control_set_ban_state(mem_ctx, req, &header, &request);
3656                 break;
3657
3658         case CTDB_CONTROL_GET_DB_SEQNUM:
3659                 control_get_db_seqnum(mem_ctx, req, &header, &request);
3660                 break;
3661
3662         case CTDB_CONTROL_DB_GET_HEALTH:
3663                 control_db_get_health(mem_ctx, req, &header, &request);
3664                 break;
3665
3666         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
3667                 control_get_public_ip_info(mem_ctx, req, &header, &request);
3668                 break;
3669
3670         case CTDB_CONTROL_GET_IFACES:
3671                 control_get_ifaces(mem_ctx, req, &header, &request);
3672                 break;
3673
3674         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
3675                 control_set_iface_link_state(mem_ctx, req, &header, &request);
3676                 break;
3677
3678         case CTDB_CONTROL_SET_DB_READONLY:
3679                 control_set_db_readonly(mem_ctx, req, &header, &request);
3680                 break;
3681
3682         case CTDB_CONTROL_SET_DB_STICKY:
3683                 control_set_db_sticky(mem_ctx, req, &header, &request);
3684                 break;
3685
3686         case CTDB_CONTROL_IPREALLOCATED:
3687                 control_ipreallocated(mem_ctx, req, &header, &request);
3688                 break;
3689
3690         case CTDB_CONTROL_GET_RUNSTATE:
3691                 control_get_runstate(mem_ctx, req, &header, &request);
3692                 break;
3693
3694         case CTDB_CONTROL_GET_NODES_FILE:
3695                 control_get_nodes_file(mem_ctx, req, &header, &request);
3696                 break;
3697
3698         case CTDB_CONTROL_DB_OPEN_FLAGS:
3699                 control_db_open_flags(mem_ctx, req, &header, &request);
3700                 break;
3701
3702         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
3703                 control_db_attach_replicated(mem_ctx, req, &header, &request);
3704                 break;
3705
3706         case CTDB_CONTROL_CHECK_PID_SRVID:
3707                 control_check_pid_srvid(mem_ctx, req, &header, &request);
3708                 break;
3709
3710         default:
3711                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
3712                         control_error(mem_ctx, req, &header, &request);
3713                 }
3714                 break;
3715         }
3716
3717 done:
3718         talloc_free(mem_ctx);
3719 }
3720
3721 static int client_recv(struct tevent_req *req, int *perr)
3722 {
3723         struct client_state *state = tevent_req_data(
3724                 req, struct client_state);
3725         int err;
3726
3727         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
3728         close(state->fd);
3729
3730         if (tevent_req_is_unix_error(req, &err)) {
3731                 if (perr != NULL) {
3732                         *perr = err;
3733                 }
3734                 return -1;
3735         }
3736
3737         return state->status;
3738 }
3739
3740 /*
3741  * Fake CTDB server
3742  */
3743
3744 struct server_state {
3745         struct tevent_context *ev;
3746         struct ctdbd_context *ctdb;
3747         int fd;
3748 };
3749
3750 static void server_new_client(struct tevent_req *subreq);
3751 static void server_client_done(struct tevent_req *subreq);
3752
3753 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
3754                                       struct tevent_context *ev,
3755                                       struct ctdbd_context *ctdb,
3756                                       int fd)
3757 {
3758         struct tevent_req *req, *subreq;
3759         struct server_state *state;
3760
3761         req = tevent_req_create(mem_ctx, &state, struct server_state);
3762         if (req == NULL) {
3763                 return NULL;
3764         }
3765
3766         state->ev = ev;
3767         state->ctdb = ctdb;
3768         state->fd = fd;
3769
3770         subreq = accept_send(state, ev, fd);
3771         if (tevent_req_nomem(subreq, req)) {
3772                 return tevent_req_post(req, ev);
3773         }
3774         tevent_req_set_callback(subreq, server_new_client, req);
3775
3776         return req;
3777 }
3778
3779 static void server_new_client(struct tevent_req *subreq)
3780 {
3781         struct tevent_req *req = tevent_req_callback_data(
3782                 subreq, struct tevent_req);
3783         struct server_state *state = tevent_req_data(
3784                 req, struct server_state);
3785         struct ctdbd_context *ctdb = state->ctdb;
3786         int client_fd;
3787         int ret = 0;
3788
3789         client_fd = accept_recv(subreq, NULL, NULL, &ret);
3790         TALLOC_FREE(subreq);
3791         if (client_fd == -1) {
3792                 tevent_req_error(req, ret);
3793                 return;
3794         }
3795
3796         subreq = client_send(state, state->ev, client_fd,
3797                              ctdb, ctdb->node_map->pnn);
3798         if (tevent_req_nomem(subreq, req)) {
3799                 return;
3800         }
3801         tevent_req_set_callback(subreq, server_client_done, req);
3802
3803         ctdb->num_clients += 1;
3804
3805         subreq = accept_send(state, state->ev, state->fd);
3806         if (tevent_req_nomem(subreq, req)) {
3807                 return;
3808         }
3809         tevent_req_set_callback(subreq, server_new_client, req);
3810 }
3811
3812 static void server_client_done(struct tevent_req *subreq)
3813 {
3814         struct tevent_req *req = tevent_req_callback_data(
3815                 subreq, struct tevent_req);
3816         struct server_state *state = tevent_req_data(
3817                 req, struct server_state);
3818         struct ctdbd_context *ctdb = state->ctdb;
3819         int ret = 0;
3820         int status;
3821
3822         status = client_recv(subreq, &ret);
3823         TALLOC_FREE(subreq);
3824         if (status < 0) {
3825                 tevent_req_error(req, ret);
3826                 return;
3827         }
3828
3829         ctdb->num_clients -= 1;
3830
3831         if (status == 99) {
3832                 /* Special status, to shutdown server */
3833                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
3834                 tevent_req_done(req);
3835         }
3836 }
3837
3838 static bool server_recv(struct tevent_req *req, int *perr)
3839 {
3840         int err;
3841
3842         if (tevent_req_is_unix_error(req, &err)) {
3843                 if (perr != NULL) {
3844                         *perr = err;
3845                 }
3846                 return false;
3847         }
3848         return true;
3849 }
3850
3851 /*
3852  * Main functions
3853  */
3854
3855 static int socket_init(const char *sockpath)
3856 {
3857         struct sockaddr_un addr;
3858         size_t len;
3859         int ret, fd;
3860
3861         memset(&addr, 0, sizeof(addr));
3862         addr.sun_family = AF_UNIX;
3863
3864         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
3865         if (len >= sizeof(addr.sun_path)) {
3866                 fprintf(stderr, "path too long: %s\n", sockpath);
3867                 return -1;
3868         }
3869
3870         fd = socket(AF_UNIX, SOCK_STREAM, 0);
3871         if (fd == -1) {
3872                 fprintf(stderr, "socket failed - %s\n", sockpath);
3873                 return -1;
3874         }
3875
3876         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
3877         if (ret != 0) {
3878                 fprintf(stderr, "bind failed - %s\n", sockpath);
3879                 goto fail;
3880         }
3881
3882         ret = listen(fd, 10);
3883         if (ret != 0) {
3884                 fprintf(stderr, "listen failed\n");
3885                 goto fail;
3886         }
3887
3888         DEBUG(DEBUG_INFO, ("Socket init done\n"));
3889
3890         return fd;
3891
3892 fail:
3893         if (fd != -1) {
3894                 close(fd);
3895         }
3896         return -1;
3897 }
3898
3899 static struct options {
3900         const char *dbdir;
3901         const char *sockpath;
3902         const char *pidfile;
3903         const char *debuglevel;
3904 } options;
3905
3906 static struct poptOption cmdline_options[] = {
3907         POPT_AUTOHELP
3908         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
3909                 "Database directory", "directory" },
3910         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
3911                 "Unix domain socket path", "filename" },
3912         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
3913                 "pid file", "filename" } ,
3914         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
3915                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
3916         POPT_TABLEEND
3917 };
3918
3919 static void cleanup(void)
3920 {
3921         unlink(options.sockpath);
3922         unlink(options.pidfile);
3923 }
3924
3925 static void signal_handler(int sig)
3926 {
3927         cleanup();
3928         exit(0);
3929 }
3930
3931 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
3932                          struct ctdbd_context *ctdb, int fd, int pfd)
3933 {
3934         struct tevent_req *req;
3935         int ret = 0;
3936         ssize_t len;
3937
3938         atexit(cleanup);
3939         signal(SIGTERM, signal_handler);
3940
3941         req = server_send(mem_ctx, ev, ctdb, fd);
3942         if (req == NULL) {
3943                 fprintf(stderr, "Memory error\n");
3944                 exit(1);
3945         }
3946
3947         len = write(pfd, &ret, sizeof(ret));
3948         if (len != sizeof(ret)) {
3949                 fprintf(stderr, "Failed to send message to parent\n");
3950                 exit(1);
3951         }
3952         close(pfd);
3953
3954         tevent_req_poll(req, ev);
3955
3956         server_recv(req, &ret);
3957         if (ret != 0) {
3958                 exit(1);
3959         }
3960 }
3961
3962 int main(int argc, const char *argv[])
3963 {
3964         TALLOC_CTX *mem_ctx;
3965         struct ctdbd_context *ctdb;
3966         struct tevent_context *ev;
3967         poptContext pc;
3968         int opt, fd, ret, pfd[2];
3969         ssize_t len;
3970         pid_t pid;
3971         FILE *fp;
3972
3973         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
3974                             POPT_CONTEXT_KEEP_FIRST);
3975         while ((opt = poptGetNextOpt(pc)) != -1) {
3976                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
3977                 exit(1);
3978         }
3979
3980         if (options.dbdir == NULL) {
3981                 fprintf(stderr, "Please specify database directory\n");
3982                 poptPrintHelp(pc, stdout, 0);
3983                 exit(1);
3984         }
3985
3986         if (options.sockpath == NULL) {
3987                 fprintf(stderr, "Please specify socket path\n");
3988                 poptPrintHelp(pc, stdout, 0);
3989                 exit(1);
3990         }
3991
3992         if (options.pidfile == NULL) {
3993                 fprintf(stderr, "Please specify pid file\n");
3994                 poptPrintHelp(pc, stdout, 0);
3995                 exit(1);
3996         }
3997
3998         mem_ctx = talloc_new(NULL);
3999         if (mem_ctx == NULL) {
4000                 fprintf(stderr, "Memory error\n");
4001                 exit(1);
4002         }
4003
4004         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4005         if (ret != 0) {
4006                 fprintf(stderr, "Invalid debug level\n");
4007                 poptPrintHelp(pc, stdout, 0);
4008                 exit(1);
4009         }
4010
4011         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4012         if (ctdb == NULL) {
4013                 exit(1);
4014         }
4015
4016         if (! ctdbd_verify(ctdb)) {
4017                 exit(1);
4018         }
4019
4020         ev = tevent_context_init(mem_ctx);
4021         if (ev == NULL) {
4022                 fprintf(stderr, "Memory error\n");
4023                 exit(1);
4024         }
4025
4026         fd = socket_init(options.sockpath);
4027         if (fd == -1) {
4028                 exit(1);
4029         }
4030
4031         ret = pipe(pfd);
4032         if (ret != 0) {
4033                 fprintf(stderr, "Failed to create pipe\n");
4034                 cleanup();
4035                 exit(1);
4036         }
4037
4038         pid = fork();
4039         if (pid == -1) {
4040                 fprintf(stderr, "Failed to fork\n");
4041                 cleanup();
4042                 exit(1);
4043         }
4044
4045         if (pid == 0) {
4046                 /* Child */
4047                 close(pfd[0]);
4048                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4049                 exit(1);
4050         }
4051
4052         /* Parent */
4053         close(pfd[1]);
4054
4055         len = read(pfd[0], &ret, sizeof(ret));
4056         close(pfd[0]);
4057         if (len != sizeof(ret)) {
4058                 fprintf(stderr, "len = %zi\n", len);
4059                 fprintf(stderr, "Failed to get message from child\n");
4060                 kill(pid, SIGTERM);
4061                 exit(1);
4062         }
4063
4064         fp = fopen(options.pidfile, "w");
4065         if (fp == NULL) {
4066                 fprintf(stderr, "Failed to open pid file %s\n",
4067                         options.pidfile);
4068                 kill(pid, SIGTERM);
4069                 exit(1);
4070         }
4071         fprintf(fp, "%d\n", pid);
4072         fclose(fp);
4073
4074         return 0;
4075 }