ctdb-tests: Add public IP state to fake_ctdbd
[samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23
24 #include <popt.h>
25 #include <talloc.h>
26 #include <tevent.h>
27 #include <tdb.h>
28
29 #include "lib/util/dlinklist.h"
30 #include "lib/util/tevent_unix.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/async_req/async_sock.h"
34
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37
38 #include "common/comm.h"
39 #include "common/system.h"
40 #include "common/logging.h"
41 #include "common/tunable.h"
42
43 #include "ipalloc_read_known_ips.h"
44
45
/* Port number handed to parse_ip() for every node address read here */
#define CTDB_PORT 4379

/* A fake flag that is only supported by some functions.  It is set on
 * a node when its NODEMAP line carries the TIMEOUT token. */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
50
51 struct node {
52         ctdb_sock_addr addr;
53         uint32_t pnn;
54         uint32_t flags;
55         uint32_t capabilities;
56         bool recovery_disabled;
57         void *recovery_substate;
58 };
59
/* The set of nodes known to the fake daemon */
struct node_map {
	/* Number of valid entries in node[] */
	uint32_t num_nodes;
	/* Array grown by one entry for each accepted NODEMAP line */
	struct node *node;
	/* PNN of the line tagged CURRENT, CTDB_UNKNOWN_PNN if there is none */
	uint32_t pnn;
	/* PNN of the line tagged RECMASTER, CTDB_UNKNOWN_PNN if there is none */
	uint32_t recmaster;
};
66
/* One network interface, as read from an IFACES input line */
struct interface {
	/* Interface name (first field of the line) */
	const char *name;
	/* True if the LinkStatus field was non-zero */
	bool link_up;
	/* Value of the References field */
	uint32_t references;
};
72
/* All interfaces known to the fake daemon */
struct interface_map {
	/* Number of valid entries in iface[] */
	int num;
	/* Array grown by one entry for each accepted IFACES line */
	struct interface *iface;
};
77
/* VNN map plus the recovery mode that goes with it */
struct vnn_map {
	/* CTDB_RECOVERY_ACTIVE after init, CTDB_RECOVERY_NORMAL once set up */
	uint32_t recmode;
	/* Generation number; INVALID_GENERATION until one is read or made up */
	uint32_t generation;
	/* Number of valid entries in map[] */
	uint32_t size;
	/* One entry for each lmaster line of the VNNMAP section */
	uint32_t *map;
};
84
/* One database, as read from a DBMAP input line */
struct database {
	/* Database name (second field of the line) */
	const char *name;
	/* Database ID (first field of the line) */
	uint32_t id;
	/* CTDB_DB_FLAGS_* bits collected from the flag tokens */
	uint8_t flags;
	/* Sequence number; stays 0 unless the line supplies one */
	uint64_t seq_num;
};
91
/* All databases known to the fake daemon */
struct database_map {
	/* Number of valid entries in db[] */
	int num_dbs;
	/* Array grown by one entry for each accepted DBMAP line */
	struct database *db;
};
96
/* Record of one SRVID; carries prev/next links so it can sit on a
 * doubly linked list (presumably the one headed by
 * ctdbd_context.rstate - confirm against the code that uses it). */
struct srvid_register_state {
	struct srvid_register_state *prev;
	struct srvid_register_state *next;
	/* Daemon context this record belongs to */
	struct ctdbd_context *ctdb;
	/* The SRVID itself */
	uint64_t srvid;
};
102
103 struct ctdbd_context {
104         struct node_map *node_map;
105         struct interface_map *iface_map;
106         struct vnn_map *vnn_map;
107         struct database_map *db_map;
108         struct srvid_register_state *rstate;
109         int num_clients;
110         struct timeval start_time;
111         struct timeval recovery_start_time;
112         struct timeval recovery_end_time;
113         bool takeover_disabled;
114         int log_level;
115         enum ctdb_runstate runstate;
116         struct ctdb_tunable_list tun_list;
117         int monitoring_mode;
118         char *reclock;
119         struct ctdb_public_ip_list *known_ips;
120 };
121
122 /*
123  * Parse routines
124  */
125
126 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
127 {
128         struct node_map *node_map;
129
130         node_map = talloc_zero(mem_ctx, struct node_map);
131         if (node_map == NULL) {
132                 return NULL;
133         }
134
135         node_map->pnn = CTDB_UNKNOWN_PNN;
136         node_map->recmaster = CTDB_UNKNOWN_PNN;
137
138         return node_map;
139 }
140
141 /* Read a nodemap from stdin.  Each line looks like:
142  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
143  * EOF or a blank line terminates input.
144  *
145  * By default, capablities for each node are
146  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
147  * capabilities can be faked off by adding, for example,
148  * -CTDB_CAP_RECMASTER.
149  */
150
151 static bool nodemap_parse(struct node_map *node_map)
152 {
153         char line[1024];
154
155         while ((fgets(line, sizeof(line), stdin) != NULL)) {
156                 uint32_t pnn, flags, capabilities;
157                 char *tok, *t;
158                 char *ip;
159                 ctdb_sock_addr saddr;
160                 struct node *node;
161
162                 if (line[0] == '\n') {
163                         break;
164                 }
165
166                 /* Get rid of pesky newline */
167                 if ((t = strchr(line, '\n')) != NULL) {
168                         *t = '\0';
169                 }
170
171                 /* Get PNN */
172                 tok = strtok(line, " \t");
173                 if (tok == NULL) {
174                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
175                         continue;
176                 }
177                 pnn = (uint32_t)strtoul(tok, NULL, 0);
178
179                 /* Get IP */
180                 tok = strtok(NULL, " \t");
181                 if (tok == NULL) {
182                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
183                         continue;
184                 }
185                 if (!parse_ip(tok, NULL, CTDB_PORT, &saddr)) {
186                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
187                         continue;
188                 }
189                 ip = talloc_strdup(node_map, tok);
190                 if (ip == NULL) {
191                         goto fail;
192                 }
193
194                 /* Get flags */
195                 tok = strtok(NULL, " \t");
196                 if (tok == NULL) {
197                         fprintf(stderr, "bad line (%s) - missing flags\n",
198                                 line);
199                         continue;
200                 }
201                 flags = (uint32_t)strtoul(tok, NULL, 0);
202                 /* Handle deleted nodes */
203                 if (flags & NODE_FLAGS_DELETED) {
204                         talloc_free(ip);
205                         ip = talloc_strdup(node_map, "0.0.0.0");
206                         if (ip == NULL) {
207                                 goto fail;
208                         }
209                 }
210                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
211
212                 tok = strtok(NULL, " \t");
213                 while (tok != NULL) {
214                         if (strcmp(tok, "CURRENT") == 0) {
215                                 node_map->pnn = pnn;
216                         } else if (strcmp(tok, "RECMASTER") == 0) {
217                                 node_map->recmaster = pnn;
218                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
219                                 capabilities &= ~CTDB_CAP_RECMASTER;
220                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
221                                 capabilities &= ~CTDB_CAP_LMASTER;
222                         } else if (strcmp(tok, "TIMEOUT") == 0) {
223                                 /* This can be done with just a flag
224                                  * value but it is probably clearer
225                                  * and less error-prone to fake this
226                                  * with an explicit token */
227                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
228                         }
229                         tok = strtok(NULL, " \t");
230                 }
231
232                 node_map->node = talloc_realloc(node_map, node_map->node,
233                                                 struct node,
234                                                 node_map->num_nodes + 1);
235                 if (node_map->node == NULL) {
236                         goto fail;
237                 }
238                 node = &node_map->node[node_map->num_nodes];
239
240                 parse_ip(ip, NULL, CTDB_PORT, &node->addr);
241                 node->pnn = pnn;
242                 node->flags = flags;
243                 node->capabilities = capabilities;
244                 node->recovery_disabled = false;
245                 node->recovery_substate = NULL;
246
247                 node_map->num_nodes += 1;
248         }
249
250         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
251         return true;
252
253 fail:
254         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
255         return false;
256
257 }
258
259 /* Append a node to a node map with given address and flags */
260 static bool node_map_add(struct ctdb_node_map *nodemap,
261                          const char *nstr, uint32_t flags)
262 {
263         ctdb_sock_addr addr;
264         uint32_t num;
265         struct ctdb_node_and_flags *n;
266
267         if (! parse_ip(nstr, NULL, CTDB_PORT, &addr)) {
268                 fprintf(stderr, "Invalid IP address %s\n", nstr);
269                 return false;
270         }
271
272         num = nodemap->num;
273         nodemap->node = talloc_realloc(nodemap, nodemap->node,
274                                        struct ctdb_node_and_flags, num+1);
275         if (nodemap->node == NULL) {
276                 return false;
277         }
278
279         n = &nodemap->node[num];
280         n->addr = addr;
281         n->pnn = num;
282         n->flags = flags;
283
284         nodemap->num = num+1;
285         return true;
286 }
287
288 /* Read a nodes file into a node map */
289 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
290                                                   const char *nlist)
291 {
292         char **lines;
293         int nlines;
294         int i;
295         struct ctdb_node_map *nodemap;
296
297         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
298         if (nodemap == NULL) {
299                 return NULL;
300         }
301
302         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
303         if (lines == NULL) {
304                 return NULL;
305         }
306
307         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
308                 nlines--;
309         }
310
311         for (i=0; i<nlines; i++) {
312                 char *node;
313                 uint32_t flags;
314                 size_t len;
315
316                 node = lines[i];
317                 /* strip leading spaces */
318                 while((*node == ' ') || (*node == '\t')) {
319                         node++;
320                 }
321
322                 len = strlen(node);
323
324                 /* strip trailing spaces */
325                 while ((len > 1) &&
326                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
327                 {
328                         node[len-1] = '\0';
329                         len--;
330                 }
331
332                 if (len == 0) {
333                         continue;
334                 }
335                 if (*node == '#') {
336                         /* A "deleted" node is a node that is
337                            commented out in the nodes file.  This is
338                            used instead of removing a line, which
339                            would cause subsequent nodes to change
340                            their PNN. */
341                         flags = NODE_FLAGS_DELETED;
342                         node = discard_const("0.0.0.0");
343                 } else {
344                         flags = 0;
345                 }
346                 if (! node_map_add(nodemap, node, flags)) {
347                         talloc_free(lines);
348                         TALLOC_FREE(nodemap);
349                         return NULL;
350                 }
351         }
352
353         talloc_free(lines);
354         return nodemap;
355 }
356
357 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
358                                              uint32_t pnn)
359 {
360         struct ctdb_node_map *nodemap;
361         char nodepath[PATH_MAX];
362         const char *nodes_list;
363
364         /* read the nodes file */
365         sprintf(nodepath, "CTDB_NODES_%u", pnn);
366         nodes_list = getenv(nodepath);
367         if (nodes_list == NULL) {
368                 nodes_list = getenv("CTDB_NODES");
369                 if (nodes_list == NULL) {
370                         DEBUG(DEBUG_INFO, ("Nodes file not defined\n"));
371                         return NULL;
372                 }
373         }
374
375         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
376         if (nodemap == NULL) {
377                 DEBUG(DEBUG_INFO, ("Failed to read nodes file \"%s\"\n",
378                                    nodes_list));
379                 return NULL;
380         }
381
382         return nodemap;
383 }
384
385 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
386 {
387         struct interface_map *iface_map;
388
389         iface_map = talloc_zero(mem_ctx, struct interface_map);
390         if (iface_map == NULL) {
391                 return NULL;
392         }
393
394         return iface_map;
395 }
396
397 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
398  * output:
399  *   :Name:LinkStatus:References:
400  *   :eth2:1:4294967294
401  *   :eth1:1:4294967292
402  */
403
404 static bool interfaces_parse(struct interface_map *iface_map)
405 {
406         char line[1024];
407
408         while ((fgets(line, sizeof(line), stdin) != NULL)) {
409                 uint16_t link_state;
410                 uint32_t references;
411                 char *tok, *t, *name;
412                 struct interface *iface;
413
414                 if (line[0] == '\n') {
415                         break;
416                 }
417
418                 /* Get rid of pesky newline */
419                 if ((t = strchr(line, '\n')) != NULL) {
420                         *t = '\0';
421                 }
422
423                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
424                         continue;
425                 }
426
427                 /* Leading colon... */
428                 // tok = strtok(line, ":");
429
430                 /* name */
431                 tok = strtok(line, ":");
432                 if (tok == NULL) {
433                         fprintf(stderr, "bad line (%s) - missing name\n", line);
434                         continue;
435                 }
436                 name = tok;
437
438                 /* link_state */
439                 tok = strtok(NULL, ":");
440                 if (tok == NULL) {
441                         fprintf(stderr, "bad line (%s) - missing link state\n",
442                                 line);
443                         continue;
444                 }
445                 link_state = (uint16_t)strtoul(tok, NULL, 0);
446
447                 /* references... */
448                 tok = strtok(NULL, ":");
449                 if (tok == NULL) {
450                         fprintf(stderr, "bad line (%s) - missing references\n",
451                                 line);
452                         continue;
453                 }
454                 references = (uint32_t)strtoul(tok, NULL, 0);
455
456                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
457                                                   struct interface,
458                                                   iface_map->num + 1);
459                 if (iface_map->iface == NULL) {
460                         goto fail;
461                 }
462
463                 iface = &iface_map->iface[iface_map->num];
464
465                 iface->name = talloc_strdup(iface_map, name);
466                 if (iface->name == NULL) {
467                         goto fail;
468                 }
469                 iface->link_up = link_state;
470                 iface->references = references;
471
472                 iface_map->num += 1;
473         }
474
475         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
476         return true;
477
478 fail:
479         fprintf(stderr, "Parsing interfaces failed\n");
480         return false;
481 }
482
483 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
484 {
485         struct vnn_map *vnn_map;
486
487         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
488         if (vnn_map == NULL) {
489                 fprintf(stderr, "Memory error\n");
490                 return NULL;
491         }
492         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
493         vnn_map->generation = INVALID_GENERATION;
494
495         return vnn_map;
496 }
497
498 /* Read vnn map.
499  * output:
500  *   <GENERATION>
501  *   <LMASTER0>
502  *   <LMASTER1>
503  *   ...
504  */
505
506 static bool vnnmap_parse(struct vnn_map *vnn_map)
507 {
508         char line[1024];
509
510         while (fgets(line, sizeof(line), stdin) != NULL) {
511                 uint32_t n;
512                 char *t;
513
514                 if (line[0] == '\n') {
515                         break;
516                 }
517
518                 /* Get rid of pesky newline */
519                 if ((t = strchr(line, '\n')) != NULL) {
520                         *t = '\0';
521                 }
522
523                 n = (uint32_t) strtol(line, NULL, 0);
524
525                 /* generation */
526                 if (vnn_map->generation == INVALID_GENERATION) {
527                         vnn_map->generation = n;
528                         continue;
529                 }
530
531                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
532                                               vnn_map->size + 1);
533                 if (vnn_map->map == NULL) {
534                         fprintf(stderr, "Memory error\n");
535                         goto fail;
536                 }
537
538                 vnn_map->map[vnn_map->size] = n;
539                 vnn_map->size += 1;
540         }
541
542         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
543         return true;
544
545 fail:
546         fprintf(stderr, "Parsing vnnmap failed\n");
547         return false;
548 }
549
550 static bool reclock_parse(struct ctdbd_context *ctdb)
551 {
552         char line[1024];
553         char *t;
554
555         if (fgets(line, sizeof(line), stdin) == NULL) {
556                 goto fail;
557         }
558
559         if (line[0] == '\n') {
560                 /* Recovery lock remains unset */
561                 goto ok;
562         }
563
564         /* Get rid of pesky newline */
565         if ((t = strchr(line, '\n')) != NULL) {
566                 *t = '\0';
567         }
568
569         ctdb->reclock = talloc_strdup(ctdb, line);
570         if (ctdb->reclock == NULL) {
571                 goto fail;
572         }
573 ok:
574         /* Swallow possible blank line following section.  Picky
575          * compiler settings don't allow the return value to be
576          * ignored, so make the compiler happy.
577          */
578         if (fgets(line, sizeof(line), stdin) == NULL) {
579                 ;
580         }
581         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
582         return true;
583
584 fail:
585         fprintf(stderr, "Parsing reclock failed\n");
586         return false;
587 }
588
589 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx)
590 {
591         struct database_map *db_map;
592
593         db_map = talloc_zero(mem_ctx, struct database_map);
594         if (db_map == NULL) {
595                 return NULL;
596         }
597
598         return db_map;
599 }
600
601 /* Read a database map from stdin.  Each line looks like:
602  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
603  * EOF or a blank line terminates input.
604  *
605  * By default, flags and seq_num are 0
606  */
607
608 static bool dbmap_parse(struct database_map *db_map)
609 {
610         char line[1024];
611
612         while ((fgets(line, sizeof(line), stdin) != NULL)) {
613                 uint32_t id;
614                 uint8_t flags = 0;
615                 uint32_t seq_num = 0;
616                 char *tok, *t;
617                 char *name;
618                 struct database *db;
619
620                 if (line[0] == '\n') {
621                         break;
622                 }
623
624                 /* Get rid of pesky newline */
625                 if ((t = strchr(line, '\n')) != NULL) {
626                         *t = '\0';
627                 }
628
629                 /* Get ID */
630                 tok = strtok(line, " \t");
631                 if (tok == NULL) {
632                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
633                         continue;
634                 }
635                 id = (uint32_t)strtoul(tok, NULL, 0);
636
637                 /* Get NAME */
638                 tok = strtok(NULL, " \t");
639                 if (tok == NULL) {
640                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
641                         continue;
642                 }
643                 name = talloc_strdup(db_map, tok);
644                 if (name == NULL) {
645                         goto fail;
646                 }
647
648                 /* Get flags */
649                 tok = strtok(NULL, " \t");
650                 while (tok != NULL) {
651                         if (strcmp(tok, "PERSISTENT") == 0) {
652                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
653                         } else if (strcmp(tok, "STICKY") == 0) {
654                                 flags |= CTDB_DB_FLAGS_STICKY;
655                         } else if (strcmp(tok, "READONLY") == 0) {
656                                 flags |= CTDB_DB_FLAGS_READONLY;
657                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
658                                 if ((flags & CTDB_DB_FLAGS_PERSISTENT) == 0) {
659                                         fprintf(stderr,
660                                                 "seq_num for volatile db\n");
661                                         goto fail;
662                                 }
663                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
664                         }
665
666                         tok = strtok(NULL, " \t");
667                 }
668
669                 db_map->db = talloc_realloc(db_map, db_map->db,
670                                             struct database,
671                                             db_map->num_dbs + 1);
672                 if (db_map->db == NULL) {
673                         goto fail;
674                 }
675                 db = &db_map->db[db_map->num_dbs];
676
677                 db->id = id;
678                 db->name = name;
679                 db->flags = flags;
680                 db->seq_num = seq_num;
681
682                 db_map->num_dbs += 1;
683         }
684
685         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
686         return true;
687
688 fail:
689         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
690         return false;
691
692 }
693
694 static struct database *database_find(struct database_map *map,
695                                       uint32_t db_id)
696 {
697         int i;
698
699         for (i = 0; i < map->num_dbs; i++) {
700                 struct database *db = &map->db[i];
701
702                 if (db->id == db_id) {
703                         return db;
704                 }
705         }
706
707         return NULL;
708 }
709
710 static bool public_ips_parse(struct ctdbd_context *ctdb,
711                              uint32_t numnodes)
712 {
713         if (numnodes == 0) {
714                 D_ERR("Must initialise nodemap before public IPs\n");
715                 return false;
716         }
717
718         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
719
720         return (ctdb->known_ips != NULL);
721 }
722
723 /*
724  * CTDB context setup
725  */
726
727 static uint32_t new_generation(uint32_t old_generation)
728 {
729         uint32_t generation;
730
731         while (1) {
732                 generation = random();
733                 if (generation != INVALID_GENERATION &&
734                     generation != old_generation) {
735                         break;
736                 }
737         }
738
739         return generation;
740 }
741
742 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx)
743 {
744         struct ctdbd_context *ctdb;
745         char line[1024];
746         bool status;
747
748         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
749         if (ctdb == NULL) {
750                 return NULL;
751         }
752
753         ctdb->node_map = nodemap_init(ctdb);
754         if (ctdb->node_map == NULL) {
755                 goto fail;
756         }
757
758         ctdb->iface_map = interfaces_init(ctdb);
759         if (ctdb->iface_map == NULL) {
760                 goto fail;
761         }
762
763         ctdb->vnn_map = vnnmap_init(ctdb);
764         if (ctdb->vnn_map == NULL) {
765                 goto fail;
766         }
767
768         ctdb->db_map = dbmap_init(ctdb);
769         if (ctdb->db_map == NULL) {
770                 goto fail;
771         }
772
773         while (fgets(line, sizeof(line), stdin) != NULL) {
774                 char *t;
775
776                 if ((t = strchr(line, '\n')) != NULL) {
777                         *t = '\0';
778                 }
779
780                 if (strcmp(line, "NODEMAP") == 0) {
781                         status = nodemap_parse(ctdb->node_map);
782                 } else if (strcmp(line, "IFACES") == 0) {
783                         status = interfaces_parse(ctdb->iface_map);
784                 } else if (strcmp(line, "VNNMAP") == 0) {
785                         status = vnnmap_parse(ctdb->vnn_map);
786                 } else if (strcmp(line, "DBMAP") == 0) {
787                         status = dbmap_parse(ctdb->db_map);
788                 } else if (strcmp(line, "PUBLICIPS") == 0) {
789                         status = public_ips_parse(ctdb,
790                                                   ctdb->node_map->num_nodes);
791                 } else if (strcmp(line, "RECLOCK") == 0) {
792                         status = reclock_parse(ctdb);
793                 } else {
794                         fprintf(stderr, "Unknown line %s\n", line);
795                         status = false;
796                 }
797
798                 if (! status) {
799                         goto fail;
800                 }
801         }
802
803         ctdb->start_time = tevent_timeval_current();
804         ctdb->recovery_start_time = tevent_timeval_current();
805         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
806         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
807                 ctdb->vnn_map->generation =
808                         new_generation(ctdb->vnn_map->generation);
809         }
810         ctdb->recovery_end_time = tevent_timeval_current();
811
812         ctdb->log_level = DEBUG_ERR;
813         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
814
815         ctdb_tunable_set_defaults(&ctdb->tun_list);
816
817         ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
818
819         return ctdb;
820
821 fail:
822         TALLOC_FREE(ctdb);
823         return NULL;
824 }
825
826 static bool ctdbd_verify(struct ctdbd_context *ctdb)
827 {
828         struct node *node;
829         int i;
830
831         if (ctdb->node_map->num_nodes == 0) {
832                 return true;
833         }
834
835         /* Make sure all the nodes are in order */
836         for (i=0; i<ctdb->node_map->num_nodes; i++) {
837                 node = &ctdb->node_map->node[i];
838                 if (node->pnn != i) {
839                         fprintf(stderr, "Expected node %u, found %u\n",
840                                 i, node->pnn);
841                         return false;
842                 }
843         }
844
845         node = &ctdb->node_map->node[ctdb->node_map->pnn];
846         if (node->flags & NODE_FLAGS_DISCONNECTED) {
847                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
848                 exit(0);
849         }
850
851         return true;
852 }
853
854 /*
855  * Doing a recovery
856  */
857
/* State of one fake recovery request */
struct recover_state {
	/* Event context used to schedule the wakeups */
	struct tevent_context *ev;
	/* Daemon whose recovery is being faked */
	struct ctdbd_context *ctdb;
};

