07ea6b99754ca370f61d25c7a5d3f715eac8125b
[metze/samba/wip.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45
46 #include "ipalloc_read_known_ips.h"
47
48
/* Port number applied to every node address built by the parsers below */
49 #define CTDB_PORT 4379
50
51 /* A fake flag that is only supported by some functions */
/* nodemap_parse() sets it on a node whose line carries the TIMEOUT token */
52 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
53
/* One node of the fake cluster, filled in by nodemap_parse(). */
54 struct node {
55         ctdb_sock_addr addr;            /* node address, port set to CTDB_PORT */
56         uint32_t pnn;                   /* node number taken from the nodemap line */
57         uint32_t flags;                 /* parsed flags, possibly with NODE_FLAGS_FAKE_TIMEOUT */
58         uint32_t capabilities;          /* CTDB_CAP_* bits; RECMASTER|LMASTER unless faked off */
59         bool recovery_disabled;         /* initialised to false by the parser */
60         void *recovery_substate;        /* initialised to NULL; users are outside this part of the file */
61 };
62
/* All fake nodes plus the special roles named in the nodemap input. */
63 struct node_map {
64         uint32_t num_nodes;     /* number of valid entries in node[] */
65         struct node *node;      /* talloc array, grown by one per accepted line */
66         uint32_t pnn;           /* PNN of the line tagged CURRENT; CTDB_UNKNOWN_PNN until seen */
67         uint32_t recmaster;     /* PNN of the line tagged RECMASTER; CTDB_UNKNOWN_PNN until seen */
68 };
69
/* One network interface, filled in by interfaces_parse(). */
70 struct interface {
71         const char *name;       /* talloc copy of the interface name */
72         bool link_up;           /* derived from the LinkStatus column (non-zero means up) */
73         uint32_t references;    /* value of the References column */
74 };
75
/* Growable array of fake interfaces. */
76 struct interface_map {
77         int num;                        /* number of valid entries in iface[] */
78         struct interface *iface;        /* talloc array, grown by one per accepted line */
79 };
80
/* Recovery mode and VNN map state, filled in by vnnmap_init()/vnnmap_parse(). */
81 struct vnn_map {
82         uint32_t recmode;       /* starts as CTDB_RECOVERY_ACTIVE */
83         uint32_t generation;    /* INVALID_GENERATION until the first input line sets it */
84         uint32_t size;          /* number of valid entries in map[] */
85         uint32_t *map;          /* talloc array; one entry per input line after the generation */
86 };
87
/* One fake database; created by dbmap_parse() or database_new(). */
88 struct database {
89         struct database *prev, *next;   /* linkage in database_map.db */
90         const char *name;               /* database name */
91         const char *path;               /* "<dbdir>/<name>" */
92         struct tdb_context *tdb;        /* opened by database_new(); left NULL by dbmap_parse() */
93         uint32_t id;                    /* parsed ID, or Jenkins hash of the name in database_new() */
94         uint8_t flags;                  /* CTDB_DB_FLAGS_* bits */
95         uint64_t seq_num;               /* reported by database_seqnum() when tdb is NULL */
96 };
97
/* List of fake databases and the directory their paths are built under. */
98 struct database_map {
99         struct database *db;    /* head of the doubly-linked database list */
100         const char *dbdir;     /* talloc copy of the database directory */
101 };
102
/* One configured control failure; the input format is
 * "<opcode> <pnn> {ERROR|TIMEOUT} <comment>". */
103 struct fake_control_failure {
104         struct fake_control_failure  *prev, *next;     /* list linkage */
105         enum ctdb_controls opcode;      /* control opcode from the input line */
106         uint32_t pnn;                   /* PNN from the input line; matching rules are outside this part of the file */
107         const char *error;              /* failure kind; the format allows ERROR or TIMEOUT */
108         const char *comment;            /* free-form text from the input line */
109 };
110
/* Per-client record kept on ctdbd_context.client_list. */
111 struct ctdb_client {
112         struct ctdb_client *prev, *next;       /* list linkage */
113         struct ctdbd_context *ctdb;     /* back-pointer to the daemon context */
114         pid_t pid;                      /* presumably the client's process ID - TODO confirm */
115         void *state;                    /* opaque per-client state; its type is not visible here */
116 };
117
/* Top-level state of the fake daemon.  Fields without a comment are not
 * used in the visible part of the file. */
118 struct ctdbd_context {
119         struct node_map *node_map;              /* fake nodes, see nodemap_parse() */
120         struct interface_map *iface_map;        /* fake interfaces, see interfaces_parse() */
121         struct vnn_map *vnn_map;                /* recovery mode, generation and VNN map */
122         struct database_map *db_map;            /* fake databases */
123         struct srvid_context *srv;              /* presumably the SRVID handler registry - TODO confirm */
124         int num_clients;                        /* presumably the number of connected clients - TODO confirm */
125         struct timeval start_time;
126         struct timeval recovery_start_time;
127         struct timeval recovery_end_time;
128         bool takeover_disabled;
129         int log_level;
130         enum ctdb_runstate runstate;
131         struct ctdb_tunable_list tun_list;
132         char *reclock;                          /* set by reclock_parse() when a non-blank line is read */
133         struct ctdb_public_ip_list *known_ips;  /* filled in by public_ips_parse() */
134         struct fake_control_failure *control_failures; /* list head */
135         struct ctdb_client *client_list;        /* list head */
136 };
137
138 /*
139  * Parse routines
140  */
141
142 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
143 {
144         struct node_map *node_map;
145
146         node_map = talloc_zero(mem_ctx, struct node_map);
147         if (node_map == NULL) {
148                 return NULL;
149         }
150
151         node_map->pnn = CTDB_UNKNOWN_PNN;
152         node_map->recmaster = CTDB_UNKNOWN_PNN;
153
154         return node_map;
155 }
156
157 /* Read a nodemap from stdin.  Each line looks like:
158  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
159  * EOF or a blank line terminates input.
160  *
161  * By default, capablities for each node are
162  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
163  * capabilities can be faked off by adding, for example,
164  * -CTDB_CAP_RECMASTER.
165  */
166
167 static bool nodemap_parse(struct node_map *node_map)
168 {
169         char line[1024];
170
171         while ((fgets(line, sizeof(line), stdin) != NULL)) {
172                 uint32_t pnn, flags, capabilities;
173                 char *tok, *t;
174                 char *ip;
175                 ctdb_sock_addr saddr;
176                 struct node *node;
177                 int ret;
178
179                 if (line[0] == '\n') {
180                         break;
181                 }
182
183                 /* Get rid of pesky newline */
184                 if ((t = strchr(line, '\n')) != NULL) {
185                         *t = '\0';
186                 }
187
188                 /* Get PNN */
189                 tok = strtok(line, " \t");
190                 if (tok == NULL) {
191                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
192                         continue;
193                 }
194                 pnn = (uint32_t)strtoul(tok, NULL, 0);
195
196                 /* Get IP */
197                 tok = strtok(NULL, " \t");
198                 if (tok == NULL) {
199                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
200                         continue;
201                 }
202                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
203                 if (ret != 0) {
204                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
205                         continue;
206                 }
207                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
208                 ip = talloc_strdup(node_map, tok);
209                 if (ip == NULL) {
210                         goto fail;
211                 }
212
213                 /* Get flags */
214                 tok = strtok(NULL, " \t");
215                 if (tok == NULL) {
216                         fprintf(stderr, "bad line (%s) - missing flags\n",
217                                 line);
218                         continue;
219                 }
220                 flags = (uint32_t)strtoul(tok, NULL, 0);
221                 /* Handle deleted nodes */
222                 if (flags & NODE_FLAGS_DELETED) {
223                         talloc_free(ip);
224                         ip = talloc_strdup(node_map, "0.0.0.0");
225                         if (ip == NULL) {
226                                 goto fail;
227                         }
228                 }
229                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
230
231                 tok = strtok(NULL, " \t");
232                 while (tok != NULL) {
233                         if (strcmp(tok, "CURRENT") == 0) {
234                                 node_map->pnn = pnn;
235                         } else if (strcmp(tok, "RECMASTER") == 0) {
236                                 node_map->recmaster = pnn;
237                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
238                                 capabilities &= ~CTDB_CAP_RECMASTER;
239                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
240                                 capabilities &= ~CTDB_CAP_LMASTER;
241                         } else if (strcmp(tok, "TIMEOUT") == 0) {
242                                 /* This can be done with just a flag
243                                  * value but it is probably clearer
244                                  * and less error-prone to fake this
245                                  * with an explicit token */
246                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
247                         }
248                         tok = strtok(NULL, " \t");
249                 }
250
251                 node_map->node = talloc_realloc(node_map, node_map->node,
252                                                 struct node,
253                                                 node_map->num_nodes + 1);
254                 if (node_map->node == NULL) {
255                         goto fail;
256                 }
257                 node = &node_map->node[node_map->num_nodes];
258
259                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
260                 if (ret != 0) {
261                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
262                         continue;
263                 }
264                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
265                 node->pnn = pnn;
266                 node->flags = flags;
267                 node->capabilities = capabilities;
268                 node->recovery_disabled = false;
269                 node->recovery_substate = NULL;
270
271                 node_map->num_nodes += 1;
272         }
273
274         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
275         return true;
276
277 fail:
278         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
279         return false;
280
281 }
282
283 /* Append a node to a node map with given address and flags */
284 static bool node_map_add(struct ctdb_node_map *nodemap,
285                          const char *nstr, uint32_t flags)
286 {
287         ctdb_sock_addr addr;
288         uint32_t num;
289         struct ctdb_node_and_flags *n;
290         int ret;
291
292         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
293         if (ret != 0) {
294                 fprintf(stderr, "Invalid IP address %s\n", nstr);
295                 return false;
296         }
297         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
298
299         num = nodemap->num;
300         nodemap->node = talloc_realloc(nodemap, nodemap->node,
301                                        struct ctdb_node_and_flags, num+1);
302         if (nodemap->node == NULL) {
303                 return false;
304         }
305
306         n = &nodemap->node[num];
307         n->addr = addr;
308         n->pnn = num;
309         n->flags = flags;
310
311         nodemap->num = num+1;
312         return true;
313 }
314
315 /* Read a nodes file into a node map */
316 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
317                                                   const char *nlist)
318 {
319         char **lines;
320         int nlines;
321         int i;
322         struct ctdb_node_map *nodemap;
323
324         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
325         if (nodemap == NULL) {
326                 return NULL;
327         }
328
329         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
330         if (lines == NULL) {
331                 return NULL;
332         }
333
334         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
335                 nlines--;
336         }
337
338         for (i=0; i<nlines; i++) {
339                 char *node;
340                 uint32_t flags;
341                 size_t len;
342
343                 node = lines[i];
344                 /* strip leading spaces */
345                 while((*node == ' ') || (*node == '\t')) {
346                         node++;
347                 }
348
349                 len = strlen(node);
350
351                 /* strip trailing spaces */
352                 while ((len > 1) &&
353                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
354                 {
355                         node[len-1] = '\0';
356                         len--;
357                 }
358
359                 if (len == 0) {
360                         continue;
361                 }
362                 if (*node == '#') {
363                         /* A "deleted" node is a node that is
364                            commented out in the nodes file.  This is
365                            used instead of removing a line, which
366                            would cause subsequent nodes to change
367                            their PNN. */
368                         flags = NODE_FLAGS_DELETED;
369                         node = discard_const("0.0.0.0");
370                 } else {
371                         flags = 0;
372                 }
373                 if (! node_map_add(nodemap, node, flags)) {
374                         talloc_free(lines);
375                         TALLOC_FREE(nodemap);
376                         return NULL;
377                 }
378         }
379
380         talloc_free(lines);
381         return nodemap;
382 }
383
384 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
385                                              uint32_t pnn)
386 {
387         struct ctdb_node_map *nodemap;
388         char nodes_list[PATH_MAX];
389         const char *ctdb_base;
390         int num;
391
392         ctdb_base = getenv("CTDB_BASE");
393         if (ctdb_base == NULL) {
394                 D_ERR("CTDB_BASE is not set\n");
395                 return NULL;
396         }
397
398         /* read optional node-specific nodes file */
399         num = snprintf(nodes_list, sizeof(nodes_list),
400                        "%s/nodes.%d", ctdb_base, pnn);
401         if (num == sizeof(nodes_list)) {
402                 D_ERR("nodes file path too long\n");
403                 return NULL;
404         }
405         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
406         if (nodemap != NULL) {
407                 /* Fake a load failure for an empty nodemap */
408                 if (nodemap->num == 0) {
409                         talloc_free(nodemap);
410
411                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
412                         return NULL;
413                 }
414
415                 return nodemap;
416         }
417
418         /* read normal nodes file */
419         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
420         if (num == sizeof(nodes_list)) {
421                 D_ERR("nodes file path too long\n");
422                 return NULL;
423         }
424         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
425         if (nodemap != NULL) {
426                 return nodemap;
427         }
428
429         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
430         return NULL;
431 }
432
433 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
434 {
435         struct interface_map *iface_map;
436
437         iface_map = talloc_zero(mem_ctx, struct interface_map);
438         if (iface_map == NULL) {
439                 return NULL;
440         }
441
442         return iface_map;
443 }
444
445 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
446  * output:
447  *   :Name:LinkStatus:References:
448  *   :eth2:1:4294967294
449  *   :eth1:1:4294967292
450  */
451
452 static bool interfaces_parse(struct interface_map *iface_map)
453 {
454         char line[1024];
455
456         while ((fgets(line, sizeof(line), stdin) != NULL)) {
457                 uint16_t link_state;
458                 uint32_t references;
459                 char *tok, *t, *name;
460                 struct interface *iface;
461
462                 if (line[0] == '\n') {
463                         break;
464                 }
465
466                 /* Get rid of pesky newline */
467                 if ((t = strchr(line, '\n')) != NULL) {
468                         *t = '\0';
469                 }
470
471                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
472                         continue;
473                 }
474
475                 /* Leading colon... */
476                 // tok = strtok(line, ":");
477
478                 /* name */
479                 tok = strtok(line, ":");
480                 if (tok == NULL) {
481                         fprintf(stderr, "bad line (%s) - missing name\n", line);
482                         continue;
483                 }
484                 name = tok;
485
486                 /* link_state */
487                 tok = strtok(NULL, ":");
488                 if (tok == NULL) {
489                         fprintf(stderr, "bad line (%s) - missing link state\n",
490                                 line);
491                         continue;
492                 }
493                 link_state = (uint16_t)strtoul(tok, NULL, 0);
494
495                 /* references... */
496                 tok = strtok(NULL, ":");
497                 if (tok == NULL) {
498                         fprintf(stderr, "bad line (%s) - missing references\n",
499                                 line);
500                         continue;
501                 }
502                 references = (uint32_t)strtoul(tok, NULL, 0);
503
504                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
505                                                   struct interface,
506                                                   iface_map->num + 1);
507                 if (iface_map->iface == NULL) {
508                         goto fail;
509                 }
510
511                 iface = &iface_map->iface[iface_map->num];
512
513                 iface->name = talloc_strdup(iface_map, name);
514                 if (iface->name == NULL) {
515                         goto fail;
516                 }
517                 iface->link_up = link_state;
518                 iface->references = references;
519
520                 iface_map->num += 1;
521         }
522
523         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
524         return true;
525
526 fail:
527         fprintf(stderr, "Parsing interfaces failed\n");
528         return false;
529 }
530
531 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
532 {
533         struct vnn_map *vnn_map;
534
535         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
536         if (vnn_map == NULL) {
537                 fprintf(stderr, "Memory error\n");
538                 return NULL;
539         }
540         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
541         vnn_map->generation = INVALID_GENERATION;
542
543         return vnn_map;
544 }
545
546 /* Read vnn map.
547  * output:
548  *   <GENERATION>
549  *   <LMASTER0>
550  *   <LMASTER1>
551  *   ...
552  */
553
554 static bool vnnmap_parse(struct vnn_map *vnn_map)
555 {
556         char line[1024];
557
558         while (fgets(line, sizeof(line), stdin) != NULL) {
559                 uint32_t n;
560                 char *t;
561
562                 if (line[0] == '\n') {
563                         break;
564                 }
565
566                 /* Get rid of pesky newline */
567                 if ((t = strchr(line, '\n')) != NULL) {
568                         *t = '\0';
569                 }
570
571                 n = (uint32_t) strtol(line, NULL, 0);
572
573                 /* generation */
574                 if (vnn_map->generation == INVALID_GENERATION) {
575                         vnn_map->generation = n;
576                         continue;
577                 }
578
579                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
580                                               vnn_map->size + 1);
581                 if (vnn_map->map == NULL) {
582                         fprintf(stderr, "Memory error\n");
583                         goto fail;
584                 }
585
586                 vnn_map->map[vnn_map->size] = n;
587                 vnn_map->size += 1;
588         }
589
590         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
591         return true;
592
593 fail:
594         fprintf(stderr, "Parsing vnnmap failed\n");
595         return false;
596 }
597
598 static bool reclock_parse(struct ctdbd_context *ctdb)
599 {
600         char line[1024];
601         char *t;
602
603         if (fgets(line, sizeof(line), stdin) == NULL) {
604                 goto fail;
605         }
606
607         if (line[0] == '\n') {
608                 /* Recovery lock remains unset */
609                 goto ok;
610         }
611
612         /* Get rid of pesky newline */
613         if ((t = strchr(line, '\n')) != NULL) {
614                 *t = '\0';
615         }
616
617         ctdb->reclock = talloc_strdup(ctdb, line);
618         if (ctdb->reclock == NULL) {
619                 goto fail;
620         }
621 ok:
622         /* Swallow possible blank line following section.  Picky
623          * compiler settings don't allow the return value to be
624          * ignored, so make the compiler happy.
625          */
626         if (fgets(line, sizeof(line), stdin) == NULL) {
627                 ;
628         }
629         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
630         return true;
631
632 fail:
633         fprintf(stderr, "Parsing reclock failed\n");
634         return false;
635 }
636
637 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
638                                        const char *dbdir)
639 {
640         struct database_map *db_map;
641
642         db_map = talloc_zero(mem_ctx, struct database_map);
643         if (db_map == NULL) {
644                 return NULL;
645         }
646
647         db_map->dbdir = talloc_strdup(db_map, dbdir);
648         if (db_map->dbdir == NULL) {
649                 talloc_free(db_map);
650                 return NULL;
651         }
652
653         return db_map;
654 }
655
656 /* Read a database map from stdin.  Each line looks like:
657  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
658  * EOF or a blank line terminates input.
659  *
660  * By default, flags and seq_num are 0
661  */
662
663 static bool dbmap_parse(struct database_map *db_map)
664 {
665         char line[1024];
666
667         while ((fgets(line, sizeof(line), stdin) != NULL)) {
668                 uint32_t id;
669                 uint8_t flags = 0;
670                 uint32_t seq_num = 0;
671                 char *tok, *t;
672                 char *name;
673                 struct database *db;
674
675                 if (line[0] == '\n') {
676                         break;
677                 }
678
679                 /* Get rid of pesky newline */
680                 if ((t = strchr(line, '\n')) != NULL) {
681                         *t = '\0';
682                 }
683
684                 /* Get ID */
685                 tok = strtok(line, " \t");
686                 if (tok == NULL) {
687                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
688                         continue;
689                 }
690                 id = (uint32_t)strtoul(tok, NULL, 0);
691
692                 /* Get NAME */
693                 tok = strtok(NULL, " \t");
694                 if (tok == NULL) {
695                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
696                         continue;
697                 }
698                 name = talloc_strdup(db_map, tok);
699                 if (name == NULL) {
700                         goto fail;
701                 }
702
703                 /* Get flags */
704                 tok = strtok(NULL, " \t");
705                 while (tok != NULL) {
706                         if (strcmp(tok, "PERSISTENT") == 0) {
707                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
708                         } else if (strcmp(tok, "STICKY") == 0) {
709                                 flags |= CTDB_DB_FLAGS_STICKY;
710                         } else if (strcmp(tok, "READONLY") == 0) {
711                                 flags |= CTDB_DB_FLAGS_READONLY;
712                         } else if (strcmp(tok, "REPLICATED") == 0) {
713                                 flags |= CTDB_DB_FLAGS_REPLICATED;
714                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
715                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
716                                              CTDB_DB_FLAGS_REPLICATED;
717
718                                 if ((flags & nv) == 0) {
719                                         fprintf(stderr,
720                                                 "seq_num for volatile db\n");
721                                         goto fail;
722                                 }
723                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
724                         }
725
726                         tok = strtok(NULL, " \t");
727                 }
728
729                 db = talloc_zero(db_map, struct database);
730                 if (db == NULL) {
731                         goto fail;
732                 }
733
734                 db->id = id;
735                 db->name = talloc_steal(db, name);
736                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
737                 if (db->path == NULL) {
738                         talloc_free(db);
739                         goto fail;
740                 }
741                 db->flags = flags;
742                 db->seq_num = seq_num;
743
744                 DLIST_ADD_END(db_map->db, db);
745         }
746
747         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
748         return true;
749
750 fail:
751         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
752         return false;
753
754 }
755
756 static struct database *database_find(struct database_map *db_map,
757                                       uint32_t db_id)
758 {
759         struct database *db;
760
761         for (db = db_map->db; db != NULL; db = db->next) {
762                 if (db->id == db_id) {
763                         return db;
764                 }
765         }
766
767         return NULL;
768 }
769
770 static int database_count(struct database_map *db_map)
771 {
772         struct database *db;
773         int count = 0;
774
775         for (db = db_map->db; db != NULL; db = db->next) {
776                 count += 1;
777         }
778
779         return count;
780 }
781
782 static int database_flags(uint8_t db_flags)
783 {
784         int tdb_flags = 0;
785
786         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
787                 tdb_flags = TDB_DEFAULT;
788         } else {
789                 /* volatile and replicated use the same flags */
790                 tdb_flags = TDB_NOSYNC |
791                             TDB_CLEAR_IF_FIRST |
792                             TDB_INCOMPATIBLE_HASH;
793         }
794
795         tdb_flags |= TDB_DISALLOW_NESTING;
796
797         return tdb_flags;
798 }
799
800 static struct database *database_new(struct database_map *db_map,
801                                      const char *name, uint8_t flags)
802 {
803         struct database *db;
804         TDB_DATA key;
805         int tdb_flags;
806
807         db = talloc_zero(db_map, struct database);
808         if (db == NULL) {
809                 return NULL;
810         }
811
812         db->name = talloc_strdup(db, name);
813         if (db->name == NULL) {
814                 goto fail;
815         }
816
817         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
818         if (db->path == NULL) {
819                 goto fail;
820         }
821
822         key.dsize = strlen(db->name) + 1;
823         key.dptr = discard_const(db->name);
824
825         db->id = tdb_jenkins_hash(&key);
826         db->flags = flags;
827
828         tdb_flags = database_flags(flags);
829
830         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
831         if (db->tdb == NULL) {
832                 DBG_ERR("tdb_open\n");
833                 goto fail;
834         }
835
836         DLIST_ADD_END(db_map->db, db);
837         return db;
838
839 fail:
840         DBG_ERR("Memory error\n");
841         talloc_free(db);
842         return NULL;
843
844 }
845
846 static int ltdb_store(struct database *db, TDB_DATA key,
847                       struct ctdb_ltdb_header *header, TDB_DATA data)
848 {
849         int ret;
850         bool db_volatile = true;
851         bool keep = false;
852
853         if (db->tdb == NULL) {
854                 return EINVAL;
855         }
856
857         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
858             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
859                 db_volatile = false;
860         }
861
862         if (data.dsize > 0) {
863                 keep = true;
864         } else {
865                 if (db_volatile && header->rsn == 0) {
866                         keep = true;
867                 }
868         }
869
870         if (keep) {
871                 TDB_DATA rec[2];
872
873                 rec[0].dsize = ctdb_ltdb_header_len(header);
874                 rec[0].dptr = (uint8_t *)header;
875
876                 rec[1].dsize = data.dsize;
877                 rec[1].dptr = data.dptr;
878
879                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
880         } else {
881                 if (header->rsn > 0) {
882                         ret = tdb_delete(db->tdb, key);
883                 } else {
884                         ret = 0;
885                 }
886         }
887
888         return ret;
889 }
890
891 static int ltdb_fetch(struct database *db, TDB_DATA key,
892                       struct ctdb_ltdb_header *header,
893                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
894 {
895         TDB_DATA rec;
896         size_t np;
897         int ret;
898
899         if (db->tdb == NULL) {
900                 return EINVAL;
901         }
902
903         rec = tdb_fetch(db->tdb, key);
904         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
905         if (ret != 0) {
906                 if (rec.dptr != NULL) {
907                         free(rec.dptr);
908                 }
909
910                 *header = (struct ctdb_ltdb_header) {
911                         .rsn = 0,
912                         .dmaster = 0,
913                         .flags = 0,
914                 };
915
916                 ret = ltdb_store(db, key, header, tdb_null);
917                 if (ret != 0) {
918                         return ret;
919                 }
920
921                 *data = tdb_null;
922                 return 0;
923         }
924
925         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
926         data->dptr = talloc_memdup(mem_ctx,
927                                    rec.dptr + ctdb_ltdb_header_len(header),
928                                    data->dsize);
929         if (data->dptr == NULL) {
930                 free(rec.dptr);
931                 return ENOMEM;
932         }
933
934         return 0;
935 }
936
937 static int database_seqnum(struct database *db, uint64_t *seqnum)
938 {
939         const char *keyname = CTDB_DB_SEQNUM_KEY;
940         TDB_DATA key, data;
941         struct ctdb_ltdb_header header;
942         size_t np;
943         int ret;
944
945         if (db->tdb == NULL) {
946                 *seqnum = db->seq_num;
947                 return 0;
948         }
949
950         key.dptr = discard_const(keyname);
951         key.dsize = strlen(keyname) + 1;
952
953         ret = ltdb_fetch(db, key, &header, db, &data);
954         if (ret != 0) {
955                 return ret;
956         }
957
958         if (data.dsize == 0) {
959                 *seqnum = 0;
960                 return 0;
961         }
962
963         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
964         talloc_free(data.dptr);
965         if (ret != 0) {
966                 *seqnum = 0;
967         }
968
969         return ret;
970 }
971
972 static bool public_ips_parse(struct ctdbd_context *ctdb,
973                              uint32_t numnodes)
974 {
975         bool status;
976
977         if (numnodes == 0) {
978                 D_ERR("Must initialise nodemap before public IPs\n");
979                 return false;
980         }
981
982         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
983
984         status = (ctdb->known_ips != NULL);
985
986         if (status) {
987                 D_INFO("Parsing public IPs done\n");
988         } else {
989                 D_INFO("Parsing public IPs failed\n");
990         }
991
992         return status;
993 }
994
995 /* Read information about controls to fail.  Format is:
996  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
997  */
998 static bool control_failures_parse(struct ctdbd_context *ctdb)
999 {
1000         char line[1024];
1001
1002         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1003                 char *tok, *t;
1004                 enum ctdb_controls opcode;
1005                 uint32_t pnn;
1006                 const char *error;
1007                 const char *comment;
1008                 struct fake_control_failure *failure = NULL;
1009
1010                 if (line[0] == '\n') {
1011                         break;
1012                 }
1013
1014                 /* Get rid of pesky newline */
1015                 if ((t = strchr(line, '\n')) != NULL) {
1016                         *t = '\0';
1017                 }
1018
1019                 /* Get opcode */
1020                 tok = strtok(line, " \t");
1021                 if (tok == NULL) {
1022                         D_ERR("bad line (%s) - missing opcode\n", line);
1023                         continue;
1024                 }
1025                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1026
1027                 /* Get PNN */
1028                 tok = strtok(NULL, " \t");
1029                 if (tok == NULL) {
1030                         D_ERR("bad line (%s) - missing PNN\n", line);
1031                         continue;
1032                 }
1033                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1034
1035                 /* Get error */
1036                 tok = strtok(NULL, " \t");
1037                 if (tok == NULL) {
1038                         D_ERR("bad line (%s) - missing errno\n", line);
1039                         continue;
1040                 }
1041                 error = talloc_strdup(ctdb, tok);
1042                 if (error == NULL) {
1043                         goto fail;
1044                 }
1045                 if (strcmp(error, "ERROR") != 0 &&
1046                     strcmp(error, "TIMEOUT") != 0) {
1047                         D_ERR("bad line (%s) "
1048                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1049                               line);
1050                         goto fail;
1051                 }
1052
1053                 /* Get comment */
1054                 tok = strtok(NULL, "\n"); /* rest of line */
1055                 if (tok == NULL) {
1056                         D_ERR("bad line (%s) - missing comment\n", line);
1057                         continue;
1058                 }
1059                 comment = talloc_strdup(ctdb, tok);
1060                 if (comment == NULL) {
1061                         goto fail;
1062                 }
1063
1064                 failure = talloc_zero(ctdb, struct fake_control_failure);
1065                 if (failure == NULL) {
1066                         goto fail;
1067                 }
1068
1069                 failure->opcode = opcode;
1070                 failure->pnn = pnn;
1071                 failure->error = error;
1072                 failure->comment = comment;
1073
1074                 DLIST_ADD(ctdb->control_failures, failure);
1075         }
1076
1077         D_INFO("Parsing fake control failures done\n");
1078         return true;
1079
1080 fail:
1081         D_INFO("Parsing fake control failures failed\n");
1082         return false;
1083 }
1084
1085 /*
1086  * Manage clients
1087  */
1088
1089 static int ctdb_client_destructor(struct ctdb_client *client)
1090 {
1091         DLIST_REMOVE(client->ctdb->client_list, client);
1092         return 0;
1093 }
1094
1095 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1096                       void *client_state)
1097 {
1098         struct ctdb_client *client;
1099
1100         client = talloc_zero(client_state, struct ctdb_client);
1101         if (client == NULL) {
1102                 return ENOMEM;
1103         }
1104
1105         client->ctdb = ctdb;
1106         client->pid = client_pid;
1107         client->state = client_state;
1108
1109         DLIST_ADD(ctdb->client_list, client);
1110         talloc_set_destructor(client, ctdb_client_destructor);
1111         return 0;
1112 }
1113
1114 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1115 {
1116         struct ctdb_client *client;
1117
1118         for (client=ctdb->client_list; client != NULL; client=client->next) {
1119                 if (client->pid == client_pid) {
1120                         return client->state;
1121                 }
1122         }
1123
1124         return NULL;
1125 }
1126
1127 /*
1128  * CTDB context setup
1129  */
1130
1131 static uint32_t new_generation(uint32_t old_generation)
1132 {
1133         uint32_t generation;
1134
1135         while (1) {
1136                 generation = random();
1137                 if (generation != INVALID_GENERATION &&
1138                     generation != old_generation) {
1139                         break;
1140                 }
1141         }
1142
1143         return generation;
1144 }
1145
1146 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1147                                          const char *dbdir)
1148 {
1149         struct ctdbd_context *ctdb;
1150         char line[1024];
1151         bool status;
1152         int ret;
1153
1154         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1155         if (ctdb == NULL) {
1156                 return NULL;
1157         }
1158
1159         ctdb->node_map = nodemap_init(ctdb);
1160         if (ctdb->node_map == NULL) {
1161                 goto fail;
1162         }
1163
1164         ctdb->iface_map = interfaces_init(ctdb);
1165         if (ctdb->iface_map == NULL) {
1166                 goto fail;
1167         }
1168
1169         ctdb->vnn_map = vnnmap_init(ctdb);
1170         if (ctdb->vnn_map == NULL) {
1171                 goto fail;
1172         }
1173
1174         ctdb->db_map = dbmap_init(ctdb, dbdir);
1175         if (ctdb->db_map == NULL) {
1176                 goto fail;
1177         }
1178
1179         ret = srvid_init(ctdb, &ctdb->srv);
1180         if (ret != 0) {
1181                 goto fail;
1182         }
1183
1184         while (fgets(line, sizeof(line), stdin) != NULL) {
1185                 char *t;
1186
1187                 if ((t = strchr(line, '\n')) != NULL) {
1188                         *t = '\0';
1189                 }
1190
1191                 if (strcmp(line, "NODEMAP") == 0) {
1192                         status = nodemap_parse(ctdb->node_map);
1193                 } else if (strcmp(line, "IFACES") == 0) {
1194                         status = interfaces_parse(ctdb->iface_map);
1195                 } else if (strcmp(line, "VNNMAP") == 0) {
1196                         status = vnnmap_parse(ctdb->vnn_map);
1197                 } else if (strcmp(line, "DBMAP") == 0) {
1198                         status = dbmap_parse(ctdb->db_map);
1199                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1200                         status = public_ips_parse(ctdb,
1201                                                   ctdb->node_map->num_nodes);
1202                 } else if (strcmp(line, "RECLOCK") == 0) {
1203                         status = reclock_parse(ctdb);
1204                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1205                         status = control_failures_parse(ctdb);
1206                 } else {
1207                         fprintf(stderr, "Unknown line %s\n", line);
1208                         status = false;
1209                 }
1210
1211                 if (! status) {
1212                         goto fail;
1213                 }
1214         }
1215
1216         ctdb->start_time = tevent_timeval_current();
1217         ctdb->recovery_start_time = tevent_timeval_current();
1218         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1219         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1220                 ctdb->vnn_map->generation =
1221                         new_generation(ctdb->vnn_map->generation);
1222         }
1223         ctdb->recovery_end_time = tevent_timeval_current();
1224
1225         ctdb->log_level = DEBUG_ERR;
1226         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1227
1228         ctdb_tunable_set_defaults(&ctdb->tun_list);
1229
1230         return ctdb;
1231
1232 fail:
1233         TALLOC_FREE(ctdb);
1234         return NULL;
1235 }
1236
1237 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1238 {
1239         struct node *node;
1240         int i;
1241
1242         if (ctdb->node_map->num_nodes == 0) {
1243                 return true;
1244         }
1245
1246         /* Make sure all the nodes are in order */
1247         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1248                 node = &ctdb->node_map->node[i];
1249                 if (node->pnn != i) {
1250                         fprintf(stderr, "Expected node %u, found %u\n",
1251                                 i, node->pnn);
1252                         return false;
1253                 }
1254         }
1255
1256         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1257         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1258                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1259                 exit(0);
1260         }
1261
1262         return true;
1263 }
1264
1265 /*
1266  * Doing a recovery
1267  */
1268
/* Per-request state of the fake recovery (recover_send/recover_recv) */
struct recover_state {
	/* Event context used to schedule the wakeup timers */
	struct tevent_context *ev;
	/* Daemon context whose node map and vnn map are inspected/updated */
	struct ctdbd_context *ctdb;
};
1273
1274 static int recover_check(struct tevent_req *req);
1275 static void recover_wait_done(struct tevent_req *subreq);
1276 static void recover_done(struct tevent_req *subreq);
1277
1278 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1279                                        struct tevent_context *ev,
1280                                        struct ctdbd_context *ctdb)
1281 {
1282         struct tevent_req *req;
1283         struct recover_state *state;
1284         int ret;
1285
1286         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1287         if (req == NULL) {
1288                 return NULL;
1289         }
1290
1291         state->ev = ev;
1292         state->ctdb = ctdb;
1293
1294         ret = recover_check(req);
1295         if (ret != 0) {
1296                 tevent_req_error(req, ret);
1297                 return tevent_req_post(req, ev);
1298         }
1299
1300         return req;
1301 }
1302
1303 static int recover_check(struct tevent_req *req)
1304 {
1305         struct recover_state *state = tevent_req_data(
1306                 req, struct recover_state);
1307         struct ctdbd_context *ctdb = state->ctdb;
1308         struct tevent_req *subreq;
1309         bool recovery_disabled;
1310         int i;
1311
1312         recovery_disabled = false;
1313         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1314                 if (ctdb->node_map->node[i].recovery_disabled) {
1315                         recovery_disabled = true;
1316                         break;
1317                 }
1318         }
1319
1320         subreq = tevent_wakeup_send(state, state->ev,
1321                                     tevent_timeval_current_ofs(1, 0));
1322         if (subreq == NULL) {
1323                 return ENOMEM;
1324         }
1325
1326         if (recovery_disabled) {
1327                 tevent_req_set_callback(subreq, recover_wait_done, req);
1328         } else {
1329                 ctdb->recovery_start_time = tevent_timeval_current();
1330                 tevent_req_set_callback(subreq, recover_done, req);
1331         }
1332
1333         return 0;
1334 }
1335
1336 static void recover_wait_done(struct tevent_req *subreq)
1337 {
1338         struct tevent_req *req = tevent_req_callback_data(
1339                 subreq, struct tevent_req);
1340         int ret;
1341         bool status;
1342
1343         status = tevent_wakeup_recv(subreq);
1344         TALLOC_FREE(subreq);
1345         if (! status) {
1346                 tevent_req_error(req, EIO);
1347                 return;
1348         }
1349
1350         ret = recover_check(req);
1351         if (ret != 0) {
1352                 tevent_req_error(req, ret);
1353         }
1354 }
1355
1356 static void recover_done(struct tevent_req *subreq)
1357 {
1358         struct tevent_req *req = tevent_req_callback_data(
1359                 subreq, struct tevent_req);
1360         struct recover_state *state = tevent_req_data(
1361                 req, struct recover_state);
1362         struct ctdbd_context *ctdb = state->ctdb;
1363         bool status;
1364
1365         status = tevent_wakeup_recv(subreq);
1366         TALLOC_FREE(subreq);
1367         if (! status) {
1368                 tevent_req_error(req, EIO);
1369                 return;
1370         }
1371
1372         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1373         ctdb->recovery_end_time = tevent_timeval_current();
1374         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1375
1376         tevent_req_done(req);
1377 }
1378
/*
 * Collect the result of a fake recovery.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error there.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
	int err = 0;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}

	return false;
}
1392
1393 /*
1394  * Routines for ctdb_req_header
1395  */
1396
1397 static void header_fix_pnn(struct ctdb_req_header *header,
1398                            struct ctdbd_context *ctdb)
1399 {
1400         if (header->srcnode == CTDB_CURRENT_NODE) {
1401                 header->srcnode = ctdb->node_map->pnn;
1402         }
1403
1404         if (header->destnode == CTDB_CURRENT_NODE) {
1405                 header->destnode = ctdb->node_map->pnn;
1406         }
1407 }
1408
1409 static struct ctdb_req_header header_reply_control(
1410                                         struct ctdb_req_header *header,
1411                                         struct ctdbd_context *ctdb)
1412 {
1413         struct ctdb_req_header reply_header;
1414
1415         reply_header = (struct ctdb_req_header) {
1416                 .ctdb_magic = CTDB_MAGIC,
1417                 .ctdb_version = CTDB_PROTOCOL,
1418                 .generation = ctdb->vnn_map->generation,
1419                 .operation = CTDB_REPLY_CONTROL,
1420                 .destnode = header->srcnode,
1421                 .srcnode = header->destnode,
1422                 .reqid = header->reqid,
1423         };
1424
1425         return reply_header;
1426 }
1427
1428 static struct ctdb_req_header header_reply_message(
1429                                         struct ctdb_req_header *header,
1430                                         struct ctdbd_context *ctdb)
1431 {
1432         struct ctdb_req_header reply_header;
1433
1434         reply_header = (struct ctdb_req_header) {
1435                 .ctdb_magic = CTDB_MAGIC,
1436                 .ctdb_version = CTDB_PROTOCOL,
1437                 .generation = ctdb->vnn_map->generation,
1438                 .operation = CTDB_REQ_MESSAGE,
1439                 .destnode = header->srcnode,
1440                 .srcnode = header->destnode,
1441                 .reqid = 0,
1442         };
1443
1444         return reply_header;
1445 }
1446
1447 /*
1448  * Client state
1449  */
1450
/* State attached to the tevent request that serves one client */
struct client_state {
	/* Event context; passed to comm_write_send() and recover_send() */
	struct tevent_context *ev;
	/* NOTE(review): presumably the client socket - confirm */
	int fd;
	/* Fake daemon context shared by all clients */
	struct ctdbd_context *ctdb;
	/* NOTE(review): meaning not visible in this part of the file */
	int pnn;
	/* NOTE(review): presumably the PID of the client - confirm */
	pid_t pid;
	/* Communication context that replies are written to */
	struct comm_context *comm;
	/* NOTE(review): not used in this part of the file */
	struct srvid_register_state *rstate;
	/* NOTE(review): not used in this part of the file */
	int status;
};
1461
1462 /*
1463  * Send replies to controls and messages
1464  */
1465
1466 static void client_reply_done(struct tevent_req *subreq);
1467
1468 static void client_send_message(struct tevent_req *req,
1469                                 struct ctdb_req_header *header,
1470                                 struct ctdb_req_message_data *message)
1471 {
1472         struct client_state *state = tevent_req_data(
1473                 req, struct client_state);
1474         struct ctdbd_context *ctdb = state->ctdb;
1475         struct tevent_req *subreq;
1476         struct ctdb_req_header reply_header;
1477         uint8_t *buf;
1478         size_t datalen, buflen;
1479         int ret;
1480
1481         reply_header = header_reply_message(header, ctdb);
1482
1483         datalen = ctdb_req_message_data_len(&reply_header, message);
1484         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1485         if (ret != 0) {
1486                 tevent_req_error(req, ret);
1487                 return;
1488         }
1489
1490         ret = ctdb_req_message_data_push(&reply_header, message,
1491                                          buf, &buflen);
1492         if (ret != 0) {
1493                 tevent_req_error(req, ret);
1494                 return;
1495         }
1496
1497         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1498
1499         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1500         if (tevent_req_nomem(subreq, req)) {
1501                 return;
1502         }
1503         tevent_req_set_callback(subreq, client_reply_done, req);
1504
1505         talloc_steal(subreq, buf);
1506 }
1507
1508 static void client_send_control(struct tevent_req *req,
1509                                 struct ctdb_req_header *header,
1510                                 struct ctdb_reply_control *reply)
1511 {
1512         struct client_state *state = tevent_req_data(
1513                 req, struct client_state);
1514         struct ctdbd_context *ctdb = state->ctdb;
1515         struct tevent_req *subreq;
1516         struct ctdb_req_header reply_header;
1517         uint8_t *buf;
1518         size_t datalen, buflen;
1519         int ret;
1520
1521         reply_header = header_reply_control(header, ctdb);
1522
1523         datalen = ctdb_reply_control_len(&reply_header, reply);
1524         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1525         if (ret != 0) {
1526                 tevent_req_error(req, ret);
1527                 return;
1528         }
1529
1530         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1531         if (ret != 0) {
1532                 tevent_req_error(req, ret);
1533                 return;
1534         }
1535
1536         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1537
1538         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1539         if (tevent_req_nomem(subreq, req)) {
1540                 return;
1541         }
1542         tevent_req_set_callback(subreq, client_reply_done, req);
1543
1544         talloc_steal(subreq, buf);
1545 }
1546
1547 static void client_reply_done(struct tevent_req *subreq)
1548 {
1549         struct tevent_req *req = tevent_req_callback_data(
1550                 subreq, struct tevent_req);
1551         int ret;
1552         bool status;
1553
1554         status = comm_write_recv(subreq, &ret);
1555         TALLOC_FREE(subreq);
1556         if (! status) {
1557                 tevent_req_error(req, ret);
1558         }
1559 }
1560
1561 /*
1562  * Handling protocol - controls
1563  */
1564
1565 static void control_process_exists(TALLOC_CTX *mem_ctx,
1566                                    struct tevent_req *req,
1567                                    struct ctdb_req_header *header,
1568                                    struct ctdb_req_control *request)
1569 {
1570         struct client_state *state = tevent_req_data(
1571                 req, struct client_state);
1572         struct ctdbd_context *ctdb = state->ctdb;
1573         struct client_state *cstate;
1574         struct ctdb_reply_control reply;
1575
1576         reply.rdata.opcode = request->opcode;
1577
1578         cstate = client_find(ctdb, request->rdata.data.pid);
1579         if (cstate == NULL) {
1580                 reply.status = -1;
1581                 reply.errmsg = "No client for PID";
1582         } else {
1583                 reply.status = kill(request->rdata.data.pid, 0);
1584                 reply.errmsg = NULL;
1585         }
1586
1587         client_send_control(req, header, &reply);
1588 }
1589
1590 static void control_ping(TALLOC_CTX *mem_ctx,
1591                          struct tevent_req *req,
1592                          struct ctdb_req_header *header,
1593                          struct ctdb_req_control *request)
1594 {
1595         struct client_state *state = tevent_req_data(
1596                 req, struct client_state);
1597         struct ctdbd_context *ctdb = state->ctdb;
1598         struct ctdb_reply_control reply;
1599
1600         reply.rdata.opcode = request->opcode;
1601         reply.status = ctdb->num_clients;
1602         reply.errmsg = NULL;
1603
1604         client_send_control(req, header, &reply);
1605 }
1606
1607 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1608                               struct tevent_req *req,
1609                               struct ctdb_req_header *header,
1610                               struct ctdb_req_control *request)
1611 {
1612         struct client_state *state = tevent_req_data(
1613                 req, struct client_state);
1614         struct ctdbd_context *ctdb = state->ctdb;
1615         struct ctdb_reply_control reply;
1616         struct database *db;
1617
1618         reply.rdata.opcode = request->opcode;
1619
1620         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1621         if (db == NULL) {
1622                 reply.status = ENOENT;
1623                 reply.errmsg = "Database not found";
1624         } else {
1625                 reply.rdata.data.db_path =
1626                         talloc_strdup(mem_ctx, db->path);
1627                 if (reply.rdata.data.db_path == NULL) {
1628                         reply.status = ENOMEM;
1629                         reply.errmsg = "Memory error";
1630                 } else {
1631                         reply.status = 0;
1632                         reply.errmsg = NULL;
1633                 }
1634         }
1635
1636         client_send_control(req, header, &reply);
1637 }
1638
1639 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1640                               struct tevent_req *req,
1641                               struct ctdb_req_header *header,
1642                               struct ctdb_req_control *request)
1643 {
1644         struct client_state *state = tevent_req_data(
1645                 req, struct client_state);
1646         struct ctdbd_context *ctdb = state->ctdb;
1647         struct ctdb_reply_control reply;
1648         struct ctdb_vnn_map *vnnmap;
1649
1650         reply.rdata.opcode = request->opcode;
1651
1652         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1653         if (vnnmap == NULL) {
1654                 reply.status = ENOMEM;
1655                 reply.errmsg = "Memory error";
1656         } else {
1657                 vnnmap->generation = ctdb->vnn_map->generation;
1658                 vnnmap->size = ctdb->vnn_map->size;
1659                 vnnmap->map = ctdb->vnn_map->map;
1660
1661                 reply.rdata.data.vnnmap = vnnmap;
1662                 reply.status = 0;
1663                 reply.errmsg = NULL;
1664         }
1665
1666         client_send_control(req, header, &reply);
1667 }
1668
1669 static void control_get_debug(TALLOC_CTX *mem_ctx,
1670                               struct tevent_req *req,
1671                               struct ctdb_req_header *header,
1672                               struct ctdb_req_control *request)
1673 {
1674         struct client_state *state = tevent_req_data(
1675                 req, struct client_state);
1676         struct ctdbd_context *ctdb = state->ctdb;
1677         struct ctdb_reply_control reply;
1678
1679         reply.rdata.opcode = request->opcode;
1680         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1681         reply.status = 0;
1682         reply.errmsg = NULL;
1683
1684         client_send_control(req, header, &reply);
1685 }
1686
1687 static void control_set_debug(TALLOC_CTX *mem_ctx,
1688                               struct tevent_req *req,
1689                               struct ctdb_req_header *header,
1690                               struct ctdb_req_control *request)
1691 {
1692         struct client_state *state = tevent_req_data(
1693                 req, struct client_state);
1694         struct ctdbd_context *ctdb = state->ctdb;
1695         struct ctdb_reply_control reply;
1696
1697         ctdb->log_level = (int)request->rdata.data.loglevel;
1698
1699         reply.rdata.opcode = request->opcode;
1700         reply.status = 0;
1701         reply.errmsg = NULL;
1702
1703         client_send_control(req, header, &reply);
1704 }
1705
1706 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1707                               struct tevent_req *req,
1708                                struct ctdb_req_header *header,
1709                               struct ctdb_req_control *request)
1710 {
1711         struct client_state *state = tevent_req_data(
1712                 req, struct client_state);
1713         struct ctdbd_context *ctdb = state->ctdb;
1714         struct ctdb_reply_control reply;
1715         struct ctdb_dbid_map *dbmap;
1716         struct database *db;
1717         int i;
1718
1719         reply.rdata.opcode = request->opcode;
1720
1721         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1722         if (dbmap == NULL) {
1723                 goto fail;
1724         }
1725
1726         dbmap->num = database_count(ctdb->db_map);
1727         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1728         if (dbmap->dbs == NULL) {
1729                 goto fail;
1730         }
1731
1732         db = ctdb->db_map->db;
1733         for (i = 0; i < dbmap->num; i++) {
1734                 dbmap->dbs[i] = (struct ctdb_dbid) {
1735                         .db_id = db->id,
1736                         .flags = db->flags,
1737                 };
1738
1739                 db = db->next;
1740         }
1741
1742         reply.rdata.data.dbmap = dbmap;
1743         reply.status = 0;
1744         reply.errmsg = NULL;
1745         client_send_control(req, header, &reply);
1746         return;
1747
1748 fail:
1749         reply.status = -1;
1750         reply.errmsg = "Memory error";
1751         client_send_control(req, header, &reply);
1752 }
1753
1754 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1755                                 struct tevent_req *req,
1756                                 struct ctdb_req_header *header,
1757                                 struct ctdb_req_control *request)
1758 {
1759         struct client_state *state = tevent_req_data(
1760                 req, struct client_state);
1761         struct ctdbd_context *ctdb = state->ctdb;
1762         struct ctdb_reply_control reply;
1763
1764         reply.rdata.opcode = request->opcode;
1765         reply.status = ctdb->vnn_map->recmode;
1766         reply.errmsg = NULL;
1767
1768         client_send_control(req, header, &reply);
1769 }
1770
/* Context kept while a recovery triggered by SET_RECMODE is running */
struct set_recmode_state {
	/* Client request that receives the reply once recovery ends */
	struct tevent_req *req;
	/* Fake daemon context */
	struct ctdbd_context *ctdb;
	/* Copy of the request header, used to build the reply header */
	struct ctdb_req_header header;
	/* Reply that is completed and sent by set_recmode_callback() */
	struct ctdb_reply_control reply;
};
1777
1778 static void set_recmode_callback(struct tevent_req *subreq)
1779 {
1780         struct set_recmode_state *substate = tevent_req_callback_data(
1781                 subreq, struct set_recmode_state);
1782         bool status;
1783         int ret;
1784
1785         status = recover_recv(subreq, &ret);
1786         TALLOC_FREE(subreq);
1787         if (! status) {
1788                 substate->reply.status = ret;
1789                 substate->reply.errmsg = "recovery failed";
1790         } else {
1791                 substate->reply.status = 0;
1792                 substate->reply.errmsg = NULL;
1793         }
1794
1795         client_send_control(substate->req, &substate->header, &substate->reply);
1796         talloc_free(substate);
1797 }
1798
1799 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1800                                 struct tevent_req *req,
1801                                 struct ctdb_req_header *header,
1802                                 struct ctdb_req_control *request)
1803 {
1804         struct client_state *state = tevent_req_data(
1805                 req, struct client_state);
1806         struct tevent_req *subreq;
1807         struct ctdbd_context *ctdb = state->ctdb;
1808         struct set_recmode_state *substate;
1809         struct ctdb_reply_control reply;
1810
1811         reply.rdata.opcode = request->opcode;
1812
1813         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1814                 reply.status = -1;
1815                 reply.errmsg = "Client cannot set recmode to NORMAL";
1816                 goto fail;
1817         }
1818
1819         substate = talloc_zero(ctdb, struct set_recmode_state);
1820         if (substate == NULL) {
1821                 reply.status = -1;
1822                 reply.errmsg = "Memory error";
1823                 goto fail;
1824         }
1825
1826         substate->req = req;
1827         substate->ctdb = ctdb;
1828         substate->header = *header;
1829         substate->reply.rdata.opcode = request->opcode;
1830
1831         subreq = recover_send(substate, state->ev, state->ctdb);
1832         if (subreq == NULL) {
1833                 talloc_free(substate);
1834                 goto fail;
1835         }
1836         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1837
1838         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1839         return;
1840
1841 fail:
1842         client_send_control(req, header, &reply);
1843
1844 }
1845
1846 static void control_db_attach(TALLOC_CTX *mem_ctx,
1847                               struct tevent_req *req,
1848                               struct ctdb_req_header *header,
1849                               struct ctdb_req_control *request)
1850 {
1851         struct client_state *state = tevent_req_data(
1852                 req, struct client_state);
1853         struct ctdbd_context *ctdb = state->ctdb;
1854         struct ctdb_reply_control reply;
1855         struct database *db;
1856
1857         reply.rdata.opcode = request->opcode;
1858
1859         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
1860                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
1861                         goto done;
1862                 }
1863         }
1864
1865         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
1866         if (db == NULL) {
1867                 reply.status = -1;
1868                 reply.errmsg = "Failed to attach database";
1869                 client_send_control(req, header, &reply);
1870                 return;
1871         }
1872
1873 done:
1874         reply.rdata.data.db_id = db->id;
1875         reply.status = 0;
1876         reply.errmsg = NULL;
1877         client_send_control(req, header, &reply);
1878 }
1879
1880 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1881 {
1882         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
1883 }
1884
1885 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1886                                    struct tevent_req *req,
1887                                    struct ctdb_req_header *header,
1888                                    struct ctdb_req_control *request)
1889 {
1890         struct client_state *state = tevent_req_data(
1891                 req, struct client_state);
1892         struct ctdbd_context *ctdb = state->ctdb;
1893         struct ctdb_reply_control reply;
1894         int ret;
1895
1896         reply.rdata.opcode = request->opcode;
1897
1898         ret = srvid_register(ctdb->srv, state, request->srvid,
1899                              srvid_handler, state);
1900         if (ret != 0) {
1901                 reply.status = -1;
1902                 reply.errmsg = "Memory error";
1903                 goto fail;
1904         }
1905
1906         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
1907
1908         reply.status = 0;
1909         reply.errmsg = NULL;
1910
1911 fail:
1912         client_send_control(req, header, &reply);
1913 }
1914
1915 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1916                                      struct tevent_req *req,
1917                                      struct ctdb_req_header *header,
1918                                      struct ctdb_req_control *request)
1919 {
1920         struct client_state *state = tevent_req_data(
1921                 req, struct client_state);
1922         struct ctdbd_context *ctdb = state->ctdb;
1923         struct ctdb_reply_control reply;
1924         int ret;
1925
1926         reply.rdata.opcode = request->opcode;
1927
1928         ret = srvid_deregister(ctdb->srv, request->srvid, state);
1929         if (ret != 0) {
1930                 reply.status = -1;
1931                 reply.errmsg = "srvid not registered";
1932                 goto fail;
1933         }
1934
1935         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
1936
1937         reply.status = 0;
1938         reply.errmsg = NULL;
1939
1940 fail:
1941         client_send_control(req, header, &reply);
1942 }
1943
1944 static void control_get_dbname(TALLOC_CTX *mem_ctx,
1945                                struct tevent_req *req,
1946                                struct ctdb_req_header *header,
1947                                struct ctdb_req_control *request)
1948 {
1949         struct client_state *state = tevent_req_data(
1950                 req, struct client_state);
1951         struct ctdbd_context *ctdb = state->ctdb;
1952         struct ctdb_reply_control reply;
1953         struct database *db;
1954
1955         reply.rdata.opcode = request->opcode;
1956
1957         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1958         if (db == NULL) {
1959                 reply.status = ENOENT;
1960                 reply.errmsg = "Database not found";
1961         } else {
1962                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
1963                 if (reply.rdata.data.db_name == NULL) {
1964                         reply.status = ENOMEM;
1965                         reply.errmsg = "Memory error";
1966                 } else {
1967                         reply.status = 0;
1968                         reply.errmsg = NULL;
1969                 }
1970         }
1971
1972         client_send_control(req, header, &reply);
1973 }
1974
1975 static void control_get_pid(TALLOC_CTX *mem_ctx,
1976                             struct tevent_req *req,
1977                             struct ctdb_req_header *header,
1978                             struct ctdb_req_control *request)
1979 {
1980         struct ctdb_reply_control reply;
1981
1982         reply.rdata.opcode = request->opcode;
1983         reply.status = getpid();
1984         reply.errmsg = NULL;
1985
1986         client_send_control(req, header, &reply);
1987 }
1988
1989 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1990                                   struct tevent_req *req,
1991                                   struct ctdb_req_header *header,
1992                                   struct ctdb_req_control *request)
1993 {
1994         struct client_state *state = tevent_req_data(
1995                 req, struct client_state);
1996         struct ctdbd_context *ctdb = state->ctdb;
1997         struct ctdb_reply_control reply;
1998
1999         reply.rdata.opcode = request->opcode;
2000         reply.status = ctdb->node_map->recmaster;
2001         reply.errmsg = NULL;
2002
2003         client_send_control(req, header, &reply);
2004 }
2005
2006 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2007                             struct tevent_req *req,
2008                             struct ctdb_req_header *header,
2009                             struct ctdb_req_control *request)
2010 {
2011         struct ctdb_reply_control reply;
2012
2013         reply.rdata.opcode = request->opcode;
2014         reply.status = header->destnode;
2015         reply.errmsg = NULL;
2016
2017         client_send_control(req, header, &reply);
2018 }
2019
2020 static void control_shutdown(TALLOC_CTX *mem_ctx,
2021                              struct tevent_req *req,
2022                              struct ctdb_req_header *hdr,
2023                              struct ctdb_req_control *request)
2024 {
2025         struct client_state *state = tevent_req_data(
2026                 req, struct client_state);
2027
2028         state->status = 99;
2029 }
2030
2031 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2032                                 struct tevent_req *req,
2033                                 struct ctdb_req_header *header,
2034                                 struct ctdb_req_control *request)
2035 {
2036         struct client_state *state = tevent_req_data(
2037                 req, struct client_state);
2038         struct ctdbd_context *ctdb = state->ctdb;
2039         struct ctdb_reply_control reply;
2040         bool ret, obsolete;
2041
2042         reply.rdata.opcode = request->opcode;
2043         reply.errmsg = NULL;
2044
2045         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2046                                      request->rdata.data.tunable->name,
2047                                      request->rdata.data.tunable->value,
2048                                      &obsolete);
2049         if (! ret) {
2050                 reply.status = -1;
2051         } else if (obsolete) {
2052                 reply.status = 1;
2053         } else {
2054                 reply.status = 0;
2055         }
2056
2057         client_send_control(req, header, &reply);
2058 }
2059
2060 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2061                                 struct tevent_req *req,
2062                                 struct ctdb_req_header *header,
2063                                 struct ctdb_req_control *request)
2064 {
2065         struct client_state *state = tevent_req_data(
2066                 req, struct client_state);
2067         struct ctdbd_context *ctdb = state->ctdb;
2068         struct ctdb_reply_control reply;
2069         uint32_t value;
2070         bool ret;
2071
2072         reply.rdata.opcode = request->opcode;
2073         reply.errmsg = NULL;
2074
2075         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2076                                      request->rdata.data.tun_var, &value);
2077         if (! ret) {
2078                 reply.status = -1;
2079         } else {
2080                 reply.rdata.data.tun_value = value;
2081                 reply.status = 0;
2082         }
2083
2084         client_send_control(req, header, &reply);
2085 }
2086
2087 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2088                                   struct tevent_req *req,
2089                                   struct ctdb_req_header *header,
2090                                   struct ctdb_req_control *request)
2091 {
2092         struct ctdb_reply_control reply;
2093         struct ctdb_var_list *var_list;
2094
2095         reply.rdata.opcode = request->opcode;
2096         reply.errmsg = NULL;
2097
2098         var_list = ctdb_tunable_names(mem_ctx);
2099         if (var_list == NULL) {
2100                 reply.status = -1;
2101         } else {
2102                 reply.rdata.data.tun_var_list = var_list;
2103                 reply.status = 0;
2104         }
2105
2106         client_send_control(req, header, &reply);
2107 }
2108
2109 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2110                                  struct tevent_req *req,
2111                                  struct ctdb_req_header *header,
2112                                  struct ctdb_req_control *request)
2113 {
2114         struct client_state *state = tevent_req_data(
2115                 req, struct client_state);
2116         struct ctdbd_context *ctdb = state->ctdb;
2117         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2118         struct ctdb_reply_control reply;
2119         struct node *node;
2120
2121         reply.rdata.opcode = request->opcode;
2122
2123         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2124             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2125                 DEBUG(DEBUG_INFO,
2126                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2127                 reply.status = EINVAL;
2128                 reply.errmsg = "Failed to MODIFY_FLAGS";
2129                 client_send_control(req, header, &reply);
2130                 return;
2131         }
2132
2133         /* There's all sorts of broadcast weirdness here.  Only change
2134          * the specified node, not the destination node of the
2135          * control. */
2136         node = &ctdb->node_map->node[change->pnn];
2137
2138         if ((node->flags &
2139              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2140             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2141                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2142                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2143                 goto done;
2144         }
2145
2146         if ((node->flags &
2147              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2148             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2149                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2150                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2151                 goto done;
2152         }
2153
2154         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2155
2156 done:
2157         reply.status = 0;
2158         reply.errmsg = NULL;
2159         client_send_control(req, header, &reply);
2160 }
2161
2162 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2163                                      struct tevent_req *req,
2164                                      struct ctdb_req_header *header,
2165                                      struct ctdb_req_control *request)
2166 {
2167         struct client_state *state = tevent_req_data(
2168                 req, struct client_state);
2169         struct ctdbd_context *ctdb = state->ctdb;
2170         struct ctdb_reply_control reply;
2171
2172         reply.rdata.opcode = request->opcode;
2173         reply.rdata.data.tun_list = &ctdb->tun_list;
2174         reply.status = 0;
2175         reply.errmsg = NULL;
2176
2177         client_send_control(req, header, &reply);
2178 }
2179
2180 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2181                                          struct tevent_req *req,
2182                                          struct ctdb_req_header *header,
2183                                          struct ctdb_req_control *request)
2184 {
2185         struct client_state *state = tevent_req_data(
2186                 req, struct client_state);
2187         struct ctdbd_context *ctdb = state->ctdb;
2188         struct ctdb_reply_control reply;
2189         struct database *db;
2190
2191         reply.rdata.opcode = request->opcode;
2192
2193         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2194                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2195                         goto done;
2196                 }
2197         }
2198
2199         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2200                           CTDB_DB_FLAGS_PERSISTENT);
2201         if (db == NULL) {
2202                 reply.status = -1;
2203                 reply.errmsg = "Failed to attach database";
2204                 client_send_control(req, header, &reply);
2205                 return;
2206         }
2207
2208 done:
2209         reply.rdata.data.db_id = db->id;
2210         reply.status = 0;
2211         reply.errmsg = NULL;
2212         client_send_control(req, header, &reply);
2213 }
2214
2215 static void control_uptime(TALLOC_CTX *mem_ctx,
2216                            struct tevent_req *req,
2217                            struct ctdb_req_header *header,
2218                            struct ctdb_req_control *request)
2219 {
2220         struct client_state *state = tevent_req_data(
2221                 req, struct client_state);
2222         struct ctdbd_context *ctdb = state->ctdb;
2223         struct ctdb_reply_control reply;
2224         struct ctdb_uptime *uptime;;
2225
2226         reply.rdata.opcode = request->opcode;
2227
2228         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2229         if (uptime == NULL) {
2230                 goto fail;
2231         }
2232
2233         uptime->current_time = tevent_timeval_current();
2234         uptime->ctdbd_start_time = ctdb->start_time;
2235         uptime->last_recovery_started = ctdb->recovery_start_time;
2236         uptime->last_recovery_finished = ctdb->recovery_end_time;
2237
2238         reply.rdata.data.uptime = uptime;
2239         reply.status = 0;
2240         reply.errmsg = NULL;
2241         client_send_control(req, header, &reply);
2242         return;
2243
2244 fail:
2245         reply.status = -1;
2246         reply.errmsg = "Memory error";
2247         client_send_control(req, header, &reply);
2248 }
2249
2250 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2251                                       struct tevent_req *req,
2252                                       struct ctdb_req_header *header,
2253                                       struct ctdb_req_control *request)
2254 {
2255         struct client_state *state = tevent_req_data(
2256                 req, struct client_state);
2257         struct ctdbd_context *ctdb = state->ctdb;
2258         struct ctdb_reply_control reply;
2259         struct ctdb_node_map *nodemap;
2260         struct node_map *node_map = ctdb->node_map;
2261         int i;
2262
2263         reply.rdata.opcode = request->opcode;
2264
2265         nodemap = read_nodes_file(mem_ctx, header->destnode);
2266         if (nodemap == NULL) {
2267                 goto fail;
2268         }
2269
2270         for (i=0; i<nodemap->num; i++) {
2271                 struct node *node;
2272
2273                 if (i < node_map->num_nodes &&
2274                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2275                                         &node_map->node[i].addr)) {
2276                         continue;
2277                 }
2278
2279                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2280                         int ret;
2281
2282                         node = &node_map->node[i];
2283
2284                         node->flags |= NODE_FLAGS_DELETED;
2285                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2286                                                          false);
2287                         if (ret != 0) {
2288                                 /* Can't happen, but Coverity... */
2289                                 goto fail;
2290                         }
2291
2292                         continue;
2293                 }
2294
2295                 if (i < node_map->num_nodes &&
2296                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2297                         node = &node_map->node[i];
2298
2299                         node->flags &= ~NODE_FLAGS_DELETED;
2300                         node->addr = nodemap->node[i].addr;
2301
2302                         continue;
2303                 }
2304
2305                 node_map->node = talloc_realloc(node_map, node_map->node,
2306                                                 struct node,
2307                                                 node_map->num_nodes+1);
2308                 if (node_map->node == NULL) {
2309                         goto fail;
2310                 }
2311                 node = &node_map->node[node_map->num_nodes];
2312
2313                 node->addr = nodemap->node[i].addr;
2314                 node->pnn = nodemap->node[i].pnn;
2315                 node->flags = 0;
2316                 node->capabilities = CTDB_CAP_DEFAULT;
2317                 node->recovery_disabled = false;
2318                 node->recovery_substate = NULL;
2319
2320                 node_map->num_nodes += 1;
2321         }
2322
2323         talloc_free(nodemap);
2324
2325         reply.status = 0;
2326         reply.errmsg = NULL;
2327         client_send_control(req, header, &reply);
2328         return;
2329
2330 fail:
2331         reply.status = -1;
2332         reply.errmsg = "Memory error";
2333         client_send_control(req, header, &reply);
2334 }
2335
2336 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2337                                      struct tevent_req *req,
2338                                      struct ctdb_req_header *header,
2339                                      struct ctdb_req_control *request)
2340 {
2341         struct client_state *state = tevent_req_data(
2342                 req, struct client_state);
2343         struct ctdbd_context *ctdb = state->ctdb;
2344         struct ctdb_reply_control reply;
2345         struct node *node;
2346         uint32_t caps = 0;
2347
2348         reply.rdata.opcode = request->opcode;
2349
2350         node = &ctdb->node_map->node[header->destnode];
2351         caps = node->capabilities;
2352
2353         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2354                 /* Don't send reply */
2355                 return;
2356         }
2357
2358         reply.rdata.data.caps = caps;
2359         reply.status = 0;
2360         reply.errmsg = NULL;
2361
2362         client_send_control(req, header, &reply);
2363 }
2364
2365 static void control_release_ip(TALLOC_CTX *mem_ctx,
2366                                struct tevent_req *req,
2367                                struct ctdb_req_header *header,
2368                                struct ctdb_req_control *request)
2369 {
2370         struct client_state *state = tevent_req_data(
2371                 req, struct client_state);
2372         struct ctdbd_context *ctdb = state->ctdb;
2373         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2374         struct ctdb_reply_control reply;
2375         struct ctdb_public_ip_list *ips = NULL;
2376         struct ctdb_public_ip *t = NULL;
2377         int i;
2378
2379         reply.rdata.opcode = request->opcode;
2380
2381         if (ctdb->known_ips == NULL) {
2382                 D_INFO("RELEASE_IP %s - not a public IP\n",
2383                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2384                 goto done;
2385         }
2386
2387         ips = &ctdb->known_ips[header->destnode];
2388
2389         t = NULL;
2390         for (i = 0; i < ips->num; i++) {
2391                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2392                         t = &ips->ip[i];
2393                         break;
2394                 }
2395         }
2396         if (t == NULL) {
2397                 D_INFO("RELEASE_IP %s - not a public IP\n",
2398                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2399                 goto done;
2400         }
2401
2402         if (t->pnn != header->destnode) {
2403                 if (header->destnode == ip->pnn) {
2404                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2405                               ctdb_sock_addr_to_string(mem_ctx,
2406                                                        &ip->addr, false),
2407                               ip->pnn);
2408                         reply.status = -1;
2409                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2410                         client_send_control(req, header, &reply);
2411                         return;
2412                 }
2413
2414                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2415                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2416                        ip->pnn);
2417                 t->pnn = ip->pnn;
2418         } else {
2419                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2420                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2421                           ip->pnn);
2422                 t->pnn = ip->pnn;
2423         }
2424
2425 done:
2426         reply.status = 0;
2427         reply.errmsg = NULL;
2428         client_send_control(req, header, &reply);
2429 }
2430
2431 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2432                                 struct tevent_req *req,
2433                                 struct ctdb_req_header *header,
2434                                 struct ctdb_req_control *request)
2435 {
2436         struct client_state *state = tevent_req_data(
2437                 req, struct client_state);
2438         struct ctdbd_context *ctdb = state->ctdb;
2439         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2440         struct ctdb_reply_control reply;
2441         struct ctdb_public_ip_list *ips = NULL;
2442         struct ctdb_public_ip *t = NULL;
2443         int i;
2444
2445         reply.rdata.opcode = request->opcode;
2446
2447         if (ctdb->known_ips == NULL) {
2448                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2449                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2450                 goto done;
2451         }
2452
2453         ips = &ctdb->known_ips[header->destnode];
2454
2455         t = NULL;
2456         for (i = 0; i < ips->num; i++) {
2457                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2458                         t = &ips->ip[i];
2459                         break;
2460                 }
2461         }
2462         if (t == NULL) {
2463                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2464                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2465                 goto done;
2466         }
2467
2468         if (t->pnn == header->destnode) {
2469                 D_INFO("TAKEOVER_IP %s - redundant\n",
2470                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2471         } else {
2472                 D_NOTICE("TAKEOVER_IP %s\n",
2473                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2474                 t->pnn = ip->pnn;
2475         }
2476
2477 done:
2478         reply.status = 0;
2479         reply.errmsg = NULL;
2480         client_send_control(req, header, &reply);
2481 }
2482
2483 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2484                                    struct tevent_req *req,
2485                                    struct ctdb_req_header *header,
2486                                    struct ctdb_req_control *request)
2487 {
2488         struct client_state *state = tevent_req_data(
2489                 req, struct client_state);
2490         struct ctdbd_context *ctdb = state->ctdb;
2491         struct ctdb_reply_control reply;
2492         struct ctdb_public_ip_list *ips = NULL;
2493
2494         reply.rdata.opcode = request->opcode;
2495
2496         if (ctdb->known_ips == NULL) {
2497                 /* No IPs defined so create a dummy empty struct and ship it */
2498                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2499                 if (ips == NULL) {
2500                         reply.status = ENOMEM;
2501                         reply.errmsg = "Memory error";
2502                         goto done;
2503                 }
2504                 goto ok;
2505         }
2506
2507         ips = &ctdb->known_ips[header->destnode];
2508
2509         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2510                 /* If runstate is not RUNNING or a node is then return
2511                  * no available IPs.  Don't worry about interface
2512                  * states here - we're not faking down to that level.
2513                  */
2514                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2515                         /* No available IPs: return dummy empty struct */
2516                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2517                         if (ips == NULL) {
2518                                 reply.status = ENOMEM;
2519                                 reply.errmsg = "Memory error";
2520                                 goto done;
2521                         }
2522                 }
2523         }
2524
2525 ok:
2526         reply.rdata.data.pubip_list = ips;
2527         reply.status = 0;
2528         reply.errmsg = NULL;
2529
2530 done:
2531         client_send_control(req, header, &reply);
2532 }
2533
2534 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2535                                 struct tevent_req *req,
2536                                 struct ctdb_req_header *header,
2537                                 struct ctdb_req_control *request)
2538 {
2539         struct client_state *state = tevent_req_data(
2540                 req, struct client_state);
2541         struct ctdbd_context *ctdb = state->ctdb;
2542         struct ctdb_reply_control reply;
2543         struct ctdb_node_map *nodemap;
2544         struct node *node;
2545         int i;
2546
2547         reply.rdata.opcode = request->opcode;
2548
2549         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2550         if (nodemap == NULL) {
2551                 goto fail;
2552         }
2553
2554         nodemap->num = ctdb->node_map->num_nodes;
2555         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2556                                      nodemap->num);
2557         if (nodemap->node == NULL) {
2558                 goto fail;
2559         }
2560
2561         for (i=0; i<nodemap->num; i++) {
2562                 node = &ctdb->node_map->node[i];
2563                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2564                         .pnn = node->pnn,
2565                         .flags = node->flags,
2566                         .addr = node->addr,
2567                 };
2568         }
2569
2570         reply.rdata.data.nodemap = nodemap;
2571         reply.status = 0;
2572         reply.errmsg = NULL;
2573         client_send_control(req, header, &reply);
2574         return;
2575
2576 fail:
2577         reply.status = -1;
2578         reply.errmsg = "Memory error";
2579         client_send_control(req, header, &reply);
2580 }
2581
2582 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2583                                      struct tevent_req *req,
2584                                      struct ctdb_req_header *header,
2585                                      struct ctdb_req_control *request)
2586 {
2587         struct client_state *state = tevent_req_data(
2588                 req, struct client_state);
2589         struct ctdbd_context *ctdb = state->ctdb;
2590         struct ctdb_reply_control reply;
2591
2592         reply.rdata.opcode = request->opcode;
2593
2594         if (ctdb->reclock != NULL) {
2595                 reply.rdata.data.reclock_file =
2596                         talloc_strdup(mem_ctx, ctdb->reclock);
2597                 if (reply.rdata.data.reclock_file == NULL) {
2598                         reply.status = ENOMEM;
2599                         reply.errmsg = "Memory error";
2600                         goto done;
2601                 }
2602         } else {
2603                 reply.rdata.data.reclock_file = NULL;
2604         }
2605
2606         reply.status = 0;
2607         reply.errmsg = NULL;
2608
2609 done:
2610         client_send_control(req, header, &reply);
2611 }
2612
2613 static void control_stop_node(TALLOC_CTX *mem_ctx,
2614                               struct tevent_req *req,
2615                               struct ctdb_req_header *header,
2616                               struct ctdb_req_control *request)
2617 {
2618         struct client_state *state = tevent_req_data(
2619                 req, struct client_state);
2620         struct ctdbd_context *ctdb = state->ctdb;
2621         struct ctdb_reply_control reply;
2622
2623         reply.rdata.opcode = request->opcode;
2624
2625         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2626         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2627
2628         reply.status = 0;
2629         reply.errmsg = NULL;
2630
2631         client_send_control(req, header, &reply);
2632         return;
2633 }
2634
2635 static void control_continue_node(TALLOC_CTX *mem_ctx,
2636                                   struct tevent_req *req,
2637                                   struct ctdb_req_header *header,
2638                                   struct ctdb_req_control *request)
2639 {
2640         struct client_state *state = tevent_req_data(
2641                 req, struct client_state);
2642         struct ctdbd_context *ctdb = state->ctdb;
2643         struct ctdb_reply_control reply;
2644
2645         reply.rdata.opcode = request->opcode;
2646
2647         DEBUG(DEBUG_INFO, ("Continue node\n"));
2648         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2649
2650         reply.status = 0;
2651         reply.errmsg = NULL;
2652
2653         client_send_control(req, header, &reply);
2654         return;
2655 }
2656
2657 static void set_ban_state_callback(struct tevent_req *subreq)
2658 {
2659         struct node *node = tevent_req_callback_data(
2660                 subreq, struct node);
2661         bool status;
2662
2663         status = tevent_wakeup_recv(subreq);
2664         TALLOC_FREE(subreq);
2665         if (! status) {
2666                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2667         }
2668
2669         node->flags &= ~NODE_FLAGS_BANNED;
2670 }
2671
2672 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2673                                   struct tevent_req *req,
2674                                   struct ctdb_req_header *header,
2675                                   struct ctdb_req_control *request)
2676 {
2677         struct client_state *state = tevent_req_data(
2678                 req, struct client_state);
2679         struct tevent_req *subreq;
2680         struct ctdbd_context *ctdb = state->ctdb;
2681         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2682         struct ctdb_reply_control reply;
2683         struct node *node;
2684
2685         reply.rdata.opcode = request->opcode;
2686
2687         if (ban->pnn != header->destnode) {
2688                 DEBUG(DEBUG_INFO,
2689                       ("SET_BAN_STATE control for PNN %d rejected\n",
2690                        ban->pnn));
2691                 reply.status = EINVAL;
2692                 goto fail;
2693         }
2694
2695         node = &ctdb->node_map->node[header->destnode];
2696
2697         if (ban->time == 0) {
2698                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2699                 node->flags &= ~NODE_FLAGS_BANNED;
2700                 goto done;
2701         }
2702
2703         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2704                                     tevent_timeval_current_ofs(
2705                                             ban->time, 0));
2706         if (subreq == NULL) {
2707                 reply.status = ENOMEM;
2708                 goto fail;
2709         }
2710         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2711
2712         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2713         node->flags |= NODE_FLAGS_BANNED;
2714         ctdb->vnn_map->generation = INVALID_GENERATION;
2715
2716 done:
2717         reply.status = 0;
2718         reply.errmsg = NULL;
2719
2720         client_send_control(req, header, &reply);
2721         return;
2722
2723 fail:
2724         reply.errmsg = "Failed to ban node";
2725 }
2726
2727 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2728                                struct tevent_req *req,
2729                                struct ctdb_req_header *header,
2730                                struct ctdb_req_control *request)
2731 {
2732         struct client_state *state = tevent_req_data(
2733                 req, struct client_state);
2734         struct ctdbd_context *ctdb = state->ctdb;
2735         struct ctdb_reply_control reply;
2736         struct database *db;
2737         int ret;
2738
2739         reply.rdata.opcode = request->opcode;
2740
2741         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2742         if (db == NULL) {
2743                 reply.status = ENOENT;
2744                 reply.errmsg = "Database not found";
2745         } else {
2746                 uint64_t seqnum;
2747
2748                 ret = database_seqnum(db, &seqnum);
2749                 if (ret == 0) {
2750                         reply.rdata.data.seqnum = seqnum;
2751                         reply.status = 0;
2752                         reply.errmsg = NULL;
2753                 } else {
2754                         reply.status = ret;
2755                         reply.errmsg = "Failed to get seqnum";
2756                 }
2757         }
2758
2759         client_send_control(req, header, &reply);
2760 }
2761
2762 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2763                                   struct tevent_req *req,
2764                                   struct ctdb_req_header *header,
2765                                   struct ctdb_req_control *request)
2766 {
2767         struct client_state *state = tevent_req_data(
2768                 req, struct client_state);
2769         struct ctdbd_context *ctdb = state->ctdb;
2770         struct ctdb_reply_control reply;
2771         struct database *db;
2772
2773         reply.rdata.opcode = request->opcode;
2774
2775         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2776         if (db == NULL) {
2777                 reply.status = ENOENT;
2778                 reply.errmsg = "Database not found";
2779         } else {
2780                 reply.rdata.data.reason = NULL;
2781                 reply.status = 0;
2782                 reply.errmsg = NULL;
2783         }
2784
2785         client_send_control(req, header, &reply);
2786 }
2787
2788 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2789                                                    struct ctdbd_context *ctdb)
2790 {
2791         struct ctdb_iface_list *iface_list;
2792         struct interface *iface;
2793         int i;
2794
2795         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2796         if (iface_list == NULL) {
2797                 goto done;
2798         }
2799
2800         iface_list->num = ctdb->iface_map->num;
2801         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2802                                          iface_list->num);
2803         if (iface_list->iface == NULL) {
2804                 TALLOC_FREE(iface_list);
2805                 goto done;
2806         }
2807
2808         for (i=0; i<iface_list->num; i++) {
2809                 iface = &ctdb->iface_map->iface[i];
2810                 iface_list->iface[i] = (struct ctdb_iface) {
2811                         .link_state = iface->link_up,
2812                         .references = iface->references,
2813                 };
2814                 strlcpy(iface_list->iface[i].name, iface->name,
2815                         sizeof(iface_list->iface[i].name));
2816         }
2817
2818 done:
2819         return iface_list;
2820 }
2821
2822 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2823                                        struct tevent_req *req,
2824                                        struct ctdb_req_header *header,
2825                                        struct ctdb_req_control *request)
2826 {
2827         struct client_state *state = tevent_req_data(
2828                 req, struct client_state);
2829         struct ctdbd_context *ctdb = state->ctdb;
2830         struct ctdb_reply_control reply;
2831         ctdb_sock_addr *addr = request->rdata.data.addr;
2832         struct ctdb_public_ip_list *known = NULL;
2833         struct ctdb_public_ip_info *info = NULL;
2834         unsigned i;
2835
2836         reply.rdata.opcode = request->opcode;
2837
2838         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
2839         if (info == NULL) {
2840                 reply.status = ENOMEM;
2841                 reply.errmsg = "Memory error";
2842                 goto done;
2843         }
2844
2845         reply.rdata.data.ipinfo = info;
2846
2847         if (ctdb->known_ips != NULL) {
2848                 known = &ctdb->known_ips[header->destnode];
2849         } else {
2850                 /* No IPs defined so create a dummy empty struct and
2851                  * fall through.  The given IP won't be matched
2852                  * below...
2853                  */
2854                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2855                 if (known == NULL) {
2856                         reply.status = ENOMEM;
2857                         reply.errmsg = "Memory error";
2858                         goto done;
2859                 }
2860         }
2861
2862         for (i = 0; i < known->num; i++) {
2863                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
2864                                            addr)) {
2865                         break;
2866                 }
2867         }
2868
2869         if (i == known->num) {
2870                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
2871                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
2872                 reply.status = -1;
2873                 reply.errmsg = "Unknown address";
2874                 goto done;
2875         }
2876
2877         info->ip = known->ip[i];
2878
2879         /* The fake PUBLICIPS stanza and resulting known_ips data
2880          * don't know anything about interfaces, so completely fake
2881          * this.
2882          */
2883         info->active_idx = 0;
2884
2885         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
2886         if (info->ifaces == NULL) {
2887                 reply.status = ENOMEM;
2888                 reply.errmsg = "Memory error";
2889                 goto done;
2890         }
2891
2892         reply.status = 0;
2893         reply.errmsg = NULL;
2894
2895 done:
2896         client_send_control(req, header, &reply);
2897 }
2898
2899 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2900                                struct tevent_req *req,
2901                                struct ctdb_req_header *header,
2902                                struct ctdb_req_control *request)
2903 {
2904         struct client_state *state = tevent_req_data(
2905                 req, struct client_state);
2906         struct ctdbd_context *ctdb = state->ctdb;
2907         struct ctdb_reply_control reply;
2908         struct ctdb_iface_list *iface_list;
2909
2910         reply.rdata.opcode = request->opcode;
2911
2912         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
2913         if (iface_list == NULL) {
2914                 goto fail;
2915         }
2916
2917         reply.rdata.data.iface_list = iface_list;
2918         reply.status = 0;
2919         reply.errmsg = NULL;
2920         client_send_control(req, header, &reply);
2921         return;
2922
2923 fail:
2924         reply.status = -1;
2925         reply.errmsg = "Memory error";
2926         client_send_control(req, header, &reply);
2927 }
2928
2929 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2930                                          struct tevent_req *req,
2931                                          struct ctdb_req_header *header,
2932                                          struct ctdb_req_control *request)
2933 {
2934         struct client_state *state = tevent_req_data(
2935                 req, struct client_state);
2936         struct ctdbd_context *ctdb = state->ctdb;
2937         struct ctdb_reply_control reply;
2938         struct ctdb_iface *in_iface;
2939         struct interface *iface = NULL;
2940         bool link_up = false;
2941         int i;
2942
2943         reply.rdata.opcode = request->opcode;
2944
2945         in_iface = request->rdata.data.iface;
2946
2947         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
2948                 reply.errmsg = "interface name not terminated";
2949                 goto fail;
2950         }
2951
2952         switch (in_iface->link_state) {
2953                 case 0:
2954                         link_up = false;
2955                         break;
2956
2957                 case 1:
2958                         link_up = true;
2959                         break;
2960
2961                 default:
2962                         reply.errmsg = "invalid link state";
2963                         goto fail;
2964         }
2965
2966         if (in_iface->references != 0) {
2967                 reply.errmsg = "references should be 0";
2968                 goto fail;
2969         }
2970
2971         for (i=0; i<ctdb->iface_map->num; i++) {
2972                 if (strcmp(ctdb->iface_map->iface[i].name,
2973                            in_iface->name) == 0) {
2974                         iface = &ctdb->iface_map->iface[i];
2975                         break;
2976                 }
2977         }
2978
2979         if (iface == NULL) {
2980                 reply.errmsg = "interface not found";
2981                 goto fail;
2982         }
2983
2984         iface->link_up = link_up;
2985
2986         reply.status = 0;
2987         reply.errmsg = NULL;
2988         client_send_control(req, header, &reply);
2989         return;
2990
2991 fail:
2992         reply.status = -1;
2993         client_send_control(req, header, &reply);
2994 }
2995
2996 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
2997                                     struct tevent_req *req,
2998                                     struct ctdb_req_header *header,
2999                                     struct ctdb_req_control *request)
3000 {
3001         struct client_state *state = tevent_req_data(
3002                 req, struct client_state);
3003         struct ctdbd_context *ctdb = state->ctdb;
3004         struct ctdb_reply_control reply;
3005         struct database *db;
3006
3007         reply.rdata.opcode = request->opcode;
3008
3009         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3010         if (db == NULL) {
3011                 reply.status = ENOENT;
3012                 reply.errmsg = "Database not found";
3013                 goto done;
3014         }
3015
3016         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3017                 reply.status = EINVAL;
3018                 reply.errmsg = "Can not set READONLY on persistent db";
3019                 goto done;
3020         }
3021
3022         db->flags |= CTDB_DB_FLAGS_READONLY;
3023         reply.status = 0;
3024         reply.errmsg = NULL;
3025
3026 done:
3027         client_send_control(req, header, &reply);
3028 }
3029
3030 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3031                                     struct tevent_req *req,
3032                                     struct ctdb_req_header *header,
3033                                     struct ctdb_req_control *request)
3034 {
3035         struct client_state *state = tevent_req_data(
3036                 req, struct client_state);
3037         struct ctdbd_context *ctdb = state->ctdb;
3038         struct ctdb_reply_control reply;
3039         struct database *db;
3040
3041         reply.rdata.opcode = request->opcode;
3042
3043         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3044         if (db == NULL) {
3045                 reply.status = ENOENT;
3046                 reply.errmsg = "Database not found";
3047                 goto done;
3048         }
3049
3050         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3051                 reply.status = EINVAL;
3052                 reply.errmsg = "Can not set STICKY on persistent db";
3053                 goto done;
3054         }
3055
3056         db->flags |= CTDB_DB_FLAGS_STICKY;
3057         reply.status = 0;
3058         reply.errmsg = NULL;
3059
3060 done:
3061         client_send_control(req, header, &reply);
3062 }
3063
3064 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3065                                   struct tevent_req *req,
3066                                   struct ctdb_req_header *header,
3067                                   struct ctdb_req_control *request)
3068 {
3069         struct ctdb_reply_control reply;
3070
3071         /* Always succeed */
3072         reply.rdata.opcode = request->opcode;
3073         reply.status = 0;
3074         reply.errmsg = NULL;
3075
3076         client_send_control(req, header, &reply);
3077 }
3078
3079 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3080                                  struct tevent_req *req,
3081                                  struct ctdb_req_header *header,
3082                                  struct ctdb_req_control *request)
3083 {
3084         struct client_state *state = tevent_req_data(
3085                 req, struct client_state);
3086         struct ctdbd_context *ctdb = state->ctdb;
3087         struct ctdb_reply_control reply;
3088
3089         reply.rdata.opcode = request->opcode;
3090         reply.rdata.data.runstate = ctdb->runstate;
3091         reply.status = 0;
3092         reply.errmsg = NULL;
3093
3094         client_send_control(req, header, &reply);
3095 }
3096
3097 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3098                                    struct tevent_req *req,
3099                                    struct ctdb_req_header *header,
3100                                    struct ctdb_req_control *request)
3101 {
3102         struct ctdb_reply_control reply;
3103         struct ctdb_node_map *nodemap;
3104
3105         reply.rdata.opcode = request->opcode;
3106
3107         nodemap = read_nodes_file(mem_ctx, header->destnode);
3108         if (nodemap == NULL) {
3109                 goto fail;
3110         }
3111
3112         reply.rdata.data.nodemap = nodemap;
3113         reply.status = 0;
3114         reply.errmsg = NULL;
3115         client_send_control(req, header, &reply);
3116         return;
3117
3118 fail:
3119         reply.status = -1;
3120         reply.errmsg = "Failed to read nodes file";
3121         client_send_control(req, header, &reply);
3122 }
3123
3124 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3125                                   struct tevent_req *req,
3126                                   struct ctdb_req_header *header,
3127                                   struct ctdb_req_control *request)
3128 {
3129         struct client_state *state = tevent_req_data(
3130                 req, struct client_state);
3131         struct ctdbd_context *ctdb = state->ctdb;
3132         struct ctdb_reply_control reply;
3133         struct database *db;
3134
3135         reply.rdata.opcode = request->opcode;
3136
3137         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3138         if (db == NULL) {
3139                 reply.status = ENOENT;
3140                 reply.errmsg = "Database not found";
3141         } else {
3142                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3143                 reply.status = 0;
3144                 reply.errmsg = NULL;
3145         }
3146
3147         client_send_control(req, header, &reply);
3148 }
3149
3150 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3151                                          struct tevent_req *req,
3152                                          struct ctdb_req_header *header,
3153                                          struct ctdb_req_control *request)
3154 {
3155         struct client_state *state = tevent_req_data(
3156                 req, struct client_state);
3157         struct ctdbd_context *ctdb = state->ctdb;
3158         struct ctdb_reply_control reply;
3159         struct database *db;
3160
3161         reply.rdata.opcode = request->opcode;
3162
3163         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3164                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3165                         goto done;
3166                 }
3167         }
3168
3169         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3170                           CTDB_DB_FLAGS_REPLICATED);
3171         if (db == NULL) {
3172                 reply.status = -1;
3173                 reply.errmsg = "Failed to attach database";
3174                 client_send_control(req, header, &reply);
3175                 return;
3176         }
3177
3178 done:
3179         reply.rdata.data.db_id = db->id;
3180         reply.status = 0;
3181         reply.errmsg = NULL;
3182         client_send_control(req, header, &reply);
3183 }
3184
3185 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3186                                     struct tevent_req *req,
3187                                     struct ctdb_req_header *header,
3188                                     struct ctdb_req_control *request)
3189 {
3190         struct client_state *state = tevent_req_data(
3191                 req, struct client_state);
3192         struct ctdbd_context *ctdb = state->ctdb;
3193         struct ctdb_client *client;
3194         struct client_state *cstate;
3195         struct ctdb_reply_control reply;
3196         bool pid_found, srvid_found;
3197         int ret;
3198
3199         reply.rdata.opcode = request->opcode;
3200
3201         pid_found = false;
3202         srvid_found = false;
3203
3204         for (client=ctdb->client_list; client != NULL; client=client->next) {
3205                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3206                         pid_found = true;
3207                         cstate = (struct client_state *)client->state;
3208                         ret = srvid_exists(ctdb->srv,
3209                                            request->rdata.data.pid_srvid->srvid,
3210                                            cstate);
3211                         if (ret == 0) {
3212                                 srvid_found = true;
3213                                 ret = kill(cstate->pid, 0);
3214                                 if (ret != 0) {
3215                                         reply.status = ret;
3216                                         reply.errmsg = strerror(errno);
3217                                 } else {
3218                                         reply.status = 0;
3219                                         reply.errmsg = NULL;
3220                                 }
3221                         }
3222                 }
3223         }
3224
3225         if (! pid_found) {
3226                 reply.status = -1;
3227                 reply.errmsg = "No client for PID";
3228         } else if (! srvid_found) {
3229                 reply.status = -1;
3230                 reply.errmsg = "No client for PID and SRVID";
3231         }
3232
3233         client_send_control(req, header, &reply);
3234 }
3235
3236 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3237                                  struct tevent_req *req,
3238                                  struct ctdb_req_header *header,
3239                                  struct ctdb_req_control *request)
3240 {
3241         struct client_state *state = tevent_req_data(
3242                 req, struct client_state);
3243         struct ctdbd_context *ctdb = state->ctdb;
3244         struct ctdb_reply_control reply;
3245         struct fake_control_failure *f = NULL;
3246
3247         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3248                 request->opcode, header->destnode);
3249         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3250                 if (f->opcode == request->opcode &&
3251                     (f->pnn == header->destnode ||
3252                      f->pnn == CTDB_UNKNOWN_PNN)) {
3253
3254                         reply.rdata.opcode = request->opcode;
3255                         if (strcmp(f->error, "TIMEOUT") == 0) {
3256                                 /* Causes no reply */
3257                                 D_ERR("Control %u fake timeout on node %u\n",
3258                                       request->opcode, header->destnode);
3259                                 return true;
3260                         } else if (strcmp(f->error, "ERROR") == 0) {
3261                                 D_ERR("Control %u fake error on node %u\n",
3262                                       request->opcode, header->destnode);
3263                                 reply.status = -1;
3264                                 reply.errmsg = f->comment;
3265                                 client_send_control(req, header, &reply);
3266                                 return true;
3267                         }
3268                 }
3269         }
3270
3271         return false;
3272 }
3273
3274 static void control_error(TALLOC_CTX *mem_ctx,
3275                           struct tevent_req *req,
3276                           struct ctdb_req_header *header,
3277                           struct ctdb_req_control *request)
3278 {
3279         struct ctdb_reply_control reply;
3280
3281         reply.rdata.opcode = request->opcode;
3282         reply.status = -1;
3283         reply.errmsg = "Not implemented";
3284
3285         client_send_control(req, header, &reply);
3286 }
3287
3288 /*
3289  * Handling protocol - messages
3290  */
3291
/*
 * State attached to the wakeup timer of a "disable recoveries"
 * request.
 */
