lib: Give lib/util/util_file.c its own header file
[samba.git] / ctdb / tests / src / fake_ctdbd.c
1 /*
2    Fake CTDB server for testing
3
4    Copyright (C) Amitay Isaacs  2016
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/time.h"
23 #include "system/filesys.h"
24
25 #include <popt.h>
26 #include <talloc.h>
27 #include <tevent.h>
28 #include <tdb.h>
29
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/util/util_file.h"
35 #include "lib/async_req/async_sock.h"
36
37 #include "protocol/protocol.h"
38 #include "protocol/protocol_api.h"
39 #include "protocol/protocol_util.h"
40 #include "protocol/protocol_private.h"
41
42 #include "common/comm.h"
43 #include "common/logging.h"
44 #include "common/tunable.h"
45 #include "common/srvid.h"
46 #include "common/system.h"
47
48 #include "ipalloc_read_known_ips.h"
49
50
/* Port that is set on every node address parsed by this fake daemon */
#define CTDB_PORT 4379

/*
 * A fake flag that is only supported by some functions.  nodemap_parse()
 * sets it on a node whose input line carries the TIMEOUT token.
 */
#define NODE_FLAGS_FAKE_TIMEOUT 0x80000000
55
56 struct node {
57         ctdb_sock_addr addr;
58         uint32_t pnn;
59         uint32_t flags;
60         uint32_t capabilities;
61         bool recovery_disabled;
62         void *recovery_substate;
63 };
64
/* The set of fake nodes plus the identity of the current node */
struct node_map {
	uint32_t num_nodes;	/* number of valid entries in node[] */
	struct node *node;	/* talloc array grown one entry per parsed line */
	uint32_t pnn;		/* PNN of the line tagged CURRENT, else CTDB_UNKNOWN_PNN */
	uint32_t recmaster;	/* PNN of the line tagged RECMASTER, else CTDB_UNKNOWN_PNN */
};
71
/* One fake network interface, as read by interfaces_parse() */
struct interface {
	const char *name;	/* interface name, e.g. "eth1" */
	bool link_up;		/* true when the parsed link status was non-zero */
	uint32_t references;	/* reference count taken verbatim from the input */
};
77
/* Growable list of fake interfaces */
struct interface_map {
	int num;			/* number of valid entries in iface[] */
	struct interface *iface;	/* talloc array, one entry per parsed line */
};
82
/* Fake VNN map, filled in by vnnmap_parse() */
struct vnn_map {
	uint32_t recmode;	/* initialised to CTDB_RECOVERY_ACTIVE */
	uint32_t generation;	/* INVALID_GENERATION until the first input line is read */
	uint32_t size;		/* number of valid entries in map[] */
	uint32_t *map;		/* talloc array, one value per input line after the generation */
};
89
/* One fake database, linked into database_map.db */
struct database {
	struct database *prev;		/* list linkage (DLIST_* macros) */
	struct database *next;
	const char *name;		/* database name */
	const char *path;		/* "<dbdir>/<name>" */
	struct tdb_context *tdb;	/* NULL unless opened by database_new() */
	uint32_t id;			/* database ID */
	uint8_t flags;			/* CTDB_DB_FLAGS_* bits */
	uint64_t seq_num;		/* sequence number reported when tdb is NULL */
};
99
/* All fake databases plus the directory their files live in */
struct database_map {
	struct database *db;	/* head of the database list */
	const char *dbdir;	/* directory prefix used to build database paths */
};
104
105 struct fake_control_failure {
106         struct fake_control_failure  *prev, *next;
107         enum ctdb_controls opcode;
108         uint32_t pnn;
109         const char *error;
110         const char *comment;
111 };
112
/* One entry of ctdbd_context.client_list */
struct ctdb_client {
	struct ctdb_client *prev;	/* list linkage */
	struct ctdb_client *next;
	struct ctdbd_context *ctdb;	/* owning daemon context */
	pid_t pid;			/* presumably the client's process ID - confirm */
	void *state;			/* opaque per-client state */
};
119
120 struct ctdbd_context {
121         struct node_map *node_map;
122         struct interface_map *iface_map;
123         struct vnn_map *vnn_map;
124         struct database_map *db_map;
125         struct srvid_context *srv;
126         int num_clients;
127         struct timeval start_time;
128         struct timeval recovery_start_time;
129         struct timeval recovery_end_time;
130         bool takeover_disabled;
131         int log_level;
132         enum ctdb_runstate runstate;
133         struct ctdb_tunable_list tun_list;
134         char *reclock;
135         struct ctdb_public_ip_list *known_ips;
136         struct fake_control_failure *control_failures;
137         struct ctdb_client *client_list;
138 };
139
140 /*
141  * Parse routines
142  */
143
144 static struct node_map *nodemap_init(TALLOC_CTX *mem_ctx)
145 {
146         struct node_map *node_map;
147
148         node_map = talloc_zero(mem_ctx, struct node_map);
149         if (node_map == NULL) {
150                 return NULL;
151         }
152
153         node_map->pnn = CTDB_UNKNOWN_PNN;
154         node_map->recmaster = CTDB_UNKNOWN_PNN;
155
156         return node_map;
157 }
158
159 /* Read a nodemap from stdin.  Each line looks like:
160  *  <PNN> <FLAGS> [RECMASTER] [CURRENT] [CAPABILITIES]
161  * EOF or a blank line terminates input.
162  *
163  * By default, capabilities for each node are
164  * CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER.  These 2
165  * capabilities can be faked off by adding, for example,
166  * -CTDB_CAP_RECMASTER.
167  */
168
169 static bool nodemap_parse(struct node_map *node_map)
170 {
171         char line[1024];
172
173         while ((fgets(line, sizeof(line), stdin) != NULL)) {
174                 uint32_t pnn, flags, capabilities;
175                 char *tok, *t;
176                 char *ip;
177                 ctdb_sock_addr saddr;
178                 struct node *node;
179                 int ret;
180
181                 if (line[0] == '\n') {
182                         break;
183                 }
184
185                 /* Get rid of pesky newline */
186                 if ((t = strchr(line, '\n')) != NULL) {
187                         *t = '\0';
188                 }
189
190                 /* Get PNN */
191                 tok = strtok(line, " \t");
192                 if (tok == NULL) {
193                         fprintf(stderr, "bad line (%s) - missing PNN\n", line);
194                         continue;
195                 }
196                 pnn = (uint32_t)strtoul(tok, NULL, 0);
197
198                 /* Get IP */
199                 tok = strtok(NULL, " \t");
200                 if (tok == NULL) {
201                         fprintf(stderr, "bad line (%s) - missing IP\n", line);
202                         continue;
203                 }
204                 ret = ctdb_sock_addr_from_string(tok, &saddr, false);
205                 if (ret != 0) {
206                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
207                         continue;
208                 }
209                 ctdb_sock_addr_set_port(&saddr, CTDB_PORT);
210                 ip = talloc_strdup(node_map, tok);
211                 if (ip == NULL) {
212                         goto fail;
213                 }
214
215                 /* Get flags */
216                 tok = strtok(NULL, " \t");
217                 if (tok == NULL) {
218                         fprintf(stderr, "bad line (%s) - missing flags\n",
219                                 line);
220                         continue;
221                 }
222                 flags = (uint32_t)strtoul(tok, NULL, 0);
223                 /* Handle deleted nodes */
224                 if (flags & NODE_FLAGS_DELETED) {
225                         talloc_free(ip);
226                         ip = talloc_strdup(node_map, "0.0.0.0");
227                         if (ip == NULL) {
228                                 goto fail;
229                         }
230                 }
231                 capabilities = CTDB_CAP_RECMASTER|CTDB_CAP_LMASTER;
232
233                 tok = strtok(NULL, " \t");
234                 while (tok != NULL) {
235                         if (strcmp(tok, "CURRENT") == 0) {
236                                 node_map->pnn = pnn;
237                         } else if (strcmp(tok, "RECMASTER") == 0) {
238                                 node_map->recmaster = pnn;
239                         } else if (strcmp(tok, "-CTDB_CAP_RECMASTER") == 0) {
240                                 capabilities &= ~CTDB_CAP_RECMASTER;
241                         } else if (strcmp(tok, "-CTDB_CAP_LMASTER") == 0) {
242                                 capabilities &= ~CTDB_CAP_LMASTER;
243                         } else if (strcmp(tok, "TIMEOUT") == 0) {
244                                 /* This can be done with just a flag
245                                  * value but it is probably clearer
246                                  * and less error-prone to fake this
247                                  * with an explicit token */
248                                 flags |= NODE_FLAGS_FAKE_TIMEOUT;
249                         }
250                         tok = strtok(NULL, " \t");
251                 }
252
253                 node_map->node = talloc_realloc(node_map, node_map->node,
254                                                 struct node,
255                                                 node_map->num_nodes + 1);
256                 if (node_map->node == NULL) {
257                         goto fail;
258                 }
259                 node = &node_map->node[node_map->num_nodes];
260
261                 ret = ctdb_sock_addr_from_string(ip, &node->addr, false);
262                 if (ret != 0) {
263                         fprintf(stderr, "bad line (%s) - invalid IP\n", line);
264                         continue;
265                 }
266                 ctdb_sock_addr_set_port(&node->addr, CTDB_PORT);
267                 node->pnn = pnn;
268                 node->flags = flags;
269                 node->capabilities = capabilities;
270                 node->recovery_disabled = false;
271                 node->recovery_substate = NULL;
272
273                 node_map->num_nodes += 1;
274         }
275
276         if (node_map->num_nodes == 0) {
277                 goto fail;
278         }
279
280         DEBUG(DEBUG_INFO, ("Parsing nodemap done\n"));
281         return true;
282
283 fail:
284         DEBUG(DEBUG_INFO, ("Parsing nodemap failed\n"));
285         return false;
286
287 }
288
289 /* Append a node to a node map with given address and flags */
290 static bool node_map_add(struct ctdb_node_map *nodemap,
291                          const char *nstr, uint32_t flags)
292 {
293         ctdb_sock_addr addr;
294         uint32_t num;
295         struct ctdb_node_and_flags *n;
296         int ret;
297
298         ret = ctdb_sock_addr_from_string(nstr, &addr, false);
299         if (ret != 0) {
300                 fprintf(stderr, "Invalid IP address %s\n", nstr);
301                 return false;
302         }
303         ctdb_sock_addr_set_port(&addr, CTDB_PORT);
304
305         num = nodemap->num;
306         nodemap->node = talloc_realloc(nodemap, nodemap->node,
307                                        struct ctdb_node_and_flags, num+1);
308         if (nodemap->node == NULL) {
309                 return false;
310         }
311
312         n = &nodemap->node[num];
313         n->addr = addr;
314         n->pnn = num;
315         n->flags = flags;
316
317         nodemap->num = num+1;
318         return true;
319 }
320
321 /* Read a nodes file into a node map */
322 static struct ctdb_node_map *ctdb_read_nodes_file(TALLOC_CTX *mem_ctx,
323                                                   const char *nlist)
324 {
325         char **lines;
326         int nlines;
327         int i;
328         struct ctdb_node_map *nodemap;
329
330         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
331         if (nodemap == NULL) {
332                 return NULL;
333         }
334
335         lines = file_lines_load(nlist, &nlines, 0, mem_ctx);
336         if (lines == NULL) {
337                 return NULL;
338         }
339
340         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
341                 nlines--;
342         }
343
344         for (i=0; i<nlines; i++) {
345                 char *node;
346                 uint32_t flags;
347                 size_t len;
348
349                 node = lines[i];
350                 /* strip leading spaces */
351                 while((*node == ' ') || (*node == '\t')) {
352                         node++;
353                 }
354
355                 len = strlen(node);
356
357                 /* strip trailing spaces */
358                 while ((len > 1) &&
359                        ((node[len-1] == ' ') || (node[len-1] == '\t')))
360                 {
361                         node[len-1] = '\0';
362                         len--;
363                 }
364
365                 if (len == 0) {
366                         continue;
367                 }
368                 if (*node == '#') {
369                         /* A "deleted" node is a node that is
370                            commented out in the nodes file.  This is
371                            used instead of removing a line, which
372                            would cause subsequent nodes to change
373                            their PNN. */
374                         flags = NODE_FLAGS_DELETED;
375                         node = discard_const("0.0.0.0");
376                 } else {
377                         flags = 0;
378                 }
379                 if (! node_map_add(nodemap, node, flags)) {
380                         talloc_free(lines);
381                         TALLOC_FREE(nodemap);
382                         return NULL;
383                 }
384         }
385
386         talloc_free(lines);
387         return nodemap;
388 }
389
390 static struct ctdb_node_map *read_nodes_file(TALLOC_CTX *mem_ctx,
391                                              uint32_t pnn)
392 {
393         struct ctdb_node_map *nodemap;
394         char nodes_list[PATH_MAX];
395         const char *ctdb_base;
396         int num;
397
398         ctdb_base = getenv("CTDB_BASE");
399         if (ctdb_base == NULL) {
400                 D_ERR("CTDB_BASE is not set\n");
401                 return NULL;
402         }
403
404         /* read optional node-specific nodes file */
405         num = snprintf(nodes_list, sizeof(nodes_list),
406                        "%s/nodes.%d", ctdb_base, pnn);
407         if (num == sizeof(nodes_list)) {
408                 D_ERR("nodes file path too long\n");
409                 return NULL;
410         }
411         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
412         if (nodemap != NULL) {
413                 /* Fake a load failure for an empty nodemap */
414                 if (nodemap->num == 0) {
415                         talloc_free(nodemap);
416
417                         D_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
418                         return NULL;
419                 }
420
421                 return nodemap;
422         }
423
424         /* read normal nodes file */
425         num = snprintf(nodes_list, sizeof(nodes_list), "%s/nodes", ctdb_base);
426         if (num == sizeof(nodes_list)) {
427                 D_ERR("nodes file path too long\n");
428                 return NULL;
429         }
430         nodemap = ctdb_read_nodes_file(mem_ctx, nodes_list);
431         if (nodemap != NULL) {
432                 return nodemap;
433         }
434
435         DBG_ERR("Failed to read nodes file \"%s\"\n", nodes_list);
436         return NULL;
437 }
438
439 static struct interface_map *interfaces_init(TALLOC_CTX *mem_ctx)
440 {
441         struct interface_map *iface_map;
442
443         iface_map = talloc_zero(mem_ctx, struct interface_map);
444         if (iface_map == NULL) {
445                 return NULL;
446         }
447
448         return iface_map;
449 }
450
451 /* Read interfaces information.  Same format as "ctdb ifaces -Y"
452  * output:
453  *   :Name:LinkStatus:References:
454  *   :eth2:1:4294967294
455  *   :eth1:1:4294967292
456  */
457
458 static bool interfaces_parse(struct interface_map *iface_map)
459 {
460         char line[1024];
461
462         while ((fgets(line, sizeof(line), stdin) != NULL)) {
463                 uint16_t link_state;
464                 uint32_t references;
465                 char *tok, *t, *name;
466                 struct interface *iface;
467
468                 if (line[0] == '\n') {
469                         break;
470                 }
471
472                 /* Get rid of pesky newline */
473                 if ((t = strchr(line, '\n')) != NULL) {
474                         *t = '\0';
475                 }
476
477                 if (strcmp(line, ":Name:LinkStatus:References:") == 0) {
478                         continue;
479                 }
480
481                 /* Leading colon... */
482                 // tok = strtok(line, ":");
483
484                 /* name */
485                 tok = strtok(line, ":");
486                 if (tok == NULL) {
487                         fprintf(stderr, "bad line (%s) - missing name\n", line);
488                         continue;
489                 }
490                 name = tok;
491
492                 /* link_state */
493                 tok = strtok(NULL, ":");
494                 if (tok == NULL) {
495                         fprintf(stderr, "bad line (%s) - missing link state\n",
496                                 line);
497                         continue;
498                 }
499                 link_state = (uint16_t)strtoul(tok, NULL, 0);
500
501                 /* references... */
502                 tok = strtok(NULL, ":");
503                 if (tok == NULL) {
504                         fprintf(stderr, "bad line (%s) - missing references\n",
505                                 line);
506                         continue;
507                 }
508                 references = (uint32_t)strtoul(tok, NULL, 0);
509
510                 iface_map->iface = talloc_realloc(iface_map, iface_map->iface,
511                                                   struct interface,
512                                                   iface_map->num + 1);
513                 if (iface_map->iface == NULL) {
514                         goto fail;
515                 }
516
517                 iface = &iface_map->iface[iface_map->num];
518
519                 iface->name = talloc_strdup(iface_map, name);
520                 if (iface->name == NULL) {
521                         goto fail;
522                 }
523                 iface->link_up = link_state;
524                 iface->references = references;
525
526                 iface_map->num += 1;
527         }
528
529         if (iface_map->num == 0) {
530                 goto fail;
531         }
532
533         DEBUG(DEBUG_INFO, ("Parsing interfaces done\n"));
534         return true;
535
536 fail:
537         fprintf(stderr, "Parsing interfaces failed\n");
538         return false;
539 }
540
541 static struct vnn_map *vnnmap_init(TALLOC_CTX *mem_ctx)
542 {
543         struct vnn_map *vnn_map;
544
545         vnn_map = talloc_zero(mem_ctx, struct vnn_map);
546         if (vnn_map == NULL) {
547                 fprintf(stderr, "Memory error\n");
548                 return NULL;
549         }
550         vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
551         vnn_map->generation = INVALID_GENERATION;
552
553         return vnn_map;
554 }
555
556 /* Read vnn map.
557  * output:
558  *   <GENERATION>
559  *   <LMASTER0>
560  *   <LMASTER1>
561  *   ...
562  */
563
564 static bool vnnmap_parse(struct vnn_map *vnn_map)
565 {
566         char line[1024];
567
568         while (fgets(line, sizeof(line), stdin) != NULL) {
569                 uint32_t n;
570                 char *t;
571
572                 if (line[0] == '\n') {
573                         break;
574                 }
575
576                 /* Get rid of pesky newline */
577                 if ((t = strchr(line, '\n')) != NULL) {
578                         *t = '\0';
579                 }
580
581                 n = (uint32_t) strtol(line, NULL, 0);
582
583                 /* generation */
584                 if (vnn_map->generation == INVALID_GENERATION) {
585                         vnn_map->generation = n;
586                         continue;
587                 }
588
589                 vnn_map->map = talloc_realloc(vnn_map, vnn_map->map, uint32_t,
590                                               vnn_map->size + 1);
591                 if (vnn_map->map == NULL) {
592                         fprintf(stderr, "Memory error\n");
593                         goto fail;
594                 }
595
596                 vnn_map->map[vnn_map->size] = n;
597                 vnn_map->size += 1;
598         }
599
600         if (vnn_map->size == 0) {
601                 goto fail;
602         }
603
604         DEBUG(DEBUG_INFO, ("Parsing vnnmap done\n"));
605         return true;
606
607 fail:
608         fprintf(stderr, "Parsing vnnmap failed\n");
609         return false;
610 }
611
612 static bool reclock_parse(struct ctdbd_context *ctdb)
613 {
614         char line[1024];
615         char *t;
616
617         if (fgets(line, sizeof(line), stdin) == NULL) {
618                 goto fail;
619         }
620
621         if (line[0] == '\n') {
622                 goto fail;
623         }
624
625         /* Get rid of pesky newline */
626         if ((t = strchr(line, '\n')) != NULL) {
627                 *t = '\0';
628         }
629
630         ctdb->reclock = talloc_strdup(ctdb, line);
631         if (ctdb->reclock == NULL) {
632                 goto fail;
633         }
634
635         /* Swallow possible blank line following section.  Picky
636          * compiler settings don't allow the return value to be
637          * ignored, so make the compiler happy.
638          */
639         if (fgets(line, sizeof(line), stdin) == NULL) {
640                 ;
641         }
642         DEBUG(DEBUG_INFO, ("Parsing reclock done\n"));
643         return true;
644
645 fail:
646         fprintf(stderr, "Parsing reclock failed\n");
647         return false;
648 }
649
650 static struct database_map *dbmap_init(TALLOC_CTX *mem_ctx,
651                                        const char *dbdir)
652 {
653         struct database_map *db_map;
654
655         db_map = talloc_zero(mem_ctx, struct database_map);
656         if (db_map == NULL) {
657                 return NULL;
658         }
659
660         db_map->dbdir = talloc_strdup(db_map, dbdir);
661         if (db_map->dbdir == NULL) {
662                 talloc_free(db_map);
663                 return NULL;
664         }
665
666         return db_map;
667 }
668
669 /* Read a database map from stdin.  Each line looks like:
670  *  <ID> <NAME> [FLAGS] [SEQ_NUM]
671  * EOF or a blank line terminates input.
672  *
673  * By default, flags and seq_num are 0
674  */
675
676 static bool dbmap_parse(struct database_map *db_map)
677 {
678         char line[1024];
679
680         while ((fgets(line, sizeof(line), stdin) != NULL)) {
681                 uint32_t id;
682                 uint8_t flags = 0;
683                 uint32_t seq_num = 0;
684                 char *tok, *t;
685                 char *name;
686                 struct database *db;
687
688                 if (line[0] == '\n') {
689                         break;
690                 }
691
692                 /* Get rid of pesky newline */
693                 if ((t = strchr(line, '\n')) != NULL) {
694                         *t = '\0';
695                 }
696
697                 /* Get ID */
698                 tok = strtok(line, " \t");
699                 if (tok == NULL) {
700                         fprintf(stderr, "bad line (%s) - missing ID\n", line);
701                         continue;
702                 }
703                 id = (uint32_t)strtoul(tok, NULL, 0);
704
705                 /* Get NAME */
706                 tok = strtok(NULL, " \t");
707                 if (tok == NULL) {
708                         fprintf(stderr, "bad line (%s) - missing NAME\n", line);
709                         continue;
710                 }
711                 name = talloc_strdup(db_map, tok);
712                 if (name == NULL) {
713                         goto fail;
714                 }
715
716                 /* Get flags */
717                 tok = strtok(NULL, " \t");
718                 while (tok != NULL) {
719                         if (strcmp(tok, "PERSISTENT") == 0) {
720                                 flags |= CTDB_DB_FLAGS_PERSISTENT;
721                         } else if (strcmp(tok, "STICKY") == 0) {
722                                 flags |= CTDB_DB_FLAGS_STICKY;
723                         } else if (strcmp(tok, "READONLY") == 0) {
724                                 flags |= CTDB_DB_FLAGS_READONLY;
725                         } else if (strcmp(tok, "REPLICATED") == 0) {
726                                 flags |= CTDB_DB_FLAGS_REPLICATED;
727                         } else if (tok[0] >= '0'&& tok[0] <= '9') {
728                                 uint8_t nv = CTDB_DB_FLAGS_PERSISTENT |
729                                              CTDB_DB_FLAGS_REPLICATED;
730
731                                 if ((flags & nv) == 0) {
732                                         fprintf(stderr,
733                                                 "seq_num for volatile db\n");
734                                         goto fail;
735                                 }
736                                 seq_num = (uint64_t)strtoull(tok, NULL, 0);
737                         }
738
739                         tok = strtok(NULL, " \t");
740                 }
741
742                 db = talloc_zero(db_map, struct database);
743                 if (db == NULL) {
744                         goto fail;
745                 }
746
747                 db->id = id;
748                 db->name = talloc_steal(db, name);
749                 db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
750                 if (db->path == NULL) {
751                         talloc_free(db);
752                         goto fail;
753                 }
754                 db->flags = flags;
755                 db->seq_num = seq_num;
756
757                 DLIST_ADD_END(db_map->db, db);
758         }
759
760         if (db_map->db == NULL) {
761                 goto fail;
762         }
763
764         DEBUG(DEBUG_INFO, ("Parsing dbmap done\n"));
765         return true;
766
767 fail:
768         DEBUG(DEBUG_INFO, ("Parsing dbmap failed\n"));
769         return false;
770
771 }
772
773 static struct database *database_find(struct database_map *db_map,
774                                       uint32_t db_id)
775 {
776         struct database *db;
777
778         for (db = db_map->db; db != NULL; db = db->next) {
779                 if (db->id == db_id) {
780                         return db;
781                 }
782         }
783
784         return NULL;
785 }
786
787 static int database_count(struct database_map *db_map)
788 {
789         struct database *db;
790         int count = 0;
791
792         for (db = db_map->db; db != NULL; db = db->next) {
793                 count += 1;
794         }
795
796         return count;
797 }
798
799 static int database_flags(uint8_t db_flags)
800 {
801         int tdb_flags = 0;
802
803         if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
804                 tdb_flags = TDB_DEFAULT;
805         } else {
806                 /* volatile and replicated use the same flags */
807                 tdb_flags = TDB_NOSYNC |
808                             TDB_CLEAR_IF_FIRST |
809                             TDB_INCOMPATIBLE_HASH;
810         }
811
812         tdb_flags |= TDB_DISALLOW_NESTING;
813
814         return tdb_flags;
815 }
816
817 static struct database *database_new(struct database_map *db_map,
818                                      const char *name, uint8_t flags)
819 {
820         struct database *db;
821         TDB_DATA key;
822         int tdb_flags;
823
824         db = talloc_zero(db_map, struct database);
825         if (db == NULL) {
826                 return NULL;
827         }
828
829         db->name = talloc_strdup(db, name);
830         if (db->name == NULL) {
831                 goto fail;
832         }
833
834         db->path = talloc_asprintf(db, "%s/%s", db_map->dbdir, name);
835         if (db->path == NULL) {
836                 goto fail;
837         }
838
839         key.dsize = strlen(db->name) + 1;
840         key.dptr = discard_const(db->name);
841
842         db->id = tdb_jenkins_hash(&key);
843         db->flags = flags;
844
845         tdb_flags = database_flags(flags);
846
847         db->tdb = tdb_open(db->path, 8192, tdb_flags, O_CREAT|O_RDWR, 0644);
848         if (db->tdb == NULL) {
849                 DBG_ERR("tdb_open\n");
850                 goto fail;
851         }
852
853         DLIST_ADD_END(db_map->db, db);
854         return db;
855
856 fail:
857         DBG_ERR("Memory error\n");
858         talloc_free(db);
859         return NULL;
860
861 }
862
863 static int ltdb_store(struct database *db, TDB_DATA key,
864                       struct ctdb_ltdb_header *header, TDB_DATA data)
865 {
866         int ret;
867         bool db_volatile = true;
868         bool keep = false;
869
870         if (db->tdb == NULL) {
871                 return EINVAL;
872         }
873
874         if ((db->flags & CTDB_DB_FLAGS_PERSISTENT) ||
875             (db->flags & CTDB_DB_FLAGS_REPLICATED)) {
876                 db_volatile = false;
877         }
878
879         if (data.dsize > 0) {
880                 keep = true;
881         } else {
882                 if (db_volatile && header->rsn == 0) {
883                         keep = true;
884                 }
885         }
886
887         if (keep) {
888                 TDB_DATA rec[2];
889
890                 rec[0].dsize = ctdb_ltdb_header_len(header);
891                 rec[0].dptr = (uint8_t *)header;
892
893                 rec[1].dsize = data.dsize;
894                 rec[1].dptr = data.dptr;
895
896                 ret = tdb_storev(db->tdb, key, rec, 2, TDB_REPLACE);
897         } else {
898                 if (header->rsn > 0) {
899                         ret = tdb_delete(db->tdb, key);
900                 } else {
901                         ret = 0;
902                 }
903         }
904
905         return ret;
906 }
907
908 static int ltdb_fetch(struct database *db, TDB_DATA key,
909                       struct ctdb_ltdb_header *header,
910                       TALLOC_CTX *mem_ctx, TDB_DATA *data)
911 {
912         TDB_DATA rec;
913         size_t np;
914         int ret;
915
916         if (db->tdb == NULL) {
917                 return EINVAL;
918         }
919
920         rec = tdb_fetch(db->tdb, key);
921         ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
922         if (ret != 0) {
923                 if (rec.dptr != NULL) {
924                         free(rec.dptr);
925                 }
926
927                 *header = (struct ctdb_ltdb_header) {
928                         .rsn = 0,
929                         .dmaster = 0,
930                         .flags = 0,
931                 };
932
933                 ret = ltdb_store(db, key, header, tdb_null);
934                 if (ret != 0) {
935                         return ret;
936                 }
937
938                 *data = tdb_null;
939                 return 0;
940         }
941
942         data->dsize = rec.dsize - ctdb_ltdb_header_len(header);
943         data->dptr = talloc_memdup(mem_ctx,
944                                    rec.dptr + ctdb_ltdb_header_len(header),
945                                    data->dsize);
946
947         free(rec.dptr);
948
949         if (data->dptr == NULL) {
950                 return ENOMEM;
951         }
952
953         return 0;
954 }
955
956 static int database_seqnum(struct database *db, uint64_t *seqnum)
957 {
958         const char *keyname = CTDB_DB_SEQNUM_KEY;
959         TDB_DATA key, data;
960         struct ctdb_ltdb_header header;
961         size_t np;
962         int ret;
963
964         if (db->tdb == NULL) {
965                 *seqnum = db->seq_num;
966                 return 0;
967         }
968
969         key.dptr = discard_const(keyname);
970         key.dsize = strlen(keyname) + 1;
971
972         ret = ltdb_fetch(db, key, &header, db, &data);
973         if (ret != 0) {
974                 return ret;
975         }
976
977         if (data.dsize == 0) {
978                 *seqnum = 0;
979                 return 0;
980         }
981
982         ret = ctdb_uint64_pull(data.dptr, data.dsize, seqnum, &np);
983         talloc_free(data.dptr);
984         if (ret != 0) {
985                 *seqnum = 0;
986         }
987
988         return ret;
989 }
990
991 static int ltdb_transaction_update(uint32_t reqid,
992                                    struct ctdb_ltdb_header *no_header,
993                                    TDB_DATA key, TDB_DATA data,
994                                    void *private_data)
995 {
996         struct database *db = (struct database *)private_data;
997         TALLOC_CTX *tmp_ctx = talloc_new(db);
998         struct ctdb_ltdb_header header = { 0 }, oldheader;
999         TDB_DATA olddata;
1000         int ret;
1001
1002         if (db->tdb == NULL) {
1003                 return EINVAL;
1004         }
1005
1006         ret = ctdb_ltdb_header_extract(&data, &header);
1007         if (ret != 0) {
1008                 return ret;
1009         }
1010
1011         ret = ltdb_fetch(db, key, &oldheader, tmp_ctx, &olddata);
1012         if (ret != 0) {
1013                 return ret;
1014         }
1015
1016         if (olddata.dsize > 0) {
1017                 if (oldheader.rsn > header.rsn ||
1018                     (oldheader.rsn == header.rsn &&
1019                      olddata.dsize != data.dsize)) {
1020                         return -1;
1021                 }
1022         }
1023
1024         talloc_free(tmp_ctx);
1025
1026         ret = ltdb_store(db, key, &header, data);
1027         return ret;
1028 }
1029
1030 static int ltdb_transaction(struct database *db,
1031                             struct ctdb_rec_buffer *recbuf)
1032 {
1033         int ret;
1034
1035         if (db->tdb == NULL) {
1036                 return EINVAL;
1037         }
1038
1039         ret = tdb_transaction_start(db->tdb);
1040         if (ret == -1) {
1041                 return ret;
1042         }
1043
1044         ret = ctdb_rec_buffer_traverse(recbuf, ltdb_transaction_update, db);
1045         if (ret != 0) {
1046                 tdb_transaction_cancel(db->tdb);
1047         }
1048
1049         ret = tdb_transaction_commit(db->tdb);
1050         return ret;
1051 }
1052
1053 static bool public_ips_parse(struct ctdbd_context *ctdb,
1054                              uint32_t numnodes)
1055 {
1056         bool status;
1057
1058         if (numnodes == 0) {
1059                 D_ERR("Must initialise nodemap before public IPs\n");
1060                 return false;
1061         }
1062
1063         ctdb->known_ips = ipalloc_read_known_ips(ctdb, numnodes, false);
1064
1065         status = (ctdb->known_ips != NULL && ctdb->known_ips->num != 0);
1066
1067         if (status) {
1068                 D_INFO("Parsing public IPs done\n");
1069         } else {
1070                 D_INFO("Parsing public IPs failed\n");
1071         }
1072
1073         return status;
1074 }
1075
1076 /* Read information about controls to fail.  Format is:
1077  *   <opcode> <pnn> {ERROR|TIMEOUT} <comment>
1078  */
1079 static bool control_failures_parse(struct ctdbd_context *ctdb)
1080 {
1081         char line[1024];
1082
1083         while ((fgets(line, sizeof(line), stdin) != NULL)) {
1084                 char *tok, *t;
1085                 enum ctdb_controls opcode;
1086                 uint32_t pnn;
1087                 const char *error;
1088                 const char *comment;
1089                 struct fake_control_failure *failure = NULL;
1090
1091                 if (line[0] == '\n') {
1092                         break;
1093                 }
1094
1095                 /* Get rid of pesky newline */
1096                 if ((t = strchr(line, '\n')) != NULL) {
1097                         *t = '\0';
1098                 }
1099
1100                 /* Get opcode */
1101                 tok = strtok(line, " \t");
1102                 if (tok == NULL) {
1103                         D_ERR("bad line (%s) - missing opcode\n", line);
1104                         continue;
1105                 }
1106                 opcode = (enum ctdb_controls)strtoul(tok, NULL, 0);
1107
1108                 /* Get PNN */
1109                 tok = strtok(NULL, " \t");
1110                 if (tok == NULL) {
1111                         D_ERR("bad line (%s) - missing PNN\n", line);
1112                         continue;
1113                 }
1114                 pnn = (uint32_t)strtoul(tok, NULL, 0);
1115
1116                 /* Get error */
1117                 tok = strtok(NULL, " \t");
1118                 if (tok == NULL) {
1119                         D_ERR("bad line (%s) - missing errno\n", line);
1120                         continue;
1121                 }
1122                 error = talloc_strdup(ctdb, tok);
1123                 if (error == NULL) {
1124                         goto fail;
1125                 }
1126                 if (strcmp(error, "ERROR") != 0 &&
1127                     strcmp(error, "TIMEOUT") != 0) {
1128                         D_ERR("bad line (%s) "
1129                               "- error must be \"ERROR\" or \"TIMEOUT\"\n",
1130                               line);
1131                         goto fail;
1132                 }
1133
1134                 /* Get comment */
1135                 tok = strtok(NULL, "\n"); /* rest of line */
1136                 if (tok == NULL) {
1137                         D_ERR("bad line (%s) - missing comment\n", line);
1138                         continue;
1139                 }
1140                 comment = talloc_strdup(ctdb, tok);
1141                 if (comment == NULL) {
1142                         goto fail;
1143                 }
1144
1145                 failure = talloc_zero(ctdb, struct fake_control_failure);
1146                 if (failure == NULL) {
1147                         goto fail;
1148                 }
1149
1150                 failure->opcode = opcode;
1151                 failure->pnn = pnn;
1152                 failure->error = error;
1153                 failure->comment = comment;
1154
1155                 DLIST_ADD(ctdb->control_failures, failure);
1156         }
1157
1158         if (ctdb->control_failures == NULL) {
1159                 goto fail;
1160         }
1161
1162         D_INFO("Parsing fake control failures done\n");
1163         return true;
1164
1165 fail:
1166         D_INFO("Parsing fake control failures failed\n");
1167         return false;
1168 }
1169
1170 static bool runstate_parse(struct ctdbd_context *ctdb)
1171 {
1172         char line[1024];
1173         char *t;
1174
1175         if (fgets(line, sizeof(line), stdin) == NULL) {
1176                 goto fail;
1177         }
1178
1179         if (line[0] == '\n') {
1180                 goto fail;
1181         }
1182
1183         /* Get rid of pesky newline */
1184         if ((t = strchr(line, '\n')) != NULL) {
1185                 *t = '\0';
1186         }
1187
1188         ctdb->runstate = ctdb_runstate_from_string(line);
1189         if (ctdb->runstate == CTDB_RUNSTATE_UNKNOWN) {
1190                 goto fail;
1191         }
1192
1193         /* Swallow possible blank line following section.  Picky
1194          * compiler settings don't allow the return value to be
1195          * ignored, so make the compiler happy.
1196          */
1197         if (fgets(line, sizeof(line), stdin) == NULL) {
1198                 ;
1199         }
1200         D_INFO("Parsing runstate done\n");
1201         return true;
1202
1203 fail:
1204         D_ERR("Parsing runstate failed\n");
1205         return false;
1206 }
1207
1208 /*
1209  * Manage clients
1210  */
1211
1212 static int ctdb_client_destructor(struct ctdb_client *client)
1213 {
1214         DLIST_REMOVE(client->ctdb->client_list, client);
1215         return 0;
1216 }
1217
1218 static int client_add(struct ctdbd_context *ctdb, pid_t client_pid,
1219                       void *client_state)
1220 {
1221         struct ctdb_client *client;
1222
1223         client = talloc_zero(client_state, struct ctdb_client);
1224         if (client == NULL) {
1225                 return ENOMEM;
1226         }
1227
1228         client->ctdb = ctdb;
1229         client->pid = client_pid;
1230         client->state = client_state;
1231
1232         DLIST_ADD(ctdb->client_list, client);
1233         talloc_set_destructor(client, ctdb_client_destructor);
1234         return 0;
1235 }
1236
1237 static void *client_find(struct ctdbd_context *ctdb, pid_t client_pid)
1238 {
1239         struct ctdb_client *client;
1240
1241         for (client=ctdb->client_list; client != NULL; client=client->next) {
1242                 if (client->pid == client_pid) {
1243                         return client->state;
1244                 }
1245         }
1246
1247         return NULL;
1248 }
1249
1250 /*
1251  * CTDB context setup
1252  */
1253
1254 static uint32_t new_generation(uint32_t old_generation)
1255 {
1256         uint32_t generation;
1257
1258         while (1) {
1259                 generation = random();
1260                 if (generation != INVALID_GENERATION &&
1261                     generation != old_generation) {
1262                         break;
1263                 }
1264         }
1265
1266         return generation;
1267 }
1268
1269 static struct ctdbd_context *ctdbd_setup(TALLOC_CTX *mem_ctx,
1270                                          const char *dbdir)
1271 {
1272         struct ctdbd_context *ctdb;
1273         char line[1024];
1274         bool status;
1275         int ret;
1276
1277         ctdb = talloc_zero(mem_ctx, struct ctdbd_context);
1278         if (ctdb == NULL) {
1279                 return NULL;
1280         }
1281
1282         ctdb->node_map = nodemap_init(ctdb);
1283         if (ctdb->node_map == NULL) {
1284                 goto fail;
1285         }
1286
1287         ctdb->iface_map = interfaces_init(ctdb);
1288         if (ctdb->iface_map == NULL) {
1289                 goto fail;
1290         }
1291
1292         ctdb->vnn_map = vnnmap_init(ctdb);
1293         if (ctdb->vnn_map == NULL) {
1294                 goto fail;
1295         }
1296
1297         ctdb->db_map = dbmap_init(ctdb, dbdir);
1298         if (ctdb->db_map == NULL) {
1299                 goto fail;
1300         }
1301
1302         ret = srvid_init(ctdb, &ctdb->srv);
1303         if (ret != 0) {
1304                 goto fail;
1305         }
1306
1307         ctdb->runstate = CTDB_RUNSTATE_RUNNING;
1308
1309         while (fgets(line, sizeof(line), stdin) != NULL) {
1310                 char *t;
1311
1312                 if ((t = strchr(line, '\n')) != NULL) {
1313                         *t = '\0';
1314                 }
1315
1316                 if (strcmp(line, "NODEMAP") == 0) {
1317                         status = nodemap_parse(ctdb->node_map);
1318                 } else if (strcmp(line, "IFACES") == 0) {
1319                         status = interfaces_parse(ctdb->iface_map);
1320                 } else if (strcmp(line, "VNNMAP") == 0) {
1321                         status = vnnmap_parse(ctdb->vnn_map);
1322                 } else if (strcmp(line, "DBMAP") == 0) {
1323                         status = dbmap_parse(ctdb->db_map);
1324                 } else if (strcmp(line, "PUBLICIPS") == 0) {
1325                         status = public_ips_parse(ctdb,
1326                                                   ctdb->node_map->num_nodes);
1327                 } else if (strcmp(line, "RECLOCK") == 0) {
1328                         status = reclock_parse(ctdb);
1329                 } else if (strcmp(line, "CONTROLFAILS") == 0) {
1330                         status = control_failures_parse(ctdb);
1331                 } else if (strcmp(line, "RUNSTATE") == 0) {
1332                         status = runstate_parse(ctdb);
1333                 } else {
1334                         fprintf(stderr, "Unknown line %s\n", line);
1335                         status = false;
1336                 }
1337
1338                 if (! status) {
1339                         goto fail;
1340                 }
1341         }
1342
1343         ctdb->start_time = tevent_timeval_current();
1344         ctdb->recovery_start_time = tevent_timeval_current();
1345         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1346         if (ctdb->vnn_map->generation == INVALID_GENERATION) {
1347                 ctdb->vnn_map->generation =
1348                         new_generation(ctdb->vnn_map->generation);
1349         }
1350         ctdb->recovery_end_time = tevent_timeval_current();
1351
1352         ctdb->log_level = DEBUG_ERR;
1353
1354         ctdb_tunable_set_defaults(&ctdb->tun_list);
1355
1356         return ctdb;
1357
1358 fail:
1359         TALLOC_FREE(ctdb);
1360         return NULL;
1361 }
1362
1363 static bool ctdbd_verify(struct ctdbd_context *ctdb)
1364 {
1365         struct node *node;
1366         unsigned int i;
1367
1368         if (ctdb->node_map->num_nodes == 0) {
1369                 return true;
1370         }
1371
1372         /* Make sure all the nodes are in order */
1373         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1374                 node = &ctdb->node_map->node[i];
1375                 if (node->pnn != i) {
1376                         fprintf(stderr, "Expected node %u, found %u\n",
1377                                 i, node->pnn);
1378                         return false;
1379                 }
1380         }
1381
1382         node = &ctdb->node_map->node[ctdb->node_map->pnn];
1383         if (node->flags & NODE_FLAGS_DISCONNECTED) {
1384                 DEBUG(DEBUG_INFO, ("Node disconnected, exiting\n"));
1385                 exit(0);
1386         }
1387
1388         return true;
1389 }
1390
1391 /*
1392  * Doing a recovery
1393  */
1394
/* State carried by a recovery request created by recover_send() */
struct recover_state {
        /* Event context on which recover_check() schedules its wakeups */
        struct tevent_context *ev;
        /* Fake daemon context whose node map and VNN map are used */
        struct ctdbd_context *ctdb;
};

