ctdbd_conn: Accept msgs to all registered srvids
[obnox/samba/samba-obnox.git] / source3 / lib / ctdbd_conn.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) 2007 by Volker Lendecke
5    Copyright (C) 2007 by Andrew Tridgell
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "util_tdb.h"
23 #include "serverid.h"
24 #include "ctdbd_conn.h"
25 #include "system/select.h"
26
27 #include "messages.h"
28
29 /*
30  * It is not possible to include ctdb.h and tdb_compat.h (included via
31  * some other include above) without warnings. This fixes those
32  * warnings.
33  */
34
35 #ifdef typesafe_cb
36 #undef typesafe_cb
37 #endif
38
39 #ifdef typesafe_cb_preargs
40 #undef typesafe_cb_preargs
41 #endif
42
43 #ifdef typesafe_cb_postargs
44 #undef typesafe_cb_postargs
45 #endif
46
47 /* paths to these include files come from --with-ctdb= in configure */
48
49 #include "ctdb.h"
50 #include "ctdb_private.h"
51
/*
 * Per-process connection state to the local ctdbd daemon.
 */
struct ctdbd_connection {
	/* Set by ctdbd_register_msg_ctx(), NULL until then */
	struct messaging_context *msg_ctx;
	/* Last request id handed out by ctdbd_next_reqid() */
	uint32_t reqid;
	/* Our node number as reported by CTDB_CONTROL_GET_PNN */
	uint32_t our_vnn;
	/* Random srvid registered with ctdbd at connect time */
	uint64_t rand_srvid;
	/* talloc array of every srvid passed to register_with_ctdbd() */
	uint64_t *srvids;
	/* Unix domain stream socket connected to ctdbd */
	int fd;
	/* Read event on fd, set by ctdbd_register_msg_ctx() */
	struct tevent_fd *fde;

	/* Optional callback for CTDB_SRVID_RELEASE_IP messages */
	bool (*release_ip_handler)(const char *ip_addr, void *private_data);
	/* Opaque argument handed to release_ip_handler */
	void *release_ip_priv;
};
64
65 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
66 {
67         conn->reqid += 1;
68         if (conn->reqid == 0) {
69                 conn->reqid += 1;
70         }
71         return conn->reqid;
72 }
73
74 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
75                               uint32_t vnn, uint32_t opcode,
76                               uint64_t srvid, uint32_t flags, TDB_DATA data,
77                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
78                               int *cstatus);
79
/*
 * Terminate the process after a fatal error talking to ctdbd.
 *
 * smb_panic() is deliberately not used: writing a core file would delay
 * the exit, and this process id has to be released immediately so that
 * another process can take over without running into sharing violations.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
	_exit(1);
}
92
93 /*
94  *
95  */
96 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
97 {
98         if (DEBUGLEVEL < 11) {
99                 return;
100         }
101         DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
102                       (int)hdr->length, (int)hdr->ctdb_magic,
103                       (int)hdr->ctdb_version, (int)hdr->generation,
104                       (int)hdr->operation, (int)hdr->reqid));
105 }
106
107 /*
108  * Register a srvid with ctdbd
109  */
110 NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid)
111 {
112
113         NTSTATUS status;
114         int cstatus;
115         size_t num_srvids;
116         uint64_t *tmp;
117
118         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
119                                CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
120                                tdb_null, NULL, NULL, &cstatus);
121         if (!NT_STATUS_IS_OK(status)) {
122                 return status;
123         }
124
125         num_srvids = talloc_array_length(conn->srvids);
126
127         tmp = talloc_realloc(conn, conn->srvids, uint64_t,
128                              num_srvids + 1);
129         if (tmp == NULL) {
130                 return NT_STATUS_NO_MEMORY;
131         }
132         conn->srvids = tmp;
133
134         conn->srvids[num_srvids] = srvid;
135         return NT_STATUS_OK;
136 }
137
138 static bool ctdb_is_our_srvid(struct ctdbd_connection *conn, uint64_t srvid)
139 {
140         size_t i, num_srvids;
141
142         num_srvids = talloc_array_length(conn->srvids);
143
144         for (i=0; i<num_srvids; i++) {
145                 if (srvid == conn->srvids[i]) {
146                         return true;
147                 }
148         }
149         return false;
150 }
151
152 /*
153  * get our vnn from the cluster
154  */
155 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
156 {
157         int32_t cstatus=-1;
158         NTSTATUS status;
159         status = ctdbd_control(conn,
160                                CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
161                                tdb_null, NULL, NULL, &cstatus);
162         if (!NT_STATUS_IS_OK(status)) {
163                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
164                 return status;
165         }
166         *vnn = (uint32_t)cstatus;
167         return status;
168 }
169
170 /*
171  * Are we active (i.e. not banned or stopped?)
172  */
173 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
174 {
175         int32_t cstatus=-1;
176         NTSTATUS status;
177         TDB_DATA outdata;
178         struct ctdb_node_map *m;
179         uint32_t failure_flags;
180         bool ret = false;
181         int i;
182
183         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
184                                CTDB_CONTROL_GET_NODEMAP, 0, 0,
185                                tdb_null, talloc_tos(), &outdata, &cstatus);
186         if (!NT_STATUS_IS_OK(status)) {
187                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
188                 return false;
189         }
190         if ((cstatus != 0) || (outdata.dptr == NULL)) {
191                 DEBUG(2, ("Received invalid ctdb data\n"));
192                 return false;
193         }
194
195         m = (struct ctdb_node_map *)outdata.dptr;
196
197         for (i=0; i<m->num; i++) {
198                 if (vnn == m->nodes[i].pnn) {
199                         break;
200                 }
201         }
202
203         if (i == m->num) {
204                 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
205                           (int)vnn));
206                 goto fail;
207         }
208
209         failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
210                 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
211
212         if ((m->nodes[i].flags & failure_flags) != 0) {
213                 DEBUG(2, ("Node has status %x, not active\n",
214                           (int)m->nodes[i].flags));
215                 goto fail;
216         }
217
218         ret = true;
219 fail:
220         TALLOC_FREE(outdata.dptr);
221         return ret;
222 }
223
224 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
225 {
226         return conn->our_vnn;
227 }
228
229 const char *lp_ctdbd_socket(void)
230 {
231         const char *ret;
232
233         ret = lp__ctdbd_socket();
234         if (ret != NULL && strlen(ret) > 0) {
235                 return ret;
236         }
237
238         return CTDB_SOCKET;
239 }
240
/*
 * Get us a ctdb connection: open a stream socket connected to the Unix
 * domain socket returned by lp_ctdbd_socket().
 *
 * On success the connected descriptor is stored in *pfd and 0 is
 * returned. On failure an errno value is returned (ENAMETOOLONG if the
 * path does not fit into sun_path), and *pfd is left untouched.
 */