struct disable_recoveries_state {
        /* Node on which recoveries have been disabled */
        struct node *node;
};
3295
3296 static void disable_recoveries_callback(struct tevent_req *subreq)
3297 {
3298         struct disable_recoveries_state *substate = tevent_req_callback_data(
3299                 subreq, struct disable_recoveries_state);
3300         bool status;
3301
3302         status = tevent_wakeup_recv(subreq);
3303         TALLOC_FREE(subreq);
3304         if (! status) {
3305                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3306         }
3307
3308         substate->node->recovery_disabled = false;
3309         TALLOC_FREE(substate->node->recovery_substate);
3310 }
3311
3312 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3313                                        struct tevent_req *req,
3314                                        struct ctdb_req_header *header,
3315                                        struct ctdb_req_message *request)
3316 {
3317         struct client_state *state = tevent_req_data(
3318                 req, struct client_state);
3319         struct tevent_req *subreq;
3320         struct ctdbd_context *ctdb = state->ctdb;
3321         struct disable_recoveries_state *substate;
3322         struct ctdb_disable_message *disable = request->data.disable;
3323         struct ctdb_req_message_data reply;
3324         struct node *node;
3325         int ret = -1;
3326         TDB_DATA data;
3327
3328         node = &ctdb->node_map->node[header->destnode];
3329
3330         if (disable->timeout == 0) {
3331                 TALLOC_FREE(node->recovery_substate);
3332                 node->recovery_disabled = false;
3333                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3334                                    header->destnode));
3335                 goto done;
3336         }
3337
3338         substate = talloc_zero(ctdb->node_map,
3339                                struct disable_recoveries_state);
3340         if (substate == NULL) {
3341                 goto fail;
3342         }
3343
3344         substate->node = node;
3345
3346         subreq = tevent_wakeup_send(substate, state->ev,
3347                                     tevent_timeval_current_ofs(
3348                                             disable->timeout, 0));
3349         if (subreq == NULL) {
3350                 talloc_free(substate);
3351                 goto fail;
3352         }
3353         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3354
3355         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3356                            disable->timeout, header->destnode));
3357         node->recovery_substate = substate;
3358         node->recovery_disabled = true;
3359
3360 done:
3361         ret = header->destnode;
3362
3363 fail:
3364         reply.srvid = disable->srvid;
3365         data.dptr = (uint8_t *)&ret;
3366         data.dsize = sizeof(int);
3367         reply.data = data;
3368
3369         client_send_message(req, header, &reply);
3370 }
3371
3372 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3373                                  struct tevent_req *req,
3374                                  struct ctdb_req_header *header,
3375                                  struct ctdb_req_message *request)
3376 {
3377         struct client_state *state = tevent_req_data(
3378                 req, struct client_state);
3379         struct ctdbd_context *ctdb = state->ctdb;
3380         struct ctdb_srvid_message *srvid = request->data.msg;
3381         struct ctdb_req_message_data reply;
3382         int ret = -1;
3383         TDB_DATA data;
3384
3385         if (header->destnode != ctdb->node_map->recmaster) {
3386                 /* No reply! Only recmaster replies... */
3387                 return;
3388         }
3389
3390         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3391                            header->destnode));
3392         ret = header->destnode;
3393
3394         reply.srvid = srvid->srvid;
3395         data.dptr = (uint8_t *)&ret;
3396         data.dsize = sizeof(int);
3397         reply.data = data;
3398
3399         client_send_message(req, header, &reply);
3400 }
3401
3402 /*
3403  * Handle a single client
3404  */
3405
/* Forward declarations for the per-client handlers defined below */
static void client_read_handler(uint8_t *buf,
                                size_t buflen,
                                void *private_data);
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf,
                                  size_t buflen);