/* Arms a 1 second timer; returns 0 or an errno value */
static int recover_check(struct tevent_req *req);
/* Timer callback used while recovery is disabled on some node */
static void recover_wait_done(struct tevent_req *subreq);
/* Timer callback that completes the recovery */
static void recover_done(struct tevent_req *subreq);
866
867 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
868                                        struct tevent_context *ev,
869                                        struct ctdbd_context *ctdb)
870 {
871         struct tevent_req *req;
872         struct recover_state *state;
873         int ret;
874
875         req = tevent_req_create(mem_ctx, &state, struct recover_state);
876         if (req == NULL) {
877                 return NULL;
878         }
879
880         state->ev = ev;
881         state->ctdb = ctdb;
882
883         ret = recover_check(req);
884         if (ret != 0) {
885                 tevent_req_error(req, ret);
886                 return tevent_req_post(req, ev);
887         }
888
889         return req;
890 }
891
892 static int recover_check(struct tevent_req *req)
893 {
894         struct recover_state *state = tevent_req_data(
895                 req, struct recover_state);
896         struct ctdbd_context *ctdb = state->ctdb;
897         struct tevent_req *subreq;
898         bool recovery_disabled;
899         int i;
900
901         recovery_disabled = false;
902         for (i=0; i<ctdb->node_map->num_nodes; i++) {
903                 if (ctdb->node_map->node[i].recovery_disabled) {
904                         recovery_disabled = true;
905                         break;
906                 }
907         }
908
909         subreq = tevent_wakeup_send(state, state->ev,
910                                     tevent_timeval_current_ofs(1, 0));
911         if (subreq == NULL) {
912                 return ENOMEM;
913         }
914
915         if (recovery_disabled) {
916                 tevent_req_set_callback(subreq, recover_wait_done, req);
917         } else {
918                 ctdb->recovery_start_time = tevent_timeval_current();
919                 tevent_req_set_callback(subreq, recover_done, req);
920         }
921
922         return 0;
923 }
924
925 static void recover_wait_done(struct tevent_req *subreq)
926 {
927         struct tevent_req *req = tevent_req_callback_data(
928                 subreq, struct tevent_req);
929         int ret;
930         bool status;
931
932         status = tevent_wakeup_recv(subreq);
933         TALLOC_FREE(subreq);
934         if (! status) {
935                 tevent_req_error(req, EIO);
936                 return;
937         }
938
939         ret = recover_check(req);
940         if (ret != 0) {
941                 tevent_req_error(req, ret);
942         }
943 }
944
945 static void recover_done(struct tevent_req *subreq)
946 {
947         struct tevent_req *req = tevent_req_callback_data(
948                 subreq, struct tevent_req);
949         struct recover_state *state = tevent_req_data(
950                 req, struct recover_state);
951         struct ctdbd_context *ctdb = state->ctdb;
952         bool status;
953
954         status = tevent_wakeup_recv(subreq);
955         TALLOC_FREE(subreq);
956         if (! status) {
957                 tevent_req_error(req, EIO);
958                 return;
959         }
960
961         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
962         ctdb->recovery_end_time = tevent_timeval_current();
963         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
964
965         tevent_req_done(req);
966 }
967
/*
 * Collect the result of a recovery request.
 *
 * Returns true if the request carries no unix error.  Otherwise returns
 * false and, when perr is not NULL, stores the error code in *perr.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
981
982 /*
983  * Routines for ctdb_req_header
984  */
985
986 static void header_fix_pnn(struct ctdb_req_header *header,
987                            struct ctdbd_context *ctdb)
988 {
989         if (header->srcnode == CTDB_CURRENT_NODE) {
990                 header->srcnode = ctdb->node_map->pnn;
991         }
992
993         if (header->destnode == CTDB_CURRENT_NODE) {
994                 header->destnode = ctdb->node_map->pnn;
995         }
996 }
997
998 static struct ctdb_req_header header_reply_control(
999                                         struct ctdb_req_header *header,
1000                                         struct ctdbd_context *ctdb)
1001 {
1002         struct ctdb_req_header reply_header;
1003
1004         reply_header = (struct ctdb_req_header) {
1005                 .ctdb_magic = CTDB_MAGIC,
1006                 .ctdb_version = CTDB_PROTOCOL,
1007                 .generation = ctdb->vnn_map->generation,
1008                 .operation = CTDB_REPLY_CONTROL,
1009                 .destnode = header->srcnode,
1010                 .srcnode = header->destnode,
1011                 .reqid = header->reqid,
1012         };
1013
1014         return reply_header;
1015 }
1016
1017 static struct ctdb_req_header header_reply_message(
1018                                         struct ctdb_req_header *header,
1019                                         struct ctdbd_context *ctdb)
1020 {
1021         struct ctdb_req_header reply_header;
1022
1023         reply_header = (struct ctdb_req_header) {
1024                 .ctdb_magic = CTDB_MAGIC,
1025                 .ctdb_version = CTDB_PROTOCOL,
1026                 .generation = ctdb->vnn_map->generation,
1027                 .operation = CTDB_REQ_MESSAGE,
1028                 .destnode = header->srcnode,
1029                 .srcnode = header->destnode,
1030                 .reqid = 0,
1031         };
1032
1033         return reply_header;
1034 }
1035
1036 /*
1037  * Client state
1038  */
1039
/* Per-client request state shared by the control and message handlers. */
struct client_state {
	/* Event context used when queueing writes to the client */
	struct tevent_context *ev;
	/* NOTE(review): presumably the client socket - confirm at setup */
	int fd;
	/* Fake daemon state that the control handlers read and modify */
	struct ctdbd_context *ctdb;
	/* NOTE(review): not referenced in this section - confirm meaning */
	int pnn;
	/* Communication context that replies are written through */
	struct comm_context *comm;
	/* NOTE(review): not referenced in this section - confirm meaning */
	struct srvid_register_state *rstate;
	/* Set to 99 by control_shutdown(); other values are set elsewhere */
	int status;
};
1049
1050 /*
1051  * Send replies to controls and messages
1052  */
1053
1054 static void client_reply_done(struct tevent_req *subreq);
1055
1056 static void client_send_message(struct tevent_req *req,
1057                                 struct ctdb_req_header *header,
1058                                 struct ctdb_req_message_data *message)
1059 {
1060         struct client_state *state = tevent_req_data(
1061                 req, struct client_state);
1062         struct ctdbd_context *ctdb = state->ctdb;
1063         struct tevent_req *subreq;
1064         struct ctdb_req_header reply_header;
1065         uint8_t *buf;
1066         size_t datalen, buflen;
1067         int ret;
1068
1069         reply_header = header_reply_message(header, ctdb);
1070
1071         datalen = ctdb_req_message_data_len(&reply_header, message);
1072         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1073         if (ret != 0) {
1074                 tevent_req_error(req, ret);
1075                 return;
1076         }
1077
1078         ret = ctdb_req_message_data_push(&reply_header, message,
1079                                          buf, &buflen);
1080         if (ret != 0) {
1081                 tevent_req_error(req, ret);
1082                 return;
1083         }
1084
1085         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1086
1087         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1088         if (tevent_req_nomem(subreq, req)) {
1089                 return;
1090         }
1091         tevent_req_set_callback(subreq, client_reply_done, req);
1092
1093         talloc_steal(subreq, buf);
1094 }
1095
1096 static void client_send_control(struct tevent_req *req,
1097                                 struct ctdb_req_header *header,
1098                                 struct ctdb_reply_control *reply)
1099 {
1100         struct client_state *state = tevent_req_data(
1101                 req, struct client_state);
1102         struct ctdbd_context *ctdb = state->ctdb;
1103         struct tevent_req *subreq;
1104         struct ctdb_req_header reply_header;
1105         uint8_t *buf;
1106         size_t datalen, buflen;
1107         int ret;
1108
1109         reply_header = header_reply_control(header, ctdb);
1110
1111         datalen = ctdb_reply_control_len(&reply_header, reply);
1112         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1113         if (ret != 0) {
1114                 tevent_req_error(req, ret);
1115                 return;
1116         }
1117
1118         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1119         if (ret != 0) {
1120                 tevent_req_error(req, ret);
1121                 return;
1122         }
1123
1124         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1125
1126         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1127         if (tevent_req_nomem(subreq, req)) {
1128                 return;
1129         }
1130         tevent_req_set_callback(subreq, client_reply_done, req);
1131
1132         talloc_steal(subreq, buf);
1133 }
1134
1135 static void client_reply_done(struct tevent_req *subreq)
1136 {
1137         struct tevent_req *req = tevent_req_callback_data(
1138                 subreq, struct tevent_req);
1139         int ret;
1140         bool status;
1141
1142         status = comm_write_recv(subreq, &ret);
1143         TALLOC_FREE(subreq);
1144         if (! status) {
1145                 tevent_req_error(req, ret);
1146         }
1147 }
1148
1149 /*
1150  * Handling protocol - controls
1151  */
1152
1153 static void control_process_exists(TALLOC_CTX *mem_ctx,
1154                                    struct tevent_req *req,
1155                                    struct ctdb_req_header *header,
1156                                    struct ctdb_req_control *request)
1157 {
1158         struct ctdb_reply_control reply;
1159
1160         reply.rdata.opcode = request->opcode;
1161         reply.status = kill(request->rdata.data.pid, 0);
1162         reply.errmsg = NULL;
1163
1164         client_send_control(req, header, &reply);
1165 }
1166
1167 static void control_ping(TALLOC_CTX *mem_ctx,
1168                          struct tevent_req *req,
1169                          struct ctdb_req_header *header,
1170                          struct ctdb_req_control *request)
1171 {
1172         struct client_state *state = tevent_req_data(
1173                 req, struct client_state);
1174         struct ctdbd_context *ctdb = state->ctdb;
1175         struct ctdb_reply_control reply;
1176
1177         reply.rdata.opcode = request->opcode;
1178         reply.status = ctdb->num_clients;
1179         reply.errmsg = NULL;
1180
1181         client_send_control(req, header, &reply);
1182 }
1183
1184 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1185                               struct tevent_req *req,
1186                               struct ctdb_req_header *header,
1187                               struct ctdb_req_control *request)
1188 {
1189         struct client_state *state = tevent_req_data(
1190                 req, struct client_state);
1191         struct ctdbd_context *ctdb = state->ctdb;
1192         struct ctdb_reply_control reply;
1193         struct database *db;
1194
1195         reply.rdata.opcode = request->opcode;
1196
1197         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1198         if (db == NULL) {
1199                 reply.status = ENOENT;
1200                 reply.errmsg = "Database not found";
1201         } else {
1202                 const char *base;
1203                 if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
1204                         base = "/var/lib/ctdb/persistent";
1205                 } else {
1206                         base = "/var/run/ctdb/DB_DIR";
1207                 }
1208                 reply.rdata.data.db_path =
1209                         talloc_asprintf(mem_ctx, "%s/%s.%u",
1210                                         base, db->name, header->destnode);
1211                 if (reply.rdata.data.db_path == NULL) {
1212                         reply.status = ENOMEM;
1213                         reply.errmsg = "Memory error";
1214                 } else {
1215                         reply.status = 0;
1216                         reply.errmsg = NULL;
1217                 }
1218         }
1219
1220         client_send_control(req, header, &reply);
1221 }
1222
1223 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1224                               struct tevent_req *req,
1225                               struct ctdb_req_header *header,
1226                               struct ctdb_req_control *request)
1227 {
1228         struct client_state *state = tevent_req_data(
1229                 req, struct client_state);
1230         struct ctdbd_context *ctdb = state->ctdb;
1231         struct ctdb_reply_control reply;
1232         struct ctdb_vnn_map *vnnmap;
1233
1234         reply.rdata.opcode = request->opcode;
1235
1236         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1237         if (vnnmap == NULL) {
1238                 reply.status = ENOMEM;
1239                 reply.errmsg = "Memory error";
1240         } else {
1241                 vnnmap->generation = ctdb->vnn_map->generation;
1242                 vnnmap->size = ctdb->vnn_map->size;
1243                 vnnmap->map = ctdb->vnn_map->map;
1244
1245                 reply.rdata.data.vnnmap = vnnmap;
1246                 reply.status = 0;
1247                 reply.errmsg = NULL;
1248         }
1249
1250         client_send_control(req, header, &reply);
1251 }
1252
1253 static void control_get_debug(TALLOC_CTX *mem_ctx,
1254                               struct tevent_req *req,
1255                               struct ctdb_req_header *header,
1256                               struct ctdb_req_control *request)
1257 {
1258         struct client_state *state = tevent_req_data(
1259                 req, struct client_state);
1260         struct ctdbd_context *ctdb = state->ctdb;
1261         struct ctdb_reply_control reply;
1262
1263         reply.rdata.opcode = request->opcode;
1264         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1265         reply.status = 0;
1266         reply.errmsg = NULL;
1267
1268         client_send_control(req, header, &reply);
1269 }
1270
1271 static void control_set_debug(TALLOC_CTX *mem_ctx,
1272                               struct tevent_req *req,
1273                               struct ctdb_req_header *header,
1274                               struct ctdb_req_control *request)
1275 {
1276         struct client_state *state = tevent_req_data(
1277                 req, struct client_state);
1278         struct ctdbd_context *ctdb = state->ctdb;
1279         struct ctdb_reply_control reply;
1280
1281         ctdb->log_level = (int)request->rdata.data.loglevel;
1282
1283         reply.rdata.opcode = request->opcode;
1284         reply.status = 0;
1285         reply.errmsg = NULL;
1286
1287         client_send_control(req, header, &reply);
1288 }
1289
1290 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1291                               struct tevent_req *req,
1292                                struct ctdb_req_header *header,
1293                               struct ctdb_req_control *request)
1294 {
1295         struct client_state *state = tevent_req_data(
1296                 req, struct client_state);
1297         struct ctdbd_context *ctdb = state->ctdb;
1298         struct ctdb_reply_control reply;
1299         struct ctdb_dbid_map *dbmap;
1300         int i;
1301
1302         reply.rdata.opcode = request->opcode;
1303
1304         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1305         if (dbmap == NULL) {
1306                 goto fail;
1307         }
1308
1309         dbmap->num = ctdb->db_map->num_dbs;
1310         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1311         if (dbmap->dbs == NULL) {
1312                 goto fail;
1313         }
1314
1315         for (i = 0; i < dbmap->num; i++) {
1316                 struct database *db = &ctdb->db_map->db[i];
1317                 dbmap->dbs[i] = (struct ctdb_dbid) {
1318                         .db_id = db->id,
1319                         .flags = db->flags,
1320                 };
1321         }
1322
1323         reply.rdata.data.dbmap = dbmap;
1324         reply.status = 0;
1325         reply.errmsg = NULL;
1326         client_send_control(req, header, &reply);
1327         return;
1328
1329 fail:
1330         reply.status = -1;
1331         reply.errmsg = "Memory error";
1332         client_send_control(req, header, &reply);
1333 }
1334
1335 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1336                                 struct tevent_req *req,
1337                                 struct ctdb_req_header *header,
1338                                 struct ctdb_req_control *request)
1339 {
1340         struct client_state *state = tevent_req_data(
1341                 req, struct client_state);
1342         struct ctdbd_context *ctdb = state->ctdb;
1343         struct ctdb_reply_control reply;
1344
1345         reply.rdata.opcode = request->opcode;
1346         reply.status = ctdb->vnn_map->recmode;
1347         reply.errmsg = NULL;
1348
1349         client_send_control(req, header, &reply);
1350 }
1351
1352 struct set_recmode_state {
1353         struct tevent_req *req;
1354         struct ctdbd_context *ctdb;
1355         struct ctdb_req_header header;
1356         struct ctdb_reply_control reply;
1357 };
1358
1359 static void set_recmode_callback(struct tevent_req *subreq)
1360 {
1361         struct set_recmode_state *substate = tevent_req_callback_data(
1362                 subreq, struct set_recmode_state);
1363         bool status;
1364         int ret;
1365
1366         status = recover_recv(subreq, &ret);
1367         TALLOC_FREE(subreq);
1368         if (! status) {
1369                 substate->reply.status = ret;
1370                 substate->reply.errmsg = "recovery failed";
1371         } else {
1372                 substate->reply.status = 0;
1373                 substate->reply.errmsg = NULL;
1374         }
1375
1376         client_send_control(substate->req, &substate->header, &substate->reply);
1377         talloc_free(substate);
1378 }
1379
1380 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1381                                 struct tevent_req *req,
1382                                 struct ctdb_req_header *header,
1383                                 struct ctdb_req_control *request)
1384 {
1385         struct client_state *state = tevent_req_data(
1386                 req, struct client_state);
1387         struct tevent_req *subreq;
1388         struct ctdbd_context *ctdb = state->ctdb;
1389         struct set_recmode_state *substate;
1390         struct ctdb_reply_control reply;
1391
1392         reply.rdata.opcode = request->opcode;
1393
1394         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1395                 reply.status = -1;
1396                 reply.errmsg = "Client cannot set recmode to NORMAL";
1397                 goto fail;
1398         }
1399
1400         substate = talloc_zero(ctdb, struct set_recmode_state);
1401         if (substate == NULL) {
1402                 reply.status = -1;
1403                 reply.errmsg = "Memory error";
1404                 goto fail;
1405         }
1406
1407         substate->req = req;
1408         substate->ctdb = ctdb;
1409         substate->header = *header;
1410         substate->reply.rdata.opcode = request->opcode;
1411
1412         subreq = recover_send(substate, state->ev, state->ctdb);
1413         if (subreq == NULL) {
1414                 talloc_free(substate);
1415                 goto fail;
1416         }
1417         tevent_req_set_callback(subreq, set_recmode_callback, substate);
1418
1419         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
1420         return;
1421
1422 fail:
1423         client_send_control(req, header, &reply);
1424
1425 }
1426
1427 static int srvid_register_state_destructor(struct srvid_register_state *rstate)
1428 {
1429         DLIST_REMOVE(rstate->ctdb->rstate, rstate);
1430         return 0;
1431 }
1432
1433 static void control_register_srvid(TALLOC_CTX *mem_ctx,
1434                                    struct tevent_req *req,
1435                                    struct ctdb_req_header *header,
1436                                    struct ctdb_req_control *request)
1437 {
1438         struct client_state *state = tevent_req_data(
1439                 req, struct client_state);
1440         struct ctdbd_context *ctdb = state->ctdb;
1441         struct ctdb_reply_control reply;
1442         struct srvid_register_state *rstate;
1443
1444         reply.rdata.opcode = request->opcode;
1445
1446         rstate = talloc_zero(ctdb, struct srvid_register_state);
1447         if (rstate == NULL) {
1448                 reply.status = -1;
1449                 reply.errmsg = "Memory error";
1450                 goto fail;
1451         }
1452         rstate->ctdb = ctdb;
1453         rstate->srvid = request->srvid;
1454
1455         talloc_set_destructor(rstate, srvid_register_state_destructor);
1456
1457         DLIST_ADD_END(ctdb->rstate, rstate);
1458
1459         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", rstate->srvid));
1460
1461         reply.status = 0;
1462         reply.errmsg = NULL;
1463
1464 fail:
1465         client_send_control(req, header, &reply);
1466 }
1467
1468 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
1469                                      struct tevent_req *req,
1470                                      struct ctdb_req_header *header,
1471                                      struct ctdb_req_control *request)
1472 {
1473         struct client_state *state = tevent_req_data(
1474                 req, struct client_state);
1475         struct ctdbd_context *ctdb = state->ctdb;
1476         struct ctdb_reply_control reply;
1477         struct srvid_register_state *rstate = NULL;
1478
1479         reply.rdata.opcode = request->opcode;
1480
1481         for (rstate = ctdb->rstate; rstate != NULL; rstate = rstate->next) {
1482                 if (rstate->srvid == request->srvid) {
1483                         break;
1484                 }
1485         }
1486
1487         if (rstate == NULL) {
1488                 reply.status = -1;
1489                 reply.errmsg = "srvid not registered";
1490                 goto fail;
1491         }
1492
1493         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", rstate->srvid));
1494         talloc_free(rstate);
1495
1496         reply.status = 0;
1497         reply.errmsg = NULL;
1498
1499         client_send_control(req, header, &reply);
1500         return;
1501
1502 fail:
1503         TALLOC_FREE(rstate);
1504         client_send_control(req, header, &reply);
1505 }
1506
1507 static void control_get_dbname(TALLOC_CTX *mem_ctx,
1508                                struct tevent_req *req,
1509                                struct ctdb_req_header *header,
1510                                struct ctdb_req_control *request)
1511 {
1512         struct client_state *state = tevent_req_data(
1513                 req, struct client_state);
1514         struct ctdbd_context *ctdb = state->ctdb;
1515         struct ctdb_reply_control reply;
1516         struct database *db;
1517
1518         reply.rdata.opcode = request->opcode;
1519
1520         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1521         if (db == NULL) {
1522                 reply.status = ENOENT;
1523                 reply.errmsg = "Database not found";
1524         } else {
1525                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
1526                 if (reply.rdata.data.db_name == NULL) {
1527                         reply.status = ENOMEM;
1528                         reply.errmsg = "Memory error";
1529                 } else {
1530                         reply.status = 0;
1531                         reply.errmsg = NULL;
1532                 }
1533         }
1534
1535         client_send_control(req, header, &reply);
1536 }
1537
1538 static void control_get_pid(TALLOC_CTX *mem_ctx,
1539                             struct tevent_req *req,
1540                             struct ctdb_req_header *header,
1541                             struct ctdb_req_control *request)
1542 {
1543         struct ctdb_reply_control reply;
1544
1545         reply.rdata.opcode = request->opcode;
1546         reply.status = getpid();
1547         reply.errmsg = NULL;
1548
1549         client_send_control(req, header, &reply);
1550 }
1551
1552 static void control_get_recmaster(TALLOC_CTX *mem_ctx,
1553                                   struct tevent_req *req,
1554                                   struct ctdb_req_header *header,
1555                                   struct ctdb_req_control *request)
1556 {
1557         struct client_state *state = tevent_req_data(
1558                 req, struct client_state);
1559         struct ctdbd_context *ctdb = state->ctdb;
1560         struct ctdb_reply_control reply;
1561
1562         reply.rdata.opcode = request->opcode;
1563         reply.status = ctdb->node_map->recmaster;
1564         reply.errmsg = NULL;
1565
1566         client_send_control(req, header, &reply);
1567 }
1568
1569 static void control_get_pnn(TALLOC_CTX *mem_ctx,
1570                             struct tevent_req *req,
1571                             struct ctdb_req_header *header,
1572                             struct ctdb_req_control *request)
1573 {
1574         struct ctdb_reply_control reply;
1575
1576         reply.rdata.opcode = request->opcode;
1577         reply.status = header->destnode;
1578         reply.errmsg = NULL;
1579
1580         client_send_control(req, header, &reply);
1581 }
1582
1583 static void control_shutdown(TALLOC_CTX *mem_ctx,
1584                              struct tevent_req *req,
1585                              struct ctdb_req_header *hdr,
1586                              struct ctdb_req_control *request)
1587 {
1588         struct client_state *state = tevent_req_data(
1589                 req, struct client_state);
1590
1591         state->status = 99;
1592 }
1593
1594 static void control_get_monmode(TALLOC_CTX *mem_ctx,
1595                                 struct tevent_req *req,
1596                                 struct ctdb_req_header *header,
1597                                 struct ctdb_req_control *request)
1598 {
1599         struct client_state *state = tevent_req_data(
1600                 req, struct client_state);
1601         struct ctdbd_context *ctdb = state->ctdb;
1602         struct ctdb_reply_control reply;
1603
1604         reply.rdata.opcode = request->opcode;
1605         reply.status = ctdb->monitoring_mode;
1606         reply.errmsg = NULL;
1607
1608         client_send_control(req, header, &reply);
1609 }
1610
1611 static void control_set_tunable(TALLOC_CTX *mem_ctx,
1612                                 struct tevent_req *req,
1613                                 struct ctdb_req_header *header,
1614                                 struct ctdb_req_control *request)
1615 {
1616         struct client_state *state = tevent_req_data(
1617                 req, struct client_state);
1618         struct ctdbd_context *ctdb = state->ctdb;
1619         struct ctdb_reply_control reply;
1620         bool ret, obsolete;
1621
1622         reply.rdata.opcode = request->opcode;
1623         reply.errmsg = NULL;
1624
1625         ret = ctdb_tunable_set_value(&ctdb->tun_list,
1626                                      request->rdata.data.tunable->name,
1627                                      request->rdata.data.tunable->value,
1628                                      &obsolete);
1629         if (! ret) {
1630                 reply.status = -1;
1631         } else if (obsolete) {
1632                 reply.status = 1;
1633         } else {
1634                 reply.status = 0;
1635         }
1636
1637         client_send_control(req, header, &reply);
1638 }
1639
1640 static void control_get_tunable(TALLOC_CTX *mem_ctx,
1641                                 struct tevent_req *req,
1642                                 struct ctdb_req_header *header,
1643                                 struct ctdb_req_control *request)
1644 {
1645         struct client_state *state = tevent_req_data(
1646                 req, struct client_state);
1647         struct ctdbd_context *ctdb = state->ctdb;
1648         struct ctdb_reply_control reply;
1649         uint32_t value;
1650         bool ret;
1651
1652         reply.rdata.opcode = request->opcode;
1653         reply.errmsg = NULL;
1654
1655         ret = ctdb_tunable_get_value(&ctdb->tun_list,
1656                                      request->rdata.data.tun_var, &value);
1657         if (! ret) {
1658                 reply.status = -1;
1659         } else {
1660                 reply.rdata.data.tun_value = value;
1661                 reply.status = 0;
1662         }
1663
1664         client_send_control(req, header, &reply);
1665 }
1666
1667 static void control_list_tunables(TALLOC_CTX *mem_ctx,
1668                                   struct tevent_req *req,
1669                                   struct ctdb_req_header *header,
1670                                   struct ctdb_req_control *request)
1671 {
1672         struct ctdb_reply_control reply;
1673         struct ctdb_var_list *var_list;
1674
1675         reply.rdata.opcode = request->opcode;
1676         reply.errmsg = NULL;
1677
1678         var_list = ctdb_tunable_names(mem_ctx);
1679         if (var_list == NULL) {
1680                 reply.status = -1;
1681         } else {
1682                 reply.rdata.data.tun_var_list = var_list;
1683                 reply.status = 0;
1684         }
1685
1686         client_send_control(req, header, &reply);
1687 }
1688
1689 static void control_modify_flags(TALLOC_CTX *mem_ctx,
1690                                  struct tevent_req *req,
1691                                  struct ctdb_req_header *header,
1692                                  struct ctdb_req_control *request)
1693 {
1694         struct client_state *state = tevent_req_data(
1695                 req, struct client_state);
1696         struct ctdbd_context *ctdb = state->ctdb;
1697         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
1698         struct ctdb_reply_control reply;
1699         struct node *node;
1700
1701         reply.rdata.opcode = request->opcode;
1702
1703         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
1704             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1705                 DEBUG(DEBUG_INFO,
1706                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
1707                 reply.status = EINVAL;
1708                 reply.errmsg = "Failed to MODIFY_FLAGS";
1709                 client_send_control(req, header, &reply);
1710                 return;
1711         }
1712
1713         /* There's all sorts of broadcast weirdness here.  Only change
1714          * the specified node, not the destination node of the
1715          * control. */
1716         node = &ctdb->node_map->node[change->pnn];
1717
1718         if ((node->flags &
1719              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
1720             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
1721                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
1722                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
1723                 goto done;
1724         }
1725
1726         if ((node->flags &
1727              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
1728             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
1729                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
1730                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
1731                 goto done;
1732         }
1733
1734         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
1735
1736 done:
1737         reply.status = 0;
1738         reply.errmsg = NULL;
1739         client_send_control(req, header, &reply);
1740 }
1741
1742 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
1743                                      struct tevent_req *req,
1744                                      struct ctdb_req_header *header,
1745                                      struct ctdb_req_control *request)
1746 {
1747         struct client_state *state = tevent_req_data(
1748                 req, struct client_state);
1749         struct ctdbd_context *ctdb = state->ctdb;
1750         struct ctdb_reply_control reply;
1751
1752         reply.rdata.opcode = request->opcode;
1753         reply.rdata.data.tun_list = &ctdb->tun_list;
1754         reply.status = 0;
1755         reply.errmsg = NULL;
1756
1757         client_send_control(req, header, &reply);
1758 }
1759
1760 static void control_uptime(TALLOC_CTX *mem_ctx,
1761                            struct tevent_req *req,
1762                            struct ctdb_req_header *header,
1763                            struct ctdb_req_control *request)
1764 {
1765         struct client_state *state = tevent_req_data(
1766                 req, struct client_state);
1767         struct ctdbd_context *ctdb = state->ctdb;
1768         struct ctdb_reply_control reply;
1769         struct ctdb_uptime *uptime;;
1770
1771         reply.rdata.opcode = request->opcode;
1772
1773         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
1774         if (uptime == NULL) {
1775                 goto fail;
1776         }
1777
1778         uptime->current_time = tevent_timeval_current();
1779         uptime->ctdbd_start_time = ctdb->start_time;
1780         uptime->last_recovery_started = ctdb->recovery_start_time;
1781         uptime->last_recovery_finished = ctdb->recovery_end_time;
1782
1783         reply.rdata.data.uptime = uptime;
1784         reply.status = 0;
1785         reply.errmsg = NULL;
1786         client_send_control(req, header, &reply);
1787         return;
1788
1789 fail:
1790         reply.status = -1;
1791         reply.errmsg = "Memory error";
1792         client_send_control(req, header, &reply);
1793 }
1794
1795 static void control_enable_monitor(TALLOC_CTX *mem_ctx,
1796                                    struct tevent_req *req,
1797                                    struct ctdb_req_header *header,
1798                                    struct ctdb_req_control *request)
1799 {
1800         struct client_state *state = tevent_req_data(
1801                 req, struct client_state);
1802         struct ctdbd_context *ctdb = state->ctdb;
1803         struct ctdb_reply_control reply;
1804
1805         ctdb->monitoring_mode = CTDB_MONITORING_ENABLED;
1806
1807         reply.rdata.opcode = request->opcode;
1808         reply.status = 0;
1809         reply.errmsg = NULL;
1810         client_send_control(req, header, &reply);
1811 }
1812
1813 static void control_disable_monitor(TALLOC_CTX *mem_ctx,
1814                                     struct tevent_req *req,
1815                                     struct ctdb_req_header *header,
1816                                     struct ctdb_req_control *request)
1817 {
1818         struct client_state *state = tevent_req_data(
1819                 req, struct client_state);
1820         struct ctdbd_context *ctdb = state->ctdb;
1821         struct ctdb_reply_control reply;
1822
1823         ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
1824
1825         reply.rdata.opcode = request->opcode;
1826         reply.status = 0;
1827         reply.errmsg = NULL;
1828         client_send_control(req, header, &reply);
1829 }
1830
1831 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
1832                                       struct tevent_req *req,
1833                                       struct ctdb_req_header *header,
1834                                       struct ctdb_req_control *request)
1835 {
1836         struct client_state *state = tevent_req_data(
1837                 req, struct client_state);
1838         struct ctdbd_context *ctdb = state->ctdb;
1839         struct ctdb_reply_control reply;
1840         struct ctdb_node_map *nodemap;
1841         struct node_map *node_map = ctdb->node_map;
1842         int i;
1843
1844         reply.rdata.opcode = request->opcode;
1845
1846         nodemap = read_nodes_file(mem_ctx, header->destnode);
1847         if (nodemap == NULL) {
1848                 goto fail;
1849         }
1850
1851         for (i=0; i<nodemap->num; i++) {
1852                 struct node *node;
1853
1854                 if (i < node_map->num_nodes &&
1855                     ctdb_sock_addr_same(&nodemap->node[i].addr,
1856                                         &node_map->node[i].addr)) {
1857                         continue;
1858                 }
1859
1860                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
1861                         node = &node_map->node[i];
1862
1863                         node->flags |= NODE_FLAGS_DELETED;
1864                         parse_ip("0.0.0.0", NULL, 0, &node->addr);
1865
1866                         continue;
1867                 }
1868
1869                 if (i < node_map->num_nodes &&
1870                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
1871                         node = &node_map->node[i];
1872
1873                         node->flags &= ~NODE_FLAGS_DELETED;
1874                         node->addr = nodemap->node[i].addr;
1875
1876                         continue;
1877                 }
1878
1879                 node_map->node = talloc_realloc(node_map, node_map->node,
1880                                                 struct node,
1881                                                 node_map->num_nodes+1);
1882                 if (node_map->node == NULL) {
1883                         goto fail;
1884                 }
1885                 node = &node_map->node[node_map->num_nodes];
1886
1887                 node->addr = nodemap->node[i].addr;
1888                 node->pnn = nodemap->node[i].pnn;
1889                 node->flags = 0;
1890                 node->capabilities = CTDB_CAP_DEFAULT;
1891                 node->recovery_disabled = false;
1892                 node->recovery_substate = NULL;
1893
1894                 node_map->num_nodes += 1;
1895         }
1896
1897         talloc_free(nodemap);
1898
1899         reply.status = 0;
1900         reply.errmsg = NULL;
1901         client_send_control(req, header, &reply);
1902         return;
1903
1904 fail:
1905         reply.status = -1;
1906         reply.errmsg = "Memory error";
1907         client_send_control(req, header, &reply);
1908 }
1909
1910 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
1911                                      struct tevent_req *req,
1912                                      struct ctdb_req_header *header,
1913                                      struct ctdb_req_control *request)
1914 {
1915         struct client_state *state = tevent_req_data(
1916                 req, struct client_state);
1917         struct ctdbd_context *ctdb = state->ctdb;
1918         struct ctdb_reply_control reply;
1919         struct node *node;
1920         uint32_t caps = 0;
1921
1922         reply.rdata.opcode = request->opcode;
1923
1924         node = &ctdb->node_map->node[header->destnode];
1925         caps = node->capabilities;
1926
1927         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
1928                 /* Don't send reply */
1929                 return;
1930         }
1931
1932         reply.rdata.data.caps = caps;
1933         reply.status = 0;
1934         reply.errmsg = NULL;
1935
1936         client_send_control(req, header, &reply);
1937 }
1938
1939 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
1940                                 struct tevent_req *req,
1941                                 struct ctdb_req_header *header,
1942                                 struct ctdb_req_control *request)
1943 {
1944         struct client_state *state = tevent_req_data(
1945                 req, struct client_state);
1946         struct ctdbd_context *ctdb = state->ctdb;
1947         struct ctdb_reply_control reply;
1948         struct ctdb_node_map *nodemap;
1949         struct node *node;
1950         int i;
1951
1952         reply.rdata.opcode = request->opcode;
1953
1954         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
1955         if (nodemap == NULL) {
1956                 goto fail;
1957         }
1958
1959         nodemap->num = ctdb->node_map->num_nodes;
1960         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
1961                                      nodemap->num);
1962         if (nodemap->node == NULL) {
1963                 goto fail;
1964         }
1965
1966         for (i=0; i<nodemap->num; i++) {
1967                 node = &ctdb->node_map->node[i];
1968                 nodemap->node[i] = (struct ctdb_node_and_flags) {
1969                         .pnn = node->pnn,
1970                         .flags = node->flags,
1971                         .addr = node->addr,
1972                 };
1973         }
1974
1975         reply.rdata.data.nodemap = nodemap;
1976         reply.status = 0;
1977         reply.errmsg = NULL;
1978         client_send_control(req, header, &reply);
1979         return;
1980
1981 fail:
1982         reply.status = -1;
1983         reply.errmsg = "Memory error";
1984         client_send_control(req, header, &reply);
1985 }
1986
1987 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
1988                                      struct tevent_req *req,
1989                                      struct ctdb_req_header *header,
1990                                      struct ctdb_req_control *request)
1991 {
1992         struct client_state *state = tevent_req_data(
1993                 req, struct client_state);
1994         struct ctdbd_context *ctdb = state->ctdb;
1995         struct ctdb_reply_control reply;
1996
1997         reply.rdata.opcode = request->opcode;
1998
1999         if (ctdb->reclock != NULL) {
2000                 reply.rdata.data.reclock_file =
2001                         talloc_strdup(mem_ctx, ctdb->reclock);
2002                 if (reply.rdata.data.reclock_file == NULL) {
2003                         reply.status = ENOMEM;
2004                         reply.errmsg = "Memory error";
2005                         goto done;
2006                 }
2007         } else {
2008                 reply.rdata.data.reclock_file = NULL;
2009         }
2010
2011         reply.status = 0;
2012         reply.errmsg = NULL;
2013
2014 done:
2015         client_send_control(req, header, &reply);
2016 }
2017
2018 static void control_stop_node(TALLOC_CTX *mem_ctx,
2019                               struct tevent_req *req,
2020                               struct ctdb_req_header *header,
2021                               struct ctdb_req_control *request)
2022 {
2023         struct client_state *state = tevent_req_data(
2024                 req, struct client_state);
2025         struct ctdbd_context *ctdb = state->ctdb;
2026         struct ctdb_reply_control reply;
2027
2028         reply.rdata.opcode = request->opcode;
2029
2030         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2031         ctdb->monitoring_mode = CTDB_MONITORING_DISABLED;
2032         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2033
2034         reply.status = 0;
2035         reply.errmsg = NULL;
2036
2037         client_send_control(req, header, &reply);
2038         return;
2039 }
2040
2041 static void control_continue_node(TALLOC_CTX *mem_ctx,
2042                                   struct tevent_req *req,
2043                                   struct ctdb_req_header *header,
2044                                   struct ctdb_req_control *request)
2045 {
2046         struct client_state *state = tevent_req_data(
2047                 req, struct client_state);
2048         struct ctdbd_context *ctdb = state->ctdb;
2049         struct ctdb_reply_control reply;
2050
2051         reply.rdata.opcode = request->opcode;
2052
2053         DEBUG(DEBUG_INFO, ("Continue node\n"));
2054         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2055
2056         reply.status = 0;
2057         reply.errmsg = NULL;
2058
2059         client_send_control(req, header, &reply);
2060         return;
2061 }
2062
2063 static void set_ban_state_callback(struct tevent_req *subreq)
2064 {
2065         struct node *node = tevent_req_callback_data(
2066                 subreq, struct node);
2067         bool status;
2068
2069         status = tevent_wakeup_recv(subreq);
2070         TALLOC_FREE(subreq);
2071         if (! status) {
2072                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2073         }
2074
2075         node->flags &= ~NODE_FLAGS_BANNED;
2076 }
2077
2078 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2079                                   struct tevent_req *req,
2080                                   struct ctdb_req_header *header,
2081                                   struct ctdb_req_control *request)
2082 {
2083         struct client_state *state = tevent_req_data(
2084                 req, struct client_state);
2085         struct tevent_req *subreq;
2086         struct ctdbd_context *ctdb = state->ctdb;
2087         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2088         struct ctdb_reply_control reply;
2089         struct node *node;
2090
2091         reply.rdata.opcode = request->opcode;
2092
2093         if (ban->pnn != header->destnode) {
2094                 DEBUG(DEBUG_INFO,
2095                       ("SET_BAN_STATE control for PNN %d rejected\n",
2096                        ban->pnn));
2097                 reply.status = EINVAL;
2098                 goto fail;
2099         }
2100
2101         node = &ctdb->node_map->node[header->destnode];
2102
2103         if (ban->time == 0) {
2104                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2105                 node->flags &= ~NODE_FLAGS_BANNED;
2106                 goto done;
2107         }
2108
2109         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2110                                     tevent_timeval_current_ofs(
2111                                             ban->time, 0));
2112         if (subreq == NULL) {
2113                 reply.status = ENOMEM;
2114                 goto fail;
2115         }
2116         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2117
2118         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2119         node->flags |= NODE_FLAGS_BANNED;
2120         ctdb->vnn_map->generation = INVALID_GENERATION;
2121
2122 done:
2123         reply.status = 0;
2124         reply.errmsg = NULL;
2125
2126         client_send_control(req, header, &reply);
2127         return;
2128
2129 fail:
2130         reply.errmsg = "Failed to ban node";
2131 }
2132
2133 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
2134                                struct tevent_req *req,
2135                                struct ctdb_req_header *header,
2136                                struct ctdb_req_control *request)
2137 {
2138         struct client_state *state = tevent_req_data(
2139                 req, struct client_state);
2140         struct ctdbd_context *ctdb = state->ctdb;
2141         struct ctdb_reply_control reply;
2142         struct database *db;
2143
2144         reply.rdata.opcode = request->opcode;
2145
2146         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2147         if (db == NULL) {
2148                 reply.status = ENOENT;
2149                 reply.errmsg = "Database not found";
2150         } else {
2151                 reply.rdata.data.seqnum = db->seq_num;
2152                 reply.status = 0;
2153                 reply.errmsg = NULL;
2154         }
2155
2156         client_send_control(req, header, &reply);
2157 }
2158
2159 static void control_db_get_health(TALLOC_CTX *mem_ctx,
2160                                   struct tevent_req *req,
2161                                   struct ctdb_req_header *header,
2162                                   struct ctdb_req_control *request)
2163 {
2164         struct client_state *state = tevent_req_data(
2165                 req, struct client_state);
2166         struct ctdbd_context *ctdb = state->ctdb;
2167         struct ctdb_reply_control reply;
2168         struct database *db;
2169
2170         reply.rdata.opcode = request->opcode;
2171
2172         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2173         if (db == NULL) {
2174                 reply.status = ENOENT;
2175                 reply.errmsg = "Database not found";
2176         } else {
2177                 reply.rdata.data.reason = NULL;
2178                 reply.status = 0;
2179                 reply.errmsg = NULL;
2180         }
2181
2182         client_send_control(req, header, &reply);
2183 }
2184
2185 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
2186                                struct tevent_req *req,
2187                                struct ctdb_req_header *header,
2188                                struct ctdb_req_control *request)
2189 {
2190         struct client_state *state = tevent_req_data(
2191                 req, struct client_state);
2192         struct ctdbd_context *ctdb = state->ctdb;
2193         struct ctdb_reply_control reply;
2194         struct ctdb_iface_list *iface_list;
2195         struct interface *iface;
2196         int i;
2197
2198         reply.rdata.opcode = request->opcode;
2199
2200         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
2201         if (iface_list == NULL) {
2202                 goto fail;
2203         }
2204
2205         iface_list->num = ctdb->iface_map->num;
2206         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
2207                                          iface_list->num);
2208         if (iface_list->iface == NULL) {
2209                 goto fail;
2210         }
2211
2212         for (i=0; i<iface_list->num; i++) {
2213                 iface = &ctdb->iface_map->iface[i];
2214                 iface_list->iface[i] = (struct ctdb_iface) {
2215                         .link_state = iface->link_up,
2216                         .references = iface->references,
2217                 };
2218                 strlcpy(iface_list->iface[i].name, iface->name,
2219                         sizeof(iface_list->iface[i].name));
2220         }
2221
2222         reply.rdata.data.iface_list = iface_list;
2223         reply.status = 0;
2224         reply.errmsg = NULL;
2225         client_send_control(req, header, &reply);
2226         return;
2227
2228 fail:
2229         reply.status = -1;
2230         reply.errmsg = "Memory error";
2231         client_send_control(req, header, &reply);
2232 }
2233
2234 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
2235                                          struct tevent_req *req,
2236                                          struct ctdb_req_header *header,
2237                                          struct ctdb_req_control *request)
2238 {
2239         struct client_state *state = tevent_req_data(
2240                 req, struct client_state);
2241         struct ctdbd_context *ctdb = state->ctdb;
2242         struct ctdb_reply_control reply;
2243         struct ctdb_iface *in_iface;
2244         struct interface *iface = NULL;
2245         bool link_up = false;
2246         int i;
2247
2248         reply.rdata.opcode = request->opcode;
2249
2250         in_iface = request->rdata.data.iface;
2251
2252         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
2253                 reply.errmsg = "interface name not terminated";
2254                 goto fail;
2255         }
2256
2257         switch (in_iface->link_state) {
2258                 case 0:
2259                         link_up = false;
2260                         break;
2261
2262                 case 1:
2263                         link_up = true;
2264                         break;
2265
2266                 default:
2267                         reply.errmsg = "invalid link state";
2268                         goto fail;
2269         }
2270
2271         if (in_iface->references != 0) {
2272                 reply.errmsg = "references should be 0";
2273                 goto fail;
2274         }
2275
2276         for (i=0; i<ctdb->iface_map->num; i++) {
2277                 if (strcmp(ctdb->iface_map->iface[i].name,
2278                            in_iface->name) == 0) {
2279                         iface = &ctdb->iface_map->iface[i];
2280                         break;
2281                 }
2282         }
2283
2284         if (iface == NULL) {
2285                 reply.errmsg = "interface not found";
2286                 goto fail;
2287         }
2288
2289         iface->link_up = link_up;
2290
2291         reply.status = 0;
2292         reply.errmsg = NULL;
2293         client_send_control(req, header, &reply);
2294         return;
2295
2296 fail:
2297         reply.status = -1;
2298         client_send_control(req, header, &reply);
2299 }
2300
2301 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
2302                                     struct tevent_req *req,
2303                                     struct ctdb_req_header *header,
2304                                     struct ctdb_req_control *request)
2305 {
2306         struct client_state *state = tevent_req_data(
2307                 req, struct client_state);
2308         struct ctdbd_context *ctdb = state->ctdb;
2309         struct ctdb_reply_control reply;
2310         struct database *db;
2311
2312         reply.rdata.opcode = request->opcode;
2313
2314         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2315         if (db == NULL) {
2316                 reply.status = ENOENT;
2317                 reply.errmsg = "Database not found";
2318                 goto done;
2319         }
2320
2321         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2322                 reply.status = EINVAL;
2323                 reply.errmsg = "Can not set READONLY on persistent db";
2324                 goto done;
2325         }
2326
2327         db->flags |= CTDB_DB_FLAGS_READONLY;
2328         reply.status = 0;
2329         reply.errmsg = NULL;
2330
2331 done:
2332         client_send_control(req, header, &reply);
2333 }
2334
2335 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
2336                                     struct tevent_req *req,
2337                                     struct ctdb_req_header *header,
2338                                     struct ctdb_req_control *request)
2339 {
2340         struct client_state *state = tevent_req_data(
2341                 req, struct client_state);
2342         struct ctdbd_context *ctdb = state->ctdb;
2343         struct ctdb_reply_control reply;
2344         struct database *db;
2345
2346         reply.rdata.opcode = request->opcode;
2347
2348         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2349         if (db == NULL) {
2350                 reply.status = ENOENT;
2351                 reply.errmsg = "Database not found";
2352                 goto done;
2353         }
2354
2355         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
2356                 reply.status = EINVAL;
2357                 reply.errmsg = "Can not set STICKY on persistent db";
2358                 goto done;
2359         }
2360
2361         db->flags |= CTDB_DB_FLAGS_STICKY;
2362         reply.status = 0;
2363         reply.errmsg = NULL;
2364
2365 done:
2366         client_send_control(req, header, &reply);
2367 }
2368
2369 static void control_get_runstate(TALLOC_CTX *mem_ctx,
2370                                  struct tevent_req *req,
2371                                  struct ctdb_req_header *header,
2372                                  struct ctdb_req_control *request)
2373 {
2374         struct client_state *state = tevent_req_data(
2375                 req, struct client_state);
2376         struct ctdbd_context *ctdb = state->ctdb;
2377         struct ctdb_reply_control reply;
2378
2379         reply.rdata.opcode = request->opcode;
2380         reply.rdata.data.runstate = ctdb->runstate;
2381         reply.status = 0;
2382         reply.errmsg = NULL;
2383
2384         client_send_control(req, header, &reply);
2385 }
2386
2387 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
2388                                    struct tevent_req *req,
2389                                    struct ctdb_req_header *header,
2390                                    struct ctdb_req_control *request)
2391 {
2392         struct ctdb_reply_control reply;
2393         struct ctdb_node_map *nodemap;
2394
2395         reply.rdata.opcode = request->opcode;
2396
2397         nodemap = read_nodes_file(mem_ctx, header->destnode);
2398         if (nodemap == NULL) {
2399                 goto fail;
2400         }
2401
2402         reply.rdata.data.nodemap = nodemap;
2403         reply.status = 0;
2404         reply.errmsg = NULL;
2405         client_send_control(req, header, &reply);
2406         return;
2407
2408 fail:
2409         reply.status = -1;
2410         reply.errmsg = "Failed to read nodes file";
2411         client_send_control(req, header, &reply);
2412 }
2413
2414 static void control_error(TALLOC_CTX *mem_ctx,
2415                           struct tevent_req *req,
2416                           struct ctdb_req_header *header,
2417                           struct ctdb_req_control *request)
2418 {
2419         struct ctdb_reply_control reply;
2420
2421         reply.rdata.opcode = request->opcode;
2422         reply.status = -1;
2423         reply.errmsg = "Not implemented";
2424
2425         client_send_control(req, header, &reply);
2426 }
2427
2428 /*
2429  * Handling protocol - messages
2430  */
2431
/* State attached to the timer that re-enables recoveries on a node */
struct disable_recoveries_state {
        /* Node whose recoveries are currently disabled */
        struct node *node;
};
2435
2436 static void disable_recoveries_callback(struct tevent_req *subreq)
2437 {
2438         struct disable_recoveries_state *substate = tevent_req_callback_data(
2439                 subreq, struct disable_recoveries_state);
2440         bool status;
2441
2442         status = tevent_wakeup_recv(subreq);
2443         TALLOC_FREE(subreq);
2444         if (! status) {
2445                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2446         }
2447
2448         substate->node->recovery_disabled = false;
2449         TALLOC_FREE(substate->node->recovery_substate);
2450 }
2451
2452 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
2453                                        struct tevent_req *req,
2454                                        struct ctdb_req_header *header,
2455                                        struct ctdb_req_message *request)
2456 {
2457         struct client_state *state = tevent_req_data(
2458                 req, struct client_state);
2459         struct tevent_req *subreq;
2460         struct ctdbd_context *ctdb = state->ctdb;
2461         struct disable_recoveries_state *substate;
2462         struct ctdb_disable_message *disable = request->data.disable;
2463         struct ctdb_req_message_data reply;
2464         struct node *node;
2465         int ret = -1;
2466         TDB_DATA data;
2467
2468         node = &ctdb->node_map->node[header->destnode];
2469
2470         if (disable->timeout == 0) {
2471                 TALLOC_FREE(node->recovery_substate);
2472                 node->recovery_disabled = false;
2473                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
2474                                    header->destnode));
2475                 goto done;
2476         }
2477
2478         substate = talloc_zero(ctdb->node_map,
2479                                struct disable_recoveries_state);
2480         if (substate == NULL) {
2481                 goto fail;
2482         }
2483
2484         substate->node = node;
2485
2486         subreq = tevent_wakeup_send(substate, state->ev,
2487                                     tevent_timeval_current_ofs(
2488                                             disable->timeout, 0));
2489         if (subreq == NULL) {
2490                 talloc_free(substate);
2491                 goto fail;
2492         }
2493         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
2494
2495         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
2496                            disable->timeout, header->destnode));
2497         node->recovery_substate = substate;
2498         node->recovery_disabled = true;
2499
2500 done:
2501         ret = header->destnode;
2502
2503 fail:
2504         reply.srvid = disable->srvid;
2505         data.dptr = (uint8_t *)&ret;
2506         data.dsize = sizeof(int);
2507         reply.data = data;
2508
2509         client_send_message(req, header, &reply);
2510 }
2511
2512 static void message_takeover_run(TALLOC_CTX *mem_ctx,
2513                                  struct tevent_req *req,
2514                                  struct ctdb_req_header *header,
2515                                  struct ctdb_req_message *request)
2516 {
2517         struct client_state *state = tevent_req_data(
2518                 req, struct client_state);
2519         struct ctdbd_context *ctdb = state->ctdb;
2520         struct ctdb_srvid_message *srvid = request->data.msg;
2521         struct ctdb_req_message_data reply;
2522         int ret = -1;
2523         TDB_DATA data;
2524
2525         if (header->destnode != ctdb->node_map->recmaster) {
2526                 /* No reply! Only recmaster replies... */
2527                 return;
2528         }
2529
2530         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
2531                            header->destnode));
2532         ret = header->destnode;
2533
2534         reply.srvid = srvid->srvid;
2535         data.dptr = (uint8_t *)&ret;
2536         data.dsize = sizeof(int);
2537         reply.data = data;
2538
2539         client_send_message(req, header, &reply);
2540 }
2541
2542 /*
2543  * Handle a single client
2544  */
2545
/* Registered with comm_setup() as the read handler for a client */
static void client_read_handler(uint8_t *buf, size_t buflen,
                                void *private_data);