static int ctdbd_connect(int *pfd)
{
	const char *sockname = lp_ctdbd_socket();
	struct sockaddr_un addr = { 0, };
	int sock;
	int saved_errno;

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock == -1) {
		saved_errno = errno;
		DEBUG(3, ("Could not create socket: %s\n",
			  strerror(saved_errno)));
		return saved_errno;
	}

	addr.sun_family = AF_UNIX;

	if (strlcpy(addr.sun_path, sockname, sizeof(addr.sun_path))
	    >= sizeof(addr.sun_path)) {
		DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
			  sockname));
		close(sock);
		return ENAMETOOLONG;
	}

	if (connect(sock, (struct sockaddr *)(void *)&addr,
		    sizeof(struct sockaddr_un)) == -1) {
		saved_errno = errno;
		DEBUG(1, ("connect(%s) failed: %s\n", sockname,
			  strerror(saved_errno)));
		close(sock);
		return saved_errno;
	}

	*pfd = sock;
	return 0;
}
283
/*
 * A message that arrived while ctdb_read_req() was waiting for a ctdb
 * reply. It is parked here and dispatched later from a zero-timeout timer
 * by deferred_message_dispatch().
 */
struct deferred_msg_state {
	/* Messaging context the message will be dispatched to */
	struct messaging_context *msg_ctx;
	/* The parsed message, allocated as a talloc child of this state */
	struct messaging_rec *rec;
};
293
294 /*
295  * Timed event handler for the deferred message
296  */
297
298 static void deferred_message_dispatch(struct tevent_context *event_ctx,
299                                       struct tevent_timer *te,
300                                       struct timeval now,
301                                       void *private_data)
302 {
303         struct deferred_msg_state *state = talloc_get_type_abort(
304                 private_data, struct deferred_msg_state);
305
306         messaging_dispatch_rec(state->msg_ctx, state->rec);
307         TALLOC_FREE(state);
308         TALLOC_FREE(te);
309 }
310
311 /*
312  * Fetch a messaging_rec from an incoming ctdb style message
313  */
314
315 static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
316                                                      size_t overall_length,
317                                                      struct ctdb_req_message *msg)
318 {
319         struct messaging_rec *result;
320         DATA_BLOB blob;
321         enum ndr_err_code ndr_err;
322
323         if ((overall_length < offsetof(struct ctdb_req_message, data))
324             || (overall_length
325                 < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
326
327                 cluster_fatal("got invalid msg length");
328         }
329
330         if (!(result = talloc(mem_ctx, struct messaging_rec))) {
331                 DEBUG(0, ("talloc failed\n"));
332                 return NULL;
333         }
334
335         blob = data_blob_const(msg->data, msg->datalen);
336
337         ndr_err = ndr_pull_struct_blob(
338                 &blob, result, result,
339                 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
340
341         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
342                 DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
343                           ndr_errstr(ndr_err)));
344                 TALLOC_FREE(result);
345                 return NULL;
346         }
347
348         if (DEBUGLEVEL >= 11) {
349                 DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
350                 NDR_PRINT_DEBUG(messaging_rec, result);
351         }
352
353         return result;
354 }
355
356 static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx,
357                                  struct ctdb_req_header **result)
358 {
359         int timeout = lp_ctdb_timeout();
360         struct ctdb_req_header *req;
361         int ret, revents;
362         uint32_t msglen;
363         NTSTATUS status;
364
365         if (timeout == 0) {
366                 timeout = -1;
367         }
368
369         if (timeout != -1) {
370                 ret = poll_one_fd(fd, POLLIN, timeout, &revents);
371                 if (ret == -1) {
372                         return map_nt_error_from_unix(errno);
373                 }
374                 if (ret == 0) {
375                         return NT_STATUS_IO_TIMEOUT;
376                 }
377                 if (ret != 1) {
378                         return NT_STATUS_UNEXPECTED_IO_ERROR;
379                 }
380         }
381
382         status = read_data(fd, (char *)&msglen, sizeof(msglen));
383         if (!NT_STATUS_IS_OK(status)) {
384                 return status;
385         }
386
387         if (msglen < sizeof(struct ctdb_req_header)) {
388                 return NT_STATUS_UNEXPECTED_IO_ERROR;
389         }
390
391         req = talloc_size(mem_ctx, msglen);
392         if (req == NULL) {
393                 return NT_STATUS_NO_MEMORY;
394         }
395         talloc_set_name_const(req, "struct ctdb_req_header");
396
397         req->length = msglen;
398
399         status = read_data(fd, ((char *)req) + sizeof(msglen),
400                            msglen - sizeof(msglen));
401         if (!NT_STATUS_IS_OK(status)) {
402                 return status;
403         }
404
405         *result = req;
406         return NT_STATUS_OK;
407 }
408
409 /*
410  * Read a full ctdbd request. If we have a messaging context, defer incoming
411  * messages that might come in between.
412  */
413
414 static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
415                               TALLOC_CTX *mem_ctx,
416                               struct ctdb_req_header **result)
417 {
418         struct ctdb_req_header *hdr;
419         NTSTATUS status;
420
421  next_pkt:
422
423         status = ctdb_read_packet(conn->fd, mem_ctx, &hdr);
424         if (!NT_STATUS_IS_OK(status)) {
425                 DEBUG(0, ("ctdb_read_packet failed: %s\n", nt_errstr(status)));
426                 cluster_fatal("ctdbd died\n");
427         }
428
429         DEBUG(11, ("Received ctdb packet\n"));
430         ctdb_packet_dump(hdr);
431
432         if (hdr->operation == CTDB_REQ_MESSAGE) {
433                 struct tevent_timer *evt;
434                 struct deferred_msg_state *msg_state;
435                 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
436
437                 if (conn->msg_ctx == NULL) {
438                         DEBUG(1, ("Got a message without having a msg ctx, "
439                                   "dropping msg %llu\n",
440                                   (long long unsigned)msg->srvid));
441                         goto next_pkt;
442                 }
443
444                 if ((conn->release_ip_handler != NULL)
445                     && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
446                         bool ret;
447
448                         /* must be dispatched immediately */
449                         DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
450                         ret = conn->release_ip_handler((const char *)msg->data,
451                                                        conn->release_ip_priv);
452                         TALLOC_FREE(hdr);
453
454                         if (ret) {
455                                 /*
456                                  * We need to release the ip,
457                                  * so return an error to the upper layers.
458                                  *
459                                  * We make sure we don't trigger this again.
460                                  */
461                                 conn->release_ip_handler = NULL;
462                                 conn->release_ip_priv = NULL;
463                                 return NT_STATUS_ADDRESS_CLOSED;
464                         }
465                         goto next_pkt;
466                 }
467
468                 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
469                     || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)) {
470
471                         DEBUG(1, ("ctdb_read_req: Got %s message\n",
472                                   (msg->srvid == CTDB_SRVID_RECONFIGURE)
473                                   ? "cluster reconfigure" : "SAMBA_NOTIFY"));
474
475                         messaging_send(conn->msg_ctx,
476                                        messaging_server_id(conn->msg_ctx),
477                                        MSG_SMB_BRL_VALIDATE, &data_blob_null);
478                         TALLOC_FREE(hdr);
479                         goto next_pkt;
480                 }
481
482                 msg_state = talloc(NULL, struct deferred_msg_state);
483                 if (msg_state == NULL) {
484                         DEBUG(0, ("talloc failed\n"));
485                         TALLOC_FREE(hdr);
486                         goto next_pkt;
487                 }
488
489                 if (!(msg_state->rec = ctdb_pull_messaging_rec(
490                               msg_state, msg->hdr.length, msg))) {
491                         DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
492                         TALLOC_FREE(msg_state);
493                         TALLOC_FREE(hdr);
494                         goto next_pkt;
495                 }
496
497                 TALLOC_FREE(hdr);
498
499                 msg_state->msg_ctx = conn->msg_ctx;
500
501                 /*
502                  * We're waiting for a call reply, but an async message has
503                  * crossed. Defer dispatching to the toplevel event loop.
504                  */
505                 evt = tevent_add_timer(messaging_tevent_context(conn->msg_ctx),
506                                       messaging_tevent_context(conn->msg_ctx),
507                                       timeval_zero(),
508                                       deferred_message_dispatch,
509                                       msg_state);
510                 if (evt == NULL) {
511                         DEBUG(0, ("event_add_timed failed\n"));
512                         TALLOC_FREE(msg_state);
513                         TALLOC_FREE(hdr);
514                         goto next_pkt;
515                 }
516
517                 goto next_pkt;
518         }
519
520         if ((reqid != 0) && (hdr->reqid != reqid)) {
521                 /* we got the wrong reply */
522                 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
523                          "been %u\n", hdr->reqid, reqid));
524                 TALLOC_FREE(hdr);
525                 goto next_pkt;
526         }
527
528         *result = talloc_move(mem_ctx, &hdr);
529
530         return NT_STATUS_OK;
531 }
532
533 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
534 {
535         close(c->fd);
536         return 0;
537 }
538 /*
539  * Get us a ctdbd connection
540  */
541
542 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
543                                       struct ctdbd_connection **pconn)
544 {
545         struct ctdbd_connection *conn;
546         int ret;
547         NTSTATUS status;
548
549         if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
550                 DEBUG(0, ("talloc failed\n"));
551                 return NT_STATUS_NO_MEMORY;
552         }
553
554         ret = ctdbd_connect(&conn->fd);
555         if (ret != 0) {
556                 status = map_nt_error_from_unix(errno);
557                 DEBUG(10, ("ctdbd_connect failed: %s\n", strerror(errno)));
558                 goto fail;
559         }
560         talloc_set_destructor(conn, ctdbd_connection_destructor);
561
562         status = get_cluster_vnn(conn, &conn->our_vnn);
563
564         if (!NT_STATUS_IS_OK(status)) {
565                 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
566                 goto fail;
567         }
568
569         if (!ctdbd_working(conn, conn->our_vnn)) {
570                 DEBUG(2, ("Node is not working, can not connect\n"));
571                 status = NT_STATUS_INTERNAL_DB_ERROR;
572                 goto fail;
573         }
574
575         generate_random_buffer((unsigned char *)&conn->rand_srvid,
576                                sizeof(conn->rand_srvid));
577
578         status = register_with_ctdbd(conn, conn->rand_srvid);
579
580         if (!NT_STATUS_IS_OK(status)) {
581                 DEBUG(5, ("Could not register random srvid: %s\n",
582                           nt_errstr(status)));
583                 goto fail;
584         }
585
586         *pconn = conn;
587         return NT_STATUS_OK;
588
589  fail:
590         TALLOC_FREE(conn);
591         return status;
592 }
593
594 /*
595  * Get us a ctdbd connection and register us as a process
596  */
597
598 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
599                                     struct ctdbd_connection **pconn)
600 {
601         struct ctdbd_connection *conn;
602         NTSTATUS status;
603
604         status = ctdbd_init_connection(mem_ctx, &conn);
605
606         if (!NT_STATUS_IS_OK(status)) {
607                 return status;
608         }
609
610         status = register_with_ctdbd(conn, (uint64_t)getpid());
611         if (!NT_STATUS_IS_OK(status)) {
612                 goto fail;
613         }
614
615         status = register_with_ctdbd(conn, MSG_SRVID_SAMBA);
616         if (!NT_STATUS_IS_OK(status)) {
617                 goto fail;
618         }
619
620         status = register_with_ctdbd(conn, CTDB_SRVID_SAMBA_NOTIFY);
621         if (!NT_STATUS_IS_OK(status)) {
622                 goto fail;
623         }
624
625         *pconn = conn;
626         return NT_STATUS_OK;
627
628  fail:
629         TALLOC_FREE(conn);
630         return status;
631 }
632
633 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
634 {
635         return conn->msg_ctx;
636 }
637
638 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
639 {
640         return conn->fd;
641 }
642
643 /*
644  * Packet handler to receive and handle a ctdb message
645  */
646 static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
647                                     struct ctdbd_connection *conn,
648                                     struct ctdb_req_header *hdr)
649 {
650         struct ctdb_req_message *msg;
651         struct messaging_rec *msg_rec;
652
653         if (hdr->operation != CTDB_REQ_MESSAGE) {
654                 DEBUG(0, ("Received async msg of type %u, discarding\n",
655                           hdr->operation));
656                 return NT_STATUS_INVALID_PARAMETER;
657         }
658
659         msg = (struct ctdb_req_message *)hdr;
660
661         if ((conn->release_ip_handler != NULL)
662             && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
663                 bool ret;
664
665                 /* must be dispatched immediately */
666                 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
667                 ret = conn->release_ip_handler((const char *)msg->data,
668                                                conn->release_ip_priv);
669                 if (ret) {
670                         /*
671                          * We need to release the ip.
672                          *
673                          * We make sure we don't trigger this again.
674                          */
675                         conn->release_ip_handler = NULL;
676                         conn->release_ip_priv = NULL;
677                 }
678                 return NT_STATUS_OK;
679         }
680
681         SMB_ASSERT(conn->msg_ctx != NULL);
682
683         if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
684             || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)){
685                 DEBUG(0,("Got cluster reconfigure message\n"));
686                 /*
687                  * when the cluster is reconfigured or someone of the
688                  * family has passed away (SAMBA_NOTIFY), we need to
689                  * clean the brl database
690                  */
691                 messaging_send(conn->msg_ctx,
692                                messaging_server_id(conn->msg_ctx),
693                                MSG_SMB_BRL_VALIDATE, &data_blob_null);
694
695                 return NT_STATUS_OK;
696         }
697
698         if (!ctdb_is_our_srvid(conn, msg->srvid)) {
699                 DEBUG(0,("Got unexpected message with srvid=%llu\n", 
700                          (unsigned long long)msg->srvid));
701                 return NT_STATUS_OK;
702         }
703
704         msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg);
705         if (msg_rec == NULL) {
706                 DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
707                 return NT_STATUS_NO_MEMORY;
708         }
709         messaging_dispatch_rec(conn->msg_ctx, msg_rec);
710         return NT_STATUS_OK;
711 }
712
713 /*
714  * The ctdbd socket is readable asynchronuously
715  */
716
717 static void ctdbd_socket_handler(struct tevent_context *event_ctx,
718                                  struct tevent_fd *event,
719                                  uint16 flags,
720                                  void *private_data)
721 {
722         struct ctdbd_connection *conn = talloc_get_type_abort(
723                 private_data, struct ctdbd_connection);
724         struct ctdb_req_header *hdr;
725         NTSTATUS status;
726
727         status = ctdb_read_packet(conn->fd, talloc_tos(), &hdr);
728         if (!NT_STATUS_IS_OK(status)) {
729                 DEBUG(0, ("ctdb_read_packet failed: %s\n", nt_errstr(status)));
730                 cluster_fatal("ctdbd died\n");
731         }
732
733         status = ctdb_handle_message(conn->msg_ctx, conn, hdr);
734         if (!NT_STATUS_IS_OK(status)) {
735                 DEBUG(10, ("could not handle incoming message: %s\n",
736                            nt_errstr(status)));
737         }
738 }
739
740 /*
741  * Prepare a ctdbd connection to receive messages
742  */
743
744 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
745                                 struct messaging_context *msg_ctx)
746 {
747         SMB_ASSERT(conn->msg_ctx == NULL);
748         SMB_ASSERT(conn->fde == NULL);
749
750         if (!(conn->fde = tevent_add_fd(messaging_tevent_context(msg_ctx),
751                                        conn,
752                                        conn->fd,
753                                        TEVENT_FD_READ,
754                                        ctdbd_socket_handler,
755                                        conn))) {
756                 DEBUG(0, ("event_add_fd failed\n"));
757                 return NT_STATUS_NO_MEMORY;
758         }
759
760         conn->msg_ctx = msg_ctx;
761
762         return NT_STATUS_OK;
763 }
764
765 /*
766  * Send a messaging message across a ctdbd
767  */
768
769 NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
770                               uint32_t dst_vnn, uint64_t dst_srvid,
771                               struct messaging_rec *msg)
772 {
773         DATA_BLOB blob;
774         NTSTATUS status;
775         enum ndr_err_code ndr_err;
776
777         ndr_err = ndr_push_struct_blob(
778                 &blob, talloc_tos(), msg,
779                 (ndr_push_flags_fn_t)ndr_push_messaging_rec);
780
781         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
782                 DEBUG(0, ("ndr_push_struct_blob failed: %s\n",
783                           ndr_errstr(ndr_err)));
784                 return ndr_map_error2ntstatus(ndr_err);
785         }
786
787         status = ctdbd_messaging_send_blob(conn, dst_vnn, dst_srvid,
788                                            blob.data, blob.length);
789         TALLOC_FREE(blob.data);
790         return status;
791 }
792
793 NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
794                                    uint32_t dst_vnn, uint64_t dst_srvid,
795                                    const uint8_t *buf, size_t buflen)
796 {
797         struct ctdb_req_message r;
798         struct iovec iov[2];
799         ssize_t nwritten;
800
801         r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
802         r.hdr.ctdb_magic = CTDB_MAGIC;
803         r.hdr.ctdb_version = CTDB_PROTOCOL;
804         r.hdr.generation = 1;
805         r.hdr.operation  = CTDB_REQ_MESSAGE;
806         r.hdr.destnode   = dst_vnn;
807         r.hdr.srcnode    = conn->our_vnn;
808         r.hdr.reqid      = 0;
809         r.srvid          = dst_srvid;
810         r.datalen        = buflen;
811
812         DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
813         ctdb_packet_dump(&r.hdr);
814
815         iov[0].iov_base = &r;
816         iov[0].iov_len = offsetof(struct ctdb_req_message, data);
817         iov[1].iov_base = discard_const_p(uint8_t, buf);
818         iov[1].iov_len = buflen;
819
820         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
821         if (nwritten == -1) {
822                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
823                 cluster_fatal("cluster dispatch daemon msg write error\n");
824         }
825
826         return NT_STATUS_OK;
827 }
828
829 /*
830  * send/recv a generic ctdb control message
831  */
832 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
833                               uint32_t vnn, uint32_t opcode,
834                               uint64_t srvid, uint32_t flags,
835                               TDB_DATA data,
836                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
837                               int *cstatus)
838 {
839         struct ctdb_req_control req;
840         struct ctdb_req_header *hdr;
841         struct ctdb_reply_control *reply = NULL;
842         struct ctdbd_connection *new_conn = NULL;
843         struct iovec iov[2];
844         ssize_t nwritten;
845         NTSTATUS status;
846
847         if (conn == NULL) {
848                 status = ctdbd_init_connection(NULL, &new_conn);
849
850                 if (!NT_STATUS_IS_OK(status)) {
851                         DEBUG(10, ("Could not init temp connection: %s\n",
852                                    nt_errstr(status)));
853                         goto fail;
854                 }
855
856                 conn = new_conn;
857         }
858
859         ZERO_STRUCT(req);
860         req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
861         req.hdr.ctdb_magic   = CTDB_MAGIC;
862         req.hdr.ctdb_version = CTDB_PROTOCOL;
863         req.hdr.operation    = CTDB_REQ_CONTROL;
864         req.hdr.reqid        = ctdbd_next_reqid(conn);
865         req.hdr.destnode     = vnn;
866         req.opcode           = opcode;
867         req.srvid            = srvid;
868         req.datalen          = data.dsize;
869         req.flags            = flags;
870
871         DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
872         ctdb_packet_dump(&req.hdr);
873
874         iov[0].iov_base = &req;
875         iov[0].iov_len = offsetof(struct ctdb_req_control, data);
876         iov[1].iov_base = data.dptr;
877         iov[1].iov_len = data.dsize;
878
879         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
880         if (nwritten == -1) {
881                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
882                 cluster_fatal("cluster dispatch daemon msg write error\n");
883         }
884
885         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
886                 TALLOC_FREE(new_conn);
887                 if (cstatus) {
888                         *cstatus = 0;
889                 }
890                 return NT_STATUS_OK;
891         }
892
893         status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
894
895         if (!NT_STATUS_IS_OK(status)) {
896                 DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
897                 goto fail;
898         }
899
900         if (hdr->operation != CTDB_REPLY_CONTROL) {
901                 DEBUG(0, ("received invalid reply\n"));
902                 goto fail;
903         }
904         reply = (struct ctdb_reply_control *)hdr;
905
906         if (outdata) {
907                 if (!(outdata->dptr = (uint8 *)talloc_memdup(
908                               mem_ctx, reply->data, reply->datalen))) {
909                         TALLOC_FREE(reply);
910                         return NT_STATUS_NO_MEMORY;
911                 }
912                 outdata->dsize = reply->datalen;
913         }
914         if (cstatus) {
915                 (*cstatus) = reply->status;
916         }
917
918         status = NT_STATUS_OK;
919
920  fail:
921         TALLOC_FREE(new_conn);
922         TALLOC_FREE(reply);
923         return status;
924 }
925
926 /*
927  * see if a remote process exists
928  */
929 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
930 {
931         struct server_id id;
932         bool result;
933
934         id.pid = pid;
935         id.vnn = vnn;
936
937         if (!ctdb_processes_exist(conn, &id, 1, &result)) {
938                 DEBUG(10, ("ctdb_processes_exist failed\n"));
939                 return false;
940         }
941         return result;
942 }
943
944 bool ctdb_processes_exist(struct ctdbd_connection *conn,
945                           const struct server_id *pids, int num_pids,
946                           bool *results)
947 {
948         TALLOC_CTX *frame = talloc_stackframe();
949         int i, num_received;
950         NTSTATUS status;
951         uint32_t *reqids;
952         bool result = false;
953
954         reqids = talloc_array(talloc_tos(), uint32_t, num_pids);
955         if (reqids == NULL) {
956                 goto fail;
957         }
958
959         for (i=0; i<num_pids; i++) {
960                 struct ctdb_req_control req;
961                 pid_t pid;
962                 struct iovec iov[2];
963                 ssize_t nwritten;
964
965                 results[i] = false;
966                 reqids[i] = ctdbd_next_reqid(conn);
967
968                 ZERO_STRUCT(req);
969
970                 /*
971                  * pids[i].pid is uint64_t, scale down to pid_t which
972                  * is the wire protocol towards ctdb.
973                  */
974                 pid = pids[i].pid;
975
976                 DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n",
977                            (int)pids[i].vnn, (int)pid,
978                            (int)reqids[i]));
979
980                 req.hdr.length = offsetof(struct ctdb_req_control, data);
981                 req.hdr.length += sizeof(pid);
982                 req.hdr.ctdb_magic   = CTDB_MAGIC;
983                 req.hdr.ctdb_version = CTDB_PROTOCOL;
984                 req.hdr.operation    = CTDB_REQ_CONTROL;
985                 req.hdr.reqid        = reqids[i];
986                 req.hdr.destnode     = pids[i].vnn;
987                 req.opcode           = CTDB_CONTROL_PROCESS_EXISTS;
988                 req.srvid            = 0;
989                 req.datalen          = sizeof(pid);
990                 req.flags            = 0;
991
992                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
993                 ctdb_packet_dump(&req.hdr);
994
995                 iov[0].iov_base = &req;
996                 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
997                 iov[1].iov_base = &pid;
998                 iov[1].iov_len = sizeof(pid);
999
1000                 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1001                 if (nwritten == -1) {
1002                         status = map_nt_error_from_unix(errno);
1003                         DEBUG(10, ("write_data_iov failed: %s\n",
1004                                    strerror(errno)));
1005                         goto fail;
1006                 }
1007         }
1008
1009         num_received = 0;
1010
1011         while (num_received < num_pids) {
1012                 struct ctdb_req_header *hdr;
1013                 struct ctdb_reply_control *reply;
1014                 uint32_t reqid;
1015
1016                 status = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
1017                 if (!NT_STATUS_IS_OK(status)) {
1018                         DEBUG(10, ("ctdb_read_req failed: %s\n",
1019                                    nt_errstr(status)));
1020                         goto fail;
1021                 }
1022
1023                 if (hdr->operation != CTDB_REPLY_CONTROL) {
1024                         DEBUG(10, ("Received invalid reply\n"));
1025                         goto fail;
1026                 }
1027                 reply = (struct ctdb_reply_control *)hdr;
1028
1029                 reqid = reply->hdr.reqid;
1030
1031                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
1032
1033                 for (i=0; i<num_pids; i++) {
1034                         if (reqid == reqids[i]) {
1035                                 break;
1036                         }
1037                 }
1038                 if (i == num_pids) {
1039                         DEBUG(10, ("Received unknown record number %u\n",
1040                                    (unsigned)reqid));
1041                         goto fail;
1042                 }
1043                 results[i] = ((reply->status) == 0);
1044                 TALLOC_FREE(reply);
1045                 num_received += 1;
1046         }
1047
1048         result = true;
1049 fail:
1050         TALLOC_FREE(frame);
1051         return result;
1052 }
1053
/*
 * Per-vnn request state used by ctdb_serverids_exist(). All server_ids
 * that share a vnn are checked with a single CHECK_SRVIDS control.
 * Built by ctdb_collect_vnns().
 */