/* Schedules a one second wakeup and selects the callback for it */
static int recover_check(struct tevent_req *req);
/* Wakeup callback used while some node has recovery disabled */
static void recover_wait_done(struct tevent_req *subreq);
/* Wakeup callback that finishes the recovery */
static void recover_done(struct tevent_req *subreq);
1403
1404 static struct tevent_req *recover_send(TALLOC_CTX *mem_ctx,
1405                                        struct tevent_context *ev,
1406                                        struct ctdbd_context *ctdb)
1407 {
1408         struct tevent_req *req;
1409         struct recover_state *state;
1410         int ret;
1411
1412         req = tevent_req_create(mem_ctx, &state, struct recover_state);
1413         if (req == NULL) {
1414                 return NULL;
1415         }
1416
1417         state->ev = ev;
1418         state->ctdb = ctdb;
1419
1420         ret = recover_check(req);
1421         if (ret != 0) {
1422                 tevent_req_error(req, ret);
1423                 return tevent_req_post(req, ev);
1424         }
1425
1426         return req;
1427 }
1428
1429 static int recover_check(struct tevent_req *req)
1430 {
1431         struct recover_state *state = tevent_req_data(
1432                 req, struct recover_state);
1433         struct ctdbd_context *ctdb = state->ctdb;
1434         struct tevent_req *subreq;
1435         bool recovery_disabled;
1436         unsigned int i;
1437
1438         recovery_disabled = false;
1439         for (i=0; i<ctdb->node_map->num_nodes; i++) {
1440                 if (ctdb->node_map->node[i].recovery_disabled) {
1441                         recovery_disabled = true;
1442                         break;
1443                 }
1444         }
1445
1446         subreq = tevent_wakeup_send(state, state->ev,
1447                                     tevent_timeval_current_ofs(1, 0));
1448         if (subreq == NULL) {
1449                 return ENOMEM;
1450         }
1451
1452         if (recovery_disabled) {
1453                 tevent_req_set_callback(subreq, recover_wait_done, req);
1454         } else {
1455                 ctdb->recovery_start_time = tevent_timeval_current();
1456                 tevent_req_set_callback(subreq, recover_done, req);
1457         }
1458
1459         return 0;
1460 }
1461
1462 static void recover_wait_done(struct tevent_req *subreq)
1463 {
1464         struct tevent_req *req = tevent_req_callback_data(
1465                 subreq, struct tevent_req);
1466         int ret;
1467         bool status;
1468
1469         status = tevent_wakeup_recv(subreq);
1470         TALLOC_FREE(subreq);
1471         if (! status) {
1472                 tevent_req_error(req, EIO);
1473                 return;
1474         }
1475
1476         ret = recover_check(req);
1477         if (ret != 0) {
1478                 tevent_req_error(req, ret);
1479         }
1480 }
1481
1482 static void recover_done(struct tevent_req *subreq)
1483 {
1484         struct tevent_req *req = tevent_req_callback_data(
1485                 subreq, struct tevent_req);
1486         struct recover_state *state = tevent_req_data(
1487                 req, struct recover_state);
1488         struct ctdbd_context *ctdb = state->ctdb;
1489         bool status;
1490
1491         status = tevent_wakeup_recv(subreq);
1492         TALLOC_FREE(subreq);
1493         if (! status) {
1494                 tevent_req_error(req, EIO);
1495                 return;
1496         }
1497
1498         ctdb->vnn_map->recmode = CTDB_RECOVERY_NORMAL;
1499         ctdb->recovery_end_time = tevent_timeval_current();
1500         ctdb->vnn_map->generation = new_generation(ctdb->vnn_map->generation);
1501
1502         tevent_req_done(req);
1503 }
1504
/*
 * Collect the result of a recovery request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error there.
 */
