ctdb-tests: Add req_call processing in fake_ctdbd
[metze/samba/wip.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/async_req/async_sock.h"
35
36 #include "protocol/protocol.h"
37 #include "protocol/protocol_api.h"
38 #include "protocol/protocol_util.h"
39 #include "protocol/protocol_private.h"
40
41 #include "common/comm.h"
42 #include "common/logging.h"
43 #include "common/tunable.h"
44 #include "common/srvid.h"
45
46 #include "ipalloc_read_known_ips.h"
47
48
49 #define CTDB_PORT 4379
50
51 /* A fake flag that is only supported by some functions */
52 #define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
53
/* One cluster node as tracked by the fake daemon. */
struct node {
        /* Node address; nodemap_parse() forces the port to CTDB_PORT */
        ctdb_sock_addr addr;
        /* Node number as given in the nodemap input */
        uint32_t pnn;
        /* NODE_FLAGS_* bits; may also carry NODE_FLAGS_FAKE_TIMEOUT */
        uint32_t flags;
        /* CTDB_CAP_* bits; defaults to RECMASTER|LMASTER when parsed */
        uint32_t capabilities;
        /* Initialised to false by nodemap_parse() */
        bool recovery_disabled;
        /* Initialised to NULL by nodemap_parse() */
        void *recovery_substate;
};
62
/* Table of nodes plus the special node identities read from input. */
struct node_map {
        /* Number of valid entries in node[] */
        uint32_t num_nodes;
        /* talloc array, grown one entry at a time by nodemap_parse() */
        struct node *node;
        /* PNN of the line tagged CURRENT; CTDB_UNKNOWN_PNN until seen */
        uint32_t pnn;
        /* PNN of the line tagged RECMASTER; CTDB_UNKNOWN_PNN until seen */
        uint32_t recmaster;
};
69
/* One network interface, as read by interfaces_parse(). */
struct interface {
        /* Interface name (talloc copy of the first input field) */
        const char *name;
        /* True when the parsed link state field is non-zero */
        bool link_up;
        /* Reference count taken verbatim from the third input field */
        uint32_t references;
};
75
/* Growable list of interfaces. */
struct interface_map {
        /* Number of valid entries in iface[] */
        int num;
        /* talloc array, grown one entry at a time by interfaces_parse() */
        struct interface *iface;
};
80
/* VNN map state held by the fake daemon. */
struct vnn_map {
        /* Set to CTDB_RECOVERY_ACTIVE by vnnmap_init() */
        uint32_t recmode;
        /* INVALID_GENERATION until vnnmap_parse() reads the first number */
        uint32_t generation;
        /* Number of valid entries in map[] */
        uint32_t size;
        /* talloc array of the numbers read after the generation */
        uint32_t *map;
};
87
/* One database known to the fake daemon; element of database_map.db. */
struct database {
        /* Links used by the DLIST_* macros */
        struct database *prev, *next;
        /* Database name */
        const char *name;
        /* "<dbdir>/<name>" */
        const char *path;
        /* Opened by database_new(); stays NULL for dbmap_parse() entries */
        struct tdb_context *tdb;
        /* Given in the input, or tdb_jenkins_hash() of the name */
        uint32_t id;
        /* CTDB_DB_FLAGS_* bits */
        uint8_t flags;
        /* Sequence number reported by database_seqnum() when tdb is NULL */
        uint64_t seq_num;
};
97
/* All databases plus the directory their files live in. */
struct database_map {
        /* Head of the doubly linked list of databases */
        struct database *db;
        /* Directory prefix used when building database.path */
        const char *dbdir;
};
102
/*
 * One configured control failure; element of
 * ctdbd_context.control_failures.
 *
 * NOTE(review): the fields presumably mirror the
 * "<opcode> <pnn> {ERROR|TIMEOUT} <comment>" input line format - confirm
 * against control_failures_parse().
 */
struct fake_control_failure {
        /* List links */
        struct fake_control_failure  *prev, *next;
        /* Control opcode this failure applies to */
        enum ctdb_controls opcode;
        /* Node the failure applies to */
        uint32_t pnn;
        /* Failure kind token */
        const char *error;
        /* Free-form comment */
        const char *comment;
};
110
/* One connected client; element of ctdbd_context.client_list. */
struct ctdb_client {
        /* List links */
        struct ctdb_client *prev, *next;
        /* Back pointer to the owning daemon context */
        struct ctdbd_context *ctdb;
        /* Client process ID */
        pid_t pid;
        /* Opaque per-client state; TODO confirm the owner of this pointer */
        void *state;
};
117
/* Top-level state of the fake daemon. */
struct ctdbd_context {
        /* Nodes, filled in by nodemap_parse() */
        struct node_map *node_map;
        /* Interfaces, filled in by interfaces_parse() */
        struct interface_map *iface_map;
        /* VNN map, filled in by vnnmap_parse() */
        struct vnn_map *vnn_map;
        /* Databases, filled in by dbmap_parse() / database_new() */
        struct database_map *db_map;
        struct srvid_context *srv;
        int num_clients;
        struct timeval start_time;
        struct timeval recovery_start_time;
        struct timeval recovery_end_time;
        bool takeover_disabled;
        int log_level;
        enum ctdb_runstate runstate;
        struct ctdb_tunable_list tun_list;
        /* Recovery lock setting from reclock_parse(); NULL when unset */
        char *reclock;
        /* Public IPs read by public_ips_parse() */
        struct ctdb_public_ip_list *known_ips;
        /* Head of the list of configured control failures */
        struct fake_control_failure *control_failures;
        /* Head of the list of connected clients */
        struct ctdb_client *client_list;
};
137
138 /*
139  * Parse routines
140  */
141
142 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
143 {
144         struct node_map *node_map;
145
146         node_map = talloc_zero(mem_ctx, struct node_map);
147         if (node_map == NULL) {
148                 return NULL;
149         }
150
151         node_map->pnn = CTDB_UNKNOWN_PNN;
152         node_map->recmaster = CTDB_UNKNOWN_PNN;
153
154         return node_map;
155 }
156
157 /* Read a nodemap from stdin.  Each line looks like:
158  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
159  * EOF or a blank line terminates input.
160  *
161  * By default, capablities for each node are
162  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
163  * capabilities can be faked off by adding, for example,
164  * -CTDB_CAP_RECMASTER.
165  */
166
167 static bool nodemap_parse(struct node_map *node_map)
168 {
169         char line[1024];
170
171         while ((fgets(line, sizeof(line), stdin) != NULL)) {
172                 uint32_t pnn, flags, capabilities;
173                 char *tok, *t;
174                 char *ip;
175                 ctdb_sock_addr saddr;
176                 struct node *node;
177                 int ret;
178
179                 if (line[0] == '\n') {
180                         break;
181                 }
182
183                 /* Get rid of pesky newline */
184                 if ((t = strchr(line, '\n')) != NULL) {
185                         *t = '\0';
186                 }
187
188                 /* Get PNN */
189                 tok = strtok(line, " \t");
190                 if (tok == NULL) {
191                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
192                         continue;
193                 }
194                 pnn = (uint32_t)strtoul(tok, NULL, 0);
195
196                 /* Get IP */
197                 tok = strtok(NULL, " \t");
198                 if (tok == NULL) {
199                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
200                         continue;
201                 }
202                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
203                 if (ret != 0) {
204                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
205                         continue;
206                 }
207                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
208                 ip = talloc_strdup(node_map, tok);
209                 if (ip == NULL) {
210                         goto fail;
211                 }
212
213                 /* Get flags */
214                 tok = strtok(NULL, " \t");
215                 if (tok == NULL) {
216                         fprintf(stderr, "bad line (%s) - missing flags\n",
217                                 line);
218                         continue;
219                 }
220                 flags = (uint32_t)strtoul(tok, NULL, 0);
221                 /* Handle deleted nodes */
222                 if (flags & NODE_FLAGS_DELETED) {
223                         talloc_free(ip);
224                         ip = talloc_strdup(node_map, "0.0.0.0");
225                         if (ip == NULL) {
226                                 goto fail;
227                         }
228                 }
229                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
230
231                 tok = strtok(NULL, " \t");
232                 while (tok != NULL) {
233                         if (strcmp(tok, "CURRENT") == 0) {
234                                 node_map->pnn = pnn;
235                         } else if (strcmp(tok, "RECMASTER") == 0) {
236                                 node_map->recmaster = pnn;
237                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
238                                 capabilities &= ~CTDB_CAP_RECMASTER;
239                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
240                                 capabilities &= ~CTDB_CAP_LMASTER;
241                         } else if (strcmp(tok, "TIMEOUT") == 0) {
242                                 /* This can be done with just a flag
243                                  * value but it is probably clearer
244                                  * and less error-prone to fake this
245                                  * with an explicit token */
246                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
247                         }
248                         tok = strtok(NULL, " \t");
249                 }
250
251                 node_map->node = talloc_realloc(node_map, node_map->node,
252                                                 struct node,
253                                                 node_map->num_nodes + 1);
254                 if (node_map->node == NULL) {
255                         goto fail;
256                 }
257                 node = &node_map->node[node_map->num_nodes];
258
259                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
260                 if (ret != 0) {
261                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
262                         continue;
263                 }
264                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
265                 node->pnn = pnn;
266                 node->flags = flags;
267                 node->capabilities = capabilities;
268                 node->recovery_disabled = false;
269                 node->recovery_substate = NULL;
270
271                 node_map->num_nodes += 1;
272         }
273
274         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
275         return true;
276
277 fail:
278         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
279         return false;
280
281 }
282
283 /* Append a node to a node map with given address and flags */
284 static bool node_map_add(struct ctdb_node_map *nodemap,
285                          const char *nstr, uint32_t flags)
286 {
287         ctdb_sock_addr addr;
288         uint32_t num;
289         struct ctdb_node_and_flags *n;
290         int ret;
291
292         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
293         if (ret != 0) {
294                 fprintf(stderr, "Invalid IP address %s\n", nstr);
295                 return false;
296         }
297         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
298
299         num = nodemap->num;
300         nodemap->node = talloc_realloc(nodemap, nodemap->node,
301                                        struct ctdb_node_and_flags, num+1);
302         if (nodemap->node == NULL) {
303                 return false;
304         }
305
306         n = &nodemap->node[num];
307         n->addr = addr;
308         n->pnn = num;
309         n->flags = flags;
310
311         nodemap->num = num+1;
312         return true;
313 }
314
315 /* Read a nodes file into a node map */
316 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
317                                                   const char *nlist)
318 {
319         char **lines;
320         int nlines;
321         int i;
322         struct ctdb_node_map *nodemap;
323
324         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
325         if (nodemap == NULL) {
326                 return NULL;
327         }
328
329         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
330         if (lines == NULL) {
331                 return NULL;
332         }
333
334         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
335                 nlines--;
336         }
337
338         for (i=0; i<nlines; i++) {
339                 char *node;
340                 uint32_t flags;
341                 size_t len;
342
343                 node = lines[i];
344                 /* strip leading spaces */
345                 while((*node == ' ') || (*node == '\t')) {
346                         node++;
347                 }
348
349                 len = strlen(node);
350
351                 /* strip trailing spaces */
352                 while ((len > 1) &&
353                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
354                 {
355                         node[len-1] = '\0';
356                         len--;
357                 }
358
359                 if (len == 0) {
360                         continue;
361                 }
362                 if (*node == '#') {
363                         /* A "deleted" node is a node that is
364                            commented out in the nodes file.  This is
365                            used instead of removing a line, which
366                            would cause subsequent nodes to change
367                            their PNN. */
368                         flags = NODE_FLAGS_DELETED;
369                         node = discard_const("0.0.0.0");
370                 } else {
371                         flags = 0;
372                 }
373                 if (! node_map_add(nodemap, node, flags)) {
374                         talloc_free(lines);
375                         TALLOC_FREE(nodemap);
376                         return NULL;
377                 }
378         }
379
380         talloc_free(lines);
381         return nodemap;
382 }
383
384 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
385                                              uint32_t pnn)
386 {
387         struct ctdb_node_map *nodemap;
388         char nodes_list[PATH_MAX];
389         const char *ctdb_base;
390         int num;
391
392         ctdb_base = getenv("CTDB_BASE");
393         if (ctdb_base == NULL) {
394                 D_ERR("CTDB_BASE is not set\n");
395                 return NULL;
396         }
397
398         /* read optional node-specific nodes file */
399         num = snprintf(nodes_list, sizeof(nodes_list),
400                        "%s/nodes.%d", ctdb_base, pnn);
401         if (num == sizeof(nodes_list)) {
402                 D_ERR("nodes file path too long\n");
403                 return NULL;
404         }
405         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
406         if (nodemap != NULL) {
407                 /* Fake a load failure for an empty nodemap */
408                 if (nodemap->num == 0) {
409                         talloc_free(nodemap);
410
411                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
412                         return NULL;
413                 }
414
415                 return nodemap;
416         }
417
418         /* read normal nodes file */
419         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
420         if (num == sizeof(nodes_list)) {
421                 D_ERR("nodes file path too long\n");
422                 return NULL;
423         }
424         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
425         if (nodemap != NULL) {
426                 return nodemap;
427         }
428
429         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
430         return NULL;
431 }
432
433 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
434 {
435         struct interface_map *iface_map;
436
437         iface_map = talloc_zero(mem_ctx, struct interface_map);
438         if (iface_map == NULL) {
439                 return NULL;
440         }
441
442         return iface_map;
443 }
444
445 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
446  * output:
447  *   :Name:LinkStatus:References:
448  *   :eth2:1:4294967294
449  *   :eth1:1:4294967292
450  */
451
452 static bool interfaces_parse(struct interface_map *iface_map)
453 {
454         char line[1024];
455
456         while ((fgets(line, sizeof(line), stdin) != NULL)) {
457                 uint16_t link_state;
458                 uint32_t references;
459                 char *tok, *t, *name;
460                 struct interface *iface;
461
462                 if (line[0] == '\n') {
463                         break;
464                 }
465
466                 /* Get rid of pesky newline */
467                 if ((t = strchr(line, '\n')) != NULL) {
468                         *t = '\0';
469                 }
470
471                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
472                         continue;
473                 }
474
475                 /* Leading colon... */
476                 // tok = strtok(line, ":");
477
478                 /* name */
479                 tok = strtok(line, ":");
480                 if (tok == NULL) {
481                         fprintf(stderr, "bad line (%s) - missing name\n", line);
482                         continue;
483                 }
484                 name = tok;
485
486                 /* link_state */
487                 tok = strtok(NULL, ":");
488                 if (tok == NULL) {
489                         fprintf(stderr, "bad line (%s) - missing link state\n",
490                                 line);
491                         continue;
492                 }
493                 link_state = (uint16_t)strtoul(tok, NULL, 0);
494
495                 /* references... */
496                 tok = strtok(NULL, ":");
497                 if (tok == NULL) {
498                         fprintf(stderr, "bad line (%s) - missing references\n",
499                                 line);
500                         continue;
501                 }
502                 references = (uint32_t)strtoul(tok, NULL, 0);
503
504                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
505                                                   struct interface,
506                                                   iface_map->num + 1);
507                 if (iface_map->iface == NULL) {
508                         goto fail;
509                 }
510
511                 iface = &iface_map->iface[iface_map->num];
512
513                 iface->name = talloc_strdup(iface_map, name);
514                 if (iface->name == NULL) {
515                         goto fail;
516                 }
517                 iface->link_up = link_state;
518                 iface->references = references;
519
520                 iface_map->num += 1;
521         }
522
523         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
524         return true;
525
526 fail:
527         fprintf(stderr, "Parsing interfaces failed\n");
528         return false;
529 }
530
531 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
532 {
533         struct vnn_map *vnn_map;
534
535         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
536         if (vnn_map == NULL) {
537                 fprintf(stderr, "Memory error\n");
538                 return NULL;
539         }
540         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
541         vnn_map->generation = INVALID_GENERATION;
542
543         return vnn_map;
544 }
545
546 /* Read vnn map.
547  * output:
548  *   <GENERATION>
549  *   <LMASTER0>
550  *   <LMASTER1>
551  *   ...
552  */
553
554 static bool vnnmap_parse(struct vnn_map *vnn_map)
555 {
556         char line[1024];
557
558         while (fgets(line, sizeof(line), stdin) != NULL) {
559                 uint32_t n;
560                 char *t;
561
562                 if (line[0] == '\n') {
563                         break;
564                 }
565
566                 /* Get rid of pesky newline */
567                 if ((t = strchr(line, '\n')) != NULL) {
568                         *t = '\0';
569                 }
570
571                 n = (uint32_t) strtol(line, NULL, 0);
572
573                 /* generation */
574                 if (vnn_map->generation == INVALID_GENERATION) {
575                         vnn_map->generation = n;
576                         continue;
577                 }
578
579                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
580                                               vnn_map->size + 1);
581                 if (vnn_map->map == NULL) {
582                         fprintf(stderr, "Memory error\n");
583                         goto fail;
584                 }
585
586                 vnn_map->map[vnn_map->size] = n;
587                 vnn_map->size += 1;
588         }
589
590         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
591         return true;
592
593 fail:
594         fprintf(stderr, "Parsing vnnmap failed\n");
595         return false;
596 }
597
598 static bool reclock_parse(struct ctdbd_context *ctdb)
599 {
600         char line[1024];
601         char *t;
602
603         if (fgets(line, sizeof(line), stdin) == NULL) {
604                 goto fail;
605         }
606
607         if (line[0] == '\n') {
608                 /* Recovery lock remains unset */
609                 goto ok;
610         }
611
612         /* Get rid of pesky newline */
613         if ((t = strchr(line, '\n')) != NULL) {
614                 *t = '\0';
615         }
616
617         ctdb->reclock = talloc_strdup(ctdb, line);
618         if (ctdb->reclock == NULL) {
619                 goto fail;
620         }
621 ok:
622         /* Swallow possible blank line following section.  Picky
623          * compiler settings don't allow the return value to be
624          * ignored, so make the compiler happy.
625          */
626         if (fgets(line, sizeof(line), stdin) == NULL) {
627                 ;
628         }
629         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
630         return true;
631
632 fail:
633         fprintf(stderr, "Parsing reclock failed\n");
634         return false;
635 }
636
637 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
638                                        const char *dbdir)
639 {
640         struct database_map *db_map;
641
642         db_map = talloc_zero(mem_ctx, struct database_map);
643         if (db_map == NULL) {
644                 return NULL;
645         }
646
647         db_map->dbdir = talloc_strdup(db_map, dbdir);
648         if (db_map->dbdir == NULL) {
649                 talloc_free(db_map);
650                 return NULL;
651         }
652
653         return db_map;
654 }
655
656 /* Read a database map from stdin.  Each line looks like:
657  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
658  * EOF or a blank line terminates input.
659  *
660  * By default, flags and seq_num are 0
661  */
662
663 static bool dbmap_parse(struct database_map *db_map)
664 {
665         char line[1024];
666
667         while ((fgets(line, sizeof(line), stdin) != NULL)) {
668                 uint32_t id;
669                 uint8_t flags = 0;
670                 uint32_t seq_num = 0;
671                 char *tok, *t;
672                 char *name;
673                 struct database *db;
674
675                 if (line[0] == '\n') {
676                         break;
677                 }
678
679                 /* Get rid of pesky newline */
680                 if ((t = strchr(line, '\n')) != NULL) {
681                         *t = '\0';
682                 }
683
684                 /* Get ID */
685                 tok = strtok(line, " \t");
686                 if (tok == NULL) {
687                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
688                         continue;
689                 }
690                 id = (uint32_t)strtoul(tok, NULL, 0);
691
692                 /* Get NAME */
693                 tok = strtok(NULL, " \t");
694                 if (tok == NULL) {
695                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
696                         continue;
697                 }
698                 name = talloc_strdup(db_map, tok);
699                 if (name == NULL) {
700                         goto fail;
701                 }
702
703                 /* Get flags */
704                 tok = strtok(NULL, " \t");
705                 while (tok != NULL) {
706                         if (strcmp(tok, "PERSISTENT") == 0) {
707                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
708                         } else if (strcmp(tok, "STICKY") == 0) {
709                                 flags |= CTDB_DB_FLAGS_STICKY;
710                         } else if (strcmp(tok, "READONLY") == 0) {
711                                 flags |= CTDB_DB_FLAGS_READONLY;
712                         } else if (strcmp(tok, "REPLICATED") == 0) {
713                                 flags |= CTDB_DB_FLAGS_REPLICATED;
714                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
715                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
716                                              CTDB_DB_FLAGS_REPLICATED;
717
718                                 if ((flags & nv) == 0) {
719                                         fprintf(stderr,
720                                                 "seq_num for volatile db\n");
721                                         goto fail;
722                                 }
723                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
724                         }
725
726                         tok = strtok(NULL, " \t");
727                 }
728
729                 db = talloc_zero(db_map, struct database);
730                 if (db == NULL) {
731                         goto fail;
732                 }
733
734                 db->id = id;
735                 db->name = talloc_steal(db, name);
736                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
737                 if (db->path == NULL) {
738                         talloc_free(db);
739                         goto fail;
740                 }
741                 db->flags = flags;
742                 db->seq_num = seq_num;
743
744                 DLIST_ADD_END(db_map->db, db);
745         }
746
747         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
748         return true;
749
750 fail:
751         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
752         return false;
753
754 }
755
756 static struct database *database_find(struct database_map *db_map,
757                                       uint32_t db_id)
758 {
759         struct database *db;
760
761         for (db = db_map->db; db != NULL; db = db->next) {
762                 if (db->id == db_id) {
763                         return db;
764                 }
765         }
766
767         return NULL;
768 }
769
770 static int database_count(struct database_map *db_map)
771 {
772         struct database *db;
773         int count = 0;
774
775         for (db = db_map->db; db != NULL; db = db->next) {
776                 count += 1;
777         }
778
779         return count;
780 }
781
782 static int database_flags(uint8_t db_flags)
783 {
784         int tdb_flags = 0;
785
786         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
787                 tdb_flags = TDB_DEFAULT;
788         } else {
789                 /* volatile and replicated use the same flags */
790                 tdb_flags = TDB_NOSYNC |
791                             TDB_CLEAR_IF_FIRST |
792                             TDB_INCOMPATIBLE_HASH;
793         }
794
795         tdb_flags |= TDB_DISALLOW_NESTING;
796
797         return tdb_flags;
798 }
799
800 static struct database *database_new(struct database_map *db_map,
801                                      const char *name, uint8_t flags)
802 {
803         struct database *db;
804         TDB_DATA key;
805         int tdb_flags;
806
807         db = talloc_zero(db_map, struct database);
808         if (db == NULL) {
809                 return NULL;
810         }
811
812         db->name = talloc_strdup(db, name);
813         if (db->name == NULL) {
814                 goto fail;
815         }
816
817         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
818         if (db->path == NULL) {
819                 goto fail;
820         }
821
822         key.dsize = strlen(db->name) + 1;
823         key.dptr = discard_const(db->name);
824
825         db->id = tdb_jenkins_hash(&key);
826         db->flags = flags;
827
828         tdb_flags = database_flags(flags);
829
830         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
831         if (db->tdb == NULL) {
832                 DBG_ERR("tdb_open\n");
833                 goto fail;
834         }
835
836         DLIST_ADD_END(db_map->db, db);
837         return db;
838
839 fail:
840         DBG_ERR("Memory error\n");
841         talloc_free(db);
842         return NULL;
843
844 }
845
846 static int ltdb_store(struct database *db, TDB_DATA key,
847                       struct ctdb_ltdb_header *header, TDB_DATA data)
848 {
849         int ret;
850         bool db_volatile = true;
851         bool keep = false;
852
853         if (db->tdb == NULL) {
854                 return EINVAL;
855         }
856
857         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
858             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
859                 db_volatile = false;
860         }
861
862         if (data.dsize > 0) {
863                 keep = true;
864         } else {
865                 if (db_volatile && header->rsn == 0) {
866                         keep = true;
867                 }
868         }
869
870         if (keep) {
871                 TDB_DATA rec[2];
872
873                 rec[0].dsize = ctdb_ltdb_header_len(header);
874                 rec[0].dptr = (uint8_t *)header;
875
876                 rec[1].dsize = data.dsize;
877                 rec[1].dptr = data.dptr;
878
879                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
880         } else {
881                 if (header->rsn > 0) {
882                         ret = tdb_delete(db->tdb, key);
883                 } else {
884                         ret = 0;
885                 }
886         }
887
888         return ret;
889 }
890
891 static int ltdb_fetch(struct database *db, TDB_DATA key,
892                       struct ctdb_ltdb_header *header,
893                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
894 {
895         TDB_DATA rec;
896         size_t np;
897         int ret;
898
899         if (db->tdb == NULL) {
900                 return EINVAL;
901         }
902
903         rec = tdb_fetch(db->tdb, key);
904         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
905         if (ret != 0) {
906                 if (rec.dptr != NULL) {
907                         free(rec.dptr);
908                 }
909
910                 *header = (struct ctdb_ltdb_header) {
911                         .rsn = 0,
912                         .dmaster = 0,
913                         .flags = 0,
914                 };
915
916                 ret = ltdb_store(db, key, header, tdb_null);
917                 if (ret != 0) {
918                         return ret;
919                 }
920
921                 *data = tdb_null;
922                 return 0;
923         }
924
925         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
926         data->dptr = talloc_memdup(mem_ctx,
927                                    rec.dptr + ctdb_ltdb_header_len(header),
928                                    data->dsize);
929         if (data->dptr == NULL) {
930                 free(rec.dptr);
931                 return ENOMEM;
932         }
933
934         return 0;
935 }
936
937 static int database_seqnum(struct database *db, uint64_t *seqnum)
938 {
939         const char *keyname = CTDB_DB_SEQNUM_KEY;
940         TDB_DATA key, data;
941         struct ctdb_ltdb_header header;
942         size_t np;
943         int ret;
944
945         if (db->tdb == NULL) {
946                 *seqnum = db->seq_num;
947                 return 0;
948         }
949
950         key.dptr = discard_const(keyname);
951         key.dsize = strlen(keyname) + 1;
952
953         ret = ltdb_fetch(db, key, &header, db, &data);
954         if (ret != 0) {
955                 return ret;
956         }
957
958         if (data.dsize == 0) {
959                 *seqnum = 0;
960                 return 0;
961         }
962
963         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
964         talloc_free(data.dptr);
965         if (ret != 0) {
966                 *seqnum = 0;
967         }
968
969         return ret;
970 }
971
972 static bool public_ips_parse(struct ctdbd_context *ctdb,
973                              uint32_t numnodes)
974 {
975         bool status;
976
977         if (numnodes == 0) {
978                 D_ERR("Must initialise nodemap before public IPs\n");
979                 return false;
980         }
981
982         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
983
984         status = (ctdb->known_ips != NULL);
985
986         if (status) {
987                 D_INFO("Parsing public IPs done\n");
988         } else {
989                 D_INFO("Parsing public IPs failed\n");
990         }
991
992         return status;
993 }
994
995 /* Read information about controls to fail.  Format is:
996  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
997  */
998 static bool control_failures_parse(struct ctdbd_context *ctdb)
999 {
1000         char line[1024];
1001
1002         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1003                 char *tok, *t;
1004                 enum ctdb_controls opcode;
1005                 uint32_t pnn;
1006                 const char *error;
1007                 const char *comment;
1008                 struct fake_control_failure *failure = NULL;
1009
1010                 if (line[0] == '\n') {
1011                         break;
1012                 }
1013
1014                 /* Get rid of pesky newline */
1015                 if ((t = strchr(line, '\n')) != NULL) {
1016                         *t = '\0';
1017                 }
1018
1019                 /* Get opcode */
1020                 tok = strtok(line, " \t");
1021                 if (tok == NULL) {
1022                         D_ERR("bad line (%s) - missing opcode\n", line);
1023                         continue;
1024                 }
1025                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1026
1027                 /* Get PNN */
1028                 tok = strtok(NULL, " \t");
1029                 if (tok == NULL) {
1030                         D_ERR("bad line (%s) - missing PNN\n", line);
1031                         continue;
1032                 }
1033                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1034
1035                 /* Get error */
1036                 tok = strtok(NULL, " \t");
1037                 if (tok == NULL) {
1038                         D_ERR("bad line (%s) - missing errno\n", line);
1039                         continue;
1040                 }
1041                 error = talloc_strdup(ctdb, tok);
1042                 if (error == NULL) {
1043                         goto fail;
1044                 }
1045                 if (strcmp(error, "ERROR") != 0 &&
1046                     strcmp(error, "TIMEOUT") != 0) {
1047                         D_ERR("bad line (%s) "
1048                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1049                               line);
1050                         goto fail;
1051                 }
1052
1053                 /* Get comment */
1054                 tok = strtok(NULL, "\n"); /* rest of line */
1055                 if (tok == NULL) {
1056                         D_ERR("bad line (%s) - missing comment\n", line);
1057                         continue;
1058                 }
1059                 comment = talloc_strdup(ctdb, tok);
1060                 if (comment == NULL) {
1061                         goto fail;
1062                 }
1063
1064                 failure = talloc_zero(ctdb, struct fake_control_failure);
1065                 if (failure == NULL) {
1066                         goto fail;
1067                 }
1068
1069                 failure->opcode = opcode;
1070                 failure->pnn = pnn;
1071                 failure->error = error;
1072                 failure->comment = comment;
1073
1074                 DLIST_ADD(ctdb->control_failures, failure);
1075         }
1076
1077         D_INFO("Parsing fake control failures done\n");
1078         return true;
1079
1080 fail:
1081         D_INFO("Parsing fake control failures failed\n");
1082         return false;
1083 }
1084
1085 /*
1086  * Manage clients
1087  */
1088
1089 static int ctdb_client_destructor(struct ctdb_client *client)
1090 {
1091         DLIST_REMOVE(client->ctdb->client_list, client);
1092         return 0;
1093 }
1094
1095 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1096                       void *client_state)
1097 {
1098         struct ctdb_client *client;
1099
1100         client = talloc_zero(client_state, struct ctdb_client);
1101         if (client == NULL) {
1102                 return ENOMEM;
1103         }
1104
1105         client->ctdb = ctdb;
1106         client->pid = client_pid;
1107         client->state = client_state;
1108
1109         DLIST_ADD(ctdb->client_list, client);
1110         talloc_set_destructor(client, ctdb_client_destructor);
1111         return 0;
1112 }
1113
1114 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1115 {
1116         struct ctdb_client *client;
1117
1118         for (client=ctdb->client_list; client != NULL; client=client->next) {
1119                 if (client->pid == client_pid) {
1120                         return client->state;
1121                 }
1122         }
1123
1124         return NULL;
1125 }
1126
1127 /*
1128  * CTDB context setup
1129  */
1130
1131 static uint32_t new_generation(uint32_t old_generation)
1132 {
1133         uint32_t generation;
1134
1135         while (1) {
1136                 generation = random();
1137                 if (generation != INVALID_GENERATION &&
1138                     generation != old_generation) {
1139                         break;
1140                 }
1141         }
1142
1143         return generation;
1144 }
1145
1146 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1147                                          const char *dbdir)
1148 {
1149         struct ctdbd_context *ctdb;
1150         char line[1024];
1151         bool status;
1152         int ret;
1153
1154         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1155         if (ctdb == NULL) {
1156                 return NULL;
1157         }
1158
1159         ctdb->node_map = nodemap_init(ctdb);
1160         if (ctdb->node_map == NULL) {
1161                 goto fail;
1162         }
1163
1164         ctdb->iface_map = interfaces_init(ctdb);
1165         if (ctdb->iface_map == NULL) {
1166                 goto fail;
1167         }
1168
1169         ctdb->vnn_map = vnnmap_init(ctdb);
1170         if (ctdb->vnn_map == NULL) {
1171                 goto fail;
1172         }
1173
1174         ctdb->db_map = dbmap_init(ctdb, dbdir);
1175         if (ctdb->db_map == NULL) {
1176                 goto fail;
1177         }
1178
1179         ret = srvid_init(ctdb, &ctdb->srv);
1180         if (ret != 0) {
1181                 goto fail;
1182         }
1183
1184         while (fgets(line, sizeof(line), stdin) != NULL) {
1185                 char *t;
1186
1187                 if ((t = strchr(line, '\n')) != NULL) {
1188                         *t = '\0';
1189                 }
1190
1191                 if (strcmp(line, "NODEMAP") == 0) {
1192                         status = nodemap_parse(ctdb->node_map);
1193                 } else if (strcmp(line, "IFACES") == 0) {
1194                         status = interfaces_parse(ctdb->iface_map);
1195                 } else if (strcmp(line, "VNNMAP") == 0) {
1196                         status = vnnmap_parse(ctdb->vnn_map);
1197                 } else if (strcmp(line, "DBMAP") == 0) {
1198                         status = dbmap_parse(ctdb->db_map);
1199                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1200                         status = public_ips_parse(ctdb,
1201                                                   ctdb->node_map->num_nodes);
1202                 } else if (strcmp(line, "RECLOCK") == 0) {
1203                         status = reclock_parse(ctdb);
1204                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1205                         status = control_failures_parse(ctdb);
1206                 } else {
1207                         fprintf(stderr, "Unknown line %s\n", line);
1208                         status = false;
1209                 }
1210
1211                 if (! status) {
1212                         goto fail;
1213                 }
1214         }
1215
1216         ctdb->start_time = tevent_timeval_current();
1217         ctdb->recovery_start_time = tevent_timeval_current();
1218         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1219         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1220                 ctdb->vnn_map->generation =
1221                         new_generation(ctdb->vnn_map->generation);
1222         }
1223         ctdb->recovery_end_time = tevent_timeval_current();
1224
1225         ctdb->log_level = DEBUG_ERR;
1226         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1227
1228         ctdb_tunable_set_defaults(&ctdb->tun_list);
1229
1230         return ctdb;
1231
1232 fail:
1233         TALLOC_FREE(ctdb);
1234         return NULL;
1235 }
1236
1237 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1238 {
1239         struct node *node;
1240         int i;
1241
1242         if (ctdb->node_map->num_nodes == 0) {
1243                 return true;
1244         }
1245
1246         /* Make sure all the nodes are in order */
1247         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1248                 node = &ctdb->node_map->node[i];
1249                 if (node->pnn != i) {
1250                         fprintf(stderr, "Expected node %u, found %u\n",
1251                                 i, node->pnn);
1252                         return false;
1253                 }
1254         }
1255
1256         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1257         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1258                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1259                 exit(0);
1260         }
1261
1262         return true;
1263 }
1264
1265 /*
1266  * Doing a recovery
1267  */
1268
/* Per-request state of the fake recovery started by recover_send() */
struct recover_state {
        /* Event context the wakeup timers are scheduled on */
        struct tevent_context *ev;
        /* Daemon context whose node map and VNN map are consulted */
        struct ctdbd_context *ctdb;
};
1273
1274 static int recover_check(struct tevent_req *req);
1275 static void recover_wait_done(struct tevent_req *subreq);
1276 static void recover_done(struct tevent_req *subreq);
1277
1278 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1279                                        struct tevent_context *ev,
1280                                        struct ctdbd_context *ctdb)
1281 {
1282         struct tevent_req *req;
1283         struct recover_state *state;
1284         int ret;
1285
1286         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1287         if (req == NULL) {
1288                 return NULL;
1289         }
1290
1291         state->ev = ev;
1292         state->ctdb = ctdb;
1293
1294         ret = recover_check(req);
1295         if (ret != 0) {
1296                 tevent_req_error(req, ret);
1297                 return tevent_req_post(req, ev);
1298         }
1299
1300         return req;
1301 }
1302
1303 static int recover_check(struct tevent_req *req)
1304 {
1305         struct recover_state *state = tevent_req_data(
1306                 req, struct recover_state);
1307         struct ctdbd_context *ctdb = state->ctdb;
1308         struct tevent_req *subreq;
1309         bool recovery_disabled;
1310         int i;
1311
1312         recovery_disabled = false;
1313         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1314                 if (ctdb->node_map->node[i].recovery_disabled) {
1315                         recovery_disabled = true;
1316                         break;
1317                 }
1318         }
1319
1320         subreq = tevent_wakeup_send(state, state->ev,
1321                                     tevent_timeval_current_ofs(1, 0));
1322         if (subreq == NULL) {
1323                 return ENOMEM;
1324         }
1325
1326         if (recovery_disabled) {
1327                 tevent_req_set_callback(subreq, recover_wait_done, req);
1328         } else {
1329                 ctdb->recovery_start_time = tevent_timeval_current();
1330                 tevent_req_set_callback(subreq, recover_done, req);
1331         }
1332
1333         return 0;
1334 }
1335
1336 static void recover_wait_done(struct tevent_req *subreq)
1337 {
1338         struct tevent_req *req = tevent_req_callback_data(
1339                 subreq, struct tevent_req);
1340         int ret;
1341         bool status;
1342
1343         status = tevent_wakeup_recv(subreq);
1344         TALLOC_FREE(subreq);
1345         if (! status) {
1346                 tevent_req_error(req, EIO);
1347                 return;
1348         }
1349
1350         ret = recover_check(req);
1351         if (ret != 0) {
1352                 tevent_req_error(req, ret);
1353         }
1354 }
1355
1356 static void recover_done(struct tevent_req *subreq)
1357 {
1358         struct tevent_req *req = tevent_req_callback_data(
1359                 subreq, struct tevent_req);
1360         struct recover_state *state = tevent_req_data(
1361                 req, struct recover_state);
1362         struct ctdbd_context *ctdb = state->ctdb;
1363         bool status;
1364
1365         status = tevent_wakeup_recv(subreq);
1366         TALLOC_FREE(subreq);
1367         if (! status) {
1368                 tevent_req_error(req, EIO);
1369                 return;
1370         }
1371
1372         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1373         ctdb->recovery_end_time = tevent_timeval_current();
1374         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1375
1376         tevent_req_done(req);
1377 }
1378
/*
 * Collect the result of a recovery request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error there.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
1392
1393 /*
1394  * Routines for ctdb_req_header
1395  */
1396
1397 static void header_fix_pnn(struct ctdb_req_header *header,
1398                            struct ctdbd_context *ctdb)
1399 {
1400         if (header->srcnode == CTDB_CURRENT_NODE) {
1401                 header->srcnode = ctdb->node_map->pnn;
1402         }
1403
1404         if (header->destnode == CTDB_CURRENT_NODE) {
1405                 header->destnode = ctdb->node_map->pnn;
1406         }
1407 }
1408
1409 static struct ctdb_req_header header_reply_call(
1410                                         struct ctdb_req_header *header,
1411                                         struct ctdbd_context *ctdb)
1412 {
1413         struct ctdb_req_header reply_header;
1414
1415         reply_header = (struct ctdb_req_header) {
1416                 .ctdb_magic = CTDB_MAGIC,
1417                 .ctdb_version = CTDB_PROTOCOL,
1418                 .generation = ctdb->vnn_map->generation,
1419                 .operation = CTDB_REPLY_CALL,
1420                 .destnode = header->srcnode,
1421                 .srcnode = header->destnode,
1422                 .reqid = header->reqid,
1423         };
1424
1425         return reply_header;
1426 }
1427
1428 static struct ctdb_req_header header_reply_control(
1429                                         struct ctdb_req_header *header,
1430                                         struct ctdbd_context *ctdb)
1431 {
1432         struct ctdb_req_header reply_header;
1433
1434         reply_header = (struct ctdb_req_header) {
1435                 .ctdb_magic = CTDB_MAGIC,
1436                 .ctdb_version = CTDB_PROTOCOL,
1437                 .generation = ctdb->vnn_map->generation,
1438                 .operation = CTDB_REPLY_CONTROL,
1439                 .destnode = header->srcnode,
1440                 .srcnode = header->destnode,
1441                 .reqid = header->reqid,
1442         };
1443
1444         return reply_header;
1445 }
1446
1447 static struct ctdb_req_header header_reply_message(
1448                                         struct ctdb_req_header *header,
1449                                         struct ctdbd_context *ctdb)
1450 {
1451         struct ctdb_req_header reply_header;
1452
1453         reply_header = (struct ctdb_req_header) {
1454                 .ctdb_magic = CTDB_MAGIC,
1455                 .ctdb_version = CTDB_PROTOCOL,
1456                 .generation = ctdb->vnn_map->generation,
1457                 .operation = CTDB_REQ_MESSAGE,
1458                 .destnode = header->srcnode,
1459                 .srcnode = header->destnode,
1460                 .reqid = 0,
1461         };
1462
1463         return reply_header;
1464 }
1465
1466 /*
1467  * Client state
1468  */
1469
/* State kept for each connected client of the fake daemon */
struct client_state {
        /* Event context used for writes to the client */
        struct tevent_context *ev;
        /* NOTE(review): presumably the client's socket - confirm */
        int fd;
        /* Fake daemon context this client belongs to */
        struct ctdbd_context *ctdb;
        int pnn;
        pid_t pid;
        /* Communication context that replies are written through */
        struct comm_context *comm;
        struct srvid_register_state *rstate;
        int status;
};
1480
1481 /*
1482  * Send replies to call, controls and messages
1483  */
1484
1485 static void client_reply_done(struct tevent_req *subreq);
1486
1487 static void client_send_call(struct tevent_req *req,
1488                              struct ctdb_req_header *header,
1489                              struct ctdb_reply_call *reply)
1490 {
1491         struct client_state *state = tevent_req_data(
1492                 req, struct client_state);
1493         struct ctdbd_context *ctdb = state->ctdb;
1494         struct tevent_req *subreq;
1495         struct ctdb_req_header reply_header;
1496         uint8_t *buf;
1497         size_t datalen, buflen;
1498         int ret;
1499
1500         reply_header = header_reply_call(header, ctdb);
1501
1502         datalen = ctdb_reply_call_len(&reply_header, reply);
1503         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1504         if (ret != 0) {
1505                 tevent_req_error(req, ret);
1506                 return;
1507         }
1508
1509         ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1510         if (ret != 0) {
1511                 tevent_req_error(req, ret);
1512                 return;
1513         }
1514
1515         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1516         if (tevent_req_nomem(subreq, req)) {
1517                 return;
1518         }
1519         tevent_req_set_callback(subreq, client_reply_done, req);
1520
1521         talloc_steal(subreq, buf);
1522 }
1523
1524 static void client_send_message(struct tevent_req *req,
1525                                 struct ctdb_req_header *header,
1526                                 struct ctdb_req_message_data *message)
1527 {
1528         struct client_state *state = tevent_req_data(
1529                 req, struct client_state);
1530         struct ctdbd_context *ctdb = state->ctdb;
1531         struct tevent_req *subreq;
1532         struct ctdb_req_header reply_header;
1533         uint8_t *buf;
1534         size_t datalen, buflen;
1535         int ret;
1536
1537         reply_header = header_reply_message(header, ctdb);
1538
1539         datalen = ctdb_req_message_data_len(&reply_header, message);
1540         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1541         if (ret != 0) {
1542                 tevent_req_error(req, ret);
1543                 return;
1544         }
1545
1546         ret = ctdb_req_message_data_push(&reply_header, message,
1547                                          buf, &buflen);
1548         if (ret != 0) {
1549                 tevent_req_error(req, ret);
1550                 return;
1551         }
1552
1553         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1554
1555         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1556         if (tevent_req_nomem(subreq, req)) {
1557                 return;
1558         }
1559         tevent_req_set_callback(subreq, client_reply_done, req);
1560
1561         talloc_steal(subreq, buf);
1562 }
1563
1564 static void client_send_control(struct tevent_req *req,
1565                                 struct ctdb_req_header *header,
1566                                 struct ctdb_reply_control *reply)
1567 {
1568         struct client_state *state = tevent_req_data(
1569                 req, struct client_state);
1570         struct ctdbd_context *ctdb = state->ctdb;
1571         struct tevent_req *subreq;
1572         struct ctdb_req_header reply_header;
1573         uint8_t *buf;
1574         size_t datalen, buflen;
1575         int ret;
1576
1577         reply_header = header_reply_control(header, ctdb);
1578
1579         datalen = ctdb_reply_control_len(&reply_header, reply);
1580         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1581         if (ret != 0) {
1582                 tevent_req_error(req, ret);
1583                 return;
1584         }
1585
1586         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1587         if (ret != 0) {
1588                 tevent_req_error(req, ret);
1589                 return;
1590         }
1591
1592         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1593
1594         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1595         if (tevent_req_nomem(subreq, req)) {
1596                 return;
1597         }
1598         tevent_req_set_callback(subreq, client_reply_done, req);
1599
1600         talloc_steal(subreq, buf);
1601 }
1602
1603 static void client_reply_done(struct tevent_req *subreq)
1604 {
1605         struct tevent_req *req = tevent_req_callback_data(
1606                 subreq, struct tevent_req);
1607         int ret;
1608         bool status;
1609
1610         status = comm_write_recv(subreq, &ret);
1611         TALLOC_FREE(subreq);
1612         if (! status) {
1613                 tevent_req_error(req, ret);
1614         }
1615 }
1616
1617 /*
1618  * Handling protocol - controls
1619  */
1620
1621 static void control_process_exists(TALLOC_CTX *mem_ctx,
1622                                    struct tevent_req *req,
1623                                    struct ctdb_req_header *header,
1624                                    struct ctdb_req_control *request)
1625 {
1626         struct client_state *state = tevent_req_data(
1627                 req, struct client_state);
1628         struct ctdbd_context *ctdb = state->ctdb;
1629         struct client_state *cstate;
1630         struct ctdb_reply_control reply;
1631
1632         reply.rdata.opcode = request->opcode;
1633
1634         cstate = client_find(ctdb, request->rdata.data.pid);
1635         if (cstate == NULL) {
1636                 reply.status = -1;
1637                 reply.errmsg = "No client for PID";
1638         } else {
1639                 reply.status = kill(request->rdata.data.pid, 0);
1640                 reply.errmsg = NULL;
1641         }
1642
1643         client_send_control(req, header, &reply);
1644 }
1645
1646 static void control_ping(TALLOC_CTX *mem_ctx,
1647                          struct tevent_req *req,
1648                          struct ctdb_req_header *header,
1649                          struct ctdb_req_control *request)
1650 {
1651         struct client_state *state = tevent_req_data(
1652                 req, struct client_state);
1653         struct ctdbd_context *ctdb = state->ctdb;
1654         struct ctdb_reply_control reply;
1655
1656         reply.rdata.opcode = request->opcode;
1657         reply.status = ctdb->num_clients;
1658         reply.errmsg = NULL;
1659
1660         client_send_control(req, header, &reply);
1661 }
1662
1663 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1664                               struct tevent_req *req,
1665                               struct ctdb_req_header *header,
1666                               struct ctdb_req_control *request)
1667 {
1668         struct client_state *state = tevent_req_data(
1669                 req, struct client_state);
1670         struct ctdbd_context *ctdb = state->ctdb;
1671         struct ctdb_reply_control reply;
1672         struct database *db;
1673
1674         reply.rdata.opcode = request->opcode;
1675
1676         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1677         if (db == NULL) {
1678                 reply.status = ENOENT;
1679                 reply.errmsg = "Database not found";
1680         } else {
1681                 reply.rdata.data.db_path =
1682                         talloc_strdup(mem_ctx, db->path);
1683                 if (reply.rdata.data.db_path == NULL) {
1684                         reply.status = ENOMEM;
1685                         reply.errmsg = "Memory error";
1686                 } else {
1687                         reply.status = 0;
1688                         reply.errmsg = NULL;
1689                 }
1690         }
1691
1692         client_send_control(req, header, &reply);
1693 }
1694
1695 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1696                               struct tevent_req *req,
1697                               struct ctdb_req_header *header,
1698                               struct ctdb_req_control *request)
1699 {
1700         struct client_state *state = tevent_req_data(
1701                 req, struct client_state);
1702         struct ctdbd_context *ctdb = state->ctdb;
1703         struct ctdb_reply_control reply;
1704         struct ctdb_vnn_map *vnnmap;
1705
1706         reply.rdata.opcode = request->opcode;
1707
1708         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1709         if (vnnmap == NULL) {
1710                 reply.status = ENOMEM;
1711                 reply.errmsg = "Memory error";
1712         } else {
1713                 vnnmap->generation = ctdb->vnn_map->generation;
1714                 vnnmap->size = ctdb->vnn_map->size;
1715                 vnnmap->map = ctdb->vnn_map->map;
1716
1717                 reply.rdata.data.vnnmap = vnnmap;
1718                 reply.status = 0;
1719                 reply.errmsg = NULL;
1720         }
1721
1722         client_send_control(req, header, &reply);
1723 }
1724
1725 static void control_get_debug(TALLOC_CTX *mem_ctx,
1726                               struct tevent_req *req,
1727                               struct ctdb_req_header *header,
1728                               struct ctdb_req_control *request)
1729 {
1730         struct client_state *state = tevent_req_data(
1731                 req, struct client_state);
1732         struct ctdbd_context *ctdb = state->ctdb;
1733         struct ctdb_reply_control reply;
1734
1735         reply.rdata.opcode = request->opcode;
1736         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1737         reply.status = 0;
1738         reply.errmsg = NULL;
1739
1740         client_send_control(req, header, &reply);
1741 }
1742
1743 static void control_set_debug(TALLOC_CTX *mem_ctx,
1744                               struct tevent_req *req,
1745                               struct ctdb_req_header *header,
1746                               struct ctdb_req_control *request)
1747 {
1748         struct client_state *state = tevent_req_data(
1749                 req, struct client_state);
1750         struct ctdbd_context *ctdb = state->ctdb;
1751         struct ctdb_reply_control reply;
1752
1753         ctdb->log_level = (int)request->rdata.data.loglevel;
1754
1755         reply.rdata.opcode = request->opcode;
1756         reply.status = 0;
1757         reply.errmsg = NULL;
1758
1759         client_send_control(req, header, &reply);
1760 }
1761
1762 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1763                               struct tevent_req *req,
1764                                struct ctdb_req_header *header,
1765                               struct ctdb_req_control *request)
1766 {
1767         struct client_state *state = tevent_req_data(
1768                 req, struct client_state);
1769         struct ctdbd_context *ctdb = state->ctdb;
1770         struct ctdb_reply_control reply;
1771         struct ctdb_dbid_map *dbmap;
1772         struct database *db;
1773         int i;
1774
1775         reply.rdata.opcode = request->opcode;
1776
1777         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1778         if (dbmap == NULL) {
1779                 goto fail;
1780         }
1781
1782         dbmap->num = database_count(ctdb->db_map);
1783         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1784         if (dbmap->dbs == NULL) {
1785                 goto fail;
1786         }
1787
1788         db = ctdb->db_map->db;
1789         for (i = 0; i < dbmap->num; i++) {
1790                 dbmap->dbs[i] = (struct ctdb_dbid) {
1791                         .db_id = db->id,
1792                         .flags = db->flags,
1793                 };
1794
1795                 db = db->next;
1796         }
1797
1798         reply.rdata.data.dbmap = dbmap;
1799         reply.status = 0;
1800         reply.errmsg = NULL;
1801         client_send_control(req, header, &reply);
1802         return;
1803
1804 fail:
1805         reply.status = -1;
1806         reply.errmsg = "Memory error";
1807         client_send_control(req, header, &reply);
1808 }
1809
1810 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1811                                 struct tevent_req *req,
1812                                 struct ctdb_req_header *header,
1813                                 struct ctdb_req_control *request)
1814 {
1815         struct client_state *state = tevent_req_data(
1816                 req, struct client_state);
1817         struct ctdbd_context *ctdb = state->ctdb;
1818         struct ctdb_reply_control reply;
1819
1820         reply.rdata.opcode = request->opcode;
1821         reply.status = ctdb->vnn_map->recmode;
1822         reply.errmsg = NULL;
1823
1824         client_send_control(req, header, &reply);
1825 }
1826
1827 struct set_recmode_state {
1828         struct tevent_req *req;
1829         struct ctdbd_context *ctdb;
1830         struct ctdb_req_header header;
1831         struct ctdb_reply_control reply;
1832 };
1833
1834 static void set_recmode_callback(struct tevent_req *subreq)
1835 {
1836         struct set_recmode_state *substate = tevent_req_callback_data(
1837                 subreq, struct set_recmode_state);
1838         bool status;
1839         int ret;
1840
1841         status = recover_recv(subreq, &ret);
1842         TALLOC_FREE(subreq);
1843         if (! status) {
1844                 substate->reply.status = ret;
1845                 substate->reply.errmsg = "recovery failed";
1846         } else {
1847                 substate->reply.status = 0;
1848                 substate->reply.errmsg = NULL;
1849         }
1850
1851         client_send_control(substate->req, &substate->header, &substate->reply);
1852         talloc_free(substate);
1853 }
1854
1855 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1856                                 struct tevent_req *req,
1857                                 struct ctdb_req_header *header,
1858                                 struct ctdb_req_control *request)
1859 {
1860         struct client_state *state = tevent_req_data(
1861                 req, struct client_state);
1862         struct tevent_req *subreq;
1863         struct ctdbd_context *ctdb = state->ctdb;
1864         struct set_recmode_state *substate;
1865         struct ctdb_reply_control reply;
1866
1867         reply.rdata.opcode = request->opcode;
1868
1869         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1870                 reply.status = -1;
1871                 reply.errmsg = "Client cannot set recmode to NORMAL";
1872                 goto fail;
1873         }
1874
1875         substate = talloc_zero(ctdb, struct set_recmode_state);
1876         if (substate == NULL) {
1877                 reply.status = -1;
1878                 reply.errmsg = "Memory error";
1879                 goto fail;
1880         }
1881
1882         substate->req = req;
1883         substate->ctdb = ctdb;
1884         substate->header = *header;
1885         substate->reply.rdata.opcode = request->opcode;
1886
1887         subreq = recover_send(substate, state->ev, state->ctdb);
1888         if (subreq == NULL) {
1889                 talloc_free(substate);
1890                 goto fail;
1891         }
1892         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1893
1894         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1895         return;
1896
1897 fail:
1898         client_send_control(req, header, &reply);
1899
1900 }
1901
1902 static void control_db_attach(TALLOC_CTX *mem_ctx,
1903                               struct tevent_req *req,
1904                               struct ctdb_req_header *header,
1905                               struct ctdb_req_control *request)
1906 {
1907         struct client_state *state = tevent_req_data(
1908                 req, struct client_state);
1909         struct ctdbd_context *ctdb = state->ctdb;
1910         struct ctdb_reply_control reply;
1911         struct database *db;
1912
1913         reply.rdata.opcode = request->opcode;
1914
1915         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
1916                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
1917                         goto done;
1918                 }
1919         }
1920
1921         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
1922         if (db == NULL) {
1923                 reply.status = -1;
1924                 reply.errmsg = "Failed to attach database";
1925                 client_send_control(req, header, &reply);
1926                 return;
1927         }
1928
1929 done:
1930         reply.rdata.data.db_id = db->id;
1931         reply.status = 0;
1932         reply.errmsg = NULL;
1933         client_send_control(req, header, &reply);
1934 }
1935
1936 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
1937 {
1938         printf("Received a message for SRVID 0x%"PRIx64"\n", srvid);
1939 }
1940
1941 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1942                                    struct tevent_req *req,
1943                                    struct ctdb_req_header *header,
1944                                    struct ctdb_req_control *request)
1945 {
1946         struct client_state *state = tevent_req_data(
1947                 req, struct client_state);
1948         struct ctdbd_context *ctdb = state->ctdb;
1949         struct ctdb_reply_control reply;
1950         int ret;
1951
1952         reply.rdata.opcode = request->opcode;
1953
1954         ret = srvid_register(ctdb->srv, state, request->srvid,
1955                              srvid_handler, state);
1956         if (ret != 0) {
1957                 reply.status = -1;
1958                 reply.errmsg = "Memory error";
1959                 goto fail;
1960         }
1961
1962         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
1963
1964         reply.status = 0;
1965         reply.errmsg = NULL;
1966
1967 fail:
1968         client_send_control(req, header, &reply);
1969 }
1970
1971 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1972                                      struct tevent_req *req,
1973                                      struct ctdb_req_header *header,
1974                                      struct ctdb_req_control *request)
1975 {
1976         struct client_state *state = tevent_req_data(
1977                 req, struct client_state);
1978         struct ctdbd_context *ctdb = state->ctdb;
1979         struct ctdb_reply_control reply;
1980         int ret;
1981
1982         reply.rdata.opcode = request->opcode;
1983
1984         ret = srvid_deregister(ctdb->srv, request->srvid, state);
1985         if (ret != 0) {
1986                 reply.status = -1;
1987                 reply.errmsg = "srvid not registered";
1988                 goto fail;
1989         }
1990
1991         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
1992
1993         reply.status = 0;
1994         reply.errmsg = NULL;
1995
1996 fail:
1997         client_send_control(req, header, &reply);
1998 }
1999
2000 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2001                                struct tevent_req *req,
2002                                struct ctdb_req_header *header,
2003                                struct ctdb_req_control *request)
2004 {
2005         struct client_state *state = tevent_req_data(
2006                 req, struct client_state);
2007         struct ctdbd_context *ctdb = state->ctdb;
2008         struct ctdb_reply_control reply;
2009         struct database *db;
2010
2011         reply.rdata.opcode = request->opcode;
2012
2013         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2014         if (db == NULL) {
2015                 reply.status = ENOENT;
2016                 reply.errmsg = "Database not found";
2017         } else {
2018                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2019                 if (reply.rdata.data.db_name == NULL) {
2020                         reply.status = ENOMEM;
2021                         reply.errmsg = "Memory error";
2022                 } else {
2023                         reply.status = 0;
2024                         reply.errmsg = NULL;
2025                 }
2026         }
2027
2028         client_send_control(req, header, &reply);
2029 }
2030
2031 static void control_get_pid(TALLOC_CTX *mem_ctx,
2032                             struct tevent_req *req,
2033                             struct ctdb_req_header *header,
2034                             struct ctdb_req_control *request)
2035 {
2036         struct ctdb_reply_control reply;
2037
2038         reply.rdata.opcode = request->opcode;
2039         reply.status = getpid();
2040         reply.errmsg = NULL;
2041
2042         client_send_control(req, header, &reply);
2043 }
2044
2045 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
2046                                   struct tevent_req *req,
2047                                   struct ctdb_req_header *header,
2048                                   struct ctdb_req_control *request)
2049 {
2050         struct client_state *state = tevent_req_data(
2051                 req, struct client_state);
2052         struct ctdbd_context *ctdb = state->ctdb;
2053         struct ctdb_reply_control reply;
2054
2055         reply.rdata.opcode = request->opcode;
2056         reply.status = ctdb->node_map->recmaster;
2057         reply.errmsg = NULL;
2058
2059         client_send_control(req, header, &reply);
2060 }
2061
2062 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2063                             struct tevent_req *req,
2064                             struct ctdb_req_header *header,
2065                             struct ctdb_req_control *request)
2066 {
2067         struct ctdb_reply_control reply;
2068
2069         reply.rdata.opcode = request->opcode;
2070         reply.status = header->destnode;
2071         reply.errmsg = NULL;
2072
2073         client_send_control(req, header, &reply);
2074 }
2075
2076 static void control_shutdown(TALLOC_CTX *mem_ctx,
2077                              struct tevent_req *req,
2078                              struct ctdb_req_header *hdr,
2079                              struct ctdb_req_control *request)
2080 {
2081         struct client_state *state = tevent_req_data(
2082                 req, struct client_state);
2083
2084         state->status = 99;
2085 }
2086
2087 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2088                                 struct tevent_req *req,
2089                                 struct ctdb_req_header *header,
2090                                 struct ctdb_req_control *request)
2091 {
2092         struct client_state *state = tevent_req_data(
2093                 req, struct client_state);
2094         struct ctdbd_context *ctdb = state->ctdb;
2095         struct ctdb_reply_control reply;
2096         bool ret, obsolete;
2097
2098         reply.rdata.opcode = request->opcode;
2099         reply.errmsg = NULL;
2100
2101         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2102                                      request->rdata.data.tunable->name,
2103                                      request->rdata.data.tunable->value,
2104                                      &obsolete);
2105         if (! ret) {
2106                 reply.status = -1;
2107         } else if (obsolete) {
2108                 reply.status = 1;
2109         } else {
2110                 reply.status = 0;
2111         }
2112
2113         client_send_control(req, header, &reply);
2114 }
2115
2116 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2117                                 struct tevent_req *req,
2118                                 struct ctdb_req_header *header,
2119                                 struct ctdb_req_control *request)
2120 {
2121         struct client_state *state = tevent_req_data(
2122                 req, struct client_state);
2123         struct ctdbd_context *ctdb = state->ctdb;
2124         struct ctdb_reply_control reply;
2125         uint32_t value;
2126         bool ret;
2127
2128         reply.rdata.opcode = request->opcode;
2129         reply.errmsg = NULL;
2130
2131         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2132                                      request->rdata.data.tun_var, &value);
2133         if (! ret) {
2134                 reply.status = -1;
2135         } else {
2136                 reply.rdata.data.tun_value = value;
2137                 reply.status = 0;
2138         }
2139
2140         client_send_control(req, header, &reply);
2141 }
2142
2143 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2144                                   struct tevent_req *req,
2145                                   struct ctdb_req_header *header,
2146                                   struct ctdb_req_control *request)
2147 {
2148         struct ctdb_reply_control reply;
2149         struct ctdb_var_list *var_list;
2150
2151         reply.rdata.opcode = request->opcode;
2152         reply.errmsg = NULL;
2153
2154         var_list = ctdb_tunable_names(mem_ctx);
2155         if (var_list == NULL) {
2156                 reply.status = -1;
2157         } else {
2158                 reply.rdata.data.tun_var_list = var_list;
2159                 reply.status = 0;
2160         }
2161
2162         client_send_control(req, header, &reply);
2163 }
2164
2165 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2166                                  struct tevent_req *req,
2167                                  struct ctdb_req_header *header,
2168                                  struct ctdb_req_control *request)
2169 {
2170         struct client_state *state = tevent_req_data(
2171                 req, struct client_state);
2172         struct ctdbd_context *ctdb = state->ctdb;
2173         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2174         struct ctdb_reply_control reply;
2175         struct node *node;
2176
2177         reply.rdata.opcode = request->opcode;
2178
2179         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2180             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2181                 DEBUG(DEBUG_INFO,
2182                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2183                 reply.status = EINVAL;
2184                 reply.errmsg = "Failed to MODIFY_FLAGS";
2185                 client_send_control(req, header, &reply);
2186                 return;
2187         }
2188
2189         /* There's all sorts of broadcast weirdness here.  Only change
2190          * the specified node, not the destination node of the
2191          * control. */
2192         node = &ctdb->node_map->node[change->pnn];
2193
2194         if ((node->flags &
2195              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2196             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2197                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2198                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2199                 goto done;
2200         }
2201
2202         if ((node->flags &
2203              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2204             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2205                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2206                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2207                 goto done;
2208         }
2209
2210         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2211
2212 done:
2213         reply.status = 0;
2214         reply.errmsg = NULL;
2215         client_send_control(req, header, &reply);
2216 }
2217
2218 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2219                                      struct tevent_req *req,
2220                                      struct ctdb_req_header *header,
2221                                      struct ctdb_req_control *request)
2222 {
2223         struct client_state *state = tevent_req_data(
2224                 req, struct client_state);
2225         struct ctdbd_context *ctdb = state->ctdb;
2226         struct ctdb_reply_control reply;
2227
2228         reply.rdata.opcode = request->opcode;
2229         reply.rdata.data.tun_list = &ctdb->tun_list;
2230         reply.status = 0;
2231         reply.errmsg = NULL;
2232
2233         client_send_control(req, header, &reply);
2234 }
2235
2236 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2237                                          struct tevent_req *req,
2238                                          struct ctdb_req_header *header,
2239                                          struct ctdb_req_control *request)
2240 {
2241         struct client_state *state = tevent_req_data(
2242                 req, struct client_state);
2243         struct ctdbd_context *ctdb = state->ctdb;
2244         struct ctdb_reply_control reply;
2245         struct database *db;
2246
2247         reply.rdata.opcode = request->opcode;
2248
2249         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2250                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2251                         goto done;
2252                 }
2253         }
2254
2255         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2256                           CTDB_DB_FLAGS_PERSISTENT);
2257         if (db == NULL) {
2258                 reply.status = -1;
2259                 reply.errmsg = "Failed to attach database";
2260                 client_send_control(req, header, &reply);
2261                 return;
2262         }
2263
2264 done:
2265         reply.rdata.data.db_id = db->id;
2266         reply.status = 0;
2267         reply.errmsg = NULL;
2268         client_send_control(req, header, &reply);
2269 }
2270
2271 static void control_uptime(TALLOC_CTX *mem_ctx,
2272                            struct tevent_req *req,
2273                            struct ctdb_req_header *header,
2274                            struct ctdb_req_control *request)
2275 {
2276         struct client_state *state = tevent_req_data(
2277                 req, struct client_state);
2278         struct ctdbd_context *ctdb = state->ctdb;
2279         struct ctdb_reply_control reply;
2280         struct ctdb_uptime *uptime;;
2281
2282         reply.rdata.opcode = request->opcode;
2283
2284         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2285         if (uptime == NULL) {
2286                 goto fail;
2287         }
2288
2289         uptime->current_time = tevent_timeval_current();
2290         uptime->ctdbd_start_time = ctdb->start_time;
2291         uptime->last_recovery_started = ctdb->recovery_start_time;
2292         uptime->last_recovery_finished = ctdb->recovery_end_time;
2293
2294         reply.rdata.data.uptime = uptime;
2295         reply.status = 0;
2296         reply.errmsg = NULL;
2297         client_send_control(req, header, &reply);
2298         return;
2299
2300 fail:
2301         reply.status = -1;
2302         reply.errmsg = "Memory error";
2303         client_send_control(req, header, &reply);
2304 }
2305
2306 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2307                                       struct tevent_req *req,
2308                                       struct ctdb_req_header *header,
2309                                       struct ctdb_req_control *request)
2310 {
2311         struct client_state *state = tevent_req_data(
2312                 req, struct client_state);
2313         struct ctdbd_context *ctdb = state->ctdb;
2314         struct ctdb_reply_control reply;
2315         struct ctdb_node_map *nodemap;
2316         struct node_map *node_map = ctdb->node_map;
2317         int i;
2318
2319         reply.rdata.opcode = request->opcode;
2320
2321         nodemap = read_nodes_file(mem_ctx, header->destnode);
2322         if (nodemap == NULL) {
2323                 goto fail;
2324         }
2325
2326         for (i=0; i<nodemap->num; i++) {
2327                 struct node *node;
2328
2329                 if (i < node_map->num_nodes &&
2330                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2331                                         &node_map->node[i].addr)) {
2332                         continue;
2333                 }
2334
2335                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2336                         int ret;
2337
2338                         node = &node_map->node[i];
2339
2340                         node->flags |= NODE_FLAGS_DELETED;
2341                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2342                                                          false);
2343                         if (ret != 0) {
2344                                 /* Can't happen, but Coverity... */
2345                                 goto fail;
2346                         }
2347
2348                         continue;
2349                 }
2350
2351                 if (i < node_map->num_nodes &&
2352                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2353                         node = &node_map->node[i];
2354
2355                         node->flags &= ~NODE_FLAGS_DELETED;
2356                         node->addr = nodemap->node[i].addr;
2357
2358                         continue;
2359                 }
2360
2361                 node_map->node = talloc_realloc(node_map, node_map->node,
2362                                                 struct node,
2363                                                 node_map->num_nodes+1);
2364                 if (node_map->node == NULL) {
2365                         goto fail;
2366                 }
2367                 node = &node_map->node[node_map->num_nodes];
2368
2369                 node->addr = nodemap->node[i].addr;
2370                 node->pnn = nodemap->node[i].pnn;
2371                 node->flags = 0;
2372                 node->capabilities = CTDB_CAP_DEFAULT;
2373                 node->recovery_disabled = false;
2374                 node->recovery_substate = NULL;
2375
2376                 node_map->num_nodes += 1;
2377         }
2378
2379         talloc_free(nodemap);
2380
2381         reply.status = 0;
2382         reply.errmsg = NULL;
2383         client_send_control(req, header, &reply);
2384         return;
2385
2386 fail:
2387         reply.status = -1;
2388         reply.errmsg = "Memory error";
2389         client_send_control(req, header, &reply);
2390 }
2391
2392 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2393                                      struct tevent_req *req,
2394                                      struct ctdb_req_header *header,
2395                                      struct ctdb_req_control *request)
2396 {
2397         struct client_state *state = tevent_req_data(
2398                 req, struct client_state);
2399         struct ctdbd_context *ctdb = state->ctdb;
2400         struct ctdb_reply_control reply;
2401         struct node *node;
2402         uint32_t caps = 0;
2403
2404         reply.rdata.opcode = request->opcode;
2405
2406         node = &ctdb->node_map->node[header->destnode];
2407         caps = node->capabilities;
2408
2409         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2410                 /* Don't send reply */
2411                 return;
2412         }
2413
2414         reply.rdata.data.caps = caps;
2415         reply.status = 0;
2416         reply.errmsg = NULL;
2417
2418         client_send_control(req, header, &reply);
2419 }
2420
2421 static void control_release_ip(TALLOC_CTX *mem_ctx,
2422                                struct tevent_req *req,
2423                                struct ctdb_req_header *header,
2424                                struct ctdb_req_control *request)
2425 {
2426         struct client_state *state = tevent_req_data(
2427                 req, struct client_state);
2428         struct ctdbd_context *ctdb = state->ctdb;
2429         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2430         struct ctdb_reply_control reply;
2431         struct ctdb_public_ip_list *ips = NULL;
2432         struct ctdb_public_ip *t = NULL;
2433         int i;
2434
2435         reply.rdata.opcode = request->opcode;
2436
2437         if (ctdb->known_ips == NULL) {
2438                 D_INFO("RELEASE_IP %s - not a public IP\n",
2439                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2440                 goto done;
2441         }
2442
2443         ips = &ctdb->known_ips[header->destnode];
2444
2445         t = NULL;
2446         for (i = 0; i < ips->num; i++) {
2447                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2448                         t = &ips->ip[i];
2449                         break;
2450                 }
2451         }
2452         if (t == NULL) {
2453                 D_INFO("RELEASE_IP %s - not a public IP\n",
2454                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2455                 goto done;
2456         }
2457
2458         if (t->pnn != header->destnode) {
2459                 if (header->destnode == ip->pnn) {
2460                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2461                               ctdb_sock_addr_to_string(mem_ctx,
2462                                                        &ip->addr, false),
2463                               ip->pnn);
2464                         reply.status = -1;
2465                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2466                         client_send_control(req, header, &reply);
2467                         return;
2468                 }
2469
2470                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2471                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2472                        ip->pnn);
2473                 t->pnn = ip->pnn;
2474         } else {
2475                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2476                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2477                           ip->pnn);
2478                 t->pnn = ip->pnn;
2479         }
2480
2481 done:
2482         reply.status = 0;
2483         reply.errmsg = NULL;
2484         client_send_control(req, header, &reply);
2485 }
2486
2487 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2488                                 struct tevent_req *req,
2489                                 struct ctdb_req_header *header,
2490                                 struct ctdb_req_control *request)
2491 {
2492         struct client_state *state = tevent_req_data(
2493                 req, struct client_state);
2494         struct ctdbd_context *ctdb = state->ctdb;
2495         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2496         struct ctdb_reply_control reply;
2497         struct ctdb_public_ip_list *ips = NULL;
2498         struct ctdb_public_ip *t = NULL;
2499         int i;
2500
2501         reply.rdata.opcode = request->opcode;
2502
2503         if (ctdb->known_ips == NULL) {
2504                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2505                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2506                 goto done;
2507         }
2508
2509         ips = &ctdb->known_ips[header->destnode];
2510
2511         t = NULL;
2512         for (i = 0; i < ips->num; i++) {
2513                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2514                         t = &ips->ip[i];
2515                         break;
2516                 }
2517         }
2518         if (t == NULL) {
2519                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2520                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2521                 goto done;
2522         }
2523
2524         if (t->pnn == header->destnode) {
2525                 D_INFO("TAKEOVER_IP %s - redundant\n",
2526                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2527         } else {
2528                 D_NOTICE("TAKEOVER_IP %s\n",
2529                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2530                 t->pnn = ip->pnn;
2531         }
2532
2533 done:
2534         reply.status = 0;
2535         reply.errmsg = NULL;
2536         client_send_control(req, header, &reply);
2537 }
2538
2539 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2540                                    struct tevent_req *req,
2541                                    struct ctdb_req_header *header,
2542                                    struct ctdb_req_control *request)
2543 {
2544         struct client_state *state = tevent_req_data(
2545                 req, struct client_state);
2546         struct ctdbd_context *ctdb = state->ctdb;
2547         struct ctdb_reply_control reply;
2548         struct ctdb_public_ip_list *ips = NULL;
2549
2550         reply.rdata.opcode = request->opcode;
2551
2552         if (ctdb->known_ips == NULL) {
2553                 /* No IPs defined so create a dummy empty struct and ship it */
2554                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2555                 if (ips == NULL) {
2556                         reply.status = ENOMEM;
2557                         reply.errmsg = "Memory error";
2558                         goto done;
2559                 }
2560                 goto ok;
2561         }
2562
2563         ips = &ctdb->known_ips[header->destnode];
2564
2565         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2566                 /* If runstate is not RUNNING or a node is then return
2567                  * no available IPs.  Don't worry about interface
2568                  * states here - we're not faking down to that level.
2569                  */
2570                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING) {
2571                         /* No available IPs: return dummy empty struct */
2572                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2573                         if (ips == NULL) {
2574                                 reply.status = ENOMEM;
2575                                 reply.errmsg = "Memory error";
2576                                 goto done;
2577                         }
2578                 }
2579         }
2580
2581 ok:
2582         reply.rdata.data.pubip_list = ips;
2583         reply.status = 0;
2584         reply.errmsg = NULL;
2585
2586 done:
2587         client_send_control(req, header, &reply);
2588 }
2589
2590 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2591                                 struct tevent_req *req,
2592                                 struct ctdb_req_header *header,
2593                                 struct ctdb_req_control *request)
2594 {
2595         struct client_state *state = tevent_req_data(
2596                 req, struct client_state);
2597         struct ctdbd_context *ctdb = state->ctdb;
2598         struct ctdb_reply_control reply;
2599         struct ctdb_node_map *nodemap;
2600         struct node *node;
2601         int i;
2602
2603         reply.rdata.opcode = request->opcode;
2604
2605         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2606         if (nodemap == NULL) {
2607                 goto fail;
2608         }
2609
2610         nodemap->num = ctdb->node_map->num_nodes;
2611         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2612                                      nodemap->num);
2613         if (nodemap->node == NULL) {
2614                 goto fail;
2615         }
2616
2617         for (i=0; i<nodemap->num; i++) {
2618                 node = &ctdb->node_map->node[i];
2619                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2620                         .pnn = node->pnn,
2621                         .flags = node->flags,
2622                         .addr = node->addr,
2623                 };
2624         }
2625
2626         reply.rdata.data.nodemap = nodemap;
2627         reply.status = 0;
2628         reply.errmsg = NULL;
2629         client_send_control(req, header, &reply);
2630         return;
2631
2632 fail:
2633         reply.status = -1;
2634         reply.errmsg = "Memory error";
2635         client_send_control(req, header, &reply);
2636 }
2637
2638 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2639                                      struct tevent_req *req,
2640                                      struct ctdb_req_header *header,
2641                                      struct ctdb_req_control *request)
2642 {
2643         struct client_state *state = tevent_req_data(
2644                 req, struct client_state);
2645         struct ctdbd_context *ctdb = state->ctdb;
2646         struct ctdb_reply_control reply;
2647
2648         reply.rdata.opcode = request->opcode;
2649
2650         if (ctdb->reclock != NULL) {
2651                 reply.rdata.data.reclock_file =
2652                         talloc_strdup(mem_ctx, ctdb->reclock);
2653                 if (reply.rdata.data.reclock_file == NULL) {
2654                         reply.status = ENOMEM;
2655                         reply.errmsg = "Memory error";
2656                         goto done;
2657                 }
2658         } else {
2659                 reply.rdata.data.reclock_file = NULL;
2660         }
2661
2662         reply.status = 0;
2663         reply.errmsg = NULL;
2664
2665 done:
2666         client_send_control(req, header, &reply);
2667 }
2668
2669 static void control_stop_node(TALLOC_CTX *mem_ctx,
2670                               struct tevent_req *req,
2671                               struct ctdb_req_header *header,
2672                               struct ctdb_req_control *request)
2673 {
2674         struct client_state *state = tevent_req_data(
2675                 req, struct client_state);
2676         struct ctdbd_context *ctdb = state->ctdb;
2677         struct ctdb_reply_control reply;
2678
2679         reply.rdata.opcode = request->opcode;
2680
2681         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2682         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2683
2684         reply.status = 0;
2685         reply.errmsg = NULL;
2686
2687         client_send_control(req, header, &reply);
2688         return;
2689 }
2690
2691 static void control_continue_node(TALLOC_CTX *mem_ctx,
2692                                   struct tevent_req *req,
2693                                   struct ctdb_req_header *header,
2694                                   struct ctdb_req_control *request)
2695 {
2696         struct client_state *state = tevent_req_data(
2697                 req, struct client_state);
2698         struct ctdbd_context *ctdb = state->ctdb;
2699         struct ctdb_reply_control reply;
2700
2701         reply.rdata.opcode = request->opcode;
2702
2703         DEBUG(DEBUG_INFO, ("Continue node\n"));
2704         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2705
2706         reply.status = 0;
2707         reply.errmsg = NULL;
2708
2709         client_send_control(req, header, &reply);
2710         return;
2711 }
2712
2713 static void set_ban_state_callback(struct tevent_req *subreq)
2714 {
2715         struct node *node = tevent_req_callback_data(
2716                 subreq, struct node);
2717         bool status;
2718
2719         status = tevent_wakeup_recv(subreq);
2720         TALLOC_FREE(subreq);
2721         if (! status) {
2722                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2723         }
2724
2725         node->flags &= ~NODE_FLAGS_BANNED;
2726 }
2727
2728 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2729                                   struct tevent_req *req,
2730                                   struct ctdb_req_header *header,
2731                                   struct ctdb_req_control *request)
2732 {
2733         struct client_state *state = tevent_req_data(
2734                 req, struct client_state);
2735         struct tevent_req *subreq;
2736         struct ctdbd_context *ctdb = state->ctdb;
2737         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2738         struct ctdb_reply_control reply;
2739         struct node *node;
2740
2741         reply.rdata.opcode = request->opcode;
2742
2743         if (ban->pnn != header->destnode) {
2744                 DEBUG(DEBUG_INFO,
2745                       ("SET_BAN_STATE control for PNN %d rejected\n",
2746                        ban->pnn));
2747                 reply.status = EINVAL;
2748                 goto fail;
2749         }
2750
2751         node = &ctdb->node_map->node[header->destnode];
2752
2753         if (ban->time == 0) {
2754                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2755                 node->flags &= ~NODE_FLAGS_BANNED;
2756                 goto done;
2757         }
2758
2759         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2760                                     tevent_timeval_current_ofs(
2761                                             ban->time, 0));
2762         if (subreq == NULL) {
2763                 reply.status = ENOMEM;
2764                 goto fail;
2765         }
2766         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2767
2768         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2769         node->flags |= NODE_FLAGS_BANNED;
2770         ctdb->vnn_map->generation = INVALID_GENERATION;
2771
2772 done:
2773         reply.status = 0;
2774         reply.errmsg = NULL;
2775
2776         client_send_control(req, header, &reply);
2777         return;
2778
2779 fail:
2780         reply.errmsg = "Failed to ban node";
2781 }
2782
2783 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2784                                struct tevent_req *req,
2785                                struct ctdb_req_header *header,
2786                                struct ctdb_req_control *request)
2787 {
2788         struct client_state *state = tevent_req_data(
2789                 req, struct client_state);
2790         struct ctdbd_context *ctdb = state->ctdb;
2791         struct ctdb_reply_control reply;
2792         struct database *db;
2793         int ret;
2794
2795         reply.rdata.opcode = request->opcode;
2796
2797         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2798         if (db == NULL) {
2799                 reply.status = ENOENT;
2800                 reply.errmsg = "Database not found";
2801         } else {
2802                 uint64_t seqnum;
2803
2804                 ret = database_seqnum(db, &seqnum);
2805                 if (ret == 0) {
2806                         reply.rdata.data.seqnum = seqnum;
2807                         reply.status = 0;
2808                         reply.errmsg = NULL;
2809                 } else {
2810                         reply.status = ret;
2811                         reply.errmsg = "Failed to get seqnum";
2812                 }
2813         }
2814
2815         client_send_control(req, header, &reply);
2816 }
2817
2818 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2819                                   struct tevent_req *req,
2820                                   struct ctdb_req_header *header,
2821                                   struct ctdb_req_control *request)
2822 {
2823         struct client_state *state = tevent_req_data(
2824                 req, struct client_state);
2825         struct ctdbd_context *ctdb = state->ctdb;
2826         struct ctdb_reply_control reply;
2827         struct database *db;
2828
2829         reply.rdata.opcode = request->opcode;
2830
2831         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2832         if (db == NULL) {
2833                 reply.status = ENOENT;
2834                 reply.errmsg = "Database not found";
2835         } else {
2836                 reply.rdata.data.reason = NULL;
2837                 reply.status = 0;
2838                 reply.errmsg = NULL;
2839         }
2840
2841         client_send_control(req, header, &reply);
2842 }
2843
2844 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
2845                                                    struct ctdbd_context *ctdb)
2846 {
2847         struct ctdb_iface_list *iface_list;
2848         struct interface *iface;
2849         int i;
2850
2851         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2852         if (iface_list == NULL) {
2853                 goto done;
2854         }
2855
2856         iface_list->num = ctdb->iface_map->num;
2857         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2858                                          iface_list->num);
2859         if (iface_list->iface == NULL) {
2860                 TALLOC_FREE(iface_list);
2861                 goto done;
2862         }
2863
2864         for (i=0; i<iface_list->num; i++) {
2865                 iface = &ctdb->iface_map->iface[i];
2866                 iface_list->iface[i] = (struct ctdb_iface) {
2867                         .link_state = iface->link_up,
2868                         .references = iface->references,
2869                 };
2870                 strlcpy(iface_list->iface[i].name, iface->name,
2871                         sizeof(iface_list->iface[i].name));
2872         }
2873
2874 done:
2875         return iface_list;
2876 }
2877
2878 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
2879                                        struct tevent_req *req,
2880                                        struct ctdb_req_header *header,
2881                                        struct ctdb_req_control *request)
2882 {
2883         struct client_state *state = tevent_req_data(
2884                 req, struct client_state);
2885         struct ctdbd_context *ctdb = state->ctdb;
2886         struct ctdb_reply_control reply;
2887         ctdb_sock_addr *addr = request->rdata.data.addr;
2888         struct ctdb_public_ip_list *known = NULL;
2889         struct ctdb_public_ip_info *info = NULL;
2890         unsigned i;
2891
2892         reply.rdata.opcode = request->opcode;
2893
2894         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
2895         if (info == NULL) {
2896                 reply.status = ENOMEM;
2897                 reply.errmsg = "Memory error";
2898                 goto done;
2899         }
2900
2901         reply.rdata.data.ipinfo = info;
2902
2903         if (ctdb->known_ips != NULL) {
2904                 known = &ctdb->known_ips[header->destnode];
2905         } else {
2906                 /* No IPs defined so create a dummy empty struct and
2907                  * fall through.  The given IP won't be matched
2908                  * below...
2909                  */
2910                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2911                 if (known == NULL) {
2912                         reply.status = ENOMEM;
2913                         reply.errmsg = "Memory error";
2914                         goto done;
2915                 }
2916         }
2917
2918         for (i = 0; i < known->num; i++) {
2919                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
2920                                            addr)) {
2921                         break;
2922                 }
2923         }
2924
2925         if (i == known->num) {
2926                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
2927                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
2928                 reply.status = -1;
2929                 reply.errmsg = "Unknown address";
2930                 goto done;
2931         }
2932
2933         info->ip = known->ip[i];
2934
2935         /* The fake PUBLICIPS stanza and resulting known_ips data
2936          * don't know anything about interfaces, so completely fake
2937          * this.
2938          */
2939         info->active_idx = 0;
2940
2941         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
2942         if (info->ifaces == NULL) {
2943                 reply.status = ENOMEM;
2944                 reply.errmsg = "Memory error";
2945                 goto done;
2946         }
2947
2948         reply.status = 0;
2949         reply.errmsg = NULL;
2950
2951 done:
2952         client_send_control(req, header, &reply);
2953 }
2954
2955 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2956                                struct tevent_req *req,
2957                                struct ctdb_req_header *header,
2958                                struct ctdb_req_control *request)
2959 {
2960         struct client_state *state = tevent_req_data(
2961                 req, struct client_state);
2962         struct ctdbd_context *ctdb = state->ctdb;
2963         struct ctdb_reply_control reply;
2964         struct ctdb_iface_list *iface_list;
2965
2966         reply.rdata.opcode = request->opcode;
2967
2968         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
2969         if (iface_list == NULL) {
2970                 goto fail;
2971         }
2972
2973         reply.rdata.data.iface_list = iface_list;
2974         reply.status = 0;
2975         reply.errmsg = NULL;
2976         client_send_control(req, header, &reply);
2977         return;
2978
2979 fail:
2980         reply.status = -1;
2981         reply.errmsg = "Memory error";
2982         client_send_control(req, header, &reply);
2983 }
2984
2985 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2986                                          struct tevent_req *req,
2987                                          struct ctdb_req_header *header,
2988                                          struct ctdb_req_control *request)
2989 {
2990         struct client_state *state = tevent_req_data(
2991                 req, struct client_state);
2992         struct ctdbd_context *ctdb = state->ctdb;
2993         struct ctdb_reply_control reply;
2994         struct ctdb_iface *in_iface;
2995         struct interface *iface = NULL;
2996         bool link_up = false;
2997         int i;
2998
2999         reply.rdata.opcode = request->opcode;
3000
3001         in_iface = request->rdata.data.iface;
3002
3003         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3004                 reply.errmsg = "interface name not terminated";
3005                 goto fail;
3006         }
3007
3008         switch (in_iface->link_state) {
3009                 case 0:
3010                         link_up = false;
3011                         break;
3012
3013                 case 1:
3014                         link_up = true;
3015                         break;
3016
3017                 default:
3018                         reply.errmsg = "invalid link state";
3019                         goto fail;
3020         }
3021
3022         if (in_iface->references != 0) {
3023                 reply.errmsg = "references should be 0";
3024                 goto fail;
3025         }
3026
3027         for (i=0; i<ctdb->iface_map->num; i++) {
3028                 if (strcmp(ctdb->iface_map->iface[i].name,
3029                            in_iface->name) == 0) {
3030                         iface = &ctdb->iface_map->iface[i];
3031                         break;
3032                 }
3033         }
3034
3035         if (iface == NULL) {
3036                 reply.errmsg = "interface not found";
3037                 goto fail;
3038         }
3039
3040         iface->link_up = link_up;
3041
3042         reply.status = 0;
3043         reply.errmsg = NULL;
3044         client_send_control(req, header, &reply);
3045         return;
3046
3047 fail:
3048         reply.status = -1;
3049         client_send_control(req, header, &reply);
3050 }
3051
3052 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3053                                     struct tevent_req *req,
3054                                     struct ctdb_req_header *header,
3055                                     struct ctdb_req_control *request)
3056 {
3057         struct client_state *state = tevent_req_data(
3058                 req, struct client_state);
3059         struct ctdbd_context *ctdb = state->ctdb;
3060         struct ctdb_reply_control reply;
3061         struct database *db;
3062
3063         reply.rdata.opcode = request->opcode;
3064
3065         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3066         if (db == NULL) {
3067                 reply.status = ENOENT;
3068                 reply.errmsg = "Database not found";
3069                 goto done;
3070         }
3071
3072         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3073                 reply.status = EINVAL;
3074                 reply.errmsg = "Can not set READONLY on persistent db";
3075                 goto done;
3076         }
3077
3078         db->flags |= CTDB_DB_FLAGS_READONLY;
3079         reply.status = 0;
3080         reply.errmsg = NULL;
3081
3082 done:
3083         client_send_control(req, header, &reply);
3084 }
3085
3086 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3087                                     struct tevent_req *req,
3088                                     struct ctdb_req_header *header,
3089                                     struct ctdb_req_control *request)
3090 {
3091         struct client_state *state = tevent_req_data(
3092                 req, struct client_state);
3093         struct ctdbd_context *ctdb = state->ctdb;
3094         struct ctdb_reply_control reply;
3095         struct database *db;
3096
3097         reply.rdata.opcode = request->opcode;
3098
3099         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3100         if (db == NULL) {
3101                 reply.status = ENOENT;
3102                 reply.errmsg = "Database not found";
3103                 goto done;
3104         }
3105
3106         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3107                 reply.status = EINVAL;
3108                 reply.errmsg = "Can not set STICKY on persistent db";
3109                 goto done;
3110         }
3111
3112         db->flags |= CTDB_DB_FLAGS_STICKY;
3113         reply.status = 0;
3114         reply.errmsg = NULL;
3115
3116 done:
3117         client_send_control(req, header, &reply);
3118 }
3119
3120 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3121                                   struct tevent_req *req,
3122                                   struct ctdb_req_header *header,
3123                                   struct ctdb_req_control *request)
3124 {
3125         struct ctdb_reply_control reply;
3126
3127         /* Always succeed */
3128         reply.rdata.opcode = request->opcode;
3129         reply.status = 0;
3130         reply.errmsg = NULL;
3131
3132         client_send_control(req, header, &reply);
3133 }
3134
3135 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3136                                  struct tevent_req *req,
3137                                  struct ctdb_req_header *header,
3138                                  struct ctdb_req_control *request)
3139 {
3140         struct client_state *state = tevent_req_data(
3141                 req, struct client_state);
3142         struct ctdbd_context *ctdb = state->ctdb;
3143         struct ctdb_reply_control reply;
3144
3145         reply.rdata.opcode = request->opcode;
3146         reply.rdata.data.runstate = ctdb->runstate;
3147         reply.status = 0;
3148         reply.errmsg = NULL;
3149
3150         client_send_control(req, header, &reply);
3151 }
3152
3153 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3154                                    struct tevent_req *req,
3155                                    struct ctdb_req_header *header,
3156                                    struct ctdb_req_control *request)
3157 {
3158         struct ctdb_reply_control reply;
3159         struct ctdb_node_map *nodemap;
3160
3161         reply.rdata.opcode = request->opcode;
3162
3163         nodemap = read_nodes_file(mem_ctx, header->destnode);
3164         if (nodemap == NULL) {
3165                 goto fail;
3166         }
3167
3168         reply.rdata.data.nodemap = nodemap;
3169         reply.status = 0;
3170         reply.errmsg = NULL;
3171         client_send_control(req, header, &reply);
3172         return;
3173
3174 fail:
3175         reply.status = -1;
3176         reply.errmsg = "Failed to read nodes file";
3177         client_send_control(req, header, &reply);
3178 }
3179
3180 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3181                                   struct tevent_req *req,
3182                                   struct ctdb_req_header *header,
3183                                   struct ctdb_req_control *request)
3184 {
3185         struct client_state *state = tevent_req_data(
3186                 req, struct client_state);
3187         struct ctdbd_context *ctdb = state->ctdb;
3188         struct ctdb_reply_control reply;
3189         struct database *db;
3190
3191         reply.rdata.opcode = request->opcode;
3192
3193         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3194         if (db == NULL) {
3195                 reply.status = ENOENT;
3196                 reply.errmsg = "Database not found";
3197         } else {
3198                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3199                 reply.status = 0;
3200                 reply.errmsg = NULL;
3201         }
3202
3203         client_send_control(req, header, &reply);
3204 }
3205
3206 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3207                                          struct tevent_req *req,
3208                                          struct ctdb_req_header *header,
3209                                          struct ctdb_req_control *request)
3210 {
3211         struct client_state *state = tevent_req_data(
3212                 req, struct client_state);
3213         struct ctdbd_context *ctdb = state->ctdb;
3214         struct ctdb_reply_control reply;
3215         struct database *db;
3216
3217         reply.rdata.opcode = request->opcode;
3218
3219         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3220                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3221                         goto done;
3222                 }
3223         }
3224
3225         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3226                           CTDB_DB_FLAGS_REPLICATED);
3227         if (db == NULL) {
3228                 reply.status = -1;
3229                 reply.errmsg = "Failed to attach database";
3230                 client_send_control(req, header, &reply);
3231                 return;
3232         }
3233
3234 done:
3235         reply.rdata.data.db_id = db->id;
3236         reply.status = 0;
3237         reply.errmsg = NULL;
3238         client_send_control(req, header, &reply);
3239 }
3240
3241 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3242                                     struct tevent_req *req,
3243                                     struct ctdb_req_header *header,
3244                                     struct ctdb_req_control *request)
3245 {
3246         struct client_state *state = tevent_req_data(
3247                 req, struct client_state);
3248         struct ctdbd_context *ctdb = state->ctdb;
3249         struct ctdb_client *client;
3250         struct client_state *cstate;
3251         struct ctdb_reply_control reply;
3252         bool pid_found, srvid_found;
3253         int ret;
3254
3255         reply.rdata.opcode = request->opcode;
3256
3257         pid_found = false;
3258         srvid_found = false;
3259
3260         for (client=ctdb->client_list; client != NULL; client=client->next) {
3261                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3262                         pid_found = true;
3263                         cstate = (struct client_state *)client->state;
3264                         ret = srvid_exists(ctdb->srv,
3265                                            request->rdata.data.pid_srvid->srvid,
3266                                            cstate);
3267                         if (ret == 0) {
3268                                 srvid_found = true;
3269                                 ret = kill(cstate->pid, 0);
3270                                 if (ret != 0) {
3271                                         reply.status = ret;
3272                                         reply.errmsg = strerror(errno);
3273                                 } else {
3274                                         reply.status = 0;
3275                                         reply.errmsg = NULL;
3276                                 }
3277                         }
3278                 }
3279         }
3280
3281         if (! pid_found) {
3282                 reply.status = -1;
3283                 reply.errmsg = "No client for PID";
3284         } else if (! srvid_found) {
3285                 reply.status = -1;
3286                 reply.errmsg = "No client for PID and SRVID";
3287         }
3288
3289         client_send_control(req, header, &reply);
3290 }
3291
3292 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3293                                  struct tevent_req *req,
3294                                  struct ctdb_req_header *header,
3295                                  struct ctdb_req_control *request)
3296 {
3297         struct client_state *state = tevent_req_data(
3298                 req, struct client_state);
3299         struct ctdbd_context *ctdb = state->ctdb;
3300         struct ctdb_reply_control reply;
3301         struct fake_control_failure *f = NULL;
3302
3303         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3304                 request->opcode, header->destnode);
3305         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3306                 if (f->opcode == request->opcode &&
3307                     (f->pnn == header->destnode ||
3308                      f->pnn == CTDB_UNKNOWN_PNN)) {
3309
3310                         reply.rdata.opcode = request->opcode;
3311                         if (strcmp(f->error, "TIMEOUT") == 0) {
3312                                 /* Causes no reply */
3313                                 D_ERR("Control %u fake timeout on node %u\n",
3314                                       request->opcode, header->destnode);
3315                                 return true;
3316                         } else if (strcmp(f->error, "ERROR") == 0) {
3317                                 D_ERR("Control %u fake error on node %u\n",
3318                                       request->opcode, header->destnode);
3319                                 reply.status = -1;
3320                                 reply.errmsg = f->comment;
3321                                 client_send_control(req, header, &reply);
3322                                 return true;
3323                         }
3324                 }
3325         }
3326
3327         return false;
3328 }
3329
3330 static void control_error(TALLOC_CTX *mem_ctx,
3331                           struct tevent_req *req,
3332                           struct ctdb_req_header *header,
3333                           struct ctdb_req_control *request)
3334 {
3335         struct ctdb_reply_control reply;
3336
3337         reply.rdata.opcode = request->opcode;
3338         reply.status = -1;
3339         reply.errmsg = "Not implemented";
3340
3341         client_send_control(req, header, &reply);
3342 }
3343
3344 /*
3345  * Handling protocol - messages
3346  */
3347
3348 struct disable_recoveries_state {
3349         struct node *node;
3350 };
3351
3352 static void disable_recoveries_callback(struct tevent_req *subreq)
3353 {
3354         struct disable_recoveries_state *substate = tevent_req_callback_data(
3355                 subreq, struct disable_recoveries_state);
3356         bool status;
3357
3358         status = tevent_wakeup_recv(subreq);
3359         TALLOC_FREE(subreq);
3360         if (! status) {
3361                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3362         }
3363
3364         substate->node->recovery_disabled = false;
3365         TALLOC_FREE(substate->node->recovery_substate);
3366 }
3367
3368 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3369                                        struct tevent_req *req,
3370                                        struct ctdb_req_header *header,
3371                                        struct ctdb_req_message *request)
3372 {
3373         struct client_state *state = tevent_req_data(
3374                 req, struct client_state);
3375         struct tevent_req *subreq;
3376         struct ctdbd_context *ctdb = state->ctdb;
3377         struct disable_recoveries_state *substate;
3378         struct ctdb_disable_message *disable = request->data.disable;
3379         struct ctdb_req_message_data reply;
3380         struct node *node;
3381         int ret = -1;
3382         TDB_DATA data;
3383
3384         node = &ctdb->node_map->node[header->destnode];
3385
3386         if (disable->timeout == 0) {
3387                 TALLOC_FREE(node->recovery_substate);
3388                 node->recovery_disabled = false;
3389                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3390                                    header->destnode));
3391                 goto done;
3392         }
3393
3394         substate = talloc_zero(ctdb->node_map,
3395                                struct disable_recoveries_state);
3396         if (substate == NULL) {
3397                 goto fail;
3398         }
3399
3400         substate->node = node;
3401
3402         subreq = tevent_wakeup_send(substate, state->ev,
3403                                     tevent_timeval_current_ofs(
3404                                             disable->timeout, 0));
3405         if (subreq == NULL) {
3406                 talloc_free(substate);
3407                 goto fail;
3408         }
3409         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3410
3411         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3412                            disable->timeout, header->destnode));
3413         node->recovery_substate = substate;
3414         node->recovery_disabled = true;
3415
3416 done:
3417         ret = header->destnode;
3418
3419 fail:
3420         reply.srvid = disable->srvid;
3421         data.dptr = (uint8_t *)&ret;
3422         data.dsize = sizeof(int);
3423         reply.data = data;
3424
3425         client_send_message(req, header, &reply);
3426 }
3427
3428 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3429                                  struct tevent_req *req,
3430                                  struct ctdb_req_header *header,
3431                                  struct ctdb_req_message *request)
3432 {
3433         struct client_state *state = tevent_req_data(
3434                 req, struct client_state);
3435         struct ctdbd_context *ctdb = state->ctdb;
3436         struct ctdb_srvid_message *srvid = request->data.msg;
3437         struct ctdb_req_message_data reply;
3438         int ret = -1;
3439         TDB_DATA data;
3440
3441         if (header->destnode != ctdb->node_map->recmaster) {
3442                 /* No reply! Only recmaster replies... */
3443                 return;
3444         }
3445
3446         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3447                            header->destnode));
3448         ret = header->destnode;
3449
3450         reply.srvid = srvid->srvid;
3451         data.dptr = (uint8_t *)&ret;
3452         data.dsize = sizeof(int);
3453         reply.data = data;
3454
3455         client_send_message(req, header, &reply);
3456 }
3457
3458 /*
3459  * Handle a single client
3460  */
3461
3462 static void client_read_handler(uint8_t *buf, size_t buflen,
3463                                 void *private_data);
3464 static void client_dead_handler(void *private_data);
3465 static void client_process_packet(struct tevent_req *req,
3466                                   uint8_t *buf, size_t buflen);
3467 static void client_process_call(struct tevent_req *req,
3468                                 uint8_t *buf, size_t buflen);
3469 static void client_process_message(struct tevent_req *req,
3470                                    uint8_t *buf, size_t buflen);
3471 static void client_process_control(struct tevent_req *req,
3472                                    uint8_t *buf, size_t buflen);
3473 static void client_reply_done(struct tevent_req *subreq);
3474
3475 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3476                                       struct tevent_context *ev,
3477                                       int fd, struct ctdbd_context *ctdb,
3478                                       int pnn)
3479 {
3480         struct tevent_req *req;
3481         struct client_state *state;
3482         struct ucred cr;
3483         socklen_t crl = sizeof(struct ucred);
3484         int ret;
3485
3486         req = tevent_req_create(mem_ctx, &state, struct client_state);
3487         if (req == NULL) {
3488                 return NULL;
3489         }
3490
3491         state->ev = ev;
3492         state->fd = fd;
3493         state->ctdb = ctdb;
3494         state->pnn = pnn;
3495
3496         ret = getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl);
3497         if (ret != 0) {
3498                 tevent_req_error(req, ret);
3499                 return tevent_req_post(req, ev);
3500         }
3501         state->pid = cr.pid;
3502
3503         ret = comm_setup(state, ev, fd, client_read_handler, req,
3504                          client_dead_handler, req, &state->comm);
3505         if (ret != 0) {
3506                 tevent_req_error(req, ret);
3507                 return tevent_req_post(req, ev);
3508         }
3509
3510         ret = client_add(ctdb, state->pid, state);
3511         if (ret != 0) {
3512                 tevent_req_error(req, ret);
3513                 return tevent_req_post(req, ev);
3514         }
3515
3516         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3517
3518         return req;
3519 }
3520
3521 static void client_read_handler(uint8_t *buf, size_t buflen,
3522                                 void *private_data)
3523 {
3524         struct tevent_req *req = talloc_get_type_abort(
3525                 private_data, struct tevent_req);
3526         struct client_state *state = tevent_req_data(
3527                 req, struct client_state);
3528         struct ctdbd_context *ctdb = state->ctdb;
3529         struct ctdb_req_header header;
3530         size_t np;
3531         int ret, i;
3532
3533         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3534         if (ret != 0) {
3535                 return;
3536         }
3537
3538         if (buflen != header.length) {
3539                 return;
3540         }
3541
3542         ret = ctdb_req_header_verify(&header, 0);
3543         if (ret != 0) {
3544                 return;
3545         }
3546
3547         header_fix_pnn(&header, ctdb);
3548
3549         if (header.destnode == CTDB_BROADCAST_ALL) {
3550                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3551                         header.destnode = i;
3552
3553                         ctdb_req_header_push(&header, buf, &np);
3554                         client_process_packet(req, buf, buflen);
3555                 }
3556                 return;
3557         }
3558
3559         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3560                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3561                         if (ctdb->node_map->node[i].flags &
3562                             NODE_FLAGS_DISCONNECTED) {
3563                                 continue;
3564                         }
3565
3566                         header.destnode = i;
3567
3568                         ctdb_req_header_push(&header, buf, &np);
3569                         client_process_packet(req, buf, buflen);
3570                 }
3571                 return;
3572         }
3573
3574         if (header.destnode > ctdb->node_map->num_nodes) {
3575                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3576                         header.destnode);
3577                 return;
3578         }
3579
3580
3581         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3582                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3583                         header.destnode);
3584                 return;
3585         }
3586
3587         ctdb_req_header_push(&header, buf, &np);
3588         client_process_packet(req, buf, buflen);
3589 }
3590
3591 static void client_dead_handler(void *private_data)
3592 {
3593         struct tevent_req *req = talloc_get_type_abort(
3594                 private_data, struct tevent_req);
3595
3596         tevent_req_done(req);
3597 }
3598
3599 static void client_process_packet(struct tevent_req *req,
3600                                   uint8_t *buf, size_t buflen)
3601 {
3602         struct ctdb_req_header header;
3603         size_t np;
3604         int ret;
3605
3606         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3607         if (ret != 0) {
3608                 return;
3609         }
3610
3611         switch (header.operation) {
3612         case CTDB_REQ_CALL:
3613                 client_process_call(req, buf, buflen);
3614                 break;
3615
3616         case CTDB_REQ_MESSAGE:
3617                 client_process_message(req, buf, buflen);
3618                 break;
3619
3620         case CTDB_REQ_CONTROL:
3621                 client_process_control(req, buf, buflen);
3622                 break;
3623
3624         default:
3625                 break;
3626         }
3627 }
3628
3629 static void client_process_call(struct tevent_req *req,
3630                                 uint8_t *buf, size_t buflen)
3631 {
3632         struct client_state *state = tevent_req_data(
3633                 req, struct client_state);
3634         struct ctdbd_context *ctdb = state->ctdb;
3635         TALLOC_CTX *mem_ctx;
3636         struct ctdb_req_header header;
3637         struct ctdb_req_call request;
3638         struct ctdb_reply_call reply;
3639         struct database *db;
3640         struct ctdb_ltdb_header hdr;
3641         TDB_DATA data;
3642         int ret;
3643
3644         mem_ctx = talloc_new(state);
3645         if (tevent_req_nomem(mem_ctx, req)) {
3646                 return;
3647         }
3648
3649         ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
3650         if (ret != 0) {
3651                 talloc_free(mem_ctx);
3652                 tevent_req_error(req, ret);
3653                 return;
3654         }
3655
3656         header_fix_pnn(&header, ctdb);
3657
3658         if (header.destnode >= ctdb->node_map->num_nodes) {
3659                 goto fail;
3660         }
3661
3662         DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
3663
3664         db = database_find(ctdb->db_map, request.db_id);
3665         if (db == NULL) {
3666                 goto fail;
3667         }
3668
3669         ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
3670         if (ret != 0) {
3671                 goto fail;
3672         }
3673
3674         /* Fake migration */
3675         if (hdr.dmaster != ctdb->node_map->pnn) {
3676                 hdr.dmaster = ctdb->node_map->pnn;
3677
3678                 ret = ltdb_store(db, request.key, &hdr, data);
3679                 if (ret != 0) {
3680                         goto fail;
3681                 }
3682         }
3683
3684         talloc_free(mem_ctx);
3685
3686         reply.status = 0;
3687         reply.data = tdb_null;
3688
3689         client_send_call(req, &header, &reply);
3690         return;
3691
3692 fail:
3693         talloc_free(mem_ctx);
3694         reply.status = -1;
3695         reply.data = tdb_null;
3696
3697         client_send_call(req, &header, &reply);
3698 }
3699
3700 static void client_process_message(struct tevent_req *req,
3701                                    uint8_t *buf, size_t buflen)
3702 {
3703         struct client_state *state = tevent_req_data(
3704                 req, struct client_state);
3705         struct ctdbd_context *ctdb = state->ctdb;
3706         TALLOC_CTX *mem_ctx;
3707         struct ctdb_req_header header;
3708         struct ctdb_req_message request;
3709         uint64_t srvid;
3710         int ret;
3711
3712         mem_ctx = talloc_new(state);
3713         if (tevent_req_nomem(mem_ctx, req)) {
3714                 return;
3715         }
3716
3717         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
3718         if (ret != 0) {
3719                 talloc_free(mem_ctx);
3720                 tevent_req_error(req, ret);
3721                 return;
3722         }
3723
3724         header_fix_pnn(&header, ctdb);
3725
3726         if (header.destnode >= ctdb->node_map->num_nodes) {
3727                 /* Many messages are not replied to, so just behave as
3728                  * though this message was not received */
3729                 fprintf(stderr, "Invalid node %d\n", header.destnode);
3730                 talloc_free(mem_ctx);
3731                 return;
3732         }
3733
3734         srvid = request.srvid;
3735         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
3736
3737         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
3738                 message_disable_recoveries(mem_ctx, req, &header, &request);
3739         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
3740                 message_takeover_run(mem_ctx, req, &header, &request);
3741         }
3742
3743         /* check srvid */
3744         talloc_free(mem_ctx);
3745 }
3746
3747 static void client_process_control(struct tevent_req *req,
3748                                    uint8_t *buf, size_t buflen)
3749 {
3750         struct client_state *state = tevent_req_data(
3751                 req, struct client_state);
3752         struct ctdbd_context *ctdb = state->ctdb;
3753         TALLOC_CTX *mem_ctx;
3754         struct ctdb_req_header header;
3755         struct ctdb_req_control request;
3756         int ret;
3757
3758         mem_ctx = talloc_new(state);
3759         if (tevent_req_nomem(mem_ctx, req)) {
3760                 return;
3761         }
3762
3763         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
3764         if (ret != 0) {
3765                 talloc_free(mem_ctx);
3766                 tevent_req_error(req, ret);
3767                 return;
3768         }
3769
3770         header_fix_pnn(&header, ctdb);
3771
3772         if (header.destnode >= ctdb->node_map->num_nodes) {
3773                 struct ctdb_reply_control reply;
3774
3775                 reply.rdata.opcode = request.opcode;
3776                 reply.errmsg = "Invalid node";
3777                 reply.status = -1;
3778                 client_send_control(req, &header, &reply);
3779                 return;
3780         }
3781
3782         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
3783                            request.opcode, header.reqid));
3784
3785         if (fake_control_failure(mem_ctx, req, &header, &request)) {
3786                 goto done;
3787         }
3788
3789         switch (request.opcode) {
3790         case CTDB_CONTROL_PROCESS_EXISTS:
3791                 control_process_exists(mem_ctx, req, &header, &request);
3792                 break;
3793
3794         case CTDB_CONTROL_PING:
3795                 control_ping(mem_ctx, req, &header, &request);
3796                 break;
3797
3798         case CTDB_CONTROL_GETDBPATH:
3799                 control_getdbpath(mem_ctx, req, &header, &request);
3800                 break;
3801
3802         case CTDB_CONTROL_GETVNNMAP:
3803                 control_getvnnmap(mem_ctx, req, &header, &request);
3804                 break;
3805
3806         case CTDB_CONTROL_GET_DEBUG:
3807                 control_get_debug(mem_ctx, req, &header, &request);
3808                 break;
3809
3810         case CTDB_CONTROL_SET_DEBUG:
3811                 control_set_debug(mem_ctx, req, &header, &request);
3812                 break;
3813
3814         case CTDB_CONTROL_GET_DBMAP:
3815                 control_get_dbmap(mem_ctx, req, &header, &request);
3816                 break;
3817
3818         case CTDB_CONTROL_GET_RECMODE:
3819                 control_get_recmode(mem_ctx, req, &header, &request);
3820                 break;
3821
3822         case CTDB_CONTROL_SET_RECMODE:
3823                 control_set_recmode(mem_ctx, req, &header, &request);
3824                 break;
3825
3826         case CTDB_CONTROL_DB_ATTACH:
3827                 control_db_attach(mem_ctx, req, &header, &request);
3828                 break;
3829
3830         case CTDB_CONTROL_REGISTER_SRVID:
3831                 control_register_srvid(mem_ctx, req, &header, &request);
3832                 break;
3833
3834         case CTDB_CONTROL_DEREGISTER_SRVID:
3835                 control_deregister_srvid(mem_ctx, req, &header, &request);
3836                 break;
3837
3838         case CTDB_CONTROL_GET_DBNAME:
3839                 control_get_dbname(mem_ctx, req, &header, &request);
3840                 break;
3841
3842         case CTDB_CONTROL_GET_PID:
3843                 control_get_pid(mem_ctx, req, &header, &request);
3844                 break;
3845
3846         case CTDB_CONTROL_GET_RECMASTER:
3847                 control_get_recmaster(mem_ctx, req, &header, &request);
3848                 break;
3849
3850         case CTDB_CONTROL_GET_PNN:
3851                 control_get_pnn(mem_ctx, req, &header, &request);
3852                 break;
3853
3854         case CTDB_CONTROL_SHUTDOWN:
3855                 control_shutdown(mem_ctx, req, &header, &request);
3856                 break;
3857
3858         case CTDB_CONTROL_SET_TUNABLE:
3859                 control_set_tunable(mem_ctx, req, &header, &request);
3860                 break;
3861
3862         case CTDB_CONTROL_GET_TUNABLE:
3863                 control_get_tunable(mem_ctx, req, &header, &request);
3864                 break;
3865
3866         case CTDB_CONTROL_LIST_TUNABLES:
3867                 control_list_tunables(mem_ctx, req, &header, &request);
3868                 break;
3869
3870         case CTDB_CONTROL_MODIFY_FLAGS:
3871                 control_modify_flags(mem_ctx, req, &header, &request);
3872                 break;
3873
3874         case CTDB_CONTROL_GET_ALL_TUNABLES:
3875                 control_get_all_tunables(mem_ctx, req, &header, &request);
3876                 break;
3877
3878         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
3879                 control_db_attach_persistent(mem_ctx, req, &header, &request);
3880                 break;
3881
3882         case CTDB_CONTROL_UPTIME:
3883                 control_uptime(mem_ctx, req, &header, &request);
3884                 break;
3885
3886         case CTDB_CONTROL_RELOAD_NODES_FILE:
3887                 control_reload_nodes_file(mem_ctx, req, &header, &request);
3888                 break;
3889
3890         case CTDB_CONTROL_GET_CAPABILITIES:
3891                 control_get_capabilities(mem_ctx, req, &header, &request);
3892                 break;
3893
3894         case CTDB_CONTROL_RELEASE_IP:
3895                 control_release_ip(mem_ctx, req, &header, &request);
3896                 break;
3897
3898         case CTDB_CONTROL_TAKEOVER_IP:
3899                 control_takeover_ip(mem_ctx, req, &header, &request);
3900                 break;
3901
3902         case CTDB_CONTROL_GET_PUBLIC_IPS:
3903                 control_get_public_ips(mem_ctx, req, &header, &request);
3904                 break;
3905
3906         case CTDB_CONTROL_GET_NODEMAP:
3907                 control_get_nodemap(mem_ctx, req, &header, &request);
3908                 break;
3909
3910         case CTDB_CONTROL_GET_RECLOCK_FILE:
3911                 control_get_reclock_file(mem_ctx, req, &header, &request);
3912                 break;
3913
3914         case CTDB_CONTROL_STOP_NODE:
3915                 control_stop_node(mem_ctx, req, &header, &request);
3916                 break;
3917
3918         case CTDB_CONTROL_CONTINUE_NODE:
3919                 control_continue_node(mem_ctx, req, &header, &request);
3920                 break;
3921
3922         case CTDB_CONTROL_SET_BAN_STATE:
3923                 control_set_ban_state(mem_ctx, req, &header, &request);
3924                 break;
3925
3926         case CTDB_CONTROL_GET_DB_SEQNUM:
3927                 control_get_db_seqnum(mem_ctx, req, &header, &request);
3928                 break;
3929
3930         case CTDB_CONTROL_DB_GET_HEALTH:
3931                 control_db_get_health(mem_ctx, req, &header, &request);
3932                 break;
3933
3934         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
3935                 control_get_public_ip_info(mem_ctx, req, &header, &request);
3936                 break;
3937
3938         case CTDB_CONTROL_GET_IFACES:
3939                 control_get_ifaces(mem_ctx, req, &header, &request);
3940                 break;
3941
3942         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
3943                 control_set_iface_link_state(mem_ctx, req, &header, &request);
3944                 break;
3945
3946         case CTDB_CONTROL_SET_DB_READONLY:
3947                 control_set_db_readonly(mem_ctx, req, &header, &request);
3948                 break;
3949
3950         case CTDB_CONTROL_SET_DB_STICKY:
3951                 control_set_db_sticky(mem_ctx, req, &header, &request);
3952                 break;
3953
3954         case CTDB_CONTROL_IPREALLOCATED:
3955                 control_ipreallocated(mem_ctx, req, &header, &request);
3956                 break;
3957
3958         case CTDB_CONTROL_GET_RUNSTATE:
3959                 control_get_runstate(mem_ctx, req, &header, &request);
3960                 break;
3961
3962         case CTDB_CONTROL_GET_NODES_FILE:
3963                 control_get_nodes_file(mem_ctx, req, &header, &request);
3964                 break;
3965
3966         case CTDB_CONTROL_DB_OPEN_FLAGS:
3967                 control_db_open_flags(mem_ctx, req, &header, &request);
3968                 break;
3969
3970         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
3971                 control_db_attach_replicated(mem_ctx, req, &header, &request);
3972                 break;
3973
3974         case CTDB_CONTROL_CHECK_PID_SRVID:
3975                 control_check_pid_srvid(mem_ctx, req, &header, &request);
3976                 break;
3977
3978         default:
3979                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
3980                         control_error(mem_ctx, req, &header, &request);
3981                 }
3982                 break;
3983         }
3984
3985 done:
3986         talloc_free(mem_ctx);
3987 }
3988
3989 static int client_recv(struct tevent_req *req, int *perr)
3990 {
3991         struct client_state *state = tevent_req_data(
3992                 req, struct client_state);
3993         int err;
3994
3995         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
3996         close(state->fd);
3997
3998         if (tevent_req_is_unix_error(req, &err)) {
3999                 if (perr != NULL) {
4000                         *perr = err;
4001                 }
4002                 return -1;
4003         }
4004
4005         return state->status;
4006 }
4007
4008 /*
4009  * Fake CTDB server
4010  */
4011
/* State of the server request created by server_send() */
struct server_state {
	/* Event context used for accepting and for client requests */
	struct tevent_context *ev;
	/* Daemon context handed to every client request */
	struct ctdbd_context *ctdb;
	/* Socket on which connections are accepted */
	int fd;
};
4017
4018 static void server_new_client(struct tevent_req *subreq);
4019 static void server_client_done(struct tevent_req *subreq);
4020
4021 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
4022                                       struct tevent_context *ev,
4023                                       struct ctdbd_context *ctdb,
4024                                       int fd)
4025 {
4026         struct tevent_req *req, *subreq;
4027         struct server_state *state;
4028
4029         req = tevent_req_create(mem_ctx, &state, struct server_state);
4030         if (req == NULL) {
4031                 return NULL;
4032         }
4033
4034         state->ev = ev;
4035         state->ctdb = ctdb;
4036         state->fd = fd;
4037
4038         subreq = accept_send(state, ev, fd);
4039         if (tevent_req_nomem(subreq, req)) {
4040                 return tevent_req_post(req, ev);
4041         }
4042         tevent_req_set_callback(subreq, server_new_client, req);
4043
4044         return req;
4045 }
4046
4047 static void server_new_client(struct tevent_req *subreq)
4048 {
4049         struct tevent_req *req = tevent_req_callback_data(
4050                 subreq, struct tevent_req);
4051         struct server_state *state = tevent_req_data(
4052                 req, struct server_state);
4053         struct ctdbd_context *ctdb = state->ctdb;
4054         int client_fd;
4055         int ret = 0;
4056
4057         client_fd = accept_recv(subreq, NULL, NULL, &ret);
4058         TALLOC_FREE(subreq);
4059         if (client_fd == -1) {
4060                 tevent_req_error(req, ret);
4061                 return;
4062         }
4063
4064         subreq = client_send(state, state->ev, client_fd,
4065                              ctdb, ctdb->node_map->pnn);
4066         if (tevent_req_nomem(subreq, req)) {
4067                 return;
4068         }
4069         tevent_req_set_callback(subreq, server_client_done, req);
4070
4071         ctdb->num_clients += 1;
4072
4073         subreq = accept_send(state, state->ev, state->fd);
4074         if (tevent_req_nomem(subreq, req)) {
4075                 return;
4076         }
4077         tevent_req_set_callback(subreq, server_new_client, req);
4078 }
4079
4080 static void server_client_done(struct tevent_req *subreq)
4081 {
4082         struct tevent_req *req = tevent_req_callback_data(
4083                 subreq, struct tevent_req);
4084         struct server_state *state = tevent_req_data(
4085                 req, struct server_state);
4086         struct ctdbd_context *ctdb = state->ctdb;
4087         int ret = 0;
4088         int status;
4089
4090         status = client_recv(subreq, &ret);
4091         TALLOC_FREE(subreq);
4092         if (status < 0) {
4093                 tevent_req_error(req, ret);
4094                 return;
4095         }
4096
4097         ctdb->num_clients -= 1;
4098
4099         if (status == 99) {
4100                 /* Special status, to shutdown server */
4101                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
4102                 tevent_req_done(req);
4103         }
4104 }
4105
/*
 * Collect the result of the server request.
 *
 * Returns true if the request did not end with an error.  On error,
 * returns false and stores the error in *perr when perr is not NULL.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
	int unix_err;
	bool failed = tevent_req_is_unix_error(req, &unix_err);

	if (failed && perr != NULL) {
		*perr = unix_err;
	}

	return !failed;
}
4118
4119 /*
4120  * Main functions
4121  */
4122
4123 static int socket_init(const char *sockpath)
4124 {
4125         struct sockaddr_un addr;
4126         size_t len;
4127         int ret, fd;
4128
4129         memset(&addr, 0, sizeof(addr));
4130         addr.sun_family = AF_UNIX;
4131
4132         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4133         if (len >= sizeof(addr.sun_path)) {
4134                 fprintf(stderr, "path too long: %s\n", sockpath);
4135                 return -1;
4136         }
4137
4138         fd = socket(AF_UNIX, SOCK_STREAM, 0);
4139         if (fd == -1) {
4140                 fprintf(stderr, "socket failed - %s\n", sockpath);
4141                 return -1;
4142         }
4143
4144         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4145         if (ret != 0) {
4146                 fprintf(stderr, "bind failed - %s\n", sockpath);
4147                 goto fail;
4148         }
4149
4150         ret = listen(fd, 10);
4151         if (ret != 0) {
4152                 fprintf(stderr, "listen failed\n");
4153                 goto fail;
4154         }
4155
4156         DEBUG(DEBUG_INFO, ("Socket init done\n"));
4157
4158         return fd;
4159
4160 fail:
4161         if (fd != -1) {
4162                 close(fd);
4163         }
4164         return -1;
4165 }
4166
/* Values of the command line options, filled in through cmdline_options */
static struct options {
	const char *dbdir;	/* --dbdir / -D */
	const char *sockpath;	/* --socket / -s */
	const char *pidfile;	/* --pidfile / -p */
	const char *debuglevel;	/* --debug / -d */
} options;
4173
4174 static struct poptOption cmdline_options[] = {
4175         POPT_AUTOHELP
4176         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4177                 "Database directory", "directory" },
4178         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4179                 "Unix domain socket path", "filename" },
4180         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4181                 "pid file", "filename" } ,
4182         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4183                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4184         POPT_TABLEEND
4185 };
4186
4187 static void cleanup(void)
4188 {
4189         unlink(options.sockpath);
4190         unlink(options.pidfile);
4191 }
4192
/*
 * Signal handler (installed for SIGTERM by start_server()): remove the
 * socket and pid file via cleanup(), then terminate with status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe: it runs atexit() handlers and flushes stdio streams,
 * which must not be done from a signal handler.  The cleanup() that
 * start_server() registers with atexit() is called explicitly here, so
 * skipping that handler loses nothing.
 */
