2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) 2007 by Volker Lendecke
5 Copyright (C) 2007 by Andrew Tridgell
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #ifdef CLUSTER_SUPPORT
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/ndr_messaging.h"
27 #include "ctdbd_conn.h"
31 /* paths to these include files come from --with-ctdb= in configure */
33 #include "ctdb_private.h"
35 struct ctdbd_connection {
36 struct messaging_context *msg_ctx;
40 struct packet_context *pkt;
43 void (*release_ip_handler)(const char *ip_addr, void *private_data);
44 void *release_ip_priv;
47 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
48 uint32_t vnn, uint32 opcode,
49 uint64_t srvid, uint32_t flags, TDB_DATA data,
50 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
54 * exit on fatal communications errors with the ctdbd daemon
56 static void cluster_fatal(const char *why)
58 DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
59 /* we don't use smb_panic() as we don't want to delay to write
60 a core file. We need to release this process id immediately
61 so that someone else can take over without getting sharing
69 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
71 if (DEBUGLEVEL < 10) {
74 DEBUGADD(10, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
75 (int)hdr->length, (int)hdr->ctdb_magic,
76 (int)hdr->ctdb_version, (int)hdr->generation,
77 (int)hdr->operation, (int)hdr->reqid));
81 * Register a srvid with ctdbd
83 static NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn,
88 return ctdbd_control(conn, CTDB_CURRENT_NODE,
89 CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
90 tdb_null, NULL, NULL, &cstatus);
94 * get our vnn from the cluster
96 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn)
100 status = ctdbd_control(conn,
101 CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
102 tdb_null, NULL, NULL, &cstatus);
103 if (!NT_STATUS_IS_OK(status)) {
104 cluster_fatal("ctdbd_control failed\n");
106 *vnn = (uint32_t)cstatus;
111 * Are we active (i.e. not banned or stopped?)
113 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
118 struct ctdb_node_map *m;
119 uint32_t failure_flags;
123 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
124 CTDB_CONTROL_GET_NODEMAP, 0, 0,
125 tdb_null, talloc_tos(), &outdata, &cstatus);
126 if (!NT_STATUS_IS_OK(status)) {
127 cluster_fatal("ctdbd_control failed\n");
129 if ((cstatus != 0) || (outdata.dptr == NULL)) {
130 DEBUG(2, ("Received invalid ctdb data\n"));
134 m = (struct ctdb_node_map *)outdata.dptr;
136 for (i=0; i<m->num; i++) {
137 if (vnn == m->nodes[i].pnn) {
143 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
148 failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
149 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
151 if ((m->nodes[i].flags & failure_flags) != 0) {
152 DEBUG(2, ("Node has status %x, not active\n",
153 (int)m->nodes[i].flags));
159 TALLOC_FREE(outdata.dptr);
163 uint32 ctdbd_vnn(const struct ctdbd_connection *conn)
165 return conn->our_vnn;
169 * Get us a ctdb connection
172 static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
173 struct packet_context **presult)
175 struct packet_context *result;
176 const char *sockname = lp_ctdbd_socket();
177 struct sockaddr_un addr;
180 if (!sockname || !*sockname) {
181 sockname = CTDB_PATH;
184 fd = socket(AF_UNIX, SOCK_STREAM, 0);
186 DEBUG(3, ("Could not create socket: %s\n", strerror(errno)));
187 return map_nt_error_from_unix(errno);
191 addr.sun_family = AF_UNIX;
192 strncpy(addr.sun_path, sockname, sizeof(addr.sun_path));
194 if (sys_connect(fd, (struct sockaddr *)(void *)&addr) == -1) {
195 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
198 return map_nt_error_from_unix(errno);
201 if (!(result = packet_init(mem_ctx, fd))) {
203 return NT_STATUS_NO_MEMORY;
211 * Do we have a complete ctdb packet in the queue?
214 static bool ctdb_req_complete(const uint8_t *buf, size_t available,
220 if (available < sizeof(msglen)) {
224 msglen = *((uint32 *)buf);
226 DEBUG(10, ("msglen = %d\n", msglen));
228 if (msglen < sizeof(struct ctdb_req_header)) {
229 DEBUG(0, ("Got invalid msglen: %d, expected at least %d for "
230 "the req_header\n", (int)msglen,
231 (int)sizeof(struct ctdb_req_header)));
232 cluster_fatal("ctdbd protocol error\n");
235 if (available < msglen) {
244 * State necessary to defer an incoming message while we are waiting for a
248 struct deferred_msg_state {
249 struct messaging_context *msg_ctx;
250 struct messaging_rec *rec;
254 * Timed event handler for the deferred message
257 static void deferred_message_dispatch(struct event_context *event_ctx,
258 struct timed_event *te,
262 struct deferred_msg_state *state = talloc_get_type_abort(
263 private_data, struct deferred_msg_state);
265 messaging_dispatch_rec(state->msg_ctx, state->rec);
270 struct req_pull_state {
276 * Pull a ctdb request out of the incoming packet queue
279 static NTSTATUS ctdb_req_pull(uint8_t *buf, size_t length,
282 struct req_pull_state *state = (struct req_pull_state *)private_data;
284 state->req.data = talloc_move(state->mem_ctx, &buf);
285 state->req.length = length;
290 * Fetch a messaging_rec from an incoming ctdb style message
293 static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
294 size_t overall_length,
295 struct ctdb_req_message *msg)
297 struct messaging_rec *result;
299 enum ndr_err_code ndr_err;
301 if ((overall_length < offsetof(struct ctdb_req_message, data))
303 < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
305 cluster_fatal("got invalid msg length");
308 if (!(result = TALLOC_P(mem_ctx, struct messaging_rec))) {
309 DEBUG(0, ("talloc failed\n"));
313 blob = data_blob_const(msg->data, msg->datalen);
315 ndr_err = ndr_pull_struct_blob(
316 &blob, result, result,
317 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
319 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
320 DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
321 ndr_errstr(ndr_err)));
326 if (DEBUGLEVEL >= 10) {
327 DEBUG(10, ("ctdb_pull_messaging_rec:\n"));
328 NDR_PRINT_DEBUG(messaging_rec, result);
334 static NTSTATUS ctdb_packet_fd_read_sync(struct packet_context *ctx)
336 int timeout = lp_ctdb_timeout();
341 return packet_fd_read_sync(ctx, timeout);
345 * Read a full ctdbd request. If we have a messaging context, defer incoming
346 * messages that might come in between.
349 static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid,
350 TALLOC_CTX *mem_ctx, void *result)
352 struct ctdb_req_header *hdr;
353 struct req_pull_state state;
358 status = ctdb_packet_fd_read_sync(conn->pkt);
360 if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) {
363 } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
368 if (!NT_STATUS_IS_OK(status)) {
369 DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
370 cluster_fatal("ctdbd died\n");
376 state.mem_ctx = mem_ctx;
378 if (!packet_handler(conn->pkt, ctdb_req_complete, ctdb_req_pull,
383 DEBUG(10, ("not enough data from ctdb socket, retrying\n"));
387 if (!NT_STATUS_IS_OK(status)) {
388 DEBUG(0, ("Could not read packet: %s\n", nt_errstr(status)));
389 cluster_fatal("ctdbd died\n");
392 hdr = (struct ctdb_req_header *)state.req.data;
394 DEBUG(10, ("Received ctdb packet\n"));
395 ctdb_packet_dump(hdr);
397 if (hdr->operation == CTDB_REQ_MESSAGE) {
398 struct timed_event *evt;
399 struct deferred_msg_state *msg_state;
400 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
402 if (conn->msg_ctx == NULL) {
403 DEBUG(1, ("Got a message without having a msg ctx, "
404 "dropping msg %llu\n",
405 (long long unsigned)msg->srvid));
409 if ((conn->release_ip_handler != NULL)
410 && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
411 /* must be dispatched immediately */
412 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
413 conn->release_ip_handler((const char *)msg->data,
414 conn->release_ip_priv);
419 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
420 || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)) {
422 DEBUG(1, ("ctdb_read_req: Got %s message\n",
423 (msg->srvid == CTDB_SRVID_RECONFIGURE)
424 ? "cluster reconfigure" : "SAMBA_NOTIFY"));
426 messaging_send(conn->msg_ctx,
427 messaging_server_id(conn->msg_ctx),
428 MSG_SMB_BRL_VALIDATE, &data_blob_null);
429 messaging_send(conn->msg_ctx,
430 messaging_server_id(conn->msg_ctx),
431 MSG_DBWRAP_G_LOCK_RETRY,
437 msg_state = TALLOC_P(NULL, struct deferred_msg_state);
438 if (msg_state == NULL) {
439 DEBUG(0, ("talloc failed\n"));
444 if (!(msg_state->rec = ctdb_pull_messaging_rec(
445 msg_state, state.req.length, msg))) {
446 DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
447 TALLOC_FREE(msg_state);
454 msg_state->msg_ctx = conn->msg_ctx;
457 * We're waiting for a call reply, but an async message has
458 * crossed. Defer dispatching to the toplevel event loop.
460 evt = event_add_timed(conn->msg_ctx->event_ctx,
461 conn->msg_ctx->event_ctx,
463 deferred_message_dispatch,
466 DEBUG(0, ("event_add_timed failed\n"));
467 TALLOC_FREE(msg_state);
475 if (hdr->reqid != reqid) {
476 /* we got the wrong reply */
477 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
478 "been %u\n", hdr->reqid, reqid));
483 *((void **)result) = talloc_move(mem_ctx, &hdr);
489 * Get us a ctdbd connection
492 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
493 struct ctdbd_connection **pconn)
495 struct ctdbd_connection *conn;
498 if (!(conn = TALLOC_ZERO_P(mem_ctx, struct ctdbd_connection))) {
499 DEBUG(0, ("talloc failed\n"));
500 return NT_STATUS_NO_MEMORY;
503 status = ctdbd_connect(conn, &conn->pkt);
505 if (!NT_STATUS_IS_OK(status)) {
506 DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status)));
510 status = get_cluster_vnn(conn, &conn->our_vnn);
512 if (!NT_STATUS_IS_OK(status)) {
513 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
517 if (!ctdbd_working(conn, conn->our_vnn)) {
518 DEBUG(2, ("Node is not working, can not connect\n"));
519 status = NT_STATUS_INTERNAL_DB_ERROR;
523 generate_random_buffer((unsigned char *)&conn->rand_srvid,
524 sizeof(conn->rand_srvid));
526 status = register_with_ctdbd(conn, conn->rand_srvid);
528 if (!NT_STATUS_IS_OK(status)) {
529 DEBUG(5, ("Could not register random srvid: %s\n",
543 * Get us a ctdbd connection and register us as a process
546 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
547 struct ctdbd_connection **pconn)
549 struct ctdbd_connection *conn;
552 status = ctdbd_init_connection(mem_ctx, &conn);
554 if (!NT_STATUS_IS_OK(status)) {
558 status = register_with_ctdbd(conn, (uint64_t)sys_getpid());
559 if (!NT_STATUS_IS_OK(status)) {
563 status = register_with_ctdbd(conn, MSG_SRVID_SAMBA);
564 if (!NT_STATUS_IS_OK(status)) {
568 status = register_with_ctdbd(conn, CTDB_SRVID_SAMBA_NOTIFY);
569 if (!NT_STATUS_IS_OK(status)) {
581 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
583 return conn->msg_ctx;
586 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
588 return packet_get_fd(conn->pkt);
592 * Packet handler to receive and handle a ctdb message
594 static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
597 struct ctdbd_connection *conn = talloc_get_type_abort(
598 private_data, struct ctdbd_connection);
599 struct ctdb_req_message *msg;
600 struct messaging_rec *msg_rec;
602 msg = (struct ctdb_req_message *)buf;
604 if (msg->hdr.operation != CTDB_REQ_MESSAGE) {
605 DEBUG(0, ("Received async msg of type %u, discarding\n",
606 msg->hdr.operation));
608 return NT_STATUS_INVALID_PARAMETER;
611 if ((conn->release_ip_handler != NULL)
612 && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
613 /* must be dispatched immediately */
614 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
615 conn->release_ip_handler((const char *)msg->data,
616 conn->release_ip_priv);
621 SMB_ASSERT(conn->msg_ctx != NULL);
623 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
624 || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)){
625 DEBUG(0,("Got cluster reconfigure message\n"));
627 * when the cluster is reconfigured or someone of the
628 * family has passed away (SAMBA_NOTIFY), we need to
629 * clean the brl database
631 messaging_send(conn->msg_ctx,
632 messaging_server_id(conn->msg_ctx),
633 MSG_SMB_BRL_VALIDATE, &data_blob_null);
635 messaging_send(conn->msg_ctx,
636 messaging_server_id(conn->msg_ctx),
637 MSG_DBWRAP_G_LOCK_RETRY,
644 /* only messages to our pid or the broadcast are valid here */
645 if (msg->srvid != sys_getpid() && msg->srvid != MSG_SRVID_SAMBA) {
646 DEBUG(0,("Got unexpected message with srvid=%llu\n",
647 (unsigned long long)msg->srvid));
652 if (!(msg_rec = ctdb_pull_messaging_rec(NULL, length, msg))) {
653 DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
655 return NT_STATUS_NO_MEMORY;
658 messaging_dispatch_rec(conn->msg_ctx, msg_rec);
660 TALLOC_FREE(msg_rec);
666 * The ctdbd socket is readable asynchronously
669 static void ctdbd_socket_handler(struct event_context *event_ctx,
670 struct fd_event *event,
674 struct ctdbd_connection *conn = talloc_get_type_abort(
675 private_data, struct ctdbd_connection);
679 status = packet_fd_read(conn->pkt);
681 if (!NT_STATUS_IS_OK(status)) {
682 DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
683 cluster_fatal("ctdbd died\n");
686 while (packet_handler(conn->pkt, ctdb_req_complete,
687 ctdb_handle_message, conn, &status)) {
688 if (!NT_STATUS_IS_OK(status)) {
689 DEBUG(10, ("could not handle incoming message: %s\n",
696 * Prepare a ctdbd connection to receive messages
699 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
700 struct messaging_context *msg_ctx)
702 SMB_ASSERT(conn->msg_ctx == NULL);
703 SMB_ASSERT(conn->fde == NULL);
705 if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn,
706 packet_get_fd(conn->pkt),
708 ctdbd_socket_handler,
710 DEBUG(0, ("event_add_fd failed\n"));
711 return NT_STATUS_NO_MEMORY;
714 conn->msg_ctx = msg_ctx;
720 * Send a messaging message across a ctdbd
723 NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
724 uint32 dst_vnn, uint64 dst_srvid,
725 struct messaging_rec *msg)
727 struct ctdb_req_message r;
731 enum ndr_err_code ndr_err;
733 if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) {
734 DEBUG(0, ("talloc failed\n"));
735 return NT_STATUS_NO_MEMORY;
738 ndr_err = ndr_push_struct_blob(
740 (ndr_push_flags_fn_t)ndr_push_messaging_rec);
742 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
743 DEBUG(0, ("ndr_push_struct_blob failed: %s\n",
744 ndr_errstr(ndr_err)));
745 status = ndr_map_error2ntstatus(ndr_err);
749 r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length;
750 r.hdr.ctdb_magic = CTDB_MAGIC;
751 r.hdr.ctdb_version = CTDB_VERSION;
752 r.hdr.generation = 1;
753 r.hdr.operation = CTDB_REQ_MESSAGE;
754 r.hdr.destnode = dst_vnn;
755 r.hdr.srcnode = conn->our_vnn;
758 r.datalen = blob.length;
760 DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
761 ctdb_packet_dump(&r.hdr);
763 status = packet_send(
765 data_blob_const(&r, offsetof(struct ctdb_req_message, data)),
768 if (!NT_STATUS_IS_OK(status)) {
769 DEBUG(0, ("packet_send failed: %s\n", nt_errstr(status)));
773 status = packet_flush(conn->pkt);
775 if (!NT_STATUS_IS_OK(status)) {
776 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
777 cluster_fatal("cluster dispatch daemon msg write error\n");
780 status = NT_STATUS_OK;
782 TALLOC_FREE(mem_ctx);
787 * send/recv a generic ctdb control message
789 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
790 uint32_t vnn, uint32 opcode,
791 uint64_t srvid, uint32_t flags,
793 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
796 struct ctdb_req_control req;
797 struct ctdb_reply_control *reply = NULL;
798 struct ctdbd_connection *new_conn = NULL;
801 /* the samba3 ctdb code can't handle NOREPLY yet */
802 flags &= ~CTDB_CTRL_FLAG_NOREPLY;
805 status = ctdbd_init_connection(NULL, &new_conn);
807 if (!NT_STATUS_IS_OK(status)) {
808 DEBUG(10, ("Could not init temp connection: %s\n",
817 req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
818 req.hdr.ctdb_magic = CTDB_MAGIC;
819 req.hdr.ctdb_version = CTDB_VERSION;
820 req.hdr.operation = CTDB_REQ_CONTROL;
821 req.hdr.reqid = ++conn->reqid;
822 req.hdr.destnode = vnn;
825 req.datalen = data.dsize;
827 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
828 ctdb_packet_dump(&req.hdr);
830 status = packet_send(
832 data_blob_const(&req, offsetof(struct ctdb_req_control, data)),
833 data_blob_const(data.dptr, data.dsize));
835 if (!NT_STATUS_IS_OK(status)) {
836 DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status)));
840 status = packet_flush(conn->pkt);
842 if (!NT_STATUS_IS_OK(status)) {
843 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
844 cluster_fatal("cluster dispatch daemon control write error\n");
847 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
848 TALLOC_FREE(new_conn);
852 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
854 if (!NT_STATUS_IS_OK(status)) {
855 DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
859 if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
860 DEBUG(0, ("received invalid reply\n"));
865 if (!(outdata->dptr = (uint8 *)talloc_memdup(
866 mem_ctx, reply->data, reply->datalen))) {
868 return NT_STATUS_NO_MEMORY;
870 outdata->dsize = reply->datalen;
873 (*cstatus) = reply->status;
876 status = NT_STATUS_OK;
879 TALLOC_FREE(new_conn);
885 * see if a remote process exists
887 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid)
893 data.dptr = (uint8_t*)&pid;
894 data.dsize = sizeof(pid);
896 status = ctdbd_control(conn, vnn, CTDB_CONTROL_PROCESS_EXISTS, 0, 0,
897 data, NULL, NULL, &cstatus);
898 if (!NT_STATUS_IS_OK(status)) {
899 DEBUG(0, (__location__ " ctdb_control for process_exists "
910 char *ctdbd_dbpath(struct ctdbd_connection *conn,
911 TALLOC_CTX *mem_ctx, uint32_t db_id)
917 data.dptr = (uint8_t*)&db_id;
918 data.dsize = sizeof(db_id);
920 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
921 CTDB_CONTROL_GETDBPATH, 0, 0, data,
922 mem_ctx, &data, &cstatus);
923 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
924 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
928 return (char *)data.dptr;
932 * attach to a ctdb database
934 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
935 const char *name, uint32_t *db_id, int tdb_flags)
940 bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
942 data.dptr = (uint8_t*)name;
943 data.dsize = strlen(name)+1;
945 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
947 ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
948 : CTDB_CONTROL_DB_ATTACH,
949 tdb_flags, 0, data, NULL, &data, &cstatus);
950 if (!NT_STATUS_IS_OK(status)) {
951 DEBUG(0, (__location__ " ctdb_control for db_attach "
952 "failed: %s\n", nt_errstr(status)));
956 if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
957 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
958 return NT_STATUS_INTERNAL_ERROR;
961 *db_id = *(uint32_t *)data.dptr;
962 talloc_free(data.dptr);
964 if (!(tdb_flags & TDB_SEQNUM)) {
968 data.dptr = (uint8_t *)db_id;
969 data.dsize = sizeof(*db_id);
971 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
972 CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data,
973 NULL, NULL, &cstatus);
974 if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
975 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
977 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
985 * force the migration of a record to this node
987 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id,
990 struct ctdb_req_call req;
991 struct ctdb_reply_call *reply;
996 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
997 req.hdr.ctdb_magic = CTDB_MAGIC;
998 req.hdr.ctdb_version = CTDB_VERSION;
999 req.hdr.operation = CTDB_REQ_CALL;
1000 req.hdr.reqid = ++conn->reqid;
1001 req.flags = CTDB_IMMEDIATE_MIGRATION;
1002 req.callid = CTDB_NULL_FUNC;
1004 req.keylen = key.dsize;
1006 DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
1007 ctdb_packet_dump(&req.hdr);
1009 status = packet_send(
1011 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1012 data_blob_const(key.dptr, key.dsize));
1014 if (!NT_STATUS_IS_OK(status)) {
1015 DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status)));
1019 status = packet_flush(conn->pkt);
1021 if (!NT_STATUS_IS_OK(status)) {
1022 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1023 cluster_fatal("cluster dispatch daemon control write error\n");
1026 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1028 if (!NT_STATUS_IS_OK(status)) {
1029 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1033 if (reply->hdr.operation != CTDB_REPLY_CALL) {
1034 DEBUG(0, ("received invalid reply\n"));
1035 status = NT_STATUS_INTERNAL_ERROR;
1039 status = NT_STATUS_OK;
1047 * remotely fetch a record without locking it or forcing a migration
1049 NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id,
1050 TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data)
1052 struct ctdb_req_call req;
1053 struct ctdb_reply_call *reply;
1058 req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1059 req.hdr.ctdb_magic = CTDB_MAGIC;
1060 req.hdr.ctdb_version = CTDB_VERSION;
1061 req.hdr.operation = CTDB_REQ_CALL;
1062 req.hdr.reqid = ++conn->reqid;
1064 req.callid = CTDB_FETCH_FUNC;
1066 req.keylen = key.dsize;
1068 status = packet_send(
1070 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1071 data_blob_const(key.dptr, key.dsize));
1073 if (!NT_STATUS_IS_OK(status)) {
1074 DEBUG(3, ("packet_send failed: %s\n", nt_errstr(status)));
1078 status = packet_flush(conn->pkt);
1080 if (!NT_STATUS_IS_OK(status)) {
1081 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1082 cluster_fatal("cluster dispatch daemon control write error\n");
1085 status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1087 if (!NT_STATUS_IS_OK(status)) {
1088 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1092 if (reply->hdr.operation != CTDB_REPLY_CALL) {
1093 DEBUG(0, ("received invalid reply\n"));
1094 status = NT_STATUS_INTERNAL_ERROR;
1098 data->dsize = reply->datalen;
1099 if (data->dsize == 0) {
1104 data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0],
1106 if (data->dptr == NULL) {
1107 DEBUG(0, ("talloc failed\n"));
1108 status = NT_STATUS_NO_MEMORY;
1113 status = NT_STATUS_OK;
1119 struct ctdbd_traverse_state {
1120 void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data);
1125 * Handle a traverse record coming in on the ctdbd connection
1128 static NTSTATUS ctdb_traverse_handler(uint8_t *buf, size_t length,
1131 struct ctdbd_traverse_state *state =
1132 (struct ctdbd_traverse_state *)private_data;
1134 struct ctdb_req_message *m;
1135 struct ctdb_rec_data *d;
1138 m = (struct ctdb_req_message *)buf;
1140 if (length < sizeof(*m) || m->hdr.length != length) {
1141 DEBUG(0, ("Got invalid message of length %d\n", (int)length));
1143 return NT_STATUS_UNEXPECTED_IO_ERROR;
1146 d = (struct ctdb_rec_data *)&m->data[0];
1147 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1148 DEBUG(0, ("Got invalid traverse data of length %d\n",
1151 return NT_STATUS_UNEXPECTED_IO_ERROR;
1154 key.dsize = d->keylen;
1155 key.dptr = &d->data[0];
1156 data.dsize = d->datalen;
1157 data.dptr = &d->data[d->keylen];
1159 if (key.dsize == 0 && data.dsize == 0) {
1160 /* end of traverse */
1161 return NT_STATUS_END_OF_FILE;
1164 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1165 DEBUG(0, ("Got invalid ltdb header length %d\n",
1168 return NT_STATUS_UNEXPECTED_IO_ERROR;
1170 data.dsize -= sizeof(struct ctdb_ltdb_header);
1171 data.dptr += sizeof(struct ctdb_ltdb_header);
1174 state->fn(key, data, state->private_data);
1178 return NT_STATUS_OK;
1182 Traverse a ctdb database. This uses a kind-of hackish way to open a second
1183 connection to ctdbd to avoid the hairy recursive and async problems with
1187 NTSTATUS ctdbd_traverse(uint32 db_id,
1188 void (*fn)(TDB_DATA key, TDB_DATA data,
1189 void *private_data),
1192 struct ctdbd_connection *conn;
1196 struct ctdb_traverse_start t;
1198 struct ctdbd_traverse_state state;
1200 status = ctdbd_init_connection(NULL, &conn);
1201 if (!NT_STATUS_IS_OK(status)) {
1202 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1203 nt_errstr(status)));
1208 t.srvid = conn->rand_srvid;
1209 t.reqid = ++conn->reqid;
1211 data.dptr = (uint8_t *)&t;
1212 data.dsize = sizeof(t);
1214 status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1215 CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1216 data, NULL, NULL, &cstatus);
1218 if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1220 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1223 if (NT_STATUS_IS_OK(status)) {
1225 * We need a mapping here
1227 status = NT_STATUS_UNSUCCESSFUL;
1233 state.private_data = private_data;
1237 status = NT_STATUS_OK;
1239 if (packet_handler(conn->pkt, ctdb_req_complete,
1240 ctdb_traverse_handler, &state, &status)) {
1242 if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1243 status = NT_STATUS_OK;
1248 * There might be more in the queue
1253 if (!NT_STATUS_IS_OK(status)) {
1257 status = ctdb_packet_fd_read_sync(conn->pkt);
1259 if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
1261 * There might be more in the queue
1266 if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1267 status = NT_STATUS_OK;
1271 if (!NT_STATUS_IS_OK(status)) {
1272 DEBUG(0, ("packet_fd_read_sync failed: %s\n", nt_errstr(status)));
1273 cluster_fatal("ctdbd died\n");
/*
 * This is used to canonicalize a ctdb_sock_addr structure.
 *
 * *out receives a copy of *in, except that an IPv4-mapped IPv6 address
 * (::ffff:a.b.c.d) is rewritten as the plain AF_INET address a.b.c.d
 * with the same port. in and out must not refer to the same object.
 */
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		static const unsigned char prefix[12] = {
			0,0,0,0,0,0,0,0,0,0,0xff,0xff
		};
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;

		if (memcmp(&in6->sin6_addr, prefix, sizeof(prefix)) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			/* length of the IPv4 address, not of the storage */
			out4->sin_len = sizeof(*out4);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port = in6->sin6_port;
			/* the IPv4 address is the last 4 bytes of the 16 */
			memcpy(&out4->sin_addr,
			       &in6->sin6_addr.s6_addr[sizeof(prefix)], 4);
		}
	}
#endif
}
1309 * Register us as a server for a particular tcp connection
1312 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1313 const struct sockaddr_storage *_server,
1314 const struct sockaddr_storage *_client,
1315 void (*release_ip_handler)(const char *ip_addr,
1316 void *private_data),
1320 * we still use ctdb_control_tcp for ipv4
1321 * because we want to work against older ctdb
1322 * versions at runtime
1324 struct ctdb_control_tcp p4;
1325 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1326 struct ctdb_control_tcp_addr p;
1330 struct sockaddr_storage client;
1331 struct sockaddr_storage server;
1334 * Only one connection so far
1336 SMB_ASSERT(conn->release_ip_handler == NULL);
1338 smbd_ctdb_canonicalize_ip(_client, &client);
1339 smbd_ctdb_canonicalize_ip(_server, &server);
1341 switch (client.ss_family) {
1343 p4.dest = *(struct sockaddr_in *)(void *)&server;
1344 p4.src = *(struct sockaddr_in *)(void *)&client;
1345 data.dptr = (uint8_t *)&p4;
1346 data.dsize = sizeof(p4);
1348 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1350 p.dest.ip6 = *(struct sockaddr_in6 *)(void *)&server;
1351 p.src.ip6 = *(struct sockaddr_in6 *)(void *)&client;
1352 data.dptr = (uint8_t *)&p;
1353 data.dsize = sizeof(p);
1357 return NT_STATUS_INTERNAL_ERROR;
1360 conn->release_ip_handler = release_ip_handler;
1363 * We want to be told about IP releases
1366 status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP);
1367 if (!NT_STATUS_IS_OK(status)) {
1372 * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1373 * can send an extra ack to trigger a reset for our client, so it
1374 * immediately reconnects
1376 return ctdbd_control(conn, CTDB_CURRENT_NODE,
1377 CTDB_CONTROL_TCP_CLIENT, 0,
1378 CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1382 * We want to handle reconfigure events
1384 NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn)
1386 return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE);
1390 call a control on the local node
1392 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode,
1393 uint64_t srvid, uint32_t flags, TDB_DATA data,
1394 TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1397 return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1400 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1402 struct ctdb_client_notify_register reg_data;
1407 reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1409 reg_data.notify_data[0] = 0;
1411 struct_len = offsetof(struct ctdb_client_notify_register,
1412 notify_data) + reg_data.len;
1414 status = ctdbd_control_local(
1415 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1416 make_tdb_data((uint8_t *)®_data, struct_len),
1417 NULL, NULL, &cstatus);
1418 if (!NT_STATUS_IS_OK(status)) {
1419 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1420 nt_errstr(status)));
1425 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1427 struct ctdb_client_notify_deregister dereg_data;
1431 dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1433 status = ctdbd_control_local(
1434 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1435 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1436 NULL, NULL, &cstatus);
1437 if (!NT_STATUS_IS_OK(status)) {
1438 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1439 nt_errstr(status)));