dbwrap_watch: Improve a debug message
[metze/samba/wip.git] / source3 / lib / dbwrap / dbwrap_watch.c
1 /*
2    Unix SMB/CIFS implementation.
3    Watch dbwrap record changes
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "lib/util/util_tdb.h"
26 #include "lib/util/tevent_ntstatus.h"
27 #include "server_id_watch.h"
28 #include "lib/dbwrap/dbwrap_private.h"
29
30 static ssize_t dbwrap_record_watchers_key(struct db_context *db,
31                                           struct db_record *rec,
32                                           uint8_t *wkey, size_t wkey_len)
33 {
34         size_t db_id_len = dbwrap_db_id(db, NULL, 0);
35         uint8_t db_id[db_id_len];
36         size_t needed;
37         TDB_DATA key;
38
39         dbwrap_db_id(db, db_id, db_id_len);
40
41         key = dbwrap_record_get_key(rec);
42
43         needed = sizeof(uint32_t) + db_id_len;
44         if (needed < sizeof(uint32_t)) {
45                 return -1;
46         }
47
48         needed += key.dsize;
49         if (needed < key.dsize) {
50                 return -1;
51         }
52
53         if (wkey_len >= needed) {
54                 SIVAL(wkey, 0, db_id_len);
55                 memcpy(wkey + sizeof(uint32_t), db_id, db_id_len);
56                 memcpy(wkey + sizeof(uint32_t) + db_id_len,
57                        key.dptr, key.dsize);
58         }
59
60         return needed;
61 }
62
63 static bool dbwrap_record_watchers_key_parse(
64         TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
65 {
66         size_t db_id_len;
67
68         if (wkey.dsize < sizeof(uint32_t)) {
69                 DBG_WARNING("Invalid watchers key, dsize=%zu\n", wkey.dsize);
70                 return false;
71         }
72         db_id_len = IVAL(wkey.dptr, 0);
73         if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
74                 DBG_WARNING("Invalid watchers key, wkey.dsize=%zu, "
75                             "db_id_len=%zu\n", wkey.dsize, db_id_len);
76                 return false;
77         }
78         if (p_db_id != NULL) {
79                 *p_db_id = wkey.dptr + sizeof(uint32_t);
80         }
81         if (p_db_id_len != NULL) {
82                 *p_db_id_len = db_id_len;
83         }
84         if (key != NULL) {
85                 key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
86                 key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
87         }
88         return true;
89 }
90
91 /*
92  * Watched records contain a header of:
93  *
94  * [uint32] num_records | deleted bit
95  * 0 [SERVER_ID_BUF_LENGTH]                   \
96  * 1 [SERVER_ID_BUF_LENGTH]                   |
97  * ..                                         |- Array of watchers
98  * (num_records-1)[SERVER_ID_BUF_LENGTH]      /
99  *
100  * [Remainder of record....]
101  *
102  * If this header is absent then this is a
103  * fresh record of length zero (no watchers).
104  *
105  * Note that a record can be deleted with
106  * watchers present. If so the deleted bit
107  * is set and the watcher server_id's are
108  * woken to allow them to remove themselves
109  * from the watcher array. The record is left
110  * present marked with the deleted bit until all
111  * watchers are removed, then the record itself
112  * is deleted.
113  */
114
115 #define NUM_WATCHERS_DELETED_BIT (1UL<<31)
116 #define NUM_WATCHERS_MASK (NUM_WATCHERS_DELETED_BIT-1)
117
118 static ssize_t dbwrap_watched_parse(TDB_DATA data, struct server_id *ids,
119                                     size_t num_ids, bool *pdeleted,
120                                     TDB_DATA *pdata)
121 {
122         size_t i, num_watchers;
123         bool deleted;
124
125         if (data.dsize < sizeof(uint32_t)) {
126                 /* Fresh or invalid record */
127                 return -1;
128         }
129
130         num_watchers = IVAL(data.dptr, 0);
131
132         deleted = num_watchers & NUM_WATCHERS_DELETED_BIT;
133         num_watchers &= NUM_WATCHERS_MASK;
134
135         data.dptr += sizeof(uint32_t);
136         data.dsize -= sizeof(uint32_t);
137
138         if (num_watchers > data.dsize/SERVER_ID_BUF_LENGTH) {
139                 /* Invalid record */
140                 return -1;
141         }
142
143         if (num_watchers > num_ids) {
144                 /*
145                  * Not enough space to store the watchers server_id's.
146                  * Just move past all of them to allow the remaining part
147                  * of the record to be returned.
148                  */
149                 data.dptr += num_watchers * SERVER_ID_BUF_LENGTH;
150                 data.dsize -= num_watchers * SERVER_ID_BUF_LENGTH;
151                 goto done;
152         }
153
154         /*
155          * Note, even if marked deleted we still must
156          * return the id's array to allow awoken
157          * watchers to remove themselves.
158          */
159
160         for (i=0; i<num_watchers; i++) {
161                 server_id_get(&ids[i], data.dptr);
162                 data.dptr += SERVER_ID_BUF_LENGTH;
163                 data.dsize -= SERVER_ID_BUF_LENGTH;
164         }
165
166 done:
167         if (deleted) {
168                 data = (TDB_DATA) {0};
169         }
170         if (pdata != NULL) {
171                 *pdata = data;
172         }
173         if (pdeleted != NULL) {
174                 *pdeleted = deleted;
175         }
176
177         return num_watchers;
178 }
179
180 static ssize_t dbwrap_watched_unparse(const struct server_id *watchers,
181                                       size_t num_watchers, bool deleted,
182                                       TDB_DATA data,
183                                       uint8_t *buf, size_t buflen)
184 {
185         size_t i, len, ofs;
186         uint32_t num_watchers_buf;
187
188         if (num_watchers > UINT32_MAX/SERVER_ID_BUF_LENGTH) {
189                 return -1;
190         }
191
192         len = num_watchers * SERVER_ID_BUF_LENGTH;
193
194         len += sizeof(uint32_t);
195         if (len < sizeof(uint32_t)) {
196                 return -1;
197         }
198
199         len += data.dsize;
200         if (len < data.dsize) {
201                 return -1;
202         }
203
204         if (len > buflen) {
205                 return len;
206         }
207
208         num_watchers_buf = num_watchers;
209         if (deleted) {
210                 num_watchers_buf |= NUM_WATCHERS_DELETED_BIT;
211         }
212
213         ofs = 0;
214         SIVAL(buf, ofs, num_watchers_buf);
215         ofs += 4;
216
217         for (i=0; i<num_watchers; i++) {
218                 server_id_put(buf+ofs, watchers[i]);
219                 ofs += SERVER_ID_BUF_LENGTH;
220         }
221
222         if ((data.dptr != NULL) && (data.dsize != 0)) {
223                 memcpy(buf + ofs, data.dptr, data.dsize);
224         }
225
226         return len;
227 }
228
229 struct db_watched_ctx {
230         struct db_context *backend;
231         struct messaging_context *msg;
232 };
233
234 struct db_watched_subrec {
235         struct db_record *subrec;
236         struct server_id *watchers;
237         bool deleted;
238 };
239
240 static NTSTATUS dbwrap_watched_store(struct db_record *rec, TDB_DATA data,
241                                      int flag);
242 static NTSTATUS dbwrap_watched_delete(struct db_record *rec);
243
244 static struct db_record *dbwrap_watched_fetch_locked(
245         struct db_context *db, TALLOC_CTX *mem_ctx, TDB_DATA key)
246 {
247         struct db_watched_ctx *ctx = talloc_get_type_abort(
248                 db->private_data, struct db_watched_ctx);
249         struct db_record *rec;
250         struct db_watched_subrec *subrec;
251         TDB_DATA subrec_value;
252         ssize_t num_watchers;
253
254         rec = talloc_zero(mem_ctx, struct db_record);
255         if (rec == NULL) {
256                 return NULL;
257         }
258         subrec = talloc_zero(rec, struct db_watched_subrec);
259         if (subrec == NULL) {
260                 TALLOC_FREE(rec);
261                 return NULL;
262         }
263         rec->private_data = subrec;
264
265         subrec->subrec = dbwrap_fetch_locked(ctx->backend, subrec, key);
266         if (subrec->subrec == NULL) {
267                 TALLOC_FREE(rec);
268                 return NULL;
269         }
270
271         rec->db = db;
272         rec->key = dbwrap_record_get_key(subrec->subrec);
273         rec->store = dbwrap_watched_store;
274         rec->delete_rec = dbwrap_watched_delete;
275
276         subrec_value = dbwrap_record_get_value(subrec->subrec);
277
278         num_watchers = dbwrap_watched_parse(subrec_value, NULL, 0, NULL, NULL);
279         if (num_watchers == -1) {
280                 /* Fresh or invalid record */
281                 rec->value = (TDB_DATA) {};
282                 return rec;
283         }
284
285         subrec->watchers = talloc_array(subrec, struct server_id,
286                                         num_watchers);
287         if (subrec->watchers == NULL) {
288                 TALLOC_FREE(rec);
289                 return NULL;
290         }
291
292         dbwrap_watched_parse(subrec_value, subrec->watchers, num_watchers,
293                              &subrec->deleted, &rec->value);
294
295         return rec;
296 }
297
298 static void dbwrap_watched_wakeup(struct db_record *rec,
299                                   struct db_watched_subrec *subrec)
300 {
301         struct db_context *db = dbwrap_record_get_db(rec);
302         struct db_watched_ctx *ctx = talloc_get_type_abort(
303                 db->private_data, struct db_watched_ctx);
304         size_t i, num_watchers;
305         size_t db_id_len = dbwrap_db_id(db, NULL, 0);
306         uint8_t db_id[db_id_len];
307         uint8_t len_buf[4];
308         struct iovec iov[3];
309
310         SIVAL(len_buf, 0, db_id_len);
311
312         iov[0] = (struct iovec) { .iov_base = len_buf, .iov_len = 4 };
313         iov[1] = (struct iovec) { .iov_base = db_id, .iov_len = db_id_len };
314         iov[2] = (struct iovec) { .iov_base = rec->key.dptr,
315                                   .iov_len = rec->key.dsize };
316
317         dbwrap_db_id(db, db_id, db_id_len);
318
319         num_watchers = talloc_array_length(subrec->watchers);
320
321         i = 0;
322
323         while (i < num_watchers) {
324                 NTSTATUS status;
325                 struct server_id_buf tmp;
326
327                 DBG_DEBUG("Alerting %s\n",
328                           server_id_str_buf(subrec->watchers[i], &tmp));
329
330                 status = messaging_send_iov(ctx->msg, subrec->watchers[i],
331                                             MSG_DBWRAP_MODIFIED,
332                                             iov, ARRAY_SIZE(iov), NULL, 0);
333                 if (!NT_STATUS_IS_OK(status)) {
334                         DBG_DEBUG("messaging_send_iov to %s failed: %s\n",
335                                   server_id_str_buf(subrec->watchers[i], &tmp),
336                                   nt_errstr(status));
337                 }
338                 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
339                         subrec->watchers[i] = subrec->watchers[num_watchers-1];
340                         num_watchers -= 1;
341
342                         subrec->watchers = talloc_realloc(
343                                 subrec, subrec->watchers, struct server_id,
344                                 num_watchers);
345                         continue;
346                 }
347
348                 i += 1;
349         }
350 }
351
352 static NTSTATUS dbwrap_watched_save(struct db_watched_subrec *subrec,
353                                     TDB_DATA data, int flag)
354 {
355         size_t num_watchers;
356         ssize_t len;
357         uint8_t *buf;
358         NTSTATUS status;
359
360         num_watchers = talloc_array_length(subrec->watchers);
361
362         len = dbwrap_watched_unparse(subrec->watchers, num_watchers,
363                                      subrec->deleted, data, NULL, 0);
364         if (len == -1) {
365                 return NT_STATUS_INSUFFICIENT_RESOURCES;
366         }
367
368         buf = talloc_array(subrec, uint8_t, len);
369         if (buf == NULL) {
370                 return NT_STATUS_NO_MEMORY;
371         }
372
373         dbwrap_watched_unparse(subrec->watchers, num_watchers,
374                                subrec->deleted, data, buf, len);
375
376         status = dbwrap_record_store(
377                 subrec->subrec, (TDB_DATA) { .dptr = buf, .dsize = len },
378                 flag);
379
380         TALLOC_FREE(buf);
381
382         return status;
383 }
384
385 static NTSTATUS dbwrap_watched_store(struct db_record *rec, TDB_DATA data,
386                                      int flag)
387 {
388         struct db_watched_subrec *subrec = talloc_get_type_abort(
389                 rec->private_data, struct db_watched_subrec);
390
391         dbwrap_watched_wakeup(rec, subrec);
392
393         subrec->deleted = false;
394
395         return dbwrap_watched_save(subrec, data, flag);
396
397 }
398
399 static NTSTATUS dbwrap_watched_delete(struct db_record *rec)
400 {
401         struct db_watched_subrec *subrec = talloc_get_type_abort(
402                 rec->private_data, struct db_watched_subrec);
403         size_t num_watchers;
404
405         dbwrap_watched_wakeup(rec, subrec);
406
407         num_watchers = talloc_array_length(subrec->watchers);
408         if (num_watchers == 0) {
409                 return dbwrap_record_delete(subrec->subrec);
410         }
411
412         subrec->deleted = true;
413
414         return dbwrap_watched_save(subrec, (TDB_DATA) {0}, 0);
415 }
416
417 struct dbwrap_watched_traverse_state {
418         int (*fn)(struct db_record *rec, void *private_data);
419         void *private_data;
420 };
421
422 static int dbwrap_watched_traverse_fn(struct db_record *rec,
423                                       void *private_data)
424 {
425         struct dbwrap_watched_traverse_state *state = private_data;
426         ssize_t num_watchers;
427         struct db_record prec = *rec;
428         bool deleted;
429
430         num_watchers = dbwrap_watched_parse(rec->value, NULL, 0, &deleted,
431                                             &prec.value);
432
433         if ((num_watchers == -1) || deleted) {
434                 return 0;
435         }
436
437         return state->fn(&prec, state->private_data);
438 }
439
440 static int dbwrap_watched_traverse(struct db_context *db,
441                                    int (*fn)(struct db_record *rec,
442                                              void *private_data),
443                                    void *private_data)
444 {
445         struct db_watched_ctx *ctx = talloc_get_type_abort(
446                 db->private_data, struct db_watched_ctx);
447         struct dbwrap_watched_traverse_state state = {
448                 .fn = fn, .private_data = private_data };
449         NTSTATUS status;
450         int ret;
451
452         status = dbwrap_traverse(
453                 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
454         if (!NT_STATUS_IS_OK(status)) {
455                 return -1;
456         }
457         return ret;
458 }
459
460 static int dbwrap_watched_traverse_read(struct db_context *db,
461                                         int (*fn)(struct db_record *rec,
462                                                   void *private_data),
463                                         void *private_data)
464 {
465         struct db_watched_ctx *ctx = talloc_get_type_abort(
466                 db->private_data, struct db_watched_ctx);
467         struct dbwrap_watched_traverse_state state = {
468                 .fn = fn, .private_data = private_data };
469         NTSTATUS status;
470         int ret;
471
472         status = dbwrap_traverse_read(
473                 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
474         if (!NT_STATUS_IS_OK(status)) {
475                 return -1;
476         }
477         return ret;
478 }
479
480 static int dbwrap_watched_get_seqnum(struct db_context *db)
481 {
482         struct db_watched_ctx *ctx = talloc_get_type_abort(
483                 db->private_data, struct db_watched_ctx);
484         return dbwrap_get_seqnum(ctx->backend);
485 }
486
487 static int dbwrap_watched_transaction_start(struct db_context *db)
488 {
489         struct db_watched_ctx *ctx = talloc_get_type_abort(
490                 db->private_data, struct db_watched_ctx);
491         return dbwrap_transaction_start(ctx->backend);
492 }
493
494 static int dbwrap_watched_transaction_commit(struct db_context *db)
495 {
496         struct db_watched_ctx *ctx = talloc_get_type_abort(
497                 db->private_data, struct db_watched_ctx);
498         return dbwrap_transaction_commit(ctx->backend);
499 }
500
501 static int dbwrap_watched_transaction_cancel(struct db_context *db)
502 {
503         struct db_watched_ctx *ctx = talloc_get_type_abort(
504                 db->private_data, struct db_watched_ctx);
505         return dbwrap_transaction_cancel(ctx->backend);
506 }
507
508 struct dbwrap_watched_parse_record_state {
509         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
510         void *private_data;
511         bool deleted;
512 };
513
514 static void dbwrap_watched_parse_record_parser(TDB_DATA key, TDB_DATA data,
515                                                void *private_data)
516 {
517         struct dbwrap_watched_parse_record_state *state = private_data;
518         ssize_t num_watchers;
519         TDB_DATA userdata;
520
521         num_watchers = dbwrap_watched_parse(data, NULL, 0, &state->deleted,
522                                             &userdata);
523         if ((num_watchers == -1) || state->deleted) {
524                 return;
525         }
526         state->parser(key, userdata, state->private_data);
527 }
528
529 static NTSTATUS dbwrap_watched_parse_record(
530         struct db_context *db, TDB_DATA key,
531         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
532         void *private_data)
533 {
534         struct db_watched_ctx *ctx = talloc_get_type_abort(
535                 db->private_data, struct db_watched_ctx);
536         struct dbwrap_watched_parse_record_state state = {
537                 .parser = parser,
538                 .private_data = private_data,
539                 .deleted = false
540         };
541         NTSTATUS status;
542
543         status = dbwrap_parse_record(
544                 ctx->backend, key, dbwrap_watched_parse_record_parser, &state);
545         if (!NT_STATUS_IS_OK(status)) {
546                 return status;
547         }
548         if (state.deleted) {
549                 return NT_STATUS_NOT_FOUND;
550         }
551         return NT_STATUS_OK;
552 }
553
554 static int dbwrap_watched_exists(struct db_context *db, TDB_DATA key)
555 {
556         struct db_watched_ctx *ctx = talloc_get_type_abort(
557                 db->private_data, struct db_watched_ctx);
558
559         return dbwrap_exists(ctx->backend, key);
560 }
561
562 static size_t dbwrap_watched_id(struct db_context *db, uint8_t *id,
563                                 size_t idlen)
564 {
565         struct db_watched_ctx *ctx = talloc_get_type_abort(
566                 db->private_data, struct db_watched_ctx);
567
568         return dbwrap_db_id(ctx->backend, id, idlen);
569 }
570
571 struct db_context *db_open_watched(TALLOC_CTX *mem_ctx,
572                                    struct db_context *backend,
573                                    struct messaging_context *msg)
574 {
575         struct db_context *db;
576         struct db_watched_ctx *ctx;
577
578         db = talloc_zero(mem_ctx, struct db_context);
579         if (db == NULL) {
580                 return NULL;
581         }
582         ctx = talloc_zero(db, struct db_watched_ctx);
583         if (ctx == NULL) {
584                 TALLOC_FREE(db);
585                 return NULL;
586         }
587         db->private_data = ctx;
588
589         ctx->msg = msg;
590
591         db->lock_order = backend->lock_order;
592         backend->lock_order = DBWRAP_LOCK_ORDER_NONE;
593         ctx->backend = talloc_move(ctx, &backend);
594
595         db->fetch_locked = dbwrap_watched_fetch_locked;
596         db->traverse = dbwrap_watched_traverse;
597         db->traverse_read = dbwrap_watched_traverse_read;
598         db->get_seqnum = dbwrap_watched_get_seqnum;
599         db->transaction_start = dbwrap_watched_transaction_start;
600         db->transaction_commit = dbwrap_watched_transaction_commit;
601         db->transaction_cancel = dbwrap_watched_transaction_cancel;
602         db->parse_record = dbwrap_watched_parse_record;
603         db->exists = dbwrap_watched_exists;
604         db->id = dbwrap_watched_id;
605         db->name = dbwrap_name(ctx->backend);
606
607         return db;
608 }
609
610 struct dbwrap_watched_watch_state {
611         struct db_context *db;
612         struct server_id me;
613         TDB_DATA w_key;
614         struct server_id blocker;
615         bool blockerdead;
616 };
617
618 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
619                                       void *private_data);
620 static void dbwrap_watched_watch_done(struct tevent_req *subreq);
621 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq);
622 static int dbwrap_watched_watch_state_destructor(
623         struct dbwrap_watched_watch_state *state);
624
625 struct tevent_req *dbwrap_watched_watch_send(TALLOC_CTX *mem_ctx,
626                                              struct tevent_context *ev,
627                                              struct db_record *rec,
628                                              struct server_id blocker)
629 {
630         struct db_watched_subrec *subrec = talloc_get_type_abort(
631                 rec->private_data, struct db_watched_subrec);
632         struct db_context *db = dbwrap_record_get_db(rec);
633         struct db_watched_ctx *ctx = talloc_get_type_abort(
634                 db->private_data, struct db_watched_ctx);
635
636         struct tevent_req *req, *subreq;
637         struct dbwrap_watched_watch_state *state;
638         ssize_t needed;
639         size_t num_watchers;
640         struct server_id *tmp;
641         NTSTATUS status;
642
643         req = tevent_req_create(mem_ctx, &state,
644                                 struct dbwrap_watched_watch_state);
645         if (req == NULL) {
646                 return NULL;
647         }
648         state->db = db;
649         state->blocker = blocker;
650
651         if (ctx->msg == NULL) {
652                 tevent_req_nterror(req, NT_STATUS_NOT_SUPPORTED);
653                 return tevent_req_post(req, ev);
654         }
655
656         state->me = messaging_server_id(ctx->msg);
657
658         needed = dbwrap_record_watchers_key(db, rec, NULL, 0);
659         if (needed == -1) {
660                 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
661                 return tevent_req_post(req, ev);
662         }
663         state->w_key.dsize = needed;
664
665         state->w_key.dptr = talloc_array(state, uint8_t, state->w_key.dsize);
666         if (tevent_req_nomem(state->w_key.dptr, req)) {
667                 return tevent_req_post(req, ev);
668         }
669         dbwrap_record_watchers_key(db, rec, state->w_key.dptr,
670                                    state->w_key.dsize);
671
672         subreq = messaging_filtered_read_send(
673                 state, ev, ctx->msg, dbwrap_watched_msg_filter, state);
674         if (tevent_req_nomem(subreq, req)) {
675                 return tevent_req_post(req, ev);
676         }
677         tevent_req_set_callback(subreq, dbwrap_watched_watch_done, req);
678
679         num_watchers = talloc_array_length(subrec->watchers);
680
681         tmp = talloc_realloc(subrec, subrec->watchers, struct server_id,
682                              num_watchers + 1);
683         if (tevent_req_nomem(tmp, req)) {
684                 return tevent_req_post(req, ev);
685         }
686         subrec->watchers = tmp;
687         subrec->watchers[num_watchers] = state->me;
688
689         status = dbwrap_watched_save(subrec, rec->value, 0);
690         if (tevent_req_nterror(req, status)) {
691                 return tevent_req_post(req, ev);
692         }
693
694         talloc_set_destructor(state, dbwrap_watched_watch_state_destructor);
695
696         if (blocker.pid != 0) {
697                 subreq = server_id_watch_send(state, ev, ctx->msg, blocker);
698                 if (tevent_req_nomem(subreq, req)) {
699                         return tevent_req_post(req, ev);
700                 }
701                 tevent_req_set_callback(
702                         subreq, dbwrap_watched_watch_blocker_died, req);
703         }
704
705         return req;
706 }
707
708 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq)
709 {
710         struct tevent_req *req = tevent_req_callback_data(
711                 subreq, struct tevent_req);
712         struct dbwrap_watched_watch_state *state = tevent_req_data(
713                 req, struct dbwrap_watched_watch_state);
714         int ret;
715
716         ret = server_id_watch_recv(subreq, NULL);
717         TALLOC_FREE(subreq);
718         if (ret != 0) {
719                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
720                 return;
721         }
722         state->blockerdead = true;
723         tevent_req_done(req);
724 }
725
726 static bool dbwrap_watched_remove_waiter(struct db_watched_subrec *subrec,
727                                          struct server_id id)
728 {
729         size_t i, num_watchers;
730
731         num_watchers = talloc_array_length(subrec->watchers);
732
733         for (i=0; i<num_watchers; i++) {
734                 if (server_id_equal(&id, &subrec->watchers[i])) {
735                         break;
736                 }
737         }
738
739         if (i == num_watchers) {
740                 struct server_id_buf buf;
741                 DBG_WARNING("Did not find %s in state->watchers\n",
742                             server_id_str_buf(id, &buf));
743                 return false;
744         }
745
746         subrec->watchers[i] = subrec->watchers[num_watchers-1];
747         subrec->watchers = talloc_realloc(subrec, subrec->watchers,
748                                           struct server_id, num_watchers-1);
749
750         return true;
751 }
752
753 static int dbwrap_watched_watch_state_destructor(
754         struct dbwrap_watched_watch_state *state)
755 {
756         struct db_record *rec;
757         struct db_watched_subrec *subrec;
758         TDB_DATA key;
759         bool ok;
760
761         ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
762         if (!ok) {
763                 DBG_WARNING("dbwrap_record_watchers_key_parse failed\n");
764                 return 0;
765         }
766
767         rec = dbwrap_fetch_locked(state->db, state, key);
768         if (rec == NULL) {
769                 DBG_WARNING("dbwrap_fetch_locked failed\n");
770                 return 0;
771         }
772
773         subrec = talloc_get_type_abort(
774                 rec->private_data, struct db_watched_subrec);
775
776         ok = dbwrap_watched_remove_waiter(subrec, state->me);
777         if (ok) {
778                 NTSTATUS status;
779                 status = dbwrap_watched_save(subrec, rec->value, 0);
780                 if (!NT_STATUS_IS_OK(status)) {
781                         DBG_WARNING("dbwrap_watched_save failed: %s\n",
782                                     nt_errstr(status));
783                 }
784         }
785
786         TALLOC_FREE(rec);
787         return 0;
788 }
789
790 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
791                                       void *private_data)
792 {
793         struct dbwrap_watched_watch_state *state = talloc_get_type_abort(
794                 private_data, struct dbwrap_watched_watch_state);
795         int cmp;
796
797         if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
798                 return false;
799         }
800         if (rec->num_fds != 0) {
801                 return false;
802         }
803         if (rec->buf.length != state->w_key.dsize) {
804                 return false;
805         }
806
807         cmp = memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length);
808
809         return (cmp == 0);
810 }
811
812 static void dbwrap_watched_watch_done(struct tevent_req *subreq)
813 {
814         struct tevent_req *req = tevent_req_callback_data(
815                 subreq, struct tevent_req);
816         struct messaging_rec *rec;
817         int ret;
818
819         ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
820         TALLOC_FREE(subreq);
821         if (ret != 0) {
822                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
823                 return;
824         }
825         tevent_req_done(req);
826 }
827
828 NTSTATUS dbwrap_watched_watch_recv(struct tevent_req *req,
829                                    TALLOC_CTX *mem_ctx,
830                                    struct db_record **prec,
831                                    bool *blockerdead,
832                                    struct server_id *blocker)
833 {
834         struct dbwrap_watched_watch_state *state = tevent_req_data(
835                 req, struct dbwrap_watched_watch_state);
836         struct db_watched_subrec *subrec;
837         NTSTATUS status;
838         TDB_DATA key;
839         struct db_record *rec;
840         bool ok;
841
842         if (tevent_req_is_nterror(req, &status)) {
843                 return status;
844         }
845         if (blockerdead != NULL) {
846                 *blockerdead = state->blockerdead;
847         }
848         if (blocker != NULL) {
849                 *blocker = state->blocker;
850         }
851         if (prec == NULL) {
852                 return NT_STATUS_OK;
853         }
854
855         ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
856         if (!ok) {
857                 return NT_STATUS_INTERNAL_DB_ERROR;
858         }
859
860         rec = dbwrap_fetch_locked(state->db, mem_ctx, key);
861         if (rec == NULL) {
862                 return NT_STATUS_INTERNAL_DB_ERROR;
863         }
864
865         talloc_set_destructor(state, NULL);
866
867         subrec = talloc_get_type_abort(
868                 rec->private_data, struct db_watched_subrec);
869
870         ok = dbwrap_watched_remove_waiter(subrec, state->me);
871         if (ok) {
872                 status = dbwrap_watched_save(subrec, rec->value, 0);
873                 if (!NT_STATUS_IS_OK(status)) {
874                         DBG_WARNING("dbwrap_watched_save failed: %s\n",
875                                     nt_errstr(status));
876                 }
877         }
878
879         *prec = rec;
880         return NT_STATUS_OK;
881 }