struct ctdb_vnn_list {
        uint32_t vnn;           /* node the srvids below belong to */
        uint32_t reqid;         /* reqid of the control sent to vnn */
        unsigned num_srvids;    /* entries in srvids and pid_indexes */
        unsigned num_filled;    /* fill cursor while building the arrays */
        uint64_t *srvids;       /* unique_ids of the server_ids on vnn */
        unsigned *pid_indexes;  /* srvids[k] belongs to pids[pid_indexes[k]] */
};
1062
1063 /*
1064  * Get a list of all vnns mentioned in a list of
1065  * server_ids. vnn_indexes tells where in the vnns array we have to
1066  * place the pids.
1067  */
1068 static bool ctdb_collect_vnns(TALLOC_CTX *mem_ctx,
1069                               const struct server_id *pids, unsigned num_pids,
1070                               struct ctdb_vnn_list **pvnns,
1071                               unsigned *pnum_vnns)
1072 {
1073         struct ctdb_vnn_list *vnns = NULL;
1074         unsigned *vnn_indexes = NULL;
1075         unsigned i, num_vnns = 0;
1076
1077         vnn_indexes = talloc_array(mem_ctx, unsigned, num_pids);
1078         if (vnn_indexes == NULL) {
1079                 DEBUG(1, ("talloc_array failed\n"));
1080                 goto fail;
1081         }
1082
1083         for (i=0; i<num_pids; i++) {
1084                 unsigned j;
1085                 uint32_t vnn = pids[i].vnn;
1086
1087                 for (j=0; j<num_vnns; j++) {
1088                         if (vnn == vnns[j].vnn) {
1089                                 break;
1090                         }
1091                 }
1092                 vnn_indexes[i] = j;
1093
1094                 if (j < num_vnns) {
1095                         /*
1096                          * Already in the array
1097                          */
1098                         vnns[j].num_srvids += 1;
1099                         continue;
1100                 }
1101                 vnns = talloc_realloc(mem_ctx, vnns, struct ctdb_vnn_list,
1102                                       num_vnns+1);
1103                 if (vnns == NULL) {
1104                         DEBUG(1, ("talloc_realloc failed\n"));
1105                         goto fail;
1106                 }
1107                 vnns[num_vnns].vnn = vnn;
1108                 vnns[num_vnns].num_srvids = 1;
1109                 vnns[num_vnns].num_filled = 0;
1110                 num_vnns += 1;
1111         }
1112         for (i=0; i<num_vnns; i++) {
1113                 struct ctdb_vnn_list *vnn = &vnns[i];
1114
1115                 vnn->srvids = talloc_array(vnns, uint64_t, vnn->num_srvids);
1116                 if (vnn->srvids == NULL) {
1117                         DEBUG(1, ("talloc_array failed\n"));
1118                         goto fail;
1119                 }
1120                 vnn->pid_indexes = talloc_array(vnns, unsigned,
1121                                                 vnn->num_srvids);
1122                 if (vnn->pid_indexes == NULL) {
1123                         DEBUG(1, ("talloc_array failed\n"));
1124                         goto fail;
1125                 }
1126         }
1127         for (i=0; i<num_pids; i++) {
1128                 struct ctdb_vnn_list *vnn = &vnns[vnn_indexes[i]];
1129                 vnn->srvids[vnn->num_filled] = pids[i].unique_id;
1130                 vnn->pid_indexes[vnn->num_filled] = i;
1131                 vnn->num_filled += 1;
1132         }
1133
1134         TALLOC_FREE(vnn_indexes);
1135         *pvnns = vnns;
1136         *pnum_vnns = num_vnns;
1137         return true;
1138 fail:
1139         TALLOC_FREE(vnns);
1140         TALLOC_FREE(vnn_indexes);
1141         return false;
1142 }
1143
/*
 * Report whether ctdb_serverids_exist() can be used. This always
 * answers yes; @conn is not consulted.
 */
