lib: Use ctdbd_control_unix in ctdbd_dbpath
[samba.git] / source3 / lib / ctdbd_conn.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) 2007 by Volker Lendecke
5    Copyright (C) 2007 by Andrew Tridgell
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "util_tdb.h"
23 #include "serverid.h"
24 #include "ctdbd_conn.h"
25 #include "system/select.h"
26 #include "lib/sys_rw_data.h"
27 #include "lib/util/iov_buf.h"
28
29 #include "messages.h"
30
31 /* paths to these include files come from --with-ctdb= in configure */
32
33 #include "ctdb.h"
34 #include "ctdb_private.h"
35
36 struct ctdbd_srvid_cb {
37         uint64_t srvid;
38         int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
39                   uint64_t dst_srvid,
40                   const uint8_t *msg, size_t msglen,
41                   void *private_data);
42         void *private_data;
43 };
44
45 struct ctdbd_connection {
46         const char *sockname;   /* Needed in ctdbd_traverse */
47         struct messaging_context *msg_ctx;
48         uint32_t reqid;
49         uint32_t our_vnn;
50         uint64_t rand_srvid;
51         struct ctdbd_srvid_cb *callbacks;
52         int fd;
53         struct tevent_fd *fde;
54         int timeout;
55 };
56
57 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
58 {
59         conn->reqid += 1;
60         if (conn->reqid == 0) {
61                 conn->reqid += 1;
62         }
63         return conn->reqid;
64 }
65
66 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
67                               uint32_t vnn, uint32_t opcode,
68                               uint64_t srvid, uint32_t flags, TDB_DATA data,
69                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
70                               int *cstatus);
71 static int ctdbd_control_unix(struct ctdbd_connection *conn,
72                               uint32_t vnn, uint32_t opcode,
73                               uint64_t srvid, uint32_t flags,
74                               TDB_DATA data,
75                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
76                               int *cstatus);
77
78 /*
79  * exit on fatal communications errors with the ctdbd daemon
80  */
81 static void cluster_fatal(const char *why)
82 {
83         DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
84         /* we don't use smb_panic() as we don't want to delay to write
85            a core file. We need to release this process id immediately
86            so that someone else can take over without getting sharing
87            violations */
88         _exit(1);
89 }
90
91 /*
92  *
93  */
94 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
95 {
96         if (DEBUGLEVEL < 11) {
97                 return;
98         }
99         DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
100                       (int)hdr->length, (int)hdr->ctdb_magic,
101                       (int)hdr->ctdb_version, (int)hdr->generation,
102                       (int)hdr->operation, (int)hdr->reqid));
103 }
104
105 /*
106  * Register a srvid with ctdbd
107  */
108 NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
109                              int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
110                                        uint64_t dst_srvid,
111                                        const uint8_t *msg, size_t msglen,
112                                        void *private_data),
113                              void *private_data)
114 {
115
116         int ret, cstatus;
117         size_t num_callbacks;
118         struct ctdbd_srvid_cb *tmp;
119
120         ret = ctdbd_control_unix(conn, CTDB_CURRENT_NODE,
121                                  CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
122                                  tdb_null, NULL, NULL, &cstatus);
123         if (ret != 0) {
124                 return map_nt_error_from_unix(ret);
125         }
126
127         num_callbacks = talloc_array_length(conn->callbacks);
128
129         tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
130                              num_callbacks + 1);
131         if (tmp == NULL) {
132                 return NT_STATUS_NO_MEMORY;
133         }
134         conn->callbacks = tmp;
135
136         conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
137                 .srvid = srvid, .cb = cb, .private_data = private_data
138         };
139
140         return NT_STATUS_OK;
141 }
142
143 static int ctdbd_msg_call_back(struct ctdbd_connection *conn,
144                                struct ctdb_req_message *msg)
145 {
146         size_t msg_len;
147         size_t i, num_callbacks;
148
149         msg_len = msg->hdr.length;
150         if (msg_len < offsetof(struct ctdb_req_message, data)) {
151                 DEBUG(10, ("%s: len %u too small\n", __func__,
152                            (unsigned)msg_len));
153                 return 0;
154         }
155         msg_len -= offsetof(struct ctdb_req_message, data);
156
157         if (msg_len < msg->datalen) {
158                 DEBUG(10, ("%s: msg_len=%u < msg->datalen=%u\n", __func__,
159                            (unsigned)msg_len, (unsigned)msg->datalen));
160                 return 0;
161         }
162
163         num_callbacks = talloc_array_length(conn->callbacks);
164
165         for (i=0; i<num_callbacks; i++) {
166                 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
167
168                 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
169                         int ret;
170
171                         ret = cb->cb(msg->hdr.srcnode, msg->hdr.destnode,
172                                      msg->srvid, msg->data, msg->datalen,
173                                      cb->private_data);
174                         if (ret != 0) {
175                                 return ret;
176                         }
177                 }
178         }
179         return 0;
180 }
181
182 /*
183  * get our vnn from the cluster
184  */
185 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
186 {
187         int32_t cstatus=-1;
188         int ret;
189         ret = ctdbd_control_unix(conn,
190                                  CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
191                                  tdb_null, NULL, NULL, &cstatus);
192         if (ret != 0) {
193                 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
194                 return map_nt_error_from_unix(ret);
195         }
196         *vnn = (uint32_t)cstatus;
197         return NT_STATUS_OK;
198 }
199
200 /*
201  * Are we active (i.e. not banned or stopped?)
202  */
203 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
204 {
205         int32_t cstatus=-1;
206         TDB_DATA outdata;
207         struct ctdb_node_map *m;
208         uint32_t failure_flags;
209         bool ok = false;
210         int i, ret;
211
212         ret = ctdbd_control_unix(conn, CTDB_CURRENT_NODE,
213                                  CTDB_CONTROL_GET_NODEMAP, 0, 0,
214                                  tdb_null, talloc_tos(), &outdata, &cstatus);
215         if (ret != 0) {
216                 DEBUG(1, ("ctdbd_control failed: %s\n", strerror(ret)));
217                 return false;
218         }
219         if ((cstatus != 0) || (outdata.dptr == NULL)) {
220                 DEBUG(2, ("Received invalid ctdb data\n"));
221                 return false;
222         }
223
224         m = (struct ctdb_node_map *)outdata.dptr;
225
226         for (i=0; i<m->num; i++) {
227                 if (vnn == m->nodes[i].pnn) {
228                         break;
229                 }
230         }
231
232         if (i == m->num) {
233                 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
234                           (int)vnn));
235                 goto fail;
236         }
237
238         failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
239                 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
240
241         if ((m->nodes[i].flags & failure_flags) != 0) {
242                 DEBUG(2, ("Node has status %x, not active\n",
243                           (int)m->nodes[i].flags));
244                 goto fail;
245         }
246
247         ok = true;
248 fail:
249         TALLOC_FREE(outdata.dptr);
250         return ok;
251 }
252
253 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
254 {
255         return conn->our_vnn;
256 }
257
258 /*
259  * Get us a ctdb connection
260  */
261
262 static int ctdbd_connect(const char *sockname, int *pfd)
263 {
264         struct sockaddr_un addr = { 0, };
265         int fd;
266         socklen_t salen;
267         size_t namelen;
268
269         fd = socket(AF_UNIX, SOCK_STREAM, 0);
270         if (fd == -1) {
271                 int err = errno;
272                 DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
273                 return err;
274         }
275
276         addr.sun_family = AF_UNIX;
277
278         namelen = strlcpy(addr.sun_path, sockname, sizeof(addr.sun_path));
279         if (namelen >= sizeof(addr.sun_path)) {
280                 DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
281                           sockname));
282                 close(fd);
283                 return ENAMETOOLONG;
284         }
285
286         salen = sizeof(struct sockaddr_un);
287
288         if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
289                 int err = errno;
290                 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
291                           strerror(err)));
292                 close(fd);
293                 return err;
294         }
295
296         *pfd = fd;
297         return 0;
298 }
299
300 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
301                             struct ctdb_req_header **result)
302 {
303         struct ctdb_req_header *req;
304         int ret, revents;
305         uint32_t msglen;
306         ssize_t nread;
307
308         if (timeout != -1) {
309                 ret = poll_one_fd(fd, POLLIN, timeout, &revents);
310                 if (ret == -1) {
311                         return errno;
312                 }
313                 if (ret == 0) {
314                         return ETIMEDOUT;
315                 }
316                 if (ret != 1) {
317                         return EIO;
318                 }
319         }
320
321         nread = read_data(fd, &msglen, sizeof(msglen));
322         if (nread == -1) {
323                 return errno;
324         }
325         if (nread == 0) {
326                 return EIO;
327         }
328
329         if (msglen < sizeof(struct ctdb_req_header)) {
330                 return EIO;
331         }
332
333         req = talloc_size(mem_ctx, msglen);
334         if (req == NULL) {
335                 return ENOMEM;
336         }
337         talloc_set_name_const(req, "struct ctdb_req_header");
338
339         req->length = msglen;
340
341         nread = read_data(fd, ((char *)req) + sizeof(msglen),
342                           msglen - sizeof(msglen));
343         if (nread == -1) {
344                 TALLOC_FREE(req);
345                 return errno;
346         }
347         if (nread == 0) {
348                 TALLOC_FREE(req);
349                 return EIO;
350         }
351
352         *result = req;
353         return 0;
354 }
355
356 /*
357  * Read a full ctdbd request. If we have a messaging context, defer incoming
358  * messages that might come in between.
359  */
360
361 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
362                          TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
363 {
364         struct ctdb_req_header *hdr;
365         int ret;
366
367  next_pkt:
368
369         ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
370         if (ret != 0) {
371                 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
372                 cluster_fatal("ctdbd died\n");
373         }
374
375         DEBUG(11, ("Received ctdb packet\n"));
376         ctdb_packet_dump(hdr);
377
378         if (hdr->operation == CTDB_REQ_MESSAGE) {
379                 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
380
381                 if (conn->msg_ctx == NULL) {
382                         DEBUG(1, ("Got a message without having a msg ctx, "
383                                   "dropping msg %llu\n",
384                                   (long long unsigned)msg->srvid));
385                         TALLOC_FREE(hdr);
386                         goto next_pkt;
387                 }
388
389                 ret = ctdbd_msg_call_back(conn, msg);
390                 if (ret != 0) {
391                         TALLOC_FREE(hdr);
392                         return ret;
393                 }
394
395                 TALLOC_FREE(hdr);
396                 goto next_pkt;
397         }
398
399         if ((reqid != 0) && (hdr->reqid != reqid)) {
400                 /* we got the wrong reply */
401                 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
402                          "been %u\n", hdr->reqid, reqid));
403                 TALLOC_FREE(hdr);
404                 goto next_pkt;
405         }
406
407         *result = talloc_move(mem_ctx, &hdr);
408
409         return 0;
410 }
411
412 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
413 {
414         TALLOC_FREE(c->fde);
415         if (c->fd != -1) {
416                 close(c->fd);
417                 c->fd = -1;
418         }
419         return 0;
420 }
421 /*
422  * Get us a ctdbd connection
423  */
424
425 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
426                                       const char *sockname, int timeout,
427                                       struct ctdbd_connection **pconn)
428 {
429         struct ctdbd_connection *conn;
430         int ret;
431         NTSTATUS status;
432
433         if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
434                 DEBUG(0, ("talloc failed\n"));
435                 return NT_STATUS_NO_MEMORY;
436         }
437
438         conn->sockname = talloc_strdup(conn, sockname);
439         if (conn->sockname == NULL) {
440                 DBG_ERR("%s: talloc failed\n", __func__);
441                 status = NT_STATUS_NO_MEMORY;
442                 goto fail;
443         }
444
445         conn->timeout = timeout;
446
447         if (conn->timeout == 0) {
448                 conn->timeout = -1;
449         }
450
451         ret = ctdbd_connect(conn->sockname, &conn->fd);
452         if (ret != 0) {
453                 status = map_nt_error_from_unix(ret);
454                 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
455                 goto fail;
456         }
457         talloc_set_destructor(conn, ctdbd_connection_destructor);
458
459         status = get_cluster_vnn(conn, &conn->our_vnn);
460
461         if (!NT_STATUS_IS_OK(status)) {
462                 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
463                 goto fail;
464         }
465
466         if (!ctdbd_working(conn, conn->our_vnn)) {
467                 DEBUG(2, ("Node is not working, can not connect\n"));
468                 status = NT_STATUS_INTERNAL_DB_ERROR;
469                 goto fail;
470         }
471
472         generate_random_buffer((unsigned char *)&conn->rand_srvid,
473                                sizeof(conn->rand_srvid));
474
475         status = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
476
477         if (!NT_STATUS_IS_OK(status)) {
478                 DEBUG(5, ("Could not register random srvid: %s\n",
479                           nt_errstr(status)));
480                 goto fail;
481         }
482
483         *pconn = conn;
484         return NT_STATUS_OK;
485
486  fail:
487         TALLOC_FREE(conn);
488         return status;
489 }
490
491 /*
492  * Get us a ctdbd connection and register us as a process
493  */
494
495 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
496                                     const char *sockname, int timeout,
497                                     struct ctdbd_connection **pconn)
498 {
499         struct ctdbd_connection *conn;
500         NTSTATUS status;
501
502         status = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
503
504         if (!NT_STATUS_IS_OK(status)) {
505                 return status;
506         }
507
508         status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
509         if (!NT_STATUS_IS_OK(status)) {
510                 goto fail;
511         }
512
513         *pconn = conn;
514         return NT_STATUS_OK;
515
516  fail:
517         TALLOC_FREE(conn);
518         return status;
519 }
520
521 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
522 {
523         return conn->msg_ctx;
524 }
525
526 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
527 {
528         return conn->fd;
529 }
530
531 /*
532  * Packet handler to receive and handle a ctdb message
533  */
534 static int ctdb_handle_message(struct ctdbd_connection *conn,
535                                struct ctdb_req_header *hdr)
536 {
537         struct ctdb_req_message *msg;
538
539         if (hdr->operation != CTDB_REQ_MESSAGE) {
540                 DEBUG(0, ("Received async msg of type %u, discarding\n",
541                           hdr->operation));
542                 return EINVAL;
543         }
544
545         msg = (struct ctdb_req_message *)hdr;
546
547         ctdbd_msg_call_back(conn, msg);
548
549         return 0;
550 }
551
552 /*
553  * The ctdbd socket is readable asynchronuously
554  */
555
556 static void ctdbd_socket_handler(struct tevent_context *event_ctx,
557                                  struct tevent_fd *event,
558                                  uint16_t flags,
559                                  void *private_data)
560 {
561         struct ctdbd_connection *conn = talloc_get_type_abort(
562                 private_data, struct ctdbd_connection);
563         struct ctdb_req_header *hdr = NULL;
564         int ret;
565
566         ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
567         if (ret != 0) {
568                 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
569                 cluster_fatal("ctdbd died\n");
570         }
571
572         ret = ctdb_handle_message(conn, hdr);
573
574         TALLOC_FREE(hdr);
575
576         if (ret != 0) {
577                 DEBUG(10, ("could not handle incoming message: %s\n",
578                            strerror(ret)));
579         }
580 }
581
582 /*
583  * Prepare a ctdbd connection to receive messages
584  */
585
586 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
587                                 struct messaging_context *msg_ctx)
588 {
589         SMB_ASSERT(conn->msg_ctx == NULL);
590         SMB_ASSERT(conn->fde == NULL);
591
592         if (!(conn->fde = tevent_add_fd(messaging_tevent_context(msg_ctx),
593                                        conn,
594                                        conn->fd,
595                                        TEVENT_FD_READ,
596                                        ctdbd_socket_handler,
597                                        conn))) {
598                 DEBUG(0, ("event_add_fd failed\n"));
599                 return NT_STATUS_NO_MEMORY;
600         }
601
602         conn->msg_ctx = msg_ctx;
603
604         return NT_STATUS_OK;
605 }
606
607 NTSTATUS ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
608                                   uint32_t dst_vnn, uint64_t dst_srvid,
609                                   const struct iovec *iov, int iovlen)
610 {
611         struct ctdb_req_message r;
612         struct iovec iov2[iovlen+1];
613         size_t buflen = iov_buflen(iov, iovlen);
614         ssize_t nwritten;
615
616         r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
617         r.hdr.ctdb_magic = CTDB_MAGIC;
618         r.hdr.ctdb_version = CTDB_PROTOCOL;
619         r.hdr.generation = 1;
620         r.hdr.operation  = CTDB_REQ_MESSAGE;
621         r.hdr.destnode   = dst_vnn;
622         r.hdr.srcnode    = conn->our_vnn;
623         r.hdr.reqid      = 0;
624         r.srvid          = dst_srvid;
625         r.datalen        = buflen;
626
627         DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
628         ctdb_packet_dump(&r.hdr);
629
630         iov2[0].iov_base = &r;
631         iov2[0].iov_len = offsetof(struct ctdb_req_message, data);
632         memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
633
634         nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
635         if (nwritten == -1) {
636                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
637                 cluster_fatal("cluster dispatch daemon msg write error\n");
638         }
639
640         return NT_STATUS_OK;
641 }
642
643 /*
644  * send/recv a generic ctdb control message
645  */
646 static int ctdbd_control_unix(struct ctdbd_connection *conn,
647                               uint32_t vnn, uint32_t opcode,
648                               uint64_t srvid, uint32_t flags,
649                               TDB_DATA data,
650                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
651                               int *cstatus)
652 {
653         struct ctdb_req_control req;
654         struct ctdb_req_header *hdr;
655         struct ctdb_reply_control *reply = NULL;
656         struct iovec iov[2];
657         ssize_t nwritten;
658         int ret;
659
660         ZERO_STRUCT(req);
661         req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
662         req.hdr.ctdb_magic   = CTDB_MAGIC;
663         req.hdr.ctdb_version = CTDB_PROTOCOL;
664         req.hdr.operation    = CTDB_REQ_CONTROL;
665         req.hdr.reqid        = ctdbd_next_reqid(conn);
666         req.hdr.destnode     = vnn;
667         req.opcode           = opcode;
668         req.srvid            = srvid;
669         req.datalen          = data.dsize;
670         req.flags            = flags;
671
672         DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
673         ctdb_packet_dump(&req.hdr);
674
675         iov[0].iov_base = &req;
676         iov[0].iov_len = offsetof(struct ctdb_req_control, data);
677         iov[1].iov_base = data.dptr;
678         iov[1].iov_len = data.dsize;
679
680         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
681         if (nwritten == -1) {
682                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
683                 cluster_fatal("cluster dispatch daemon msg write error\n");
684         }
685
686         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
687                 if (cstatus) {
688                         *cstatus = 0;
689                 }
690                 return 0;
691         }
692
693         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
694         if (ret != 0) {
695                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
696                 return ret;
697         }
698
699         if (hdr->operation != CTDB_REPLY_CONTROL) {
700                 DEBUG(0, ("received invalid reply\n"));
701                 TALLOC_FREE(hdr);
702                 return EIO;
703         }
704         reply = (struct ctdb_reply_control *)hdr;
705
706         if (outdata) {
707                 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
708                               mem_ctx, reply->data, reply->datalen))) {
709                         TALLOC_FREE(reply);
710                         return ENOMEM;
711                 }
712                 outdata->dsize = reply->datalen;
713         }
714         if (cstatus) {
715                 (*cstatus) = reply->status;
716         }
717
718         TALLOC_FREE(reply);
719         return ret;
720 }
721
722 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
723                               uint32_t vnn, uint32_t opcode,
724                               uint64_t srvid, uint32_t flags,
725                               TDB_DATA data,
726                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
727                               int *cstatus)
728 {
729         int ret;
730
731         ret = ctdbd_control_unix(conn, vnn, opcode, srvid, flags, data,
732                                  mem_ctx, outdata, cstatus);
733         if (ret != 0) {
734                 return map_nt_error_from_unix(ret);
735         }
736         return NT_STATUS_OK;
737 }
738
739 /*
740  * see if a remote process exists
741  */
742 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
743 {
744         struct server_id id;
745         bool result;
746
747         id.pid = pid;
748         id.vnn = vnn;
749
750         if (!ctdb_processes_exist(conn, &id, 1, &result)) {
751                 DEBUG(10, ("ctdb_processes_exist failed\n"));
752                 return false;
753         }
754         return result;
755 }
756
757 bool ctdb_processes_exist(struct ctdbd_connection *conn,
758                           const struct server_id *pids, int num_pids,
759                           bool *results)
760 {
761         TALLOC_CTX *frame = talloc_stackframe();
762         int i, num_received;
763         uint32_t *reqids;
764         bool result = false;
765
766         reqids = talloc_array(talloc_tos(), uint32_t, num_pids);
767         if (reqids == NULL) {
768                 goto fail;
769         }
770
771         for (i=0; i<num_pids; i++) {
772                 struct ctdb_req_control req;
773                 pid_t pid;
774                 struct iovec iov[2];
775                 ssize_t nwritten;
776
777                 results[i] = false;
778                 reqids[i] = ctdbd_next_reqid(conn);
779
780                 ZERO_STRUCT(req);
781
782                 /*
783                  * pids[i].pid is uint64_t, scale down to pid_t which
784                  * is the wire protocol towards ctdb.
785                  */
786                 pid = pids[i].pid;
787
788                 DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n",
789                            (int)pids[i].vnn, (int)pid,
790                            (int)reqids[i]));
791
792                 req.hdr.length = offsetof(struct ctdb_req_control, data);
793                 req.hdr.length += sizeof(pid);
794                 req.hdr.ctdb_magic   = CTDB_MAGIC;
795                 req.hdr.ctdb_version = CTDB_PROTOCOL;
796                 req.hdr.operation    = CTDB_REQ_CONTROL;
797                 req.hdr.reqid        = reqids[i];
798                 req.hdr.destnode     = pids[i].vnn;
799                 req.opcode           = CTDB_CONTROL_PROCESS_EXISTS;
800                 req.srvid            = 0;
801                 req.datalen          = sizeof(pid);
802                 req.flags            = 0;
803
804                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
805                 ctdb_packet_dump(&req.hdr);
806
807                 iov[0].iov_base = &req;
808                 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
809                 iov[1].iov_base = &pid;
810                 iov[1].iov_len = sizeof(pid);
811
812                 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
813                 if (nwritten == -1) {
814                         DEBUG(10, ("write_data_iov failed: %s\n",
815                                    strerror(errno)));
816                         goto fail;
817                 }
818         }
819
820         num_received = 0;
821
822         while (num_received < num_pids) {
823                 struct ctdb_req_header *hdr;
824                 struct ctdb_reply_control *reply;
825                 uint32_t reqid;
826                 int ret;
827
828                 ret = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
829                 if (ret != 0) {
830                         DEBUG(10, ("ctdb_read_req failed: %s\n",
831                                    strerror(ret)));
832                         goto fail;
833                 }
834
835                 if (hdr->operation != CTDB_REPLY_CONTROL) {
836                         DEBUG(10, ("Received invalid reply\n"));
837                         goto fail;
838                 }
839                 reply = (struct ctdb_reply_control *)hdr;
840
841                 reqid = reply->hdr.reqid;
842
843                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
844
845                 for (i=0; i<num_pids; i++) {
846                         if (reqid == reqids[i]) {
847                                 break;
848                         }
849                 }
850                 if (i == num_pids) {
851                         DEBUG(10, ("Received unknown record number %u\n",
852                                    (unsigned)reqid));
853                         goto fail;
854                 }
855                 results[i] = ((reply->status) == 0);
856                 TALLOC_FREE(reply);
857                 num_received += 1;
858         }
859
860         result = true;
861 fail:
862         TALLOC_FREE(frame);
863         return result;
864 }
865
866 /*
867  * Get a db path
868  */
869 char *ctdbd_dbpath(struct ctdbd_connection *conn,
870                    TALLOC_CTX *mem_ctx, uint32_t db_id)
871 {
872         int ret;
873         TDB_DATA data;
874         TDB_DATA rdata = {0};
875         int32_t cstatus = 0;
876
877         data.dptr = (uint8_t*)&db_id;
878         data.dsize = sizeof(db_id);
879
880         ret = ctdbd_control_unix(conn, CTDB_CURRENT_NODE,
881                                  CTDB_CONTROL_GETDBPATH, 0, 0, data,
882                                  mem_ctx, &rdata, &cstatus);
883         if ((ret != 0) || cstatus != 0) {
884                 DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s\n",
885                           strerror(ret)));
886                 return NULL;
887         }
888
889         return (char *)rdata.dptr;
890 }
891
892 /*
893  * attach to a ctdb database
894  */
895 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
896                          const char *name, uint32_t *db_id, int tdb_flags)
897 {
898         NTSTATUS status;
899         TDB_DATA data;
900         int32_t cstatus;
901         bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
902
903         data = string_term_tdb_data(name);
904
905         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
906                                persistent
907                                ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
908                                : CTDB_CONTROL_DB_ATTACH,
909                                tdb_flags, 0, data, NULL, &data, &cstatus);
910         if (!NT_STATUS_IS_OK(status)) {
911                 DEBUG(0, (__location__ " ctdb_control for db_attach "
912                           "failed: %s\n", nt_errstr(status)));
913                 return status;
914         }
915
916         if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
917                 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
918                 return NT_STATUS_INTERNAL_ERROR;
919         }
920
921         *db_id = *(uint32_t *)data.dptr;
922         talloc_free(data.dptr);
923
924         if (!(tdb_flags & TDB_SEQNUM)) {
925                 return NT_STATUS_OK;
926         }
927
928         data.dptr = (uint8_t *)db_id;
929         data.dsize = sizeof(*db_id);
930
931         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
932                                CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data,
933                                NULL, NULL, &cstatus);
934         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
935                 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
936                          "failed\n"));
937                 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
938                         status;
939         }
940
941         return NT_STATUS_OK;
942 }
943
944 /*
945  * force the migration of a record to this node
946  */
947 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
948                        TDB_DATA key)
949 {
950         struct ctdb_req_call req;
951         struct ctdb_req_header *hdr;
952         struct iovec iov[2];
953         ssize_t nwritten;
954         NTSTATUS status;
955         int ret;
956
957         ZERO_STRUCT(req);
958
959         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
960         req.hdr.ctdb_magic   = CTDB_MAGIC;
961         req.hdr.ctdb_version = CTDB_PROTOCOL;
962         req.hdr.operation    = CTDB_REQ_CALL;
963         req.hdr.reqid        = ctdbd_next_reqid(conn);
964         req.flags            = CTDB_IMMEDIATE_MIGRATION;
965         req.callid           = CTDB_NULL_FUNC;
966         req.db_id            = db_id;
967         req.keylen           = key.dsize;
968
969         DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
970         ctdb_packet_dump(&req.hdr);
971
972         iov[0].iov_base = &req;
973         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
974         iov[1].iov_base = key.dptr;
975         iov[1].iov_len = key.dsize;
976
977         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
978         if (nwritten == -1) {
979                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
980                 cluster_fatal("cluster dispatch daemon msg write error\n");
981         }
982
983         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
984         if (ret != 0) {
985                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
986                 status = map_nt_error_from_unix(ret);
987                 goto fail;
988         }
989
990         if (hdr->operation != CTDB_REPLY_CALL) {
991                 DEBUG(0, ("received invalid reply\n"));
992                 status = NT_STATUS_INTERNAL_ERROR;
993                 goto fail;
994         }
995
996         status = NT_STATUS_OK;
997  fail:
998
999         TALLOC_FREE(hdr);
1000         return status;
1001 }
1002
1003 /*
1004  * Fetch a record and parse it
1005  */
1006 NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
1007                      TDB_DATA key, bool local_copy,
1008                      void (*parser)(TDB_DATA key, TDB_DATA data,
1009                                     void *private_data),
1010                      void *private_data)
1011 {
1012         struct ctdb_req_call req;
1013         struct ctdb_req_header *hdr = NULL;
1014         struct ctdb_reply_call *reply;
1015         struct iovec iov[2];
1016         ssize_t nwritten;
1017         NTSTATUS status;
1018         uint32_t flags;
1019         int ret;
1020
1021         flags = local_copy ? CTDB_WANT_READONLY : 0;
1022
1023         ZERO_STRUCT(req);
1024
1025         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1026         req.hdr.ctdb_magic   = CTDB_MAGIC;
1027         req.hdr.ctdb_version = CTDB_PROTOCOL;
1028         req.hdr.operation    = CTDB_REQ_CALL;
1029         req.hdr.reqid        = ctdbd_next_reqid(conn);
1030         req.flags            = flags;
1031         req.callid           = CTDB_FETCH_FUNC;
1032         req.db_id            = db_id;
1033         req.keylen           = key.dsize;
1034
1035         iov[0].iov_base = &req;
1036         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
1037         iov[1].iov_base = key.dptr;
1038         iov[1].iov_len = key.dsize;
1039
1040         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1041         if (nwritten == -1) {
1042                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1043                 cluster_fatal("cluster dispatch daemon msg write error\n");
1044         }
1045
1046         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1047         if (ret != 0) {
1048                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
1049                 status = map_nt_error_from_unix(ret);
1050                 goto fail;
1051         }
1052
1053         if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
1054                 DEBUG(0, ("received invalid reply\n"));
1055                 status = NT_STATUS_INTERNAL_ERROR;
1056                 goto fail;
1057         }
1058         reply = (struct ctdb_reply_call *)hdr;
1059
1060         if (reply->datalen == 0) {
1061                 /*
1062                  * Treat an empty record as non-existing
1063                  */
1064                 status = NT_STATUS_NOT_FOUND;
1065                 goto fail;
1066         }
1067
1068         parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1069                private_data);
1070
1071         status = NT_STATUS_OK;
1072  fail:
1073         TALLOC_FREE(hdr);
1074         return status;
1075 }
1076
1077 /*
1078   Traverse a ctdb database. This uses a kind-of hackish way to open a second
1079   connection to ctdbd to avoid the hairy recursive and async problems with
1080   everything in-line.
1081 */
1082
1083 NTSTATUS ctdbd_traverse(struct ctdbd_connection *master, uint32_t db_id,
1084                         void (*fn)(TDB_DATA key, TDB_DATA data,
1085                                    void *private_data),
1086                         void *private_data)
1087 {
1088         struct ctdbd_connection *conn;
1089         NTSTATUS status;
1090
1091         TDB_DATA key, data;
1092         struct ctdb_traverse_start t;
1093         int cstatus;
1094
1095         become_root();
1096         status = ctdbd_init_connection(NULL, master->sockname, master->timeout,
1097                                        &conn);
1098         unbecome_root();
1099         if (!NT_STATUS_IS_OK(status)) {
1100                 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1101                           nt_errstr(status)));
1102                 return status;
1103         }
1104
1105         t.db_id = db_id;
1106         t.srvid = conn->rand_srvid;
1107         t.reqid = ctdbd_next_reqid(conn);
1108
1109         data.dptr = (uint8_t *)&t;
1110         data.dsize = sizeof(t);
1111
1112         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1113                                CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1114                                data, NULL, NULL, &cstatus);
1115
1116         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1117
1118                 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1119                          cstatus));
1120
1121                 if (NT_STATUS_IS_OK(status)) {
1122                         /*
1123                          * We need a mapping here
1124                          */
1125                         status = NT_STATUS_UNSUCCESSFUL;
1126                 }
1127                 TALLOC_FREE(conn);
1128                 return status;
1129         }
1130
1131         while (True) {
1132                 struct ctdb_req_header *hdr = NULL;
1133                 struct ctdb_req_message *m;
1134                 struct ctdb_rec_data *d;
1135                 int ret;
1136
1137                 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1138                 if (ret != 0) {
1139                         DEBUG(0, ("ctdb_read_packet failed: %s\n",
1140                                   strerror(ret)));
1141                         cluster_fatal("ctdbd died\n");
1142                 }
1143
1144                 if (hdr->operation != CTDB_REQ_MESSAGE) {
1145                         DEBUG(0, ("Got operation %u, expected a message\n",
1146                                   (unsigned)hdr->operation));
1147                         TALLOC_FREE(conn);
1148                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1149                 }
1150
1151                 m = (struct ctdb_req_message *)hdr;
1152                 d = (struct ctdb_rec_data *)&m->data[0];
1153                 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1154                         DEBUG(0, ("Got invalid traverse data of length %d\n",
1155                                   (int)m->datalen));
1156                         TALLOC_FREE(conn);
1157                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1158                 }
1159
1160                 key.dsize = d->keylen;
1161                 key.dptr  = &d->data[0];
1162                 data.dsize = d->datalen;
1163                 data.dptr = &d->data[d->keylen];
1164
1165                 if (key.dsize == 0 && data.dsize == 0) {
1166                         /* end of traverse */
1167                         TALLOC_FREE(conn);
1168                         return NT_STATUS_OK;
1169                 }
1170
1171                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1172                         DEBUG(0, ("Got invalid ltdb header length %d\n",
1173                                   (int)data.dsize));
1174                         TALLOC_FREE(conn);
1175                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1176                 }
1177                 data.dsize -= sizeof(struct ctdb_ltdb_header);
1178                 data.dptr += sizeof(struct ctdb_ltdb_header);
1179
1180                 if (fn != NULL) {
1181                         fn(key, data, private_data);
1182                 }
1183         }
1184         return NT_STATUS_OK;
1185 }
1186
1187 /*
1188    This is used to canonicalize a ctdb_sock_addr structure.
1189 */
1190 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
1191                                       struct sockaddr_storage *out)
1192 {
1193         memcpy(out, in, sizeof (*out));
1194
1195 #ifdef HAVE_IPV6
1196         if (in->ss_family == AF_INET6) {
1197                 const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1198                 const struct sockaddr_in6 *in6 =
1199                         (const struct sockaddr_in6 *)in;
1200                 struct sockaddr_in *out4 = (struct sockaddr_in *)out;
1201                 if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
1202                         memset(out, 0, sizeof(*out));
1203 #ifdef HAVE_SOCK_SIN_LEN
1204                         out4->sin_len = sizeof(*out);
1205 #endif
1206                         out4->sin_family = AF_INET;
1207                         out4->sin_port   = in6->sin6_port;
1208                         memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
1209                 }
1210         }
1211 #endif
1212 }
1213
1214 /*
1215  * Register us as a server for a particular tcp connection
1216  */
1217
1218 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1219                             const struct sockaddr_storage *_server,
1220                             const struct sockaddr_storage *_client,
1221                             int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
1222                                       uint64_t dst_srvid,
1223                                       const uint8_t *msg, size_t msglen,
1224                                       void *private_data),
1225                             void *private_data)
1226 {
1227         struct ctdb_control_tcp_addr p;
1228         TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1229         NTSTATUS status;
1230         struct sockaddr_storage client;
1231         struct sockaddr_storage server;
1232
1233         /*
1234          * Only one connection so far
1235          */
1236
1237         smbd_ctdb_canonicalize_ip(_client, &client);
1238         smbd_ctdb_canonicalize_ip(_server, &server);
1239
1240         switch (client.ss_family) {
1241         case AF_INET:
1242                 memcpy(&p.dest.ip, &server, sizeof(p.dest.ip));
1243                 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1244                 break;
1245         case AF_INET6:
1246                 memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6));
1247                 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1248                 break;
1249         default:
1250                 return NT_STATUS_INTERNAL_ERROR;
1251         }
1252
1253         /*
1254          * We want to be told about IP releases
1255          */
1256
1257         status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1258                                      cb, private_data);
1259         if (!NT_STATUS_IS_OK(status)) {
1260                 return status;
1261         }
1262
1263         /*
1264          * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1265          * can send an extra ack to trigger a reset for our client, so it
1266          * immediately reconnects
1267          */
1268         return ctdbd_control(conn, CTDB_CURRENT_NODE,
1269                              CTDB_CONTROL_TCP_CLIENT, 0,
1270                              CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1271 }
1272
1273 /*
1274   call a control on the local node
1275  */
1276 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1277                              uint64_t srvid, uint32_t flags, TDB_DATA data,
1278                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1279                              int *cstatus)
1280 {
1281         return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1282 }
1283
1284 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1285 {
1286         struct ctdb_client_notify_register reg_data;
1287         size_t struct_len;
1288         NTSTATUS status;
1289         int cstatus;
1290
1291         reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1292         reg_data.len = 1;
1293         reg_data.notify_data[0] = 0;
1294
1295         struct_len = offsetof(struct ctdb_client_notify_register,
1296                               notify_data) + reg_data.len;
1297
1298         status = ctdbd_control_local(
1299                 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1300                 make_tdb_data((uint8_t *)&reg_data, struct_len),
1301                 NULL, NULL, &cstatus);
1302         if (!NT_STATUS_IS_OK(status)) {
1303                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1304                           nt_errstr(status)));
1305         }
1306         return status;
1307 }
1308
1309 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1310 {
1311         struct ctdb_client_notify_deregister dereg_data;
1312         NTSTATUS status;
1313         int cstatus;
1314
1315         dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1316
1317         status = ctdbd_control_local(
1318                 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1319                 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1320                 NULL, NULL, &cstatus);
1321         if (!NT_STATUS_IS_OK(status)) {
1322                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1323                           nt_errstr(status)));
1324         }
1325         return status;
1326 }
1327
1328 NTSTATUS ctdbd_probe(const char *sockname, int timeout)
1329 {
1330         /*
1331          * Do a very early check if ctdbd is around to avoid an abort and core
1332          * later
1333          */
1334         struct ctdbd_connection *conn = NULL;
1335         NTSTATUS status;
1336
1337         status = ctdbd_messaging_connection(talloc_tos(), sockname, timeout,
1338                                             &conn);
1339
1340         /*
1341          * We only care if we can connect.
1342          */
1343         TALLOC_FREE(conn);
1344
1345         return status;
1346 }