static void signal_handler(int sig)
{
	(void)sig;

	cleanup();
	_exit(0);
}
4198
4199 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4200                          struct ctdbd_context *ctdb, int fd, int pfd)
4201 {
4202         struct tevent_req *req;
4203         int ret = 0;
4204         ssize_t len;
4205
4206         atexit(cleanup);
4207         signal(SIGTERM, signal_handler);
4208
4209         req = server_send(mem_ctx, ev, ctdb, fd);
4210         if (req == NULL) {
4211                 fprintf(stderr, "Memory error\n");
4212                 exit(1);
4213         }
4214
4215         len = write(pfd, &ret, sizeof(ret));
4216         if (len != sizeof(ret)) {
4217                 fprintf(stderr, "Failed to send message to parent\n");
4218                 exit(1);
4219         }
4220         close(pfd);
4221
4222         tevent_req_poll(req, ev);
4223
4224         server_recv(req, &ret);
4225         if (ret != 0) {
4226                 exit(1);
4227         }
4228 }
4229
4230 int main(int argc, const char *argv[])
4231 {
4232         TALLOC_CTX *mem_ctx;
4233         struct ctdbd_context *ctdb;
4234         struct tevent_context *ev;
4235         poptContext pc;
4236         int opt, fd, ret, pfd[2];
4237         ssize_t len;
4238         pid_t pid;
4239         FILE *fp;
4240
4241         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4242                             POPT_CONTEXT_KEEP_FIRST);
4243         while ((opt = poptGetNextOpt(pc)) != -1) {
4244                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4245                 exit(1);
4246         }
4247
4248         if (options.dbdir == NULL) {
4249                 fprintf(stderr, "Please specify database directory\n");
4250                 poptPrintHelp(pc, stdout, 0);
4251                 exit(1);
4252         }
4253
4254         if (options.sockpath == NULL) {
4255                 fprintf(stderr, "Please specify socket path\n");
4256                 poptPrintHelp(pc, stdout, 0);
4257                 exit(1);
4258         }
4259
4260         if (options.pidfile == NULL) {
4261                 fprintf(stderr, "Please specify pid file\n");
4262                 poptPrintHelp(pc, stdout, 0);
4263                 exit(1);
4264         }
4265
4266         mem_ctx = talloc_new(NULL);
4267         if (mem_ctx == NULL) {
4268                 fprintf(stderr, "Memory error\n");
4269                 exit(1);
4270         }
4271
4272         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4273         if (ret != 0) {
4274                 fprintf(stderr, "Invalid debug level\n");
4275                 poptPrintHelp(pc, stdout, 0);
4276                 exit(1);
4277         }
4278
4279         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4280         if (ctdb == NULL) {
4281                 exit(1);
4282         }
4283
4284         if (! ctdbd_verify(ctdb)) {
4285                 exit(1);
4286         }
4287
4288         ev = tevent_context_init(mem_ctx);
4289         if (ev == NULL) {
4290                 fprintf(stderr, "Memory error\n");
4291                 exit(1);
4292         }
4293
4294         fd = socket_init(options.sockpath);
4295         if (fd == -1) {
4296                 exit(1);
4297         }
4298
4299         ret = pipe(pfd);
4300         if (ret != 0) {
4301                 fprintf(stderr, "Failed to create pipe\n");
4302                 cleanup();
4303                 exit(1);
4304         }
4305
4306         pid = fork();
4307         if (pid == -1) {
4308                 fprintf(stderr, "Failed to fork\n");
4309                 cleanup();
4310                 exit(1);
4311         }
4312
4313         if (pid == 0) {
4314                 /* Child */
4315                 close(pfd[0]);
4316                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4317                 exit(1);
4318         }
4319
4320         /* Parent */
4321         close(pfd[1]);
4322
4323         len = read(pfd[0], &ret, sizeof(ret));
4324         close(pfd[0]);
4325         if (len != sizeof(ret)) {
4326                 fprintf(stderr, "len = %zi\n", len);
4327                 fprintf(stderr, "Failed to get message from child\n");
4328                 kill(pid, SIGTERM);
4329                 exit(1);
4330         }
4331
4332         fp = fopen(options.pidfile, "w");
4333         if (fp == NULL) {
4334                 fprintf(stderr, "Failed to open pid file %s\n",
4335                         options.pidfile);
4336                 kill(pid, SIGTERM);
4337                 exit(1);
4338         }
4339         fprintf(fp, "%d\n", pid);
4340         fclose(fp);
4341
4342         return 0;
4343 }