bool ctdb_serverids_exist_supported(struct ctdbd_connection *conn)
{
        (void)conn;
        return true;
}
1148
1149 bool ctdb_serverids_exist(struct ctdbd_connection *conn,
1150                           const struct server_id *pids, unsigned num_pids,
1151                           bool *results)
1152 {
1153         unsigned i, num_received;
1154         NTSTATUS status;
1155         struct ctdb_vnn_list *vnns = NULL;
1156         unsigned num_vnns;
1157
1158         if (!ctdb_collect_vnns(talloc_tos(), pids, num_pids,
1159                                &vnns, &num_vnns)) {
1160                 DEBUG(1, ("ctdb_collect_vnns failed\n"));
1161                 goto fail;
1162         }
1163
1164         for (i=0; i<num_vnns; i++) {
1165                 struct ctdb_vnn_list *vnn = &vnns[i];
1166                 struct ctdb_req_control req;
1167                 struct iovec iov[2];
1168                 ssize_t nwritten;
1169
1170                 vnn->reqid = ctdbd_next_reqid(conn);
1171
1172                 ZERO_STRUCT(req);
1173
1174                 DEBUG(10, ("Requesting VNN %d, reqid=%d, num_srvids=%u\n",
1175                            (int)vnn->vnn, (int)vnn->reqid, vnn->num_srvids));
1176
1177                 req.hdr.length = offsetof(struct ctdb_req_control, data);
1178                 req.hdr.ctdb_magic   = CTDB_MAGIC;
1179                 req.hdr.ctdb_version = CTDB_PROTOCOL;
1180                 req.hdr.operation    = CTDB_REQ_CONTROL;
1181                 req.hdr.reqid        = vnn->reqid;
1182                 req.hdr.destnode     = vnn->vnn;
1183                 req.opcode           = CTDB_CONTROL_CHECK_SRVIDS;
1184                 req.srvid            = 0;
1185                 req.datalen          = sizeof(uint64_t) * vnn->num_srvids;
1186                 req.hdr.length      += req.datalen;
1187                 req.flags            = 0;
1188
1189                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
1190                 ctdb_packet_dump(&req.hdr);
1191
1192                 iov[0].iov_base = &req;
1193                 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
1194                 iov[1].iov_base = vnn->srvids;
1195                 iov[1].iov_len = req.datalen;
1196
1197                 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1198                 if (nwritten == -1) {
1199                         status = map_nt_error_from_unix(errno);
1200                         DEBUG(10, ("write_data_iov failed: %s\n",
1201                                    strerror(errno)));
1202                         goto fail;
1203                 }
1204         }
1205
1206         num_received = 0;
1207
1208         while (num_received < num_vnns) {
1209                 struct ctdb_req_header *hdr;
1210                 struct ctdb_reply_control *reply;
1211                 struct ctdb_vnn_list *vnn;
1212                 uint32_t reqid;
1213                 uint8_t *reply_data;
1214
1215                 status = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
1216                 if (!NT_STATUS_IS_OK(status)) {
1217                         DEBUG(1, ("ctdb_read_req failed: %s\n",
1218                                   nt_errstr(status)));
1219                         goto fail;
1220                 }
1221
1222                 if (hdr->operation != CTDB_REPLY_CONTROL) {
1223                         DEBUG(1, ("Received invalid reply %u\n",
1224                                   (unsigned)reply->hdr.operation));
1225                         goto fail;
1226                 }
1227                 reply = (struct ctdb_reply_control *)hdr;
1228
1229                 reqid = reply->hdr.reqid;
1230
1231                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
1232
1233                 for (i=0; i<num_vnns; i++) {
1234                         if (reqid == vnns[i].reqid) {
1235                                 break;
1236                         }
1237                 }
1238                 if (i == num_vnns) {
1239                         DEBUG(1, ("Received unknown reqid number %u\n",
1240                                   (unsigned)reqid));
1241                         goto fail;
1242                 }
1243
1244                 DEBUG(10, ("Found index %u\n", i));
1245
1246                 vnn = &vnns[i];
1247
1248                 DEBUG(10, ("Received vnn %u, vnn->num_srvids %u, datalen %u\n",
1249                            (unsigned)vnn->vnn, vnn->num_srvids,
1250                            (unsigned)reply->datalen));
1251
1252                 if (reply->datalen >= ((vnn->num_srvids+7)/8)) {
1253                         /*
1254                          * Got a real reply
1255                          */
1256                         reply_data = reply->data;
1257                 } else {
1258                         /*
1259                          * Got an error reply
1260                          */
1261                         DEBUG(5, ("Received short reply len %d, status %u, "
1262                                   "errorlen %u\n",
1263                                   (unsigned)reply->datalen,
1264                                   (unsigned)reply->status,
1265                                   (unsigned)reply->errorlen));
1266                         dump_data(5, reply->data, reply->errorlen);
1267
1268                         /*
1269                          * This will trigger everything set to false
1270                          */
1271                         reply_data = NULL;
1272                 }
1273
1274                 for (i=0; i<vnn->num_srvids; i++) {
1275                         int idx = vnn->pid_indexes[i];
1276
1277                         if (pids[i].unique_id ==
1278                             SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
1279                                 results[idx] = true;
1280                                 continue;
1281                         }
1282                         results[idx] =
1283                                 (reply_data != NULL) &&
1284                                 ((reply_data[i/8] & (1<<(i%8))) != 0);
1285                 }
1286
1287                 TALLOC_FREE(reply);
1288                 num_received += 1;
1289         }
1290
1291         TALLOC_FREE(vnns);
1292         return true;
1293 fail:
1294         cluster_fatal("serverids_exist failed");
1295         return false;
1296 }
1297
1298 /*
1299  * Get a db path
1300  */
1301 char *ctdbd_dbpath(struct ctdbd_connection *conn,
1302                    TALLOC_CTX *mem_ctx, uint32_t db_id)
1303 {
1304         NTSTATUS status;
1305         TDB_DATA data;
1306         int32_t cstatus;
1307
1308         data.dptr = (uint8_t*)&db_id;
1309         data.dsize = sizeof(db_id);
1310
1311         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1312                                CTDB_CONTROL_GETDBPATH, 0, 0, data, 
1313                                mem_ctx, &data, &cstatus);
1314         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1315                 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
1316                 return NULL;
1317         }
1318
1319         return (char *)data.dptr;
1320 }
1321
1322 /*
1323  * attach to a ctdb database
1324  */
1325 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
1326                          const char *name, uint32_t *db_id, int tdb_flags)
1327 {
1328         NTSTATUS status;
1329         TDB_DATA data;
1330         int32_t cstatus;
1331         bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
1332
1333         data = string_term_tdb_data(name);
1334
1335         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1336                                persistent
1337                                ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
1338                                : CTDB_CONTROL_DB_ATTACH,
1339                                tdb_flags, 0, data, NULL, &data, &cstatus);
1340         if (!NT_STATUS_IS_OK(status)) {
1341                 DEBUG(0, (__location__ " ctdb_control for db_attach "
1342                           "failed: %s\n", nt_errstr(status)));
1343                 return status;
1344         }
1345
1346         if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
1347                 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
1348                 return NT_STATUS_INTERNAL_ERROR;
1349         }
1350
1351         *db_id = *(uint32_t *)data.dptr;
1352         talloc_free(data.dptr);
1353
1354         if (!(tdb_flags & TDB_SEQNUM)) {
1355                 return NT_STATUS_OK;
1356         }
1357
1358         data.dptr = (uint8_t *)db_id;
1359         data.dsize = sizeof(*db_id);
1360
1361         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1362                                CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data, 
1363                                NULL, NULL, &cstatus);
1364         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1365                 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
1366                          "failed\n"));
1367                 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
1368                         status;
1369         }
1370
1371         return NT_STATUS_OK;
1372 }
1373
1374 /*
1375  * force the migration of a record to this node
1376  */
1377 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
1378                        TDB_DATA key)
1379 {
1380         struct ctdb_req_call req;
1381         struct ctdb_req_header *hdr;
1382         struct iovec iov[2];
1383         ssize_t nwritten;
1384         NTSTATUS status;
1385
1386         ZERO_STRUCT(req);
1387
1388         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1389         req.hdr.ctdb_magic   = CTDB_MAGIC;
1390         req.hdr.ctdb_version = CTDB_PROTOCOL;
1391         req.hdr.operation    = CTDB_REQ_CALL;
1392         req.hdr.reqid        = ctdbd_next_reqid(conn);
1393         req.flags            = CTDB_IMMEDIATE_MIGRATION;
1394         req.callid           = CTDB_NULL_FUNC;
1395         req.db_id            = db_id;
1396         req.keylen           = key.dsize;
1397
1398         DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
1399         ctdb_packet_dump(&req.hdr);
1400
1401         iov[0].iov_base = &req;
1402         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
1403         iov[1].iov_base = key.dptr;
1404         iov[1].iov_len = key.dsize;
1405
1406         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1407         if (nwritten == -1) {
1408                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1409                 cluster_fatal("cluster dispatch daemon msg write error\n");
1410         }
1411
1412         status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1413
1414         if (!NT_STATUS_IS_OK(status)) {
1415                 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1416                 goto fail;
1417         }
1418
1419         if (hdr->operation != CTDB_REPLY_CALL) {
1420                 DEBUG(0, ("received invalid reply\n"));
1421                 status = NT_STATUS_INTERNAL_ERROR;
1422                 goto fail;
1423         }
1424
1425         status = NT_STATUS_OK;
1426  fail:
1427
1428         TALLOC_FREE(hdr);
1429         return status;
1430 }
1431
1432 /*
1433  * Fetch a record and parse it
1434  */
1435 NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
1436                      TDB_DATA key, bool local_copy,
1437                      void (*parser)(TDB_DATA key, TDB_DATA data,
1438                                     void *private_data),
1439                      void *private_data)
1440 {
1441         struct ctdb_req_call req;
1442         struct ctdb_req_header *hdr = NULL;
1443         struct ctdb_reply_call *reply;
1444         struct iovec iov[2];
1445         ssize_t nwritten;
1446         NTSTATUS status;
1447         uint32_t flags;
1448
1449         flags = local_copy ? CTDB_WANT_READONLY : 0;
1450
1451         ZERO_STRUCT(req);
1452
1453         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1454         req.hdr.ctdb_magic   = CTDB_MAGIC;
1455         req.hdr.ctdb_version = CTDB_PROTOCOL;
1456         req.hdr.operation    = CTDB_REQ_CALL;
1457         req.hdr.reqid        = ctdbd_next_reqid(conn);
1458         req.flags            = flags;
1459         req.callid           = CTDB_FETCH_FUNC;
1460         req.db_id            = db_id;
1461         req.keylen           = key.dsize;
1462
1463         iov[0].iov_base = &req;
1464         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
1465         iov[1].iov_base = key.dptr;
1466         iov[1].iov_len = key.dsize;
1467
1468         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1469         if (nwritten == -1) {
1470                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1471                 cluster_fatal("cluster dispatch daemon msg write error\n");
1472         }
1473
1474         status = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1475
1476         if (!NT_STATUS_IS_OK(status)) {
1477                 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1478                 goto fail;
1479         }
1480
1481         if (hdr->operation != CTDB_REPLY_CALL) {
1482                 DEBUG(0, ("received invalid reply\n"));
1483                 status = NT_STATUS_INTERNAL_ERROR;
1484                 goto fail;
1485         }
1486         reply = (struct ctdb_reply_call *)hdr;
1487
1488         if (reply->datalen == 0) {
1489                 /*
1490                  * Treat an empty record as non-existing
1491                  */
1492                 status = NT_STATUS_NOT_FOUND;
1493                 goto fail;
1494         }
1495
1496         parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1497                private_data);
1498
1499         status = NT_STATUS_OK;
1500  fail:
1501         TALLOC_FREE(hdr);
1502         return status;
1503 }
1504
1505 /*
1506   Traverse a ctdb database. This uses a kind-of hackish way to open a second
1507   connection to ctdbd to avoid the hairy recursive and async problems with
1508   everything in-line.
1509 */
1510
1511 NTSTATUS ctdbd_traverse(uint32_t db_id,
1512                         void (*fn)(TDB_DATA key, TDB_DATA data,
1513                                    void *private_data),
1514                         void *private_data)
1515 {
1516         struct ctdbd_connection *conn;
1517         NTSTATUS status;
1518
1519         TDB_DATA key, data;
1520         struct ctdb_traverse_start t;
1521         int cstatus;
1522
1523         become_root();
1524         status = ctdbd_init_connection(NULL, &conn);
1525         unbecome_root();
1526         if (!NT_STATUS_IS_OK(status)) {
1527                 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1528                           nt_errstr(status)));
1529                 return status;
1530         }
1531
1532         t.db_id = db_id;
1533         t.srvid = conn->rand_srvid;
1534         t.reqid = ctdbd_next_reqid(conn);
1535
1536         data.dptr = (uint8_t *)&t;
1537         data.dsize = sizeof(t);
1538
1539         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1540                                CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1541                                data, NULL, NULL, &cstatus);
1542
1543         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1544
1545                 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1546                          cstatus));
1547
1548                 if (NT_STATUS_IS_OK(status)) {
1549                         /*
1550                          * We need a mapping here
1551                          */
1552                         status = NT_STATUS_UNSUCCESSFUL;
1553                 }
1554                 TALLOC_FREE(conn);
1555                 return status;
1556         }
1557
1558         while (True) {
1559                 struct ctdb_req_header *hdr;
1560                 struct ctdb_req_message *m;
1561                 struct ctdb_rec_data *d;
1562
1563                 status = ctdb_read_packet(conn->fd, conn, &hdr);
1564                 if (!NT_STATUS_IS_OK(status)) {
1565                         DEBUG(0, ("ctdb_read_packet failed: %s\n",
1566                                   nt_errstr(status)));
1567                         cluster_fatal("ctdbd died\n");
1568                 }
1569
1570                 if (hdr->operation != CTDB_REQ_MESSAGE) {
1571                         DEBUG(0, ("Got operation %u, expected a message\n",
1572                                   (unsigned)hdr->operation));
1573                         TALLOC_FREE(conn);
1574                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1575                 }
1576
1577                 m = (struct ctdb_req_message *)hdr;
1578                 d = (struct ctdb_rec_data *)&m->data[0];
1579                 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1580                         DEBUG(0, ("Got invalid traverse data of length %d\n",
1581                                   (int)m->datalen));
1582                         TALLOC_FREE(conn);
1583                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1584                 }
1585
1586                 key.dsize = d->keylen;
1587                 key.dptr  = &d->data[0];
1588                 data.dsize = d->datalen;
1589                 data.dptr = &d->data[d->keylen];
1590
1591                 if (key.dsize == 0 && data.dsize == 0) {
1592                         /* end of traverse */
1593                         TALLOC_FREE(conn);
1594                         return NT_STATUS_OK;
1595                 }
1596
1597                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1598                         DEBUG(0, ("Got invalid ltdb header length %d\n",
1599                                   (int)data.dsize));
1600                         TALLOC_FREE(conn);
1601                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1602                 }
1603                 data.dsize -= sizeof(struct ctdb_ltdb_header);
1604                 data.dptr += sizeof(struct ctdb_ltdb_header);
1605
1606                 if (fn != NULL) {
1607                         fn(key, data, private_data);
1608                 }
1609         }
1610         return NT_STATUS_OK;
1611 }
1612
1613 /*
1614    This is used to canonicalize a ctdb_sock_addr structure.
1615 */
1616 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
1617                                       struct sockaddr_storage *out)
1618 {
1619         memcpy(out, in, sizeof (*out));
1620
1621 #ifdef HAVE_IPV6
1622         if (in->ss_family == AF_INET6) {
1623                 const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1624                 const struct sockaddr_in6 *in6 =
1625                         (const struct sockaddr_in6 *)in;
1626                 struct sockaddr_in *out4 = (struct sockaddr_in *)out;
1627                 if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
1628                         memset(out, 0, sizeof(*out));
1629 #ifdef HAVE_SOCK_SIN_LEN
1630                         out4->sin_len = sizeof(*out);
1631 #endif
1632                         out4->sin_family = AF_INET;
1633                         out4->sin_port   = in6->sin6_port;
1634                         memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
1635                 }
1636         }
1637 #endif
1638 }
1639
1640 /*
1641  * Register us as a server for a particular tcp connection
1642  */
1643
1644 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1645                             const struct sockaddr_storage *_server,
1646                             const struct sockaddr_storage *_client,
1647                             bool (*release_ip_handler)(const char *ip_addr,
1648                                                        void *private_data),
1649                             void *private_data)
1650 {
1651         /*
1652          * we still use ctdb_control_tcp for ipv4
1653          * because we want to work against older ctdb
1654          * versions at runtime
1655          */
1656         struct ctdb_control_tcp p4;
1657         struct ctdb_control_tcp_addr p;
1658         TDB_DATA data;
1659         NTSTATUS status;
1660         struct sockaddr_storage client;
1661         struct sockaddr_storage server;
1662
1663         /*
1664          * Only one connection so far
1665          */
1666         SMB_ASSERT(conn->release_ip_handler == NULL);
1667
1668         smbd_ctdb_canonicalize_ip(_client, &client);
1669         smbd_ctdb_canonicalize_ip(_server, &server);
1670
1671         switch (client.ss_family) {
1672         case AF_INET:
1673                 memcpy(&p4.dest, &server, sizeof(p4.dest));
1674                 memcpy(&p4.src, &client, sizeof(p4.src));
1675                 data.dptr = (uint8_t *)&p4;
1676                 data.dsize = sizeof(p4);
1677                 break;
1678         case AF_INET6:
1679                 memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6));
1680                 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1681                 data.dptr = (uint8_t *)&p;
1682                 data.dsize = sizeof(p);
1683                 break;
1684         default:
1685                 return NT_STATUS_INTERNAL_ERROR;
1686         }
1687
1688         conn->release_ip_handler = release_ip_handler;
1689         conn->release_ip_priv = private_data;
1690
1691         /*
1692          * We want to be told about IP releases
1693          */
1694
1695         status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP);
1696         if (!NT_STATUS_IS_OK(status)) {
1697                 return status;
1698         }
1699
1700         /*
1701          * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1702          * can send an extra ack to trigger a reset for our client, so it
1703          * immediately reconnects
1704          */
1705         return ctdbd_control(conn, CTDB_CURRENT_NODE, 
1706                              CTDB_CONTROL_TCP_CLIENT, 0,
1707                              CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1708 }
1709
1710 /*
1711  * We want to handle reconfigure events
1712  */
1713 NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn)
1714 {
1715         return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE);
1716 }
1717
1718 /*
1719   call a control on the local node
1720  */
1721 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1722                              uint64_t srvid, uint32_t flags, TDB_DATA data,
1723                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1724                              int *cstatus)
1725 {
1726         return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1727 }
1728
1729 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1730 {
1731         struct ctdb_client_notify_register reg_data;
1732         size_t struct_len;
1733         NTSTATUS status;
1734         int cstatus;
1735
1736         reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1737         reg_data.len = 1;
1738         reg_data.notify_data[0] = 0;
1739
1740         struct_len = offsetof(struct ctdb_client_notify_register,
1741                               notify_data) + reg_data.len;
1742
1743         status = ctdbd_control_local(
1744                 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1745                 make_tdb_data((uint8_t *)&reg_data, struct_len),
1746                 NULL, NULL, &cstatus);
1747         if (!NT_STATUS_IS_OK(status)) {
1748                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1749                           nt_errstr(status)));
1750         }
1751         return status;
1752 }
1753
1754 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1755 {
1756         struct ctdb_client_notify_deregister dereg_data;
1757         NTSTATUS status;
1758         int cstatus;
1759
1760         dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1761
1762         status = ctdbd_control_local(
1763                 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1764                 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1765                 NULL, NULL, &cstatus);
1766         if (!NT_STATUS_IS_OK(status)) {
1767                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1768                           nt_errstr(status)));
1769         }
1770         return status;
1771 }
1772
1773 NTSTATUS ctdbd_probe(void)
1774 {
1775         /*
1776          * Do a very early check if ctdbd is around to avoid an abort and core
1777          * later
1778          */
1779         struct ctdbd_connection *conn = NULL;
1780         NTSTATUS status;
1781
1782         status = ctdbd_messaging_connection(talloc_tos(), &conn);
1783
1784         /*
1785          * We only care if we can connect.
1786          */
1787         TALLOC_FREE(conn);
1788
1789         return status;
1790 }