/* Registered with comm_setup() as the dead handler for a client */
static void client_dead_handler(void *private_data);

/* Dispatch one packet according to the operation in its header */
static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);

/* Process a CTDB_REQ_MESSAGE packet */
static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);

/* Process a CTDB_REQ_CONTROL packet */
static void client_process_control(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);

static void client_reply_done(struct tevent_req *subreq);
2556
2557 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
2558                                       struct tevent_context *ev,
2559                                       int fd, struct ctdbd_context *ctdb,
2560                                       int pnn)
2561 {
2562         struct tevent_req *req;
2563         struct client_state *state;
2564         int ret;
2565
2566         req = tevent_req_create(mem_ctx, &state, struct client_state);
2567         if (req == NULL) {
2568                 return NULL;
2569         }
2570
2571         state->ev = ev;
2572         state->fd = fd;
2573         state->ctdb = ctdb;
2574         state->pnn = pnn;
2575
2576         ret = comm_setup(state, ev, fd, client_read_handler, req,
2577                          client_dead_handler, req, &state->comm);
2578         if (ret != 0) {
2579                 tevent_req_error(req, ret);
2580                 return tevent_req_post(req, ev);
2581         }
2582
2583         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
2584
2585         return req;
2586 }
2587
2588 static void client_read_handler(uint8_t *buf, size_t buflen,
2589                                 void *private_data)
2590 {
2591         struct tevent_req *req = talloc_get_type_abort(
2592                 private_data, struct tevent_req);
2593         struct client_state *state = tevent_req_data(
2594                 req, struct client_state);
2595         struct ctdbd_context *ctdb = state->ctdb;
2596         struct ctdb_req_header header;
2597         int ret, i;
2598
2599         ret = ctdb_req_header_pull(buf, buflen, &header);
2600         if (ret != 0) {
2601                 return;
2602         }
2603
2604         if (buflen != header.length) {
2605                 return;
2606         }
2607
2608         ret = ctdb_req_header_verify(&header, 0);
2609         if (ret != 0) {
2610                 return;
2611         }
2612
2613         header_fix_pnn(&header, ctdb);
2614
2615         if (header.destnode == CTDB_BROADCAST_ALL) {
2616                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
2617                         header.destnode = i;
2618
2619                         ctdb_req_header_push(&header, buf);
2620                         client_process_packet(req, buf, buflen);
2621                 }
2622                 return;
2623         }
2624
2625         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
2626                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
2627                         if (ctdb->node_map->node[i].flags &
2628                             NODE_FLAGS_DISCONNECTED) {
2629                                 continue;
2630                         }
2631
2632                         header.destnode = i;
2633
2634                         ctdb_req_header_push(&header, buf);
2635                         client_process_packet(req, buf, buflen);
2636                 }
2637                 return;
2638         }
2639
2640         if (header.destnode > ctdb->node_map->num_nodes) {
2641                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
2642                         header.destnode);
2643                 return;
2644         }
2645
2646
2647         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
2648                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
2649                         header.destnode);
2650                 return;
2651         }
2652
2653         ctdb_req_header_push(&header, buf);
2654         client_process_packet(req, buf, buflen);
2655 }
2656
2657 static void client_dead_handler(void *private_data)
2658 {
2659         struct tevent_req *req = talloc_get_type_abort(
2660                 private_data, struct tevent_req);
2661
2662         tevent_req_done(req);
2663 }
2664
2665 static void client_process_packet(struct tevent_req *req,
2666                                   uint8_t *buf, size_t buflen)
2667 {
2668         struct ctdb_req_header header;
2669         int ret;
2670
2671         ret = ctdb_req_header_pull(buf, buflen, &header);
2672         if (ret != 0) {
2673                 return;
2674         }
2675
2676         switch (header.operation) {
2677         case CTDB_REQ_MESSAGE:
2678                 client_process_message(req, buf, buflen);
2679                 break;
2680
2681         case CTDB_REQ_CONTROL:
2682                 client_process_control(req, buf, buflen);
2683                 break;
2684
2685         default:
2686                 break;
2687         }
2688 }
2689
2690 static void client_process_message(struct tevent_req *req,
2691                                    uint8_t *buf, size_t buflen)
2692 {
2693         struct client_state *state = tevent_req_data(
2694                 req, struct client_state);
2695         struct ctdbd_context *ctdb = state->ctdb;
2696         TALLOC_CTX *mem_ctx;
2697         struct ctdb_req_header header;
2698         struct ctdb_req_message request;
2699         uint64_t srvid;
2700         int ret;
2701
2702         mem_ctx = talloc_new(state);
2703         if (tevent_req_nomem(mem_ctx, req)) {
2704                 return;
2705         }
2706
2707         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
2708         if (ret != 0) {
2709                 talloc_free(mem_ctx);
2710                 tevent_req_error(req, ret);
2711                 return;
2712         }
2713
2714         header_fix_pnn(&header, ctdb);
2715
2716         if (header.destnode >= ctdb->node_map->num_nodes) {
2717                 /* Many messages are not replied to, so just behave as
2718                  * though this message was not received */
2719                 fprintf(stderr, "Invalid node %d\n", header.destnode);
2720                 talloc_free(mem_ctx);
2721                 return;
2722         }
2723
2724         srvid = request.srvid;
2725         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
2726
2727         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
2728                 message_disable_recoveries(mem_ctx, req, &header, &request);
2729         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
2730                 message_takeover_run(mem_ctx, req, &header, &request);
2731         }
2732
2733         /* check srvid */
2734         talloc_free(mem_ctx);
2735 }
2736
2737 static void client_process_control(struct tevent_req *req,
2738                                    uint8_t *buf, size_t buflen)
2739 {
2740         struct client_state *state = tevent_req_data(
2741                 req, struct client_state);
2742         struct ctdbd_context *ctdb = state->ctdb;
2743         TALLOC_CTX *mem_ctx;
2744         struct ctdb_req_header header;
2745         struct ctdb_req_control request;
2746         int ret;
2747
2748         mem_ctx = talloc_new(state);
2749         if (tevent_req_nomem(mem_ctx, req)) {
2750                 return;
2751         }
2752
2753         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
2754         if (ret != 0) {
2755                 talloc_free(mem_ctx);
2756                 tevent_req_error(req, ret);
2757                 return;
2758         }
2759
2760         header_fix_pnn(&header, ctdb);
2761
2762         if (header.destnode >= ctdb->node_map->num_nodes) {
2763                 struct ctdb_reply_control reply;
2764
2765                 reply.rdata.opcode = request.opcode;
2766                 reply.errmsg = "Invalid node";
2767                 reply.status = -1;
2768                 client_send_control(req, &header, &reply);
2769                 return;
2770         }
2771
2772         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
2773                            request.opcode, header.reqid));
2774
2775         switch (request.opcode) {
2776         case CTDB_CONTROL_PROCESS_EXISTS:
2777                 control_process_exists(mem_ctx, req, &header, &request);
2778                 break;
2779
2780         case CTDB_CONTROL_PING:
2781                 control_ping(mem_ctx, req, &header, &request);
2782                 break;
2783
2784         case CTDB_CONTROL_GETDBPATH:
2785                 control_getdbpath(mem_ctx, req, &header, &request);
2786                 break;
2787
2788         case CTDB_CONTROL_GETVNNMAP:
2789                 control_getvnnmap(mem_ctx, req, &header, &request);
2790                 break;
2791
2792         case CTDB_CONTROL_GET_DEBUG:
2793                 control_get_debug(mem_ctx, req, &header, &request);
2794                 break;
2795
2796         case CTDB_CONTROL_SET_DEBUG:
2797                 control_set_debug(mem_ctx, req, &header, &request);
2798                 break;
2799
2800         case CTDB_CONTROL_GET_DBMAP:
2801                 control_get_dbmap(mem_ctx, req, &header, &request);
2802                 break;
2803
2804         case CTDB_CONTROL_GET_RECMODE:
2805                 control_get_recmode(mem_ctx, req, &header, &request);
2806                 break;
2807
2808         case CTDB_CONTROL_SET_RECMODE:
2809                 control_set_recmode(mem_ctx, req, &header, &request);
2810                 break;
2811
2812         case CTDB_CONTROL_REGISTER_SRVID:
2813                 control_register_srvid(mem_ctx, req, &header, &request);
2814                 break;
2815
2816         case CTDB_CONTROL_DEREGISTER_SRVID:
2817                 control_deregister_srvid(mem_ctx, req, &header, &request);
2818                 break;
2819
2820         case CTDB_CONTROL_GET_DBNAME:
2821                 control_get_dbname(mem_ctx, req, &header, &request);
2822                 break;
2823
2824         case CTDB_CONTROL_GET_PID:
2825                 control_get_pid(mem_ctx, req, &header, &request);
2826                 break;
2827
2828         case CTDB_CONTROL_GET_RECMASTER:
2829                 control_get_recmaster(mem_ctx, req, &header, &request);
2830                 break;
2831
2832         case CTDB_CONTROL_GET_PNN:
2833                 control_get_pnn(mem_ctx, req, &header, &request);
2834                 break;
2835
2836         case CTDB_CONTROL_SHUTDOWN:
2837                 control_shutdown(mem_ctx, req, &header, &request);
2838                 break;
2839
2840         case CTDB_CONTROL_GET_MONMODE:
2841                 control_get_monmode(mem_ctx, req, &header, &request);
2842                 break;
2843
2844         case CTDB_CONTROL_SET_TUNABLE:
2845                 control_set_tunable(mem_ctx, req, &header, &request);
2846                 break;
2847
2848         case CTDB_CONTROL_GET_TUNABLE:
2849                 control_get_tunable(mem_ctx, req, &header, &request);
2850                 break;
2851
2852         case CTDB_CONTROL_LIST_TUNABLES:
2853                 control_list_tunables(mem_ctx, req, &header, &request);
2854                 break;
2855
2856         case CTDB_CONTROL_MODIFY_FLAGS:
2857                 control_modify_flags(mem_ctx, req, &header, &request);
2858                 break;
2859
2860         case CTDB_CONTROL_GET_ALL_TUNABLES:
2861                 control_get_all_tunables(mem_ctx, req, &header, &request);
2862                 break;
2863
2864         case CTDB_CONTROL_UPTIME:
2865                 control_uptime(mem_ctx, req, &header, &request);
2866                 break;
2867
2868         case CTDB_CONTROL_ENABLE_MONITOR:
2869                 control_enable_monitor(mem_ctx, req, &header, &request);
2870                 break;
2871
2872         case CTDB_CONTROL_DISABLE_MONITOR:
2873                 control_disable_monitor(mem_ctx, req, &header, &request);
2874                 break;
2875
2876         case CTDB_CONTROL_RELOAD_NODES_FILE:
2877                 control_reload_nodes_file(mem_ctx, req, &header, &request);
2878                 break;
2879
2880         case CTDB_CONTROL_GET_CAPABILITIES:
2881                 control_get_capabilities(mem_ctx, req, &header, &request);
2882                 break;
2883
2884         case CTDB_CONTROL_GET_NODEMAP:
2885                 control_get_nodemap(mem_ctx, req, &header, &request);
2886                 break;
2887
2888         case CTDB_CONTROL_GET_RECLOCK_FILE:
2889                 control_get_reclock_file(mem_ctx, req, &header, &request);
2890                 break;
2891
2892         case CTDB_CONTROL_STOP_NODE:
2893                 control_stop_node(mem_ctx, req, &header, &request);
2894                 break;
2895
2896         case CTDB_CONTROL_CONTINUE_NODE:
2897                 control_continue_node(mem_ctx, req, &header, &request);
2898                 break;
2899
2900         case CTDB_CONTROL_SET_BAN_STATE:
2901                 control_set_ban_state(mem_ctx, req, &header, &request);
2902                 break;
2903
2904         case CTDB_CONTROL_GET_DB_SEQNUM:
2905                 control_get_db_seqnum(mem_ctx, req, &header, &request);
2906                 break;
2907
2908         case CTDB_CONTROL_DB_GET_HEALTH:
2909                 control_db_get_health(mem_ctx, req, &header, &request);
2910                 break;
2911
2912         case CTDB_CONTROL_GET_IFACES:
2913                 control_get_ifaces(mem_ctx, req, &header, &request);
2914                 break;
2915
2916         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
2917                 control_set_iface_link_state(mem_ctx, req, &header, &request);
2918                 break;
2919
2920         case CTDB_CONTROL_SET_DB_READONLY:
2921                 control_set_db_readonly(mem_ctx, req, &header, &request);
2922                 break;
2923
2924         case CTDB_CONTROL_SET_DB_STICKY:
2925                 control_set_db_sticky(mem_ctx, req, &header, &request);
2926                 break;
2927
2928         case CTDB_CONTROL_GET_RUNSTATE:
2929                 control_get_runstate(mem_ctx, req, &header, &request);
2930                 break;
2931
2932         case CTDB_CONTROL_GET_NODES_FILE:
2933                 control_get_nodes_file(mem_ctx, req, &header, &request);
2934                 break;
2935
2936         default:
2937                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
2938                         control_error(mem_ctx, req, &header, &request);
2939                 }
2940                 break;
2941         }
2942
2943         talloc_free(mem_ctx);
2944 }
2945
2946 static int client_recv(struct tevent_req *req, int *perr)
2947 {
2948         struct client_state *state = tevent_req_data(
2949                 req, struct client_state);
2950         int err;
2951
2952         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
2953         close(state->fd);
2954
2955         if (tevent_req_is_unix_error(req, &err)) {
2956                 if (perr != NULL) {
2957                         *perr = err;
2958                 }
2959                 return -1;
2960         }
2961
2962         return state->status;
2963 }
2964
2965 /*
2966  * Fake CTDB server
2967  */
2968
/* State attached to the server request created by server_send() */
struct server_state {
	/* Event context used for the accept and client subrequests */
	struct tevent_context *ev;
	/* Fake daemon state passed on to every client */
	struct ctdbd_context *ctdb;
	/* Socket descriptor on which connections are accepted */
	int fd;
};