static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf,
                                   size_t buflen);
static void client_process_control(struct tevent_req *req,
                                   uint8_t *buf,
                                   size_t buflen);
static void client_reply_done(struct tevent_req *subreq);
3416
3417 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3418                                       struct tevent_context *ev,
3419                                       int fd, struct ctdbd_context *ctdb,
3420                                       int pnn)
3421 {
3422         struct tevent_req *req;
3423         struct client_state *state;
3424         struct ucred cr;
3425         socklen_t crl = sizeof(struct ucred);
3426         int ret;
3427
3428         req = tevent_req_create(mem_ctx, &state, struct client_state);
3429         if (req == NULL) {
3430                 return NULL;
3431         }
3432
3433         state->ev = ev;
3434         state->fd = fd;
3435         state->ctdb = ctdb;
3436         state->pnn = pnn;
3437
3438         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3439         if (ret != 0) {
3440                 tevent_req_error(req, ret);
3441                 return tevent_req_post(req, ev);
3442         }
3443         state->pid = cr.pid;
3444
3445         ret = comm_setup(state, ev, fd, client_read_handler, req,
3446                          client_dead_handler, req, &state->comm);
3447         if (ret != 0) {
3448                 tevent_req_error(req, ret);
3449                 return tevent_req_post(req, ev);
3450         }
3451
3452         ret = client_add(ctdb, state->pid, state);
3453         if (ret != 0) {
3454                 tevent_req_error(req, ret);
3455                 return tevent_req_post(req, ev);
3456         }
3457
3458         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3459
3460         return req;
3461 }
3462
3463 static void client_read_handler(uint8_t *buf, size_t buflen,
3464                                 void *private_data)
3465 {
3466         struct tevent_req *req = talloc_get_type_abort(
3467                 private_data, struct tevent_req);
3468         struct client_state *state = tevent_req_data(
3469                 req, struct client_state);
3470         struct ctdbd_context *ctdb = state->ctdb;
3471         struct ctdb_req_header header;
3472         size_t np;
3473         int ret, i;
3474
3475         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3476         if (ret != 0) {
3477                 return;
3478         }
3479
3480         if (buflen != header.length) {
3481                 return;
3482         }
3483
3484         ret = ctdb_req_header_verify(&header, 0);
3485         if (ret != 0) {
3486                 return;
3487         }
3488
3489         header_fix_pnn(&header, ctdb);
3490
3491         if (header.destnode == CTDB_BROADCAST_ALL) {
3492                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3493                         header.destnode = i;
3494
3495                         ctdb_req_header_push(&header, buf, &np);
3496                         client_process_packet(req, buf, buflen);
3497                 }
3498                 return;
3499         }
3500
3501         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3502                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3503                         if (ctdb->node_map->node[i].flags &
3504                             NODE_FLAGS_DISCONNECTED) {
3505                                 continue;
3506                         }
3507
3508                         header.destnode = i;
3509
3510                         ctdb_req_header_push(&header, buf, &np);
3511                         client_process_packet(req, buf, buflen);
3512                 }
3513                 return;
3514         }
3515
3516         if (header.destnode > ctdb->node_map->num_nodes) {
3517                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3518                         header.destnode);
3519                 return;
3520         }
3521
3522
3523         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3524                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3525                         header.destnode);
3526                 return;
3527         }
3528
3529         ctdb_req_header_push(&header, buf, &np);
3530         client_process_packet(req, buf, buflen);
3531 }
3532
3533 static void client_dead_handler(void *private_data)
3534 {
3535         struct tevent_req *req = talloc_get_type_abort(
3536                 private_data, struct tevent_req);
3537
3538         tevent_req_done(req);
3539 }
3540
3541 static void client_process_packet(struct tevent_req *req,
3542                                   uint8_t *buf, size_t buflen)
3543 {
3544         struct ctdb_req_header header;
3545         size_t np;
3546         int ret;
3547
3548         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3549         if (ret != 0) {
3550                 return;
3551         }
3552
3553         switch (header.operation) {
3554         case CTDB_REQ_MESSAGE:
3555                 client_process_message(req, buf, buflen);
3556                 break;
3557
3558         case CTDB_REQ_CONTROL:
3559                 client_process_control(req, buf, buflen);
3560                 break;
3561
3562         default:
3563                 break;
3564         }
3565 }
3566
3567 static void client_process_message(struct tevent_req *req,
3568                                    uint8_t *buf, size_t buflen)
3569 {
3570         struct client_state *state = tevent_req_data(
3571                 req, struct client_state);
3572         struct ctdbd_context *ctdb = state->ctdb;
3573         TALLOC_CTX *mem_ctx;
3574         struct ctdb_req_header header;
3575         struct ctdb_req_message request;
3576         uint64_t srvid;
3577         int ret;
3578
3579         mem_ctx = talloc_new(state);
3580         if (tevent_req_nomem(mem_ctx, req)) {
3581                 return;
3582         }
3583
3584         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3585         if (ret != 0) {
3586                 talloc_free(mem_ctx);
3587                 tevent_req_error(req, ret);
3588                 return;
3589         }
3590
3591         header_fix_pnn(&header, ctdb);
3592
3593         if (header.destnode >= ctdb->node_map->num_nodes) {
3594                 /* Many messages are not replied to, so just behave as
3595                  * though this message was not received */
3596                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3597                 talloc_free(mem_ctx);
3598                 return;
3599         }
3600
3601         srvid = request.srvid;
3602         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3603
3604         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3605                 message_disable_recoveries(mem_ctx, req, &header, &request);
3606         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3607                 message_takeover_run(mem_ctx, req, &header, &request);
3608         }
3609
3610         /* check srvid */
3611         talloc_free(mem_ctx);
3612 }
3613
3614 static void client_process_control(struct tevent_req *req,
3615                                    uint8_t *buf, size_t buflen)
3616 {
3617         struct client_state *state = tevent_req_data(
3618                 req, struct client_state);
3619         struct ctdbd_context *ctdb = state->ctdb;
3620         TALLOC_CTX *mem_ctx;
3621         struct ctdb_req_header header;
3622         struct ctdb_req_control request;
3623         int ret;
3624
3625         mem_ctx = talloc_new(state);
3626         if (tevent_req_nomem(mem_ctx, req)) {
3627                 return;
3628         }
3629
3630         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3631         if (ret != 0) {
3632                 talloc_free(mem_ctx);
3633                 tevent_req_error(req, ret);
3634                 return;
3635         }
3636
3637         header_fix_pnn(&header, ctdb);
3638
3639         if (header.destnode >= ctdb->node_map->num_nodes) {
3640                 struct ctdb_reply_control reply;
3641
3642                 reply.rdata.opcode = request.opcode;
3643                 reply.errmsg = "Invalid node";
3644                 reply.status = -1;
3645                 client_send_control(req, &header, &reply);
3646                 return;
3647         }
3648
3649         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
3650                            request.opcode, header.reqid));
3651
3652         if (fake_control_failure(mem_ctx, req, &header, &request)) {
3653                 goto done;
3654         }
3655
3656         switch (request.opcode) {
3657         case CTDB_CONTROL_PROCESS_EXISTS:
3658                 control_process_exists(mem_ctx, req, &header, &request);
3659                 break;
3660
3661         case CTDB_CONTROL_PING:
3662                 control_ping(mem_ctx, req, &header, &request);
3663                 break;
3664
3665         case CTDB_CONTROL_GETDBPATH:
3666                 control_getdbpath(mem_ctx, req, &header, &request);
3667                 break;
3668
3669         case CTDB_CONTROL_GETVNNMAP:
3670                 control_getvnnmap(mem_ctx, req, &header, &request);
3671                 break;
3672
3673         case CTDB_CONTROL_GET_DEBUG:
3674                 control_get_debug(mem_ctx, req, &header, &request);
3675                 break;
3676
3677         case CTDB_CONTROL_SET_DEBUG:
3678                 control_set_debug(mem_ctx, req, &header, &request);
3679                 break;
3680
3681         case CTDB_CONTROL_GET_DBMAP:
3682                 control_get_dbmap(mem_ctx, req, &header, &request);
3683                 break;
3684
3685         case CTDB_CONTROL_GET_RECMODE:
3686                 control_get_recmode(mem_ctx, req, &header, &request);
3687                 break;
3688
3689         case CTDB_CONTROL_SET_RECMODE:
3690                 control_set_recmode(mem_ctx, req, &header, &request);
3691                 break;
3692
3693         case CTDB_CONTROL_DB_ATTACH:
3694                 control_db_attach(mem_ctx, req, &header, &request);
3695                 break;
3696
3697         case CTDB_CONTROL_REGISTER_SRVID:
3698                 control_register_srvid(mem_ctx, req, &header, &request);
3699                 break;
3700
3701         case CTDB_CONTROL_DEREGISTER_SRVID:
3702                 control_deregister_srvid(mem_ctx, req, &header, &request);
3703                 break;
3704
3705         case CTDB_CONTROL_GET_DBNAME:
3706                 control_get_dbname(mem_ctx, req, &header, &request);
3707                 break;
3708
3709         case CTDB_CONTROL_GET_PID:
3710                 control_get_pid(mem_ctx, req, &header, &request);
3711                 break;
3712
3713         case CTDB_CONTROL_GET_RECMASTER:
3714                 control_get_recmaster(mem_ctx, req, &header, &request);
3715                 break;
3716
3717         case CTDB_CONTROL_GET_PNN:
3718                 control_get_pnn(mem_ctx, req, &header, &request);
3719                 break;
3720
3721         case CTDB_CONTROL_SHUTDOWN:
3722                 control_shutdown(mem_ctx, req, &header, &request);
3723                 break;
3724
3725         case CTDB_CONTROL_SET_TUNABLE:
3726                 control_set_tunable(mem_ctx, req, &header, &request);
3727                 break;
3728
3729         case CTDB_CONTROL_GET_TUNABLE:
3730                 control_get_tunable(mem_ctx, req, &header, &request);
3731                 break;
3732
3733         case CTDB_CONTROL_LIST_TUNABLES:
3734                 control_list_tunables(mem_ctx, req, &header, &request);
3735                 break;
3736
3737         case CTDB_CONTROL_MODIFY_FLAGS:
3738                 control_modify_flags(mem_ctx, req, &header, &request);
3739                 break;
3740
3741         case CTDB_CONTROL_GET_ALL_TUNABLES:
3742                 control_get_all_tunables(mem_ctx, req, &header, &request);
3743                 break;
3744
3745         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
3746                 control_db_attach_persistent(mem_ctx, req, &header, &request);
3747                 break;
3748
3749         case CTDB_CONTROL_UPTIME:
3750                 control_uptime(mem_ctx, req, &header, &request);
3751                 break;
3752
3753         case CTDB_CONTROL_RELOAD_NODES_FILE:
3754                 control_reload_nodes_file(mem_ctx, req, &header, &request);
3755                 break;
3756
3757         case CTDB_CONTROL_GET_CAPABILITIES:
3758                 control_get_capabilities(mem_ctx, req, &header, &request);
3759                 break;
3760
3761         case CTDB_CONTROL_RELEASE_IP:
3762                 control_release_ip(mem_ctx, req, &header, &request);
3763                 break;
3764
3765         case CTDB_CONTROL_TAKEOVER_IP:
3766                 control_takeover_ip(mem_ctx, req, &header, &request);
3767                 break;
3768
3769         case CTDB_CONTROL_GET_PUBLIC_IPS:
3770                 control_get_public_ips(mem_ctx, req, &header, &request);
3771                 break;
3772
3773         case CTDB_CONTROL_GET_NODEMAP:
3774                 control_get_nodemap(mem_ctx, req, &header, &request);
3775                 break;
3776
3777         case CTDB_CONTROL_GET_RECLOCK_FILE:
3778                 control_get_reclock_file(mem_ctx, req, &header, &request);
3779                 break;
3780
3781         case CTDB_CONTROL_STOP_NODE:
3782                 control_stop_node(mem_ctx, req, &header, &request);
3783                 break;
3784
3785         case CTDB_CONTROL_CONTINUE_NODE:
3786                 control_continue_node(mem_ctx, req, &header, &request);
3787                 break;
3788
3789         case CTDB_CONTROL_SET_BAN_STATE:
3790                 control_set_ban_state(mem_ctx, req, &header, &request);
3791                 break;
3792
3793         case CTDB_CONTROL_GET_DB_SEQNUM:
3794                 control_get_db_seqnum(mem_ctx, req, &header, &request);
3795                 break;
3796
3797         case CTDB_CONTROL_DB_GET_HEALTH:
3798                 control_db_get_health(mem_ctx, req, &header, &request);
3799                 break;
3800
3801         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
3802                 control_get_public_ip_info(mem_ctx, req, &header, &request);
3803                 break;
3804
3805         case CTDB_CONTROL_GET_IFACES:
3806                 control_get_ifaces(mem_ctx, req, &header, &request);
3807                 break;
3808
3809         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
3810                 control_set_iface_link_state(mem_ctx, req, &header, &request);
3811                 break;
3812
3813         case CTDB_CONTROL_SET_DB_READONLY:
3814                 control_set_db_readonly(mem_ctx, req, &header, &request);
3815                 break;
3816
3817         case CTDB_CONTROL_SET_DB_STICKY:
3818                 control_set_db_sticky(mem_ctx, req, &header, &request);
3819                 break;
3820
3821         case CTDB_CONTROL_IPREALLOCATED:
3822                 control_ipreallocated(mem_ctx, req, &header, &request);
3823                 break;
3824
3825         case CTDB_CONTROL_GET_RUNSTATE:
3826                 control_get_runstate(mem_ctx, req, &header, &request);
3827                 break;
3828
3829         case CTDB_CONTROL_GET_NODES_FILE:
3830                 control_get_nodes_file(mem_ctx, req, &header, &request);
3831                 break;
3832
3833         case CTDB_CONTROL_DB_OPEN_FLAGS:
3834                 control_db_open_flags(mem_ctx, req, &header, &request);
3835                 break;
3836
3837         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
3838                 control_db_attach_replicated(mem_ctx, req, &header, &request);
3839                 break;
3840
3841         case CTDB_CONTROL_CHECK_PID_SRVID:
3842                 control_check_pid_srvid(mem_ctx, req, &header, &request);
3843                 break;
3844
3845         default:
3846                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
3847                         control_error(mem_ctx, req, &header, &request);
3848                 }
3849                 break;
3850         }
3851
3852 done:
3853         talloc_free(mem_ctx);
3854 }
3855
3856 static int client_recv(struct tevent_req *req, int *perr)
3857 {
3858         struct client_state *state = tevent_req_data(
3859                 req, struct client_state);
3860         int err;
3861
3862         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
3863         close(state->fd);
3864
3865         if (tevent_req_is_unix_error(req, &err)) {
3866                 if (perr != NULL) {
3867                         *perr = err;
3868                 }
3869                 return -1;
3870         }
3871
3872         return state->status;
3873 }
3874
3875 /*
3876  * Fake CTDB server
3877  */
3878
/* State of the server request created by server_send() */
struct server_state {
        /* Event context used for the accept and client requests */
        struct tevent_context *ev;
        /* Fake daemon context handed to every client request */
        struct ctdbd_context *ctdb;
        /* Socket on which connections are accepted */
        int fd;
};