static bool recover_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }

        return false;
}
1518
1519 /*
1520  * Routines for ctdb_req_header
1521  */
1522
1523 static void header_fix_pnn(struct ctdb_req_header *header,
1524                            struct ctdbd_context *ctdb)
1525 {
1526         if (header->srcnode == CTDB_CURRENT_NODE) {
1527                 header->srcnode = ctdb->node_map->pnn;
1528         }
1529
1530         if (header->destnode == CTDB_CURRENT_NODE) {
1531                 header->destnode = ctdb->node_map->pnn;
1532         }
1533 }
1534
1535 static struct ctdb_req_header header_reply_call(
1536                                         struct ctdb_req_header *header,
1537                                         struct ctdbd_context *ctdb)
1538 {
1539         struct ctdb_req_header reply_header;
1540
1541         reply_header = (struct ctdb_req_header) {
1542                 .ctdb_magic = CTDB_MAGIC,
1543                 .ctdb_version = CTDB_PROTOCOL,
1544                 .generation = ctdb->vnn_map->generation,
1545                 .operation = CTDB_REPLY_CALL,
1546                 .destnode = header->srcnode,
1547                 .srcnode = header->destnode,
1548                 .reqid = header->reqid,
1549         };
1550
1551         return reply_header;
1552 }
1553
1554 static struct ctdb_req_header header_reply_control(
1555                                         struct ctdb_req_header *header,
1556                                         struct ctdbd_context *ctdb)
1557 {
1558         struct ctdb_req_header reply_header;
1559
1560         reply_header = (struct ctdb_req_header) {
1561                 .ctdb_magic = CTDB_MAGIC,
1562                 .ctdb_version = CTDB_PROTOCOL,
1563                 .generation = ctdb->vnn_map->generation,
1564                 .operation = CTDB_REPLY_CONTROL,
1565                 .destnode = header->srcnode,
1566                 .srcnode = header->destnode,
1567                 .reqid = header->reqid,
1568         };
1569
1570         return reply_header;
1571 }
1572
1573 static struct ctdb_req_header header_reply_message(
1574                                         struct ctdb_req_header *header,
1575                                         struct ctdbd_context *ctdb)
1576 {
1577         struct ctdb_req_header reply_header;
1578
1579         reply_header = (struct ctdb_req_header) {
1580                 .ctdb_magic = CTDB_MAGIC,
1581                 .ctdb_version = CTDB_PROTOCOL,
1582                 .generation = ctdb->vnn_map->generation,
1583                 .operation = CTDB_REQ_MESSAGE,
1584                 .destnode = header->srcnode,
1585                 .srcnode = header->destnode,
1586                 .reqid = 0,
1587         };
1588
1589         return reply_header;
1590 }
1591
1592 /*
1593  * Client state
1594  */
1595
/* Per-client request state; obtained with tevent_req_data() in handlers */
struct client_state {
        /* Event context passed to comm_write_send() when replying */
        struct tevent_context *ev;
        /* NOTE(review): presumably the client's socket descriptor - confirm */
        int fd;
        /* Fake daemon context this client is talking to */
        struct ctdbd_context *ctdb;
        /* NOTE(review): presumably the PNN served to this client - confirm */
        int pnn;
        /* NOTE(review): presumably the client's process ID - confirm */
        pid_t pid;
        /* Comm context over which reply packets are written */
        struct comm_context *comm;
        /* NOTE(review): looks like srvid registration state - confirm */
        struct srvid_register_state *rstate;
        /* NOTE(review): usage not visible in this part of the file */
        int status;
};
1606
1607 /*
1608  * Send replies to call, controls and messages
1609  */
1610
1611 static void client_reply_done(struct tevent_req *subreq);
1612
1613 static void client_send_call(struct tevent_req *req,
1614                              struct ctdb_req_header *header,
1615                              struct ctdb_reply_call *reply)
1616 {
1617         struct client_state *state = tevent_req_data(
1618                 req, struct client_state);
1619         struct ctdbd_context *ctdb = state->ctdb;
1620         struct tevent_req *subreq;
1621         struct ctdb_req_header reply_header;
1622         uint8_t *buf;
1623         size_t datalen, buflen;
1624         int ret;
1625
1626         reply_header = header_reply_call(header, ctdb);
1627
1628         datalen = ctdb_reply_call_len(&reply_header, reply);
1629         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1630         if (ret != 0) {
1631                 tevent_req_error(req, ret);
1632                 return;
1633         }
1634
1635         ret = ctdb_reply_call_push(&reply_header, reply, buf, &buflen);
1636         if (ret != 0) {
1637                 tevent_req_error(req, ret);
1638                 return;
1639         }
1640
1641         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1642         if (tevent_req_nomem(subreq, req)) {
1643                 return;
1644         }
1645         tevent_req_set_callback(subreq, client_reply_done, req);
1646
1647         talloc_steal(subreq, buf);
1648 }
1649
1650 static void client_send_message(struct tevent_req *req,
1651                                 struct ctdb_req_header *header,
1652                                 struct ctdb_req_message_data *message)
1653 {
1654         struct client_state *state = tevent_req_data(
1655                 req, struct client_state);
1656         struct ctdbd_context *ctdb = state->ctdb;
1657         struct tevent_req *subreq;
1658         struct ctdb_req_header reply_header;
1659         uint8_t *buf;
1660         size_t datalen, buflen;
1661         int ret;
1662
1663         reply_header = header_reply_message(header, ctdb);
1664
1665         datalen = ctdb_req_message_data_len(&reply_header, message);
1666         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1667         if (ret != 0) {
1668                 tevent_req_error(req, ret);
1669                 return;
1670         }
1671
1672         ret = ctdb_req_message_data_push(&reply_header, message,
1673                                          buf, &buflen);
1674         if (ret != 0) {
1675                 tevent_req_error(req, ret);
1676                 return;
1677         }
1678
1679         DEBUG(DEBUG_INFO, ("message srvid = 0x%"PRIx64"\n", message->srvid));
1680
1681         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1682         if (tevent_req_nomem(subreq, req)) {
1683                 return;
1684         }
1685         tevent_req_set_callback(subreq, client_reply_done, req);
1686
1687         talloc_steal(subreq, buf);
1688 }
1689
1690 static void client_send_control(struct tevent_req *req,
1691                                 struct ctdb_req_header *header,
1692                                 struct ctdb_reply_control *reply)
1693 {
1694         struct client_state *state = tevent_req_data(
1695                 req, struct client_state);
1696         struct ctdbd_context *ctdb = state->ctdb;
1697         struct tevent_req *subreq;
1698         struct ctdb_req_header reply_header;
1699         uint8_t *buf;
1700         size_t datalen, buflen;
1701         int ret;
1702
1703         reply_header = header_reply_control(header, ctdb);
1704
1705         datalen = ctdb_reply_control_len(&reply_header, reply);
1706         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
1707         if (ret != 0) {
1708                 tevent_req_error(req, ret);
1709                 return;
1710         }
1711
1712         ret = ctdb_reply_control_push(&reply_header, reply, buf, &buflen);
1713         if (ret != 0) {
1714                 tevent_req_error(req, ret);
1715                 return;
1716         }
1717
1718         DEBUG(DEBUG_INFO, ("reply opcode = %u\n", reply->rdata.opcode));
1719
1720         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
1721         if (tevent_req_nomem(subreq, req)) {
1722                 return;
1723         }
1724         tevent_req_set_callback(subreq, client_reply_done, req);
1725
1726         talloc_steal(subreq, buf);
1727 }
1728
1729 static void client_reply_done(struct tevent_req *subreq)
1730 {
1731         struct tevent_req *req = tevent_req_callback_data(
1732                 subreq, struct tevent_req);
1733         int ret;
1734         bool status;
1735
1736         status = comm_write_recv(subreq, &ret);
1737         TALLOC_FREE(subreq);
1738         if (! status) {
1739                 tevent_req_error(req, ret);
1740         }
1741 }
1742
1743 /*
1744  * Handling protocol - controls
1745  */
1746
1747 static void control_process_exists(TALLOC_CTX *mem_ctx,
1748                                    struct tevent_req *req,
1749                                    struct ctdb_req_header *header,
1750                                    struct ctdb_req_control *request)
1751 {
1752         struct client_state *state = tevent_req_data(
1753                 req, struct client_state);
1754         struct ctdbd_context *ctdb = state->ctdb;
1755         struct client_state *cstate;
1756         struct ctdb_reply_control reply;
1757
1758         reply.rdata.opcode = request->opcode;
1759
1760         cstate = client_find(ctdb, request->rdata.data.pid);
1761         if (cstate == NULL) {
1762                 reply.status = -1;
1763                 reply.errmsg = "No client for PID";
1764         } else {
1765                 reply.status = kill(request->rdata.data.pid, 0);
1766                 reply.errmsg = NULL;
1767         }
1768
1769         client_send_control(req, header, &reply);
1770 }
1771
1772 static void control_ping(TALLOC_CTX *mem_ctx,
1773                          struct tevent_req *req,
1774                          struct ctdb_req_header *header,
1775                          struct ctdb_req_control *request)
1776 {
1777         struct client_state *state = tevent_req_data(
1778                 req, struct client_state);
1779         struct ctdbd_context *ctdb = state->ctdb;
1780         struct ctdb_reply_control reply;
1781
1782         reply.rdata.opcode = request->opcode;
1783         reply.status = ctdb->num_clients;
1784         reply.errmsg = NULL;
1785
1786         client_send_control(req, header, &reply);
1787 }
1788
1789 static void control_getdbpath(TALLOC_CTX *mem_ctx,
1790                               struct tevent_req *req,
1791                               struct ctdb_req_header *header,
1792                               struct ctdb_req_control *request)
1793 {
1794         struct client_state *state = tevent_req_data(
1795                 req, struct client_state);
1796         struct ctdbd_context *ctdb = state->ctdb;
1797         struct ctdb_reply_control reply;
1798         struct database *db;
1799
1800         reply.rdata.opcode = request->opcode;
1801
1802         db = database_find(ctdb->db_map, request->rdata.data.db_id);
1803         if (db == NULL) {
1804                 reply.status = ENOENT;
1805                 reply.errmsg = "Database not found";
1806         } else {
1807                 reply.rdata.data.db_path =
1808                         talloc_strdup(mem_ctx, db->path);
1809                 if (reply.rdata.data.db_path == NULL) {
1810                         reply.status = ENOMEM;
1811                         reply.errmsg = "Memory error";
1812                 } else {
1813                         reply.status = 0;
1814                         reply.errmsg = NULL;
1815                 }
1816         }
1817
1818         client_send_control(req, header, &reply);
1819 }
1820
1821 static void control_getvnnmap(TALLOC_CTX *mem_ctx,
1822                               struct tevent_req *req,
1823                               struct ctdb_req_header *header,
1824                               struct ctdb_req_control *request)
1825 {
1826         struct client_state *state = tevent_req_data(
1827                 req, struct client_state);
1828         struct ctdbd_context *ctdb = state->ctdb;
1829         struct ctdb_reply_control reply;
1830         struct ctdb_vnn_map *vnnmap;
1831
1832         reply.rdata.opcode = request->opcode;
1833
1834         vnnmap = talloc_zero(mem_ctx, struct ctdb_vnn_map);
1835         if (vnnmap == NULL) {
1836                 reply.status = ENOMEM;
1837                 reply.errmsg = "Memory error";
1838         } else {
1839                 vnnmap->generation = ctdb->vnn_map->generation;
1840                 vnnmap->size = ctdb->vnn_map->size;
1841                 vnnmap->map = ctdb->vnn_map->map;
1842
1843                 reply.rdata.data.vnnmap = vnnmap;
1844                 reply.status = 0;
1845                 reply.errmsg = NULL;
1846         }
1847
1848         client_send_control(req, header, &reply);
1849 }
1850
1851 static void control_get_debug(TALLOC_CTX *mem_ctx,
1852                               struct tevent_req *req,
1853                               struct ctdb_req_header *header,
1854                               struct ctdb_req_control *request)
1855 {
1856         struct client_state *state = tevent_req_data(
1857                 req, struct client_state);
1858         struct ctdbd_context *ctdb = state->ctdb;
1859         struct ctdb_reply_control reply;
1860
1861         reply.rdata.opcode = request->opcode;
1862         reply.rdata.data.loglevel = (uint32_t)ctdb->log_level;
1863         reply.status = 0;
1864         reply.errmsg = NULL;
1865
1866         client_send_control(req, header, &reply);
1867 }
1868
1869 static void control_set_debug(TALLOC_CTX *mem_ctx,
1870                               struct tevent_req *req,
1871                               struct ctdb_req_header *header,
1872                               struct ctdb_req_control *request)
1873 {
1874         struct client_state *state = tevent_req_data(
1875                 req, struct client_state);
1876         struct ctdbd_context *ctdb = state->ctdb;
1877         struct ctdb_reply_control reply;
1878
1879         ctdb->log_level = (int)request->rdata.data.loglevel;
1880
1881         reply.rdata.opcode = request->opcode;
1882         reply.status = 0;
1883         reply.errmsg = NULL;
1884
1885         client_send_control(req, header, &reply);
1886 }
1887
1888 static void control_get_dbmap(TALLOC_CTX *mem_ctx,
1889                               struct tevent_req *req,
1890                                struct ctdb_req_header *header,
1891                               struct ctdb_req_control *request)
1892 {
1893         struct client_state *state = tevent_req_data(
1894                 req, struct client_state);
1895         struct ctdbd_context *ctdb = state->ctdb;
1896         struct ctdb_reply_control reply;
1897         struct ctdb_dbid_map *dbmap;
1898         struct database *db;
1899         unsigned int i;
1900
1901         reply.rdata.opcode = request->opcode;
1902
1903         dbmap = talloc_zero(mem_ctx, struct ctdb_dbid_map);
1904         if (dbmap == NULL) {
1905                 goto fail;
1906         }
1907
1908         dbmap->num = database_count(ctdb->db_map);
1909         dbmap->dbs = talloc_zero_array(dbmap, struct ctdb_dbid, dbmap->num);
1910         if (dbmap->dbs == NULL) {
1911                 goto fail;
1912         }
1913
1914         db = ctdb->db_map->db;
1915         for (i = 0; i < dbmap->num; i++) {
1916                 dbmap->dbs[i] = (struct ctdb_dbid) {
1917                         .db_id = db->id,
1918                         .flags = db->flags,
1919                 };
1920
1921                 db = db->next;
1922         }
1923
1924         reply.rdata.data.dbmap = dbmap;
1925         reply.status = 0;
1926         reply.errmsg = NULL;
1927         client_send_control(req, header, &reply);
1928         return;
1929
1930 fail:
1931         reply.status = -1;
1932         reply.errmsg = "Memory error";
1933         client_send_control(req, header, &reply);
1934 }
1935
1936 static void control_get_recmode(TALLOC_CTX *mem_ctx,
1937                                 struct tevent_req *req,
1938                                 struct ctdb_req_header *header,
1939                                 struct ctdb_req_control *request)
1940 {
1941         struct client_state *state = tevent_req_data(
1942                 req, struct client_state);
1943         struct ctdbd_context *ctdb = state->ctdb;
1944         struct ctdb_reply_control reply;
1945
1946         reply.rdata.opcode = request->opcode;
1947         reply.status = ctdb->vnn_map->recmode;
1948         reply.errmsg = NULL;
1949
1950         client_send_control(req, header, &reply);
1951 }
1952
1953 struct set_recmode_state {
1954         struct tevent_req *req;
1955         struct ctdbd_context *ctdb;
1956         struct ctdb_req_header header;
1957         struct ctdb_reply_control reply;
1958 };
1959
1960 static void set_recmode_callback(struct tevent_req *subreq)
1961 {
1962         struct set_recmode_state *substate = tevent_req_callback_data(
1963                 subreq, struct set_recmode_state);
1964         bool status;
1965         int ret;
1966
1967         status = recover_recv(subreq, &ret);
1968         TALLOC_FREE(subreq);
1969         if (! status) {
1970                 substate->reply.status = ret;
1971                 substate->reply.errmsg = "recovery failed";
1972         } else {
1973                 substate->reply.status = 0;
1974                 substate->reply.errmsg = NULL;
1975         }
1976
1977         client_send_control(substate->req, &substate->header, &substate->reply);
1978         talloc_free(substate);
1979 }
1980
1981 static void control_set_recmode(TALLOC_CTX *mem_ctx,
1982                                 struct tevent_req *req,
1983                                 struct ctdb_req_header *header,
1984                                 struct ctdb_req_control *request)
1985 {
1986         struct client_state *state = tevent_req_data(
1987                 req, struct client_state);
1988         struct tevent_req *subreq;
1989         struct ctdbd_context *ctdb = state->ctdb;
1990         struct set_recmode_state *substate;
1991         struct ctdb_reply_control reply;
1992
1993         reply.rdata.opcode = request->opcode;
1994
1995         if (request->rdata.data.recmode == CTDB_RECOVERY_NORMAL) {
1996                 reply.status = -1;
1997                 reply.errmsg = "Client cannot set recmode to NORMAL";
1998                 goto fail;
1999         }
2000
2001         substate = talloc_zero(ctdb, struct set_recmode_state);
2002         if (substate == NULL) {
2003                 reply.status = -1;
2004                 reply.errmsg = "Memory error";
2005                 goto fail;
2006         }
2007
2008         substate->req = req;
2009         substate->ctdb = ctdb;
2010         substate->header = *header;
2011         substate->reply.rdata.opcode = request->opcode;
2012
2013         subreq = recover_send(substate, state->ev, state->ctdb);
2014         if (subreq == NULL) {
2015                 talloc_free(substate);
2016                 goto fail;
2017         }
2018         tevent_req_set_callback(subreq, set_recmode_callback, substate);
2019
2020         ctdb->vnn_map->recmode = CTDB_RECOVERY_ACTIVE;
2021         return;
2022
2023 fail:
2024         client_send_control(req, header, &reply);
2025
2026 }
2027
2028 static void control_db_attach(TALLOC_CTX *mem_ctx,
2029                               struct tevent_req *req,
2030                               struct ctdb_req_header *header,
2031                               struct ctdb_req_control *request)
2032 {
2033         struct client_state *state = tevent_req_data(
2034                 req, struct client_state);
2035         struct ctdbd_context *ctdb = state->ctdb;
2036         struct ctdb_reply_control reply;
2037         struct database *db;
2038
2039         reply.rdata.opcode = request->opcode;
2040
2041         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2042                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2043                         goto done;
2044                 }
2045         }
2046
2047         db = database_new(ctdb->db_map, request->rdata.data.db_name, 0);
2048         if (db == NULL) {
2049                 reply.status = -1;
2050                 reply.errmsg = "Failed to attach database";
2051                 client_send_control(req, header, &reply);
2052                 return;
2053         }
2054
2055 done:
2056         reply.rdata.data.db_id = db->id;
2057         reply.status = 0;
2058         reply.errmsg = NULL;
2059         client_send_control(req, header, &reply);
2060 }
2061
2062 static void srvid_handler_done(struct tevent_req *subreq);
2063
2064 static void srvid_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2065 {
2066         struct client_state *state = talloc_get_type_abort(
2067                 private_data, struct client_state);
2068         struct ctdbd_context *ctdb = state->ctdb;
2069         struct tevent_req *subreq;
2070         struct ctdb_req_header request_header;
2071         struct ctdb_req_message_data message;
2072         uint8_t *buf;
2073         size_t datalen, buflen;
2074         int ret;
2075
2076         request_header = (struct ctdb_req_header) {
2077                 .ctdb_magic = CTDB_MAGIC,
2078                 .ctdb_version = CTDB_PROTOCOL,
2079                 .generation = ctdb->vnn_map->generation,
2080                 .operation = CTDB_REQ_MESSAGE,
2081                 .destnode = state->pnn,
2082                 .srcnode = ctdb->node_map->recmaster,
2083                 .reqid = 0,
2084         };
2085
2086         message = (struct ctdb_req_message_data) {
2087                 .srvid = srvid,
2088                 .data = data,
2089         };
2090
2091         datalen = ctdb_req_message_data_len(&request_header, &message);
2092         ret = ctdb_allocate_pkt(state, datalen, &buf, &buflen);
2093         if (ret != 0) {
2094                 return;
2095         }
2096
2097         ret = ctdb_req_message_data_push(&request_header,
2098                                          &message,
2099                                          buf,
2100                                          &buflen);
2101         if (ret != 0) {
2102                 talloc_free(buf);
2103                 return;
2104         }
2105
2106         subreq = comm_write_send(state, state->ev, state->comm, buf, buflen);
2107         if (subreq == NULL) {
2108                 talloc_free(buf);
2109                 return;
2110         }
2111         tevent_req_set_callback(subreq, srvid_handler_done, state);
2112
2113         talloc_steal(subreq, buf);
2114 }
2115
2116 static void srvid_handler_done(struct tevent_req *subreq)
2117 {
2118         struct client_state *state = tevent_req_callback_data(
2119                 subreq, struct client_state);
2120         int ret;
2121         bool ok;
2122
2123         ok = comm_write_recv(subreq, &ret);
2124         TALLOC_FREE(subreq);
2125         if (!ok) {
2126                 DEBUG(DEBUG_ERR,
2127                       ("Failed to dispatch message to client pid=%u, ret=%d\n",
2128                        state->pid,
2129                        ret));
2130         }
2131 }
2132
2133 static void control_register_srvid(TALLOC_CTX *mem_ctx,
2134                                    struct tevent_req *req,
2135                                    struct ctdb_req_header *header,
2136                                    struct ctdb_req_control *request)
2137 {
2138         struct client_state *state = tevent_req_data(
2139                 req, struct client_state);
2140         struct ctdbd_context *ctdb = state->ctdb;
2141         struct ctdb_reply_control reply;
2142         int ret;
2143
2144         reply.rdata.opcode = request->opcode;
2145
2146         ret = srvid_register(ctdb->srv, state, request->srvid,
2147                              srvid_handler, state);
2148         if (ret != 0) {
2149                 reply.status = -1;
2150                 reply.errmsg = "Memory error";
2151                 goto fail;
2152         }
2153
2154         DEBUG(DEBUG_INFO, ("Register srvid 0x%"PRIx64"\n", request->srvid));
2155
2156         reply.status = 0;
2157         reply.errmsg = NULL;
2158
2159 fail:
2160         client_send_control(req, header, &reply);
2161 }
2162
2163 static void control_deregister_srvid(TALLOC_CTX *mem_ctx,
2164                                      struct tevent_req *req,
2165                                      struct ctdb_req_header *header,
2166                                      struct ctdb_req_control *request)
2167 {
2168         struct client_state *state = tevent_req_data(
2169                 req, struct client_state);
2170         struct ctdbd_context *ctdb = state->ctdb;
2171         struct ctdb_reply_control reply;
2172         int ret;
2173
2174         reply.rdata.opcode = request->opcode;
2175
2176         ret = srvid_deregister(ctdb->srv, request->srvid, state);
2177         if (ret != 0) {
2178                 reply.status = -1;
2179                 reply.errmsg = "srvid not registered";
2180                 goto fail;
2181         }
2182
2183         DEBUG(DEBUG_INFO, ("Deregister srvid 0x%"PRIx64"\n", request->srvid));
2184
2185         reply.status = 0;
2186         reply.errmsg = NULL;
2187
2188 fail:
2189         client_send_control(req, header, &reply);
2190 }
2191
2192 static void control_get_dbname(TALLOC_CTX *mem_ctx,
2193                                struct tevent_req *req,
2194                                struct ctdb_req_header *header,
2195                                struct ctdb_req_control *request)
2196 {
2197         struct client_state *state = tevent_req_data(
2198                 req, struct client_state);
2199         struct ctdbd_context *ctdb = state->ctdb;
2200         struct ctdb_reply_control reply;
2201         struct database *db;
2202
2203         reply.rdata.opcode = request->opcode;
2204
2205         db = database_find(ctdb->db_map, request->rdata.data.db_id);
2206         if (db == NULL) {
2207                 reply.status = ENOENT;
2208                 reply.errmsg = "Database not found";
2209         } else {
2210                 reply.rdata.data.db_name = talloc_strdup(mem_ctx, db->name);
2211                 if (reply.rdata.data.db_name == NULL) {
2212                         reply.status = ENOMEM;
2213                         reply.errmsg = "Memory error";
2214                 } else {
2215                         reply.status = 0;
2216                         reply.errmsg = NULL;
2217                 }
2218         }
2219
2220         client_send_control(req, header, &reply);
2221 }
2222
2223 static void control_get_pid(TALLOC_CTX *mem_ctx,
2224                             struct tevent_req *req,
2225                             struct ctdb_req_header *header,
2226                             struct ctdb_req_control *request)
2227 {
2228         struct ctdb_reply_control reply;
2229
2230         reply.rdata.opcode = request->opcode;
2231         reply.status = getpid();
2232         reply.errmsg = NULL;
2233
2234         client_send_control(req, header, &reply);
2235 }
2236
2237 static void control_get_pnn(TALLOC_CTX *mem_ctx,
2238                             struct tevent_req *req,
2239                             struct ctdb_req_header *header,
2240                             struct ctdb_req_control *request)
2241 {
2242         struct ctdb_reply_control reply;
2243
2244         reply.rdata.opcode = request->opcode;
2245         reply.status = header->destnode;
2246         reply.errmsg = NULL;
2247
2248         client_send_control(req, header, &reply);
2249 }
2250
2251 static void control_shutdown(TALLOC_CTX *mem_ctx,
2252                              struct tevent_req *req,
2253                              struct ctdb_req_header *hdr,
2254                              struct ctdb_req_control *request)
2255 {
2256         struct client_state *state = tevent_req_data(
2257                 req, struct client_state);
2258
2259         state->status = 99;
2260 }
2261
2262 static void control_set_tunable(TALLOC_CTX *mem_ctx,
2263                                 struct tevent_req *req,
2264                                 struct ctdb_req_header *header,
2265                                 struct ctdb_req_control *request)
2266 {
2267         struct client_state *state = tevent_req_data(
2268                 req, struct client_state);
2269         struct ctdbd_context *ctdb = state->ctdb;
2270         struct ctdb_reply_control reply;
2271         bool ret, obsolete;
2272
2273         reply.rdata.opcode = request->opcode;
2274         reply.errmsg = NULL;
2275
2276         ret = ctdb_tunable_set_value(&ctdb->tun_list,
2277                                      request->rdata.data.tunable->name,
2278                                      request->rdata.data.tunable->value,
2279                                      &obsolete);
2280         if (! ret) {
2281                 reply.status = -1;
2282         } else if (obsolete) {
2283                 reply.status = 1;
2284         } else {
2285                 reply.status = 0;
2286         }
2287
2288         client_send_control(req, header, &reply);
2289 }
2290
2291 static void control_get_tunable(TALLOC_CTX *mem_ctx,
2292                                 struct tevent_req *req,
2293                                 struct ctdb_req_header *header,
2294                                 struct ctdb_req_control *request)
2295 {
2296         struct client_state *state = tevent_req_data(
2297                 req, struct client_state);
2298         struct ctdbd_context *ctdb = state->ctdb;
2299         struct ctdb_reply_control reply;
2300         uint32_t value;
2301         bool ret;
2302
2303         reply.rdata.opcode = request->opcode;
2304         reply.errmsg = NULL;
2305
2306         ret = ctdb_tunable_get_value(&ctdb->tun_list,
2307                                      request->rdata.data.tun_var, &value);
2308         if (! ret) {
2309                 reply.status = -1;
2310         } else {
2311                 reply.rdata.data.tun_value = value;
2312                 reply.status = 0;
2313         }
2314
2315         client_send_control(req, header, &reply);
2316 }
2317
2318 static void control_list_tunables(TALLOC_CTX *mem_ctx,
2319                                   struct tevent_req *req,
2320                                   struct ctdb_req_header *header,
2321                                   struct ctdb_req_control *request)
2322 {
2323         struct ctdb_reply_control reply;
2324         struct ctdb_var_list *var_list;
2325
2326         reply.rdata.opcode = request->opcode;
2327         reply.errmsg = NULL;
2328
2329         var_list = ctdb_tunable_names(mem_ctx);
2330         if (var_list == NULL) {
2331                 reply.status = -1;
2332         } else {
2333                 reply.rdata.data.tun_var_list = var_list;
2334                 reply.status = 0;
2335         }
2336
2337         client_send_control(req, header, &reply);
2338 }
2339
2340 static void control_modify_flags(TALLOC_CTX *mem_ctx,
2341                                  struct tevent_req *req,
2342                                  struct ctdb_req_header *header,
2343                                  struct ctdb_req_control *request)
2344 {
2345         struct client_state *state = tevent_req_data(
2346                 req, struct client_state);
2347         struct ctdbd_context *ctdb = state->ctdb;
2348         struct ctdb_node_flag_change *change = request->rdata.data.flag_change;
2349         struct ctdb_reply_control reply;
2350         struct node *node;
2351
2352         reply.rdata.opcode = request->opcode;
2353
2354         if ((change->old_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) ||
2355             (change->new_flags & ~NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2356                 DEBUG(DEBUG_INFO,
2357                       ("MODIFY_FLAGS control not for PERMANENTLY_DISABLED\n"));
2358                 reply.status = EINVAL;
2359                 reply.errmsg = "Failed to MODIFY_FLAGS";
2360                 client_send_control(req, header, &reply);
2361                 return;
2362         }
2363
2364         /* There's all sorts of broadcast weirdness here.  Only change
2365          * the specified node, not the destination node of the
2366          * control. */
2367         node = &ctdb->node_map->node[change->pnn];
2368
2369         if ((node->flags &
2370              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0 &&
2371             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0) {
2372                 DEBUG(DEBUG_INFO,("Disabling node %d\n", header->destnode));
2373                 node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
2374                 goto done;
2375         }
2376
2377         if ((node->flags &
2378              change->old_flags & NODE_FLAGS_PERMANENTLY_DISABLED) != 0 &&
2379             (change->new_flags & NODE_FLAGS_PERMANENTLY_DISABLED) == 0) {
2380                 DEBUG(DEBUG_INFO,("Enabling node %d\n", header->destnode));
2381                 node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
2382                 goto done;
2383         }
2384
2385         DEBUG(DEBUG_INFO, ("Flags unchanged for node %d\n", header->destnode));
2386
2387 done:
2388         reply.status = 0;
2389         reply.errmsg = NULL;
2390         client_send_control(req, header, &reply);
2391 }
2392
2393 static void control_get_all_tunables(TALLOC_CTX *mem_ctx,
2394                                      struct tevent_req *req,
2395                                      struct ctdb_req_header *header,
2396                                      struct ctdb_req_control *request)
2397 {
2398         struct client_state *state = tevent_req_data(
2399                 req, struct client_state);
2400         struct ctdbd_context *ctdb = state->ctdb;
2401         struct ctdb_reply_control reply;
2402
2403         reply.rdata.opcode = request->opcode;
2404         reply.rdata.data.tun_list = &ctdb->tun_list;
2405         reply.status = 0;
2406         reply.errmsg = NULL;
2407
2408         client_send_control(req, header, &reply);
2409 }
2410
2411 static void control_db_attach_persistent(TALLOC_CTX *mem_ctx,
2412                                          struct tevent_req *req,
2413                                          struct ctdb_req_header *header,
2414                                          struct ctdb_req_control *request)
2415 {
2416         struct client_state *state = tevent_req_data(
2417                 req, struct client_state);
2418         struct ctdbd_context *ctdb = state->ctdb;
2419         struct ctdb_reply_control reply;
2420         struct database *db;
2421
2422         reply.rdata.opcode = request->opcode;
2423
2424         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
2425                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
2426                         goto done;
2427                 }
2428         }
2429
2430         db = database_new(ctdb->db_map, request->rdata.data.db_name,
2431                           CTDB_DB_FLAGS_PERSISTENT);
2432         if (db == NULL) {
2433                 reply.status = -1;
2434                 reply.errmsg = "Failed to attach database";
2435                 client_send_control(req, header, &reply);
2436                 return;
2437         }
2438
2439 done:
2440         reply.rdata.data.db_id = db->id;
2441         reply.status = 0;
2442         reply.errmsg = NULL;
2443         client_send_control(req, header, &reply);
2444 }
2445
2446 static void control_uptime(TALLOC_CTX *mem_ctx,
2447                            struct tevent_req *req,
2448                            struct ctdb_req_header *header,
2449                            struct ctdb_req_control *request)
2450 {
2451         struct client_state *state = tevent_req_data(
2452                 req, struct client_state);
2453         struct ctdbd_context *ctdb = state->ctdb;
2454         struct ctdb_reply_control reply;
2455         struct ctdb_uptime *uptime;;
2456
2457         reply.rdata.opcode = request->opcode;
2458
2459         uptime = talloc_zero(mem_ctx, struct ctdb_uptime);
2460         if (uptime == NULL) {
2461                 goto fail;
2462         }
2463
2464         uptime->current_time = tevent_timeval_current();
2465         uptime->ctdbd_start_time = ctdb->start_time;
2466         uptime->last_recovery_started = ctdb->recovery_start_time;
2467         uptime->last_recovery_finished = ctdb->recovery_end_time;
2468
2469         reply.rdata.data.uptime = uptime;
2470         reply.status = 0;
2471         reply.errmsg = NULL;
2472         client_send_control(req, header, &reply);
2473         return;
2474
2475 fail:
2476         reply.status = -1;
2477         reply.errmsg = "Memory error";
2478         client_send_control(req, header, &reply);
2479 }
2480
2481 static void control_reload_nodes_file(TALLOC_CTX *mem_ctx,
2482                                       struct tevent_req *req,
2483                                       struct ctdb_req_header *header,
2484                                       struct ctdb_req_control *request)
2485 {
2486         struct client_state *state = tevent_req_data(
2487                 req, struct client_state);
2488         struct ctdbd_context *ctdb = state->ctdb;
2489         struct ctdb_reply_control reply;
2490         struct ctdb_node_map *nodemap;
2491         struct node_map *node_map = ctdb->node_map;
2492         unsigned int i;
2493
2494         reply.rdata.opcode = request->opcode;
2495
2496         nodemap = read_nodes_file(mem_ctx, header->destnode);
2497         if (nodemap == NULL) {
2498                 goto fail;
2499         }
2500
2501         for (i=0; i<nodemap->num; i++) {
2502                 struct node *node;
2503
2504                 if (i < node_map->num_nodes &&
2505                     ctdb_sock_addr_same(&nodemap->node[i].addr,
2506                                         &node_map->node[i].addr)) {
2507                         continue;
2508                 }
2509
2510                 if (nodemap->node[i].flags & NODE_FLAGS_DELETED) {
2511                         int ret;
2512
2513                         node = &node_map->node[i];
2514
2515                         node->flags |= NODE_FLAGS_DELETED;
2516                         ret = ctdb_sock_addr_from_string("0.0.0.0", &node->addr,
2517                                                          false);
2518                         if (ret != 0) {
2519                                 /* Can't happen, but Coverity... */
2520                                 goto fail;
2521                         }
2522
2523                         continue;
2524                 }
2525
2526                 if (i < node_map->num_nodes &&
2527                     node_map->node[i].flags & NODE_FLAGS_DELETED) {
2528                         node = &node_map->node[i];
2529
2530                         node->flags &= ~NODE_FLAGS_DELETED;
2531                         node->addr = nodemap->node[i].addr;
2532
2533                         continue;
2534                 }
2535
2536                 node_map->node = talloc_realloc(node_map, node_map->node,
2537                                                 struct node,
2538                                                 node_map->num_nodes+1);
2539                 if (node_map->node == NULL) {
2540                         goto fail;
2541                 }
2542                 node = &node_map->node[node_map->num_nodes];
2543
2544                 node->addr = nodemap->node[i].addr;
2545                 node->pnn = nodemap->node[i].pnn;
2546                 node->flags = 0;
2547                 node->capabilities = CTDB_CAP_DEFAULT;
2548                 node->recovery_disabled = false;
2549                 node->recovery_substate = NULL;
2550
2551                 node_map->num_nodes += 1;
2552         }
2553
2554         talloc_free(nodemap);
2555
2556         reply.status = 0;
2557         reply.errmsg = NULL;
2558         client_send_control(req, header, &reply);
2559         return;
2560
2561 fail:
2562         reply.status = -1;
2563         reply.errmsg = "Memory error";
2564         client_send_control(req, header, &reply);
2565 }
2566
2567 static void control_get_capabilities(TALLOC_CTX *mem_ctx,
2568                                      struct tevent_req *req,
2569                                      struct ctdb_req_header *header,
2570                                      struct ctdb_req_control *request)
2571 {
2572         struct client_state *state = tevent_req_data(
2573                 req, struct client_state);
2574         struct ctdbd_context *ctdb = state->ctdb;
2575         struct ctdb_reply_control reply;
2576         struct node *node;
2577         uint32_t caps = 0;
2578
2579         reply.rdata.opcode = request->opcode;
2580
2581         node = &ctdb->node_map->node[header->destnode];
2582         caps = node->capabilities;
2583
2584         if (node->flags & NODE_FLAGS_FAKE_TIMEOUT) {
2585                 /* Don't send reply */
2586                 return;
2587         }
2588
2589         reply.rdata.data.caps = caps;
2590         reply.status = 0;
2591         reply.errmsg = NULL;
2592
2593         client_send_control(req, header, &reply);
2594 }
2595
2596 static void control_release_ip(TALLOC_CTX *mem_ctx,
2597                                struct tevent_req *req,
2598                                struct ctdb_req_header *header,
2599                                struct ctdb_req_control *request)
2600 {
2601         struct client_state *state = tevent_req_data(
2602                 req, struct client_state);
2603         struct ctdbd_context *ctdb = state->ctdb;
2604         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2605         struct ctdb_reply_control reply;
2606         struct ctdb_public_ip_list *ips = NULL;
2607         struct ctdb_public_ip *t = NULL;
2608         unsigned int i;
2609
2610         reply.rdata.opcode = request->opcode;
2611
2612         if (ctdb->known_ips == NULL) {
2613                 D_INFO("RELEASE_IP %s - not a public IP\n",
2614                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2615                 goto done;
2616         }
2617
2618         ips = &ctdb->known_ips[header->destnode];
2619
2620         t = NULL;
2621         for (i = 0; i < ips->num; i++) {
2622                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2623                         t = &ips->ip[i];
2624                         break;
2625                 }
2626         }
2627         if (t == NULL) {
2628                 D_INFO("RELEASE_IP %s - not a public IP\n",
2629                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2630                 goto done;
2631         }
2632
2633         if (t->pnn != header->destnode) {
2634                 if (header->destnode == ip->pnn) {
2635                         D_ERR("error: RELEASE_IP %s - to TAKE_IP node %d\n",
2636                               ctdb_sock_addr_to_string(mem_ctx,
2637                                                        &ip->addr, false),
2638                               ip->pnn);
2639                         reply.status = -1;
2640                         reply.errmsg = "RELEASE_IP to TAKE_IP node";
2641                         client_send_control(req, header, &reply);
2642                         return;
2643                 }
2644
2645                 D_INFO("RELEASE_IP %s - to node %d - redundant\n",
2646                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2647                        ip->pnn);
2648                 t->pnn = ip->pnn;
2649         } else {
2650                 D_NOTICE("RELEASE_IP %s - to node %d\n",
2651                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false),
2652                           ip->pnn);
2653                 t->pnn = ip->pnn;
2654         }
2655
2656 done:
2657         reply.status = 0;
2658         reply.errmsg = NULL;
2659         client_send_control(req, header, &reply);
2660 }
2661
2662 static void control_takeover_ip(TALLOC_CTX *mem_ctx,
2663                                 struct tevent_req *req,
2664                                 struct ctdb_req_header *header,
2665                                 struct ctdb_req_control *request)
2666 {
2667         struct client_state *state = tevent_req_data(
2668                 req, struct client_state);
2669         struct ctdbd_context *ctdb = state->ctdb;
2670         struct ctdb_public_ip *ip = request->rdata.data.pubip;
2671         struct ctdb_reply_control reply;
2672         struct ctdb_public_ip_list *ips = NULL;
2673         struct ctdb_public_ip *t = NULL;
2674         unsigned int i;
2675
2676         reply.rdata.opcode = request->opcode;
2677
2678         if (ctdb->known_ips == NULL) {
2679                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2680                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2681                 goto done;
2682         }
2683
2684         ips = &ctdb->known_ips[header->destnode];
2685
2686         t = NULL;
2687         for (i = 0; i < ips->num; i++) {
2688                 if (ctdb_sock_addr_same_ip(&ips->ip[i].addr, &ip->addr)) {
2689                         t = &ips->ip[i];
2690                         break;
2691                 }
2692         }
2693         if (t == NULL) {
2694                 D_INFO("TAKEOVER_IP %s - not a public IP\n",
2695                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2696                 goto done;
2697         }
2698
2699         if (t->pnn == header->destnode) {
2700                 D_INFO("TAKEOVER_IP %s - redundant\n",
2701                        ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2702         } else {
2703                 D_NOTICE("TAKEOVER_IP %s\n",
2704                          ctdb_sock_addr_to_string(mem_ctx, &ip->addr, false));
2705                 t->pnn = ip->pnn;
2706         }
2707
2708 done:
2709         reply.status = 0;
2710         reply.errmsg = NULL;
2711         client_send_control(req, header, &reply);
2712 }
2713
2714 static void control_get_public_ips(TALLOC_CTX *mem_ctx,
2715                                    struct tevent_req *req,
2716                                    struct ctdb_req_header *header,
2717                                    struct ctdb_req_control *request)
2718 {
2719         struct client_state *state = tevent_req_data(
2720                 req, struct client_state);
2721         struct ctdbd_context *ctdb = state->ctdb;
2722         struct ctdb_reply_control reply;
2723         struct ctdb_public_ip_list *ips = NULL;
2724
2725         reply.rdata.opcode = request->opcode;
2726
2727         if (ctdb->known_ips == NULL) {
2728                 /* No IPs defined so create a dummy empty struct and ship it */
2729                 ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2730                 if (ips == NULL) {
2731                         reply.status = ENOMEM;
2732                         reply.errmsg = "Memory error";
2733                         goto done;
2734                 }
2735                 goto ok;
2736         }
2737
2738         ips = &ctdb->known_ips[header->destnode];
2739
2740         if (request->flags & CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE) {
2741                 /* If runstate is not RUNNING or a node is then return
2742                  * no available IPs.  Don't worry about interface
2743                  * states here - we're not faking down to that level.
2744                  */
2745                 uint32_t flags = ctdb->node_map->node[header->destnode].flags;
2746                 if (ctdb->runstate != CTDB_RUNSTATE_RUNNING ||
2747                     ((flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED)) != 0)) {
2748                         /* No available IPs: return dummy empty struct */
2749                         ips = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
2750                         if (ips == NULL) {
2751                                 reply.status = ENOMEM;
2752                                 reply.errmsg = "Memory error";
2753                                 goto done;
2754                         }
2755                 }
2756         }
2757
2758 ok:
2759         reply.rdata.data.pubip_list = ips;
2760         reply.status = 0;
2761         reply.errmsg = NULL;
2762
2763 done:
2764         client_send_control(req, header, &reply);
2765 }
2766
2767 static void control_get_nodemap(TALLOC_CTX *mem_ctx,
2768                                 struct tevent_req *req,
2769                                 struct ctdb_req_header *header,
2770                                 struct ctdb_req_control *request)
2771 {
2772         struct client_state *state = tevent_req_data(
2773                 req, struct client_state);
2774         struct ctdbd_context *ctdb = state->ctdb;
2775         struct ctdb_reply_control reply;
2776         struct ctdb_node_map *nodemap;
2777         struct node *node;
2778         unsigned int i;
2779
2780         reply.rdata.opcode = request->opcode;
2781
2782         nodemap = talloc_zero(mem_ctx, struct ctdb_node_map);
2783         if (nodemap == NULL) {
2784                 goto fail;
2785         }
2786
2787         nodemap->num = ctdb->node_map->num_nodes;
2788         nodemap->node = talloc_array(nodemap, struct ctdb_node_and_flags,
2789                                      nodemap->num);
2790         if (nodemap->node == NULL) {
2791                 goto fail;
2792         }
2793
2794         for (i=0; i<nodemap->num; i++) {
2795                 node = &ctdb->node_map->node[i];
2796                 nodemap->node[i] = (struct ctdb_node_and_flags) {
2797                         .pnn = node->pnn,
2798                         .flags = node->flags,
2799                         .addr = node->addr,
2800                 };
2801         }
2802
2803         reply.rdata.data.nodemap = nodemap;
2804         reply.status = 0;
2805         reply.errmsg = NULL;
2806         client_send_control(req, header, &reply);
2807         return;
2808
2809 fail:
2810         reply.status = -1;
2811         reply.errmsg = "Memory error";
2812         client_send_control(req, header, &reply);
2813 }
2814
2815 static void control_get_reclock_file(TALLOC_CTX *mem_ctx,
2816                                      struct tevent_req *req,
2817                                      struct ctdb_req_header *header,
2818                                      struct ctdb_req_control *request)
2819 {
2820         struct client_state *state = tevent_req_data(
2821                 req, struct client_state);
2822         struct ctdbd_context *ctdb = state->ctdb;
2823         struct ctdb_reply_control reply;
2824
2825         reply.rdata.opcode = request->opcode;
2826
2827         if (ctdb->reclock != NULL) {
2828                 reply.rdata.data.reclock_file =
2829                         talloc_strdup(mem_ctx, ctdb->reclock);
2830                 if (reply.rdata.data.reclock_file == NULL) {
2831                         reply.status = ENOMEM;
2832                         reply.errmsg = "Memory error";
2833                         goto done;
2834                 }
2835         } else {
2836                 reply.rdata.data.reclock_file = NULL;
2837         }
2838
2839         reply.status = 0;
2840         reply.errmsg = NULL;
2841
2842 done:
2843         client_send_control(req, header, &reply);
2844 }
2845
2846 static void control_stop_node(TALLOC_CTX *mem_ctx,
2847                               struct tevent_req *req,
2848                               struct ctdb_req_header *header,
2849                               struct ctdb_req_control *request)
2850 {
2851         struct client_state *state = tevent_req_data(
2852                 req, struct client_state);
2853         struct ctdbd_context *ctdb = state->ctdb;
2854         struct ctdb_reply_control reply;
2855
2856         reply.rdata.opcode = request->opcode;
2857
2858         DEBUG(DEBUG_INFO, ("Stopping node\n"));
2859         ctdb->node_map->node[header->destnode].flags |= NODE_FLAGS_STOPPED;
2860
2861         reply.status = 0;
2862         reply.errmsg = NULL;
2863
2864         client_send_control(req, header, &reply);
2865         return;
2866 }
2867
2868 static void control_continue_node(TALLOC_CTX *mem_ctx,
2869                                   struct tevent_req *req,
2870                                   struct ctdb_req_header *header,
2871                                   struct ctdb_req_control *request)
2872 {
2873         struct client_state *state = tevent_req_data(
2874                 req, struct client_state);
2875         struct ctdbd_context *ctdb = state->ctdb;
2876         struct ctdb_reply_control reply;
2877
2878         reply.rdata.opcode = request->opcode;
2879
2880         DEBUG(DEBUG_INFO, ("Continue node\n"));
2881         ctdb->node_map->node[header->destnode].flags &= ~NODE_FLAGS_STOPPED;
2882
2883         reply.status = 0;
2884         reply.errmsg = NULL;
2885
2886         client_send_control(req, header, &reply);
2887         return;
2888 }
2889
2890 static void set_ban_state_callback(struct tevent_req *subreq)
2891 {
2892         struct node *node = tevent_req_callback_data(
2893                 subreq, struct node);
2894         bool status;
2895
2896         status = tevent_wakeup_recv(subreq);
2897         TALLOC_FREE(subreq);
2898         if (! status) {
2899                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
2900         }
2901
2902         node->flags &= ~NODE_FLAGS_BANNED;
2903 }
2904
2905 static void control_set_ban_state(TALLOC_CTX *mem_ctx,
2906                                   struct tevent_req *req,
2907                                   struct ctdb_req_header *header,
2908                                   struct ctdb_req_control *request)
2909 {
2910         struct client_state *state = tevent_req_data(
2911                 req, struct client_state);
2912         struct tevent_req *subreq;
2913         struct ctdbd_context *ctdb = state->ctdb;
2914         struct ctdb_ban_state *ban = request->rdata.data.ban_state;
2915         struct ctdb_reply_control reply;
2916         struct node *node;
2917
2918         reply.rdata.opcode = request->opcode;
2919
2920         if (ban->pnn != header->destnode) {
2921                 DEBUG(DEBUG_INFO,
2922                       ("SET_BAN_STATE control for PNN %d rejected\n",
2923                        ban->pnn));
2924                 reply.status = EINVAL;
2925                 goto fail;
2926         }
2927
2928         node = &ctdb->node_map->node[header->destnode];
2929
2930         if (ban->time == 0) {
2931                 DEBUG(DEBUG_INFO,("Unbanning this node\n"));
2932                 node->flags &= ~NODE_FLAGS_BANNED;
2933                 goto done;
2934         }
2935
2936         subreq = tevent_wakeup_send(ctdb->node_map, state->ev,
2937                                     tevent_timeval_current_ofs(
2938                                             ban->time, 0));
2939         if (subreq == NULL) {
2940                 reply.status = ENOMEM;
2941                 goto fail;
2942         }
2943         tevent_req_set_callback(subreq, set_ban_state_callback, node);
2944
2945         DEBUG(DEBUG_INFO, ("Banning this node for %d seconds\n", ban->time));
2946         node->flags |= NODE_FLAGS_BANNED;
2947         ctdb->vnn_map->generation = INVALID_GENERATION;
2948
2949 done:
2950         reply.status = 0;
2951         reply.errmsg = NULL;
2952
2953         client_send_control(req, header, &reply);
2954         return;
2955
2956 fail:
2957         reply.errmsg = "Failed to ban node";
2958 }
2959
2960 static void control_trans3_commit(TALLOC_CTX *mem_ctx,
2961                                   struct tevent_req *req,
2962                                   struct ctdb_req_header *header,
2963                                   struct ctdb_req_control *request)
2964 {
2965         struct client_state *state = tevent_req_data(
2966                 req, struct client_state);
2967         struct ctdbd_context *ctdb = state->ctdb;
2968         struct ctdb_reply_control reply;
2969         struct database *db;
2970         int ret;
2971
2972         reply.rdata.opcode = request->opcode;
2973
2974         db = database_find(ctdb->db_map, request->rdata.data.recbuf->db_id);
2975         if (db == NULL) {
2976                 reply.status = -1;
2977                 reply.errmsg = "Unknown database";
2978                 client_send_control(req, header, &reply);
2979                 return;
2980         }
2981
2982         if (! (db->flags &
2983                (CTDB_DB_FLAGS_PERSISTENT|CTDB_DB_FLAGS_REPLICATED))) {
2984                 reply.status = -1;
2985                 reply.errmsg = "Transactions on volatile database";
2986                 client_send_control(req, header, &reply);
2987                 return;
2988         }
2989
2990         ret = ltdb_transaction(db, request->rdata.data.recbuf);
2991         if (ret != 0) {
2992                 reply.status = -1;
2993                 reply.errmsg = "Transaction failed";
2994                 client_send_control(req, header, &reply);
2995                 return;
2996         }
2997
2998         reply.status = 0;
2999         reply.errmsg = NULL;
3000         client_send_control(req, header, &reply);
3001 }
3002
3003 static void control_get_db_seqnum(TALLOC_CTX *mem_ctx,
3004                                struct tevent_req *req,
3005                                struct ctdb_req_header *header,
3006                                struct ctdb_req_control *request)
3007 {
3008         struct client_state *state = tevent_req_data(
3009                 req, struct client_state);
3010         struct ctdbd_context *ctdb = state->ctdb;
3011         struct ctdb_reply_control reply;
3012         struct database *db;
3013         int ret;
3014
3015         reply.rdata.opcode = request->opcode;
3016
3017         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3018         if (db == NULL) {
3019                 reply.status = ENOENT;
3020                 reply.errmsg = "Database not found";
3021         } else {
3022                 uint64_t seqnum;
3023
3024                 ret = database_seqnum(db, &seqnum);
3025                 if (ret == 0) {
3026                         reply.rdata.data.seqnum = seqnum;
3027                         reply.status = 0;
3028                         reply.errmsg = NULL;
3029                 } else {
3030                         reply.status = ret;
3031                         reply.errmsg = "Failed to get seqnum";
3032                 }
3033         }
3034
3035         client_send_control(req, header, &reply);
3036 }
3037
3038 static void control_db_get_health(TALLOC_CTX *mem_ctx,
3039                                   struct tevent_req *req,
3040                                   struct ctdb_req_header *header,
3041                                   struct ctdb_req_control *request)
3042 {
3043         struct client_state *state = tevent_req_data(
3044                 req, struct client_state);
3045         struct ctdbd_context *ctdb = state->ctdb;
3046         struct ctdb_reply_control reply;
3047         struct database *db;
3048
3049         reply.rdata.opcode = request->opcode;
3050
3051         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3052         if (db == NULL) {
3053                 reply.status = ENOENT;
3054                 reply.errmsg = "Database not found";
3055         } else {
3056                 reply.rdata.data.reason = NULL;
3057                 reply.status = 0;
3058                 reply.errmsg = NULL;
3059         }
3060
3061         client_send_control(req, header, &reply);
3062 }
3063
3064 static struct ctdb_iface_list *get_ctdb_iface_list(TALLOC_CTX *mem_ctx,
3065                                                    struct ctdbd_context *ctdb)
3066 {
3067         struct ctdb_iface_list *iface_list;
3068         struct interface *iface;
3069         unsigned int i;
3070
3071         iface_list = talloc_zero(mem_ctx, struct ctdb_iface_list);
3072         if (iface_list == NULL) {
3073                 goto done;
3074         }
3075
3076         iface_list->num = ctdb->iface_map->num;
3077         iface_list->iface = talloc_array(iface_list, struct ctdb_iface,
3078                                          iface_list->num);
3079         if (iface_list->iface == NULL) {
3080                 TALLOC_FREE(iface_list);
3081                 goto done;
3082         }
3083
3084         for (i=0; i<iface_list->num; i++) {
3085                 iface = &ctdb->iface_map->iface[i];
3086                 iface_list->iface[i] = (struct ctdb_iface) {
3087                         .link_state = iface->link_up,
3088                         .references = iface->references,
3089                 };
3090                 strlcpy(iface_list->iface[i].name, iface->name,
3091                         sizeof(iface_list->iface[i].name));
3092         }
3093
3094 done:
3095         return iface_list;
3096 }
3097
3098 static void control_get_public_ip_info(TALLOC_CTX *mem_ctx,
3099                                        struct tevent_req *req,
3100                                        struct ctdb_req_header *header,
3101                                        struct ctdb_req_control *request)
3102 {
3103         struct client_state *state = tevent_req_data(
3104                 req, struct client_state);
3105         struct ctdbd_context *ctdb = state->ctdb;
3106         struct ctdb_reply_control reply;
3107         ctdb_sock_addr *addr = request->rdata.data.addr;
3108         struct ctdb_public_ip_list *known = NULL;
3109         struct ctdb_public_ip_info *info = NULL;
3110         unsigned i;
3111
3112         reply.rdata.opcode = request->opcode;
3113
3114         info = talloc_zero(mem_ctx, struct ctdb_public_ip_info);
3115         if (info == NULL) {
3116                 reply.status = ENOMEM;
3117                 reply.errmsg = "Memory error";
3118                 goto done;
3119         }
3120
3121         reply.rdata.data.ipinfo = info;
3122
3123         if (ctdb->known_ips != NULL) {
3124                 known = &ctdb->known_ips[header->destnode];
3125         } else {
3126                 /* No IPs defined so create a dummy empty struct and
3127                  * fall through.  The given IP won't be matched
3128                  * below...
3129                  */
3130                 known = talloc_zero(mem_ctx, struct ctdb_public_ip_list);;
3131                 if (known == NULL) {
3132                         reply.status = ENOMEM;
3133                         reply.errmsg = "Memory error";
3134                         goto done;
3135                 }
3136         }
3137
3138         for (i = 0; i < known->num; i++) {
3139                 if (ctdb_sock_addr_same_ip(&known->ip[i].addr,
3140                                            addr)) {
3141                         break;
3142                 }
3143         }
3144
3145         if (i == known->num) {
3146                 D_ERR("GET_PUBLIC_IP_INFO: not known public IP %s\n",
3147                       ctdb_sock_addr_to_string(mem_ctx, addr, false));
3148                 reply.status = -1;
3149                 reply.errmsg = "Unknown address";
3150                 goto done;
3151         }
3152
3153         info->ip = known->ip[i];
3154
3155         /* The fake PUBLICIPS stanza and resulting known_ips data
3156          * don't know anything about interfaces, so completely fake
3157          * this.
3158          */
3159         info->active_idx = 0;
3160
3161         info->ifaces = get_ctdb_iface_list(mem_ctx, ctdb);
3162         if (info->ifaces == NULL) {
3163                 reply.status = ENOMEM;
3164                 reply.errmsg = "Memory error";
3165                 goto done;
3166         }
3167
3168         reply.status = 0;
3169         reply.errmsg = NULL;
3170
3171 done:
3172         client_send_control(req, header, &reply);
3173 }
3174
3175 static void control_get_ifaces(TALLOC_CTX *mem_ctx,
3176                                struct tevent_req *req,
3177                                struct ctdb_req_header *header,
3178                                struct ctdb_req_control *request)
3179 {
3180         struct client_state *state = tevent_req_data(
3181                 req, struct client_state);
3182         struct ctdbd_context *ctdb = state->ctdb;
3183         struct ctdb_reply_control reply;
3184         struct ctdb_iface_list *iface_list;
3185
3186         reply.rdata.opcode = request->opcode;
3187
3188         iface_list = get_ctdb_iface_list(mem_ctx, ctdb);
3189         if (iface_list == NULL) {
3190                 goto fail;
3191         }
3192
3193         reply.rdata.data.iface_list = iface_list;
3194         reply.status = 0;
3195         reply.errmsg = NULL;
3196         client_send_control(req, header, &reply);
3197         return;
3198
3199 fail:
3200         reply.status = -1;
3201         reply.errmsg = "Memory error";
3202         client_send_control(req, header, &reply);
3203 }
3204
3205 static void control_set_iface_link_state(TALLOC_CTX *mem_ctx,
3206                                          struct tevent_req *req,
3207                                          struct ctdb_req_header *header,
3208                                          struct ctdb_req_control *request)
3209 {
3210         struct client_state *state = tevent_req_data(
3211                 req, struct client_state);
3212         struct ctdbd_context *ctdb = state->ctdb;
3213         struct ctdb_reply_control reply;
3214         struct ctdb_iface *in_iface;
3215         struct interface *iface = NULL;
3216         bool link_up = false;
3217         int i;
3218
3219         reply.rdata.opcode = request->opcode;
3220
3221         in_iface = request->rdata.data.iface;
3222
3223         if (in_iface->name[CTDB_IFACE_SIZE] != '\0') {
3224                 reply.errmsg = "interface name not terminated";
3225                 goto fail;
3226         }
3227
3228         switch (in_iface->link_state) {
3229                 case 0:
3230                         link_up = false;
3231                         break;
3232
3233                 case 1:
3234                         link_up = true;
3235                         break;
3236
3237                 default:
3238                         reply.errmsg = "invalid link state";
3239                         goto fail;
3240         }
3241
3242         if (in_iface->references != 0) {
3243                 reply.errmsg = "references should be 0";
3244                 goto fail;
3245         }
3246
3247         for (i=0; i<ctdb->iface_map->num; i++) {
3248                 if (strcmp(ctdb->iface_map->iface[i].name,
3249                            in_iface->name) == 0) {
3250                         iface = &ctdb->iface_map->iface[i];
3251                         break;
3252                 }
3253         }
3254
3255         if (iface == NULL) {
3256                 reply.errmsg = "interface not found";
3257                 goto fail;
3258         }
3259
3260         iface->link_up = link_up;
3261
3262         reply.status = 0;
3263         reply.errmsg = NULL;
3264         client_send_control(req, header, &reply);
3265         return;
3266
3267 fail:
3268         reply.status = -1;
3269         client_send_control(req, header, &reply);
3270 }
3271
3272 static void control_set_db_readonly(TALLOC_CTX *mem_ctx,
3273                                     struct tevent_req *req,
3274                                     struct ctdb_req_header *header,
3275                                     struct ctdb_req_control *request)
3276 {
3277         struct client_state *state = tevent_req_data(
3278                 req, struct client_state);
3279         struct ctdbd_context *ctdb = state->ctdb;
3280         struct ctdb_reply_control reply;
3281         struct database *db;
3282
3283         reply.rdata.opcode = request->opcode;
3284
3285         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3286         if (db == NULL) {
3287                 reply.status = ENOENT;
3288                 reply.errmsg = "Database not found";
3289                 goto done;
3290         }
3291
3292         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3293                 reply.status = EINVAL;
3294                 reply.errmsg = "Can not set READONLY on persistent db";
3295                 goto done;
3296         }
3297
3298         db->flags |= CTDB_DB_FLAGS_READONLY;
3299         reply.status = 0;
3300         reply.errmsg = NULL;
3301
3302 done:
3303         client_send_control(req, header, &reply);
3304 }
3305
/*
 * State handed to traverse_start_ext_handler() through the private_data
 * pointer of tdb_traverse_read().
 */