/* Subrequest callbacks used by the server request */
static void server_client_done(struct tevent_req *subreq);
static void server_new_client(struct tevent_req *subreq);
2977
2978 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
2979                                       struct tevent_context *ev,
2980                                       struct ctdbd_context *ctdb,
2981                                       int fd)
2982 {
2983         struct tevent_req *req, *subreq;
2984         struct server_state *state;
2985
2986         req = tevent_req_create(mem_ctx, &state, struct server_state);
2987         if (req == NULL) {
2988                 return NULL;
2989         }
2990
2991         state->ev = ev;
2992         state->ctdb = ctdb;
2993         state->fd = fd;
2994
2995         subreq = accept_send(state, ev, fd);
2996         if (tevent_req_nomem(subreq, req)) {
2997                 return tevent_req_post(req, ev);
2998         }
2999         tevent_req_set_callback(subreq, server_new_client, req);
3000
3001         return req;
3002 }
3003
3004 static void server_new_client(struct tevent_req *subreq)
3005 {
3006         struct tevent_req *req = tevent_req_callback_data(
3007                 subreq, struct tevent_req);
3008         struct server_state *state = tevent_req_data(
3009                 req, struct server_state);
3010         struct ctdbd_context *ctdb = state->ctdb;
3011         int client_fd;
3012         int ret = 0;
3013
3014         client_fd = accept_recv(subreq, NULL, NULL, &ret);
3015         TALLOC_FREE(subreq);
3016         if (client_fd == -1) {
3017                 tevent_req_error(req, ret);
3018                 return;
3019         }
3020
3021         subreq = client_send(state, state->ev, client_fd,
3022                              ctdb, ctdb->node_map->pnn);
3023         if (tevent_req_nomem(subreq, req)) {
3024                 return;
3025         }
3026         tevent_req_set_callback(subreq, server_client_done, req);
3027
3028         ctdb->num_clients += 1;
3029
3030         subreq = accept_send(state, state->ev, state->fd);
3031         if (tevent_req_nomem(subreq, req)) {
3032                 return;
3033         }
3034         tevent_req_set_callback(subreq, server_new_client, req);
3035 }
3036
3037 static void server_client_done(struct tevent_req *subreq)
3038 {
3039         struct tevent_req *req = tevent_req_callback_data(
3040                 subreq, struct tevent_req);
3041         struct server_state *state = tevent_req_data(
3042                 req, struct server_state);
3043         struct ctdbd_context *ctdb = state->ctdb;
3044         int ret = 0;
3045         int status;
3046
3047         status = client_recv(subreq, &ret);
3048         TALLOC_FREE(subreq);
3049         if (status < 0) {
3050                 tevent_req_error(req, ret);
3051                 return;
3052         }
3053
3054         ctdb->num_clients -= 1;
3055
3056         if (status == 99) {
3057                 /* Special status, to shutdown server */
3058                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
3059                 tevent_req_done(req);
3060         }
3061 }
3062
/*
 * Collect the result of the server request.
 *
 * Returns true if the request did not end in error.  Otherwise returns
 * false and, when perr is not NULL, stores the unix error code in *perr.
 */
