smbd: Pass timespec_current through the notify_callback
[obnox/samba/samba-obnox.git] / source3 / smbd / notify_internal.c
1 /*
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2006
5    Copyright (C) Volker Lendecke 2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the change notify database. It implements mechanisms for
23   storing current change notify waiters in a tdb, and checking if a
24   given event matches any of the stored notify waiters.
25 */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
43
44 struct notify_list {
45         struct notify_list *next, *prev;
46         const char *path;
47         void (*callback)(void *, struct timespec, const struct notify_event *);
48         void *private_data;
49 };
50
51 struct notify_context {
52         struct messaging_context *msg;
53         struct notify_list *list;
54
55         /*
56          * The notify database is split up into two databases: One
57          * relatively static index db and the real notify db with the
58          * volatile entries.
59          */
60
61         /*
62          * "db_notify" is indexed by pathname. Per record it stores an
63          * array of notify_db_entry structs. These represent the
64          * notify records as requested by the smb client. This
65          * database is always held locally, it is never clustered.
66          */
67         struct db_context *db_notify;
68
69         /*
70          * "db_index" is indexed by pathname. The records are an array
71          * of VNNs which have any interest in notifies for this path
72          * name.
73          *
74          * In the non-clustered case this database is cached in RAM by
75          * means of db_cache_open, which maintains a cache per
76          * process. Cache consistency is maintained by the tdb
77          * sequence number.
78          *
79          * In the clustered case right now we can not use the tdb
80          * sequence number, but by means of read only records we
81          * should be able to avoid a lot of full migrations.
82          *
83          * In both cases, it is important to keep the update
84          * operations to db_index to a minimum. This is achieved by
85          * delayed deletion. When a db_notify is initially created,
86          * the db_index record is also created. When more notifies are
87          * added for a path, then only the db_notify record needs to be
88          * modified, the db_index record is not touched. When the last
89          * entry from the db_notify record is deleted, the db_index
90          * record is not immediately deleted. Instead, the db_notify
91          * record is replaced with a current timestamp. A regular
92          * cleanup process will delete all db_index records that are
93          * older than a minute.
94          */
95         struct db_context *db_index;
96 };
97
98 static void notify_trigger_local(struct notify_context *notify,
99                                  uint32_t action, uint32_t filter,
100                                  const char *path, size_t path_len,
101                                  bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103                             struct server_id *pid,
104                             const char *path, uint32_t action,
105                             void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107                                  const struct notify_db_entry *e,
108                                  bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
110
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112                                  const struct server_id *pid,
113                                  void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
115
116 static int notify_context_destructor(struct notify_context *notify);
117
118 static void notify_handler(struct messaging_context *msg_ctx,
119                            void *private_data, uint32_t msg_type,
120                            struct server_id server_id, DATA_BLOB *data);
121
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123                                    struct messaging_context *msg,
124                                    struct tevent_context *ev)
125 {
126         struct loadparm_context *lp_ctx;
127         struct notify_context *notify;
128
129         notify = talloc(mem_ctx, struct notify_context);
130         if (notify == NULL) {
131                 goto fail;
132         }
133         notify->msg = msg;
134         notify->list = NULL;
135
136         lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
137         notify->db_notify = db_open_tdb(
138                 notify, lp_ctx, lock_path("notify.tdb"),
139                 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
140                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2, DBWRAP_FLAG_NONE);
141                 talloc_unlink(notify, lp_ctx);
142         if (notify->db_notify == NULL) {
143                 goto fail;
144         }
145         notify->db_index = db_open(
146                 notify, lock_path("notify_index.tdb"),
147                 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
148                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3, DBWRAP_FLAG_NONE);
149         if (notify->db_index == NULL) {
150                 goto fail;
151         }
152         if (!lp_clustering()) {
153                 notify->db_index = db_open_cache(notify, notify->db_index);
154                 if (notify->db_index == NULL) {
155                         goto fail;
156                 }
157         }
158
159         if (notify->msg != NULL) {
160                 NTSTATUS status;
161
162                 status = messaging_register(notify->msg, notify,
163                                             MSG_PVFS_NOTIFY, notify_handler);
164                 if (!NT_STATUS_IS_OK(status)) {
165                         DEBUG(1, ("messaging_register returned %s\n",
166                                   nt_errstr(status)));
167                         goto fail;
168                 }
169         }
170
171         talloc_set_destructor(notify, notify_context_destructor);
172
173         return notify;
174 fail:
175         TALLOC_FREE(notify);
176         return NULL;
177 }
178
179 static int notify_context_destructor(struct notify_context *notify)
180 {
181         DEBUG(10, ("notify_context_destructor called\n"));
182
183         if (notify->msg != NULL) {
184                 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
185         }
186
187         while (notify->list != NULL) {
188                 DEBUG(10, ("Removing private_data=%p\n",
189                            notify->list->private_data));
190                 notify_remove(notify, notify->list->private_data);
191         }
192         return 0;
193 }
194
195 NTSTATUS notify_add(struct notify_context *notify,
196                     const char *path, uint32_t filter, uint32_t subdir_filter,
197                     void (*callback)(void *, struct timespec,
198                                      const struct notify_event *),
199                     void *private_data)
200 {
201         struct notify_db_entry e;
202         struct notify_list *listel;
203         struct db_record *notify_rec, *idx_rec;
204         bool add_idx;
205         NTSTATUS status;
206         TDB_DATA key, notify_copy;
207
208         if (notify == NULL) {
209                 return NT_STATUS_NOT_IMPLEMENTED;
210         }
211
212         DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
213                    private_data));
214
215         listel = talloc(notify, struct notify_list);
216         if (listel == NULL) {
217                 return NT_STATUS_NO_MEMORY;
218         }
219         listel->callback = callback;
220         listel->private_data = private_data;
221         listel->path = talloc_strdup(listel, path);
222         if (listel->path == NULL) {
223                 TALLOC_FREE(listel);
224                 return NT_STATUS_NO_MEMORY;
225         }
226         DLIST_ADD(notify->list, listel);
227
228         ZERO_STRUCT(e);
229         e.filter = filter;
230         e.subdir_filter = subdir_filter;
231         e.server = messaging_server_id(notify->msg);
232         e.private_data = private_data;
233
234         key = string_tdb_data(path);
235
236         notify_rec = dbwrap_fetch_locked(notify->db_notify,
237                                          talloc_tos(), key);
238         if (notify_rec == NULL) {
239                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
240                 goto fail;
241         }
242
243         /*
244          * Make a copy of the notify_rec for easy restore in case
245          * updating the index_rec fails;
246          */
247         notify_copy = dbwrap_record_get_value(notify_rec);
248         if (notify_copy.dsize != 0) {
249                 notify_copy.dptr = (uint8_t *)talloc_memdup(
250                         notify_rec, notify_copy.dptr,
251                         notify_copy.dsize);
252                 if (notify_copy.dptr == NULL) {
253                         TALLOC_FREE(notify_rec);
254                         status = NT_STATUS_NO_MEMORY;
255                         goto fail;
256                 }
257         }
258
259         if (DEBUGLEVEL >= 10) {
260                 NDR_PRINT_DEBUG(notify_db_entry, &e);
261         }
262
263         status = notify_add_entry(notify_rec, &e, &add_idx);
264         if (!NT_STATUS_IS_OK(status)) {
265                 goto fail;
266         }
267         if (!add_idx) {
268                 /*
269                  * Someone else has added the idx entry already
270                  */
271                 TALLOC_FREE(notify_rec);
272                 return NT_STATUS_OK;
273         }
274
275         idx_rec = dbwrap_fetch_locked(notify->db_index,
276                                       talloc_tos(), key);
277         if (idx_rec == NULL) {
278                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
279                 goto restore_notify;
280         }
281         status = notify_add_idx(idx_rec, get_my_vnn());
282         if (!NT_STATUS_IS_OK(status)) {
283                 goto restore_notify;
284         }
285
286         TALLOC_FREE(idx_rec);
287         TALLOC_FREE(notify_rec);
288         return NT_STATUS_OK;
289
290 restore_notify:
291         if (notify_copy.dsize != 0) {
292                 dbwrap_record_store(notify_rec, notify_copy, 0);
293         } else {
294                 dbwrap_record_delete(notify_rec);
295         }
296         TALLOC_FREE(notify_rec);
297 fail:
298         DLIST_REMOVE(notify->list, listel);
299         TALLOC_FREE(listel);
300         return status;
301 }
302
303 static NTSTATUS notify_add_entry(struct db_record *rec,
304                                  const struct notify_db_entry *e,
305                                  bool *p_add_idx)
306 {
307         TDB_DATA value = dbwrap_record_get_value(rec);
308         struct notify_db_entry *entries;
309         size_t num_entries;
310         bool add_idx = true;
311         NTSTATUS status;
312
313         if (value.dsize == sizeof(time_t)) {
314                 DEBUG(10, ("Re-using deleted entry\n"));
315                 value.dsize = 0;
316                 add_idx = false;
317         }
318
319         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
320                 DEBUG(1, ("Invalid value.dsize = %u\n",
321                           (unsigned)value.dsize));
322                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
323         }
324         num_entries = value.dsize / sizeof(struct notify_db_entry);
325
326         if (num_entries != 0) {
327                 add_idx = false;
328         }
329
330         entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
331         if (entries == NULL) {
332                 return NT_STATUS_NO_MEMORY;
333         }
334         memcpy(entries, value.dptr, value.dsize);
335
336         entries[num_entries] = *e;
337         value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
338         status = dbwrap_record_store(rec, value, 0);
339         TALLOC_FREE(entries);
340         if (!NT_STATUS_IS_OK(status)) {
341                 return status;
342         }
343         *p_add_idx = add_idx;
344         return NT_STATUS_OK;
345 }
346
347 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
348 {
349         TDB_DATA value = dbwrap_record_get_value(rec);
350         uint32_t *vnns;
351         size_t i, num_vnns;
352         NTSTATUS status;
353
354         if ((value.dsize % sizeof(uint32_t)) != 0) {
355                 DEBUG(1, ("Invalid value.dsize = %u\n",
356                           (unsigned)value.dsize));
357                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
358         }
359         num_vnns = value.dsize / sizeof(uint32_t);
360         vnns = (uint32_t *)value.dptr;
361
362         for (i=0; i<num_vnns; i++) {
363                 if (vnns[i] == vnn) {
364                         return NT_STATUS_OK;
365                 }
366                 if (vnns[i] > vnn) {
367                         break;
368                 }
369         }
370
371         value.dptr = (uint8_t *)talloc_realloc(
372                 rec, value.dptr, uint32_t, num_vnns + 1);
373         if (value.dptr == NULL) {
374                 return NT_STATUS_NO_MEMORY;
375         }
376         value.dsize = talloc_get_size(value.dptr);
377
378         vnns = (uint32_t *)value.dptr;
379
380         memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
381         vnns[i] = vnn;
382
383         status = dbwrap_record_store(rec, value, 0);
384         if (!NT_STATUS_IS_OK(status)) {
385                 return status;
386         }
387         return NT_STATUS_OK;
388 }
389
390 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
391 {
392         struct server_id pid;
393         struct notify_list *listel;
394         struct db_record *notify_rec;
395         NTSTATUS status;
396
397         if ((notify == NULL) || (notify->msg == NULL)) {
398                 return NT_STATUS_NOT_IMPLEMENTED;
399         }
400
401         DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
402
403         pid = messaging_server_id(notify->msg);
404
405         for (listel=notify->list;listel;listel=listel->next) {
406                 if (listel->private_data == private_data) {
407                         DLIST_REMOVE(notify->list, listel);
408                         break;
409                 }
410         }
411         if (listel == NULL) {
412                 DEBUG(10, ("%p not found\n", private_data));
413                 return NT_STATUS_NOT_FOUND;
414         }
415         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
416                                          string_tdb_data(listel->path));
417         TALLOC_FREE(listel);
418         if (notify_rec == NULL) {
419                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
420         }
421         status = notify_del_entry(notify_rec, &pid, private_data);
422         DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
423         TALLOC_FREE(notify_rec);
424         return status;
425 }
426
427 static NTSTATUS notify_del_entry(struct db_record *rec,
428                                  const struct server_id *pid,
429                                  void *private_data)
430 {
431         TDB_DATA value = dbwrap_record_get_value(rec);
432         struct notify_db_entry *entries;
433         size_t i, num_entries;
434         time_t now;
435
436         DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
437                    private_data));
438
439         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
440                 DEBUG(1, ("Invalid value.dsize = %u\n",
441                           (unsigned)value.dsize));
442                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
443         }
444         num_entries = value.dsize / sizeof(struct notify_db_entry);
445         entries = (struct notify_db_entry *)value.dptr;
446
447         for (i=0; i<num_entries; i++) {
448                 struct notify_db_entry *e = &entries[i];
449
450                 if (DEBUGLEVEL >= 10) {
451                         NDR_PRINT_DEBUG(notify_db_entry, e);
452                 }
453
454                 if (e->private_data != private_data) {
455                         continue;
456                 }
457                 if (serverid_equal(&e->server, pid)) {
458                         break;
459                 }
460         }
461         if (i == num_entries) {
462                 return NT_STATUS_NOT_FOUND;
463         }
464         entries[i] = entries[num_entries-1];
465         value.dsize -= sizeof(struct notify_db_entry);
466
467         if (value.dsize == 0) {
468                 now = time(NULL);
469                 value.dptr = (uint8_t *)&now;
470                 value.dsize = sizeof(now);
471         }
472         return dbwrap_record_store(rec, value, 0);
473 }
474
475 struct notify_trigger_index_state {
476         TALLOC_CTX *mem_ctx;
477         uint32_t *vnns;
478         uint32_t my_vnn;
479         bool found_my_vnn;
480 };
481
482 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
483                                         void *private_data)
484 {
485         struct notify_trigger_index_state *state =
486                 (struct notify_trigger_index_state *)private_data;
487         uint32_t *new_vnns;
488         size_t i, num_vnns, num_new_vnns, num_remote_vnns;
489
490         if ((data.dsize % sizeof(uint32_t)) != 0) {
491                 DEBUG(1, ("Invalid record size in notify index db: %u\n",
492                           (unsigned)data.dsize));
493                 return;
494         }
495         new_vnns = (uint32_t *)data.dptr;
496         num_new_vnns = data.dsize / sizeof(uint32_t);
497         num_remote_vnns = num_new_vnns;
498
499         for (i=0; i<num_new_vnns; i++) {
500                 if (new_vnns[i] == state->my_vnn) {
501                         state->found_my_vnn = true;
502                         num_remote_vnns -= 1;
503                 }
504         }
505         if (num_remote_vnns == 0) {
506                 return;
507         }
508
509         num_vnns = talloc_array_length(state->vnns);
510         state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
511                                      num_vnns + num_remote_vnns);
512         if (state->vnns == NULL) {
513                 DEBUG(1, ("talloc_realloc failed\n"));
514                 return;
515         }
516
517         for (i=0; i<num_new_vnns; i++) {
518                 if (new_vnns[i] != state->my_vnn) {
519                         state->vnns[num_vnns] = new_vnns[i];
520                         num_vnns += 1;
521                 }
522         }
523 }
524
525 static int vnn_cmp(const void *p1, const void *p2)
526 {
527         const uint32_t *vnn1 = (const uint32_t *)p1;
528         const uint32_t *vnn2 = (const uint32_t *)p2;
529
530         if (*vnn1 < *vnn2) {
531                 return -1;
532         }
533         if (*vnn1 == *vnn2) {
534                 return 0;
535         }
536         return 1;
537 }
538
539 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
540                                     uint32_t filter, const char *path,
541                                     uint8_t **pblob, size_t *pblob_len)
542 {
543         struct notify_remote_event ev;
544         DATA_BLOB data;
545         enum ndr_err_code ndr_err;
546
547         ev.action = action;
548         ev.filter = filter;
549         ev.path = path;
550
551         if (DEBUGLEVEL >= 10) {
552                 NDR_PRINT_DEBUG(notify_remote_event, &ev);
553         }
554
555         ndr_err = ndr_push_struct_blob(
556                 &data, mem_ctx, &ev,
557                 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
558         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
559                 return false;
560         }
561         *pblob = data.data;
562         *pblob_len = data.length;
563         return true;
564 }
565
566 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
567                                     const uint8_t *blob, size_t blob_len,
568                                     uint32_t *paction, uint32_t *pfilter,
569                                     char **path)
570 {
571         struct notify_remote_event *ev;
572         enum ndr_err_code ndr_err;
573         DATA_BLOB data;
574         char *p;
575
576         data.data = discard_const_p(uint8_t, blob);
577         data.length = blob_len;
578
579         ev = talloc(mem_ctx, struct notify_remote_event);
580         if (ev == NULL) {
581                 return false;
582         }
583
584         ndr_err = ndr_pull_struct_blob(
585                 &data, ev, ev,
586                 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
587         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
588                 TALLOC_FREE(ev);
589                 return false;
590         }
591         if (DEBUGLEVEL >= 10) {
592                 NDR_PRINT_DEBUG(notify_remote_event, ev);
593         }
594         *paction = ev->action;
595         *pfilter = ev->filter;
596         p = discard_const_p(char, ev->path);
597         *path = talloc_move(mem_ctx, &p);
598
599         TALLOC_FREE(ev);
600         return true;
601 }
602
603 void notify_trigger(struct notify_context *notify,
604                     uint32_t action, uint32_t filter, const char *path)
605 {
606         struct ctdbd_connection *ctdbd_conn;
607         struct notify_trigger_index_state idx_state;
608         const char *p, *next_p;
609         size_t i, num_vnns;
610         uint32_t last_vnn;
611         uint8_t *remote_blob = NULL;
612         size_t remote_blob_len = 0;
613
614         DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
615                    "path=%s\n", (unsigned)action, (unsigned)filter, path));
616
617         /* see if change notify is enabled at all */
618         if (notify == NULL) {
619                 return;
620         }
621
622         if (path[0] != '/') {
623                 /*
624                  * The rest of this routine assumes an absolute path.
625                  */
626                 return;
627         }
628
629         idx_state.mem_ctx = talloc_tos();
630         idx_state.vnns = NULL;
631         idx_state.found_my_vnn = false;
632         idx_state.my_vnn = get_my_vnn();
633
634         for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
635                 ptrdiff_t path_len = p - path;
636                 bool recursive;
637
638                 next_p = strchr(p+1, '/');
639                 recursive = (next_p != NULL);
640
641                 dbwrap_parse_record(
642                         notify->db_index,
643                         make_tdb_data(discard_const_p(uint8_t, path), path_len),
644                         notify_trigger_index_parser, &idx_state);
645
646                 if (idx_state.found_my_vnn) {
647                         notify_trigger_local(notify, action, filter,
648                                              path, path_len, recursive);
649                         idx_state.found_my_vnn = false;
650                 }
651         }
652
653         if (idx_state.vnns == NULL) {
654                 goto done;
655         }
656
657         ctdbd_conn = messaging_ctdbd_connection();
658         if (ctdbd_conn == NULL) {
659                 goto done;
660         }
661
662         num_vnns = talloc_array_length(idx_state.vnns);
663         qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
664
665         last_vnn = 0xffffffff;
666
667         if (!notify_push_remote_blob(talloc_tos(), action, filter, path,
668                                      &remote_blob, &remote_blob_len)) {
669                 DEBUG(1, ("notify_push_remote_blob failed\n"));
670                 goto done;
671         }
672
673         for (i=0; i<num_vnns; i++) {
674                 uint32_t vnn = idx_state.vnns[i];
675                 NTSTATUS status;
676
677                 if (vnn == last_vnn) {
678                         continue;
679                 }
680
681                 status = ctdbd_messaging_send_blob(
682                         ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
683                         remote_blob, remote_blob_len);
684                 if (!NT_STATUS_IS_OK(status)) {
685                         DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
686                                    "returned %s, ignoring\n", (int)vnn,
687                                    nt_errstr(status)));
688                 }
689
690                 last_vnn = vnn;
691         }
692
693 done:
694         TALLOC_FREE(remote_blob);
695         TALLOC_FREE(idx_state.vnns);
696 }
697
698 static void notify_trigger_local(struct notify_context *notify,
699                                  uint32_t action, uint32_t filter,
700                                  const char *path, size_t path_len,
701                                  bool recursive)
702 {
703         TDB_DATA data;
704         struct notify_db_entry *entries;
705         size_t i, num_entries;
706         NTSTATUS status;
707
708         DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
709                    "filter=%d\n", (int)path_len, path, (int)path_len,
710                    (int)filter));
711
712         status = dbwrap_fetch(
713                 notify->db_notify, talloc_tos(),
714                 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
715         if (!NT_STATUS_IS_OK(status)) {
716                 DEBUG(10, ("dbwrap_fetch returned %s\n",
717                            nt_errstr(status)));
718                 return;
719         }
720         if (data.dsize == sizeof(time_t)) {
721                 DEBUG(10, ("Got deleted record\n"));
722                 goto done;
723         }
724         if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
725                 DEBUG(1, ("Invalid data.dsize = %u\n",
726                           (unsigned)data.dsize));
727                 goto done;
728         }
729
730         entries = (struct notify_db_entry *)data.dptr;
731         num_entries = data.dsize / sizeof(struct notify_db_entry);
732
733         DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
734                    recursive ? "true" : "false", (int)path_len,
735                    path[path_len]));
736
737         for (i=0; i<num_entries; i++) {
738                 struct notify_db_entry *e = &entries[i];
739                 uint32_t e_filter;
740
741                 if (DEBUGLEVEL >= 10) {
742                         NDR_PRINT_DEBUG(notify_db_entry, e);
743                 }
744
745                 e_filter = recursive ? e->subdir_filter : e->filter;
746
747                 if ((filter & e_filter) == 0) {
748                         continue;
749                 }
750
751                 if (!procid_is_local(&e->server)) {
752                         DEBUG(1, ("internal error: Non-local pid %s in "
753                                   "notify.tdb\n",
754                                   procid_str_static(&e->server)));
755                         continue;
756                 }
757
758                 status = notify_send(notify, &e->server, path + path_len + 1,
759                                      action, e->private_data);
760                 if (!NT_STATUS_IS_OK(status)) {
761                         DEBUG(10, ("notify_send returned %s\n",
762                                    nt_errstr(status)));
763                 }
764         }
765
766 done:
767         TALLOC_FREE(data.dptr);
768 }
769
770 static NTSTATUS notify_send(struct notify_context *notify,
771                             struct server_id *pid,
772                             const char *path, uint32_t action,
773                             void *private_data)
774 {
775         struct notify_event ev;
776         DATA_BLOB data;
777         NTSTATUS status;
778         enum ndr_err_code ndr_err;
779
780         ev.action = action;
781         ev.path = path;
782         ev.private_data = private_data;
783
784         ndr_err = ndr_push_struct_blob(
785                 &data, talloc_tos(), &ev,
786                 (ndr_push_flags_fn_t)ndr_push_notify_event);
787         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
788                 return ndr_map_error2ntstatus(ndr_err);
789         }
790         status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
791                                 &data);
792         TALLOC_FREE(data.data);
793         return status;
794 }
795
796 static void notify_handler(struct messaging_context *msg_ctx,
797                            void *private_data, uint32_t msg_type,
798                            struct server_id server_id, DATA_BLOB *data)
799 {
800         struct notify_context *notify = talloc_get_type_abort(
801                 private_data, struct notify_context);
802         enum ndr_err_code ndr_err;
803         struct notify_event *n;
804         struct notify_list *listel;
805
806         n = talloc(talloc_tos(), struct notify_event);
807         if (n == NULL) {
808                 DEBUG(1, ("talloc failed\n"));
809                 return;
810         }
811
812         ndr_err = ndr_pull_struct_blob(
813                 data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
814         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
815                 TALLOC_FREE(n);
816                 return;
817         }
818         if (DEBUGLEVEL >= 10) {
819                 NDR_PRINT_DEBUG(notify_event, n);
820         }
821
822         for (listel=notify->list;listel;listel=listel->next) {
823                 if (listel->private_data == n->private_data) {
824                         listel->callback(listel->private_data,
825                                          timespec_current(), n);
826                         break;
827                 }
828         }
829         TALLOC_FREE(n);
830 }
831
832 struct notify_walk_idx_state {
833         void (*fn)(const char *path,
834                    uint32_t *vnns, size_t num_vnns,
835                    void *private_data);
836         void *private_data;
837 };
838
839 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
840 {
841         struct notify_walk_idx_state *state =
842                 (struct notify_walk_idx_state *)private_data;
843         TDB_DATA key, value;
844         char *path;
845
846         key = dbwrap_record_get_key(rec);
847         value = dbwrap_record_get_value(rec);
848
849         if ((value.dsize % sizeof(uint32_t)) != 0) {
850                 DEBUG(1, ("invalid value size in notify index db: %u\n",
851                           (unsigned)(value.dsize)));
852                 return 0;
853         }
854
855         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
856         if (path == NULL) {
857                 DEBUG(1, ("talloc_strndup failed\n"));
858                 return 0;
859         }
860         state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
861                   state->private_data);
862         TALLOC_FREE(path);
863         return 0;
864 }
865
866 void notify_walk_idx(struct notify_context *notify,
867                      void (*fn)(const char *path,
868                                 uint32_t *vnns, size_t num_vnns,
869                                 void *private_data),
870                      void *private_data)
871 {
872         struct notify_walk_idx_state state;
873         state.fn = fn;
874         state.private_data = private_data;
875         dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
876                              NULL);
877 }
878
879 struct notify_walk_state {
880         void (*fn)(const char *path,
881                    struct notify_db_entry *entries, size_t num_entries,
882                    time_t deleted_time, void *private_data);
883         void *private_data;
884 };
885
886 static int notify_walk_fn(struct db_record *rec, void *private_data)
887 {
888         struct notify_walk_state *state =
889                 (struct notify_walk_state *)private_data;
890         TDB_DATA key, value;
891         struct notify_db_entry *entries;
892         size_t num_entries;
893         time_t deleted_time;
894         char *path;
895
896         key = dbwrap_record_get_key(rec);
897         value = dbwrap_record_get_value(rec);
898
899         if (value.dsize == sizeof(deleted_time)) {
900                 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
901                 entries = NULL;
902                 num_entries = 0;
903         } else {
904                 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
905                         DEBUG(1, ("invalid value size in notify db: %u\n",
906                                   (unsigned)(value.dsize)));
907                         return 0;
908                 }
909                 entries = (struct notify_db_entry *)value.dptr;
910                 num_entries = value.dsize / sizeof(struct notify_db_entry);
911                 deleted_time = 0;
912         }
913
914         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
915         if (path == NULL) {
916                 DEBUG(1, ("talloc_strndup failed\n"));
917                 return 0;
918         }
919         state->fn(path, entries, num_entries, deleted_time,
920                   state->private_data);
921         TALLOC_FREE(path);
922         return 0;
923 }
924
925 void notify_walk(struct notify_context *notify,
926                  void (*fn)(const char *path,
927                             struct notify_db_entry *entries,
928                             size_t num_entries,
929                             time_t deleted_time, void *private_data),
930                  void *private_data)
931 {
932         struct notify_walk_state state;
933         state.fn = fn;
934         state.private_data = private_data;
935         dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
936                              NULL);
937 }
938
939 struct notify_cleanup_state {
940         TALLOC_CTX *mem_ctx;
941         time_t delete_before;
942         ssize_t array_size;
943         uint32_t num_paths;
944         char **paths;
945 };
946
947 static void notify_cleanup_collect(
948         const char *path, struct notify_db_entry *entries, size_t num_entries,
949         time_t deleted_time, void *private_data)
950 {
951         struct notify_cleanup_state *state =
952                 (struct notify_cleanup_state *)private_data;
953         char *p;
954
955         if (num_entries != 0) {
956                 return;
957         }
958         if (deleted_time >= state->delete_before) {
959                 return;
960         }
961
962         p = talloc_strdup(state->mem_ctx, path);
963         if (p == NULL) {
964                 DEBUG(1, ("talloc_strdup failed\n"));
965                 return;
966         }
967         add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
968                            &state->paths, &state->num_paths,
969                            &state->array_size);
970         if (state->array_size == -1) {
971                 TALLOC_FREE(p);
972         }
973 }
974
975 static bool notify_cleanup_path(struct notify_context *notify,
976                               const char *path, time_t delete_before);
977
978 void notify_cleanup(struct notify_context *notify)
979 {
980         struct notify_cleanup_state state;
981         uint32_t failure_pool;
982
983         ZERO_STRUCT(state);
984         state.mem_ctx = talloc_stackframe();
985
986         state.delete_before = time(NULL)
987                 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
988
989         notify_walk(notify, notify_cleanup_collect, &state);
990
991         failure_pool = state.num_paths;
992
993         while (state.num_paths != 0) {
994                 size_t idx;
995
996                 /*
997                  * This loop is designed to be as kind as possible to
998                  * ctdb. ctdb does not like it if many smbds hammer on a
999                  * single record. If on many nodes the cleanup process starts
1000                  * running, it can happen that all of them need to clean up
1001                  * records in the same order. This would generate a ctdb
1002                  * migrate storm on these records. Randomizing the load across
1003                  * multiple records reduces the load on the individual record.
1004                  */
1005
1006                 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
1007                 idx = idx % state.num_paths;
1008
1009                 if (!notify_cleanup_path(notify, state.paths[idx],
1010                                          state.delete_before)) {
1011                         /*
1012                          * notify_cleanup_path failed, the most likely reason
1013                          * is that dbwrap_try_fetch_locked failed due to
1014                          * contention. We allow one failed attempt per deleted
1015                          * path on average before we give up.
1016                          */
1017                         failure_pool -= 1;
1018                         if (failure_pool == 0) {
1019                                 /*
1020                                  * Too many failures. We will come back here,
1021                                  * maybe next time there is less contention.
1022                                  */
1023                                 break;
1024                         }
1025                 }
1026
1027                 TALLOC_FREE(state.paths[idx]);
1028                 state.paths[idx] = state.paths[state.num_paths-1];
1029                 state.num_paths -= 1;
1030         }
1031         TALLOC_FREE(state.mem_ctx);
1032 }
1033
1034 static bool notify_cleanup_path(struct notify_context *notify,
1035                                 const char *path, time_t delete_before)
1036 {
1037         struct db_record *notify_rec = NULL;
1038         struct db_record *idx_rec = NULL;
1039         TDB_DATA key = string_tdb_data(path);
1040         TDB_DATA value;
1041         time_t deleted;
1042         NTSTATUS status;
1043
1044         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1045         if (notify_rec == NULL) {
1046                 DEBUG(10, ("Could not fetch notify_rec\n"));
1047                 return false;
1048         }
1049         value = dbwrap_record_get_value(notify_rec);
1050
1051         if (value.dsize != sizeof(deleted)) {
1052                 DEBUG(10, ("record %s has been re-used\n", path));
1053                 goto done;
1054         }
1055         memcpy(&deleted, value.dptr, sizeof(deleted));
1056
1057         if (deleted >= delete_before) {
1058                 DEBUG(10, ("record %s too young\n", path));
1059                 goto done;
1060         }
1061
1062         /*
1063          * Be kind to ctdb and only try one dmaster migration at most.
1064          */
1065         idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1066         if (idx_rec == NULL) {
1067                 DEBUG(10, ("Could not fetch idx_rec\n"));
1068                 goto done;
1069         }
1070
1071         status = dbwrap_record_delete(notify_rec);
1072         if (!NT_STATUS_IS_OK(status)) {
1073                 DEBUG(10, ("Could not delete notify_rec: %s\n",
1074                            nt_errstr(status)));
1075         }
1076
1077         status = notify_del_idx(idx_rec, get_my_vnn());
1078         if (!NT_STATUS_IS_OK(status)) {
1079                 DEBUG(10, ("Could not delete idx_rec: %s\n",
1080                            nt_errstr(status)));
1081         }
1082
1083 done:
1084         TALLOC_FREE(idx_rec);
1085         TALLOC_FREE(notify_rec);
1086         return true;
1087 }
1088
1089 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1090 {
1091         TDB_DATA value = dbwrap_record_get_value(rec);
1092         uint32_t *vnns;
1093         size_t i, num_vnns;
1094
1095         if ((value.dsize % sizeof(uint32_t)) != 0) {
1096                 DEBUG(1, ("Invalid value.dsize = %u\n",
1097                           (unsigned)value.dsize));
1098                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1099         }
1100         num_vnns = value.dsize / sizeof(uint32_t);
1101         vnns = (uint32_t *)value.dptr;
1102
1103         for (i=0; i<num_vnns; i++) {
1104                 if (vnns[i] == vnn) {
1105                         break;
1106                 }
1107         }
1108
1109         if (i == num_vnns) {
1110                 /*
1111                  * Not found. Should not happen, but okay...
1112                  */
1113                 return NT_STATUS_OK;
1114         }
1115
1116         memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1117         value.dsize -= sizeof(uint32_t);
1118
1119         if (value.dsize == 0) {
1120                 return dbwrap_record_delete(rec);
1121         }
1122         return dbwrap_record_store(rec, value, 0);
1123 }
1124
1125 struct notify_cluster_proxy_state {
1126         struct tevent_context *ev;
1127         struct notify_context *notify;
1128         struct ctdb_msg_channel *chan;
1129 };
1130
1131 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
1132 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
1133 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1134                                          uint32_t action, uint32_t filter,
1135                                          char *path);
1136
1137 struct tevent_req *notify_cluster_proxy_send(
1138         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1139         struct notify_context *notify)
1140 {
1141         struct tevent_req *req, *subreq;
1142         struct notify_cluster_proxy_state *state;
1143
1144         req = tevent_req_create(mem_ctx, &state,
1145                                 struct notify_cluster_proxy_state);
1146         if (req == NULL) {
1147                 return NULL;
1148         }
1149         state->ev = ev;
1150         state->notify = notify;
1151
1152         subreq = ctdb_msg_channel_init_send(
1153                 state, state->ev,  lp_ctdbd_socket(),
1154                 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1155         if (tevent_req_nomem(subreq, req)) {
1156                 return tevent_req_post(req, ev);
1157         }
1158         tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1159         return req;
1160 }
1161
1162 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1163 {
1164         struct tevent_req *req = tevent_req_callback_data(
1165                 subreq, struct tevent_req);
1166         struct notify_cluster_proxy_state *state = tevent_req_data(
1167                 req, struct notify_cluster_proxy_state);
1168         int ret;
1169
1170         ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1171         TALLOC_FREE(subreq);
1172         if (ret != 0) {
1173                 tevent_req_error(req, ret);
1174                 return;
1175         }
1176         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1177         if (tevent_req_nomem(subreq, req)) {
1178                 return;
1179         }
1180         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1181 }
1182
1183 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1184 {
1185         struct tevent_req *req = tevent_req_callback_data(
1186                 subreq, struct tevent_req);
1187         struct notify_cluster_proxy_state *state = tevent_req_data(
1188                 req, struct notify_cluster_proxy_state);
1189         uint8_t *msg;
1190         size_t msg_len;
1191         uint32_t action, filter;
1192         char *path;
1193         int ret;
1194         bool res;
1195
1196         ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1197         TALLOC_FREE(subreq);
1198         if (ret != 0) {
1199                 tevent_req_error(req, ret);
1200                 return;
1201         }
1202
1203         res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1204                                       &action, &filter, &path);
1205         TALLOC_FREE(msg);
1206         if (!res) {
1207                 tevent_req_error(req, EIO);
1208                 return;
1209         }
1210         notify_cluster_proxy_trigger(state->notify, action, filter, path);
1211         TALLOC_FREE(path);
1212
1213         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1214         if (tevent_req_nomem(subreq, req)) {
1215                 return;
1216         }
1217         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1218 }
1219
1220 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1221                                          uint32_t action, uint32_t filter,
1222                                          char *path)
1223 {
1224         const char *p, *next_p;
1225
1226         for (p = path; p != NULL; p = next_p) {
1227                 ptrdiff_t path_len = p - path;
1228                 bool recursive;
1229
1230                 next_p = strchr(p+1, '/');
1231                 recursive = (next_p != NULL);
1232
1233                 notify_trigger_local(notify, action, filter,
1234                                      path, path_len, recursive);
1235         }
1236 }
1237
1238 int notify_cluster_proxy_recv(struct tevent_req *req)
1239 {
1240         int err;
1241
1242         if (tevent_req_is_unix_error(req, &err)) {
1243                 return err;
1244         }
1245         return 0;
1246 }