notifyd: Use messaging_register for MSG_SMB_NOTIFY_REC_CHANGE
[metze/samba/wip.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/server_id.h"
23 #include "lib/util/data_blob.h"
24 #include "librpc/gen_ndr/notify.h"
25 #include "librpc/gen_ndr/messaging.h"
26 #include "librpc/gen_ndr/server_id.h"
27 #include "lib/dbwrap/dbwrap.h"
28 #include "lib/dbwrap/dbwrap_rbt.h"
29 #include "messages.h"
30 #include "tdb.h"
31 #include "util_tdb.h"
32 #include "notifyd.h"
33 #include "lib/util/server_id_db.h"
34 #include "lib/util/tevent_unix.h"
35 #include "lib/util/tevent_ntstatus.h"
36 #include "ctdbd_conn.h"
37 #include "ctdb_srvids.h"
38 #include "server_id_db_util.h"
39 #include "lib/util/iov_buf.h"
40 #include "messages_util.h"
41
42 #ifdef CLUSTER_SUPPORT
43 #include "ctdb_protocol.h"
44 #endif
45
46 struct notifyd_peer;
47
48 /*
49  * All of notifyd's state
50  */
51
52 struct notifyd_state {
53         struct tevent_context *ev;
54         struct messaging_context *msg_ctx;
55         struct ctdbd_connection *ctdbd_conn;
56
57         /*
58          * Database of everything clients show interest in. Indexed by
59          * absolute path. The database keys are not 0-terminated
60          * because the criticial operation, notifyd_trigger, can walk
61          * the structure from the top without adding intermediate 0s.
62          * The database records contain an array of
63          *
64          * struct notifyd_instance
65          *
66          * to be maintained and parsed by notifyd_entry_parse()
67          */
68         struct db_context *entries;
69
70         /*
71          * In the cluster case, this is the place where we store a log
72          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
73          * forward them to our peer notifyd's in the cluster once a
74          * second or when the log grows too large.
75          */
76
77         struct messaging_reclog *log;
78
79         /*
80          * Array of companion notifyd's in a cluster. Every notifyd
81          * broadcasts its messaging_reclog to every other notifyd in
82          * the cluster. This is done by making ctdb send a message to
83          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
84          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
85          * had called register_with_ctdbd this srvid will receive the
86          * broadcasts.
87          *
88          * Database replication happens via these broadcasts. Also,
89          * they serve as liveness indication. If a notifyd receives a
90          * broadcast from an unknown peer, it will create one for this
91          * srvid. Also when we don't hear anything from a peer for a
92          * while, we will discard it.
93          */
94
95         struct notifyd_peer **peers;
96         size_t num_peers;
97
98         sys_notify_watch_fn sys_notify_watch;
99         struct sys_notify_context *sys_notify_ctx;
100 };
101
102 /*
103  * notifyd's representation of a notify instance
104  */
105 struct notifyd_instance {
106         struct server_id client;
107         struct notify_instance instance;
108
109         void *sys_watch; /* inotify/fam/etc handle */
110
111         /*
112          * Filters after sys_watch took responsibility of some bits
113          */
114         uint32_t internal_filter;
115         uint32_t internal_subdir_filter;
116 };
117
118 struct notifyd_peer {
119         struct notifyd_state *state;
120         struct server_id pid;
121         uint64_t rec_index;
122         struct db_context *db;
123         time_t last_broadcast;
124 };
125
126 static void notifyd_rec_change(struct messaging_context *msg_ctx,
127                                void *private_data, uint32_t msg_type,
128                                struct server_id src, DATA_BLOB *data);
129 static bool notifyd_trigger(struct messaging_context *msg_ctx,
130                             struct messaging_rec **prec,
131                             void *private_data);
132 static bool notifyd_get_db(struct messaging_context *msg_ctx,
133                            struct messaging_rec **prec,
134                            void *private_data);
135
136 #ifdef CLUSTER_SUPPORT
137 static bool notifyd_got_db(struct messaging_context *msg_ctx,
138                            struct messaging_rec **prec,
139                            void *private_data);
140 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
141                                      struct server_id src,
142                                      struct messaging_reclog *log);
143 #endif
144 static void notifyd_sys_callback(struct sys_notify_context *ctx,
145                                  void *private_data, struct notify_event *ev,
146                                  uint32_t filter);
147
148 #ifdef CLUSTER_SUPPORT
149 static struct tevent_req *notifyd_broadcast_reclog_send(
150         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
151         struct ctdbd_connection *ctdbd_conn, struct server_id src,
152         struct messaging_reclog *log);
153 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
154
155 static struct tevent_req *notifyd_clean_peers_send(
156         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
157         struct notifyd_state *notifyd);
158 static int notifyd_clean_peers_recv(struct tevent_req *req);
159 #endif
160
161 static int sys_notify_watch_dummy(
162         TALLOC_CTX *mem_ctx,
163         struct sys_notify_context *ctx,
164         const char *path,
165         uint32_t *filter,
166         uint32_t *subdir_filter,
167         void (*callback)(struct sys_notify_context *ctx,
168                          void *private_data,
169                          struct notify_event *ev,
170                          uint32_t filter),
171         void *private_data,
172         void *handle_p)
173 {
174         void **handle = handle_p;
175         *handle = NULL;
176         return 0;
177 }
178
179 static void notifyd_handler_done(struct tevent_req *subreq);
180
181 #ifdef CLUSTER_SUPPORT
182 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
183 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
184 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
185                                    uint64_t dst_srvid,
186                                    const uint8_t *msg, size_t msglen,
187                                    void *private_data);
188 #endif
189
190 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
191                                 struct messaging_context *msg_ctx,
192                                 struct ctdbd_connection *ctdbd_conn,
193                                 sys_notify_watch_fn sys_notify_watch,
194                                 struct sys_notify_context *sys_notify_ctx)
195 {
196         struct tevent_req *req, *subreq;
197         struct notifyd_state *state;
198         struct server_id_db *names_db;
199         NTSTATUS status;
200         int ret;
201
202         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
203         if (req == NULL) {
204                 return NULL;
205         }
206         state->ev = ev;
207         state->msg_ctx = msg_ctx;
208         state->ctdbd_conn = ctdbd_conn;
209
210         if (sys_notify_watch == NULL) {
211                 sys_notify_watch = sys_notify_watch_dummy;
212         }
213
214         state->sys_notify_watch = sys_notify_watch;
215         state->sys_notify_ctx = sys_notify_ctx;
216
217         state->entries = db_open_rbt(state);
218         if (tevent_req_nomem(state->entries, req)) {
219                 return tevent_req_post(req, ev);
220         }
221
222         status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
223                                     notifyd_rec_change);
224         if (tevent_req_nterror(req, status)) {
225                 return tevent_req_post(req, ev);
226         }
227
228         subreq = messaging_handler_send(state, ev, msg_ctx,
229                                         MSG_SMB_NOTIFY_TRIGGER,
230                                         notifyd_trigger, state);
231         if (tevent_req_nomem(subreq, req)) {
232                 goto deregister_rec_change;
233         }
234         tevent_req_set_callback(subreq, notifyd_handler_done, req);
235
236         subreq = messaging_handler_send(state, ev, msg_ctx,
237                                         MSG_SMB_NOTIFY_GET_DB,
238                                         notifyd_get_db, state);
239         if (tevent_req_nomem(subreq, req)) {
240                 goto deregister_rec_change;
241         }
242         tevent_req_set_callback(subreq, notifyd_handler_done, req);
243
244         names_db = messaging_names_db(msg_ctx);
245
246         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
247         if (ret != 0) {
248                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
249                            __func__, strerror(ret)));
250                 tevent_req_error(req, ret);
251                 goto deregister_rec_change;
252         }
253
254         if (ctdbd_conn == NULL) {
255                 /*
256                  * No cluster around, skip the database replication
257                  * engine
258                  */
259                 return req;
260         }
261
262 #ifdef CLUSTER_SUPPORT
263         subreq = messaging_handler_send(state, ev, msg_ctx,
264                                         MSG_SMB_NOTIFY_DB,
265                                         notifyd_got_db, state);
266         if (tevent_req_nomem(subreq, req)) {
267                 goto deregister_rec_change;
268         }
269         tevent_req_set_callback(subreq, notifyd_handler_done, req);
270
271         state->log = talloc_zero(state, struct messaging_reclog);
272         if (tevent_req_nomem(state->log, req)) {
273                 goto deregister_rec_change;
274         }
275
276         subreq = notifyd_broadcast_reclog_send(
277                 state->log, ev, ctdbd_conn,
278                 messaging_server_id(msg_ctx),
279                 state->log);
280         if (tevent_req_nomem(subreq, req)) {
281                 goto deregister_rec_change;
282         }
283         tevent_req_set_callback(subreq,
284                                 notifyd_broadcast_reclog_finished,
285                                 req);
286
287         subreq = notifyd_clean_peers_send(state, ev, state);
288         if (tevent_req_nomem(subreq, req)) {
289                 goto deregister_rec_change;
290         }
291         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
292                                 req);
293
294         ret = register_with_ctdbd(ctdbd_conn,
295                                   CTDB_SRVID_SAMBA_NOTIFY_PROXY,
296                                   notifyd_snoop_broadcast, state);
297         if (ret != 0) {
298                 tevent_req_error(req, ret);
299                 goto deregister_rec_change;
300         }
301 #endif
302
303         return req;
304
305 deregister_rec_change:
306         messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
307         return tevent_req_post(req, ev);
308 }
309
310 static void notifyd_handler_done(struct tevent_req *subreq)
311 {
312         struct tevent_req *req = tevent_req_callback_data(
313                 subreq, struct tevent_req);
314         int ret;
315
316         ret = messaging_handler_recv(subreq);
317         TALLOC_FREE(subreq);
318         tevent_req_error(req, ret);
319 }
320
321 #ifdef CLUSTER_SUPPORT
322
323 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
324 {
325         struct tevent_req *req = tevent_req_callback_data(
326                 subreq, struct tevent_req);
327         int ret;
328
329         ret = notifyd_broadcast_reclog_recv(subreq);
330         TALLOC_FREE(subreq);
331         tevent_req_error(req, ret);
332 }
333
334 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
335 {
336         struct tevent_req *req = tevent_req_callback_data(
337                 subreq, struct tevent_req);
338         int ret;
339
340         ret = notifyd_clean_peers_recv(subreq);
341         TALLOC_FREE(subreq);
342         tevent_req_error(req, ret);
343 }
344
345 #endif
346
347 int notifyd_recv(struct tevent_req *req)
348 {
349         return tevent_req_simple_recv_unix(req);
350 }
351
352 /*
353  * Parse an entry in the notifyd_context->entries database
354  */
355
356 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
357                                 struct notifyd_instance **instances,
358                                 size_t *num_instances)
359 {
360         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
361                 DEBUG(1, ("%s: invalid buffer size: %u\n",
362                           __func__, (unsigned)buflen));
363                 return false;
364         }
365
366         if (instances != NULL) {
367                 *instances = (struct notifyd_instance *)buf;
368         }
369         if (num_instances != NULL) {
370                 *num_instances = buflen / sizeof(struct notifyd_instance);
371         }
372         return true;
373 }
374
375 static bool notifyd_apply_rec_change(
376         const struct server_id *client,
377         const char *path, size_t pathlen,
378         const struct notify_instance *chg,
379         struct db_context *entries,
380         sys_notify_watch_fn sys_notify_watch,
381         struct sys_notify_context *sys_notify_ctx,
382         struct messaging_context *msg_ctx)
383 {
384         struct db_record *rec;
385         struct notifyd_instance *instances;
386         size_t num_instances;
387         size_t i;
388         struct notifyd_instance *instance;
389         TDB_DATA value;
390         NTSTATUS status;
391         bool ok = false;
392
393         if (pathlen == 0) {
394                 DEBUG(1, ("%s: pathlen==0\n", __func__));
395                 return false;
396         }
397         if (path[pathlen-1] != '\0') {
398                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
399                 return false;
400         }
401
402         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
403                    "private_data=%p\n", __func__, path,
404                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
405                    chg->private_data));
406
407         rec = dbwrap_fetch_locked(
408                 entries, entries,
409                 make_tdb_data((const uint8_t *)path, pathlen-1));
410
411         if (rec == NULL) {
412                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
413                 goto fail;
414         }
415
416         num_instances = 0;
417         value = dbwrap_record_get_value(rec);
418
419         if (value.dsize != 0) {
420                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
421                                          &num_instances)) {
422                         goto fail;
423                 }
424         }
425
426         /*
427          * Overallocate by one instance to avoid a realloc when adding
428          */
429         instances = talloc_array(rec, struct notifyd_instance,
430                                  num_instances + 1);
431         if (instances == NULL) {
432                 DEBUG(1, ("%s: talloc failed\n", __func__));
433                 goto fail;
434         }
435
436         if (value.dsize != 0) {
437                 memcpy(instances, value.dptr, value.dsize);
438         }
439
440         for (i=0; i<num_instances; i++) {
441                 instance = &instances[i];
442
443                 if (server_id_equal(&instance->client, client) &&
444                     (instance->instance.private_data == chg->private_data)) {
445                         break;
446                 }
447         }
448
449         if (i < num_instances) {
450                 instance->instance = *chg;
451         } else {
452                 /*
453                  * We've overallocated for one instance
454                  */
455                 instance = &instances[num_instances];
456
457                 *instance = (struct notifyd_instance) {
458                         .client = *client,
459                         .instance = *chg,
460                         .internal_filter = chg->filter,
461                         .internal_subdir_filter = chg->subdir_filter
462                 };
463
464                 num_instances += 1;
465         }
466
467         if ((instance->instance.filter != 0) ||
468             (instance->instance.subdir_filter != 0)) {
469                 int ret;
470
471                 TALLOC_FREE(instance->sys_watch);
472
473                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
474                                        &instance->internal_filter,
475                                        &instance->internal_subdir_filter,
476                                        notifyd_sys_callback, msg_ctx,
477                                        &instance->sys_watch);
478                 if (ret != 0) {
479                         DEBUG(1, ("%s: inotify_watch returned %s\n",
480                                   __func__, strerror(errno)));
481                 }
482         }
483
484         if ((instance->instance.filter == 0) &&
485             (instance->instance.subdir_filter == 0)) {
486                 /* This is a delete request */
487                 TALLOC_FREE(instance->sys_watch);
488                 *instance = instances[num_instances-1];
489                 num_instances -= 1;
490         }
491
492         DEBUG(10, ("%s: %s has %u instances\n", __func__,
493                    path, (unsigned)num_instances));
494
495         if (num_instances == 0) {
496                 status = dbwrap_record_delete(rec);
497                 if (!NT_STATUS_IS_OK(status)) {
498                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
499                                   __func__, nt_errstr(status)));
500                         goto fail;
501                 }
502         } else {
503                 value = make_tdb_data(
504                         (uint8_t *)instances,
505                         sizeof(struct notifyd_instance) * num_instances);
506
507                 status = dbwrap_record_store(rec, value, 0);
508                 if (!NT_STATUS_IS_OK(status)) {
509                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
510                                   __func__, nt_errstr(status)));
511                         goto fail;
512                 }
513         }
514
515         ok = true;
516 fail:
517         TALLOC_FREE(rec);
518         return ok;
519 }
520
521 static void notifyd_sys_callback(struct sys_notify_context *ctx,
522                                  void *private_data, struct notify_event *ev,
523                                  uint32_t filter)
524 {
525         struct messaging_context *msg_ctx = talloc_get_type_abort(
526                 private_data, struct messaging_context);
527         struct notify_trigger_msg msg;
528         struct iovec iov[4];
529         char slash = '/';
530
531         msg = (struct notify_trigger_msg) {
532                 .when = timespec_current(),
533                 .action = ev->action,
534                 .filter = filter,
535         };
536
537         iov[0].iov_base = &msg;
538         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
539         iov[1].iov_base = discard_const_p(char, ev->dir);
540         iov[1].iov_len = strlen(ev->dir);
541         iov[2].iov_base = &slash;
542         iov[2].iov_len = 1;
543         iov[3].iov_base = discard_const_p(char, ev->path);
544         iov[3].iov_len = strlen(ev->path)+1;
545
546         messaging_send_iov(
547                 msg_ctx, messaging_server_id(msg_ctx),
548                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
549 }
550
551 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
552                                      struct notify_rec_change_msg **pmsg,
553                                      size_t *pathlen)
554 {
555         struct notify_rec_change_msg *msg;
556
557         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
558                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
559                           (unsigned)bufsize));
560                 return false;
561         }
562
563         *pmsg = msg = (struct notify_rec_change_msg *)buf;
564         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
565
566         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
567                    "private_data=%p, path=%.*s\n",
568                    __func__, (unsigned)msg->instance.filter,
569                    (unsigned)msg->instance.subdir_filter,
570                    msg->instance.private_data, (int)(*pathlen), msg->path));
571
572         return true;
573 }
574
575 static void notifyd_rec_change(struct messaging_context *msg_ctx,
576                                void *private_data, uint32_t msg_type,
577                                struct server_id src, DATA_BLOB *data)
578 {
579         struct notifyd_state *state = talloc_get_type_abort(
580                 private_data, struct notifyd_state);
581         struct server_id_buf idbuf;
582         struct notify_rec_change_msg *msg;
583         size_t pathlen;
584         bool ok;
585
586         DBG_DEBUG("Got %zu bytes from %s\n", data->length,
587                   server_id_str_buf(src, &idbuf));
588
589         ok = notifyd_parse_rec_change(data->data, data->length,
590                                       &msg, &pathlen);
591         if (!ok) {
592                 return;
593         }
594
595         ok = notifyd_apply_rec_change(
596                 &src, msg->path, pathlen, &msg->instance,
597                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
598                 state->msg_ctx);
599         if (!ok) {
600                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
601                           __func__));
602                 return;
603         }
604
605         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
606                 return;
607         }
608
609 #ifdef CLUSTER_SUPPORT
610         {
611
612         struct messaging_rec **tmp;
613         struct messaging_reclog *log;
614         struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
615
616         log = state->log;
617
618         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
619                              log->num_recs+1);
620         if (tmp == NULL) {
621                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
622                 return;
623         }
624         log->recs = tmp;
625
626         log->recs[log->num_recs] = messaging_rec_create(
627                 log->recs, src, messaging_server_id(msg_ctx),
628                 msg_type, &iov, 1, NULL, 0);
629
630         if (log->recs[log->num_recs] == NULL) {
631                 DBG_WARNING("messaging_rec_create failed, ignoring\n");
632                 return;
633         }
634
635         log->num_recs += 1;
636
637         if (log->num_recs >= 100) {
638                 /*
639                  * Don't let the log grow too large
640                  */
641                 notifyd_broadcast_reclog(state->ctdbd_conn,
642                                          messaging_server_id(msg_ctx), log);
643         }
644
645         }
646 #endif
647 }
648
649 struct notifyd_trigger_state {
650         struct messaging_context *msg_ctx;
651         struct notify_trigger_msg *msg;
652         bool recursive;
653         bool covered_by_sys_notify;
654 };
655
656 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
657                                    void *private_data);
658
659 static bool notifyd_trigger(struct messaging_context *msg_ctx,
660                             struct messaging_rec **prec,
661                             void *private_data)
662 {
663         struct notifyd_state *state = talloc_get_type_abort(
664                 private_data, struct notifyd_state);
665         struct server_id my_id = messaging_server_id(msg_ctx);
666         struct messaging_rec *rec = *prec;
667         struct notifyd_trigger_state tstate;
668         const char *path;
669         const char *p, *next_p;
670
671         if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
672                 DEBUG(1, ("message too short, ignoring: %u\n",
673                           (unsigned)rec->buf.length));
674                 return true;
675         }
676         if (rec->buf.data[rec->buf.length-1] != 0) {
677                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
678                 return true;
679         }
680
681         tstate.msg_ctx = msg_ctx;
682
683         tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
684         tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
685
686         tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
687         path = tstate.msg->path;
688
689         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
690                    __func__, (unsigned)tstate.msg->action,
691                    (unsigned)tstate.msg->filter, path));
692
693         if (path[0] != '/') {
694                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
695                           __func__, path));
696                 return true;
697         }
698
699         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
700                 ptrdiff_t path_len = p - path;
701                 TDB_DATA key;
702                 uint32_t i;
703
704                 next_p = strchr(p+1, '/');
705                 tstate.recursive = (next_p != NULL);
706
707                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
708                            (int)path_len, path));
709
710                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
711                                    .dsize = path_len };
712
713                 dbwrap_parse_record(state->entries, key,
714                                     notifyd_trigger_parser, &tstate);
715
716                 if (state->peers == NULL) {
717                         continue;
718                 }
719
720                 if (rec->src.vnn != my_id.vnn) {
721                         continue;
722                 }
723
724                 for (i=0; i<state->num_peers; i++) {
725                         if (state->peers[i]->db == NULL) {
726                                 /*
727                                  * Inactive peer, did not get a db yet
728                                  */
729                                 continue;
730                         }
731                         dbwrap_parse_record(state->peers[i]->db, key,
732                                             notifyd_trigger_parser, &tstate);
733                 }
734         }
735
736         return true;
737 }
738
739 static void notifyd_send_delete(struct messaging_context *msg_ctx,
740                                 TDB_DATA key,
741                                 struct notifyd_instance *instance);
742
743 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
744                                    void *private_data)
745
746 {
747         struct notifyd_trigger_state *tstate = private_data;
748         struct notify_event_msg msg = { .action = tstate->msg->action,
749                                         .when = tstate->msg->when };
750         struct iovec iov[2];
751         size_t path_len = key.dsize;
752         struct notifyd_instance *instances = NULL;
753         size_t num_instances = 0;
754         size_t i;
755
756         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
757                                  &num_instances)) {
758                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
759                 return;
760         }
761
762         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
763                    (unsigned)num_instances, (int)key.dsize,
764                    (char *)key.dptr));
765
766         iov[0].iov_base = &msg;
767         iov[0].iov_len = offsetof(struct notify_event_msg, path);
768         iov[1].iov_base = tstate->msg->path + path_len + 1;
769         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
770
771         for (i=0; i<num_instances; i++) {
772                 struct notifyd_instance *instance = &instances[i];
773                 struct server_id_buf idbuf;
774                 uint32_t i_filter;
775                 NTSTATUS status;
776
777                 if (tstate->covered_by_sys_notify) {
778                         if (tstate->recursive) {
779                                 i_filter = instance->internal_subdir_filter;
780                         } else {
781                                 i_filter = instance->internal_filter;
782                         }
783                 } else {
784                         if (tstate->recursive) {
785                                 i_filter = instance->instance.subdir_filter;
786                         } else {
787                                 i_filter = instance->instance.filter;
788                         }
789                 }
790
791                 if ((i_filter & tstate->msg->filter) == 0) {
792                         continue;
793                 }
794
795                 msg.private_data = instance->instance.private_data;
796
797                 status = messaging_send_iov(
798                         tstate->msg_ctx, instance->client,
799                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
800
801                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
802                            __func__,
803                            server_id_str_buf(instance->client, &idbuf),
804                            nt_errstr(status)));
805
806                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
807                     procid_is_local(&instance->client)) {
808                         /*
809                          * That process has died
810                          */
811                         notifyd_send_delete(tstate->msg_ctx, key, instance);
812                         continue;
813                 }
814
815                 if (!NT_STATUS_IS_OK(status)) {
816                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
817                                   __func__, nt_errstr(status)));
818                 }
819         }
820 }
821
822 /*
823  * Send a delete request to ourselves to properly discard a notify
824  * record for an smbd that has died.
825  */
826
827 static void notifyd_send_delete(struct messaging_context *msg_ctx,
828                                 TDB_DATA key,
829                                 struct notifyd_instance *instance)
830 {
831         struct notify_rec_change_msg msg = {
832                 .instance.private_data = instance->instance.private_data
833         };
834         uint8_t nul = 0;
835         struct iovec iov[3];
836         int ret;
837
838         /*
839          * Send a rec_change to ourselves to delete a dead entry
840          */
841
842         iov[0] = (struct iovec) {
843                 .iov_base = &msg,
844                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
845         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
846         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
847
848         ret = messaging_send_iov_from(
849                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
850                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
851
852         if (ret != 0) {
853                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
854                            __func__, strerror(ret)));
855         }
856 }
857
858 static bool notifyd_get_db(struct messaging_context *msg_ctx,
859                            struct messaging_rec **prec,
860                            void *private_data)
861 {
862         struct notifyd_state *state = talloc_get_type_abort(
863                 private_data, struct notifyd_state);
864         struct messaging_rec *rec = *prec;
865         struct server_id_buf id1, id2;
866         NTSTATUS status;
867         uint64_t rec_index = UINT64_MAX;
868         uint8_t index_buf[sizeof(uint64_t)];
869         size_t dbsize;
870         uint8_t *buf;
871         struct iovec iov[2];
872
873         dbsize = dbwrap_marshall(state->entries, NULL, 0);
874
875         buf = talloc_array(rec, uint8_t, dbsize);
876         if (buf == NULL) {
877                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
878                           __func__, (uintmax_t)dbsize));
879                 return true;
880         }
881
882         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
883
884         if (dbsize != talloc_get_size(buf)) {
885                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
886                           (uintmax_t)talloc_get_size(buf),
887                           (uintmax_t)dbsize));
888                 TALLOC_FREE(buf);
889                 return true;
890         }
891
892         if (state->log != NULL) {
893                 rec_index = state->log->rec_index;
894         }
895         SBVAL(index_buf, 0, rec_index);
896
897         iov[0] = (struct iovec) { .iov_base = index_buf,
898                                   .iov_len = sizeof(index_buf) };
899         iov[1] = (struct iovec) { .iov_base = buf,
900                                   .iov_len = dbsize };
901
902         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
903                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
904                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
905                    server_id_str_buf(rec->src, &id2)));
906
907         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
908                                     iov, ARRAY_SIZE(iov), NULL, 0);
909         TALLOC_FREE(buf);
910         if (!NT_STATUS_IS_OK(status)) {
911                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
912                           __func__, nt_errstr(status)));
913         }
914
915         return true;
916 }
917
918 #ifdef CLUSTER_SUPPORT
919
920 static int notifyd_add_proxy_syswatches(struct db_record *rec,
921                                         void *private_data);
922
923 static bool notifyd_got_db(struct messaging_context *msg_ctx,
924                            struct messaging_rec **prec,
925                            void *private_data)
926 {
927         struct notifyd_state *state = talloc_get_type_abort(
928                 private_data, struct notifyd_state);
929         struct messaging_rec *rec = *prec;
930         struct notifyd_peer *p = NULL;
931         struct server_id_buf idbuf;
932         NTSTATUS status;
933         int count;
934         size_t i;
935
936         for (i=0; i<state->num_peers; i++) {
937                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
938                         p = state->peers[i];
939                         break;
940                 }
941         }
942
943         if (p == NULL) {
944                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
945                            __func__, server_id_str_buf(rec->src, &idbuf)));
946                 return true;
947         }
948
949         if (rec->buf.length < 8) {
950                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
951                            (unsigned)rec->buf.length,
952                            server_id_str_buf(rec->src, &idbuf)));
953                 TALLOC_FREE(p);
954                 return true;
955         }
956
957         p->rec_index = BVAL(rec->buf.data, 0);
958
959         p->db = db_open_rbt(p);
960         if (p->db == NULL) {
961                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
962                 TALLOC_FREE(p);
963                 return true;
964         }
965
966         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
967                                    rec->buf.length - 8);
968         if (!NT_STATUS_IS_OK(status)) {
969                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
970                            __func__, nt_errstr(status),
971                            server_id_str_buf(rec->src, &idbuf)));
972                 TALLOC_FREE(p);
973                 return true;
974         }
975
976         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
977                              &count);
978
979         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
980                    server_id_str_buf(rec->src, &idbuf), count));
981
982         return true;
983 }
984
985 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
986                                      struct server_id src,
987                                      struct messaging_reclog *log)
988 {
989         enum ndr_err_code ndr_err;
990         uint8_t msghdr[MESSAGE_HDR_LENGTH];
991         DATA_BLOB blob;
992         struct iovec iov[2];
993         int ret;
994
995         if (log == NULL) {
996                 return;
997         }
998
999         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
1000                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
1001
1002         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
1003                         (struct server_id) {0 });
1004         iov[0] = (struct iovec) { .iov_base = msghdr,
1005                                   .iov_len = sizeof(msghdr) };
1006
1007         ndr_err = ndr_push_struct_blob(
1008                 &blob, log, log,
1009                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
1010         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1011                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1012                           __func__, ndr_errstr(ndr_err)));
1013                 goto done;
1014         }
1015         iov[1] = (struct iovec) { .iov_base = blob.data,
1016                                   .iov_len = blob.length };
1017
1018         ret = ctdbd_messaging_send_iov(
1019                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1020                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1021         TALLOC_FREE(blob.data);
1022         if (ret != 0) {
1023                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1024                           __func__, strerror(ret)));
1025                 goto done;
1026         }
1027
1028         log->rec_index += 1;
1029
1030 done:
1031         log->num_recs = 0;
1032         TALLOC_FREE(log->recs);
1033 }
1034
1035 struct notifyd_broadcast_reclog_state {
1036         struct tevent_context *ev;
1037         struct ctdbd_connection *ctdbd_conn;
1038         struct server_id src;
1039         struct messaging_reclog *log;
1040 };
1041
1042 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1043
1044 static struct tevent_req *notifyd_broadcast_reclog_send(
1045         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1046         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1047         struct messaging_reclog *log)
1048 {
1049         struct tevent_req *req, *subreq;
1050         struct notifyd_broadcast_reclog_state *state;
1051
1052         req = tevent_req_create(mem_ctx, &state,
1053                                 struct notifyd_broadcast_reclog_state);
1054         if (req == NULL) {
1055                 return NULL;
1056         }
1057         state->ev = ev;
1058         state->ctdbd_conn = ctdbd_conn;
1059         state->src = src;
1060         state->log = log;
1061
1062         subreq = tevent_wakeup_send(state, state->ev,
1063                                     timeval_current_ofs_msec(1000));
1064         if (tevent_req_nomem(subreq, req)) {
1065                 return tevent_req_post(req, ev);
1066         }
1067         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1068         return req;
1069 }
1070
1071 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1072 {
1073         struct tevent_req *req = tevent_req_callback_data(
1074                 subreq, struct tevent_req);
1075         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1076                 req, struct notifyd_broadcast_reclog_state);
1077         bool ok;
1078
1079         ok = tevent_wakeup_recv(subreq);
1080         TALLOC_FREE(subreq);
1081         if (!ok) {
1082                 tevent_req_oom(req);
1083                 return;
1084         }
1085
1086         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1087
1088         subreq = tevent_wakeup_send(state, state->ev,
1089                                     timeval_current_ofs_msec(1000));
1090         if (tevent_req_nomem(subreq, req)) {
1091                 return;
1092         }
1093         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1094 }
1095
1096 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1097 {
1098         return tevent_req_simple_recv_unix(req);
1099 }
1100
1101 struct notifyd_clean_peers_state {
1102         struct tevent_context *ev;
1103         struct notifyd_state *notifyd;
1104 };
1105
1106 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1107
1108 static struct tevent_req *notifyd_clean_peers_send(
1109         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1110         struct notifyd_state *notifyd)
1111 {
1112         struct tevent_req *req, *subreq;
1113         struct notifyd_clean_peers_state *state;
1114
1115         req = tevent_req_create(mem_ctx, &state,
1116                                 struct notifyd_clean_peers_state);
1117         if (req == NULL) {
1118                 return NULL;
1119         }
1120         state->ev = ev;
1121         state->notifyd = notifyd;
1122
1123         subreq = tevent_wakeup_send(state, state->ev,
1124                                     timeval_current_ofs_msec(30000));
1125         if (tevent_req_nomem(subreq, req)) {
1126                 return tevent_req_post(req, ev);
1127         }
1128         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1129         return req;
1130 }
1131
1132 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1133 {
1134         struct tevent_req *req = tevent_req_callback_data(
1135                 subreq, struct tevent_req);
1136         struct notifyd_clean_peers_state *state = tevent_req_data(
1137                 req, struct notifyd_clean_peers_state);
1138         struct notifyd_state *notifyd = state->notifyd;
1139         size_t i;
1140         bool ok;
1141         time_t now = time(NULL);
1142
1143         ok = tevent_wakeup_recv(subreq);
1144         TALLOC_FREE(subreq);
1145         if (!ok) {
1146                 tevent_req_oom(req);
1147                 return;
1148         }
1149
1150         i = 0;
1151         while (i < notifyd->num_peers) {
1152                 struct notifyd_peer *p = notifyd->peers[i];
1153
1154                 if ((now - p->last_broadcast) > 60) {
1155                         struct server_id_buf idbuf;
1156
1157                         /*
1158                          * Haven't heard for more than 60 seconds. Call this
1159                          * peer dead
1160                          */
1161
1162                         DEBUG(10, ("%s: peer %s died\n", __func__,
1163                                    server_id_str_buf(p->pid, &idbuf)));
1164                         /*
1165                          * This implicitly decrements notifyd->num_peers
1166                          */
1167                         TALLOC_FREE(p);
1168                 } else {
1169                         i += 1;
1170                 }
1171         }
1172
1173         subreq = tevent_wakeup_send(state, state->ev,
1174                                     timeval_current_ofs_msec(30000));
1175         if (tevent_req_nomem(subreq, req)) {
1176                 return;
1177         }
1178         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1179 }
1180
1181 static int notifyd_clean_peers_recv(struct tevent_req *req)
1182 {
1183         return tevent_req_simple_recv_unix(req);
1184 }
1185
1186 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1187                                         void *private_data)
1188 {
1189         struct notifyd_state *state = talloc_get_type_abort(
1190                 private_data, struct notifyd_state);
1191         struct db_context *db = dbwrap_record_get_db(rec);
1192         TDB_DATA key = dbwrap_record_get_key(rec);
1193         TDB_DATA value = dbwrap_record_get_value(rec);
1194         struct notifyd_instance *instances = NULL;
1195         size_t num_instances = 0;
1196         size_t i;
1197         char path[key.dsize+1];
1198         bool ok;
1199
1200         memcpy(path, key.dptr, key.dsize);
1201         path[key.dsize] = '\0';
1202
1203         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1204                                  &num_instances);
1205         if (!ok) {
1206                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1207                           __func__, path));
1208                 return 0;
1209         }
1210
1211         for (i=0; i<num_instances; i++) {
1212                 struct notifyd_instance *instance = &instances[i];
1213                 uint32_t filter = instance->instance.filter;
1214                 uint32_t subdir_filter = instance->instance.subdir_filter;
1215                 int ret;
1216
1217                 /*
1218                  * This is a remote database. Pointers that we were
1219                  * given don't make sense locally. Initialize to NULL
1220                  * in case sys_notify_watch fails.
1221                  */
1222                 instances[i].sys_watch = NULL;
1223
1224                 ret = state->sys_notify_watch(
1225                         db, state->sys_notify_ctx, path,
1226                         &filter, &subdir_filter,
1227                         notifyd_sys_callback, state->msg_ctx,
1228                         &instance->sys_watch);
1229                 if (ret != 0) {
1230                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1231                                   __func__, strerror(errno)));
1232                 }
1233         }
1234
1235         return 0;
1236 }
1237
1238 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1239 {
1240         TDB_DATA key = dbwrap_record_get_key(rec);
1241         TDB_DATA value = dbwrap_record_get_value(rec);
1242         struct notifyd_instance *instances = NULL;
1243         size_t num_instances = 0;
1244         size_t i;
1245         bool ok;
1246
1247         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1248                                  &num_instances);
1249         if (!ok) {
1250                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1251                           __func__, (int)key.dsize, (char *)key.dptr));
1252                 return 0;
1253         }
1254         for (i=0; i<num_instances; i++) {
1255                 TALLOC_FREE(instances[i].sys_watch);
1256         }
1257         return 0;
1258 }
1259
1260 static int notifyd_peer_destructor(struct notifyd_peer *p)
1261 {
1262         struct notifyd_state *state = p->state;
1263         size_t i;
1264
1265         if (p->db != NULL) {
1266                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1267                                      NULL, NULL);
1268         }
1269
1270         for (i = 0; i<state->num_peers; i++) {
1271                 if (p == state->peers[i]) {
1272                         state->peers[i] = state->peers[state->num_peers-1];
1273                         state->num_peers -= 1;
1274                         break;
1275                 }
1276         }
1277         return 0;
1278 }
1279
1280 static struct notifyd_peer *notifyd_peer_new(
1281         struct notifyd_state *state, struct server_id pid)
1282 {
1283         struct notifyd_peer *p, **tmp;
1284
1285         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1286                              state->num_peers+1);
1287         if (tmp == NULL) {
1288                 return NULL;
1289         }
1290         state->peers = tmp;
1291
1292         p = talloc_zero(state->peers, struct notifyd_peer);
1293         if (p == NULL) {
1294                 return NULL;
1295         }
1296         p->state = state;
1297         p->pid = pid;
1298
1299         state->peers[state->num_peers] = p;
1300         state->num_peers += 1;
1301
1302         talloc_set_destructor(p, notifyd_peer_destructor);
1303
1304         return p;
1305 }
1306
1307 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1308                                  const uint8_t *msg, size_t msglen)
1309 {
1310         struct notifyd_state *state = peer->state;
1311         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1312                            .length = msglen };
1313         struct server_id_buf idbuf;
1314         struct messaging_reclog *log;
1315         enum ndr_err_code ndr_err;
1316         uint32_t i;
1317
1318         if (peer->db == NULL) {
1319                 /*
1320                  * No db yet
1321                  */
1322                 return;
1323         }
1324
1325         log = talloc(peer, struct messaging_reclog);
1326         if (log == NULL) {
1327                 DEBUG(10, ("%s: talloc failed\n", __func__));
1328                 return;
1329         }
1330
1331         ndr_err = ndr_pull_struct_blob_all(
1332                 &blob, log, log,
1333                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1334         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1335                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1336                            __func__, ndr_errstr(ndr_err)));
1337                 goto fail;
1338         }
1339
1340         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1341                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1342                    server_id_str_buf(peer->pid, &idbuf)));
1343
1344         if (log->rec_index != peer->rec_index) {
1345                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1346                           __func__, (uintmax_t)log->rec_index,
1347                           server_id_str_buf(peer->pid, &idbuf),
1348                           (uintmax_t)peer->rec_index));
1349                 goto fail;
1350         }
1351
1352         for (i=0; i<log->num_recs; i++) {
1353                 struct messaging_rec *r = log->recs[i];
1354                 struct notify_rec_change_msg *chg;
1355                 size_t pathlen;
1356                 bool ok;
1357
1358                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1359                                               &chg, &pathlen);
1360                 if (!ok) {
1361                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1362                                   __func__));
1363                         goto fail;
1364                 }
1365
1366                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1367                                               &chg->instance, peer->db,
1368                                               state->sys_notify_watch,
1369                                               state->sys_notify_ctx,
1370                                               state->msg_ctx);
1371                 if (!ok) {
1372                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1373                                   __func__));
1374                         goto fail;
1375                 }
1376         }
1377
1378         peer->rec_index += 1;
1379         peer->last_broadcast = time(NULL);
1380
1381         TALLOC_FREE(log);
1382         return;
1383
1384 fail:
1385         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1386                    server_id_str_buf(peer->pid, &idbuf)));
1387         TALLOC_FREE(peer);
1388 }
1389
1390 /*
1391  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1392  * messages) broadcasts by other notifyds. Several cases:
1393  *
1394  * We don't know the source. This creates a new peer. Creating a peer
1395  * involves asking the peer for its full database. We assume ordered
1396  * messages, so the new database will arrive before the next broadcast
1397  * will.
1398  *
1399  * We know the source and the log index matches. We will apply the log
1400  * locally to our peer's db as if we had received it from a local
1401  * client.
1402  *
1403  * We know the source but the log index does not match. This means we
1404  * lost a message. We just drop the whole peer and wait for the next
1405  * broadcast, which will then trigger a fresh database pull.
1406  */
1407
1408 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1409                                    uint64_t dst_srvid,
1410                                    const uint8_t *msg, size_t msglen,
1411                                    void *private_data)
1412 {
1413         struct notifyd_state *state = talloc_get_type_abort(
1414                 private_data, struct notifyd_state);
1415         struct server_id my_id = messaging_server_id(state->msg_ctx);
1416         struct notifyd_peer *p;
1417         uint32_t i;
1418         uint32_t msg_type;
1419         struct server_id src, dst;
1420         struct server_id_buf idbuf;
1421         NTSTATUS status;
1422
1423         if (msglen < MESSAGE_HDR_LENGTH) {
1424                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1425                 return 0;
1426         }
1427         message_hdr_get(&msg_type, &src, &dst, msg);
1428
1429         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1430                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1431                            (unsigned)msg_type));
1432                 return 0;
1433         }
1434         if (server_id_equal(&src, &my_id)) {
1435                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1436                 return 0;
1437         }
1438
1439         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1440                    __func__, server_id_str_buf(src, &idbuf)));
1441
1442         for (i=0; i<state->num_peers; i++) {
1443                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1444
1445                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1446                                    __func__, (unsigned)i));
1447
1448                         notifyd_apply_reclog(state->peers[i],
1449                                              msg + MESSAGE_HDR_LENGTH,
1450                                              msglen - MESSAGE_HDR_LENGTH);
1451                         return 0;
1452                 }
1453         }
1454
1455         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1456                    server_id_str_buf(src, &idbuf)));
1457
1458         p = notifyd_peer_new(state, src);
1459         if (p == NULL) {
1460                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1461                 return 0;
1462         }
1463
1464         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1465                                     NULL, 0);
1466         if (!NT_STATUS_IS_OK(status)) {
1467                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1468                            __func__, nt_errstr(status)));
1469                 TALLOC_FREE(p);
1470                 return 0;
1471         }
1472
1473         return 0;
1474 }
1475 #endif
1476
1477 struct notifyd_parse_db_state {
1478         bool (*fn)(const char *path,
1479                    struct server_id server,
1480                    const struct notify_instance *instance,
1481                    void *private_data);
1482         void *private_data;
1483 };
1484
1485 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1486                                     void *private_data)
1487 {
1488         struct notifyd_parse_db_state *state = private_data;
1489         char path[key.dsize+1];
1490         struct notifyd_instance *instances = NULL;
1491         size_t num_instances = 0;
1492         size_t i;
1493         bool ok;
1494
1495         memcpy(path, key.dptr, key.dsize);
1496         path[key.dsize] = 0;
1497
1498         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1499                                  &num_instances);
1500         if (!ok) {
1501                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1502                            __func__, path));
1503                 return true;
1504         }
1505
1506         for (i=0; i<num_instances; i++) {
1507                 ok = state->fn(path, instances[i].client,
1508                                &instances[i].instance,
1509                                state->private_data);
1510                 if (!ok) {
1511                         return false;
1512                 }
1513         }
1514
1515         return true;
1516 }
1517
1518 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1519                      uint64_t *log_index,
1520                      bool (*fn)(const char *path,
1521                                 struct server_id server,
1522                                 const struct notify_instance *instance,
1523                                 void *private_data),
1524                      void *private_data)
1525 {
1526         struct notifyd_parse_db_state state = {
1527                 .fn = fn, .private_data = private_data
1528         };
1529         NTSTATUS status;
1530
1531         if (buflen < 8) {
1532                 return EINVAL;
1533         }
1534         *log_index = BVAL(buf, 0);
1535
1536         buf += 8;
1537         buflen -= 8;
1538
1539         status = dbwrap_parse_marshall_buf(
1540                 buf, buflen, notifyd_parse_db_parser, &state);
1541         if (!NT_STATUS_IS_OK(status)) {
1542                 return map_errno_from_nt_status(status);
1543         }
1544
1545         return 0;
1546 }