/* Completion callback for an accept request */
static void server_new_client(struct tevent_req *subreq);
/* Completion callback for a client request */
static void server_client_done(struct tevent_req *subreq);
3887
3888 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
3889                                       struct tevent_context *ev,
3890                                       struct ctdbd_context *ctdb,
3891                                       int fd)
3892 {
3893         struct tevent_req *req, *subreq;
3894         struct server_state *state;
3895
3896         req = tevent_req_create(mem_ctx, &state, struct server_state);
3897         if (req == NULL) {
3898                 return NULL;
3899         }
3900
3901         state->ev = ev;
3902         state->ctdb = ctdb;
3903         state->fd = fd;
3904
3905         subreq = accept_send(state, ev, fd);
3906         if (tevent_req_nomem(subreq, req)) {
3907                 return tevent_req_post(req, ev);
3908         }
3909         tevent_req_set_callback(subreq, server_new_client, req);
3910
3911         return req;
3912 }
3913
3914 static void server_new_client(struct tevent_req *subreq)
3915 {
3916         struct tevent_req *req = tevent_req_callback_data(
3917                 subreq, struct tevent_req);
3918         struct server_state *state = tevent_req_data(
3919                 req, struct server_state);
3920         struct ctdbd_context *ctdb = state->ctdb;
3921         int client_fd;
3922         int ret = 0;
3923
3924         client_fd = accept_recv(subreq, NULL, NULL, &ret);
3925         TALLOC_FREE(subreq);
3926         if (client_fd == -1) {
3927                 tevent_req_error(req, ret);
3928                 return;
3929         }
3930
3931         subreq = client_send(state, state->ev, client_fd,
3932                              ctdb, ctdb->node_map->pnn);
3933         if (tevent_req_nomem(subreq, req)) {
3934                 return;
3935         }
3936         tevent_req_set_callback(subreq, server_client_done, req);
3937
3938         ctdb->num_clients += 1;
3939
3940         subreq = accept_send(state, state->ev, state->fd);
3941         if (tevent_req_nomem(subreq, req)) {
3942                 return;
3943         }
3944         tevent_req_set_callback(subreq, server_new_client, req);
3945 }
3946
3947 static void server_client_done(struct tevent_req *subreq)
3948 {
3949         struct tevent_req *req = tevent_req_callback_data(
3950                 subreq, struct tevent_req);
3951         struct server_state *state = tevent_req_data(
3952                 req, struct server_state);
3953         struct ctdbd_context *ctdb = state->ctdb;
3954         int ret = 0;
3955         int status;
3956
3957         status = client_recv(subreq, &ret);
3958         TALLOC_FREE(subreq);
3959         if (status < 0) {
3960                 tevent_req_error(req, ret);
3961                 return;
3962         }
3963
3964         ctdb->num_clients -= 1;
3965
3966         if (status == 99) {
3967                 /* Special status, to shutdown server */
3968                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
3969                 tevent_req_done(req);
3970         }
3971 }
3972
/*
 * Collect the result of the server request.
 *
 * Returns true on success.  On failure returns false and, when perr is
 * not NULL, stores the error in *perr.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
        int unix_err;
        bool failed;

        failed = tevent_req_is_unix_error(req, &unix_err);
        if (failed && perr != NULL) {
                *perr = unix_err;
        }

        return !failed;
}
3985
3986 /*
3987  * Main functions
3988  */
3989
3990 static int socket_init(const char *sockpath)
3991 {
3992         struct sockaddr_un addr;
3993         size_t len;
3994         int ret, fd;
3995
3996         memset(&addr, 0, sizeof(addr));
3997         addr.sun_family = AF_UNIX;
3998
3999         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4000         if (len >= sizeof(addr.sun_path)) {
4001                 fprintf(stderr, "path too long: %s\n", sockpath);
4002                 return -1;
4003         }
4004
4005         fd = socket(AF_UNIX, SOCK_STREAM, 0);
4006         if (fd == -1) {
4007                 fprintf(stderr, "socket failed - %s\n", sockpath);
4008                 return -1;
4009         }
4010
4011         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4012         if (ret != 0) {
4013                 fprintf(stderr, "bind failed - %s\n", sockpath);
4014                 goto fail;
4015         }
4016
4017         ret = listen(fd, 10);
4018         if (ret != 0) {
4019                 fprintf(stderr, "listen failed\n");
4020                 goto fail;
4021         }
4022
4023         DEBUG(DEBUG_INFO, ("Socket init done\n"));
4024
4025         return fd;
4026
4027 fail:
4028         if (fd != -1) {
4029                 close(fd);
4030         }
4031         return -1;
4032 }
4033
/* Command line settings, filled in through cmdline_options */
static struct options {
        /* Database directory, passed to ctdbd_setup() */
        const char *dbdir;
        /* Path of the Unix domain socket to listen on */
        const char *sockpath;
        /* File the server's pid is written to */
        const char *pidfile;
        /* Debug level string, passed to logging_init() */
        const char *debuglevel;
} options;
4040
4041 static struct poptOption cmdline_options[] = {
4042         POPT_AUTOHELP
4043         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4044                 "Database directory", "directory" },
4045         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4046                 "Unix domain socket path", "filename" },
4047         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4048                 "pid file", "filename" } ,
4049         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4050                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4051         POPT_TABLEEND
4052 };
4053
4054 static void cleanup(void)
4055 {
4056         unlink(options.sockpath);
4057         unlink(options.pidfile);
4058 }
4059
/*
 * Signal handler: remove the socket and pid file, then exit with
 * status 0.  The signal number is not used.
 *
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions - confirm this is acceptable for this test helper.
 */