static bool server_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
3075
3076 /*
3077  * Main functions
3078  */
3079
3080 static int socket_init(const char *sockpath)
3081 {
3082         struct sockaddr_un addr;
3083         size_t len;
3084         int ret, fd;
3085
3086         memset(&addr, 0, sizeof(addr));
3087         addr.sun_family = AF_UNIX;
3088
3089         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
3090         if (len >= sizeof(addr.sun_path)) {
3091                 fprintf(stderr, "path too long: %s\n", sockpath);
3092                 return -1;
3093         }
3094
3095         fd = socket(AF_UNIX, SOCK_STREAM, 0);
3096         if (fd == -1) {
3097                 fprintf(stderr, "socket failed - %s\n", sockpath);
3098                 return -1;
3099         }
3100
3101         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
3102         if (ret != 0) {
3103                 fprintf(stderr, "bind failed - %s\n", sockpath);
3104                 goto fail;
3105         }
3106
3107         ret = listen(fd, 10);
3108         if (ret != 0) {
3109                 fprintf(stderr, "listen failed\n");
3110                 goto fail;
3111         }
3112
3113         DEBUG(DEBUG_INFO, ("Socket init done\n"));
3114
3115         return fd;
3116
3117 fail:
3118         if (fd != -1) {
3119                 close(fd);
3120         }
3121         return -1;
3122 }
3123
/* Command line settings; every field starts out as NULL */
struct options {
	const char *sockpath;	/* unix domain socket path */
	const char *pidfile;	/* pid file path */
	const char *debuglevel;	/* debug level name */
};