struct traverse_start_ext_state {
	/* Request and header used when sending each record message */
	struct tevent_req *req;
	struct ctdb_req_header *header;

	/* Copied from the traverse request; stamped on each record/message */
	uint32_t reqid;
	uint64_t srvid;

	/* When false, records holding only an ltdb header are skipped */
	bool withemptyrecords;

	/* Set to ENOMEM by the handler if a buffer allocation fails */
	int status;
};
3314
3315 static int traverse_start_ext_handler(struct tdb_context *tdb,
3316                                       TDB_DATA key, TDB_DATA data,
3317                                       void *private_data)
3318 {
3319         struct traverse_start_ext_state *state =
3320                 (struct traverse_start_ext_state *)private_data;
3321         struct ctdb_rec_data rec;
3322         struct ctdb_req_message_data message;
3323         size_t np;
3324
3325         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
3326                 return 0;
3327         }
3328
3329         if ((data.dsize == sizeof(struct ctdb_ltdb_header)) &&
3330             (!state->withemptyrecords)) {
3331                 return 0;
3332         }
3333
3334         rec = (struct ctdb_rec_data) {
3335                 .reqid = state->reqid,
3336                 .header = NULL,
3337                 .key = key,
3338                 .data = data,
3339         };
3340
3341         message.srvid = state->srvid;
3342         message.data.dsize = ctdb_rec_data_len(&rec);
3343         message.data.dptr = talloc_size(state->req, message.data.dsize);
3344         if (message.data.dptr == NULL) {
3345                 state->status = ENOMEM;
3346                 return 1;
3347         }
3348
3349         ctdb_rec_data_push(&rec, message.data.dptr, &np);
3350         client_send_message(state->req, state->header, &message);
3351
3352         talloc_free(message.data.dptr);
3353
3354         return 0;
3355 }
3356
3357 static void control_traverse_start_ext(TALLOC_CTX *mem_ctx,
3358                                        struct tevent_req *req,
3359                                        struct ctdb_req_header *header,
3360                                        struct ctdb_req_control *request)
3361 {
3362         struct client_state *state = tevent_req_data(
3363                 req, struct client_state);
3364         struct ctdbd_context *ctdb = state->ctdb;
3365         struct ctdb_reply_control reply;
3366         struct database *db;
3367         struct ctdb_traverse_start_ext *ext;
3368         struct traverse_start_ext_state t_state;
3369         struct ctdb_rec_data rec;
3370         struct ctdb_req_message_data message;
3371         uint8_t buffer[32];
3372         size_t np;
3373         int ret;
3374
3375         reply.rdata.opcode = request->opcode;
3376
3377         ext = request->rdata.data.traverse_start_ext;
3378
3379         db = database_find(ctdb->db_map, ext->db_id);
3380         if (db == NULL) {
3381                 reply.status = -1;
3382                 reply.errmsg = "Unknown database";
3383                 client_send_control(req, header, &reply);
3384                 return;
3385         }
3386
3387         t_state = (struct traverse_start_ext_state) {
3388                 .req = req,
3389                 .header = header,
3390                 .reqid = ext->reqid,
3391                 .srvid = ext->srvid,
3392                 .withemptyrecords = ext->withemptyrecords,
3393         };
3394
3395         ret = tdb_traverse_read(db->tdb, traverse_start_ext_handler, &t_state);
3396         DEBUG(DEBUG_INFO, ("traversed %d records\n", ret));
3397         if (t_state.status != 0) {
3398                 reply.status = -1;
3399                 reply.errmsg = "Memory error";
3400                 client_send_control(req, header, &reply);
3401         }
3402
3403         reply.status = 0;
3404         client_send_control(req, header, &reply);
3405
3406         rec = (struct ctdb_rec_data) {
3407                 .reqid = ext->reqid,
3408                 .header = NULL,
3409                 .key = tdb_null,
3410                 .data = tdb_null,
3411         };
3412
3413         message.srvid = ext->srvid;
3414         message.data.dsize = ctdb_rec_data_len(&rec);
3415         ctdb_rec_data_push(&rec, buffer, &np);
3416         message.data.dptr = buffer;
3417         client_send_message(req, header, &message);
3418 }
3419
3420 static void control_set_db_sticky(TALLOC_CTX *mem_ctx,
3421                                     struct tevent_req *req,
3422                                     struct ctdb_req_header *header,
3423                                     struct ctdb_req_control *request)
3424 {
3425         struct client_state *state = tevent_req_data(
3426                 req, struct client_state);
3427         struct ctdbd_context *ctdb = state->ctdb;
3428         struct ctdb_reply_control reply;
3429         struct database *db;
3430
3431         reply.rdata.opcode = request->opcode;
3432
3433         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3434         if (db == NULL) {
3435                 reply.status = ENOENT;
3436                 reply.errmsg = "Database not found";
3437                 goto done;
3438         }
3439
3440         if (db->flags & CTDB_DB_FLAGS_PERSISTENT) {
3441                 reply.status = EINVAL;
3442                 reply.errmsg = "Can not set STICKY on persistent db";
3443                 goto done;
3444         }
3445
3446         db->flags |= CTDB_DB_FLAGS_STICKY;
3447         reply.status = 0;
3448         reply.errmsg = NULL;
3449
3450 done:
3451         client_send_control(req, header, &reply);
3452 }
3453
3454 static void control_start_ipreallocate(TALLOC_CTX *mem_ctx,
3455                                        struct tevent_req *req,
3456                                        struct ctdb_req_header *header,
3457                                        struct ctdb_req_control *request)
3458 {
3459         struct ctdb_reply_control reply;
3460
3461         /* Always succeed */
3462         reply.rdata.opcode = request->opcode;
3463         reply.status = 0;
3464         reply.errmsg = NULL;
3465
3466         client_send_control(req, header, &reply);
3467 }
3468
3469 static void control_ipreallocated(TALLOC_CTX *mem_ctx,
3470                                   struct tevent_req *req,
3471                                   struct ctdb_req_header *header,
3472                                   struct ctdb_req_control *request)
3473 {
3474         struct ctdb_reply_control reply;
3475
3476         /* Always succeed */
3477         reply.rdata.opcode = request->opcode;
3478         reply.status = 0;
3479         reply.errmsg = NULL;
3480
3481         client_send_control(req, header, &reply);
3482 }
3483
3484 static void control_get_runstate(TALLOC_CTX *mem_ctx,
3485                                  struct tevent_req *req,
3486                                  struct ctdb_req_header *header,
3487                                  struct ctdb_req_control *request)
3488 {
3489         struct client_state *state = tevent_req_data(
3490                 req, struct client_state);
3491         struct ctdbd_context *ctdb = state->ctdb;
3492         struct ctdb_reply_control reply;
3493
3494         reply.rdata.opcode = request->opcode;
3495         reply.rdata.data.runstate = ctdb->runstate;
3496         reply.status = 0;
3497         reply.errmsg = NULL;
3498
3499         client_send_control(req, header, &reply);
3500 }
3501
3502 static void control_get_nodes_file(TALLOC_CTX *mem_ctx,
3503                                    struct tevent_req *req,
3504                                    struct ctdb_req_header *header,
3505                                    struct ctdb_req_control *request)
3506 {
3507         struct ctdb_reply_control reply;
3508         struct ctdb_node_map *nodemap;
3509
3510         reply.rdata.opcode = request->opcode;
3511
3512         nodemap = read_nodes_file(mem_ctx, header->destnode);
3513         if (nodemap == NULL) {
3514                 goto fail;
3515         }
3516
3517         reply.rdata.data.nodemap = nodemap;
3518         reply.status = 0;
3519         reply.errmsg = NULL;
3520         client_send_control(req, header, &reply);
3521         return;
3522
3523 fail:
3524         reply.status = -1;
3525         reply.errmsg = "Failed to read nodes file";
3526         client_send_control(req, header, &reply);
3527 }
3528
3529 static void control_db_open_flags(TALLOC_CTX *mem_ctx,
3530                                   struct tevent_req *req,
3531                                   struct ctdb_req_header *header,
3532                                   struct ctdb_req_control *request)
3533 {
3534         struct client_state *state = tevent_req_data(
3535                 req, struct client_state);
3536         struct ctdbd_context *ctdb = state->ctdb;
3537         struct ctdb_reply_control reply;
3538         struct database *db;
3539
3540         reply.rdata.opcode = request->opcode;
3541
3542         db = database_find(ctdb->db_map, request->rdata.data.db_id);
3543         if (db == NULL) {
3544                 reply.status = ENOENT;
3545                 reply.errmsg = "Database not found";
3546         } else {
3547                 reply.rdata.data.tdb_flags = database_flags(db->flags);
3548                 reply.status = 0;
3549                 reply.errmsg = NULL;
3550         }
3551
3552         client_send_control(req, header, &reply);
3553 }
3554
3555 static void control_db_attach_replicated(TALLOC_CTX *mem_ctx,
3556                                          struct tevent_req *req,
3557                                          struct ctdb_req_header *header,
3558                                          struct ctdb_req_control *request)
3559 {
3560         struct client_state *state = tevent_req_data(
3561                 req, struct client_state);
3562         struct ctdbd_context *ctdb = state->ctdb;
3563         struct ctdb_reply_control reply;
3564         struct database *db;
3565
3566         reply.rdata.opcode = request->opcode;
3567
3568         for (db = ctdb->db_map->db; db != NULL; db = db->next) {
3569                 if (strcmp(db->name, request->rdata.data.db_name) == 0) {
3570                         goto done;
3571                 }
3572         }
3573
3574         db = database_new(ctdb->db_map, request->rdata.data.db_name,
3575                           CTDB_DB_FLAGS_REPLICATED);
3576         if (db == NULL) {
3577                 reply.status = -1;
3578                 reply.errmsg = "Failed to attach database";
3579                 client_send_control(req, header, &reply);
3580                 return;
3581         }
3582
3583 done:
3584         reply.rdata.data.db_id = db->id;
3585         reply.status = 0;
3586         reply.errmsg = NULL;
3587         client_send_control(req, header, &reply);
3588 }
3589
3590 static void control_check_pid_srvid(TALLOC_CTX *mem_ctx,
3591                                     struct tevent_req *req,
3592                                     struct ctdb_req_header *header,
3593                                     struct ctdb_req_control *request)
3594 {
3595         struct client_state *state = tevent_req_data(
3596                 req, struct client_state);
3597         struct ctdbd_context *ctdb = state->ctdb;
3598         struct ctdb_client *client;
3599         struct client_state *cstate;
3600         struct ctdb_reply_control reply;
3601         bool pid_found, srvid_found;
3602         int ret;
3603
3604         reply.rdata.opcode = request->opcode;
3605
3606         pid_found = false;
3607         srvid_found = false;
3608
3609         for (client=ctdb->client_list; client != NULL; client=client->next) {
3610                 if (client->pid == request->rdata.data.pid_srvid->pid) {
3611                         pid_found = true;
3612                         cstate = (struct client_state *)client->state;
3613                         ret = srvid_exists(ctdb->srv,
3614                                            request->rdata.data.pid_srvid->srvid,
3615                                            cstate);
3616                         if (ret == 0) {
3617                                 srvid_found = true;
3618                                 ret = kill(cstate->pid, 0);
3619                                 if (ret != 0) {
3620                                         reply.status = ret;
3621                                         reply.errmsg = strerror(errno);
3622                                 } else {
3623                                         reply.status = 0;
3624                                         reply.errmsg = NULL;
3625                                 }
3626                         }
3627                 }
3628         }
3629
3630         if (! pid_found) {
3631                 reply.status = -1;
3632                 reply.errmsg = "No client for PID";
3633         } else if (! srvid_found) {
3634                 reply.status = -1;
3635                 reply.errmsg = "No client for PID and SRVID";
3636         }
3637
3638         client_send_control(req, header, &reply);
3639 }
3640
3641 static void control_disable_node(TALLOC_CTX *mem_ctx,
3642                                  struct tevent_req *req,
3643                                  struct ctdb_req_header *header,
3644                                  struct ctdb_req_control *request)
3645 {
3646         struct client_state *state = tevent_req_data(
3647                 req, struct client_state);
3648         struct ctdbd_context *ctdb = state->ctdb;
3649         struct ctdb_reply_control reply;
3650
3651         reply.rdata.opcode = request->opcode;
3652
3653         DEBUG(DEBUG_INFO, ("Disabling node\n"));
3654         ctdb->node_map->node[header->destnode].flags |=
3655                 NODE_FLAGS_PERMANENTLY_DISABLED;
3656
3657         reply.status = 0;
3658         reply.errmsg = NULL;
3659
3660         client_send_control(req, header, &reply);
3661         return;
3662 }
3663
3664 static void control_enable_node(TALLOC_CTX *mem_ctx,
3665                                   struct tevent_req *req,
3666                                   struct ctdb_req_header *header,
3667                                   struct ctdb_req_control *request)
3668 {
3669         struct client_state *state = tevent_req_data(
3670                 req, struct client_state);
3671         struct ctdbd_context *ctdb = state->ctdb;
3672         struct ctdb_reply_control reply;
3673
3674         reply.rdata.opcode = request->opcode;
3675
3676         DEBUG(DEBUG_INFO, ("Enable node\n"));
3677         ctdb->node_map->node[header->destnode].flags &=
3678                 ~NODE_FLAGS_PERMANENTLY_DISABLED;
3679
3680         reply.status = 0;
3681         reply.errmsg = NULL;
3682
3683         client_send_control(req, header, &reply);
3684         return;
3685 }
3686
3687 static bool fake_control_failure(TALLOC_CTX *mem_ctx,
3688                                  struct tevent_req *req,
3689                                  struct ctdb_req_header *header,
3690                                  struct ctdb_req_control *request)
3691 {
3692         struct client_state *state = tevent_req_data(
3693                 req, struct client_state);
3694         struct ctdbd_context *ctdb = state->ctdb;
3695         struct ctdb_reply_control reply;
3696         struct fake_control_failure *f = NULL;
3697
3698         D_DEBUG("Checking fake control failure for control %u on node %u\n",
3699                 request->opcode, header->destnode);
3700         for (f = ctdb->control_failures; f != NULL; f = f->next) {
3701                 if (f->opcode == request->opcode &&
3702                     (f->pnn == header->destnode ||
3703                      f->pnn == CTDB_UNKNOWN_PNN)) {
3704
3705                         reply.rdata.opcode = request->opcode;
3706                         if (strcmp(f->error, "TIMEOUT") == 0) {
3707                                 /* Causes no reply */
3708                                 D_ERR("Control %u fake timeout on node %u\n",
3709                                       request->opcode, header->destnode);
3710                                 return true;
3711                         } else if (strcmp(f->error, "ERROR") == 0) {
3712                                 D_ERR("Control %u fake error on node %u\n",
3713                                       request->opcode, header->destnode);
3714                                 reply.status = -1;
3715                                 reply.errmsg = f->comment;
3716                                 client_send_control(req, header, &reply);
3717                                 return true;
3718                         }
3719                 }
3720         }
3721
3722         return false;
3723 }
3724
3725 static void control_error(TALLOC_CTX *mem_ctx,
3726                           struct tevent_req *req,
3727                           struct ctdb_req_header *header,
3728                           struct ctdb_req_control *request)
3729 {
3730         struct ctdb_reply_control reply;
3731
3732         D_DEBUG("Control %u not implemented\n", request->opcode);
3733
3734         reply.rdata.opcode = request->opcode;
3735         reply.status = -1;
3736         reply.errmsg = "Not implemented";
3737
3738         client_send_control(req, header, &reply);
3739 }
3740
3741 /*
3742  * Handling protocol - messages
3743  */
3744
/*
 * State attached to the wakeup timer armed by
 * message_disable_recoveries(); handed to
 * disable_recoveries_callback() when the timer fires.
 */
