notifyd: Move BlockSignals calls to server.c
[metze/samba/wip.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "librpc/gen_ndr/notify.h"
22 #include "librpc/gen_ndr/messaging.h"
23 #include "librpc/gen_ndr/server_id.h"
24 #include "lib/dbwrap/dbwrap.h"
25 #include "lib/dbwrap/dbwrap_rbt.h"
26 #include "messages.h"
27 #include "proto.h"
28 #include "tdb.h"
29 #include "util_tdb.h"
30 #include "notifyd.h"
31 #include "lib/util/server_id_db.h"
32 #include "lib/util/tevent_unix.h"
33 #include "ctdbd_conn.h"
34 #include "ctdb_srvids.h"
35 #include "source3/smbd/proto.h"
36 #include "server_id_db_util.h"
37 #include "lib/util/iov_buf.h"
38 #include "messages_util.h"
39
40 #ifdef CLUSTER_SUPPORT
41 #include "ctdb_protocol.h"
42 #endif
43
44 struct notifyd_peer;
45
46 /*
47  * All of notifyd's state
48  */
49
50 struct notifyd_state {
51         struct tevent_context *ev;
52         struct messaging_context *msg_ctx;
53         struct ctdbd_connection *ctdbd_conn;
54
55         /*
56          * Database of everything clients show interest in. Indexed by
57          * absolute path. The database keys are not 0-terminated
58          * because the criticial operation, notifyd_trigger, can walk
59          * the structure from the top without adding intermediate 0s.
60          * The database records contain an array of
61          *
62          * struct notifyd_instance
63          *
64          * to be maintained by parsed by notifyd_entry_parse()
65          */
66         struct db_context *entries;
67
68         /*
69          * In the cluster case, this is the place where we store a log
70          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
71          * forward them to our peer notifyd's in the cluster once a
72          * second or when the log grows too large.
73          */
74
75         struct messaging_reclog *log;
76
77         /*
78          * Array of companion notifyd's in a cluster. Every notifyd
79          * broadcasts its messaging_reclog to every other notifyd in
80          * the cluster. This is done by making ctdb send a message to
81          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
82          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
83          * had called register_with_ctdbd this srvid will receive the
84          * broadcasts.
85          *
86          * Database replication happens via these broadcasts. Also,
87          * they serve as liveness indication. If a notifyd receives a
88          * broadcast from an unknown peer, it will create one for this
89          * srvid. Also when we don't hear anything from a peer for a
90          * while, we will discard it.
91          */
92
93         struct notifyd_peer **peers;
94         size_t num_peers;
95
96         sys_notify_watch_fn sys_notify_watch;
97         struct sys_notify_context *sys_notify_ctx;
98 };
99
100 /*
101  * notifyd's representation of a notify instance
102  */
103 struct notifyd_instance {
104         struct server_id client;
105         struct notify_instance instance;
106
107         void *sys_watch; /* inotify/fam/etc handle */
108
109         /*
110          * Filters after sys_watch took responsibility of some bits
111          */
112         uint32_t internal_filter;
113         uint32_t internal_subdir_filter;
114 };
115
116 struct notifyd_peer {
117         struct notifyd_state *state;
118         struct server_id pid;
119         uint64_t rec_index;
120         struct db_context *db;
121         time_t last_broadcast;
122 };
123
124 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
125                                struct messaging_rec **prec,
126                                void *private_data);
127 static bool notifyd_trigger(struct messaging_context *msg_ctx,
128                             struct messaging_rec **prec,
129                             void *private_data);
130 static bool notifyd_get_db(struct messaging_context *msg_ctx,
131                            struct messaging_rec **prec,
132                            void *private_data);
133 static bool notifyd_got_db(struct messaging_context *msg_ctx,
134                            struct messaging_rec **prec,
135                            void *private_data);
136
137 #ifdef CLUSTER_SUPPORT
138 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
139                                      struct server_id src,
140                                      struct messaging_reclog *log);
141 #endif
142 static void notifyd_sys_callback(struct sys_notify_context *ctx,
143                                  void *private_data, struct notify_event *ev,
144                                  uint32_t filter);
145
146 #ifdef CLUSTER_SUPPORT
147 static struct tevent_req *notifyd_broadcast_reclog_send(
148         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
149         struct ctdbd_connection *ctdbd_conn, struct server_id src,
150         struct messaging_reclog *log);
151 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
152
153 static struct tevent_req *notifyd_clean_peers_send(
154         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
155         struct notifyd_state *notifyd);
156 static int notifyd_clean_peers_recv(struct tevent_req *req);
157 #endif
158
159 static int sys_notify_watch_dummy(
160         TALLOC_CTX *mem_ctx,
161         struct sys_notify_context *ctx,
162         const char *path,
163         uint32_t *filter,
164         uint32_t *subdir_filter,
165         void (*callback)(struct sys_notify_context *ctx,
166                          void *private_data,
167                          struct notify_event *ev,
168                          uint32_t filter),
169         void *private_data,
170         void *handle_p)
171 {
172         void **handle = handle_p;
173         *handle = NULL;
174         return 0;
175 }
176
177 static void notifyd_handler_done(struct tevent_req *subreq);
178
179 #ifdef CLUSTER_SUPPORT
180 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
181 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
182 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
183                                    uint64_t dst_srvid,
184                                    const uint8_t *msg, size_t msglen,
185                                    void *private_data);
186 #endif
187
188 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
189                                 struct messaging_context *msg_ctx,
190                                 struct ctdbd_connection *ctdbd_conn,
191                                 sys_notify_watch_fn sys_notify_watch,
192                                 struct sys_notify_context *sys_notify_ctx)
193 {
194         struct tevent_req *req, *subreq;
195         struct notifyd_state *state;
196         struct server_id_db *names_db;
197         int ret;
198
199         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
200         if (req == NULL) {
201                 return NULL;
202         }
203         state->ev = ev;
204         state->msg_ctx = msg_ctx;
205         state->ctdbd_conn = ctdbd_conn;
206
207         if (sys_notify_watch == NULL) {
208                 sys_notify_watch = sys_notify_watch_dummy;
209         }
210
211         state->sys_notify_watch = sys_notify_watch;
212         state->sys_notify_ctx = sys_notify_ctx;
213
214         state->entries = db_open_rbt(state);
215         if (tevent_req_nomem(state->entries, req)) {
216                 return tevent_req_post(req, ev);
217         }
218
219         subreq = messaging_handler_send(state, ev, msg_ctx,
220                                         MSG_SMB_NOTIFY_REC_CHANGE,
221                                         notifyd_rec_change, state);
222         if (tevent_req_nomem(subreq, req)) {
223                 return tevent_req_post(req, ev);
224         }
225         tevent_req_set_callback(subreq, notifyd_handler_done, req);
226
227         subreq = messaging_handler_send(state, ev, msg_ctx,
228                                         MSG_SMB_NOTIFY_TRIGGER,
229                                         notifyd_trigger, state);
230         if (tevent_req_nomem(subreq, req)) {
231                 return tevent_req_post(req, ev);
232         }
233         tevent_req_set_callback(subreq, notifyd_handler_done, req);
234
235         subreq = messaging_handler_send(state, ev, msg_ctx,
236                                         MSG_SMB_NOTIFY_GET_DB,
237                                         notifyd_get_db, state);
238         if (tevent_req_nomem(subreq, req)) {
239                 return tevent_req_post(req, ev);
240         }
241         tevent_req_set_callback(subreq, notifyd_handler_done, req);
242
243         subreq = messaging_handler_send(state, ev, msg_ctx,
244                                         MSG_SMB_NOTIFY_DB,
245                                         notifyd_got_db, state);
246         if (tevent_req_nomem(subreq, req)) {
247                 return tevent_req_post(req, ev);
248         }
249         tevent_req_set_callback(subreq, notifyd_handler_done, req);
250
251         names_db = messaging_names_db(msg_ctx);
252
253         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
254         if (ret != 0) {
255                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
256                            __func__, strerror(ret)));
257                 tevent_req_error(req, ret);
258                 return tevent_req_post(req, ev);
259         }
260
261         if (ctdbd_conn == NULL) {
262                 /*
263                  * No cluster around, skip the database replication
264                  * engine
265                  */
266                 return req;
267         }
268
269 #ifdef CLUSTER_SUPPORT
270         state->log = talloc_zero(state, struct messaging_reclog);
271         if (tevent_req_nomem(state->log, req)) {
272                 return tevent_req_post(req, ev);
273         }
274
275         subreq = notifyd_broadcast_reclog_send(
276                 state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
277                 state->log);
278         if (tevent_req_nomem(subreq, req)) {
279                 return tevent_req_post(req, ev);
280         }
281         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
282                                 req);
283
284         subreq = notifyd_clean_peers_send(state, ev, state);
285         if (tevent_req_nomem(subreq, req)) {
286                 return tevent_req_post(req, ev);
287         }
288         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
289                                 req);
290
291         ret = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
292                                   notifyd_snoop_broadcast, state);
293         if (ret != 0) {
294                 tevent_req_error(req, ret);
295                 return tevent_req_post(req, ev);
296         }
297 #endif
298
299         return req;
300 }
301
302 static void notifyd_handler_done(struct tevent_req *subreq)
303 {
304         struct tevent_req *req = tevent_req_callback_data(
305                 subreq, struct tevent_req);
306         int ret;
307
308         ret = messaging_handler_recv(subreq);
309         TALLOC_FREE(subreq);
310         tevent_req_error(req, ret);
311 }
312
313 #ifdef CLUSTER_SUPPORT
314
315 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
316 {
317         struct tevent_req *req = tevent_req_callback_data(
318                 subreq, struct tevent_req);
319         int ret;
320
321         ret = notifyd_broadcast_reclog_recv(subreq);
322         TALLOC_FREE(subreq);
323         tevent_req_error(req, ret);
324 }
325
326 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
327 {
328         struct tevent_req *req = tevent_req_callback_data(
329                 subreq, struct tevent_req);
330         int ret;
331
332         ret = notifyd_clean_peers_recv(subreq);
333         TALLOC_FREE(subreq);
334         tevent_req_error(req, ret);
335 }
336
337 #endif
338
339 int notifyd_recv(struct tevent_req *req)
340 {
341         return tevent_req_simple_recv_unix(req);
342 }
343
344 /*
345  * Parse an entry in the notifyd_context->entries database
346  */
347
348 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
349                                 struct notifyd_instance **instances,
350                                 size_t *num_instances)
351 {
352         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
353                 DEBUG(1, ("%s: invalid buffer size: %u\n",
354                           __func__, (unsigned)buflen));
355                 return false;
356         }
357
358         if (instances != NULL) {
359                 *instances = (struct notifyd_instance *)buf;
360         }
361         if (num_instances != NULL) {
362                 *num_instances = buflen / sizeof(struct notifyd_instance);
363         }
364         return true;
365 }
366
367 static bool notifyd_apply_rec_change(
368         const struct server_id *client,
369         const char *path, size_t pathlen,
370         const struct notify_instance *chg,
371         struct db_context *entries,
372         sys_notify_watch_fn sys_notify_watch,
373         struct sys_notify_context *sys_notify_ctx,
374         struct messaging_context *msg_ctx)
375 {
376         struct db_record *rec;
377         struct notifyd_instance *instances;
378         size_t num_instances;
379         size_t i;
380         struct notifyd_instance *instance;
381         TDB_DATA value;
382         NTSTATUS status;
383         bool ok = false;
384
385         if (pathlen == 0) {
386                 DEBUG(1, ("%s: pathlen==0\n", __func__));
387                 return false;
388         }
389         if (path[pathlen-1] != '\0') {
390                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
391                 return false;
392         }
393
394         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
395                    "private_data=%p\n", __func__, path,
396                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
397                    chg->private_data));
398
399         rec = dbwrap_fetch_locked(
400                 entries, entries,
401                 make_tdb_data((const uint8_t *)path, pathlen-1));
402
403         if (rec == NULL) {
404                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
405                 goto fail;
406         }
407
408         num_instances = 0;
409         value = dbwrap_record_get_value(rec);
410
411         if (value.dsize != 0) {
412                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
413                                          &num_instances)) {
414                         goto fail;
415                 }
416         }
417
418         /*
419          * Overallocate by one instance to avoid a realloc when adding
420          */
421         instances = talloc_array(rec, struct notifyd_instance,
422                                  num_instances + 1);
423         if (instances == NULL) {
424                 DEBUG(1, ("%s: talloc failed\n", __func__));
425                 goto fail;
426         }
427
428         if (value.dsize != 0) {
429                 memcpy(instances, value.dptr, value.dsize);
430         }
431
432         for (i=0; i<num_instances; i++) {
433                 instance = &instances[i];
434
435                 if (server_id_equal(&instance->client, client) &&
436                     (instance->instance.private_data == chg->private_data)) {
437                         break;
438                 }
439         }
440
441         if (i < num_instances) {
442                 instance->instance = *chg;
443         } else {
444                 /*
445                  * We've overallocated for one instance
446                  */
447                 instance = &instances[num_instances];
448
449                 *instance = (struct notifyd_instance) {
450                         .client = *client,
451                         .instance = *chg,
452                         .internal_filter = chg->filter,
453                         .internal_subdir_filter = chg->subdir_filter
454                 };
455
456                 num_instances += 1;
457         }
458
459         if ((instance->instance.filter != 0) ||
460             (instance->instance.subdir_filter != 0)) {
461                 int ret;
462
463                 TALLOC_FREE(instance->sys_watch);
464
465                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
466                                        &instance->internal_filter,
467                                        &instance->internal_subdir_filter,
468                                        notifyd_sys_callback, msg_ctx,
469                                        &instance->sys_watch);
470                 if (ret != 0) {
471                         DEBUG(1, ("%s: inotify_watch returned %s\n",
472                                   __func__, strerror(errno)));
473                 }
474         }
475
476         if ((instance->instance.filter == 0) &&
477             (instance->instance.subdir_filter == 0)) {
478                 /* This is a delete request */
479                 TALLOC_FREE(instance->sys_watch);
480                 *instance = instances[num_instances-1];
481                 num_instances -= 1;
482         }
483
484         DEBUG(10, ("%s: %s has %u instances\n", __func__,
485                    path, (unsigned)num_instances));
486
487         if (num_instances == 0) {
488                 status = dbwrap_record_delete(rec);
489                 if (!NT_STATUS_IS_OK(status)) {
490                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
491                                   __func__, nt_errstr(status)));
492                         goto fail;
493                 }
494         } else {
495                 value = make_tdb_data(
496                         (uint8_t *)instances,
497                         sizeof(struct notifyd_instance) * num_instances);
498
499                 status = dbwrap_record_store(rec, value, 0);
500                 if (!NT_STATUS_IS_OK(status)) {
501                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
502                                   __func__, nt_errstr(status)));
503                         goto fail;
504                 }
505         }
506
507         ok = true;
508 fail:
509         TALLOC_FREE(rec);
510         return ok;
511 }
512
513 static void notifyd_sys_callback(struct sys_notify_context *ctx,
514                                  void *private_data, struct notify_event *ev,
515                                  uint32_t filter)
516 {
517         struct messaging_context *msg_ctx = talloc_get_type_abort(
518                 private_data, struct messaging_context);
519         struct notify_trigger_msg msg;
520         struct iovec iov[4];
521         char slash = '/';
522
523         msg = (struct notify_trigger_msg) {
524                 .when = timespec_current(),
525                 .action = ev->action,
526                 .filter = filter,
527         };
528
529         iov[0].iov_base = &msg;
530         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
531         iov[1].iov_base = discard_const_p(char, ev->dir);
532         iov[1].iov_len = strlen(ev->dir);
533         iov[2].iov_base = &slash;
534         iov[2].iov_len = 1;
535         iov[3].iov_base = discard_const_p(char, ev->path);
536         iov[3].iov_len = strlen(ev->path)+1;
537
538         messaging_send_iov(
539                 msg_ctx, messaging_server_id(msg_ctx),
540                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
541 }
542
543 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
544                                      struct notify_rec_change_msg **pmsg,
545                                      size_t *pathlen)
546 {
547         struct notify_rec_change_msg *msg;
548
549         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
550                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
551                           (unsigned)bufsize));
552                 return false;
553         }
554
555         *pmsg = msg = (struct notify_rec_change_msg *)buf;
556         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
557
558         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
559                    "private_data=%p, path=%.*s\n",
560                    __func__, (unsigned)msg->instance.filter,
561                    (unsigned)msg->instance.subdir_filter,
562                    msg->instance.private_data, (int)(*pathlen), msg->path));
563
564         return true;
565 }
566
567 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
568                                struct messaging_rec **prec,
569                                void *private_data)
570 {
571         struct notifyd_state *state = talloc_get_type_abort(
572                 private_data, struct notifyd_state);
573         struct server_id_buf idbuf;
574         struct messaging_rec *rec = *prec;
575         struct notify_rec_change_msg *msg;
576         size_t pathlen;
577         bool ok;
578
579         DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
580                    (unsigned)rec->buf.length,
581                    server_id_str_buf(rec->src, &idbuf)));
582
583         ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
584                                       &msg, &pathlen);
585         if (!ok) {
586                 return true;
587         }
588
589         ok = notifyd_apply_rec_change(
590                 &rec->src, msg->path, pathlen, &msg->instance,
591                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
592                 state->msg_ctx);
593         if (!ok) {
594                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
595                           __func__));
596                 return true;
597         }
598
599         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
600                 return true;
601         }
602
603 #ifdef CLUSTER_SUPPORT
604         {
605
606         struct messaging_rec **tmp;
607         struct messaging_reclog *log;
608
609         log = state->log;
610
611         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
612                              log->num_recs+1);
613         if (tmp == NULL) {
614                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
615                 return true;
616         }
617         log->recs = tmp;
618
619         log->recs[log->num_recs] = talloc_move(log->recs, prec);
620         log->num_recs += 1;
621
622         if (log->num_recs >= 100) {
623                 /*
624                  * Don't let the log grow too large
625                  */
626                 notifyd_broadcast_reclog(state->ctdbd_conn,
627                                          messaging_server_id(msg_ctx), log);
628         }
629
630         }
631 #endif
632
633         return true;
634 }
635
636 struct notifyd_trigger_state {
637         struct messaging_context *msg_ctx;
638         struct notify_trigger_msg *msg;
639         bool recursive;
640         bool covered_by_sys_notify;
641 };
642
643 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
644                                    void *private_data);
645
646 static bool notifyd_trigger(struct messaging_context *msg_ctx,
647                             struct messaging_rec **prec,
648                             void *private_data)
649 {
650         struct notifyd_state *state = talloc_get_type_abort(
651                 private_data, struct notifyd_state);
652         struct server_id my_id = messaging_server_id(msg_ctx);
653         struct messaging_rec *rec = *prec;
654         struct notifyd_trigger_state tstate;
655         const char *path;
656         const char *p, *next_p;
657
658         if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
659                 DEBUG(1, ("message too short, ignoring: %u\n",
660                           (unsigned)rec->buf.length));
661                 return true;
662         }
663         if (rec->buf.data[rec->buf.length-1] != 0) {
664                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
665                 return true;
666         }
667
668         tstate.msg_ctx = msg_ctx;
669
670         tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
671         tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
672
673         tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
674         path = tstate.msg->path;
675
676         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
677                    __func__, (unsigned)tstate.msg->action,
678                    (unsigned)tstate.msg->filter, path));
679
680         if (path[0] != '/') {
681                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
682                           __func__, path));
683                 return true;
684         }
685
686         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
687                 ptrdiff_t path_len = p - path;
688                 TDB_DATA key;
689                 uint32_t i;
690
691                 next_p = strchr(p+1, '/');
692                 tstate.recursive = (next_p != NULL);
693
694                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
695                            (int)path_len, path));
696
697                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
698                                    .dsize = path_len };
699
700                 dbwrap_parse_record(state->entries, key,
701                                     notifyd_trigger_parser, &tstate);
702
703                 if (state->peers == NULL) {
704                         continue;
705                 }
706
707                 if (rec->src.vnn != my_id.vnn) {
708                         continue;
709                 }
710
711                 for (i=0; i<state->num_peers; i++) {
712                         if (state->peers[i]->db == NULL) {
713                                 /*
714                                  * Inactive peer, did not get a db yet
715                                  */
716                                 continue;
717                         }
718                         dbwrap_parse_record(state->peers[i]->db, key,
719                                             notifyd_trigger_parser, &tstate);
720                 }
721         }
722
723         return true;
724 }
725
726 static void notifyd_send_delete(struct messaging_context *msg_ctx,
727                                 TDB_DATA key,
728                                 struct notifyd_instance *instance);
729
730 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
731                                    void *private_data)
732
733 {
734         struct notifyd_trigger_state *tstate = private_data;
735         struct notify_event_msg msg = { .action = tstate->msg->action };
736         struct iovec iov[2];
737         size_t path_len = key.dsize;
738         struct notifyd_instance *instances = NULL;
739         size_t num_instances = 0;
740         size_t i;
741
742         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
743                                  &num_instances)) {
744                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
745                 return;
746         }
747
748         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
749                    (unsigned)num_instances, (int)key.dsize,
750                    (char *)key.dptr));
751
752         iov[0].iov_base = &msg;
753         iov[0].iov_len = offsetof(struct notify_event_msg, path);
754         iov[1].iov_base = tstate->msg->path + path_len + 1;
755         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
756
757         for (i=0; i<num_instances; i++) {
758                 struct notifyd_instance *instance = &instances[i];
759                 struct server_id_buf idbuf;
760                 uint32_t i_filter;
761                 NTSTATUS status;
762
763                 if (tstate->covered_by_sys_notify) {
764                         if (tstate->recursive) {
765                                 i_filter = instance->internal_subdir_filter;
766                         } else {
767                                 i_filter = instance->internal_filter;
768                         }
769                 } else {
770                         if (tstate->recursive) {
771                                 i_filter = instance->instance.subdir_filter;
772                         } else {
773                                 i_filter = instance->instance.filter;
774                         }
775                 }
776
777                 if ((i_filter & tstate->msg->filter) == 0) {
778                         continue;
779                 }
780
781                 msg.private_data = instance->instance.private_data;
782
783                 status = messaging_send_iov(
784                         tstate->msg_ctx, instance->client,
785                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
786
787                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
788                            __func__,
789                            server_id_str_buf(instance->client, &idbuf),
790                            nt_errstr(status)));
791
792                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
793                     procid_is_local(&instance->client)) {
794                         /*
795                          * That process has died
796                          */
797                         notifyd_send_delete(tstate->msg_ctx, key, instance);
798                         continue;
799                 }
800
801                 if (!NT_STATUS_IS_OK(status)) {
802                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
803                                   __func__, nt_errstr(status)));
804                 }
805         }
806 }
807
808 /*
809  * Send a delete request to ourselves to properly discard a notify
810  * record for an smbd that has died.
811  */
812
813 static void notifyd_send_delete(struct messaging_context *msg_ctx,
814                                 TDB_DATA key,
815                                 struct notifyd_instance *instance)
816 {
817         struct notify_rec_change_msg msg = {
818                 .instance.private_data = instance->instance.private_data
819         };
820         uint8_t nul = 0;
821         struct iovec iov[3];
822         int ret;
823
824         /*
825          * Send a rec_change to ourselves to delete a dead entry
826          */
827
828         iov[0] = (struct iovec) {
829                 .iov_base = &msg,
830                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
831         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
832         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
833
834         ret = messaging_send_iov_from(
835                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
836                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
837
838         if (ret != 0) {
839                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
840                            __func__, strerror(ret)));
841         }
842 }
843
844 static bool notifyd_get_db(struct messaging_context *msg_ctx,
845                            struct messaging_rec **prec,
846                            void *private_data)
847 {
848         struct notifyd_state *state = talloc_get_type_abort(
849                 private_data, struct notifyd_state);
850         struct messaging_rec *rec = *prec;
851         struct server_id_buf id1, id2;
852         NTSTATUS status;
853         uint64_t rec_index = UINT64_MAX;
854         uint8_t index_buf[sizeof(uint64_t)];
855         size_t dbsize;
856         uint8_t *buf;
857         struct iovec iov[2];
858
859         dbsize = dbwrap_marshall(state->entries, NULL, 0);
860
861         buf = talloc_array(rec, uint8_t, dbsize);
862         if (buf == NULL) {
863                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
864                           __func__, (uintmax_t)dbsize));
865                 return true;
866         }
867
868         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
869
870         if (dbsize != talloc_get_size(buf)) {
871                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
872                           (uintmax_t)talloc_get_size(buf),
873                           (uintmax_t)dbsize));
874                 TALLOC_FREE(buf);
875                 return true;
876         }
877
878         if (state->log != NULL) {
879                 rec_index = state->log->rec_index;
880         }
881         SBVAL(index_buf, 0, rec_index);
882
883         iov[0] = (struct iovec) { .iov_base = index_buf,
884                                   .iov_len = sizeof(index_buf) };
885         iov[1] = (struct iovec) { .iov_base = buf,
886                                   .iov_len = dbsize };
887
888         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
889                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
890                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
891                    server_id_str_buf(rec->src, &id2)));
892
893         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
894                                     iov, ARRAY_SIZE(iov), NULL, 0);
895         TALLOC_FREE(buf);
896         if (!NT_STATUS_IS_OK(status)) {
897                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
898                           __func__, nt_errstr(status)));
899         }
900
901         return true;
902 }
903
904 static int notifyd_add_proxy_syswatches(struct db_record *rec,
905                                         void *private_data);
906
907 static bool notifyd_got_db(struct messaging_context *msg_ctx,
908                            struct messaging_rec **prec,
909                            void *private_data)
910 {
911         struct notifyd_state *state = talloc_get_type_abort(
912                 private_data, struct notifyd_state);
913         struct messaging_rec *rec = *prec;
914         struct notifyd_peer *p = NULL;
915         struct server_id_buf idbuf;
916         NTSTATUS status;
917         int count;
918         size_t i;
919
920         for (i=0; i<state->num_peers; i++) {
921                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
922                         p = state->peers[i];
923                         break;
924                 }
925         }
926
927         if (p == NULL) {
928                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
929                            __func__, server_id_str_buf(rec->src, &idbuf)));
930                 return true;
931         }
932
933         if (rec->buf.length < 8) {
934                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
935                            (unsigned)rec->buf.length,
936                            server_id_str_buf(rec->src, &idbuf)));
937                 TALLOC_FREE(p);
938                 return true;
939         }
940
941         p->rec_index = BVAL(rec->buf.data, 0);
942
943         p->db = db_open_rbt(p);
944         if (p->db == NULL) {
945                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
946                 TALLOC_FREE(p);
947                 return true;
948         }
949
950         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
951                                    rec->buf.length - 8);
952         if (!NT_STATUS_IS_OK(status)) {
953                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
954                            __func__, nt_errstr(status),
955                            server_id_str_buf(rec->src, &idbuf)));
956                 TALLOC_FREE(p);
957                 return true;
958         }
959
960         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
961                              &count);
962
963         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
964                    server_id_str_buf(rec->src, &idbuf), count));
965
966         return true;
967 }
968
969 #ifdef CLUSTER_SUPPORT
970
971 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
972                                      struct server_id src,
973                                      struct messaging_reclog *log)
974 {
975         enum ndr_err_code ndr_err;
976         uint8_t msghdr[MESSAGE_HDR_LENGTH];
977         DATA_BLOB blob;
978         struct iovec iov[2];
979         int ret;
980
981         if (log == NULL) {
982                 return;
983         }
984
985         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
986                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
987
988         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
989                         (struct server_id) {0 });
990         iov[0] = (struct iovec) { .iov_base = msghdr,
991                                   .iov_len = sizeof(msghdr) };
992
993         ndr_err = ndr_push_struct_blob(
994                 &blob, log, log,
995                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
996         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
997                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
998                           __func__, ndr_errstr(ndr_err)));
999                 goto done;
1000         }
1001         iov[1] = (struct iovec) { .iov_base = blob.data,
1002                                   .iov_len = blob.length };
1003
1004         ret = ctdbd_messaging_send_iov(
1005                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1006                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1007         TALLOC_FREE(blob.data);
1008         if (ret != 0) {
1009                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1010                           __func__, strerror(ret)));
1011                 goto done;
1012         }
1013
1014         log->rec_index += 1;
1015
1016 done:
1017         log->num_recs = 0;
1018         TALLOC_FREE(log->recs);
1019 }
1020
1021 struct notifyd_broadcast_reclog_state {
1022         struct tevent_context *ev;
1023         struct ctdbd_connection *ctdbd_conn;
1024         struct server_id src;
1025         struct messaging_reclog *log;
1026 };
1027
1028 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1029
1030 static struct tevent_req *notifyd_broadcast_reclog_send(
1031         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1032         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1033         struct messaging_reclog *log)
1034 {
1035         struct tevent_req *req, *subreq;
1036         struct notifyd_broadcast_reclog_state *state;
1037
1038         req = tevent_req_create(mem_ctx, &state,
1039                                 struct notifyd_broadcast_reclog_state);
1040         if (req == NULL) {
1041                 return NULL;
1042         }
1043         state->ev = ev;
1044         state->ctdbd_conn = ctdbd_conn;
1045         state->src = src;
1046         state->log = log;
1047
1048         subreq = tevent_wakeup_send(state, state->ev,
1049                                     timeval_current_ofs_msec(1000));
1050         if (tevent_req_nomem(subreq, req)) {
1051                 return tevent_req_post(req, ev);
1052         }
1053         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1054         return req;
1055 }
1056
1057 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1058 {
1059         struct tevent_req *req = tevent_req_callback_data(
1060                 subreq, struct tevent_req);
1061         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1062                 req, struct notifyd_broadcast_reclog_state);
1063         bool ok;
1064
1065         ok = tevent_wakeup_recv(subreq);
1066         TALLOC_FREE(subreq);
1067         if (!ok) {
1068                 tevent_req_oom(req);
1069                 return;
1070         }
1071
1072         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1073
1074         subreq = tevent_wakeup_send(state, state->ev,
1075                                     timeval_current_ofs_msec(1000));
1076         if (tevent_req_nomem(subreq, req)) {
1077                 return;
1078         }
1079         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1080 }
1081
1082 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1083 {
1084         return tevent_req_simple_recv_unix(req);
1085 }
1086
1087 struct notifyd_clean_peers_state {
1088         struct tevent_context *ev;
1089         struct notifyd_state *notifyd;
1090 };
1091
1092 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1093
1094 static struct tevent_req *notifyd_clean_peers_send(
1095         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1096         struct notifyd_state *notifyd)
1097 {
1098         struct tevent_req *req, *subreq;
1099         struct notifyd_clean_peers_state *state;
1100
1101         req = tevent_req_create(mem_ctx, &state,
1102                                 struct notifyd_clean_peers_state);
1103         if (req == NULL) {
1104                 return NULL;
1105         }
1106         state->ev = ev;
1107         state->notifyd = notifyd;
1108
1109         subreq = tevent_wakeup_send(state, state->ev,
1110                                     timeval_current_ofs_msec(30000));
1111         if (tevent_req_nomem(subreq, req)) {
1112                 return tevent_req_post(req, ev);
1113         }
1114         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1115         return req;
1116 }
1117
1118 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1119 {
1120         struct tevent_req *req = tevent_req_callback_data(
1121                 subreq, struct tevent_req);
1122         struct notifyd_clean_peers_state *state = tevent_req_data(
1123                 req, struct notifyd_clean_peers_state);
1124         struct notifyd_state *notifyd = state->notifyd;
1125         size_t i;
1126         bool ok;
1127         time_t now = time(NULL);
1128
1129         ok = tevent_wakeup_recv(subreq);
1130         TALLOC_FREE(subreq);
1131         if (!ok) {
1132                 tevent_req_oom(req);
1133                 return;
1134         }
1135
1136         i = 0;
1137         while (i < notifyd->num_peers) {
1138                 struct notifyd_peer *p = notifyd->peers[i];
1139
1140                 if ((now - p->last_broadcast) > 60) {
1141                         struct server_id_buf idbuf;
1142
1143                         /*
1144                          * Haven't heard for more than 60 seconds. Call this
1145                          * peer dead
1146                          */
1147
1148                         DEBUG(10, ("%s: peer %s died\n", __func__,
1149                                    server_id_str_buf(p->pid, &idbuf)));
1150                         /*
1151                          * This implicitly decrements notifyd->num_peers
1152                          */
1153                         TALLOC_FREE(p);
1154                 } else {
1155                         i += 1;
1156                 }
1157         }
1158
1159         subreq = tevent_wakeup_send(state, state->ev,
1160                                     timeval_current_ofs_msec(30000));
1161         if (tevent_req_nomem(subreq, req)) {
1162                 return;
1163         }
1164         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1165 }
1166
1167 static int notifyd_clean_peers_recv(struct tevent_req *req)
1168 {
1169         return tevent_req_simple_recv_unix(req);
1170 }
1171
1172 #endif
1173
1174 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1175                                         void *private_data)
1176 {
1177         struct notifyd_state *state = talloc_get_type_abort(
1178                 private_data, struct notifyd_state);
1179         struct db_context *db = dbwrap_record_get_db(rec);
1180         TDB_DATA key = dbwrap_record_get_key(rec);
1181         TDB_DATA value = dbwrap_record_get_value(rec);
1182         struct notifyd_instance *instances = NULL;
1183         size_t num_instances = 0;
1184         size_t i;
1185         char path[key.dsize+1];
1186         bool ok;
1187
1188         memcpy(path, key.dptr, key.dsize);
1189         path[key.dsize] = '\0';
1190
1191         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1192                                  &num_instances);
1193         if (!ok) {
1194                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1195                           __func__, path));
1196                 return 0;
1197         }
1198
1199         for (i=0; i<num_instances; i++) {
1200                 struct notifyd_instance *instance = &instances[i];
1201                 uint32_t filter = instance->instance.filter;
1202                 uint32_t subdir_filter = instance->instance.subdir_filter;
1203                 int ret;
1204
1205                 ret = state->sys_notify_watch(
1206                         db, state->sys_notify_ctx, path,
1207                         &filter, &subdir_filter,
1208                         notifyd_sys_callback, state->msg_ctx,
1209                         &instance->sys_watch);
1210                 if (ret != 0) {
1211                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1212                                   __func__, strerror(errno)));
1213                 }
1214         }
1215
1216         return 0;
1217 }
1218
1219 #ifdef CLUSTER_SUPPORT
1220
1221 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1222 {
1223         TDB_DATA key = dbwrap_record_get_key(rec);
1224         TDB_DATA value = dbwrap_record_get_value(rec);
1225         struct notifyd_instance *instances = NULL;
1226         size_t num_instances = 0;
1227         size_t i;
1228         bool ok;
1229
1230         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1231                                  &num_instances);
1232         if (!ok) {
1233                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1234                           __func__, (int)key.dsize, (char *)key.dptr));
1235                 return 0;
1236         }
1237         for (i=0; i<num_instances; i++) {
1238                 TALLOC_FREE(instances[i].sys_watch);
1239         }
1240         return 0;
1241 }
1242
1243 static int notifyd_peer_destructor(struct notifyd_peer *p)
1244 {
1245         struct notifyd_state *state = p->state;
1246         size_t i;
1247
1248         if (p->db != NULL) {
1249                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1250                                      NULL, NULL);
1251         }
1252
1253         for (i = 0; i<state->num_peers; i++) {
1254                 if (p == state->peers[i]) {
1255                         state->peers[i] = state->peers[state->num_peers-1];
1256                         state->num_peers -= 1;
1257                         break;
1258                 }
1259         }
1260         return 0;
1261 }
1262
1263 static struct notifyd_peer *notifyd_peer_new(
1264         struct notifyd_state *state, struct server_id pid)
1265 {
1266         struct notifyd_peer *p, **tmp;
1267
1268         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1269                              state->num_peers+1);
1270         if (tmp == NULL) {
1271                 return NULL;
1272         }
1273         state->peers = tmp;
1274
1275         p = talloc_zero(state->peers, struct notifyd_peer);
1276         if (p == NULL) {
1277                 return NULL;
1278         }
1279         p->state = state;
1280         p->pid = pid;
1281
1282         state->peers[state->num_peers] = p;
1283         state->num_peers += 1;
1284
1285         talloc_set_destructor(p, notifyd_peer_destructor);
1286
1287         return p;
1288 }
1289
1290 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1291                                  const uint8_t *msg, size_t msglen)
1292 {
1293         struct notifyd_state *state = peer->state;
1294         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1295                            .length = msglen };
1296         struct server_id_buf idbuf;
1297         struct messaging_reclog *log;
1298         enum ndr_err_code ndr_err;
1299         uint32_t i;
1300
1301         if (peer->db == NULL) {
1302                 /*
1303                  * No db yet
1304                  */
1305                 return;
1306         }
1307
1308         log = talloc(peer, struct messaging_reclog);
1309         if (log == NULL) {
1310                 DEBUG(10, ("%s: talloc failed\n", __func__));
1311                 return;
1312         }
1313
1314         ndr_err = ndr_pull_struct_blob_all(
1315                 &blob, log, log,
1316                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1317         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1318                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1319                            __func__, ndr_errstr(ndr_err)));
1320                 goto fail;
1321         }
1322
1323         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1324                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1325                    server_id_str_buf(peer->pid, &idbuf)));
1326
1327         if (log->rec_index != peer->rec_index) {
1328                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1329                           __func__, (uintmax_t)log->rec_index,
1330                           server_id_str_buf(peer->pid, &idbuf),
1331                           (uintmax_t)peer->rec_index));
1332                 goto fail;
1333         }
1334
1335         for (i=0; i<log->num_recs; i++) {
1336                 struct messaging_rec *r = log->recs[i];
1337                 struct notify_rec_change_msg *chg;
1338                 size_t pathlen;
1339                 bool ok;
1340
1341                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1342                                               &chg, &pathlen);
1343                 if (!ok) {
1344                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1345                                   __func__));
1346                         goto fail;
1347                 }
1348
1349                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1350                                               &chg->instance, peer->db,
1351                                               state->sys_notify_watch,
1352                                               state->sys_notify_ctx,
1353                                               state->msg_ctx);
1354                 if (!ok) {
1355                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1356                                   __func__));
1357                         goto fail;
1358                 }
1359         }
1360
1361         peer->rec_index += 1;
1362         peer->last_broadcast = time(NULL);
1363
1364         TALLOC_FREE(log);
1365         return;
1366
1367 fail:
1368         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1369                    server_id_str_buf(peer->pid, &idbuf)));
1370         TALLOC_FREE(peer);
1371 }
1372
1373 /*
1374  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1375  * messages) broadcasts by other notifyds. Several cases:
1376  *
1377  * We don't know the source. This creates a new peer. Creating a peer
1378  * involves asking the peer for its full database. We assume ordered
1379  * messages, so the new database will arrive before the next broadcast
1380  * will.
1381  *
1382  * We know the source and the log index matches. We will apply the log
1383  * locally to our peer's db as if we had received it from a local
1384  * client.
1385  *
1386  * We know the source but the log index does not match. This means we
1387  * lost a message. We just drop the whole peer and wait for the next
1388  * broadcast, which will then trigger a fresh database pull.
1389  */
1390
1391 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1392                                    uint64_t dst_srvid,
1393                                    const uint8_t *msg, size_t msglen,
1394                                    void *private_data)
1395 {
1396         struct notifyd_state *state = talloc_get_type_abort(
1397                 private_data, struct notifyd_state);
1398         struct server_id my_id = messaging_server_id(state->msg_ctx);
1399         struct notifyd_peer *p;
1400         uint32_t i;
1401         uint32_t msg_type;
1402         struct server_id src, dst;
1403         struct server_id_buf idbuf;
1404         NTSTATUS status;
1405
1406         if (msglen < MESSAGE_HDR_LENGTH) {
1407                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1408                 return 0;
1409         }
1410         message_hdr_get(&msg_type, &src, &dst, msg);
1411
1412         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1413                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1414                            (unsigned)msg_type));
1415                 return 0;
1416         }
1417         if (server_id_equal(&src, &my_id)) {
1418                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1419                 return 0;
1420         }
1421
1422         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1423                    __func__, server_id_str_buf(src, &idbuf)));
1424
1425         for (i=0; i<state->num_peers; i++) {
1426                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1427
1428                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1429                                    __func__, (unsigned)i));
1430
1431                         notifyd_apply_reclog(state->peers[i],
1432                                              msg + MESSAGE_HDR_LENGTH,
1433                                              msglen - MESSAGE_HDR_LENGTH);
1434                         return 0;
1435                 }
1436         }
1437
1438         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1439                    server_id_str_buf(src, &idbuf)));
1440
1441         p = notifyd_peer_new(state, src);
1442         if (p == NULL) {
1443                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1444                 return 0;
1445         }
1446
1447         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1448                                     NULL, 0);
1449         if (!NT_STATUS_IS_OK(status)) {
1450                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1451                            __func__, nt_errstr(status)));
1452                 TALLOC_FREE(p);
1453                 return 0;
1454         }
1455
1456         return 0;
1457 }
1458 #endif
1459
1460 struct notifyd_parse_db_state {
1461         bool (*fn)(const char *path,
1462                    struct server_id server,
1463                    const struct notify_instance *instance,
1464                    void *private_data);
1465         void *private_data;
1466 };
1467
1468 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1469                                     void *private_data)
1470 {
1471         struct notifyd_parse_db_state *state = private_data;
1472         char path[key.dsize+1];
1473         struct notifyd_instance *instances = NULL;
1474         size_t num_instances = 0;
1475         size_t i;
1476         bool ok;
1477
1478         memcpy(path, key.dptr, key.dsize);
1479         path[key.dsize] = 0;
1480
1481         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1482                                  &num_instances);
1483         if (!ok) {
1484                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1485                            __func__, path));
1486                 return true;
1487         }
1488
1489         for (i=0; i<num_instances; i++) {
1490                 ok = state->fn(path, instances[i].client,
1491                                &instances[i].instance,
1492                                state->private_data);
1493                 if (!ok) {
1494                         return false;
1495                 }
1496         }
1497
1498         return true;
1499 }
1500
1501 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1502                      uint64_t *log_index,
1503                      bool (*fn)(const char *path,
1504                                 struct server_id server,
1505                                 const struct notify_instance *instance,
1506                                 void *private_data),
1507                      void *private_data)
1508 {
1509         struct notifyd_parse_db_state state = {
1510                 .fn = fn, .private_data = private_data
1511         };
1512         NTSTATUS status;
1513
1514         if (buflen < 8) {
1515                 return EINVAL;
1516         }
1517         *log_index = BVAL(buf, 0);
1518
1519         buf += 8;
1520         buflen -= 8;
1521
1522         status = dbwrap_parse_marshall_buf(
1523                 buf, buflen, notifyd_parse_db_parser, &state);
1524         if (!NT_STATUS_IS_OK(status)) {
1525                 return map_errno_from_nt_status(status);
1526         }
1527
1528         return 0;
1529 }