2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2014
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "librpc/gen_ndr/notify.h"
22 #include "librpc/gen_ndr/messaging.h"
23 #include "librpc/gen_ndr/server_id.h"
24 #include "lib/dbwrap/dbwrap.h"
25 #include "lib/dbwrap/dbwrap_rbt.h"
31 #include "lib/util/server_id_db.h"
32 #include "lib/util/tevent_unix.h"
33 #include "ctdbd_conn.h"
34 #include "ctdb_srvids.h"
35 #include "source3/smbd/proto.h"
36 #include "ctdb/include/ctdb_protocol.h"
37 #include "server_id_db_util.h"
38 #include "lib/util/iov_buf.h"
39 #include "messages_util.h"
44 * All of notifyd's state
47 struct notifyd_state {
48 struct tevent_context *ev;
49 struct messaging_context *msg_ctx;
50 struct ctdbd_connection *ctdbd_conn;
53 * Database of everything clients show interest in. Indexed by
54 * absolute path. The database keys are not 0-terminated
55 * because the criticial operation, notifyd_trigger, can walk
56 * the structure from the top without adding intermediate 0s.
57 * The database records contain an array of
59 * struct notifyd_instance
61 * to be maintained by parsed by notifyd_entry_parse()
63 struct db_context *entries;
66 * In the cluster case, this is the place where we store a log
67 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
68 * forward them to our peer notifyd's in the cluster once a
69 * second or when the log grows too large.
72 struct messaging_reclog *log;
75 * Array of companion notifyd's in a cluster. Every notifyd
76 * broadcasts its messaging_reclog to every other notifyd in
77 * the cluster. This is done by making ctdb send a message to
78 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
79 * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
80 * had called register_with_ctdbd this srvid will receive the
83 * Database replication happens via these broadcasts. Also,
84 * they serve as liveness indication. If a notifyd receives a
85 * broadcast from an unknown peer, it will create one for this
86 * srvid. Also when we don't hear anything from a peer for a
87 * while, we will discard it.
90 struct notifyd_peer **peers;
93 sys_notify_watch_fn sys_notify_watch;
94 struct sys_notify_context *sys_notify_ctx;
98 * notifyd's representation of a notify instance
100 struct notifyd_instance {
101 struct server_id client;
102 struct notify_instance instance;
104 void *sys_watch; /* inotify/fam/etc handle */
107 * Filters after sys_watch took responsibility of some bits
109 uint32_t internal_filter;
110 uint32_t internal_subdir_filter;
113 struct notifyd_peer {
114 struct notifyd_state *state;
115 struct server_id pid;
117 struct db_context *db;
118 time_t last_broadcast;
121 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
122 struct messaging_rec **prec,
124 static bool notifyd_trigger(struct messaging_context *msg_ctx,
125 struct messaging_rec **prec,
127 static bool notifyd_get_db(struct messaging_context *msg_ctx,
128 struct messaging_rec **prec,
130 static bool notifyd_got_db(struct messaging_context *msg_ctx,
131 struct messaging_rec **prec,
133 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
134 struct server_id src,
135 struct messaging_reclog *log);
136 static void notifyd_sys_callback(struct sys_notify_context *ctx,
137 void *private_data, struct notify_event *ev);
139 static struct tevent_req *notifyd_broadcast_reclog_send(
140 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
141 struct ctdbd_connection *ctdbd_conn, struct server_id src,
142 struct messaging_reclog *log);
143 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
145 static struct tevent_req *notifyd_clean_peers_send(
146 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
147 struct notifyd_state *notifyd);
148 static int notifyd_clean_peers_recv(struct tevent_req *req);
150 static int sys_notify_watch_dummy(
152 struct sys_notify_context *ctx,
155 uint32_t *subdir_filter,
156 void (*callback)(struct sys_notify_context *ctx,
158 struct notify_event *ev),
162 void **handle = handle_p;
167 static void notifyd_handler_done(struct tevent_req *subreq);
168 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
169 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
170 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
172 const uint8_t *msg, size_t msglen,
/*
 * Async startup of the notify daemon: create the in-memory entries
 * database, register handlers for the four MSG_SMB_NOTIFY_* messages,
 * claim the "notify-daemon" name, and -- in a cluster -- start the
 * reclog broadcaster, the peer cleaner and the ctdb srvid snoop.
 *
 * NOTE(review): the numbers at the start of each line are original
 * file line numbers fused in by a broken extraction; the numbering is
 * non-contiguous, i.e. lines (braces, declarations, error checks)
 * were dropped. This fragment is not compilable as-is.
 */
175 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
176 struct messaging_context *msg_ctx,
177 struct ctdbd_connection *ctdbd_conn,
178 sys_notify_watch_fn sys_notify_watch,
179 struct sys_notify_context *sys_notify_ctx)
181 struct tevent_req *req, *subreq;
182 struct notifyd_state *state;
183 struct server_id_db *names_db;
186 req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
191 state->msg_ctx = msg_ctx;
192 state->ctdbd_conn = ctdbd_conn;
/* No backend watcher supplied: fall back to the dummy */
194 if (sys_notify_watch == NULL) {
195 sys_notify_watch = sys_notify_watch_dummy;
198 state->sys_notify_watch = sys_notify_watch;
199 state->sys_notify_ctx = sys_notify_ctx;
201 state->entries = db_open_rbt(state);
202 if (tevent_req_nomem(state->entries, req)) {
203 return tevent_req_post(req, ev);
/* One messaging_handler_send() per notifyd message type */
206 subreq = messaging_handler_send(state, ev, msg_ctx,
207 MSG_SMB_NOTIFY_REC_CHANGE,
208 notifyd_rec_change, state);
209 if (tevent_req_nomem(subreq, req)) {
210 return tevent_req_post(req, ev);
212 tevent_req_set_callback(subreq, notifyd_handler_done, req);
214 subreq = messaging_handler_send(state, ev, msg_ctx,
215 MSG_SMB_NOTIFY_TRIGGER,
216 notifyd_trigger, state);
217 if (tevent_req_nomem(subreq, req)) {
218 return tevent_req_post(req, ev);
220 tevent_req_set_callback(subreq, notifyd_handler_done, req);
222 subreq = messaging_handler_send(state, ev, msg_ctx,
223 MSG_SMB_NOTIFY_GET_DB,
224 notifyd_get_db, state);
225 if (tevent_req_nomem(subreq, req)) {
226 return tevent_req_post(req, ev);
228 tevent_req_set_callback(subreq, notifyd_handler_done, req);
230 subreq = messaging_handler_send(state, ev, msg_ctx,
232 notifyd_got_db, state);
233 if (tevent_req_nomem(subreq, req)) {
234 return tevent_req_post(req, ev);
236 tevent_req_set_callback(subreq, notifyd_handler_done, req);
/* Claim the well-known "notify-daemon" name exclusively */
238 names_db = messaging_names_db(msg_ctx);
240 ret = server_id_db_set_exclusive(names_db, "notify-daemon");
242 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
243 __func__, strerror(ret)));
244 tevent_req_error(req, ret);
245 return tevent_req_post(req, ev);
/* Everything below is cluster-only */
248 if (ctdbd_conn == NULL) {
250 * No cluster around, skip the database replication
256 state->log = talloc_zero(state, struct messaging_reclog);
257 if (tevent_req_nomem(state->log, req)) {
258 return tevent_req_post(req, ev);
261 subreq = notifyd_broadcast_reclog_send(
262 state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
264 if (tevent_req_nomem(subreq, req)) {
265 return tevent_req_post(req, ev);
267 tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
270 subreq = notifyd_clean_peers_send(state, ev, state);
271 if (tevent_req_nomem(subreq, req)) {
272 return tevent_req_post(req, ev);
274 tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
/* Listen for reclog broadcasts from peer notifyds */
277 ret = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
278 notifyd_snoop_broadcast, state);
280 tevent_req_error(req, ret);
281 return tevent_req_post(req, ev);
287 static void notifyd_handler_done(struct tevent_req *subreq)
289 struct tevent_req *req = tevent_req_callback_data(
290 subreq, struct tevent_req);
293 ret = messaging_handler_recv(subreq);
295 tevent_req_error(req, ret);
298 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
300 struct tevent_req *req = tevent_req_callback_data(
301 subreq, struct tevent_req);
304 ret = notifyd_broadcast_reclog_recv(subreq);
306 tevent_req_error(req, ret);
309 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
311 struct tevent_req *req = tevent_req_callback_data(
312 subreq, struct tevent_req);
315 ret = notifyd_clean_peers_recv(subreq);
317 tevent_req_error(req, ret);
/*
 * Finish notifyd_send(): returns 0 or the errno the daemon failed
 * with. notifyd runs until error, so success is never returned while
 * the daemon is healthy.
 */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
326 * Parse an entry in the notifyd_context->entries database
329 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
330 struct notifyd_instance **instances,
331 size_t *num_instances)
333 if ((buflen % sizeof(struct notifyd_instance)) != 0) {
334 DEBUG(1, ("%s: invalid buffer size: %u\n",
335 __func__, (unsigned)buflen));
339 if (instances != NULL) {
340 *instances = (struct notifyd_instance *)buf;
342 if (num_instances != NULL) {
343 *num_instances = buflen / sizeof(struct notifyd_instance);
/*
 * Apply one notify-record change: add, update or delete the
 * (client, private_data) watch instance for "path" in "entries",
 * under a locked dbwrap record. Also (re-)establishes or tears down
 * the kernel-level watch via sys_notify_watch.
 *
 * NOTE(review): extraction dropped lines here (left-hand original
 * line numbers are non-contiguous); fragment only, not compilable.
 */
348 static bool notifyd_apply_rec_change(
349 const struct server_id *client,
350 const char *path, size_t pathlen,
351 const struct notify_instance *chg,
352 struct db_context *entries,
353 sys_notify_watch_fn sys_notify_watch,
354 struct sys_notify_context *sys_notify_ctx,
355 struct messaging_context *msg_ctx)
357 struct db_record *rec;
358 struct notifyd_instance *instances;
359 size_t num_instances;
361 struct notifyd_instance *instance;
/* path must be non-empty and 0-terminated */
367 DEBUG(1, ("%s: pathlen==0\n", __func__));
370 if (path[pathlen-1] != '\0') {
371 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
375 DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
376 "private_data=%p\n", __func__, path,
377 (unsigned)chg->filter, (unsigned)chg->subdir_filter,
/* The db key is the path WITHOUT the trailing 0 byte */
380 rec = dbwrap_fetch_locked(
382 make_tdb_data((const uint8_t *)path, pathlen-1))
385 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
390 value = dbwrap_record_get_value(rec);
392 if (value.dsize != 0) {
393 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
/*
400 * Overallocate by one instance to avoid a realloc when adding
*/
402 instances = talloc_array(rec, struct notifyd_instance,
404 if (instances == NULL) {
405 DEBUG(1, ("%s: talloc failed\n", __func__));
409 if (value.dsize != 0) {
410 memcpy(instances, value.dptr, value.dsize);
/* Look for an existing instance of (client, private_data) */
413 for (i=0; i<num_instances; i++) {
414 instance = &instances[i];
416 if (server_id_equal(&instance->client, client) &&
417 (instance->instance.private_data == chg->private_data)) {
/* Found: update in place. Not found: append to the spare slot. */
422 if (i < num_instances) {
423 instance->instance = *chg;
/*
426 * We've overallocated for one instance
*/
428 instance = &instances[num_instances];
430 *instance = (struct notifyd_instance) {
433 .internal_filter = chg->filter,
434 .internal_subdir_filter = chg->subdir_filter
/* Non-zero filters: (re-)arm the system-level watch */
440 if ((instance->instance.filter != 0) ||
441 (instance->instance.subdir_filter != 0)) {
444 TALLOC_FREE(instance->sys_watch);
446 ret = sys_notify_watch(entries, sys_notify_ctx, path,
447 &instance->internal_filter,
448 &instance->internal_subdir_filter,
449 notifyd_sys_callback, msg_ctx,
450 &instance->sys_watch);
452 DEBUG(1, ("%s: inotify_watch returned %s\n",
453 __func__, strerror(errno)));
457 if ((instance->instance.filter == 0) &&
458 (instance->instance.subdir_filter == 0)) {
459 /* This is a delete request */
460 TALLOC_FREE(instance->sys_watch);
/* swap-with-last removal from the packed array */
461 *instance = instances[num_instances-1];
465 DEBUG(10, ("%s: %s has %u instances\n", __func__,
466 path, (unsigned)num_instances));
/* Empty record: delete it; otherwise store the updated array */
468 if (num_instances == 0) {
469 status = dbwrap_record_delete(rec);
470 if (!NT_STATUS_IS_OK(status)) {
471 DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
472 __func__, nt_errstr(status)));
476 value = make_tdb_data(
477 (uint8_t *)instances,
478 sizeof(struct notifyd_instance) * num_instances);
480 status = dbwrap_record_store(rec, value, 0);
481 if (!NT_STATUS_IS_OK(status)) {
482 DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
483 __func__, nt_errstr(status)));
494 static void notifyd_sys_callback(struct sys_notify_context *ctx,
495 void *private_data, struct notify_event *ev)
497 struct messaging_context *msg_ctx = talloc_get_type_abort(
498 private_data, struct messaging_context);
499 struct notify_trigger_msg msg;
503 msg = (struct notify_trigger_msg) {
504 .when = timespec_current(),
505 .action = ev->action,
509 iov[0].iov_base = &msg;
510 iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
511 iov[1].iov_base = discard_const_p(char, ev->dir);
512 iov[1].iov_len = strlen(ev->dir);
513 iov[2].iov_base = &slash;
515 iov[3].iov_base = discard_const_p(char, ev->path);
516 iov[3].iov_len = strlen(ev->path)+1;
519 msg_ctx, messaging_server_id(msg_ctx),
520 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
523 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
524 struct notify_rec_change_msg **pmsg,
527 struct notify_rec_change_msg *msg;
529 if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
530 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
535 *pmsg = msg = (struct notify_rec_change_msg *)buf;
536 *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
538 DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
539 "private_data=%p, path=%.*s\n",
540 __func__, (unsigned)msg->instance.filter,
541 (unsigned)msg->instance.subdir_filter,
542 msg->instance.private_data, (int)(*pathlen), msg->path));
/*
 * Handler for MSG_SMB_NOTIFY_REC_CHANGE: apply the change to the
 * local entries db and, in a cluster, append the raw record to the
 * reclog for broadcasting to peers (flushing early at 100 records).
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
547 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
548 struct messaging_rec **prec,
551 struct notifyd_state *state = talloc_get_type_abort(
552 private_data, struct notifyd_state);
553 struct server_id_buf idbuf;
554 struct messaging_rec *rec = *prec;
555 struct messaging_rec **tmp;
556 struct messaging_reclog *log;
557 struct notify_rec_change_msg *msg;
561 DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
562 (unsigned)rec->buf.length,
563 server_id_str_buf(rec->src, &idbuf)));
565 ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
571 ok = notifyd_apply_rec_change(
572 &rec->src, msg->path, pathlen, &msg->instance,
573 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
576 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
/* Non-clustered: no reclog to maintain, done */
581 if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
586 tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
589 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
/* take ownership of the record into the log (rec is consumed) */
594 log->recs[log->num_recs] = talloc_move(log->recs, prec);
597 if (log->num_recs >= 100) {
599 * Don't let the log grow too large
601 notifyd_broadcast_reclog(state->ctdbd_conn,
602 messaging_server_id(msg_ctx), log);
608 struct notifyd_trigger_state {
609 struct messaging_context *msg_ctx;
610 struct notify_trigger_msg *msg;
612 bool covered_by_sys_notify;
615 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
/*
 * Handler for MSG_SMB_NOTIFY_TRIGGER: walk every parent directory of
 * the triggering path (the db keys are not 0-terminated exactly to
 * allow this walk without copies) and fan the event out to interested
 * clients, both from our own entries db and from peer mirrors.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
618 static bool notifyd_trigger(struct messaging_context *msg_ctx,
619 struct messaging_rec **prec,
622 struct notifyd_state *state = talloc_get_type_abort(
623 private_data, struct notifyd_state);
624 struct server_id my_id = messaging_server_id(msg_ctx);
625 struct messaging_rec *rec = *prec;
626 struct notifyd_trigger_state tstate;
628 const char *p, *next_p;
630 if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
631 DEBUG(1, ("message too short, ignoring: %u\n",
632 (unsigned)rec->buf.length));
635 if (rec->buf.data[rec->buf.length-1] != 0) {
636 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
640 tstate.msg_ctx = msg_ctx;
/* Same node but not from ourselves: the sys backend saw it too */
642 tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
643 tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
645 tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
646 path = tstate.msg->path;
648 DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
649 __func__, (unsigned)tstate.msg->action,
650 (unsigned)tstate.msg->filter, path));
652 if (path[0] != '/') {
653 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
/* Walk each '/'-separated prefix of the path */
658 for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
659 ptrdiff_t path_len = p - path;
663 next_p = strchr(p+1, '/');
664 tstate.recursive = (next_p != NULL);
666 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
667 (int)path_len, path));
669 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
672 dbwrap_parse_record(state->entries, key,
673 notifyd_trigger_parser, &tstate);
675 if (state->peers == NULL) {
/* Remote triggers are only matched against our own db */
679 if (rec->src.vnn != my_id.vnn) {
683 for (i=0; i<state->num_peers; i++) {
684 if (state->peers[i]->db == NULL) {
686 * Inactive peer, did not get a db yet
690 dbwrap_parse_record(state->peers[i]->db, key,
691 notifyd_trigger_parser, &tstate);
698 static void notifyd_send_delete(struct messaging_context *msg_ctx,
700 struct notifyd_instance *instance);
/*
 * dbwrap_parse_record callback for one directory entry during a
 * trigger walk: send MSG_PVFS_NOTIFY to every instance whose filter
 * matches, choosing internal vs full filters depending on whether the
 * system backend already covered the event. Dead local clients get a
 * delete request queued via notifyd_send_delete().
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
702 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
706 struct notifyd_trigger_state *tstate = private_data;
707 struct notify_event_msg msg = { .action = tstate->msg->action };
709 size_t path_len = key.dsize;
710 struct notifyd_instance *instances = NULL;
711 size_t num_instances = 0;
714 if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
716 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
720 DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
721 (unsigned)num_instances, (int)key.dsize,
/* Relative remainder of the path is what the client receives */
724 iov[0].iov_base = &msg;
725 iov[0].iov_len = offsetof(struct notify_event_msg, path);
726 iov[1].iov_base = tstate->msg->path + path_len + 1;
727 iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
729 for (i=0; i<num_instances; i++) {
730 struct notifyd_instance *instance = &instances[i];
731 struct server_id_buf idbuf;
/* Pick the filter set that still applies to this event source */
735 if (tstate->covered_by_sys_notify) {
736 if (tstate->recursive) {
737 i_filter = instance->internal_subdir_filter;
739 i_filter = instance->internal_filter;
742 if (tstate->recursive) {
743 i_filter = instance->instance.subdir_filter;
745 i_filter = instance->instance.filter;
749 if ((i_filter & tstate->msg->filter) == 0) {
753 msg.private_data = instance->instance.private_data;
755 status = messaging_send_iov(
756 tstate->msg_ctx, instance->client,
757 MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
759 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
761 server_id_str_buf(instance->client, &idbuf),
764 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
765 procid_is_local(&instance->client)) {
767 * That process has died
769 notifyd_send_delete(tstate->msg_ctx, key, instance);
773 if (!NT_STATUS_IS_OK(status)) {
774 DEBUG(1, ("%s: messaging_send_iov returned %s\n",
775 __func__, nt_errstr(status)));
781 * Send a delete request to ourselves to properly discard a notify
782 * record for an smbd that has died.
785 static void notifyd_send_delete(struct messaging_context *msg_ctx,
787 struct notifyd_instance *instance)
789 struct notify_rec_change_msg msg = {
790 .instance.private_data = instance->instance.private_data
797 * Send a rec_change to ourselves to delete a dead entry
800 iov[0] = (struct iovec) {
802 .iov_len = offsetof(struct notify_rec_change_msg, path) };
803 iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
804 iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
806 status = messaging_send_iov_from(
807 msg_ctx, instance->client, messaging_server_id(msg_ctx),
808 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
810 if (!NT_STATUS_IS_OK(status)) {
811 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
812 __func__, nt_errstr(status)));
/*
 * Handler for MSG_SMB_NOTIFY_GET_DB: marshall our whole entries db,
 * prefix it with the current 8-byte reclog index and send it back to
 * the requester as MSG_SMB_NOTIFY_DB.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
816 static bool notifyd_get_db(struct messaging_context *msg_ctx,
817 struct messaging_rec **prec,
820 struct notifyd_state *state = talloc_get_type_abort(
821 private_data, struct notifyd_state);
822 struct messaging_rec *rec = *prec;
823 struct server_id_buf id1, id2;
825 uint64_t rec_index = UINT64_MAX;
826 uint8_t index_buf[sizeof(uint64_t)];
/* First call sizes the marshall buffer, second call fills it */
831 dbsize = dbwrap_marshall(state->entries, NULL, 0);
833 buf = talloc_array(rec, uint8_t, dbsize);
835 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
836 __func__, (uintmax_t)dbsize));
840 dbsize = dbwrap_marshall(state->entries, buf, dbsize);
842 if (dbsize != talloc_get_size(buf)) {
843 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
844 (uintmax_t)talloc_get_size(buf),
850 if (state->log != NULL) {
851 rec_index = state->log->rec_index;
853 SBVAL(index_buf, 0, rec_index);
855 iov[0] = (struct iovec) { .iov_base = index_buf,
856 .iov_len = sizeof(index_buf) };
857 iov[1] = (struct iovec) { .iov_base = buf,
860 DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
861 (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
862 server_id_str_buf(messaging_server_id(msg_ctx), &id1),
863 server_id_str_buf(rec->src, &id2)));
865 status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
866 iov, ARRAY_SIZE(iov), NULL, 0);
868 if (!NT_STATUS_IS_OK(status)) {
869 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
870 __func__, nt_errstr(status)));
876 static int notifyd_add_proxy_syswatches(struct db_record *rec,
/*
 * Handler for MSG_SMB_NOTIFY_DB: a peer answered our GET_DB request.
 * Unmarshall the peer's database into a fresh rbt mirror, remember
 * the reclog index it was taken at, and arm local sys watches for all
 * of the peer's entries.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
879 static bool notifyd_got_db(struct messaging_context *msg_ctx,
880 struct messaging_rec **prec,
883 struct notifyd_state *state = talloc_get_type_abort(
884 private_data, struct notifyd_state);
885 struct messaging_rec *rec = *prec;
886 struct notifyd_peer *p = NULL;
887 struct server_id_buf idbuf;
/* The db must belong to a peer we created in snoop_broadcast */
892 for (i=0; i<state->num_peers; i++) {
893 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
900 DEBUG(10, ("%s: Did not find peer for db from %s\n",
901 __func__, server_id_str_buf(rec->src, &idbuf)));
/* need at least the 8-byte reclog index prefix */
905 if (rec->buf.length < 8) {
906 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
907 (unsigned)rec->buf.length,
908 server_id_str_buf(rec->src, &idbuf)));
913 p->rec_index = BVAL(rec->buf.data, 0);
915 p->db = db_open_rbt(p);
917 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
922 status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
923 rec->buf.length - 8);
924 if (!NT_STATUS_IS_OK(status)) {
925 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
926 __func__, nt_errstr(status),
927 server_id_str_buf(rec->src, &idbuf)));
932 dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
935 DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
936 server_id_str_buf(rec->src, &idbuf), count));
/*
 * NDR-push the accumulated reclog and broadcast it to all cluster
 * nodes via ctdb srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY, then advance
 * rec_index and reset the log.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
941 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
942 struct server_id src,
943 struct messaging_reclog *log)
945 enum ndr_err_code ndr_err;
946 uint8_t msghdr[MESSAGE_HDR_LENGTH];
955 DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
956 (uintmax_t)log->rec_index, (unsigned)log->num_recs));
/* destination id 0: broadcast, no specific receiver */
958 message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
959 (struct server_id) {0 });
960 iov[0] = (struct iovec) { .iov_base = msghdr,
961 .iov_len = sizeof(msghdr) };
963 ndr_err = ndr_push_struct_blob(
965 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
966 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
967 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
968 __func__, ndr_errstr(ndr_err)));
971 iov[1] = (struct iovec) { .iov_base = blob.data,
972 .iov_len = blob.length };
974 ret = ctdbd_messaging_send_iov(
975 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
976 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
977 TALLOC_FREE(blob.data);
979 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
980 __func__, strerror(ret)));
/* log flushed: drop the stored records for the next round */
988 TALLOC_FREE(log->recs);
991 struct notifyd_broadcast_reclog_state {
992 struct tevent_context *ev;
993 struct ctdbd_connection *ctdbd_conn;
994 struct server_id src;
995 struct messaging_reclog *log;
998 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1000 static struct tevent_req *notifyd_broadcast_reclog_send(
1001 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1002 struct ctdbd_connection *ctdbd_conn, struct server_id src,
1003 struct messaging_reclog *log)
1005 struct tevent_req *req, *subreq;
1006 struct notifyd_broadcast_reclog_state *state;
1008 req = tevent_req_create(mem_ctx, &state,
1009 struct notifyd_broadcast_reclog_state);
1014 state->ctdbd_conn = ctdbd_conn;
1018 subreq = tevent_wakeup_send(state, state->ev,
1019 timeval_current_ofs_msec(1000));
1020 if (tevent_req_nomem(subreq, req)) {
1021 return tevent_req_post(req, ev);
1023 tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1027 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1029 struct tevent_req *req = tevent_req_callback_data(
1030 subreq, struct tevent_req);
1031 struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1032 req, struct notifyd_broadcast_reclog_state);
1035 ok = tevent_wakeup_recv(subreq);
1036 TALLOC_FREE(subreq);
1038 tevent_req_oom(req);
1042 notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1044 subreq = tevent_wakeup_send(state, state->ev,
1045 timeval_current_ofs_msec(1000));
1046 if (tevent_req_nomem(subreq, req)) {
1049 tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
/* Finish the broadcast loop; only returns on error (0/errno). */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/* State of the periodic dead-peer cleanup loop */
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);
1064 static struct tevent_req *notifyd_clean_peers_send(
1065 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1066 struct notifyd_state *notifyd)
1068 struct tevent_req *req, *subreq;
1069 struct notifyd_clean_peers_state *state;
1071 req = tevent_req_create(mem_ctx, &state,
1072 struct notifyd_clean_peers_state);
1077 state->notifyd = notifyd;
1079 subreq = tevent_wakeup_send(state, state->ev,
1080 timeval_current_ofs_msec(30000));
1081 if (tevent_req_nomem(subreq, req)) {
1082 return tevent_req_post(req, ev);
1084 tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1088 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1090 struct tevent_req *req = tevent_req_callback_data(
1091 subreq, struct tevent_req);
1092 struct notifyd_clean_peers_state *state = tevent_req_data(
1093 req, struct notifyd_clean_peers_state);
1094 struct notifyd_state *notifyd = state->notifyd;
1097 time_t now = time(NULL);
1099 ok = tevent_wakeup_recv(subreq);
1100 TALLOC_FREE(subreq);
1102 tevent_req_oom(req);
1107 while (i < notifyd->num_peers) {
1108 struct notifyd_peer *p = notifyd->peers[i];
1110 if ((now - p->last_broadcast) > 60) {
1111 struct server_id_buf idbuf;
1114 * Haven't heard for more than 60 seconds. Call this
1118 DEBUG(10, ("%s: peer %s died\n", __func__,
1119 server_id_str_buf(p->pid, &idbuf)));
1121 * This implicitly decrements notifyd->num_peers
1129 subreq = tevent_wakeup_send(state, state->ev,
1130 timeval_current_ofs_msec(30000));
1131 if (tevent_req_nomem(subreq, req)) {
1134 tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
/* Finish the cleanup loop; only returns on error (0/errno). */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/*
 * dbwrap_traverse_read callback over a freshly received peer db: arm
 * a local sys watch for every instance so that local filesystem
 * events also reach the peer's clients.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
1142 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1145 struct notifyd_state *state = talloc_get_type_abort(
1146 private_data, struct notifyd_state);
1147 struct db_context *db = dbwrap_record_get_db(rec);
1148 TDB_DATA key = dbwrap_record_get_key(rec);
1149 TDB_DATA value = dbwrap_record_get_value(rec);
1150 struct notifyd_instance *instances = NULL;
1151 size_t num_instances = 0;
/* db keys are not 0-terminated; build a terminated copy (VLA) */
1153 char path[key.dsize+1];
1156 memcpy(path, key.dptr, key.dsize);
1157 path[key.dsize] = '\0';
1159 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1162 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1167 for (i=0; i<num_instances; i++) {
1168 struct notifyd_instance *instance = &instances[i];
1169 uint32_t filter = instance->instance.filter;
1170 uint32_t subdir_filter = instance->instance.subdir_filter;
/* watch is owned by the mirror db, dropped with the peer */
1173 ret = state->sys_notify_watch(
1174 db, state->sys_notify_ctx, path,
1175 &filter, &subdir_filter,
1176 notifyd_sys_callback, state->msg_ctx,
1177 &instance->sys_watch);
1179 DEBUG(1, ("%s: inotify_watch returned %s\n",
1180 __func__, strerror(errno)));
1187 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1189 TDB_DATA key = dbwrap_record_get_key(rec);
1190 TDB_DATA value = dbwrap_record_get_value(rec);
1191 struct notifyd_instance *instances = NULL;
1192 size_t num_instances = 0;
1196 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1199 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1200 __func__, (int)key.dsize, (char *)key.dptr));
1203 for (i=0; i<num_instances; i++) {
1204 TALLOC_FREE(instances[i].sys_watch);
1209 static int notifyd_peer_destructor(struct notifyd_peer *p)
1211 struct notifyd_state *state = p->state;
1214 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches, NULL, NULL);
1216 for (i = 0; i<state->num_peers; i++) {
1217 if (p == state->peers[i]) {
1218 state->peers[i] = state->peers[state->num_peers-1];
1219 state->num_peers -= 1;
1226 static struct notifyd_peer *notifyd_peer_new(
1227 struct notifyd_state *state, struct server_id pid)
1229 struct notifyd_peer *p, **tmp;
1231 tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1232 state->num_peers+1);
1238 p = talloc_zero(state->peers, struct notifyd_peer);
1245 state->peers[state->num_peers] = p;
1246 state->num_peers += 1;
1248 talloc_set_destructor(p, notifyd_peer_destructor);
/*
 * Apply a peer's broadcast reclog to our mirror of its database. On
 * any inconsistency (no db yet, pull/parse failure, rec_index
 * mismatch) the peer is dropped so the next broadcast triggers a
 * fresh database pull.
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
1253 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1254 const uint8_t *msg, size_t msglen)
1256 struct notifyd_state *state = peer->state;
1257 DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1259 struct server_id_buf idbuf;
1260 struct messaging_reclog *log;
1261 enum ndr_err_code ndr_err;
1264 if (peer->db == NULL) {
1271 log = talloc(peer, struct messaging_reclog);
1273 DEBUG(10, ("%s: talloc failed\n", __func__));
1277 ndr_err = ndr_pull_struct_blob_all(
1279 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1280 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1281 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1282 __func__, ndr_errstr(ndr_err)));
1286 DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1287 (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1288 server_id_str_buf(peer->pid, &idbuf)));
/* index mismatch => we lost a broadcast; resync via fresh pull */
1290 if (log->rec_index != peer->rec_index) {
1291 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1292 __func__, (uintmax_t)log->rec_index,
1293 server_id_str_buf(peer->pid, &idbuf),
1294 (uintmax_t)peer->rec_index));
1298 for (i=0; i<log->num_recs; i++) {
1299 struct messaging_rec *r = log->recs[i];
1300 struct notify_rec_change_msg *chg;
1304 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1307 DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
/* replay the change into the peer's mirror db */
1312 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1313 &chg->instance, peer->db,
1314 state->sys_notify_watch,
1315 state->sys_notify_ctx,
1318 DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1324 peer->rec_index += 1;
1325 peer->last_broadcast = time(NULL);
1331 DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1332 server_id_str_buf(peer->pid, &idbuf)));
1337 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1338 * messages) broadcasts by other notifyds. Several cases:
1340 * We don't know the source. This creates a new peer. Creating a peer
1341 * involves asking the peer for its full database. We assume ordered
1342 * messages, so the new database will arrive before the next broadcast
1345 * We know the source and the log index matches. We will apply the log
1346 * locally to our peer's db as if we had received it from a local
1349 * We know the source but the log index does not match. This means we
1350 * lost a message. We just drop the whole peer and wait for the next
1351 * broadcast, which will then trigger a fresh database pull.
/*
 * ctdb srvid callback for CTDB_SRVID_SAMBA_NOTIFY_PROXY: handle a
 * reclog broadcast from a peer notifyd. Known peer -> apply the log;
 * unknown peer -> create a peer entry and ask it for its full db
 * (see the explanatory comment block above this function).
 *
 * NOTE(review): extraction dropped lines here (non-contiguous
 * left-hand numbering); fragment only, not compilable.
 */
1354 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1356 const uint8_t *msg, size_t msglen,
1359 struct notifyd_state *state = talloc_get_type_abort(
1360 private_data, struct notifyd_state);
1361 struct server_id my_id = messaging_server_id(state->msg_ctx);
1362 struct notifyd_peer *p;
1365 struct server_id src, dst;
1366 struct server_id_buf idbuf;
1369 if (msglen < MESSAGE_HDR_LENGTH) {
1370 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1373 message_hdr_get(&msg_type, &src, &dst, msg);
1375 if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1376 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1377 (unsigned)msg_type));
/* The broadcast reaches us too; skip our own */
1380 if (server_id_equal(&src, &my_id)) {
1381 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1385 DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1386 __func__, server_id_str_buf(src, &idbuf)));
1388 for (i=0; i<state->num_peers; i++) {
1389 if (server_id_equal(&state->peers[i]->pid, &src)) {
1391 DEBUG(10, ("%s: Applying changes to peer %u\n",
1392 __func__, (unsigned)i));
1394 notifyd_apply_reclog(state->peers[i],
1395 msg + MESSAGE_HDR_LENGTH,
1396 msglen - MESSAGE_HDR_LENGTH);
/* Unknown sender: create a peer and pull its database */
1401 DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1402 server_id_str_buf(src, &idbuf)));
1404 p = notifyd_peer_new(state, src);
1406 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1410 status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1412 if (!NT_STATUS_IS_OK(status)) {
1413 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1414 __func__, nt_errstr(status)));
1422 struct notifyd_parse_db_state {
1423 bool (*fn)(const char *path,
1424 struct server_id server,
1425 const struct notify_instance *instance,
1426 void *private_data);
1430 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1433 struct notifyd_parse_db_state *state = private_data;
1434 char path[key.dsize+1];
1435 struct notifyd_instance *instances = NULL;
1436 size_t num_instances = 0;
1440 memcpy(path, key.dptr, key.dsize);
1441 path[key.dsize] = 0;
1443 ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1446 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1451 for (i=0; i<num_instances; i++) {
1452 ok = state->fn(path, instances[i].client,
1453 &instances[i].instance,
1454 state->private_data);
1463 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1464 uint64_t *log_index,
1465 bool (*fn)(const char *path,
1466 struct server_id server,
1467 const struct notify_instance *instance,
1468 void *private_data),
1471 struct notifyd_parse_db_state state = {
1472 .fn = fn, .private_data = private_data
1479 *log_index = BVAL(buf, 0);
1484 status = dbwrap_parse_marshall_buf(
1485 buf, buflen, notifyd_parse_db_parser, &state);
1486 if (!NT_STATUS_IS_OK(status)) {
1487 return map_errno_from_nt_status(status);