struct disable_recoveries_state {
        /* Node whose recoveries are re-enabled when the timer fires */
        struct node *node;
};
3748
3749 static void disable_recoveries_callback(struct tevent_req *subreq)
3750 {
3751         struct disable_recoveries_state *substate = tevent_req_callback_data(
3752                 subreq, struct disable_recoveries_state);
3753         bool status;
3754
3755         status = tevent_wakeup_recv(subreq);
3756         TALLOC_FREE(subreq);
3757         if (! status) {
3758                 DEBUG(DEBUG_INFO, ("tevent_wakeup_recv failed\n"));
3759         }
3760
3761         substate->node->recovery_disabled = false;
3762         TALLOC_FREE(substate->node->recovery_substate);
3763 }
3764
3765 static void message_disable_recoveries(TALLOC_CTX *mem_ctx,
3766                                        struct tevent_req *req,
3767                                        struct ctdb_req_header *header,
3768                                        struct ctdb_req_message *request)
3769 {
3770         struct client_state *state = tevent_req_data(
3771                 req, struct client_state);
3772         struct tevent_req *subreq;
3773         struct ctdbd_context *ctdb = state->ctdb;
3774         struct disable_recoveries_state *substate;
3775         struct ctdb_disable_message *disable = request->data.disable;
3776         struct ctdb_req_message_data reply;
3777         struct node *node;
3778         int ret = -1;
3779         TDB_DATA data;
3780
3781         node = &ctdb->node_map->node[header->destnode];
3782
3783         if (disable->timeout == 0) {
3784                 TALLOC_FREE(node->recovery_substate);
3785                 node->recovery_disabled = false;
3786                 DEBUG(DEBUG_INFO, ("Enabled recoveries on node %u\n",
3787                                    header->destnode));
3788                 goto done;
3789         }
3790
3791         substate = talloc_zero(ctdb->node_map,
3792                                struct disable_recoveries_state);
3793         if (substate == NULL) {
3794                 goto fail;
3795         }
3796
3797         substate->node = node;
3798
3799         subreq = tevent_wakeup_send(substate, state->ev,
3800                                     tevent_timeval_current_ofs(
3801                                             disable->timeout, 0));
3802         if (subreq == NULL) {
3803                 talloc_free(substate);
3804                 goto fail;
3805         }
3806         tevent_req_set_callback(subreq, disable_recoveries_callback, substate);
3807
3808         DEBUG(DEBUG_INFO, ("Disabled recoveries for %d seconds on node %u\n",
3809                            disable->timeout, header->destnode));
3810         node->recovery_substate = substate;
3811         node->recovery_disabled = true;
3812
3813 done:
3814         ret = header->destnode;
3815
3816 fail:
3817         reply.srvid = disable->srvid;
3818         data.dptr = (uint8_t *)&ret;
3819         data.dsize = sizeof(int);
3820         reply.data = data;
3821
3822         client_send_message(req, header, &reply);
3823 }
3824
3825 static void message_takeover_run(TALLOC_CTX *mem_ctx,
3826                                  struct tevent_req *req,
3827                                  struct ctdb_req_header *header,
3828                                  struct ctdb_req_message *request)
3829 {
3830         struct client_state *state = tevent_req_data(
3831                 req, struct client_state);
3832         struct ctdbd_context *ctdb = state->ctdb;
3833         struct ctdb_srvid_message *srvid = request->data.msg;
3834         struct ctdb_req_message_data reply;
3835         int ret = -1;
3836         TDB_DATA data;
3837
3838         if (header->destnode != ctdb->node_map->recmaster) {
3839                 /* No reply! Only recmaster replies... */
3840                 return;
3841         }
3842
3843         DEBUG(DEBUG_INFO, ("IP takover run on node %u\n",
3844                            header->destnode));
3845         ret = header->destnode;
3846
3847         reply.srvid = srvid->srvid;
3848         data.dptr = (uint8_t *)&ret;
3849         data.dsize = sizeof(int);
3850         reply.data = data;
3851
3852         client_send_message(req, header, &reply);
3853 }
3854
3855 /*
3856  * Handle a single client
3857  */
3858
/* Forward declarations of the per-client handlers */
static void client_read_handler(uint8_t *buf, size_t buflen,
                                void *private_data);
