lib: Make ctdbd_messaging_send_iov return 0/errno
[metze/samba/wip.git] / source3 / smbd / notifyd / notifyd.c
1 /*
2  * Unix SMB/CIFS implementation.
3  *
4  * Copyright (C) Volker Lendecke 2014
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "librpc/gen_ndr/notify.h"
22 #include "librpc/gen_ndr/messaging.h"
23 #include "librpc/gen_ndr/server_id.h"
24 #include "lib/dbwrap/dbwrap.h"
25 #include "lib/dbwrap/dbwrap_rbt.h"
26 #include "messages.h"
27 #include "proto.h"
28 #include "tdb.h"
29 #include "util_tdb.h"
30 #include "notifyd.h"
31 #include "lib/util/server_id_db.h"
32 #include "lib/util/tevent_unix.h"
33 #include "ctdbd_conn.h"
34 #include "ctdb_srvids.h"
35 #include "source3/smbd/proto.h"
36 #include "ctdb/include/ctdb_protocol.h"
37 #include "server_id_db_util.h"
38 #include "lib/util/iov_buf.h"
39 #include "messages_util.h"
40
41 struct notifyd_peer;
42
43 /*
44  * All of notifyd's state
45  */
46
47 struct notifyd_state {
48         struct tevent_context *ev;
49         struct messaging_context *msg_ctx;
50         struct ctdbd_connection *ctdbd_conn;
51
52         /*
53          * Database of everything clients show interest in. Indexed by
54          * absolute path. The database keys are not 0-terminated
55          * because the criticial operation, notifyd_trigger, can walk
56          * the structure from the top without adding intermediate 0s.
57          * The database records contain an array of
58          *
59          * struct notifyd_instance
60          *
61          * to be maintained by parsed by notifyd_entry_parse()
62          */
63         struct db_context *entries;
64
65         /*
66          * In the cluster case, this is the place where we store a log
67          * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
68          * forward them to our peer notifyd's in the cluster once a
69          * second or when the log grows too large.
70          */
71
72         struct messaging_reclog *log;
73
74         /*
75          * Array of companion notifyd's in a cluster. Every notifyd
76          * broadcasts its messaging_reclog to every other notifyd in
77          * the cluster. This is done by making ctdb send a message to
78          * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
79          * number CTDB_BROADCAST_VNNMAP. Everybody in the cluster who
80          * had called register_with_ctdbd this srvid will receive the
81          * broadcasts.
82          *
83          * Database replication happens via these broadcasts. Also,
84          * they serve as liveness indication. If a notifyd receives a
85          * broadcast from an unknown peer, it will create one for this
86          * srvid. Also when we don't hear anything from a peer for a
87          * while, we will discard it.
88          */
89
90         struct notifyd_peer **peers;
91         size_t num_peers;
92
93         sys_notify_watch_fn sys_notify_watch;
94         struct sys_notify_context *sys_notify_ctx;
95 };
96
97 /*
98  * notifyd's representation of a notify instance
99  */
100 struct notifyd_instance {
101         struct server_id client;
102         struct notify_instance instance;
103
104         void *sys_watch; /* inotify/fam/etc handle */
105
106         /*
107          * Filters after sys_watch took responsibility of some bits
108          */
109         uint32_t internal_filter;
110         uint32_t internal_subdir_filter;
111 };
112
113 struct notifyd_peer {
114         struct notifyd_state *state;
115         struct server_id pid;
116         uint64_t rec_index;
117         struct db_context *db;
118         time_t last_broadcast;
119 };
120
121 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
122                                struct messaging_rec **prec,
123                                void *private_data);
124 static bool notifyd_trigger(struct messaging_context *msg_ctx,
125                             struct messaging_rec **prec,
126                             void *private_data);
127 static bool notifyd_get_db(struct messaging_context *msg_ctx,
128                            struct messaging_rec **prec,
129                            void *private_data);
130 static bool notifyd_got_db(struct messaging_context *msg_ctx,
131                            struct messaging_rec **prec,
132                            void *private_data);
133 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
134                                      struct server_id src,
135                                      struct messaging_reclog *log);
136 static void notifyd_sys_callback(struct sys_notify_context *ctx,
137                                  void *private_data, struct notify_event *ev);
138
139 static struct tevent_req *notifyd_broadcast_reclog_send(
140         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
141         struct ctdbd_connection *ctdbd_conn, struct server_id src,
142         struct messaging_reclog *log);
143 static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
144
145 static struct tevent_req *notifyd_clean_peers_send(
146         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
147         struct notifyd_state *notifyd);
148 static int notifyd_clean_peers_recv(struct tevent_req *req);
149
150 static int sys_notify_watch_dummy(
151         TALLOC_CTX *mem_ctx,
152         struct sys_notify_context *ctx,
153         const char *path,
154         uint32_t *filter,
155         uint32_t *subdir_filter,
156         void (*callback)(struct sys_notify_context *ctx,
157                          void *private_data,
158                          struct notify_event *ev),
159         void *private_data,
160         void *handle_p)
161 {
162         void **handle = handle_p;
163         *handle = NULL;
164         return 0;
165 }
166
167 static void notifyd_handler_done(struct tevent_req *subreq);
168 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
169 static void notifyd_clean_peers_finished(struct tevent_req *subreq);
170 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
171                                    uint64_t dst_srvid,
172                                    const uint8_t *msg, size_t msglen,
173                                    void *private_data);
174
175 struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
176                                 struct messaging_context *msg_ctx,
177                                 struct ctdbd_connection *ctdbd_conn,
178                                 sys_notify_watch_fn sys_notify_watch,
179                                 struct sys_notify_context *sys_notify_ctx)
180 {
181         struct tevent_req *req, *subreq;
182         struct notifyd_state *state;
183         struct server_id_db *names_db;
184         int ret;
185
186         req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
187         if (req == NULL) {
188                 return NULL;
189         }
190         state->ev = ev;
191         state->msg_ctx = msg_ctx;
192         state->ctdbd_conn = ctdbd_conn;
193
194         if (sys_notify_watch == NULL) {
195                 sys_notify_watch = sys_notify_watch_dummy;
196         }
197
198         state->sys_notify_watch = sys_notify_watch;
199         state->sys_notify_ctx = sys_notify_ctx;
200
201         state->entries = db_open_rbt(state);
202         if (tevent_req_nomem(state->entries, req)) {
203                 return tevent_req_post(req, ev);
204         }
205
206         subreq = messaging_handler_send(state, ev, msg_ctx,
207                                         MSG_SMB_NOTIFY_REC_CHANGE,
208                                         notifyd_rec_change, state);
209         if (tevent_req_nomem(subreq, req)) {
210                 return tevent_req_post(req, ev);
211         }
212         tevent_req_set_callback(subreq, notifyd_handler_done, req);
213
214         subreq = messaging_handler_send(state, ev, msg_ctx,
215                                         MSG_SMB_NOTIFY_TRIGGER,
216                                         notifyd_trigger, state);
217         if (tevent_req_nomem(subreq, req)) {
218                 return tevent_req_post(req, ev);
219         }
220         tevent_req_set_callback(subreq, notifyd_handler_done, req);
221
222         subreq = messaging_handler_send(state, ev, msg_ctx,
223                                         MSG_SMB_NOTIFY_GET_DB,
224                                         notifyd_get_db, state);
225         if (tevent_req_nomem(subreq, req)) {
226                 return tevent_req_post(req, ev);
227         }
228         tevent_req_set_callback(subreq, notifyd_handler_done, req);
229
230         subreq = messaging_handler_send(state, ev, msg_ctx,
231                                         MSG_SMB_NOTIFY_DB,
232                                         notifyd_got_db, state);
233         if (tevent_req_nomem(subreq, req)) {
234                 return tevent_req_post(req, ev);
235         }
236         tevent_req_set_callback(subreq, notifyd_handler_done, req);
237
238         names_db = messaging_names_db(msg_ctx);
239
240         ret = server_id_db_set_exclusive(names_db, "notify-daemon");
241         if (ret != 0) {
242                 DEBUG(10, ("%s: server_id_db_add failed: %s\n",
243                            __func__, strerror(ret)));
244                 tevent_req_error(req, ret);
245                 return tevent_req_post(req, ev);
246         }
247
248         if (ctdbd_conn == NULL) {
249                 /*
250                  * No cluster around, skip the database replication
251                  * engine
252                  */
253                 return req;
254         }
255
256         state->log = talloc_zero(state, struct messaging_reclog);
257         if (tevent_req_nomem(state->log, req)) {
258                 return tevent_req_post(req, ev);
259         }
260
261         subreq = notifyd_broadcast_reclog_send(
262                 state->log, ev, ctdbd_conn, messaging_server_id(msg_ctx),
263                 state->log);
264         if (tevent_req_nomem(subreq, req)) {
265                 return tevent_req_post(req, ev);
266         }
267         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_finished,
268                                 req);
269
270         subreq = notifyd_clean_peers_send(state, ev, state);
271         if (tevent_req_nomem(subreq, req)) {
272                 return tevent_req_post(req, ev);
273         }
274         tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
275                                 req);
276
277         ret = register_with_ctdbd(ctdbd_conn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
278                                   notifyd_snoop_broadcast, state);
279         if (ret != 0) {
280                 tevent_req_error(req, ret);
281                 return tevent_req_post(req, ev);
282         }
283
284         return req;
285 }
286
287 static void notifyd_handler_done(struct tevent_req *subreq)
288 {
289         struct tevent_req *req = tevent_req_callback_data(
290                 subreq, struct tevent_req);
291         int ret;
292
293         ret = messaging_handler_recv(subreq);
294         TALLOC_FREE(subreq);
295         tevent_req_error(req, ret);
296 }
297
298 static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
299 {
300         struct tevent_req *req = tevent_req_callback_data(
301                 subreq, struct tevent_req);
302         int ret;
303
304         ret = notifyd_broadcast_reclog_recv(subreq);
305         TALLOC_FREE(subreq);
306         tevent_req_error(req, ret);
307 }
308
309 static void notifyd_clean_peers_finished(struct tevent_req *subreq)
310 {
311         struct tevent_req *req = tevent_req_callback_data(
312                 subreq, struct tevent_req);
313         int ret;
314
315         ret = notifyd_clean_peers_recv(subreq);
316         TALLOC_FREE(subreq);
317         tevent_req_error(req, ret);
318 }
319
320 int notifyd_recv(struct tevent_req *req)
321 {
322         return tevent_req_simple_recv_unix(req);
323 }
324
325 /*
326  * Parse an entry in the notifyd_context->entries database
327  */
328
329 static bool notifyd_parse_entry(uint8_t *buf, size_t buflen,
330                                 struct notifyd_instance **instances,
331                                 size_t *num_instances)
332 {
333         if ((buflen % sizeof(struct notifyd_instance)) != 0) {
334                 DEBUG(1, ("%s: invalid buffer size: %u\n",
335                           __func__, (unsigned)buflen));
336                 return false;
337         }
338
339         if (instances != NULL) {
340                 *instances = (struct notifyd_instance *)buf;
341         }
342         if (num_instances != NULL) {
343                 *num_instances = buflen / sizeof(struct notifyd_instance);
344         }
345         return true;
346 }
347
348 static bool notifyd_apply_rec_change(
349         const struct server_id *client,
350         const char *path, size_t pathlen,
351         const struct notify_instance *chg,
352         struct db_context *entries,
353         sys_notify_watch_fn sys_notify_watch,
354         struct sys_notify_context *sys_notify_ctx,
355         struct messaging_context *msg_ctx)
356 {
357         struct db_record *rec;
358         struct notifyd_instance *instances;
359         size_t num_instances;
360         size_t i;
361         struct notifyd_instance *instance;
362         TDB_DATA value;
363         NTSTATUS status;
364         bool ok = false;
365
366         if (pathlen == 0) {
367                 DEBUG(1, ("%s: pathlen==0\n", __func__));
368                 return false;
369         }
370         if (path[pathlen-1] != '\0') {
371                 DEBUG(1, ("%s: path not 0-terminated\n", __func__));
372                 return false;
373         }
374
375         DEBUG(10, ("%s: path=%s, filter=%u, subdir_filter=%u, "
376                    "private_data=%p\n", __func__, path,
377                    (unsigned)chg->filter, (unsigned)chg->subdir_filter,
378                    chg->private_data));
379
380         rec = dbwrap_fetch_locked(
381                 entries, entries,
382                 make_tdb_data((const uint8_t *)path, pathlen-1));
383
384         if (rec == NULL) {
385                 DEBUG(1, ("%s: dbwrap_fetch_locked failed\n", __func__));
386                 goto fail;
387         }
388
389         num_instances = 0;
390         value = dbwrap_record_get_value(rec);
391
392         if (value.dsize != 0) {
393                 if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
394                                          &num_instances)) {
395                         goto fail;
396                 }
397         }
398
399         /*
400          * Overallocate by one instance to avoid a realloc when adding
401          */
402         instances = talloc_array(rec, struct notifyd_instance,
403                                  num_instances + 1);
404         if (instances == NULL) {
405                 DEBUG(1, ("%s: talloc failed\n", __func__));
406                 goto fail;
407         }
408
409         if (value.dsize != 0) {
410                 memcpy(instances, value.dptr, value.dsize);
411         }
412
413         for (i=0; i<num_instances; i++) {
414                 instance = &instances[i];
415
416                 if (server_id_equal(&instance->client, client) &&
417                     (instance->instance.private_data == chg->private_data)) {
418                         break;
419                 }
420         }
421
422         if (i < num_instances) {
423                 instance->instance = *chg;
424         } else {
425                 /*
426                  * We've overallocated for one instance
427                  */
428                 instance = &instances[num_instances];
429
430                 *instance = (struct notifyd_instance) {
431                         .client = *client,
432                         .instance = *chg,
433                         .internal_filter = chg->filter,
434                         .internal_subdir_filter = chg->subdir_filter
435                 };
436
437                 num_instances += 1;
438         }
439
440         if ((instance->instance.filter != 0) ||
441             (instance->instance.subdir_filter != 0)) {
442                 int ret;
443
444                 TALLOC_FREE(instance->sys_watch);
445
446                 ret = sys_notify_watch(entries, sys_notify_ctx, path,
447                                        &instance->internal_filter,
448                                        &instance->internal_subdir_filter,
449                                        notifyd_sys_callback, msg_ctx,
450                                        &instance->sys_watch);
451                 if (ret != 0) {
452                         DEBUG(1, ("%s: inotify_watch returned %s\n",
453                                   __func__, strerror(errno)));
454                 }
455         }
456
457         if ((instance->instance.filter == 0) &&
458             (instance->instance.subdir_filter == 0)) {
459                 /* This is a delete request */
460                 TALLOC_FREE(instance->sys_watch);
461                 *instance = instances[num_instances-1];
462                 num_instances -= 1;
463         }
464
465         DEBUG(10, ("%s: %s has %u instances\n", __func__,
466                    path, (unsigned)num_instances));
467
468         if (num_instances == 0) {
469                 status = dbwrap_record_delete(rec);
470                 if (!NT_STATUS_IS_OK(status)) {
471                         DEBUG(1, ("%s: dbwrap_record_delete returned %s\n",
472                                   __func__, nt_errstr(status)));
473                         goto fail;
474                 }
475         } else {
476                 value = make_tdb_data(
477                         (uint8_t *)instances,
478                         sizeof(struct notifyd_instance) * num_instances);
479
480                 status = dbwrap_record_store(rec, value, 0);
481                 if (!NT_STATUS_IS_OK(status)) {
482                         DEBUG(1, ("%s: dbwrap_record_store returned %s\n",
483                                   __func__, nt_errstr(status)));
484                         goto fail;
485                 }
486         }
487
488         ok = true;
489 fail:
490         TALLOC_FREE(rec);
491         return ok;
492 }
493
494 static void notifyd_sys_callback(struct sys_notify_context *ctx,
495                                  void *private_data, struct notify_event *ev)
496 {
497         struct messaging_context *msg_ctx = talloc_get_type_abort(
498                 private_data, struct messaging_context);
499         struct notify_trigger_msg msg;
500         struct iovec iov[4];
501         char slash = '/';
502
503         msg = (struct notify_trigger_msg) {
504                 .when = timespec_current(),
505                 .action = ev->action,
506                 .filter = UINT32_MAX
507         };
508
509         iov[0].iov_base = &msg;
510         iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
511         iov[1].iov_base = discard_const_p(char, ev->dir);
512         iov[1].iov_len = strlen(ev->dir);
513         iov[2].iov_base = &slash;
514         iov[2].iov_len = 1;
515         iov[3].iov_base = discard_const_p(char, ev->path);
516         iov[3].iov_len = strlen(ev->path)+1;
517
518         messaging_send_iov(
519                 msg_ctx, messaging_server_id(msg_ctx),
520                 MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
521 }
522
523 static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
524                                      struct notify_rec_change_msg **pmsg,
525                                      size_t *pathlen)
526 {
527         struct notify_rec_change_msg *msg;
528
529         if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
530                 DEBUG(1, ("%s: message too short, ignoring: %u\n", __func__,
531                           (unsigned)bufsize));
532                 return false;
533         }
534
535         *pmsg = msg = (struct notify_rec_change_msg *)buf;
536         *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
537
538         DEBUG(10, ("%s: Got rec_change_msg filter=%u, subdir_filter=%u, "
539                    "private_data=%p, path=%.*s\n",
540                    __func__, (unsigned)msg->instance.filter,
541                    (unsigned)msg->instance.subdir_filter,
542                    msg->instance.private_data, (int)(*pathlen), msg->path));
543
544         return true;
545 }
546
547 static bool notifyd_rec_change(struct messaging_context *msg_ctx,
548                                struct messaging_rec **prec,
549                                void *private_data)
550 {
551         struct notifyd_state *state = talloc_get_type_abort(
552                 private_data, struct notifyd_state);
553         struct server_id_buf idbuf;
554         struct messaging_rec *rec = *prec;
555         struct messaging_rec **tmp;
556         struct messaging_reclog *log;
557         struct notify_rec_change_msg *msg;
558         size_t pathlen;
559         bool ok;
560
561         DEBUG(10, ("%s: Got %d bytes from %s\n", __func__,
562                    (unsigned)rec->buf.length,
563                    server_id_str_buf(rec->src, &idbuf)));
564
565         ok = notifyd_parse_rec_change(rec->buf.data, rec->buf.length,
566                                       &msg, &pathlen);
567         if (!ok) {
568                 return true;
569         }
570
571         ok = notifyd_apply_rec_change(
572                 &rec->src, msg->path, pathlen, &msg->instance,
573                 state->entries, state->sys_notify_watch, state->sys_notify_ctx,
574                 state->msg_ctx);
575         if (!ok) {
576                 DEBUG(1, ("%s: notifyd_apply_rec_change failed, ignoring\n",
577                           __func__));
578                 return true;
579         }
580
581         if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
582                 return true;
583         }
584         log = state->log;
585
586         tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
587                              log->num_recs+1);
588         if (tmp == NULL) {
589                 DEBUG(1, ("%s: talloc_realloc failed, ignoring\n", __func__));
590                 return true;
591         }
592         log->recs = tmp;
593
594         log->recs[log->num_recs] = talloc_move(log->recs, prec);
595         log->num_recs += 1;
596
597         if (log->num_recs >= 100) {
598                 /*
599                  * Don't let the log grow too large
600                  */
601                 notifyd_broadcast_reclog(state->ctdbd_conn,
602                                          messaging_server_id(msg_ctx), log);
603         }
604
605         return true;
606 }
607
608 struct notifyd_trigger_state {
609         struct messaging_context *msg_ctx;
610         struct notify_trigger_msg *msg;
611         bool recursive;
612         bool covered_by_sys_notify;
613 };
614
615 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
616                                    void *private_data);
617
618 static bool notifyd_trigger(struct messaging_context *msg_ctx,
619                             struct messaging_rec **prec,
620                             void *private_data)
621 {
622         struct notifyd_state *state = talloc_get_type_abort(
623                 private_data, struct notifyd_state);
624         struct server_id my_id = messaging_server_id(msg_ctx);
625         struct messaging_rec *rec = *prec;
626         struct notifyd_trigger_state tstate;
627         const char *path;
628         const char *p, *next_p;
629
630         if (rec->buf.length < offsetof(struct notify_trigger_msg, path) + 1) {
631                 DEBUG(1, ("message too short, ignoring: %u\n",
632                           (unsigned)rec->buf.length));
633                 return true;
634         }
635         if (rec->buf.data[rec->buf.length-1] != 0) {
636                 DEBUG(1, ("%s: path not 0-terminated, ignoring\n", __func__));
637                 return true;
638         }
639
640         tstate.msg_ctx = msg_ctx;
641
642         tstate.covered_by_sys_notify = (rec->src.vnn == my_id.vnn);
643         tstate.covered_by_sys_notify &= !server_id_equal(&rec->src, &my_id);
644
645         tstate.msg = (struct notify_trigger_msg *)rec->buf.data;
646         path = tstate.msg->path;
647
648         DEBUG(10, ("%s: Got trigger_msg action=%u, filter=%u, path=%s\n",
649                    __func__, (unsigned)tstate.msg->action,
650                    (unsigned)tstate.msg->filter, path));
651
652         if (path[0] != '/') {
653                 DEBUG(1, ("%s: path %s does not start with /, ignoring\n",
654                           __func__, path));
655                 return true;
656         }
657
658         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
659                 ptrdiff_t path_len = p - path;
660                 TDB_DATA key;
661                 uint32_t i;
662
663                 next_p = strchr(p+1, '/');
664                 tstate.recursive = (next_p != NULL);
665
666                 DEBUG(10, ("%s: Trying path %.*s\n", __func__,
667                            (int)path_len, path));
668
669                 key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
670                                    .dsize = path_len };
671
672                 dbwrap_parse_record(state->entries, key,
673                                     notifyd_trigger_parser, &tstate);
674
675                 if (state->peers == NULL) {
676                         continue;
677                 }
678
679                 if (rec->src.vnn != my_id.vnn) {
680                         continue;
681                 }
682
683                 for (i=0; i<state->num_peers; i++) {
684                         if (state->peers[i]->db == NULL) {
685                                 /*
686                                  * Inactive peer, did not get a db yet
687                                  */
688                                 continue;
689                         }
690                         dbwrap_parse_record(state->peers[i]->db, key,
691                                             notifyd_trigger_parser, &tstate);
692                 }
693         }
694
695         return true;
696 }
697
698 static void notifyd_send_delete(struct messaging_context *msg_ctx,
699                                 TDB_DATA key,
700                                 struct notifyd_instance *instance);
701
702 static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
703                                    void *private_data)
704
705 {
706         struct notifyd_trigger_state *tstate = private_data;
707         struct notify_event_msg msg = { .action = tstate->msg->action };
708         struct iovec iov[2];
709         size_t path_len = key.dsize;
710         struct notifyd_instance *instances = NULL;
711         size_t num_instances = 0;
712         size_t i;
713
714         if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
715                                  &num_instances)) {
716                 DEBUG(1, ("%s: Could not parse notifyd_entry\n", __func__));
717                 return;
718         }
719
720         DEBUG(10, ("%s: Found %u instances for %.*s\n", __func__,
721                    (unsigned)num_instances, (int)key.dsize,
722                    (char *)key.dptr));
723
724         iov[0].iov_base = &msg;
725         iov[0].iov_len = offsetof(struct notify_event_msg, path);
726         iov[1].iov_base = tstate->msg->path + path_len + 1;
727         iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
728
729         for (i=0; i<num_instances; i++) {
730                 struct notifyd_instance *instance = &instances[i];
731                 struct server_id_buf idbuf;
732                 uint32_t i_filter;
733                 NTSTATUS status;
734
735                 if (tstate->covered_by_sys_notify) {
736                         if (tstate->recursive) {
737                                 i_filter = instance->internal_subdir_filter;
738                         } else {
739                                 i_filter = instance->internal_filter;
740                         }
741                 } else {
742                         if (tstate->recursive) {
743                                 i_filter = instance->instance.subdir_filter;
744                         } else {
745                                 i_filter = instance->instance.filter;
746                         }
747                 }
748
749                 if ((i_filter & tstate->msg->filter) == 0) {
750                         continue;
751                 }
752
753                 msg.private_data = instance->instance.private_data;
754
755                 status = messaging_send_iov(
756                         tstate->msg_ctx, instance->client,
757                         MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
758
759                 DEBUG(10, ("%s: messaging_send_iov to %s returned %s\n",
760                            __func__,
761                            server_id_str_buf(instance->client, &idbuf),
762                            nt_errstr(status)));
763
764                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
765                     procid_is_local(&instance->client)) {
766                         /*
767                          * That process has died
768                          */
769                         notifyd_send_delete(tstate->msg_ctx, key, instance);
770                         continue;
771                 }
772
773                 if (!NT_STATUS_IS_OK(status)) {
774                         DEBUG(1, ("%s: messaging_send_iov returned %s\n",
775                                   __func__, nt_errstr(status)));
776                 }
777         }
778 }
779
780 /*
781  * Send a delete request to ourselves to properly discard a notify
782  * record for an smbd that has died.
783  */
784
785 static void notifyd_send_delete(struct messaging_context *msg_ctx,
786                                 TDB_DATA key,
787                                 struct notifyd_instance *instance)
788 {
789         struct notify_rec_change_msg msg = {
790                 .instance.private_data = instance->instance.private_data
791         };
792         uint8_t nul = 0;
793         struct iovec iov[3];
794         NTSTATUS status;
795
796         /*
797          * Send a rec_change to ourselves to delete a dead entry
798          */
799
800         iov[0] = (struct iovec) {
801                 .iov_base = &msg,
802                 .iov_len = offsetof(struct notify_rec_change_msg, path) };
803         iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
804         iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
805
806         status = messaging_send_iov_from(
807                 msg_ctx, instance->client, messaging_server_id(msg_ctx),
808                 MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
809
810         if (!NT_STATUS_IS_OK(status)) {
811                 DEBUG(10, ("%s: messaging_send_iov_from returned %s\n",
812                            __func__, nt_errstr(status)));
813         }
814 }
815
816 static bool notifyd_get_db(struct messaging_context *msg_ctx,
817                            struct messaging_rec **prec,
818                            void *private_data)
819 {
820         struct notifyd_state *state = talloc_get_type_abort(
821                 private_data, struct notifyd_state);
822         struct messaging_rec *rec = *prec;
823         struct server_id_buf id1, id2;
824         NTSTATUS status;
825         uint64_t rec_index = UINT64_MAX;
826         uint8_t index_buf[sizeof(uint64_t)];
827         size_t dbsize;
828         uint8_t *buf;
829         struct iovec iov[2];
830
831         dbsize = dbwrap_marshall(state->entries, NULL, 0);
832
833         buf = talloc_array(rec, uint8_t, dbsize);
834         if (buf == NULL) {
835                 DEBUG(1, ("%s: talloc_array(%ju) failed\n",
836                           __func__, (uintmax_t)dbsize));
837                 return true;
838         }
839
840         dbsize = dbwrap_marshall(state->entries, buf, dbsize);
841
842         if (dbsize != talloc_get_size(buf)) {
843                 DEBUG(1, ("%s: dbsize changed: %ju->%ju\n", __func__,
844                           (uintmax_t)talloc_get_size(buf),
845                           (uintmax_t)dbsize));
846                 TALLOC_FREE(buf);
847                 return true;
848         }
849
850         if (state->log != NULL) {
851                 rec_index = state->log->rec_index;
852         }
853         SBVAL(index_buf, 0, rec_index);
854
855         iov[0] = (struct iovec) { .iov_base = index_buf,
856                                   .iov_len = sizeof(index_buf) };
857         iov[1] = (struct iovec) { .iov_base = buf,
858                                   .iov_len = dbsize };
859
860         DEBUG(10, ("%s: Sending %ju bytes to %s->%s\n", __func__,
861                    (uintmax_t)iov_buflen(iov, ARRAY_SIZE(iov)),
862                    server_id_str_buf(messaging_server_id(msg_ctx), &id1),
863                    server_id_str_buf(rec->src, &id2)));
864
865         status = messaging_send_iov(msg_ctx, rec->src, MSG_SMB_NOTIFY_DB,
866                                     iov, ARRAY_SIZE(iov), NULL, 0);
867         TALLOC_FREE(buf);
868         if (!NT_STATUS_IS_OK(status)) {
869                 DEBUG(1, ("%s: messaging_send_iov failed: %s\n",
870                           __func__, nt_errstr(status)));
871         }
872
873         return true;
874 }
875
876 static int notifyd_add_proxy_syswatches(struct db_record *rec,
877                                         void *private_data);
878
879 static bool notifyd_got_db(struct messaging_context *msg_ctx,
880                            struct messaging_rec **prec,
881                            void *private_data)
882 {
883         struct notifyd_state *state = talloc_get_type_abort(
884                 private_data, struct notifyd_state);
885         struct messaging_rec *rec = *prec;
886         struct notifyd_peer *p = NULL;
887         struct server_id_buf idbuf;
888         NTSTATUS status;
889         int count;
890         size_t i;
891
892         for (i=0; i<state->num_peers; i++) {
893                 if (server_id_equal(&rec->src, &state->peers[i]->pid)) {
894                         p = state->peers[i];
895                         break;
896                 }
897         }
898
899         if (p == NULL) {
900                 DEBUG(10, ("%s: Did not find peer for db from %s\n",
901                            __func__, server_id_str_buf(rec->src, &idbuf)));
902                 return true;
903         }
904
905         if (rec->buf.length < 8) {
906                 DEBUG(10, ("%s: Got short db length %u from %s\n", __func__,
907                            (unsigned)rec->buf.length,
908                            server_id_str_buf(rec->src, &idbuf)));
909                 TALLOC_FREE(p);
910                 return true;
911         }
912
913         p->rec_index = BVAL(rec->buf.data, 0);
914
915         p->db = db_open_rbt(p);
916         if (p->db == NULL) {
917                 DEBUG(10, ("%s: db_open_rbt failed\n", __func__));
918                 TALLOC_FREE(p);
919                 return true;
920         }
921
922         status = dbwrap_unmarshall(p->db, rec->buf.data + 8,
923                                    rec->buf.length - 8);
924         if (!NT_STATUS_IS_OK(status)) {
925                 DEBUG(10, ("%s: dbwrap_unmarshall returned %s for db %s\n",
926                            __func__, nt_errstr(status),
927                            server_id_str_buf(rec->src, &idbuf)));
928                 TALLOC_FREE(p);
929                 return true;
930         }
931
932         dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
933                              &count);
934
935         DEBUG(10, ("%s: Database from %s contained %d records\n", __func__,
936                    server_id_str_buf(rec->src, &idbuf), count));
937
938         return true;
939 }
940
941 static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
942                                      struct server_id src,
943                                      struct messaging_reclog *log)
944 {
945         enum ndr_err_code ndr_err;
946         uint8_t msghdr[MESSAGE_HDR_LENGTH];
947         DATA_BLOB blob;
948         struct iovec iov[2];
949         int ret;
950
951         if (log == NULL) {
952                 return;
953         }
954
955         DEBUG(10, ("%s: rec_index=%ju, num_recs=%u\n", __func__,
956                    (uintmax_t)log->rec_index, (unsigned)log->num_recs));
957
958         message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
959                         (struct server_id) {0 });
960         iov[0] = (struct iovec) { .iov_base = msghdr,
961                                   .iov_len = sizeof(msghdr) };
962
963         ndr_err = ndr_push_struct_blob(
964                 &blob, log, log,
965                 (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
966         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
967                 DEBUG(1, ("%s: ndr_push_messaging_recs failed: %s\n",
968                           __func__, ndr_errstr(ndr_err)));
969                 goto done;
970         }
971         iov[1] = (struct iovec) { .iov_base = blob.data,
972                                   .iov_len = blob.length };
973
974         ret = ctdbd_messaging_send_iov(
975                 ctdbd_conn, CTDB_BROADCAST_VNNMAP,
976                 CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
977         TALLOC_FREE(blob.data);
978         if (ret != 0) {
979                 DEBUG(1, ("%s: ctdbd_messaging_send failed: %s\n",
980                           __func__, strerror(ret)));
981                 goto done;
982         }
983
984         log->rec_index += 1;
985
986 done:
987         log->num_recs = 0;
988         TALLOC_FREE(log->recs);
989 }
990
991 struct notifyd_broadcast_reclog_state {
992         struct tevent_context *ev;
993         struct ctdbd_connection *ctdbd_conn;
994         struct server_id src;
995         struct messaging_reclog *log;
996 };
997
998 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
999
1000 static struct tevent_req *notifyd_broadcast_reclog_send(
1001         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1002         struct ctdbd_connection *ctdbd_conn, struct server_id src,
1003         struct messaging_reclog *log)
1004 {
1005         struct tevent_req *req, *subreq;
1006         struct notifyd_broadcast_reclog_state *state;
1007
1008         req = tevent_req_create(mem_ctx, &state,
1009                                 struct notifyd_broadcast_reclog_state);
1010         if (req == NULL) {
1011                 return NULL;
1012         }
1013         state->ev = ev;
1014         state->ctdbd_conn = ctdbd_conn;
1015         state->src = src;
1016         state->log = log;
1017
1018         subreq = tevent_wakeup_send(state, state->ev,
1019                                     timeval_current_ofs_msec(1000));
1020         if (tevent_req_nomem(subreq, req)) {
1021                 return tevent_req_post(req, ev);
1022         }
1023         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1024         return req;
1025 }
1026
1027 static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1028 {
1029         struct tevent_req *req = tevent_req_callback_data(
1030                 subreq, struct tevent_req);
1031         struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1032                 req, struct notifyd_broadcast_reclog_state);
1033         bool ok;
1034
1035         ok = tevent_wakeup_recv(subreq);
1036         TALLOC_FREE(subreq);
1037         if (!ok) {
1038                 tevent_req_oom(req);
1039                 return;
1040         }
1041
1042         notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1043
1044         subreq = tevent_wakeup_send(state, state->ev,
1045                                     timeval_current_ofs_msec(1000));
1046         if (tevent_req_nomem(subreq, req)) {
1047                 return;
1048         }
1049         tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1050 }
1051
1052 static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1053 {
1054         return tevent_req_simple_recv_unix(req);
1055 }
1056
1057 struct notifyd_clean_peers_state {
1058         struct tevent_context *ev;
1059         struct notifyd_state *notifyd;
1060 };
1061
1062 static void notifyd_clean_peers_next(struct tevent_req *subreq);
1063
1064 static struct tevent_req *notifyd_clean_peers_send(
1065         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1066         struct notifyd_state *notifyd)
1067 {
1068         struct tevent_req *req, *subreq;
1069         struct notifyd_clean_peers_state *state;
1070
1071         req = tevent_req_create(mem_ctx, &state,
1072                                 struct notifyd_clean_peers_state);
1073         if (req == NULL) {
1074                 return NULL;
1075         }
1076         state->ev = ev;
1077         state->notifyd = notifyd;
1078
1079         subreq = tevent_wakeup_send(state, state->ev,
1080                                     timeval_current_ofs_msec(30000));
1081         if (tevent_req_nomem(subreq, req)) {
1082                 return tevent_req_post(req, ev);
1083         }
1084         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1085         return req;
1086 }
1087
1088 static void notifyd_clean_peers_next(struct tevent_req *subreq)
1089 {
1090         struct tevent_req *req = tevent_req_callback_data(
1091                 subreq, struct tevent_req);
1092         struct notifyd_clean_peers_state *state = tevent_req_data(
1093                 req, struct notifyd_clean_peers_state);
1094         struct notifyd_state *notifyd = state->notifyd;
1095         size_t i;
1096         bool ok;
1097         time_t now = time(NULL);
1098
1099         ok = tevent_wakeup_recv(subreq);
1100         TALLOC_FREE(subreq);
1101         if (!ok) {
1102                 tevent_req_oom(req);
1103                 return;
1104         }
1105
1106         i = 0;
1107         while (i < notifyd->num_peers) {
1108                 struct notifyd_peer *p = notifyd->peers[i];
1109
1110                 if ((now - p->last_broadcast) > 60) {
1111                         struct server_id_buf idbuf;
1112
1113                         /*
1114                          * Haven't heard for more than 60 seconds. Call this
1115                          * peer dead
1116                          */
1117
1118                         DEBUG(10, ("%s: peer %s died\n", __func__,
1119                                    server_id_str_buf(p->pid, &idbuf)));
1120                         /*
1121                          * This implicitly decrements notifyd->num_peers
1122                          */
1123                         TALLOC_FREE(p);
1124                 } else {
1125                         i += 1;
1126                 }
1127         }
1128
1129         subreq = tevent_wakeup_send(state, state->ev,
1130                                     timeval_current_ofs_msec(30000));
1131         if (tevent_req_nomem(subreq, req)) {
1132                 return;
1133         }
1134         tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1135 }
1136
1137 static int notifyd_clean_peers_recv(struct tevent_req *req)
1138 {
1139         return tevent_req_simple_recv_unix(req);
1140 }
1141
1142 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1143                                         void *private_data)
1144 {
1145         struct notifyd_state *state = talloc_get_type_abort(
1146                 private_data, struct notifyd_state);
1147         struct db_context *db = dbwrap_record_get_db(rec);
1148         TDB_DATA key = dbwrap_record_get_key(rec);
1149         TDB_DATA value = dbwrap_record_get_value(rec);
1150         struct notifyd_instance *instances = NULL;
1151         size_t num_instances = 0;
1152         size_t i;
1153         char path[key.dsize+1];
1154         bool ok;
1155
1156         memcpy(path, key.dptr, key.dsize);
1157         path[key.dsize] = '\0';
1158
1159         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1160                                  &num_instances);
1161         if (!ok) {
1162                 DEBUG(1, ("%s: Could not parse notifyd entry for %s\n",
1163                           __func__, path));
1164                 return 0;
1165         }
1166
1167         for (i=0; i<num_instances; i++) {
1168                 struct notifyd_instance *instance = &instances[i];
1169                 uint32_t filter = instance->instance.filter;
1170                 uint32_t subdir_filter = instance->instance.subdir_filter;
1171                 int ret;
1172
1173                 ret = state->sys_notify_watch(
1174                         db, state->sys_notify_ctx, path,
1175                         &filter, &subdir_filter,
1176                         notifyd_sys_callback, state->msg_ctx,
1177                         &instance->sys_watch);
1178                 if (ret != 0) {
1179                         DEBUG(1, ("%s: inotify_watch returned %s\n",
1180                                   __func__, strerror(errno)));
1181                 }
1182         }
1183
1184         return 0;
1185 }
1186
1187 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1188 {
1189         TDB_DATA key = dbwrap_record_get_key(rec);
1190         TDB_DATA value = dbwrap_record_get_value(rec);
1191         struct notifyd_instance *instances = NULL;
1192         size_t num_instances = 0;
1193         size_t i;
1194         bool ok;
1195
1196         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1197                                  &num_instances);
1198         if (!ok) {
1199                 DEBUG(1, ("%s: Could not parse notifyd entry for %.*s\n",
1200                           __func__, (int)key.dsize, (char *)key.dptr));
1201                 return 0;
1202         }
1203         for (i=0; i<num_instances; i++) {
1204                 TALLOC_FREE(instances[i].sys_watch);
1205         }
1206         return 0;
1207 }
1208
1209 static int notifyd_peer_destructor(struct notifyd_peer *p)
1210 {
1211         struct notifyd_state *state = p->state;
1212         size_t i;
1213
1214         dbwrap_traverse_read(p->db, notifyd_db_del_syswatches, NULL, NULL);
1215
1216         for (i = 0; i<state->num_peers; i++) {
1217                 if (p == state->peers[i]) {
1218                         state->peers[i] = state->peers[state->num_peers-1];
1219                         state->num_peers -= 1;
1220                         break;
1221                 }
1222         }
1223         return 0;
1224 }
1225
1226 static struct notifyd_peer *notifyd_peer_new(
1227         struct notifyd_state *state, struct server_id pid)
1228 {
1229         struct notifyd_peer *p, **tmp;
1230
1231         tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1232                              state->num_peers+1);
1233         if (tmp == NULL) {
1234                 return NULL;
1235         }
1236         state->peers = tmp;
1237
1238         p = talloc_zero(state->peers, struct notifyd_peer);
1239         if (p == NULL) {
1240                 return NULL;
1241         }
1242         p->state = state;
1243         p->pid = pid;
1244
1245         state->peers[state->num_peers] = p;
1246         state->num_peers += 1;
1247
1248         talloc_set_destructor(p, notifyd_peer_destructor);
1249
1250         return p;
1251 }
1252
1253 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1254                                  const uint8_t *msg, size_t msglen)
1255 {
1256         struct notifyd_state *state = peer->state;
1257         DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1258                            .length = msglen };
1259         struct server_id_buf idbuf;
1260         struct messaging_reclog *log;
1261         enum ndr_err_code ndr_err;
1262         uint32_t i;
1263
1264         if (peer->db == NULL) {
1265                 /*
1266                  * No db yet
1267                  */
1268                 return;
1269         }
1270
1271         log = talloc(peer, struct messaging_reclog);
1272         if (log == NULL) {
1273                 DEBUG(10, ("%s: talloc failed\n", __func__));
1274                 return;
1275         }
1276
1277         ndr_err = ndr_pull_struct_blob_all(
1278                 &blob, log, log,
1279                 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1280         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1281                 DEBUG(10, ("%s: ndr_pull_messaging_reclog failed: %s\n",
1282                            __func__, ndr_errstr(ndr_err)));
1283                 goto fail;
1284         }
1285
1286         DEBUG(10, ("%s: Got %u recs index %ju from %s\n", __func__,
1287                    (unsigned)log->num_recs, (uintmax_t)log->rec_index,
1288                    server_id_str_buf(peer->pid, &idbuf)));
1289
1290         if (log->rec_index != peer->rec_index) {
1291                 DEBUG(3, ("%s: Got rec index %ju from %s, expected %ju\n",
1292                           __func__, (uintmax_t)log->rec_index,
1293                           server_id_str_buf(peer->pid, &idbuf),
1294                           (uintmax_t)peer->rec_index));
1295                 goto fail;
1296         }
1297
1298         for (i=0; i<log->num_recs; i++) {
1299                 struct messaging_rec *r = log->recs[i];
1300                 struct notify_rec_change_msg *chg;
1301                 size_t pathlen;
1302                 bool ok;
1303
1304                 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1305                                               &chg, &pathlen);
1306                 if (!ok) {
1307                         DEBUG(3, ("%s: notifyd_parse_rec_change failed\n",
1308                                   __func__));
1309                         goto fail;
1310                 }
1311
1312                 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1313                                               &chg->instance, peer->db,
1314                                               state->sys_notify_watch,
1315                                               state->sys_notify_ctx,
1316                                               state->msg_ctx);
1317                 if (!ok) {
1318                         DEBUG(3, ("%s: notifyd_apply_rec_change failed\n",
1319                                   __func__));
1320                         goto fail;
1321                 }
1322         }
1323
1324         peer->rec_index += 1;
1325         peer->last_broadcast = time(NULL);
1326
1327         TALLOC_FREE(log);
1328         return;
1329
1330 fail:
1331         DEBUG(10, ("%s: Dropping peer %s\n", __func__,
1332                    server_id_str_buf(peer->pid, &idbuf)));
1333         TALLOC_FREE(peer);
1334 }
1335
1336 /*
1337  * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1338  * messages) broadcasts by other notifyds. Several cases:
1339  *
1340  * We don't know the source. This creates a new peer. Creating a peer
1341  * involves asking the peer for its full database. We assume ordered
1342  * messages, so the new database will arrive before the next broadcast
1343  * will.
1344  *
1345  * We know the source and the log index matches. We will apply the log
1346  * locally to our peer's db as if we had received it from a local
1347  * client.
1348  *
1349  * We know the source but the log index does not match. This means we
1350  * lost a message. We just drop the whole peer and wait for the next
1351  * broadcast, which will then trigger a fresh database pull.
1352  */
1353
1354 static int notifyd_snoop_broadcast(uint32_t src_vnn, uint32_t dst_vnn,
1355                                    uint64_t dst_srvid,
1356                                    const uint8_t *msg, size_t msglen,
1357                                    void *private_data)
1358 {
1359         struct notifyd_state *state = talloc_get_type_abort(
1360                 private_data, struct notifyd_state);
1361         struct server_id my_id = messaging_server_id(state->msg_ctx);
1362         struct notifyd_peer *p;
1363         uint32_t i;
1364         uint32_t msg_type;
1365         struct server_id src, dst;
1366         struct server_id_buf idbuf;
1367         NTSTATUS status;
1368
1369         if (msglen < MESSAGE_HDR_LENGTH) {
1370                 DEBUG(10, ("%s: Got short broadcast\n", __func__));
1371                 return 0;
1372         }
1373         message_hdr_get(&msg_type, &src, &dst, msg);
1374
1375         if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1376                 DEBUG(10, ("%s Got message %u, ignoring\n", __func__,
1377                            (unsigned)msg_type));
1378                 return 0;
1379         }
1380         if (server_id_equal(&src, &my_id)) {
1381                 DEBUG(10, ("%s: Ignoring my own broadcast\n", __func__));
1382                 return 0;
1383         }
1384
1385         DEBUG(10, ("%s: Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1386                    __func__, server_id_str_buf(src, &idbuf)));
1387
1388         for (i=0; i<state->num_peers; i++) {
1389                 if (server_id_equal(&state->peers[i]->pid, &src)) {
1390
1391                         DEBUG(10, ("%s: Applying changes to peer %u\n",
1392                                    __func__, (unsigned)i));
1393
1394                         notifyd_apply_reclog(state->peers[i],
1395                                              msg + MESSAGE_HDR_LENGTH,
1396                                              msglen - MESSAGE_HDR_LENGTH);
1397                         return 0;
1398                 }
1399         }
1400
1401         DEBUG(10, ("%s: Creating new peer for %s\n", __func__,
1402                    server_id_str_buf(src, &idbuf)));
1403
1404         p = notifyd_peer_new(state, src);
1405         if (p == NULL) {
1406                 DEBUG(10, ("%s: notifyd_peer_new failed\n", __func__));
1407                 return 0;
1408         }
1409
1410         status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1411                                     NULL, 0);
1412         if (!NT_STATUS_IS_OK(status)) {
1413                 DEBUG(10, ("%s: messaging_send_buf failed: %s\n",
1414                            __func__, nt_errstr(status)));
1415                 TALLOC_FREE(p);
1416                 return 0;
1417         }
1418
1419         return 0;
1420 }
1421
1422 struct notifyd_parse_db_state {
1423         bool (*fn)(const char *path,
1424                    struct server_id server,
1425                    const struct notify_instance *instance,
1426                    void *private_data);
1427         void *private_data;
1428 };
1429
1430 static bool notifyd_parse_db_parser(TDB_DATA key, TDB_DATA value,
1431                                     void *private_data)
1432 {
1433         struct notifyd_parse_db_state *state = private_data;
1434         char path[key.dsize+1];
1435         struct notifyd_instance *instances = NULL;
1436         size_t num_instances = 0;
1437         size_t i;
1438         bool ok;
1439
1440         memcpy(path, key.dptr, key.dsize);
1441         path[key.dsize] = 0;
1442
1443         ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1444                                  &num_instances);
1445         if (!ok) {
1446                 DEBUG(10, ("%s: Could not parse entry for path %s\n",
1447                            __func__, path));
1448                 return true;
1449         }
1450
1451         for (i=0; i<num_instances; i++) {
1452                 ok = state->fn(path, instances[i].client,
1453                                &instances[i].instance,
1454                                state->private_data);
1455                 if (!ok) {
1456                         return false;
1457                 }
1458         }
1459
1460         return true;
1461 }
1462
1463 int notifyd_parse_db(const uint8_t *buf, size_t buflen,
1464                      uint64_t *log_index,
1465                      bool (*fn)(const char *path,
1466                                 struct server_id server,
1467                                 const struct notify_instance *instance,
1468                                 void *private_data),
1469                      void *private_data)
1470 {
1471         struct notifyd_parse_db_state state = {
1472                 .fn = fn, .private_data = private_data
1473         };
1474         NTSTATUS status;
1475
1476         if (buflen < 8) {
1477                 return EINVAL;
1478         }
1479         *log_index = BVAL(buf, 0);
1480
1481         buf += 8;
1482         buflen -= 8;
1483
1484         status = dbwrap_parse_marshall_buf(
1485                 buf, buflen, notifyd_parse_db_parser, &state);
1486         if (!NT_STATUS_IS_OK(status)) {
1487                 return map_errno_from_nt_status(status);
1488         }
1489
1490         return 0;
1491 }