static void signal_handler(int sig)
{
        (void) sig;

        cleanup();
        exit(0);
}
4065
4066 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4067                          struct ctdbd_context *ctdb, int fd, int pfd)
4068 {
4069         struct tevent_req *req;
4070         int ret = 0;
4071         ssize_t len;
4072
4073         atexit(cleanup);
4074         signal(SIGTERM, signal_handler);
4075
4076         req = server_send(mem_ctx, ev, ctdb, fd);
4077         if (req == NULL) {
4078                 fprintf(stderr, "Memory error\n");
4079                 exit(1);
4080         }
4081
4082         len = write(pfd, &ret, sizeof(ret));
4083         if (len != sizeof(ret)) {
4084                 fprintf(stderr, "Failed to send message to parent\n");
4085                 exit(1);
4086         }
4087         close(pfd);
4088
4089         tevent_req_poll(req, ev);
4090
4091         server_recv(req, &ret);
4092         if (ret != 0) {
4093                 exit(1);
4094         }
4095 }
4096
4097 int main(int argc, const char *argv[])
4098 {
4099         TALLOC_CTX *mem_ctx;
4100         struct ctdbd_context *ctdb;
4101         struct tevent_context *ev;
4102         poptContext pc;
4103         int opt, fd, ret, pfd[2];
4104         ssize_t len;
4105         pid_t pid;
4106         FILE *fp;
4107
4108         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4109                             POPT_CONTEXT_KEEP_FIRST);
4110         while ((opt = poptGetNextOpt(pc)) != -1) {
4111                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4112                 exit(1);
4113         }
4114
4115         if (options.dbdir == NULL) {
4116                 fprintf(stderr, "Please specify database directory\n");
4117                 poptPrintHelp(pc, stdout, 0);
4118                 exit(1);
4119         }
4120
4121         if (options.sockpath == NULL) {
4122                 fprintf(stderr, "Please specify socket path\n");
4123                 poptPrintHelp(pc, stdout, 0);
4124                 exit(1);
4125         }
4126
4127         if (options.pidfile == NULL) {
4128                 fprintf(stderr, "Please specify pid file\n");
4129                 poptPrintHelp(pc, stdout, 0);
4130                 exit(1);
4131         }
4132
4133         mem_ctx = talloc_new(NULL);
4134         if (mem_ctx == NULL) {
4135                 fprintf(stderr, "Memory error\n");
4136                 exit(1);
4137         }
4138
4139         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4140         if (ret != 0) {
4141                 fprintf(stderr, "Invalid debug level\n");
4142                 poptPrintHelp(pc, stdout, 0);
4143                 exit(1);
4144         }
4145
4146         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4147         if (ctdb == NULL) {
4148                 exit(1);
4149         }
4150
4151         if (! ctdbd_verify(ctdb)) {
4152                 exit(1);
4153         }
4154
4155         ev = tevent_context_init(mem_ctx);
4156         if (ev == NULL) {
4157                 fprintf(stderr, "Memory error\n");
4158                 exit(1);
4159         }
4160
4161         fd = socket_init(options.sockpath);
4162         if (fd == -1) {
4163                 exit(1);
4164         }
4165
4166         ret = pipe(pfd);
4167         if (ret != 0) {
4168                 fprintf(stderr, "Failed to create pipe\n");
4169                 cleanup();
4170                 exit(1);
4171         }
4172
4173         pid = fork();
4174         if (pid == -1) {
4175                 fprintf(stderr, "Failed to fork\n");
4176                 cleanup();
4177                 exit(1);
4178         }
4179
4180         if (pid == 0) {
4181                 /* Child */
4182                 close(pfd[0]);
4183                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4184                 exit(1);
4185         }
4186
4187         /* Parent */
4188         close(pfd[1]);
4189
4190         len = read(pfd[0], &ret, sizeof(ret));
4191         close(pfd[0]);
4192         if (len != sizeof(ret)) {
4193                 fprintf(stderr, "len = %zi\n", len);
4194                 fprintf(stderr, "Failed to get message from child\n");
4195                 kill(pid, SIGTERM);
4196                 exit(1);
4197         }
4198
4199         fp = fopen(options.pidfile, "w");
4200         if (fp == NULL) {
4201                 fprintf(stderr, "Failed to open pid file %s\n",
4202                         options.pidfile);
4203                 kill(pid, SIGTERM);
4204                 exit(1);
4205         }
4206         fprintf(fp, "%d\n", pid);
4207         fclose(fp);
4208
4209         return 0;
4210 }