s3:smbd: s/struct event_context/struct tevent_context
[samba.git] / source3 / smbd / notify_internal.c
1 /*
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2006
5    Copyright (C) Volker Lendecke 2012
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the change notify database. It implements mechanisms for
23   storing current change notify waiters in a tdb, and checking if a
24   given event matches any of the stored notify waiiters.
25 */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "librpc/gen_ndr/ndr_notify.h"
30 #include "dbwrap/dbwrap.h"
31 #include "dbwrap/dbwrap_open.h"
32 #include "dbwrap/dbwrap_tdb.h"
33 #include "smbd/smbd.h"
34 #include "messages.h"
35 #include "lib/tdb_wrap/tdb_wrap.h"
36 #include "util_tdb.h"
37 #include "lib/param/param.h"
38 #include "lib/dbwrap/dbwrap_cache.h"
39 #include "ctdb_srvids.h"
40 #include "ctdbd_conn.h"
41 #include "ctdb_conn.h"
42 #include "lib/util/tevent_unix.h"
43
44 struct notify_list {
45         struct notify_list *next, *prev;
46         const char *path;
47         void (*callback)(void *, const struct notify_event *);
48         void *private_data;
49 };
50
51 struct notify_context {
52         struct messaging_context *msg;
53         struct notify_list *list;
54
55         /*
56          * The notify database is split up into two databases: One
57          * relatively static index db and the real notify db with the
58          * volatile entries.
59          */
60
61         /*
62          * "db_notify" is indexed by pathname. Per record it stores an
63          * array of notify_db_entry structs. These represent the
64          * notify records as requested by the smb client. This
65          * database is always held locally, it is never clustered.
66          */
67         struct db_context *db_notify;
68
69         /*
70          * "db_index" is indexed by pathname. The records are an array
71          * of VNNs which have any interest in notifies for this path
72          * name.
73          *
74          * In the non-clustered case this database is cached in RAM by
75          * means of db_cache_open, which maintains a cache per
76          * process. Cache consistency is maintained by the tdb
77          * sequence number.
78          *
79          * In the clustered case right now we can not use the tdb
80          * sequence number, but by means of read only records we
81          * should be able to avoid a lot of full migrations.
82          *
83          * In both cases, it is important to keep the update
84          * operations to db_index to a minimum. This is achieved by
85          * delayed deletion. When a db_notify is initially created,
86          * the db_index record is also created. When more notifies are
87          * add for a path, then only the db_notify record needs to be
88          * modified, the db_index record is not touched. When the last
89          * entry from the db_notify record is deleted, the db_index
90          * record is not immediately deleted. Instead, the db_notify
91          * record is replaced with a current timestamp. A regular
92          * cleanup process will delete all db_index records that are
93          * older than a minute.
94          */
95         struct db_context *db_index;
96 };
97
98 static void notify_trigger_local(struct notify_context *notify,
99                                  uint32_t action, uint32_t filter,
100                                  const char *path, size_t path_len,
101                                  bool recursive);
102 static NTSTATUS notify_send(struct notify_context *notify,
103                             struct server_id *pid,
104                             const char *path, uint32_t action,
105                             void *private_data);
106 static NTSTATUS notify_add_entry(struct db_record *rec,
107                                  const struct notify_db_entry *e,
108                                  bool *p_add_idx);
109 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
110
111 static NTSTATUS notify_del_entry(struct db_record *rec,
112                                  const struct server_id *pid,
113                                  void *private_data);
114 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
115
116 static int notify_context_destructor(struct notify_context *notify);
117
118 static void notify_handler(struct messaging_context *msg_ctx,
119                            void *private_data, uint32_t msg_type,
120                            struct server_id server_id, DATA_BLOB *data);
121
122 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
123                                    struct messaging_context *msg,
124                                    struct tevent_context *ev)
125 {
126         struct loadparm_context *lp_ctx;
127         struct notify_context *notify;
128
129         notify = talloc(mem_ctx, struct notify_context);
130         if (notify == NULL) {
131                 goto fail;
132         }
133         notify->msg = msg;
134         notify->list = NULL;
135
136         lp_ctx = loadparm_init_s3(notify, loadparm_s3_helpers());
137         notify->db_notify = db_open_tdb(
138                 notify, lp_ctx, lock_path("notify.tdb"),
139                 0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
140                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2);
141                 talloc_unlink(notify, lp_ctx);
142         if (notify->db_notify == NULL) {
143                 goto fail;
144         }
145         notify->db_index = db_open(
146                 notify, lock_path("notify_index.tdb"),
147                 0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
148                 O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3);
149         if (notify->db_index == NULL) {
150                 goto fail;
151         }
152         if (!lp_clustering()) {
153                 notify->db_index = db_open_cache(notify, notify->db_index);
154                 if (notify->db_index == NULL) {
155                         goto fail;
156                 }
157         }
158
159         if (notify->msg != NULL) {
160                 NTSTATUS status;
161
162                 status = messaging_register(notify->msg, notify,
163                                             MSG_PVFS_NOTIFY, notify_handler);
164                 if (!NT_STATUS_IS_OK(status)) {
165                         DEBUG(1, ("messaging_register returned %s\n",
166                                   nt_errstr(status)));
167                         goto fail;
168                 }
169         }
170
171         talloc_set_destructor(notify, notify_context_destructor);
172
173         return notify;
174 fail:
175         TALLOC_FREE(notify);
176         return NULL;
177 }
178
179 static int notify_context_destructor(struct notify_context *notify)
180 {
181         DEBUG(10, ("notify_context_destructor called\n"));
182
183         if (notify->msg != NULL) {
184                 messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
185         }
186
187         while (notify->list != NULL) {
188                 DEBUG(10, ("Removing private_data=%p\n",
189                            notify->list->private_data));
190                 notify_remove(notify, notify->list->private_data);
191         }
192         return 0;
193 }
194
195 NTSTATUS notify_add(struct notify_context *notify,
196                     const char *path, uint32_t filter, uint32_t subdir_filter,
197                     void (*callback)(void *, const struct notify_event *),
198                     void *private_data)
199 {
200         struct notify_db_entry e;
201         struct notify_list *listel;
202         struct db_record *notify_rec, *idx_rec;
203         bool add_idx;
204         NTSTATUS status;
205         TDB_DATA key, notify_copy;
206
207         if (notify == NULL) {
208                 return NT_STATUS_NOT_IMPLEMENTED;
209         }
210
211         DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
212                    private_data));
213
214         listel = talloc(notify, struct notify_list);
215         if (listel == NULL) {
216                 return NT_STATUS_NO_MEMORY;
217         }
218         listel->callback = callback;
219         listel->private_data = private_data;
220         listel->path = talloc_strdup(listel, path);
221         if (listel->path == NULL) {
222                 TALLOC_FREE(listel);
223                 return NT_STATUS_NO_MEMORY;
224         }
225         DLIST_ADD(notify->list, listel);
226
227         ZERO_STRUCT(e);
228         e.filter = filter;
229         e.subdir_filter = subdir_filter;
230         e.server = messaging_server_id(notify->msg);
231         e.private_data = private_data;
232
233         key = string_tdb_data(path);
234
235         notify_rec = dbwrap_fetch_locked(notify->db_notify,
236                                          talloc_tos(), key);
237         if (notify_rec == NULL) {
238                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
239                 goto fail;
240         }
241
242         /*
243          * Make a copy of the notify_rec for easy restore in case
244          * updating the index_rec fails;
245          */
246         notify_copy = dbwrap_record_get_value(notify_rec);
247         if (notify_copy.dsize != 0) {
248                 notify_copy.dptr = (uint8_t *)talloc_memdup(
249                         notify_rec, notify_copy.dptr,
250                         notify_copy.dsize);
251                 if (notify_copy.dptr == NULL) {
252                         TALLOC_FREE(notify_rec);
253                         status = NT_STATUS_NO_MEMORY;
254                         goto fail;
255                 }
256         }
257
258         if (DEBUGLEVEL >= 10) {
259                 NDR_PRINT_DEBUG(notify_db_entry, &e);
260         }
261
262         status = notify_add_entry(notify_rec, &e, &add_idx);
263         if (!NT_STATUS_IS_OK(status)) {
264                 goto fail;
265         }
266         if (!add_idx) {
267                 /*
268                  * Someone else has added the idx entry already
269                  */
270                 TALLOC_FREE(notify_rec);
271                 return NT_STATUS_OK;
272         }
273
274         idx_rec = dbwrap_fetch_locked(notify->db_index,
275                                       talloc_tos(), key);
276         if (idx_rec == NULL) {
277                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
278                 goto restore_notify;
279         }
280         status = notify_add_idx(idx_rec, get_my_vnn());
281         if (!NT_STATUS_IS_OK(status)) {
282                 goto restore_notify;
283         }
284
285         TALLOC_FREE(idx_rec);
286         TALLOC_FREE(notify_rec);
287         return NT_STATUS_OK;
288
289 restore_notify:
290         if (notify_copy.dsize != 0) {
291                 dbwrap_record_store(notify_rec, notify_copy, 0);
292         } else {
293                 dbwrap_record_delete(notify_rec);
294         }
295         TALLOC_FREE(notify_rec);
296 fail:
297         DLIST_REMOVE(notify->list, listel);
298         TALLOC_FREE(listel);
299         return status;
300 }
301
302 static NTSTATUS notify_add_entry(struct db_record *rec,
303                                  const struct notify_db_entry *e,
304                                  bool *p_add_idx)
305 {
306         TDB_DATA value = dbwrap_record_get_value(rec);
307         struct notify_db_entry *entries;
308         size_t num_entries;
309         bool add_idx = true;
310         NTSTATUS status;
311
312         if (value.dsize == sizeof(time_t)) {
313                 DEBUG(10, ("Re-using deleted entry\n"));
314                 value.dsize = 0;
315                 add_idx = false;
316         }
317
318         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
319                 DEBUG(1, ("Invalid value.dsize = %u\n",
320                           (unsigned)value.dsize));
321                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
322         }
323         num_entries = value.dsize / sizeof(struct notify_db_entry);
324
325         if (num_entries != 0) {
326                 add_idx = false;
327         }
328
329         entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
330         if (entries == NULL) {
331                 return NT_STATUS_NO_MEMORY;
332         }
333         memcpy(entries, value.dptr, value.dsize);
334
335         entries[num_entries] = *e;
336         value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
337         status = dbwrap_record_store(rec, value, 0);
338         TALLOC_FREE(entries);
339         if (!NT_STATUS_IS_OK(status)) {
340                 return status;
341         }
342         *p_add_idx = add_idx;
343         return NT_STATUS_OK;
344 }
345
346 static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
347 {
348         TDB_DATA value = dbwrap_record_get_value(rec);
349         uint32_t *vnns;
350         size_t i, num_vnns;
351         NTSTATUS status;
352
353         if ((value.dsize % sizeof(uint32_t)) != 0) {
354                 DEBUG(1, ("Invalid value.dsize = %u\n",
355                           (unsigned)value.dsize));
356                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
357         }
358         num_vnns = value.dsize / sizeof(uint32_t);
359         vnns = (uint32_t *)value.dptr;
360
361         for (i=0; i<num_vnns; i++) {
362                 if (vnns[i] == vnn) {
363                         return NT_STATUS_OK;
364                 }
365                 if (vnns[i] > vnn) {
366                         break;
367                 }
368         }
369
370         value.dptr = (uint8_t *)talloc_realloc(
371                 rec, value.dptr, uint32_t, num_vnns + 1);
372         if (value.dptr == NULL) {
373                 return NT_STATUS_NO_MEMORY;
374         }
375         value.dsize = talloc_get_size(value.dptr);
376
377         vnns = (uint32_t *)value.dptr;
378
379         memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
380         vnns[i] = vnn;
381
382         status = dbwrap_record_store(rec, value, 0);
383         if (!NT_STATUS_IS_OK(status)) {
384                 return status;
385         }
386         return NT_STATUS_OK;
387 }
388
389 NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
390 {
391         struct server_id pid;
392         struct notify_list *listel;
393         struct db_record *notify_rec;
394         NTSTATUS status;
395
396         if ((notify == NULL) || (notify->msg == NULL)) {
397                 return NT_STATUS_NOT_IMPLEMENTED;
398         }
399
400         DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
401
402         pid = messaging_server_id(notify->msg);
403
404         for (listel=notify->list;listel;listel=listel->next) {
405                 if (listel->private_data == private_data) {
406                         DLIST_REMOVE(notify->list, listel);
407                         break;
408                 }
409         }
410         if (listel == NULL) {
411                 DEBUG(10, ("%p not found\n", private_data));
412                 return NT_STATUS_NOT_FOUND;
413         }
414         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
415                                          string_tdb_data(listel->path));
416         TALLOC_FREE(listel);
417         if (notify_rec == NULL) {
418                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
419         }
420         status = notify_del_entry(notify_rec, &pid, private_data);
421         DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
422         TALLOC_FREE(notify_rec);
423         return status;
424 }
425
426 static NTSTATUS notify_del_entry(struct db_record *rec,
427                                  const struct server_id *pid,
428                                  void *private_data)
429 {
430         TDB_DATA value = dbwrap_record_get_value(rec);
431         struct notify_db_entry *entries;
432         size_t i, num_entries;
433         time_t now;
434
435         DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
436                    private_data));
437
438         if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
439                 DEBUG(1, ("Invalid value.dsize = %u\n",
440                           (unsigned)value.dsize));
441                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
442         }
443         num_entries = value.dsize / sizeof(struct notify_db_entry);
444         entries = (struct notify_db_entry *)value.dptr;
445
446         for (i=0; i<num_entries; i++) {
447                 struct notify_db_entry *e = &entries[i];
448
449                 if (DEBUGLEVEL >= 10) {
450                         NDR_PRINT_DEBUG(notify_db_entry, e);
451                 }
452
453                 if (e->private_data != private_data) {
454                         continue;
455                 }
456                 if (serverid_equal(&e->server, pid)) {
457                         break;
458                 }
459         }
460         if (i == num_entries) {
461                 return NT_STATUS_NOT_FOUND;
462         }
463         entries[i] = entries[num_entries-1];
464         value.dsize -= sizeof(struct notify_db_entry);
465
466         if (value.dsize == 0) {
467                 now = time(NULL);
468                 value.dptr = (uint8_t *)&now;
469                 value.dsize = sizeof(now);
470         }
471         return dbwrap_record_store(rec, value, 0);
472 }
473
474 struct notify_trigger_index_state {
475         TALLOC_CTX *mem_ctx;
476         uint32_t *vnns;
477         uint32_t my_vnn;
478         bool found_my_vnn;
479 };
480
481 static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
482                                         void *private_data)
483 {
484         struct notify_trigger_index_state *state =
485                 (struct notify_trigger_index_state *)private_data;
486         uint32_t *new_vnns;
487         size_t i, num_vnns, num_new_vnns;
488
489         if ((data.dsize % sizeof(uint32_t)) != 0) {
490                 DEBUG(1, ("Invalid record size in notify index db: %u\n",
491                           (unsigned)data.dsize));
492                 return;
493         }
494         new_vnns = (uint32_t *)data.dptr;
495         num_new_vnns = data.dsize / sizeof(uint32_t);
496
497         num_vnns = talloc_array_length(state->vnns);
498
499         for (i=0; i<num_new_vnns; i++) {
500                 if (new_vnns[i] == state->my_vnn) {
501                         state->found_my_vnn = true;
502                 }
503         }
504
505         state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
506                                      num_vnns + num_new_vnns);
507         if ((num_vnns + num_new_vnns != 0) && (state->vnns == NULL)) {
508                 DEBUG(1, ("talloc_realloc failed\n"));
509                 return;
510         }
511         memcpy(&state->vnns[num_vnns], data.dptr, data.dsize);
512 }
513
514 static int vnn_cmp(const void *p1, const void *p2)
515 {
516         const uint32_t *vnn1 = (const uint32_t *)p1;
517         const uint32_t *vnn2 = (const uint32_t *)p2;
518
519         if (*vnn1 < *vnn2) {
520                 return -1;
521         }
522         if (*vnn1 == *vnn2) {
523                 return 0;
524         }
525         return 1;
526 }
527
528 static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
529                                     uint32_t filter, const char *path,
530                                     uint8_t **pblob, size_t *pblob_len)
531 {
532         struct notify_remote_event ev;
533         DATA_BLOB data;
534         enum ndr_err_code ndr_err;
535
536         ev.action = action;
537         ev.filter = filter;
538         ev.path = path;
539
540         if (DEBUGLEVEL >= 10) {
541                 NDR_PRINT_DEBUG(notify_remote_event, &ev);
542         }
543
544         ndr_err = ndr_push_struct_blob(
545                 &data, mem_ctx, &ev,
546                 (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
547         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
548                 return false;
549         }
550         *pblob = data.data;
551         *pblob_len = data.length;
552         return true;
553 }
554
555 static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
556                                     const uint8_t *blob, size_t blob_len,
557                                     uint32_t *paction, uint32_t *pfilter,
558                                     char **path)
559 {
560         struct notify_remote_event *ev;
561         enum ndr_err_code ndr_err;
562         DATA_BLOB data;
563         char *p;
564
565         data.data = discard_const_p(uint8_t, blob);
566         data.length = blob_len;
567
568         ev = talloc(mem_ctx, struct notify_remote_event);
569         if (ev == NULL) {
570                 return false;
571         }
572
573         ndr_err = ndr_pull_struct_blob(
574                 &data, ev, ev,
575                 (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
576         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
577                 TALLOC_FREE(ev);
578                 return false;
579         }
580         if (DEBUGLEVEL >= 10) {
581                 NDR_PRINT_DEBUG(notify_remote_event, ev);
582         }
583         *paction = ev->action;
584         *pfilter = ev->filter;
585         p = discard_const_p(char, ev->path);
586         *path = talloc_move(mem_ctx, &p);
587
588         TALLOC_FREE(ev);
589         return true;
590 }
591
592 void notify_trigger(struct notify_context *notify,
593                     uint32_t action, uint32_t filter, const char *path)
594 {
595         struct ctdbd_connection *ctdbd_conn;
596         struct notify_trigger_index_state idx_state;
597         const char *p, *next_p;
598         size_t i, num_vnns;
599         uint32_t last_vnn;
600         uint8_t *remote_blob = NULL;
601         size_t remote_blob_len = 0;
602
603         DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
604                    "path=%s\n", (unsigned)action, (unsigned)filter, path));
605
606         /* see if change notify is enabled at all */
607         if (notify == NULL) {
608                 return;
609         }
610
611         idx_state.mem_ctx = talloc_tos();
612         idx_state.vnns = NULL;
613         idx_state.my_vnn = get_my_vnn();
614
615         for (p = path; p != NULL; p = next_p) {
616                 ptrdiff_t path_len = p - path;
617                 bool recursive;
618
619                 next_p = strchr(p+1, '/');
620                 recursive = (next_p != NULL);
621
622                 idx_state.found_my_vnn = false;
623
624                 dbwrap_parse_record(
625                         notify->db_index,
626                         make_tdb_data(discard_const_p(uint8_t, path), path_len),
627                         notify_trigger_index_parser, &idx_state);
628
629                 if (!idx_state.found_my_vnn) {
630                         continue;
631                 }
632                 notify_trigger_local(notify, action, filter,
633                                      path, path_len, recursive);
634         }
635
636         ctdbd_conn = messaging_ctdbd_connection();
637         if (ctdbd_conn == NULL) {
638                 goto done;
639         }
640
641         num_vnns = talloc_array_length(idx_state.vnns);
642         qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
643
644         last_vnn = 0xffffffff;
645         remote_blob = NULL;
646
647         for (i=0; i<num_vnns; i++) {
648                 uint32_t vnn = idx_state.vnns[i];
649                 NTSTATUS status;
650
651                 if (vnn == last_vnn) {
652                         continue;
653                 }
654                 if (vnn == idx_state.my_vnn) {
655                         continue;
656                 }
657                 if ((remote_blob == NULL) &&
658                     !notify_push_remote_blob(
659                             talloc_tos(), action, filter,
660                             path, &remote_blob, &remote_blob_len)) {
661                         break;
662                 }
663
664                 status = ctdbd_messaging_send_blob(
665                         ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
666                         remote_blob, remote_blob_len);
667                 if (!NT_STATUS_IS_OK(status)) {
668                         DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
669                                    "returned %s, ignoring\n", (int)vnn,
670                                    nt_errstr(status)));
671                 }
672
673                 last_vnn = vnn;
674         }
675
676 done:
677         TALLOC_FREE(remote_blob);
678         TALLOC_FREE(idx_state.vnns);
679 }
680
681 static void notify_trigger_local(struct notify_context *notify,
682                                  uint32_t action, uint32_t filter,
683                                  const char *path, size_t path_len,
684                                  bool recursive)
685 {
686         TDB_DATA data;
687         struct notify_db_entry *entries;
688         size_t i, num_entries;
689         NTSTATUS status;
690
691         DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
692                    "filter=%d\n", (int)path_len, path, (int)path_len,
693                    (int)filter));
694
695         status = dbwrap_fetch(
696                 notify->db_notify, talloc_tos(),
697                 make_tdb_data(discard_const_p(uint8_t, path), path_len), &data);
698         if (!NT_STATUS_IS_OK(status)) {
699                 DEBUG(10, ("dbwrap_fetch returned %s\n",
700                            nt_errstr(status)));
701                 return;
702         }
703         if (data.dsize == sizeof(time_t)) {
704                 DEBUG(10, ("Got deleted record\n"));
705                 goto done;
706         }
707         if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
708                 DEBUG(1, ("Invalid data.dsize = %u\n",
709                           (unsigned)data.dsize));
710                 goto done;
711         }
712
713         entries = (struct notify_db_entry *)data.dptr;
714         num_entries = data.dsize / sizeof(struct notify_db_entry);
715
716         DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
717                    recursive ? "true" : "false", (int)path_len,
718                    path[path_len]));
719
720         for (i=0; i<num_entries; i++) {
721                 struct notify_db_entry *e = &entries[i];
722                 uint32_t e_filter;
723
724                 if (DEBUGLEVEL >= 10) {
725                         NDR_PRINT_DEBUG(notify_db_entry, e);
726                 }
727
728                 e_filter = recursive ? e->subdir_filter : e->filter;
729
730                 if ((filter & e_filter) == 0) {
731                         continue;
732                 }
733
734                 if (!procid_is_local(&e->server)) {
735                         DEBUG(1, ("internal error: Non-local pid %s in "
736                                   "notify.tdb\n",
737                                   procid_str_static(&e->server)));
738                         continue;
739                 }
740
741                 status = notify_send(notify, &e->server, path + path_len + 1,
742                                      action, e->private_data);
743                 if (!NT_STATUS_IS_OK(status)) {
744                         DEBUG(10, ("notify_send returned %s\n",
745                                    nt_errstr(status)));
746                 }
747         }
748
749 done:
750         TALLOC_FREE(data.dptr);
751 }
752
753 static NTSTATUS notify_send(struct notify_context *notify,
754                             struct server_id *pid,
755                             const char *path, uint32_t action,
756                             void *private_data)
757 {
758         struct notify_event ev;
759         DATA_BLOB data;
760         NTSTATUS status;
761         enum ndr_err_code ndr_err;
762
763         ev.action = action;
764         ev.path = path;
765         ev.private_data = private_data;
766
767         ndr_err = ndr_push_struct_blob(
768                 &data, talloc_tos(), &ev,
769                 (ndr_push_flags_fn_t)ndr_push_notify_event);
770         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
771                 return ndr_map_error2ntstatus(ndr_err);
772         }
773         status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
774                                 &data);
775         TALLOC_FREE(data.data);
776         return status;
777 }
778
779 static void notify_handler(struct messaging_context *msg_ctx,
780                            void *private_data, uint32_t msg_type,
781                            struct server_id server_id, DATA_BLOB *data)
782 {
783         struct notify_context *notify = talloc_get_type_abort(
784                 private_data, struct notify_context);
785         enum ndr_err_code ndr_err;
786         struct notify_event *n;
787         struct notify_list *listel;
788
789         n = talloc(talloc_tos(), struct notify_event);
790         if (n == NULL) {
791                 DEBUG(1, ("talloc failed\n"));
792                 return;
793         }
794
795         ndr_err = ndr_pull_struct_blob(
796                 data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
797         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
798                 TALLOC_FREE(n);
799                 return;
800         }
801         if (DEBUGLEVEL >= 10) {
802                 NDR_PRINT_DEBUG(notify_event, n);
803         }
804
805         for (listel=notify->list;listel;listel=listel->next) {
806                 if (listel->private_data == n->private_data) {
807                         listel->callback(listel->private_data, n);
808                         break;
809                 }
810         }
811         TALLOC_FREE(n);
812 }
813
814 struct notify_walk_idx_state {
815         void (*fn)(const char *path,
816                    uint32_t *vnns, size_t num_vnns,
817                    void *private_data);
818         void *private_data;
819 };
820
821 static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
822 {
823         struct notify_walk_idx_state *state =
824                 (struct notify_walk_idx_state *)private_data;
825         TDB_DATA key, value;
826         char *path;
827
828         key = dbwrap_record_get_key(rec);
829         value = dbwrap_record_get_value(rec);
830
831         if ((value.dsize % sizeof(uint32_t)) != 0) {
832                 DEBUG(1, ("invalid value size in notify index db: %u\n",
833                           (unsigned)(value.dsize)));
834                 return 0;
835         }
836
837         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
838         if (path == NULL) {
839                 DEBUG(1, ("talloc_strndup failed\n"));
840                 return 0;
841         }
842         state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
843                   state->private_data);
844         TALLOC_FREE(path);
845         return 0;
846 }
847
848 void notify_walk_idx(struct notify_context *notify,
849                      void (*fn)(const char *path,
850                                 uint32_t *vnns, size_t num_vnns,
851                                 void *private_data),
852                      void *private_data)
853 {
854         struct notify_walk_idx_state state;
855         state.fn = fn;
856         state.private_data = private_data;
857         dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
858                              NULL);
859 }
860
861 struct notify_walk_state {
862         void (*fn)(const char *path,
863                    struct notify_db_entry *entries, size_t num_entries,
864                    time_t deleted_time, void *private_data);
865         void *private_data;
866 };
867
868 static int notify_walk_fn(struct db_record *rec, void *private_data)
869 {
870         struct notify_walk_state *state =
871                 (struct notify_walk_state *)private_data;
872         TDB_DATA key, value;
873         struct notify_db_entry *entries;
874         size_t num_entries;
875         time_t deleted_time;
876         char *path;
877
878         key = dbwrap_record_get_key(rec);
879         value = dbwrap_record_get_value(rec);
880
881         if (value.dsize == sizeof(deleted_time)) {
882                 memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
883                 entries = NULL;
884                 num_entries = 0;
885         } else {
886                 if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
887                         DEBUG(1, ("invalid value size in notify db: %u\n",
888                                   (unsigned)(value.dsize)));
889                         return 0;
890                 }
891                 entries = (struct notify_db_entry *)value.dptr;
892                 num_entries = value.dsize / sizeof(struct notify_db_entry);
893                 deleted_time = 0;
894         }
895
896         path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
897         if (path == NULL) {
898                 DEBUG(1, ("talloc_strndup failed\n"));
899                 return 0;
900         }
901         state->fn(path, entries, num_entries, deleted_time,
902                   state->private_data);
903         TALLOC_FREE(path);
904         return 0;
905 }
906
907 void notify_walk(struct notify_context *notify,
908                  void (*fn)(const char *path,
909                             struct notify_db_entry *entries,
910                             size_t num_entries,
911                             time_t deleted_time, void *private_data),
912                  void *private_data)
913 {
914         struct notify_walk_state state;
915         state.fn = fn;
916         state.private_data = private_data;
917         dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
918                              NULL);
919 }
920
921 struct notify_cleanup_state {
922         TALLOC_CTX *mem_ctx;
923         time_t delete_before;
924         ssize_t array_size;
925         uint32_t num_paths;
926         char **paths;
927 };
928
929 static void notify_cleanup_collect(
930         const char *path, struct notify_db_entry *entries, size_t num_entries,
931         time_t deleted_time, void *private_data)
932 {
933         struct notify_cleanup_state *state =
934                 (struct notify_cleanup_state *)private_data;
935         char *p;
936
937         if (num_entries != 0) {
938                 return;
939         }
940         if (deleted_time >= state->delete_before) {
941                 return;
942         }
943
944         p = talloc_strdup(state->mem_ctx, path);
945         if (p == NULL) {
946                 DEBUG(1, ("talloc_strdup failed\n"));
947                 return;
948         }
949         add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
950                            &state->paths, &state->num_paths,
951                            &state->array_size);
952         if (state->array_size == -1) {
953                 TALLOC_FREE(p);
954         }
955 }
956
957 static bool notify_cleanup_path(struct notify_context *notify,
958                               const char *path, time_t delete_before);
959
960 void notify_cleanup(struct notify_context *notify)
961 {
962         struct notify_cleanup_state state;
963         uint32_t failure_pool;
964
965         ZERO_STRUCT(state);
966         state.mem_ctx = talloc_stackframe();
967
968         state.delete_before = time(NULL)
969                 - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
970
971         notify_walk(notify, notify_cleanup_collect, &state);
972
973         failure_pool = state.num_paths;
974
975         while (state.num_paths != 0) {
976                 size_t idx;
977
978                 /*
979                  * This loop is designed to be as kind as possible to
980                  * ctdb. ctdb does not like it if many smbds hammer on a
981                  * single record. If on many nodes the cleanup process starts
982                  * running, it can happen that all of them need to clean up
983                  * records in the same order. This would generate a ctdb
984                  * migrate storm on these records. Randomizing the load across
985                  * multiple records reduces the load on the individual record.
986                  */
987
988                 generate_random_buffer((uint8_t *)&idx, sizeof(idx));
989                 idx = idx % state.num_paths;
990
991                 if (!notify_cleanup_path(notify, state.paths[idx],
992                                          state.delete_before)) {
993                         /*
994                          * notify_cleanup_path failed, the most likely reason
995                          * is that dbwrap_try_fetch_locked failed due to
996                          * contention. We allow one failed attempt per deleted
997                          * path on average before we give up.
998                          */
999                         failure_pool -= 1;
1000                         if (failure_pool == 0) {
1001                                 /*
1002                                  * Too many failures. We will come back here,
1003                                  * maybe next time there is less contention.
1004                                  */
1005                                 break;
1006                         }
1007                 }
1008
1009                 TALLOC_FREE(state.paths[idx]);
1010                 state.paths[idx] = state.paths[state.num_paths-1];
1011                 state.num_paths -= 1;
1012         }
1013         TALLOC_FREE(state.mem_ctx);
1014 }
1015
1016 static bool notify_cleanup_path(struct notify_context *notify,
1017                                 const char *path, time_t delete_before)
1018 {
1019         struct db_record *notify_rec = NULL;
1020         struct db_record *idx_rec = NULL;
1021         TDB_DATA key = string_tdb_data(path);
1022         TDB_DATA value;
1023         time_t deleted;
1024         NTSTATUS status;
1025
1026         notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
1027         if (notify_rec == NULL) {
1028                 DEBUG(10, ("Could not fetch notify_rec\n"));
1029                 return false;
1030         }
1031         value = dbwrap_record_get_value(notify_rec);
1032
1033         if (value.dsize != sizeof(deleted)) {
1034                 DEBUG(10, ("record %s has been re-used\n", path));
1035                 goto done;
1036         }
1037         memcpy(&deleted, value.dptr, sizeof(deleted));
1038
1039         if (deleted >= delete_before) {
1040                 DEBUG(10, ("record %s too young\n", path));
1041                 goto done;
1042         }
1043
1044         /*
1045          * Be kind to ctdb and only try one dmaster migration at most.
1046          */
1047         idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
1048         if (idx_rec == NULL) {
1049                 DEBUG(10, ("Could not fetch idx_rec\n"));
1050                 goto done;
1051         }
1052
1053         status = dbwrap_record_delete(notify_rec);
1054         if (!NT_STATUS_IS_OK(status)) {
1055                 DEBUG(10, ("Could not delete notify_rec: %s\n",
1056                            nt_errstr(status)));
1057         }
1058
1059         status = notify_del_idx(idx_rec, get_my_vnn());
1060         if (!NT_STATUS_IS_OK(status)) {
1061                 DEBUG(10, ("Could not delete idx_rec: %s\n",
1062                            nt_errstr(status)));
1063         }
1064
1065 done:
1066         TALLOC_FREE(idx_rec);
1067         TALLOC_FREE(notify_rec);
1068         return true;
1069 }
1070
1071 static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
1072 {
1073         TDB_DATA value = dbwrap_record_get_value(rec);
1074         uint32_t *vnns;
1075         size_t i, num_vnns;
1076
1077         if ((value.dsize % sizeof(uint32_t)) != 0) {
1078                 DEBUG(1, ("Invalid value.dsize = %u\n",
1079                           (unsigned)value.dsize));
1080                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
1081         }
1082         num_vnns = value.dsize / sizeof(uint32_t);
1083         vnns = (uint32_t *)value.dptr;
1084
1085         for (i=0; i<num_vnns; i++) {
1086                 if (vnns[i] == vnn) {
1087                         break;
1088                 }
1089         }
1090
1091         if (i == num_vnns) {
1092                 /*
1093                  * Not found. Should not happen, but okay...
1094                  */
1095                 return NT_STATUS_OK;
1096         }
1097
1098         memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
1099         value.dsize -= sizeof(uint32_t);
1100
1101         if (value.dsize == 0) {
1102                 return dbwrap_record_delete(rec);
1103         }
1104         return dbwrap_record_store(rec, value, 0);
1105 }
1106
1107 struct notify_cluster_proxy_state {
1108         struct tevent_context *ev;
1109         struct notify_context *notify;
1110         struct ctdb_msg_channel *chan;
1111 };
1112
1113 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
1114 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
1115 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1116                                          uint32_t action, uint32_t filter,
1117                                          char *path);
1118
1119 struct tevent_req *notify_cluster_proxy_send(
1120         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1121         struct notify_context *notify)
1122 {
1123         struct tevent_req *req, *subreq;
1124         struct notify_cluster_proxy_state *state;
1125
1126         req = tevent_req_create(mem_ctx, &state,
1127                                 struct notify_cluster_proxy_state);
1128         if (req == NULL) {
1129                 return NULL;
1130         }
1131         state->ev = ev;
1132         state->notify = notify;
1133
1134         subreq = ctdb_msg_channel_init_send(
1135                 state, state->ev,  lp_ctdbd_socket(),
1136                 CTDB_SRVID_SAMBA_NOTIFY_PROXY);
1137         if (tevent_req_nomem(subreq, req)) {
1138                 return tevent_req_post(req, ev);
1139         }
1140         tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
1141         return req;
1142 }
1143
1144 static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
1145 {
1146         struct tevent_req *req = tevent_req_callback_data(
1147                 subreq, struct tevent_req);
1148         struct notify_cluster_proxy_state *state = tevent_req_data(
1149                 req, struct notify_cluster_proxy_state);
1150         int ret;
1151
1152         ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
1153         TALLOC_FREE(subreq);
1154         if (ret != 0) {
1155                 tevent_req_error(req, ret);
1156                 return;
1157         }
1158         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1159         if (tevent_req_nomem(subreq, req)) {
1160                 return;
1161         }
1162         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1163 }
1164
1165 static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
1166 {
1167         struct tevent_req *req = tevent_req_callback_data(
1168                 subreq, struct tevent_req);
1169         struct notify_cluster_proxy_state *state = tevent_req_data(
1170                 req, struct notify_cluster_proxy_state);
1171         uint8_t *msg;
1172         size_t msg_len;
1173         uint32_t action, filter;
1174         char *path;
1175         int ret;
1176         bool res;
1177
1178         ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
1179         TALLOC_FREE(subreq);
1180         if (ret != 0) {
1181                 tevent_req_error(req, ret);
1182                 return;
1183         }
1184
1185         res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
1186                                       &action, &filter, &path);
1187         TALLOC_FREE(msg);
1188         if (!res) {
1189                 tevent_req_error(req, EIO);
1190                 return;
1191         }
1192         notify_cluster_proxy_trigger(state->notify, action, filter, path);
1193         TALLOC_FREE(path);
1194
1195         subreq = ctdb_msg_read_send(state, state->ev, state->chan);
1196         if (tevent_req_nomem(subreq, req)) {
1197                 return;
1198         }
1199         tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
1200 }
1201
1202 static void notify_cluster_proxy_trigger(struct notify_context *notify,
1203                                          uint32_t action, uint32_t filter,
1204                                          char *path)
1205 {
1206         const char *p, *next_p;
1207
1208         for (p = path; p != NULL; p = next_p) {
1209                 ptrdiff_t path_len = p - path;
1210                 bool recursive;
1211
1212                 next_p = strchr(p+1, '/');
1213                 recursive = (next_p != NULL);
1214
1215                 notify_trigger_local(notify, action, filter,
1216                                      path, path_len, recursive);
1217         }
1218 }
1219
1220 int notify_cluster_proxy_recv(struct tevent_req *req)
1221 {
1222         int err;
1223
1224         if (tevent_req_is_unix_error(req, &err)) {
1225                 return err;
1226         }
1227         return 0;
1228 }