static struct options options;
3129
3130 static struct poptOption cmdline_options[] = {
3131         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
3132                 "Unix domain socket path", "filename" },
3133         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
3134                 "pid file", "filename" } ,
3135         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
3136                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
3137 };
3138
3139 static void cleanup(void)
3140 {
3141         unlink(options.sockpath);
3142         unlink(options.pidfile);
3143 }
3144
/*
 * Signal handler (installed for SIGTERM by start_server()): remove the
 * socket and pid file, then terminate with status 0.
 *
 * _exit() is used instead of exit() because exit() is not
 * async-signal-safe and would also run the atexit() handlers, which
 * repeats the cleanup() that has just been done here.
 */
static void signal_handler(int sig)
{
	(void)sig;

	cleanup();
	_exit(0);
}
3150
3151 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
3152                          struct ctdbd_context *ctdb, int fd, int pfd)
3153 {
3154         struct tevent_req *req;
3155         int ret = 0;
3156         ssize_t len;
3157
3158         atexit(cleanup);
3159         signal(SIGTERM, signal_handler);
3160
3161         req = server_send(mem_ctx, ev, ctdb, fd);
3162         if (req == NULL) {
3163                 fprintf(stderr, "Memory error\n");
3164                 exit(1);
3165         }
3166
3167         len = write(pfd, &ret, sizeof(ret));
3168         if (len != sizeof(ret)) {
3169                 fprintf(stderr, "Failed to send message to parent\n");
3170                 exit(1);
3171         }
3172         close(pfd);
3173
3174         tevent_req_poll(req, ev);
3175
3176         server_recv(req, &ret);
3177         if (ret != 0) {
3178                 exit(1);
3179         }
3180 }
3181
3182 int main(int argc, const char *argv[])
3183 {
3184         TALLOC_CTX *mem_ctx;
3185         struct ctdbd_context *ctdb;
3186         struct tevent_context *ev;
3187         poptContext pc;
3188         int opt, fd, ret, pfd[2];
3189         ssize_t len;
3190         pid_t pid;
3191         FILE *fp;
3192
3193         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
3194                             POPT_CONTEXT_KEEP_FIRST);
3195         while ((opt = poptGetNextOpt(pc)) != -1) {
3196                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
3197                 exit(1);
3198         }
3199
3200         if (options.sockpath == NULL) {
3201                 fprintf(stderr, "Please specify socket path\n");
3202                 poptPrintHelp(pc, stdout, 0);
3203                 exit(1);
3204         }
3205
3206         if (options.pidfile == NULL) {
3207                 fprintf(stderr, "Please specify pid file\n");
3208                 poptPrintHelp(pc, stdout, 0);
3209                 exit(1);
3210         }
3211
3212         mem_ctx = talloc_new(NULL);
3213         if (mem_ctx == NULL) {
3214                 fprintf(stderr, "Memory error\n");
3215                 exit(1);
3216         }
3217
3218         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
3219         if (ret != 0) {
3220                 fprintf(stderr, "Invalid debug level\n");
3221                 poptPrintHelp(pc, stdout, 0);
3222                 exit(1);
3223         }
3224
3225         ctdb = ctdbd_setup(mem_ctx);
3226         if (ctdb == NULL) {
3227                 exit(1);
3228         }
3229
3230         if (! ctdbd_verify(ctdb)) {
3231                 exit(1);
3232         }
3233
3234         ev = tevent_context_init(mem_ctx);
3235         if (ev == NULL) {
3236                 fprintf(stderr, "Memory error\n");
3237                 exit(1);
3238         }
3239
3240         fd = socket_init(options.sockpath);
3241         if (fd == -1) {
3242                 exit(1);
3243         }
3244
3245         ret = pipe(pfd);
3246         if (ret != 0) {
3247                 fprintf(stderr, "Failed to create pipe\n");
3248                 cleanup();
3249                 exit(1);
3250         }
3251
3252         pid = fork();
3253         if (pid == -1) {
3254                 fprintf(stderr, "Failed to fork\n");
3255                 cleanup();
3256                 exit(1);
3257         }
3258
3259         if (pid == 0) {
3260                 /* Child */
3261                 close(pfd[0]);
3262                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
3263                 exit(1);
3264         }
3265
3266         /* Parent */
3267         close(pfd[1]);
3268
3269         len = read(pfd[0], &ret, sizeof(ret));
3270         close(pfd[0]);
3271         if (len != sizeof(ret)) {
3272                 fprintf(stderr, "len = %zi\n", len);
3273                 fprintf(stderr, "Failed to get message from child\n");
3274                 kill(pid, SIGTERM);
3275                 exit(1);
3276         }
3277
3278         fp = fopen(options.pidfile, "w");
3279         if (fp == NULL) {
3280                 fprintf(stderr, "Failed to open pid file %s\n",
3281                         options.pidfile);
3282                 kill(pid, SIGTERM);
3283                 exit(1);
3284         }
3285         fprintf(fp, "%d\n", pid);
3286         fclose(fp);
3287
3288         return 0;
3289 }