static void client_dead_handler(void *private_data);
static void client_process_packet(struct tevent_req *req,
                                  uint8_t *buf, size_t buflen);
static void client_process_call(struct tevent_req *req,
                                uint8_t *buf, size_t buflen);
static void client_process_message(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
static void client_process_control(struct tevent_req *req,
                                   uint8_t *buf, size_t buflen);
static void client_reply_done(struct tevent_req *subreq);
3871
3872 static struct tevent_req *client_send(TALLOC_CTX *mem_ctx,
3873                                       struct tevent_context *ev,
3874                                       int fd, struct ctdbd_context *ctdb,
3875                                       int pnn)
3876 {
3877         struct tevent_req *req;
3878         struct client_state *state;
3879         int ret;
3880
3881         req = tevent_req_create(mem_ctx, &state, struct client_state);
3882         if (req == NULL) {
3883                 return NULL;
3884         }
3885
3886         state->ev = ev;
3887         state->fd = fd;
3888         state->ctdb = ctdb;
3889         state->pnn = pnn;
3890
3891         (void) ctdb_get_peer_pid(fd, &state->pid);
3892
3893         ret = comm_setup(state, ev, fd, client_read_handler, req,
3894                          client_dead_handler, req, &state->comm);
3895         if (ret != 0) {
3896                 tevent_req_error(req, ret);
3897                 return tevent_req_post(req, ev);
3898         }
3899
3900         ret = client_add(ctdb, state->pid, state);
3901         if (ret != 0) {
3902                 tevent_req_error(req, ret);
3903                 return tevent_req_post(req, ev);
3904         }
3905
3906         DEBUG(DEBUG_INFO, ("New client fd=%d\n", fd));
3907
3908         return req;
3909 }
3910
3911 static void client_read_handler(uint8_t *buf, size_t buflen,
3912                                 void *private_data)
3913 {
3914         struct tevent_req *req = talloc_get_type_abort(
3915                 private_data, struct tevent_req);
3916         struct client_state *state = tevent_req_data(
3917                 req, struct client_state);
3918         struct ctdbd_context *ctdb = state->ctdb;
3919         struct ctdb_req_header header;
3920         size_t np;
3921         unsigned int i;
3922         int ret;
3923
3924         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3925         if (ret != 0) {
3926                 return;
3927         }
3928
3929         if (buflen != header.length) {
3930                 return;
3931         }
3932
3933         ret = ctdb_req_header_verify(&header, 0);
3934         if (ret != 0) {
3935                 return;
3936         }
3937
3938         header_fix_pnn(&header, ctdb);
3939
3940         if (header.destnode == CTDB_BROADCAST_ALL) {
3941                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3942                         header.destnode = i;
3943
3944                         ctdb_req_header_push(&header, buf, &np);
3945                         client_process_packet(req, buf, buflen);
3946                 }
3947                 return;
3948         }
3949
3950         if (header.destnode == CTDB_BROADCAST_CONNECTED) {
3951                 for (i=0; i<ctdb->node_map->num_nodes; i++) {
3952                         if (ctdb->node_map->node[i].flags &
3953                             NODE_FLAGS_DISCONNECTED) {
3954                                 continue;
3955                         }
3956
3957                         header.destnode = i;
3958
3959                         ctdb_req_header_push(&header, buf, &np);
3960                         client_process_packet(req, buf, buflen);
3961                 }
3962                 return;
3963         }
3964
3965         if (header.destnode > ctdb->node_map->num_nodes) {
3966                 fprintf(stderr, "Invalid destination pnn 0x%x\n",
3967                         header.destnode);
3968                 return;
3969         }
3970
3971
3972         if (ctdb->node_map->node[header.destnode].flags & NODE_FLAGS_DISCONNECTED) {
3973                 fprintf(stderr, "Packet for disconnected node pnn %u\n",
3974                         header.destnode);
3975                 return;
3976         }
3977
3978         ctdb_req_header_push(&header, buf, &np);
3979         client_process_packet(req, buf, buflen);
3980 }
3981
3982 static void client_dead_handler(void *private_data)
3983 {
3984         struct tevent_req *req = talloc_get_type_abort(
3985                 private_data, struct tevent_req);
3986
3987         tevent_req_done(req);
3988 }
3989
3990 static void client_process_packet(struct tevent_req *req,
3991                                   uint8_t *buf, size_t buflen)
3992 {
3993         struct ctdb_req_header header;
3994         size_t np;
3995         int ret;
3996
3997         ret = ctdb_req_header_pull(buf, buflen, &header, &np);
3998         if (ret != 0) {
3999                 return;
4000         }
4001
4002         switch (header.operation) {
4003         case CTDB_REQ_CALL:
4004                 client_process_call(req, buf, buflen);
4005                 break;
4006
4007         case CTDB_REQ_MESSAGE:
4008                 client_process_message(req, buf, buflen);
4009                 break;
4010
4011         case CTDB_REQ_CONTROL:
4012                 client_process_control(req, buf, buflen);
4013                 break;
4014
4015         default:
4016                 break;
4017         }
4018 }
4019
4020 static void client_process_call(struct tevent_req *req,
4021                                 uint8_t *buf, size_t buflen)
4022 {
4023         struct client_state *state = tevent_req_data(
4024                 req, struct client_state);
4025         struct ctdbd_context *ctdb = state->ctdb;
4026         TALLOC_CTX *mem_ctx;
4027         struct ctdb_req_header header;
4028         struct ctdb_req_call request;
4029         struct ctdb_reply_call reply;
4030         struct database *db;
4031         struct ctdb_ltdb_header hdr;
4032         TDB_DATA data;
4033         int ret;
4034
4035         mem_ctx = talloc_new(state);
4036         if (tevent_req_nomem(mem_ctx, req)) {
4037                 return;
4038         }
4039
4040         ret = ctdb_req_call_pull(buf, buflen, &header, mem_ctx, &request);
4041         if (ret != 0) {
4042                 talloc_free(mem_ctx);
4043                 tevent_req_error(req, ret);
4044                 return;
4045         }
4046
4047         header_fix_pnn(&header, ctdb);
4048
4049         if (header.destnode >= ctdb->node_map->num_nodes) {
4050                 goto fail;
4051         }
4052
4053         DEBUG(DEBUG_INFO, ("call db_id = %u\n", request.db_id));
4054
4055         db = database_find(ctdb->db_map, request.db_id);
4056         if (db == NULL) {
4057                 goto fail;
4058         }
4059
4060         ret = ltdb_fetch(db, request.key, &hdr, mem_ctx, &data);
4061         if (ret != 0) {
4062                 goto fail;
4063         }
4064
4065         /* Fake migration */
4066         if (hdr.dmaster != ctdb->node_map->pnn) {
4067                 hdr.dmaster = ctdb->node_map->pnn;
4068
4069                 ret = ltdb_store(db, request.key, &hdr, data);
4070                 if (ret != 0) {
4071                         goto fail;
4072                 }
4073         }
4074
4075         talloc_free(mem_ctx);
4076
4077         reply.status = 0;
4078         reply.data = tdb_null;
4079
4080         client_send_call(req, &header, &reply);
4081         return;
4082
4083 fail:
4084         talloc_free(mem_ctx);
4085         reply.status = -1;
4086         reply.data = tdb_null;
4087
4088         client_send_call(req, &header, &reply);
4089 }
4090
4091 static void client_process_message(struct tevent_req *req,
4092                                    uint8_t *buf, size_t buflen)
4093 {
4094         struct client_state *state = tevent_req_data(
4095                 req, struct client_state);
4096         struct ctdbd_context *ctdb = state->ctdb;
4097         TALLOC_CTX *mem_ctx;
4098         struct ctdb_req_header header;
4099         struct ctdb_req_message request;
4100         uint64_t srvid;
4101         int ret;
4102
4103         mem_ctx = talloc_new(state);
4104         if (tevent_req_nomem(mem_ctx, req)) {
4105                 return;
4106         }
4107
4108         ret = ctdb_req_message_pull(buf, buflen, &header, mem_ctx, &request);
4109         if (ret != 0) {
4110                 talloc_free(mem_ctx);
4111                 tevent_req_error(req, ret);
4112                 return;
4113         }
4114
4115         header_fix_pnn(&header, ctdb);
4116
4117         if (header.destnode >= ctdb->node_map->num_nodes) {
4118                 /* Many messages are not replied to, so just behave as
4119                  * though this message was not received */
4120                 fprintf(stderr, "Invalid node %d\n", header.destnode);
4121                 talloc_free(mem_ctx);
4122                 return;
4123         }
4124
4125         srvid = request.srvid;
4126         DEBUG(DEBUG_INFO, ("request srvid = 0x%"PRIx64"\n", srvid));
4127
4128         if (srvid == CTDB_SRVID_DISABLE_RECOVERIES) {
4129                 message_disable_recoveries(mem_ctx, req, &header, &request);
4130         } else if (srvid == CTDB_SRVID_TAKEOVER_RUN) {
4131                 message_takeover_run(mem_ctx, req, &header, &request);
4132         } else {
4133                 D_DEBUG("Message id 0x%"PRIx64" not implemented\n", srvid);
4134         }
4135
4136         /* check srvid */
4137         talloc_free(mem_ctx);
4138 }
4139
4140 static void client_process_control(struct tevent_req *req,
4141                                    uint8_t *buf, size_t buflen)
4142 {
4143         struct client_state *state = tevent_req_data(
4144                 req, struct client_state);
4145         struct ctdbd_context *ctdb = state->ctdb;
4146         TALLOC_CTX *mem_ctx;
4147         struct ctdb_req_header header;
4148         struct ctdb_req_control request;
4149         int ret;
4150
4151         mem_ctx = talloc_new(state);
4152         if (tevent_req_nomem(mem_ctx, req)) {
4153                 return;
4154         }
4155
4156         ret = ctdb_req_control_pull(buf, buflen, &header, mem_ctx, &request);
4157         if (ret != 0) {
4158                 talloc_free(mem_ctx);
4159                 tevent_req_error(req, ret);
4160                 return;
4161         }
4162
4163         header_fix_pnn(&header, ctdb);
4164
4165         if (header.destnode >= ctdb->node_map->num_nodes) {
4166                 struct ctdb_reply_control reply;
4167
4168                 reply.rdata.opcode = request.opcode;
4169                 reply.errmsg = "Invalid node";
4170                 reply.status = -1;
4171                 client_send_control(req, &header, &reply);
4172                 return;
4173         }
4174
4175         DEBUG(DEBUG_INFO, ("request opcode = %u, reqid = %u\n",
4176                            request.opcode, header.reqid));
4177
4178         if (fake_control_failure(mem_ctx, req, &header, &request)) {
4179                 goto done;
4180         }
4181
4182         switch (request.opcode) {
4183         case CTDB_CONTROL_PROCESS_EXISTS:
4184                 control_process_exists(mem_ctx, req, &header, &request);
4185                 break;
4186
4187         case CTDB_CONTROL_PING:
4188                 control_ping(mem_ctx, req, &header, &request);
4189                 break;
4190
4191         case CTDB_CONTROL_GETDBPATH:
4192                 control_getdbpath(mem_ctx, req, &header, &request);
4193                 break;
4194
4195         case CTDB_CONTROL_GETVNNMAP:
4196                 control_getvnnmap(mem_ctx, req, &header, &request);
4197                 break;
4198
4199         case CTDB_CONTROL_GET_DEBUG:
4200                 control_get_debug(mem_ctx, req, &header, &request);
4201                 break;
4202
4203         case CTDB_CONTROL_SET_DEBUG:
4204                 control_set_debug(mem_ctx, req, &header, &request);
4205                 break;
4206
4207         case CTDB_CONTROL_GET_DBMAP:
4208                 control_get_dbmap(mem_ctx, req, &header, &request);
4209                 break;
4210
4211         case CTDB_CONTROL_GET_RECMODE:
4212                 control_get_recmode(mem_ctx, req, &header, &request);
4213                 break;
4214
4215         case CTDB_CONTROL_SET_RECMODE:
4216                 control_set_recmode(mem_ctx, req, &header, &request);
4217                 break;
4218
4219         case CTDB_CONTROL_DB_ATTACH:
4220                 control_db_attach(mem_ctx, req, &header, &request);
4221                 break;
4222
4223         case CTDB_CONTROL_REGISTER_SRVID:
4224                 control_register_srvid(mem_ctx, req, &header, &request);
4225                 break;
4226
4227         case CTDB_CONTROL_DEREGISTER_SRVID:
4228                 control_deregister_srvid(mem_ctx, req, &header, &request);
4229                 break;
4230
4231         case CTDB_CONTROL_GET_DBNAME:
4232                 control_get_dbname(mem_ctx, req, &header, &request);
4233                 break;
4234
4235         case CTDB_CONTROL_GET_PID:
4236                 control_get_pid(mem_ctx, req, &header, &request);
4237                 break;
4238
4239         case CTDB_CONTROL_GET_PNN:
4240                 control_get_pnn(mem_ctx, req, &header, &request);
4241                 break;
4242
4243         case CTDB_CONTROL_SHUTDOWN:
4244                 control_shutdown(mem_ctx, req, &header, &request);
4245                 break;
4246
4247         case CTDB_CONTROL_SET_TUNABLE:
4248                 control_set_tunable(mem_ctx, req, &header, &request);
4249                 break;
4250
4251         case CTDB_CONTROL_GET_TUNABLE:
4252                 control_get_tunable(mem_ctx, req, &header, &request);
4253                 break;
4254
4255         case CTDB_CONTROL_LIST_TUNABLES:
4256                 control_list_tunables(mem_ctx, req, &header, &request);
4257                 break;
4258
4259         case CTDB_CONTROL_MODIFY_FLAGS:
4260                 control_modify_flags(mem_ctx, req, &header, &request);
4261                 break;
4262
4263         case CTDB_CONTROL_GET_ALL_TUNABLES:
4264                 control_get_all_tunables(mem_ctx, req, &header, &request);
4265                 break;
4266
4267         case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
4268                 control_db_attach_persistent(mem_ctx, req, &header, &request);
4269                 break;
4270
4271         case CTDB_CONTROL_UPTIME:
4272                 control_uptime(mem_ctx, req, &header, &request);
4273                 break;
4274
4275         case CTDB_CONTROL_RELOAD_NODES_FILE:
4276                 control_reload_nodes_file(mem_ctx, req, &header, &request);
4277                 break;
4278
4279         case CTDB_CONTROL_GET_CAPABILITIES:
4280                 control_get_capabilities(mem_ctx, req, &header, &request);
4281                 break;
4282
4283         case CTDB_CONTROL_RELEASE_IP:
4284                 control_release_ip(mem_ctx, req, &header, &request);
4285                 break;
4286
4287         case CTDB_CONTROL_TAKEOVER_IP:
4288                 control_takeover_ip(mem_ctx, req, &header, &request);
4289                 break;
4290
4291         case CTDB_CONTROL_GET_PUBLIC_IPS:
4292                 control_get_public_ips(mem_ctx, req, &header, &request);
4293                 break;
4294
4295         case CTDB_CONTROL_GET_NODEMAP:
4296                 control_get_nodemap(mem_ctx, req, &header, &request);
4297                 break;
4298
4299         case CTDB_CONTROL_GET_RECLOCK_FILE:
4300                 control_get_reclock_file(mem_ctx, req, &header, &request);
4301                 break;
4302
4303         case CTDB_CONTROL_STOP_NODE:
4304                 control_stop_node(mem_ctx, req, &header, &request);
4305                 break;
4306
4307         case CTDB_CONTROL_CONTINUE_NODE:
4308                 control_continue_node(mem_ctx, req, &header, &request);
4309                 break;
4310
4311         case CTDB_CONTROL_SET_BAN_STATE:
4312                 control_set_ban_state(mem_ctx, req, &header, &request);
4313                 break;
4314
4315         case CTDB_CONTROL_TRANS3_COMMIT:
4316                 control_trans3_commit(mem_ctx, req, &header, &request);
4317                 break;
4318
4319         case CTDB_CONTROL_GET_DB_SEQNUM:
4320                 control_get_db_seqnum(mem_ctx, req, &header, &request);
4321                 break;
4322
4323         case CTDB_CONTROL_DB_GET_HEALTH:
4324                 control_db_get_health(mem_ctx, req, &header, &request);
4325                 break;
4326
4327         case CTDB_CONTROL_GET_PUBLIC_IP_INFO:
4328                 control_get_public_ip_info(mem_ctx, req, &header, &request);
4329                 break;
4330
4331         case CTDB_CONTROL_GET_IFACES:
4332                 control_get_ifaces(mem_ctx, req, &header, &request);
4333                 break;
4334
4335         case CTDB_CONTROL_SET_IFACE_LINK_STATE:
4336                 control_set_iface_link_state(mem_ctx, req, &header, &request);
4337                 break;
4338
4339         case CTDB_CONTROL_SET_DB_READONLY:
4340                 control_set_db_readonly(mem_ctx, req, &header, &request);
4341                 break;
4342
4343         case CTDB_CONTROL_TRAVERSE_START_EXT:
4344                 control_traverse_start_ext(mem_ctx, req, &header, &request);
4345                 break;
4346
4347         case CTDB_CONTROL_SET_DB_STICKY:
4348                 control_set_db_sticky(mem_ctx, req, &header, &request);
4349                 break;
4350
4351         case CTDB_CONTROL_IPREALLOCATED:
4352                 control_ipreallocated(mem_ctx, req, &header, &request);
4353                 break;
4354
4355         case CTDB_CONTROL_GET_RUNSTATE:
4356                 control_get_runstate(mem_ctx, req, &header, &request);
4357                 break;
4358
4359         case CTDB_CONTROL_GET_NODES_FILE:
4360                 control_get_nodes_file(mem_ctx, req, &header, &request);
4361                 break;
4362
4363         case CTDB_CONTROL_DB_OPEN_FLAGS:
4364                 control_db_open_flags(mem_ctx, req, &header, &request);
4365                 break;
4366
4367         case CTDB_CONTROL_DB_ATTACH_REPLICATED:
4368                 control_db_attach_replicated(mem_ctx, req, &header, &request);
4369                 break;
4370
4371         case CTDB_CONTROL_CHECK_PID_SRVID:
4372                 control_check_pid_srvid(mem_ctx, req, &header, &request);
4373                 break;
4374
4375         case CTDB_CONTROL_DISABLE_NODE:
4376                 control_disable_node(mem_ctx, req, &header, &request);
4377                 break;
4378
4379         case CTDB_CONTROL_ENABLE_NODE:
4380                 control_enable_node(mem_ctx, req, &header, &request);
4381                 break;
4382
4383         case CTDB_CONTROL_START_IPREALLOCATE:
4384                 control_start_ipreallocate(mem_ctx, req, &header, &request);
4385                 break;
4386
4387         default:
4388                 if (! (request.flags & CTDB_CTRL_FLAG_NOREPLY)) {
4389                         control_error(mem_ctx, req, &header, &request);
4390                 }
4391                 break;
4392         }
4393
4394 done:
4395         talloc_free(mem_ctx);
4396 }
4397
4398 static int client_recv(struct tevent_req *req, int *perr)
4399 {
4400         struct client_state *state = tevent_req_data(
4401                 req, struct client_state);
4402         int err;
4403
4404         DEBUG(DEBUG_INFO, ("Client done fd=%d\n", state->fd));
4405         close(state->fd);
4406
4407         if (tevent_req_is_unix_error(req, &err)) {
4408                 if (perr != NULL) {
4409                         *perr = err;
4410                 }
4411                 return -1;
4412         }
4413
4414         return state->status;
4415 }
4416
4417 /*
4418  * Fake CTDB server
4419  */
4420
/* State of the server request that accepts clients on the listening socket */
struct server_state {
        /* Event context used for accepting clients and for the timer */
        struct tevent_context *ev;
        /* Fake daemon context handed to every client request */
        struct ctdbd_context *ctdb;
        /* Most recently scheduled leader broadcast timer (NULL on failure) */
        struct tevent_timer *leader_broadcast_te;
        /* Socket that new client connections are accepted on */
        int fd;
};
4427
/* Timer handler and subrequest callbacks used by server_send() */
static void server_leader_broadcast(
        struct tevent_context *ev,
        struct tevent_timer *te,
        struct timeval current_time,
        void *private_data);
