notifyd: Trim down the noncluster case
[metze/samba/wip.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include <tevent.h>
22 #include "lib/util/data_blob.h"
23 #include "librpc/gen_ndr/notify.h"
24 #include "librpc/gen_ndr/messaging.h"
25 #include "librpc/gen_ndr/server_id.h"
26 #include "lib/dbwrap/dbwrap.h"
27 #include "lib/dbwrap/dbwrap_rbt.h"
28 #include "messages.h"
29 #include "tdb.h"
30 #include "util_tdb.h"
31 #include "notifyd.h"
32 #include "lib/util/server_id_db.h"
33 #include "lib/util/tevent_unix.h"
34 #include "ctdbd_conn.h"
35 #include "ctdb_srvids.h"
36 #include "server_id_db_util.h"
37 #include "lib/util/iov_buf.h"
38 #include "messages_util.h"
39
40 #ifdef CLUSTER_SUPPORT
41 #include "ctdb_protocol.h"
42 #endif
43
44 struct notifyd_peer;
45
46 /*
47  * All of notifyd's state
48  */
49
50 struct notifyd_state {
51         struct tevent_context *ev;
52         struct messaging_context *msg_ctx;
53         struct ctdbd_connection *ctdbd_conn;
54
55         /*
56          * Database of everything clients show interest in. Indexed by
57          * absolute path. The database keys are not 0-terminated
58          * because the criticial operation, notifyd_trigger, can walk
59          * the structure from the top without adding intermediate 0s.
60          * The database records contain an array of
61          *
62          * struct notifyd_instance
63          *
64          * to be maintained and parsed by notifyd_entry_parse()
65          */
66         struct db_context *entries;
67
68         /*
69          * In the cluster case, this is the place where we store a log
70          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
71          * forward them to our peer notifyd's in the cluster once a
72          * second or when the log grows too large.
73          */
74
75         struct messaging_reclog *log;
76
77         /*
78          * Array of companion notifyd's in a cluster. Every notifyd
79          * broadcasts its messaging_reclog to every other notifyd in
80          * the cluster. This is done by making ctdb send a message to
81          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
82          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
83          * had called register_with_ctdbd this srvid will receive the
84          * broadcasts.
85          *
86          * Database replication happens via these broadcasts. Also,
87          * they serve as liveness indication. If a notifyd receives a
88          * broadcast from an unknown peer, it will create one for this
89          * srvid. Also when we don't hear anything from a peer for a
90          * while, we will discard it.
91          */
92
93         struct notifyd_peer **peers;
94         size_t num_peers;
95
96         sys_notify_watch_fn sys_notify_watch;
97         struct sys_notify_context *sys_notify_ctx;
98 };
99
100 /*
101  * notifyd's representation of a notify instance
102  */
103 struct notifyd_instance {
104         struct server_id client;
105         struct notify_instance instance;
106
107         void *sys_watch; /* inotify/fam/etc handle */
108
109         /*
110          * Filters after sys_watch took responsibility of some bits
111          */
112         uint32_t internal_filter;
113         uint32_t internal_subdir_filter;
114 };
115
116 struct notifyd_peer {
117         struct notifyd_state *state;
118         struct server_id pid;
119         uint64_t rec_index;
120         struct db_context *db;
121         time_t last_broadcast;
122 };
123
124 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
125                                struct messaging_rec **prec,
126                                void *private_data);
127 static bool notifyd_trigger(struct messaging_context *msg_ctx,
128                             struct messaging_rec **prec,
129                             void *private_data);
130 static bool notifyd_get_db(struct messaging_context *msg_ctx,
131                            struct messaging_rec **prec,
132                            void *private_data);
133
134 #ifdef CLUSTER_SUPPORT
135 static bool notifyd_got_db(struct messaging_context *msg_ctx,
136                            struct messaging_rec **prec,
137                            void *private_data);
138 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
139                                      struct server_id src,
140                                      struct messaging_reclog *log);
141 #endif
142 static void notifyd_sys_callback(struct sys_notify_context *ctx,
143                                  void *private_data, struct notify_event *ev,
144                                  uint32_t filter);
145
146 #ifdef CLUSTER_SUPPORT
147 static struct tevent_req *notifyd_broadcast_reclog_send(
148         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
149         struct ctdbd_connection *ctdbd_conn, struct server_id src,
150         struct messaging_reclog *log);
151 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
152
153 static struct tevent_req *notifyd_clean_peers_send(
154         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
155         struct notifyd_state *notifyd);
156 static int notifyd_clean_peers_recv(struct tevent_req *req);
157 #endif
158
159 static int sys_notify_watch_dummy(
160         TALLOC_CTX *mem_ctx,
161         struct sys_notify_context *ctx,
162         const char *path,
163         uint32_t *filter,
164         uint32_t *subdir_filter,
165         void (*callback)(struct sys_notify_context *ctx,
166                          void *private_data,
167                          struct notify_event *ev,
168                          uint32_t filter),
169         void *private_data,
170         void *handle_p)
171 {
172         void **handle = handle_p;
173         *handle = NULL;
174         return 0;
175 }
176
177 static void notifyd_handler_done(struct tevent_req *subreq);
178
179 #ifdef CLUSTER_SUPPORT
180 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
181 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
182 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
183                                    uint64_t dst_srvid,
184                                    const uint8_t *msg, size_t msglen,
185                                    void *private_data);
186 #endif
187
188 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
189                                 struct messaging_context *msg_ctx,
190                                 struct ctdbd_connection *ctdbd_conn,
191                                 sys_notify_watch_fn sys_notify_watch,
192                                 struct sys_notify_context *sys_notify_ctx)
193 {
194         struct tevent_req *req, *subreq;
195         struct notifyd_state *state;
196         struct server_id_db *names_db;
197         int ret;
198
199         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
200         if (req == NULL) {
201                 return NULL;
202         }
203         state->ev = ev;
204         state->msg_ctx = msg_ctx;
205         state->ctdbd_conn = ctdbd_conn;
206
207         if (sys_notify_watch == NULL) {
208                 sys_notify_watch = sys_notify_watch_dummy;
209         }
210
211         state->sys_notify_watch = sys_notify_watch;
212         state->sys_notify_ctx = sys_notify_ctx;
213
214         state->entries = db_open_rbt(state);
215         if (tevent_req_nomem(state->entries, req)) {
216                 return tevent_req_post(req, ev);
217         }
218
219         subreq = messaging_handler_send(state, ev, msg_ctx,
220                                         MSG_SMB_NOTIFY_REC_CHANGE,
221                                         notifyd_rec_change, state);
222         if (tevent_req_nomem(subreq, req)) {
223                 return tevent_req_post(req, ev);
224         }
225         tevent_req_set_callback(subreq, notifyd_handler_done, req);
226
227         subreq = messaging_handler_send(state, ev, msg_ctx,
228                                         MSG_SMB_NOTIFY_TRIGGER,
229                                         notifyd_trigger, state);
230         if (tevent_req_nomem(subreq, req)) {
231                 return tevent_req_post(req, ev);
232         }
233         tevent_req_set_callback(subreq, notifyd_handler_done, req);
234
235         subreq = messaging_handler_send(state, ev, msg_ctx,
236                                         MSG_SMB_NOTIFY_GET_DB,
237                                         notifyd_get_db, state);
238         if (tevent_req_nomem(subreq, req)) {
239                 return tevent_req_post(req, ev);
240         }
241         tevent_req_set_callback(subreq, notifyd_handler_done, req);
242
243 #ifdef CLUSTER_SUPPORT
244         subreq = messaging_handler_send(state, ev, msg_ctx,
245                                         MSG_SMB_NOTIFY_DB,
246                                         notifyd_got_db, state);
247         if (tevent_req_nomem(subreq, req)) {
248                 return tevent_req_post(req, ev);
249         }
250         tevent_req_set_callback(subreq, notifyd_handler_done, req);
251 #endif
252
253         names_db = messaging_names_db(msg_ctx);
254
255         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
256         if (ret != 0) {
257                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
258                            __func__, strerror(ret)));
259                 tevent_req_error(req, ret);
260                 return tevent_req_post(req, ev);
261         }
262
263         if (ctdbd_conn == NULL) {
264                 /*
265                  * No cluster around, skip the database replication
266                  * engine
267                  */
268                 return req;
269         }
270
271 #ifdef CLUSTER_SUPPORT
272         state->log = talloc_zero(state, struct messaging_reclog);
273         if (tevent_req_nomem(state->log, req)) {
274                 return tevent_req_post(req, ev);
275         }
276
277         subreq = notifyd_broadcast_reclog_send(
278                 state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
279                 state->log);
280         if (tevent_req_nomem(subreq, req)) {
281                 return tevent_req_post(req, ev);
282         }
283         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
284                                 req);
285
286         subreq = notifyd_clean_peers_send(state, ev, state);
287         if (tevent_req_nomem(subreq, req)) {
288                 return tevent_req_post(req, ev);
289         }
290         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
291                                 req);
292
293         ret = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
294                                   notifyd_snoop_broadcast, state);
295         if (ret != 0) {
296                 tevent_req_error(req, ret);
297                 return tevent_req_post(req, ev);
298         }
299 #endif
300
301         return req;
302 }
303
304 static void notifyd_handler_done(struct tevent_req *subreq)
305 {
306         struct tevent_req *req = tevent_req_callback_data(
307                 subreq, struct tevent_req);
308         int ret;
309
310         ret = messaging_handler_recv(subreq);
311         TALLOC_FREE(subreq);
312         tevent_req_error(req, ret);
313 }
314
315 #ifdef CLUSTER_SUPPORT
316
317 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
318 {
319         struct tevent_req *req = tevent_req_callback_data(
320                 subreq, struct tevent_req);
321         int ret;
322
323         ret = notifyd_broadcast_reclog_recv(subreq);
324         TALLOC_FREE(subreq);
325         tevent_req_error(req, ret);
326 }
327
328 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
329 {
330         struct tevent_req *req = tevent_req_callback_data(
331                 subreq, struct tevent_req);
332         int ret;
333
334         ret = notifyd_clean_peers_recv(subreq);
335         TALLOC_FREE(subreq);
336         tevent_req_error(req, ret);
337 }
338
339 #endif
340
341 int notifyd_recv(struct tevent_req *req)
342 {
343         return tevent_req_simple_recv_unix(req);
344 }
345
346 /*
347  * Parse an entry in the notifyd_context->entries database
348  */
349
350 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
351                                 struct notifyd_instance **instances,
352                                 size_t *num_instances)
353 {
354         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
355                 DEBUG(1, ("%s: invalid buffer size: %u\n",
356                           __func__, (unsigned)buflen));
357                 return false;
358         }
359
360         if (instances != NULL) {
361                 *instances = (struct notifyd_instance *)buf;
362         }
363         if (num_instances != NULL) {
364                 *num_instances = buflen / sizeof(struct notifyd_instance);
365         }
366         return true;
367 }
368
369 static bool notifyd_apply_rec_change(
370         const struct server_id *client,
371         const char *path, size_t pathlen,
372         const struct notify_instance *chg,
373         struct db_context *entries,
374         sys_notify_watch_fn sys_notify_watch,
375         struct sys_notify_context *sys_notify_ctx,
376         struct messaging_context *msg_ctx)
377 {
378         struct db_record *rec;
379         struct notifyd_instance *instances;
380         size_t num_instances;
381         size_t i;
382         struct notifyd_instance *instance;
383         TDB_DATA value;
384         NTSTATUS status;
385         bool ok = false;
386
387         if (pathlen == 0) {
388                 DEBUG(1, ("%s: pathlen==0\n", __func__));
389                 return false;
390         }
391         if (path[pathlen-1] != '\0') {
392                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
393                 return false;
394         }
395
396         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
397                    "private_data=%p\n", __func__, path,
398                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
399                    chg->private_data));
400
401         rec = dbwrap_fetch_locked(
402                 entries, entries,
403                 make_tdb_data((const uint8_t *)path, pathlen-1));
404
405         if (rec == NULL) {
406                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
407                 goto fail;
408         }
409
410         num_instances = 0;
411         value = dbwrap_record_get_value(rec);
412
413         if (value.dsize != 0) {
414                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
415                                          &num_instances)) {
416                         goto fail;
417                 }
418         }
419
420         /*
421          * Overallocate by one instance to avoid a realloc when adding
422          */
423         instances = talloc_array(rec, struct notifyd_instance,
424                                  num_instances + 1);
425         if (instances == NULL) {
426                 DEBUG(1, ("%s: talloc failed\n", __func__));
427                 goto fail;
428         }
429
430         if (value.dsize != 0) {
431                 memcpy(instances, value.dptr, value.dsize);
432         }
433
434         for (i=0; i<num_instances; i++) {
435                 instance = &instances[i];
436
437                 if (server_id_equal(&instance->client, client) &&
438                     (instance->instance.private_data == chg->private_data)) {
439                         break;
440                 }
441         }
442
443         if (i < num_instances) {
444                 instance->instance = *chg;
445         } else {
446                 /*
447                  * We've overallocated for one instance
448                  */
449                 instance = &instances[num_instances];
450
451                 *instance = (struct notifyd_instance) {
452                         .client = *client,
453                         .instance = *chg,
454                         .internal_filter = chg->filter,
455                         .internal_subdir_filter = chg->subdir_filter
456                 };
457
458                 num_instances += 1;
459         }
460
461         if ((instance->instance.filter != 0) ||
462             (instance->instance.subdir_filter != 0)) {
463                 int ret;
464
465                 TALLOC_FREE(instance->sys_watch);
466
467                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
468                                        &instance->internal_filter,
469                                        &instance->internal_subdir_filter,
470                                        notifyd_sys_callback, msg_ctx,
471                                        &instance->sys_watch);
472                 if (ret != 0) {
473                         DEBUG(1, ("%s: inotify_watch returned %s\n",
474                                   __func__, strerror(errno)));
475                 }
476         }
477
478         if ((instance->instance.filter == 0) &&
479             (instance->instance.subdir_filter == 0)) {
480                 /* This is a delete request */
481                 TALLOC_FREE(instance->sys_watch);
482                 *instance = instances[num_instances-1];
483                 num_instances -= 1;
484         }
485
486         DEBUG(10, ("%s: %s has %u instances\n", __func__,
487                    path, (unsigned)num_instances));
488
489         if (num_instances == 0) {
490                 status = dbwrap_record_delete(rec);
491                 if (!NT_STATUS_IS_OK(status)) {
492                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
493                                   __func__, nt_errstr(status)));
494                         goto fail;
495                 }
496         } else {
497                 value = make_tdb_data(
498                         (uint8_t *)instances,
499                         sizeof(struct notifyd_instance) * num_instances);
500
501                 status = dbwrap_record_store(rec, value, 0);
502                 if (!NT_STATUS_IS_OK(status)) {
503                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
504                                   __func__, nt_errstr(status)));
505                         goto fail;
506                 }
507         }
508
509         ok = true;
510 fail:
511         TALLOC_FREE(rec);
512         return ok;
513 }
514
515 static void notifyd_sys_callback(struct sys_notify_context *ctx,
516                                  void *private_data, struct notify_event *ev,
517                                  uint32_t filter)
518 {
519         struct messaging_context *msg_ctx = talloc_get_type_abort(
520                 private_data, struct messaging_context);
521         struct notify_trigger_msg msg;
522         struct iovec iov[4];
523         char slash = '/';
524
525         msg = (struct notify_trigger_msg) {
526                 .when = timespec_current(),
527                 .action = ev->action,
528                 .filter = filter,
529         };
530
531         iov[0].iov_base = &msg;
532         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
533         iov[1].iov_base = discard_const_p(char, ev->dir);
534         iov[1].iov_len = strlen(ev->dir);
535         iov[2].iov_base = &slash;
536         iov[2].iov_len = 1;
537         iov[3].iov_base = discard_const_p(char, ev->path);
538         iov[3].iov_len = strlen(ev->path)+1;
539
540         messaging_send_iov(
541                 msg_ctx, messaging_server_id(msg_ctx),
542                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
543 }
544
545 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
546                                      struct notify_rec_change_msg **pmsg,
547                                      size_t *pathlen)
548 {
549         struct notify_rec_change_msg *msg;
550
551         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
552                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
553                           (unsigned)bufsize));
554                 return false;
555         }
556
557         *pmsg = msg = (struct notify_rec_change_msg *)buf;
558         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
559
560         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
561                    "private_data=%p, path=%.*s\n",
562                    __func__, (unsigned)msg->instance.filter,
563                    (unsigned)msg->instance.subdir_filter,
564                    msg->instance.private_data, (int)(*pathlen), msg->path));
565
566         return true;
567 }
568
569 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
570                                struct messaging_rec **prec,
571                                void *private_data)
572 {
573         struct notifyd_state *state = talloc_get_type_abort(
574                 private_data, struct notifyd_state);
575         struct server_id_buf idbuf;
576         struct messaging_rec *rec = *prec;
577         struct notify_rec_change_msg *msg;
578         size_t pathlen;
579         bool ok;
580
581         DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
582                    (unsigned)rec->buf.length,
583                    server_id_str_buf(rec->src, &idbuf)));
584
585         ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
586                                       &msg, &pathlen);
587         if (!ok) {
588                 return true;
589         }
590
591         ok = notifyd_apply_rec_change(
592                 &rec->src, msg->path, pathlen, &msg->instance,
593                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
594                 state->msg_ctx);
595         if (!ok) {
596                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
597                           __func__));
598                 return true;
599         }
600
601         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
602                 return true;
603         }
604
605 #ifdef CLUSTER_SUPPORT
606         {
607
608         struct messaging_rec **tmp;
609         struct messaging_reclog *log;
610
611         log = state->log;
612
613         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
614                              log->num_recs+1);
615         if (tmp == NULL) {
616                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
617                 return true;
618         }
619         log->recs = tmp;
620
621         log->recs[log->num_recs] = talloc_move(log->recs, prec);
622         log->num_recs += 1;
623
624         if (log->num_recs >= 100) {
625                 /*
626                  * Don't let the log grow too large
627                  */
628                 notifyd_broadcast_reclog(state->ctdbd_conn,
629                                          messaging_server_id(msg_ctx), log);
630         }
631
632         }
633 #endif
634
635         return true;
636 }
637
638 struct notifyd_trigger_state {
639         struct messaging_context *msg_ctx;
640         struct notify_trigger_msg *msg;
641         bool recursive;
642         bool covered_by_sys_notify;
643 };
644
645 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
646                                    void *private_data);
647
648 static bool notifyd_trigger(struct messaging_context *msg_ctx,
649                             struct messaging_rec **prec,
650                             void *private_data)
651 {
652         struct notifyd_state *state = talloc_get_type_abort(
653                 private_data, struct notifyd_state);
654         struct server_id my_id = messaging_server_id(msg_ctx);
655         struct messaging_rec *rec = *prec;
656         struct notifyd_trigger_state tstate;
657         const char *path;
658         const char *p, *next_p;
659
660         if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
661                 DEBUG(1, ("message too short, ignoring: %u\n",
662                           (unsigned)rec->buf.length));
663                 return true;
664         }
665         if (rec->buf.data[rec->buf.length-1] != 0) {
666                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
667                 return true;
668         }
669
670         tstate.msg_ctx = msg_ctx;
671
672         tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
673         tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
674
675         tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
676         path = tstate.msg->path;
677
678         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
679                    __func__, (unsigned)tstate.msg->action,
680                    (unsigned)tstate.msg->filter, path));
681
682         if (path[0] != '/') {
683                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
684                           __func__, path));
685                 return true;
686         }
687
688         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
689                 ptrdiff_t path_len = p - path;
690                 TDB_DATA key;
691                 uint32_t i;
692
693                 next_p = strchr(p+1, '/');
694                 tstate.recursive = (next_p != NULL);
695
696                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
697                            (int)path_len, path));
698
699                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
700                                    .dsize = path_len };
701
702                 dbwrap_parse_record(state->entries, key,
703                                     notifyd_trigger_parser, &tstate);
704
705                 if (state->peers == NULL) {
706                         continue;
707                 }
708
709                 if (rec->src.vnn != my_id.vnn) {
710                         continue;
711                 }
712
713                 for (i=0; i<state->num_peers; i++) {
714                         if (state->peers[i]->db == NULL) {
715                                 /*
716                                  * Inactive peer, did not get a db yet
717                                  */
718                                 continue;
719                         }
720                         dbwrap_parse_record(state->peers[i]->db, key,
721                                             notifyd_trigger_parser, &tstate);
722                 }
723         }
724
725         return true;
726 }
727
728 static void notifyd_send_delete(struct messaging_context *msg_ctx,
729                                 TDB_DATA key,
730                                 struct notifyd_instance *instance);
731
732 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
733                                    void *private_data)
734
735 {
736         struct notifyd_trigger_state *tstate = private_data;
737         struct notify_event_msg msg = { .action = tstate->msg->action };
738         struct iovec iov[2];
739         size_t path_len = key.dsize;
740         struct notifyd_instance *instances = NULL;
741         size_t num_instances = 0;
742         size_t i;
743
744         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
745                                  &num_instances)) {
746                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
747                 return;
748         }
749
750         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
751                    (unsigned)num_instances, (int)key.dsize,
752                    (char *)key.dptr));
753
754         iov[0].iov_base = &msg;
755         iov[0].iov_len = offsetof(struct notify_event_msg, path);
756         iov[1].iov_base = tstate->msg->path + path_len + 1;
757         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
758
759         for (i=0; i<num_instances; i++) {
760                 struct notifyd_instance *instance = &instances[i];
761                 struct server_id_buf idbuf;
762                 uint32_t i_filter;
763                 NTSTATUS status;
764
765                 if (tstate->covered_by_sys_notify) {
766                         if (tstate->recursive) {
767                                 i_filter = instance->internal_subdir_filter;
768                         } else {
769                                 i_filter = instance->internal_filter;
770                         }
771                 } else {
772                         if (tstate->recursive) {
773                                 i_filter = instance->instance.subdir_filter;
774                         } else {
775                                 i_filter = instance->instance.filter;
776                         }
777                 }
778
779                 if ((i_filter & tstate->msg->filter) == 0) {
780                         continue;
781                 }
782
783                 msg.private_data = instance->instance.private_data;
784
785                 status = messaging_send_iov(
786                         tstate->msg_ctx, instance->client,
787                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
788
789                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
790                            __func__,
791                            server_id_str_buf(instance->client, &idbuf),
792                            nt_errstr(status)));
793
794                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
795                     procid_is_local(&instance->client)) {
796                         /*
797                          * That process has died
798                          */
799                         notifyd_send_delete(tstate->msg_ctx, key, instance);
800                         continue;
801                 }
802
803                 if (!NT_STATUS_IS_OK(status)) {
804                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
805                                   __func__, nt_errstr(status)));
806                 }
807         }
808 }
809
810 /*
811  * Send a delete request to ourselves to properly discard a notify
812  * record for an smbd that has died.
813  */
814
815 static void notifyd_send_delete(struct messaging_context *msg_ctx,
816                                 TDB_DATA key,
817                                 struct notifyd_instance *instance)
818 {
819         struct notify_rec_change_msg msg = {
820                 .instance.private_data = instance->instance.private_data
821         };
822         uint8_t nul = 0;
823         struct iovec iov[3];
824         int ret;
825
826         /*
827          * Send a rec_change to ourselves to delete a dead entry
828          */
829
830         iov[0] = (struct iovec) {
831                 .iov_base = &msg,
832                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
833         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
834         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
835
836         ret = messaging_send_iov_from(
837                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
838                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
839
840         if (ret != 0) {
841                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
842                            __func__, strerror(ret)));
843         }
844 }
845
846 static bool notifyd_get_db(struct messaging_context *msg_ctx,
847                            struct messaging_rec **prec,
848                            void *private_data)
849 {
850         struct notifyd_state *state = talloc_get_type_abort(
851                 private_data, struct notifyd_state);
852         struct messaging_rec *rec = *prec;
853         struct server_id_buf id1, id2;
854         NTSTATUS status;
855         uint64_t rec_index = UINT64_MAX;
856         uint8_t index_buf[sizeof(uint64_t)];
857         size_t dbsize;
858         uint8_t *buf;
859         struct iovec iov[2];
860
861         dbsize = dbwrap_marshall(state->entries, NULL, 0);
862
863         buf = talloc_array(rec, uint8_t, dbsize);
864         if (buf == NULL) {
865                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
866                           __func__, (uintmax_t)dbsize));
867                 return true;
868         }
869
870         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
871
872         if (dbsize != talloc_get_size(buf)) {
873                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
874                           (uintmax_t)talloc_get_size(buf),
875                           (uintmax_t)dbsize));
876                 TALLOC_FREE(buf);
877                 return true;
878         }
879
880         if (state->log != NULL) {
881                 rec_index = state->log->rec_index;
882         }
883         SBVAL(index_buf, 0, rec_index);
884
885         iov[0] = (struct iovec) { .iov_base = index_buf,
886                                   .iov_len = sizeof(index_buf) };
887         iov[1] = (struct iovec) { .iov_base = buf,
888                                   .iov_len = dbsize };
889
890         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
891                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
892                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
893                    server_id_str_buf(rec->src, &id2)));
894
895         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
896                                     iov, ARRAY_SIZE(iov), NULL, 0);
897         TALLOC_FREE(buf);
898         if (!NT_STATUS_IS_OK(status)) {
899                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
900                           __func__, nt_errstr(status)));
901         }
902
903         return true;
904 }
905
906 #ifdef CLUSTER_SUPPORT
907
908 static int notifyd_add_proxy_syswatches(struct db_record *rec,
909                                         void *private_data);
910
911 static bool notifyd_got_db(struct messaging_context *msg_ctx,
912                            struct messaging_rec **prec,
913                            void *private_data)
914 {
915         struct notifyd_state *state = talloc_get_type_abort(
916                 private_data, struct notifyd_state);
917         struct messaging_rec *rec = *prec;
918         struct notifyd_peer *p = NULL;
919         struct server_id_buf idbuf;
920         NTSTATUS status;
921         int count;
922         size_t i;
923
924         for (i=0; i<state->num_peers; i++) {
925                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
926                         p = state->peers[i];
927                         break;
928                 }
929         }
930
931         if (p == NULL) {
932                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
933                            __func__, server_id_str_buf(rec->src, &idbuf)));
934                 return true;
935         }
936
937         if (rec->buf.length < 8) {
938                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
939                            (unsigned)rec->buf.length,
940                            server_id_str_buf(rec->src, &idbuf)));
941                 TALLOC_FREE(p);
942                 return true;
943         }
944
945         p->rec_index = BVAL(rec->buf.data, 0);
946
947         p->db = db_open_rbt(p);
948         if (p->db == NULL) {
949                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
950                 TALLOC_FREE(p);
951                 return true;
952         }
953
954         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
955                                    rec->buf.length - 8);
956         if (!NT_STATUS_IS_OK(status)) {
957                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
958                            __func__, nt_errstr(status),
959                            server_id_str_buf(rec->src, &idbuf)));
960                 TALLOC_FREE(p);
961                 return true;
962         }
963
964         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
965                              &count);
966
967         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
968                    server_id_str_buf(rec->src, &idbuf), count));
969
970         return true;
971 }
972
973 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
974                                      struct server_id src,
975                                      struct messaging_reclog *log)
976 {
977         enum ndr_err_code ndr_err;
978         uint8_t msghdr[MESSAGE_HDR_LENGTH];
979         DATA_BLOB blob;
980         struct iovec iov[2];
981         int ret;
982
983         if (log == NULL) {
984                 return;
985         }
986
987         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
988                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
989
990         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
991                         (struct server_id) {0 });
992         iov[0] = (struct iovec) { .iov_base = msghdr,
993                                   .iov_len = sizeof(msghdr) };
994
995         ndr_err = ndr_push_struct_blob(
996                 &blob, log, log,
997                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
998         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
999                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
1000                           __func__, ndr_errstr(ndr_err)));
1001                 goto done;
1002         }
1003         iov[1] = (struct iovec) { .iov_base = blob.data,
1004                                   .iov_len = blob.length };
1005
1006         ret = ctdbd_messaging_send_iov(
1007                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
1008                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
1009         TALLOC_FREE(blob.data);
1010         if (ret != 0) {
1011                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
1012                           __func__, strerror(ret)));
1013                 goto done;
1014         }
1015
1016         log->rec_index += 1;
1017
1018 done:
1019         log->num_recs = 0;
1020         TALLOC_FREE(log->recs);
1021 }
1022
1023 struct notifyd_broadcast_reclog_state {
1024         struct tevent_context *ev;
1025         struct ctdbd_connection *ctdbd_conn;
1026         struct server_id src;
1027         struct messaging_reclog *log;
1028 };
1029
1030 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
1031
1032 static struct tevent_req *notifyd_broadcast_reclog_send(
1033         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1034         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1035         struct messaging_reclog *log)
1036 {
1037         struct tevent_req *req, *subreq;
1038         struct notifyd_broadcast_reclog_state *state;
1039
1040         req = tevent_req_create(mem_ctx, &state,
1041                                 struct notifyd_broadcast_reclog_state);
1042         if (req == NULL) {
1043                 return NULL;
1044         }
1045         state->ev = ev;
1046         state->ctdbd_conn = ctdbd_conn;
1047         state->src = src;
1048         state->log = log;
1049
1050         subreq = tevent_wakeup_send(state, state->ev,
1051                                     timeval_current_ofs_msec(1000));
1052         if (tevent_req_nomem(subreq, req)) {
1053                 return tevent_req_post(req, ev);
1054         }
1055         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1056         return req;
1057 }
1058
1059 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1060 {
1061         struct tevent_req *req = tevent_req_callback_data(
1062                 subreq, struct tevent_req);
1063         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1064                 req, struct notifyd_broadcast_reclog_state);
1065         bool ok;
1066
1067         ok = tevent_wakeup_recv(subreq);
1068         TALLOC_FREE(subreq);
1069         if (!ok) {
1070                 tevent_req_oom(req);
1071                 return;
1072         }
1073
1074         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1075
1076         subreq = tevent_wakeup_send(state, state->ev,
1077                                     timeval_current_ofs_msec(1000));
1078         if (tevent_req_nomem(subreq, req)) {
1079                 return;
1080         }
1081         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1082 }
1083
1084 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1085 {
1086         return tevent_req_simple_recv_unix(req);
1087 }
1088
1089 struct notifyd_clean_peers_state {
1090         struct tevent_context *ev;
1091         struct notifyd_state *notifyd;
1092 };
1093
1094 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1095
1096 static struct tevent_req *notifyd_clean_peers_send(
1097         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1098         struct notifyd_state *notifyd)
1099 {
1100         struct tevent_req *req, *subreq;
1101         struct notifyd_clean_peers_state *state;
1102
1103         req = tevent_req_create(mem_ctx, &state,
1104                                 struct notifyd_clean_peers_state);
1105         if (req == NULL) {
1106                 return NULL;
1107         }
1108         state->ev = ev;
1109         state->notifyd = notifyd;
1110
1111         subreq = tevent_wakeup_send(state, state->ev,
1112                                     timeval_current_ofs_msec(30000));
1113         if (tevent_req_nomem(subreq, req)) {
1114                 return tevent_req_post(req, ev);
1115         }
1116         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1117         return req;
1118 }
1119
1120 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1121 {
1122         struct tevent_req *req = tevent_req_callback_data(
1123                 subreq, struct tevent_req);
1124         struct notifyd_clean_peers_state *state = tevent_req_data(
1125                 req, struct notifyd_clean_peers_state);
1126         struct notifyd_state *notifyd = state->notifyd;
1127         size_t i;
1128         bool ok;
1129         time_t now = time(NULL);
1130
1131         ok = tevent_wakeup_recv(subreq);
1132         TALLOC_FREE(subreq);
1133         if (!ok) {
1134                 tevent_req_oom(req);
1135                 return;
1136         }
1137
1138         i = 0;
1139         while (i < notifyd->num_peers) {
1140                 struct notifyd_peer *p = notifyd->peers[i];
1141
1142                 if ((now - p->last_broadcast) > 60) {
1143                         struct server_id_buf idbuf;
1144
1145                         /*
1146                          * Haven't heard for more than 60 seconds. Call this
1147                          * peer dead
1148                          */
1149
1150                         DEBUG(10, ("%s: peer %s died\n", __func__,
1151                                    server_id_str_buf(p->pid, &idbuf)));
1152                         /*
1153                          * This implicitly decrements notifyd->num_peers
1154                          */
1155                         TALLOC_FREE(p);
1156                 } else {
1157                         i += 1;
1158                 }
1159         }
1160
1161         subreq = tevent_wakeup_send(state, state->ev,
1162                                     timeval_current_ofs_msec(30000));
1163         if (tevent_req_nomem(subreq, req)) {
1164                 return;
1165         }
1166         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1167 }
1168
1169 static int notifyd_clean_peers_recv(struct tevent_req *req)
1170 {
1171         return tevent_req_simple_recv_unix(req);
1172 }
1173
1174 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1175                                         void *private_data)
1176 {
1177         struct notifyd_state *state = talloc_get_type_abort(
1178                 private_data, struct notifyd_state);
1179         struct db_context *db = dbwrap_record_get_db(rec);
1180         TDB_DATA key = dbwrap_record_get_key(rec);
1181         TDB_DATA value = dbwrap_record_get_value(rec);
1182         struct notifyd_instance *instances = NULL;
1183         size_t num_instances = 0;
1184         size_t i;
1185         char path[key.dsize+1];
1186         bool ok;
1187
1188         memcpy(path, key.dptr, key.dsize);
1189         path[key.dsize] = '\0';
1190
1191         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1192                                  &num_instances);
1193         if (!ok) {
1194                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1195                           __func__, path));
1196                 return 0;
1197         }
1198
1199         for (i=0; i<num_instances; i++) {
1200                 struct notifyd_instance *instance = &instances[i];
1201                 uint32_t filter = instance->instance.filter;
1202                 uint32_t subdir_filter = instance->instance.subdir_filter;
1203                 int ret;
1204
1205                 ret = state->sys_notify_watch(
1206                         db, state->sys_notify_ctx, path,
1207                         &filter, &subdir_filter,
1208                         notifyd_sys_callback, state->msg_ctx,
1209                         &instance->sys_watch);
1210                 if (ret != 0) {
1211                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1212                                   __func__, strerror(errno)));
1213                 }
1214         }
1215
1216         return 0;
1217 }
1218
1219 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1220 {
1221         TDB_DATA key = dbwrap_record_get_key(rec);
1222         TDB_DATA value = dbwrap_record_get_value(rec);
1223         struct notifyd_instance *instances = NULL;
1224         size_t num_instances = 0;
1225         size_t i;
1226         bool ok;
1227
1228         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1229                                  &num_instances);
1230         if (!ok) {
1231                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1232                           __func__, (int)key.dsize, (char *)key.dptr));
1233                 return 0;
1234         }
1235         for (i=0; i<num_instances; i++) {
1236                 TALLOC_FREE(instances[i].sys_watch);
1237         }
1238         return 0;
1239 }
1240
1241 static int notifyd_peer_destructor(struct notifyd_peer *p)
1242 {
1243         struct notifyd_state *state = p->state;
1244         size_t i;
1245
1246         if (p->db != NULL) {
1247                 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1248                                      NULL, NULL);
1249         }
1250
1251         for (i = 0; i<state->num_peers; i++) {
1252                 if (p == state->peers[i]) {
1253                         state->peers[i] = state->peers[state->num_peers-1];
1254                         state->num_peers -= 1;
1255                         break;
1256                 }
1257         }
1258         return 0;
1259 }
1260
1261 static struct notifyd_peer *notifyd_peer_new(
1262         struct notifyd_state *state, struct server_id pid)
1263 {
1264         struct notifyd_peer *p, **tmp;
1265
1266         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1267                              state->num_peers+1);
1268         if (tmp == NULL) {
1269                 return NULL;
1270         }
1271         state->peers = tmp;
1272
1273         p = talloc_zero(state->peers, struct notifyd_peer);
1274         if (p == NULL) {
1275                 return NULL;
1276         }
1277         p->state = state;
1278         p->pid = pid;
1279
1280         state->peers[state->num_peers] = p;
1281         state->num_peers += 1;
1282
1283         talloc_set_destructor(p, notifyd_peer_destructor);
1284
1285         return p;
1286 }
1287
1288 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1289                                  const uint8_t *msg, size_t msglen)
1290 {
1291         struct notifyd_state *state = peer->state;
1292         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1293                            .length = msglen };
1294         struct server_id_buf idbuf;
1295         struct messaging_reclog *log;
1296         enum ndr_err_code ndr_err;
1297         uint32_t i;
1298
1299         if (peer->db == NULL) {
1300                 /*
1301                  * No db yet
1302                  */
1303                 return;
1304         }
1305
1306         log = talloc(peer, struct messaging_reclog);
1307         if (log == NULL) {
1308                 DEBUG(10, ("%s: talloc failed\n", __func__));
1309                 return;
1310         }
1311
1312         ndr_err = ndr_pull_struct_blob_all(
1313                 &blob, log, log,
1314                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1315         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1316                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1317                            __func__, ndr_errstr(ndr_err)));
1318                 goto fail;
1319         }
1320
1321         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1322                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1323                    server_id_str_buf(peer->pid, &idbuf)));
1324
1325         if (log->rec_index != peer->rec_index) {
1326                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1327                           __func__, (uintmax_t)log->rec_index,
1328                           server_id_str_buf(peer->pid, &idbuf),
1329                           (uintmax_t)peer->rec_index));
1330                 goto fail;
1331         }
1332
1333         for (i=0; i<log->num_recs; i++) {
1334                 struct messaging_rec *r = log->recs[i];
1335                 struct notify_rec_change_msg *chg;
1336                 size_t pathlen;
1337                 bool ok;
1338
1339                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1340                                               &chg, &pathlen);
1341                 if (!ok) {
1342                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1343                                   __func__));
1344                         goto fail;
1345                 }
1346
1347                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1348                                               &chg->instance, peer->db,
1349                                               state->sys_notify_watch,
1350                                               state->sys_notify_ctx,
1351                                               state->msg_ctx);
1352                 if (!ok) {
1353                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1354                                   __func__));
1355                         goto fail;
1356                 }
1357         }
1358
1359         peer->rec_index += 1;
1360         peer->last_broadcast = time(NULL);
1361
1362         TALLOC_FREE(log);
1363         return;
1364
1365 fail:
1366         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1367                    server_id_str_buf(peer->pid, &idbuf)));
1368         TALLOC_FREE(peer);
1369 }
1370
1371 /*
1372  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1373  * messages) broadcasts by other notifyds. Several cases:
1374  *
1375  * We don't know the source. This creates a new peer. Creating a peer
1376  * involves asking the peer for its full database. We assume ordered
1377  * messages, so the new database will arrive before the next broadcast
1378  * will.
1379  *
1380  * We know the source and the log index matches. We will apply the log
1381  * locally to our peer's db as if we had received it from a local
1382  * client.
1383  *
1384  * We know the source but the log index does not match. This means we
1385  * lost a message. We just drop the whole peer and wait for the next
1386  * broadcast, which will then trigger a fresh database pull.
1387  */
1388
1389 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1390                                    uint64_t dst_srvid,
1391                                    const uint8_t *msg, size_t msglen,
1392                                    void *private_data)
1393 {
1394         struct notifyd_state *state = talloc_get_type_abort(
1395                 private_data, struct notifyd_state);
1396         struct server_id my_id = messaging_server_id(state->msg_ctx);
1397         struct notifyd_peer *p;
1398         uint32_t i;
1399         uint32_t msg_type;
1400         struct server_id src, dst;
1401         struct server_id_buf idbuf;
1402         NTSTATUS status;
1403
1404         if (msglen < MESSAGE_HDR_LENGTH) {
1405                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1406                 return 0;
1407         }
1408         message_hdr_get(&msg_type, &src, &dst, msg);
1409
1410         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1411                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1412                            (unsigned)msg_type));
1413                 return 0;
1414         }
1415         if (server_id_equal(&src, &my_id)) {
1416                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1417                 return 0;
1418         }
1419
1420         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1421                    __func__, server_id_str_buf(src, &idbuf)));
1422
1423         for (i=0; i<state->num_peers; i++) {
1424                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1425
1426                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1427                                    __func__, (unsigned)i));
1428
1429                         notifyd_apply_reclog(state->peers[i],
1430                                              msg + MESSAGE_HDR_LENGTH,
1431                                              msglen - MESSAGE_HDR_LENGTH);
1432                         return 0;
1433                 }
1434         }
1435
1436         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1437                    server_id_str_buf(src, &idbuf)));
1438
1439         p = notifyd_peer_new(state, src);
1440         if (p == NULL) {
1441                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1442                 return 0;
1443         }
1444
1445         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1446                                     NULL, 0);
1447         if (!NT_STATUS_IS_OK(status)) {
1448                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1449                            __func__, nt_errstr(status)));
1450                 TALLOC_FREE(p);
1451                 return 0;
1452         }
1453
1454         return 0;
1455 }
1456 #endif
1457
1458 struct notifyd_parse_db_state {
1459         bool (*fn)(const char *path,
1460                    struct server_id server,
1461                    const struct notify_instance *instance,
1462                    void *private_data);
1463         void *private_data;
1464 };
1465
1466 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1467                                     void *private_data)
1468 {
1469         struct notifyd_parse_db_state *state = private_data;
1470         char path[key.dsize+1];
1471         struct notifyd_instance *instances = NULL;
1472         size_t num_instances = 0;
1473         size_t i;
1474         bool ok;
1475
1476         memcpy(path, key.dptr, key.dsize);
1477         path[key.dsize] = 0;
1478
1479         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1480                                  &num_instances);
1481         if (!ok) {
1482                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1483                            __func__, path));
1484                 return true;
1485         }
1486
1487         for (i=0; i<num_instances; i++) {
1488                 ok = state->fn(path, instances[i].client,
1489                                &instances[i].instance,
1490                                state->private_data);
1491                 if (!ok) {
1492                         return false;
1493                 }
1494         }
1495
1496         return true;
1497 }
1498
1499 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1500                      uint64_t *log_index,
1501                      bool (*fn)(const char *path,
1502                                 struct server_id server,
1503                                 const struct notify_instance *instance,
1504                                 void *private_data),
1505                      void *private_data)
1506 {
1507         struct notifyd_parse_db_state state = {
1508                 .fn = fn, .private_data = private_data
1509         };
1510         NTSTATUS status;
1511
1512         if (buflen < 8) {
1513                 return EINVAL;
1514         }
1515         *log_index = BVAL(buf, 0);
1516
1517         buf += 8;
1518         buflen -= 8;
1519
1520         status = dbwrap_parse_marshall_buf(
1521                 buf, buflen, notifyd_parse_db_parser, &state);
1522         if (!NT_STATUS_IS_OK(status)) {
1523                 return map_errno_from_nt_status(status);
1524         }
1525
1526         return 0;
1527 }