static void server_new_client(struct tevent_req *subreq);
static void server_client_done(struct tevent_req *subreq);
4434
4435 static struct tevent_req *server_send(TALLOC_CTX *mem_ctx,
4436                                       struct tevent_context *ev,
4437                                       struct ctdbd_context *ctdb,
4438                                       int fd)
4439 {
4440         struct tevent_req *req, *subreq;
4441         struct server_state *state;
4442
4443         req = tevent_req_create(mem_ctx, &state, struct server_state);
4444         if (req == NULL) {
4445                 return NULL;
4446         }
4447
4448         state->ev = ev;
4449         state->ctdb = ctdb;
4450         state->fd = fd;
4451
4452         state->leader_broadcast_te = tevent_add_timer(state->ev,
4453                                                       state,
4454                                                       timeval_current_ofs(0, 0),
4455                                                       server_leader_broadcast,
4456                                                       state);
4457         if (state->leader_broadcast_te == NULL) {
4458                 DBG_WARNING("Failed to set up leader broadcast\n");
4459         }
4460
4461         subreq = accept_send(state, ev, fd);
4462         if (tevent_req_nomem(subreq, req)) {
4463                 return tevent_req_post(req, ev);
4464         }
4465         tevent_req_set_callback(subreq, server_new_client, req);
4466
4467         return req;
4468 }
4469
4470 static void server_leader_broadcast(struct tevent_context *ev,
4471                                     struct tevent_timer *te,
4472                                     struct timeval current_time,
4473                                     void *private_data)
4474 {
4475         struct server_state *state = talloc_get_type_abort(
4476                 private_data, struct server_state);
4477         struct ctdbd_context *ctdb = state->ctdb;
4478         uint32_t leader = ctdb->node_map->recmaster;
4479         TDB_DATA data;
4480         int ret;
4481
4482         if (leader == CTDB_UNKNOWN_PNN) {
4483                 goto done;
4484         }
4485
4486         data.dptr = (uint8_t *)&leader;
4487         data.dsize = sizeof(leader);
4488
4489         ret = srvid_dispatch(ctdb->srv, CTDB_SRVID_LEADER, 0, data);
4490         if (ret != 0) {
4491                 DBG_WARNING("Failed to send leader broadcast, ret=%d\n", ret);
4492         }
4493
4494 done:
4495         state->leader_broadcast_te = tevent_add_timer(state->ev,
4496                                                       state,
4497                                                       timeval_current_ofs(1, 0),
4498                                                       server_leader_broadcast,
4499                                                       state);
4500         if (state->leader_broadcast_te == NULL) {
4501                 DBG_WARNING("Failed to set up leader broadcast\n");
4502         }
4503 }
4504
4505 static void server_new_client(struct tevent_req *subreq)
4506 {
4507         struct tevent_req *req = tevent_req_callback_data(
4508                 subreq, struct tevent_req);
4509         struct server_state *state = tevent_req_data(
4510                 req, struct server_state);
4511         struct ctdbd_context *ctdb = state->ctdb;
4512         int client_fd;
4513         int ret = 0;
4514
4515         client_fd = accept_recv(subreq, NULL, NULL, &ret);
4516         TALLOC_FREE(subreq);
4517         if (client_fd == -1) {
4518                 tevent_req_error(req, ret);
4519                 return;
4520         }
4521
4522         subreq = client_send(state, state->ev, client_fd,
4523                              ctdb, ctdb->node_map->pnn);
4524         if (tevent_req_nomem(subreq, req)) {
4525                 return;
4526         }
4527         tevent_req_set_callback(subreq, server_client_done, req);
4528
4529         ctdb->num_clients += 1;
4530
4531         subreq = accept_send(state, state->ev, state->fd);
4532         if (tevent_req_nomem(subreq, req)) {
4533                 return;
4534         }
4535         tevent_req_set_callback(subreq, server_new_client, req);
4536 }
4537
4538 static void server_client_done(struct tevent_req *subreq)
4539 {
4540         struct tevent_req *req = tevent_req_callback_data(
4541                 subreq, struct tevent_req);
4542         struct server_state *state = tevent_req_data(
4543                 req, struct server_state);
4544         struct ctdbd_context *ctdb = state->ctdb;
4545         int ret = 0;
4546         int status;
4547
4548         status = client_recv(subreq, &ret);
4549         TALLOC_FREE(subreq);
4550         if (status < 0) {
4551                 tevent_req_error(req, ret);
4552                 return;
4553         }
4554
4555         ctdb->num_clients -= 1;
4556
4557         if (status == 99) {
4558                 /* Special status, to shutdown server */
4559                 DEBUG(DEBUG_INFO, ("Shutting down server\n"));
4560                 tevent_req_done(req);
4561         }
4562 }
4563
4564 static bool server_recv(struct tevent_req *req, int *perr)
4565 {
4566         int err;
4567
4568         if (tevent_req_is_unix_error(req, &err)) {
4569                 if (perr != NULL) {
4570                         *perr = err;
4571                 }
4572                 return false;
4573         }
4574         return true;
4575 }
4576
4577 /*
4578  * Main functions
4579  */
4580
4581 static int socket_init(const char *sockpath)
4582 {
4583         struct sockaddr_un addr;
4584         size_t len;
4585         int ret, fd;
4586
4587         memset(&addr, 0, sizeof(addr));
4588         addr.sun_family = AF_UNIX;
4589
4590         len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
4591         if (len >= sizeof(addr.sun_path)) {
4592                 fprintf(stderr, "path too long: %s\n", sockpath);
4593                 return -1;
4594         }
4595
4596         fd = socket(AF_UNIX, SOCK_STREAM, 0);
4597         if (fd == -1) {
4598                 fprintf(stderr, "socket failed - %s\n", sockpath);
4599                 return -1;
4600         }
4601
4602         ret = bind(fd, (struct sockaddr *)&addr, sizeof(addr));
4603         if (ret != 0) {
4604                 fprintf(stderr, "bind failed - %s\n", sockpath);
4605                 goto fail;
4606         }
4607
4608         ret = listen(fd, 10);
4609         if (ret != 0) {
4610                 fprintf(stderr, "listen failed\n");
4611                 goto fail;
4612         }
4613
4614         DEBUG(DEBUG_INFO, ("Socket init done\n"));
4615
4616         return fd;
4617
4618 fail:
4619         if (fd != -1) {
4620                 close(fd);
4621         }
4622         return -1;
4623 }
4624
/* Command line settings, filled in by popt via cmdline_options */
struct options {
        /* --dbdir: database directory */
        const char *dbdir;
        /* --socket: path of the Unix domain socket */
        const char *sockpath;
        /* --pidfile: file that receives the server's pid */
        const char *pidfile;
        /* --debug: debug level name */
        const char *debuglevel;
};

static struct options options;
4631
4632 static struct poptOption cmdline_options[] = {
4633         POPT_AUTOHELP
4634         { "dbdir", 'D', POPT_ARG_STRING, &options.dbdir, 0,
4635                 "Database directory", "directory" },
4636         { "socket", 's', POPT_ARG_STRING, &options.sockpath, 0,
4637                 "Unix domain socket path", "filename" },
4638         { "pidfile", 'p', POPT_ARG_STRING, &options.pidfile, 0,
4639                 "pid file", "filename" } ,
4640         { "debug", 'd', POPT_ARG_STRING, &options.debuglevel, 0,
4641                 "debug level", "ERR|WARNING|NOTICE|INFO|DEBUG" } ,
4642         POPT_TABLEEND
4643 };
4644
4645 static void cleanup(void)
4646 {
4647         unlink(options.sockpath);
4648         unlink(options.pidfile);
4649 }
4650
/*
 * Signal handler: remove the socket and pid file, then exit with status 0.
 *
 * NOTE(review): exit() is not on the POSIX async-signal-safe list and it
 * also runs atexit() handlers; confirm this is acceptable for this test
 * daemon before reusing the pattern.
 */
static void signal_handler(int sig)
{
        (void) sig;

        cleanup();
        exit(0);
}
4656
4657 static void start_server(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
4658                          struct ctdbd_context *ctdb, int fd, int pfd)
4659 {
4660         struct tevent_req *req;
4661         int ret = 0;
4662         ssize_t len;
4663
4664         atexit(cleanup);
4665         signal(SIGTERM, signal_handler);
4666
4667         req = server_send(mem_ctx, ev, ctdb, fd);
4668         if (req == NULL) {
4669                 fprintf(stderr, "Memory error\n");
4670                 exit(1);
4671         }
4672
4673         len = write(pfd, &ret, sizeof(ret));
4674         if (len != sizeof(ret)) {
4675                 fprintf(stderr, "Failed to send message to parent\n");
4676                 exit(1);
4677         }
4678         close(pfd);
4679
4680         tevent_req_poll(req, ev);
4681
4682         server_recv(req, &ret);
4683         if (ret != 0) {
4684                 exit(1);
4685         }
4686 }
4687
4688 int main(int argc, const char *argv[])
4689 {
4690         TALLOC_CTX *mem_ctx;
4691         struct ctdbd_context *ctdb;
4692         struct tevent_context *ev;
4693         poptContext pc;
4694         int opt, fd, ret, pfd[2];
4695         ssize_t len;
4696         pid_t pid;
4697         FILE *fp;
4698
4699         pc = poptGetContext(argv[0], argc, argv, cmdline_options,
4700                             POPT_CONTEXT_KEEP_FIRST);
4701         while ((opt = poptGetNextOpt(pc)) != -1) {
4702                 fprintf(stderr, "Invalid option %s\n", poptBadOption(pc, 0));
4703                 exit(1);
4704         }
4705
4706         if (options.dbdir == NULL) {
4707                 fprintf(stderr, "Please specify database directory\n");
4708                 poptPrintHelp(pc, stdout, 0);
4709                 exit(1);
4710         }
4711
4712         if (options.sockpath == NULL) {
4713                 fprintf(stderr, "Please specify socket path\n");
4714                 poptPrintHelp(pc, stdout, 0);
4715                 exit(1);
4716         }
4717
4718         if (options.pidfile == NULL) {
4719                 fprintf(stderr, "Please specify pid file\n");
4720                 poptPrintHelp(pc, stdout, 0);
4721                 exit(1);
4722         }
4723
4724         mem_ctx = talloc_new(NULL);
4725         if (mem_ctx == NULL) {
4726                 fprintf(stderr, "Memory error\n");
4727                 exit(1);
4728         }
4729
4730         ret = logging_init(mem_ctx, "file:", options.debuglevel, "fake-ctdbd");
4731         if (ret != 0) {
4732                 fprintf(stderr, "Invalid debug level\n");
4733                 poptPrintHelp(pc, stdout, 0);
4734                 exit(1);
4735         }
4736
4737         ctdb = ctdbd_setup(mem_ctx, options.dbdir);
4738         if (ctdb == NULL) {
4739                 exit(1);
4740         }
4741
4742         if (! ctdbd_verify(ctdb)) {
4743                 exit(1);
4744         }
4745
4746         ev = tevent_context_init(mem_ctx);
4747         if (ev == NULL) {
4748                 fprintf(stderr, "Memory error\n");
4749                 exit(1);
4750         }
4751
4752         fd = socket_init(options.sockpath);
4753         if (fd == -1) {
4754                 exit(1);
4755         }
4756
4757         ret = pipe(pfd);
4758         if (ret != 0) {
4759                 fprintf(stderr, "Failed to create pipe\n");
4760                 cleanup();
4761                 exit(1);
4762         }
4763
4764         pid = fork();
4765         if (pid == -1) {
4766                 fprintf(stderr, "Failed to fork\n");
4767                 cleanup();
4768                 exit(1);
4769         }
4770
4771         if (pid == 0) {
4772                 /* Child */
4773                 close(pfd[0]);
4774                 start_server(mem_ctx, ev, ctdb, fd, pfd[1]);
4775                 exit(1);
4776         }
4777
4778         /* Parent */
4779         close(pfd[1]);
4780
4781         len = read(pfd[0], &ret, sizeof(ret));
4782         close(pfd[0]);
4783         if (len != sizeof(ret)) {
4784                 fprintf(stderr, "len = %zi\n", len);
4785                 fprintf(stderr, "Failed to get message from child\n");
4786                 kill(pid, SIGTERM);
4787                 exit(1);
4788         }
4789
4790         fp = fopen(options.pidfile, "w");
4791         if (fp == NULL) {
4792                 fprintf(stderr, "Failed to open pid file %s\n",
4793                         options.pidfile);
4794                 kill(pid, SIGTERM);
4795                 exit(1);
4796         }
4797         fprintf(fp, "%d\n", pid);
4798         fclose(fp);
4799
4800         return 0;
4801 }