2 Unix SMB/CIFS implementation.
3 Watch dbwrap record changes
4 Copyright (C) Volker Lendecke 2012
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "dbwrap/dbwrap.h"
23 #include "dbwrap_watch.h"
24 #include "dbwrap_open.h"
25 #include "lib/util/util_tdb.h"
26 #include "lib/util/tevent_ntstatus.h"
27 #include "server_id_watch.h"
28 #include "lib/dbwrap/dbwrap_private.h"
/*
 * Build the "watchers key" for a record. The visible code writes a
 * 32-bit db id length (SIVAL at offset 0), then the db id bytes, then a
 * further memcpy at offset 4 + db_id_len. The buffer is only written
 * when wkey_len is at least the computed size, so callers can pass
 * NULL/0 first to size the buffer and call again with real storage.
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing (local declarations, the bodies of the overflow checks, the
 * remaining arguments of the last memcpy and the return statements).
 * Presumably the trailing bytes are the record key and the result is
 * the needed length, or -1 on overflow -- confirm against the complete
 * source.
 */
30 static ssize_t dbwrap_record_watchers_key(struct db_context *db,
31 struct db_record *rec,
32 uint8_t *wkey, size_t wkey_len)
/* Calling dbwrap_db_id() without a buffer yields the id length, which sizes the VLA. */
34 size_t db_id_len = dbwrap_db_id(db, NULL, 0);
35 uint8_t db_id[db_id_len];
39 dbwrap_db_id(db, db_id, db_id_len);
41 key = dbwrap_record_get_key(rec);
/* Length prefix plus db id; a result smaller than an operand means size_t wrapped. */
43 needed = sizeof(uint32_t) + db_id_len;
44 if (needed < sizeof(uint32_t)) {
49 if (needed < key.dsize) {
/* Only serialize when the caller supplied enough room. */
53 if (wkey_len >= needed) {
54 SIVAL(wkey, 0, db_id_len);
55 memcpy(wkey + sizeof(uint32_t), db_id, db_id_len);
56 memcpy(wkey + sizeof(uint32_t) + db_id_len,
/*
 * Split a watchers key into its parts. Layout read here: a 32-bit db id
 * length (IVAL at offset 0), the db id bytes, then the remaining bytes.
 *
 * p_db_id and p_db_id_len are optional outputs (NULL is accepted).
 * key is written without a NULL check. All returned pointers point into
 * wkey's own buffer; nothing is copied.
 *
 * NOTE(review): the return statements are not visible in this listing.
 * Callers use the bool result as a success flag, so presumably false is
 * returned after each warning below -- confirm.
 */
63 static bool dbwrap_record_watchers_key_parse(
64 TDB_DATA wkey, uint8_t **p_db_id, size_t *p_db_id_len, TDB_DATA *key)
/* The key must at least hold the 32-bit length prefix. */
68 if (wkey.dsize < sizeof(uint32_t)) {
69 DBG_WARNING("Invalid watchers key, dsize=%zu\n", wkey.dsize);
72 db_id_len = IVAL(wkey.dptr, 0);
/* The declared db id length must fit in what follows the prefix. */
73 if (db_id_len > (wkey.dsize - sizeof(uint32_t))) {
74 DBG_WARNING("Invalid watchers key, wkey.dsize=%zu, "
75 "db_id_len=%zu\n", wkey.dsize, db_id_len);
78 if (p_db_id != NULL) {
79 *p_db_id = wkey.dptr + sizeof(uint32_t);
81 if (p_db_id_len != NULL) {
82 *p_db_id_len = db_id_len;
/* Whatever follows the db id is handed back as the key. */
85 key->dptr = wkey.dptr + sizeof(uint32_t) + db_id_len;
86 key->dsize = wkey.dsize - sizeof(uint32_t) - db_id_len;
92 * Watched records contain a header of:
94 * [uint32] num_records | deleted bit
95 * 0 [SERVER_ID_BUF_LENGTH] \
96 * 1 [SERVER_ID_BUF_LENGTH] |
97 * .. |- Array of watchers
98 * (num_records-1)[SERVER_ID_BUF_LENGTH] /
100 * [Remainder of record....]
102 * If this header is absent then this is a
103 * fresh record of length zero (no watchers).
105 * Note that a record can be deleted with
106 * watchers present. If so the deleted bit
107 * is set and the watcher server_id's are
108 * woken to allow them to remove themselves
109 * from the watcher array. The record is left
110 * present marked with the deleted bit until all
111 * watchers are removed, then the record itself is deleted.
/* Top bit (bit 31) of the 32-bit watcher count word marks a deleted record. */
115 #define NUM_WATCHERS_DELETED_BIT (1UL<<31)
/* The low 31 bits of that word hold the actual number of watchers. */
116 #define NUM_WATCHERS_MASK (NUM_WATCHERS_DELETED_BIT-1)
/*
 * Parse the watcher header of a stored record value.
 *
 * data    - raw value as stored in the backend
 * ids     - array receiving the watcher server ids; only written when
 *           it can hold all of them (num_watchers <= num_ids)
 * num_ids - capacity of ids
 * pdeleted- optional output (NULL accepted)
 *
 * Callers in this file compare the result against -1 to detect a fresh
 * or invalid record, and otherwise use it as the watcher count.
 *
 * NOTE(review): the last parameter line, the return statements, the
 * jump out of the "not enough space" branch and the guards around the
 * final output assignments are not visible in this listing.
 */
118 static ssize_t dbwrap_watched_parse(TDB_DATA data, struct server_id *ids,
119 size_t num_ids, bool *pdeleted,
122 size_t i, num_watchers;
/* Too short to even contain the 32-bit count word. */
125 if (data.dsize < sizeof(uint32_t)) {
126 /* Fresh or invalid record */
130 num_watchers = IVAL(data.dptr, 0);
/* Split the count word into the deleted flag and the real count. */
132 deleted = num_watchers & NUM_WATCHERS_DELETED_BIT;
133 num_watchers &= NUM_WATCHERS_MASK;
/* Step over the count word. */
135 data.dptr += sizeof(uint32_t);
136 data.dsize -= sizeof(uint32_t);
/* The remaining bytes must be able to hold that many serialized ids. */
138 if (num_watchers > data.dsize/SERVER_ID_BUF_LENGTH) {
143 if (num_watchers > num_ids) {
145 * Not enough space to store the watchers server_id's.
146 * Just move past all of them to allow the remaining part
147 * of the record to be returned.
149 data.dptr += num_watchers * SERVER_ID_BUF_LENGTH;
150 data.dsize -= num_watchers * SERVER_ID_BUF_LENGTH;
155 * Note, even if marked deleted we still must
156 * return the id's array to allow awoken
157 * watchers to remove themselves.
/* Decode one server id per slot, advancing past each. */
160 for (i=0; i<num_watchers; i++) {
161 server_id_get(&ids[i], data.dptr);
162 data.dptr += SERVER_ID_BUF_LENGTH;
163 data.dsize -= SERVER_ID_BUF_LENGTH;
/* NOTE(review): the condition guarding this reset is not visible -- presumably the deleted flag. */
168 data = (TDB_DATA) {0};
173 if (pdeleted != NULL) {
/*
 * Serialize a watcher array plus user data into the stored format:
 * a 32-bit count word (optionally carrying the deleted bit), one
 * SERVER_ID_BUF_LENGTH slot per watcher, then the data bytes.
 *
 * dbwrap_watched_save() calls this twice: first with buf == NULL and
 * buflen == 0 to learn the length, then with a buffer of that length.
 *
 * NOTE(review): the parameter line declaring "data", the return
 * statements, the addition of data.dsize to len, the buflen check and
 * the guard around setting the deleted bit are not visible in this
 * listing.
 */
180 static ssize_t dbwrap_watched_unparse(const struct server_id *watchers,
181 size_t num_watchers, bool deleted,
183 uint8_t *buf, size_t buflen)
186 uint32_t num_watchers_buf;
/* Reject counts whose serialized size could not be expressed in 32 bits. */
188 if (num_watchers > UINT32_MAX/SERVER_ID_BUF_LENGTH) {
192 len = num_watchers * SERVER_ID_BUF_LENGTH;
/* Each "len < operand" comparison below detects size_t wrap-around. */
194 len += sizeof(uint32_t);
195 if (len < sizeof(uint32_t)) {
200 if (len < data.dsize) {
208 num_watchers_buf = num_watchers;
210 num_watchers_buf |= NUM_WATCHERS_DELETED_BIT;
214 SIVAL(buf, ofs, num_watchers_buf);
/* Write the watcher ids back to back after the count word. */
217 for (i=0; i<num_watchers; i++) {
218 server_id_put(buf+ofs, watchers[i]);
219 ofs += SERVER_ID_BUF_LENGTH;
/* Append the user data, skipping the copy for empty or NULL data. */
222 if ((data.dptr != NULL) && (data.dsize != 0)) {
223 memcpy(buf + ofs, data.dptr, data.dsize);
/*
 * Private state of a watched database; db_open_watched() stores it in
 * db->private_data.
 */
229 struct db_watched_ctx {
/* The wrapped database that all operations are forwarded to. */
230 struct db_context *backend;
/* Used to notify watchers; dbwrap_watched_watch_send() rejects a NULL value. */
231 struct messaging_context *msg;
/*
 * Per-record private state, hung off rec->private_data by
 * dbwrap_watched_fetch_locked().
 * NOTE(review): other code in this file also uses a "deleted" member;
 * its declaration is not visible in this listing.
 */
234 struct db_watched_subrec {
/* The locked record of the backend database. */
235 struct db_record *subrec;
/* talloc array of watcher ids; its length is taken with talloc_array_length(). */
236 struct server_id *watchers;
/*
 * Forward declarations: dbwrap_watched_fetch_locked() installs these as
 * rec->store and rec->delete_rec before their definitions appear.
 */
240 static NTSTATUS dbwrap_watched_store(struct db_record *rec, TDB_DATA data,
242 static NTSTATUS dbwrap_watched_delete(struct db_record *rec);
/*
 * fetch_locked implementation of the watched db. Allocates a wrapper
 * record on mem_ctx, fetches and locks the backend record, and splits
 * the stored value into the watcher array (kept in the private
 * db_watched_subrec) and the user-visible rec->value.
 *
 * NOTE(review): the error-path cleanup and the return statements are
 * not visible in this listing.
 */
244 static struct db_record *dbwrap_watched_fetch_locked(
245 struct db_context *db, TALLOC_CTX *mem_ctx, TDB_DATA key)
247 struct db_watched_ctx *ctx = talloc_get_type_abort(
248 db->private_data, struct db_watched_ctx);
249 struct db_record *rec;
250 struct db_watched_subrec *subrec;
251 TDB_DATA subrec_value;
252 ssize_t num_watchers;
254 rec = talloc_zero(mem_ctx, struct db_record);
/* subrec is a talloc child of rec. */
258 subrec = talloc_zero(rec, struct db_watched_subrec);
259 if (subrec == NULL) {
263 rec->private_data = subrec;
/* The backend record is in turn a talloc child of subrec. */
265 subrec->subrec = dbwrap_fetch_locked(ctx->backend, subrec, key);
266 if (subrec->subrec == NULL) {
272 rec->key = dbwrap_record_get_key(subrec->subrec);
273 rec->store = dbwrap_watched_store;
274 rec->delete_rec = dbwrap_watched_delete;
276 subrec_value = dbwrap_record_get_value(subrec->subrec);
/* First pass with no id array: only determines the number of watchers. */
278 num_watchers = dbwrap_watched_parse(subrec_value, NULL, 0, NULL, NULL);
279 if (num_watchers == -1) {
280 /* Fresh or invalid record */
281 rec->value = (TDB_DATA) {};
285 subrec->watchers = talloc_array(subrec, struct server_id,
287 if (subrec->watchers == NULL) {
/* Second pass fills the watcher array, the deleted flag and the user value. */
292 dbwrap_watched_parse(subrec_value, subrec->watchers, num_watchers,
293 &subrec->deleted, &rec->value);
/*
 * Notify every watcher of rec. The message payload is the iovec built
 * below: 4-byte db id length, db id, record key. Watchers for which
 * the send fails with NT_STATUS_OBJECT_NAME_NOT_FOUND are dropped from
 * subrec->watchers.
 *
 * NOTE(review): the declarations of len_buf and iov, the message type
 * argument of messaging_send_iov(), and the loop index / count
 * bookkeeping are not visible in this listing.
 */
298 static void dbwrap_watched_wakeup(struct db_record *rec,
299 struct db_watched_subrec *subrec)
301 struct db_context *db = dbwrap_record_get_db(rec);
302 struct db_watched_ctx *ctx = talloc_get_type_abort(
303 db->private_data, struct db_watched_ctx);
304 size_t i, num_watchers;
/* Calling dbwrap_db_id() without a buffer yields the id length, which sizes the VLA. */
305 size_t db_id_len = dbwrap_db_id(db, NULL, 0);
306 uint8_t db_id[db_id_len];
310 SIVAL(len_buf, 0, db_id_len);
312 iov[0] = (struct iovec) { .iov_base = len_buf, .iov_len = 4 };
/* iov[1] only records the address of db_id; the bytes are filled in just below. */
313 iov[1] = (struct iovec) { .iov_base = db_id, .iov_len = db_id_len };
314 iov[2] = (struct iovec) { .iov_base = rec->key.dptr,
315 .iov_len = rec->key.dsize };
317 dbwrap_db_id(db, db_id, db_id_len);
319 num_watchers = talloc_array_length(subrec->watchers);
323 while (i < num_watchers) {
325 struct server_id_buf tmp;
327 DBG_DEBUG("Alerting %s\n",
328 server_id_str_buf(subrec->watchers[i], &tmp));
330 status = messaging_send_iov(ctx->msg, subrec->watchers[i],
332 iov, ARRAY_SIZE(iov), NULL, 0);
333 if (!NT_STATUS_IS_OK(status)) {
334 DBG_DEBUG("messaging_send_iov to %s failed: %s\n",
335 server_id_str_buf(subrec->watchers[i], &tmp),
/* Drop this watcher: overwrite its slot with the last entry, then resize the array. */
338 if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
339 subrec->watchers[i] = subrec->watchers[num_watchers-1];
342 subrec->watchers = talloc_realloc(
343 subrec, subrec->watchers, struct server_id,
/*
 * Serialize the current watcher array, the deleted flag and the given
 * user data, and store the result in the backend record.
 *
 * Two return values are visible: NT_STATUS_INSUFFICIENT_RESOURCES and
 * NT_STATUS_NO_MEMORY.
 *
 * NOTE(review): the conditions guarding those returns, the trailing
 * arguments of dbwrap_record_store() and the final return are not
 * visible in this listing.
 */
352 static NTSTATUS dbwrap_watched_save(struct db_watched_subrec *subrec,
353 TDB_DATA data, int flag)
360 num_watchers = talloc_array_length(subrec->watchers);
/* Sizing call: no output buffer, only the required length is computed. */
362 len = dbwrap_watched_unparse(subrec->watchers, num_watchers,
363 subrec->deleted, data, NULL, 0);
365 return NT_STATUS_INSUFFICIENT_RESOURCES;
368 buf = talloc_array(subrec, uint8_t, len);
370 return NT_STATUS_NO_MEMORY;
/* Second call writes the serialized form into buf. */
373 dbwrap_watched_unparse(subrec->watchers, num_watchers,
374 subrec->deleted, data, buf, len);
376 status = dbwrap_record_store(
377 subrec->subrec, (TDB_DATA) { .dptr = buf, .dsize = len },
/*
 * rec->store implementation: wake all watchers, clear the deleted
 * flag, then save the new data together with the watcher array.
 * Returns the status of dbwrap_watched_save().
 */
385 static NTSTATUS dbwrap_watched_store(struct db_record *rec, TDB_DATA data,
388 struct db_watched_subrec *subrec = talloc_get_type_abort(
389 rec->private_data, struct db_watched_subrec);
391 dbwrap_watched_wakeup(rec, subrec);
/* Storing a value makes the record live again. */
393 subrec->deleted = false;
395 return dbwrap_watched_save(subrec, data, flag);
/*
 * rec->delete_rec implementation: wake all watchers first. If no
 * watchers are left afterwards the backend record is deleted outright;
 * otherwise the record is kept, marked deleted, and saved with empty
 * user data so the watcher array survives.
 */
399 static NTSTATUS dbwrap_watched_delete(struct db_record *rec)
401 struct db_watched_subrec *subrec = talloc_get_type_abort(
402 rec->private_data, struct db_watched_subrec);
405 dbwrap_watched_wakeup(rec, subrec);
/* Re-read the count: the wakeup above may have removed watchers. */
407 num_watchers = talloc_array_length(subrec->watchers);
408 if (num_watchers == 0) {
409 return dbwrap_record_delete(subrec->subrec);
412 subrec->deleted = true;
414 return dbwrap_watched_save(subrec, (TDB_DATA) {0}, 0);
/*
 * Carries the caller's traverse callback through the backend traverse.
 * NOTE(review): a "private_data" member is used elsewhere in this file
 * but its declaration is not visible in this listing.
 */
417 struct dbwrap_watched_traverse_state {
/* The caller's per-record callback. */
418 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Backend traverse callback. Parses the watcher header of the raw
 * record and calls the user's callback with a copy of the record
 * (prec).
 *
 * NOTE(review): the remaining arguments of dbwrap_watched_parse() and
 * the statement taken for invalid or deleted records are not visible
 * in this listing; presumably the parse fills prec.value and such
 * records are skipped -- confirm.
 */
422 static int dbwrap_watched_traverse_fn(struct db_record *rec,
425 struct dbwrap_watched_traverse_state *state = private_data;
426 ssize_t num_watchers;
/* Work on a struct copy so the backend's record is not modified. */
427 struct db_record prec = *rec;
430 num_watchers = dbwrap_watched_parse(rec->value, NULL, 0, &deleted,
433 if ((num_watchers == -1) || deleted) {
437 return state->fn(&prec, state->private_data);
/*
 * traverse implementation: runs dbwrap_traverse() on the backend with
 * dbwrap_watched_traverse_fn as the per-record callback, passing the
 * caller's fn and private_data along in a local state struct.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
440 static int dbwrap_watched_traverse(struct db_context *db,
441 int (*fn)(struct db_record *rec,
445 struct db_watched_ctx *ctx = talloc_get_type_abort(
446 db->private_data, struct db_watched_ctx);
447 struct dbwrap_watched_traverse_state state = {
448 .fn = fn, .private_data = private_data };
452 status = dbwrap_traverse(
453 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
454 if (!NT_STATUS_IS_OK(status)) {
/*
 * traverse_read implementation: same as dbwrap_watched_traverse() but
 * uses dbwrap_traverse_read() on the backend.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
460 static int dbwrap_watched_traverse_read(struct db_context *db,
461 int (*fn)(struct db_record *rec,
465 struct db_watched_ctx *ctx = talloc_get_type_abort(
466 db->private_data, struct db_watched_ctx);
467 struct dbwrap_watched_traverse_state state = {
468 .fn = fn, .private_data = private_data };
472 status = dbwrap_traverse_read(
473 ctx->backend, dbwrap_watched_traverse_fn, &state, &ret);
474 if (!NT_STATUS_IS_OK(status)) {
/* get_seqnum implementation: returns the backend's sequence number. */
480 static int dbwrap_watched_get_seqnum(struct db_context *db)
482 struct db_watched_ctx *ctx = talloc_get_type_abort(
483 db->private_data, struct db_watched_ctx);
484 return dbwrap_get_seqnum(ctx->backend);
/* transaction_start implementation: forwards to the backend database. */
487 static int dbwrap_watched_transaction_start(struct db_context *db)
489 struct db_watched_ctx *ctx = talloc_get_type_abort(
490 db->private_data, struct db_watched_ctx);
491 return dbwrap_transaction_start(ctx->backend);
/* transaction_commit implementation: forwards to the backend database. */
494 static int dbwrap_watched_transaction_commit(struct db_context *db)
496 struct db_watched_ctx *ctx = talloc_get_type_abort(
497 db->private_data, struct db_watched_ctx);
498 return dbwrap_transaction_commit(ctx->backend);
/* transaction_cancel implementation: forwards to the backend database. */
501 static int dbwrap_watched_transaction_cancel(struct db_context *db)
503 struct db_watched_ctx *ctx = talloc_get_type_abort(
504 db->private_data, struct db_watched_ctx);
505 return dbwrap_transaction_cancel(ctx->backend);
/*
 * Carries the caller's parser through dbwrap_parse_record() on the
 * backend.
 * NOTE(review): "private_data" and "deleted" members are used elsewhere
 * in this file but their declarations are not visible in this listing.
 */
508 struct dbwrap_watched_parse_record_state {
/* The caller's parser; it receives the value with the watcher header stripped. */
509 void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
/*
 * Parser handed to the backend's parse_record. Parses the watcher
 * header, records the deleted flag in the state, and calls the
 * caller's parser with "userdata".
 *
 * NOTE(review): the declaration of userdata, the last argument of
 * dbwrap_watched_parse() and the statement taken for invalid or
 * deleted records are not visible in this listing; presumably userdata
 * is the value after the header and such records are not passed on --
 * confirm.
 */
514 static void dbwrap_watched_parse_record_parser(TDB_DATA key, TDB_DATA data,
517 struct dbwrap_watched_parse_record_state *state = private_data;
518 ssize_t num_watchers;
521 num_watchers = dbwrap_watched_parse(data, NULL, 0, &state->deleted,
523 if ((num_watchers == -1) || state->deleted) {
526 state->parser(key, userdata, state->private_data);
/*
 * parse_record implementation: runs dbwrap_parse_record() on the
 * backend with dbwrap_watched_parse_record_parser wrapped around the
 * caller's parser.
 *
 * NOTE(review): part of the state initializer, the error return and
 * the condition guarding NT_STATUS_NOT_FOUND are not visible in this
 * listing; presumably NOT_FOUND is returned when the parser saw a
 * record marked deleted -- confirm.
 */
529 static NTSTATUS dbwrap_watched_parse_record(
530 struct db_context *db, TDB_DATA key,
531 void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data),
534 struct db_watched_ctx *ctx = talloc_get_type_abort(
535 db->private_data, struct db_watched_ctx);
536 struct dbwrap_watched_parse_record_state state = {
538 .private_data = private_data,
543 status = dbwrap_parse_record(
544 ctx->backend, key, dbwrap_watched_parse_record_parser, &state);
545 if (!NT_STATUS_IS_OK(status)) {
549 return NT_STATUS_NOT_FOUND;
/*
 * exists implementation: forwards to dbwrap_exists() on the backend.
 * The stored value is not inspected here, so the deleted bit is not
 * taken into account by this function.
 */
554 static int dbwrap_watched_exists(struct db_context *db, TDB_DATA key)
556 struct db_watched_ctx *ctx = talloc_get_type_abort(
557 db->private_data, struct db_watched_ctx);
559 return dbwrap_exists(ctx->backend, key);
/* id implementation: the watched db reports the backend's db id. */
562 static size_t dbwrap_watched_id(struct db_context *db, uint8_t *id,
565 struct db_watched_ctx *ctx = talloc_get_type_abort(
566 db->private_data, struct db_watched_ctx);
568 return dbwrap_db_id(ctx->backend, id, idlen);
/*
 * Wrap an existing database in a "watched" db_context.
 *
 * mem_ctx - talloc parent of the new db_context
 * backend - database to wrap; it is talloc_move()d under the new
 *           context, so the caller gives up ownership
 * msg     - messaging context used to notify watchers
 *
 * NOTE(review): the allocation failure checks, the assignment of
 * ctx->msg and the return statements are not visible in this listing.
 */
571 struct db_context *db_open_watched(TALLOC_CTX *mem_ctx,
572 struct db_context *backend,
573 struct messaging_context *msg)
575 struct db_context *db;
576 struct db_watched_ctx *ctx;
578 db = talloc_zero(mem_ctx, struct db_context);
582 ctx = talloc_zero(db, struct db_watched_ctx);
587 db->private_data = ctx;
/* The wrapper takes over the backend's lock order; the backend's is reset to NONE. */
591 db->lock_order = backend->lock_order;
592 backend->lock_order = DBWRAP_LOCK_ORDER_NONE;
593 ctx->backend = talloc_move(ctx, &backend);
/* Install the watched implementations of the db_context operations. */
595 db->fetch_locked = dbwrap_watched_fetch_locked;
596 db->traverse = dbwrap_watched_traverse;
597 db->traverse_read = dbwrap_watched_traverse_read;
598 db->get_seqnum = dbwrap_watched_get_seqnum;
599 db->transaction_start = dbwrap_watched_transaction_start;
600 db->transaction_commit = dbwrap_watched_transaction_commit;
601 db->transaction_cancel = dbwrap_watched_transaction_cancel;
602 db->parse_record = dbwrap_watched_parse_record;
603 db->exists = dbwrap_watched_exists;
604 db->id = dbwrap_watched_id;
605 db->name = dbwrap_name(ctx->backend);
/*
 * State of one dbwrap_watched_watch_send() request.
 * NOTE(review): members "me", "w_key" and "blockerdead" are used
 * elsewhere in this file but their declarations are not visible in
 * this listing.
 */
610 struct dbwrap_watched_watch_state {
/* The watched database; used to re-fetch the record on cleanup. */
611 struct db_context *db;
/* The process passed in by the caller; monitored only when its pid is non-zero. */
614 struct server_id blocker;
/*
 * Forward declarations of the callbacks and the destructor that
 * dbwrap_watched_watch_send() installs.
 */
618 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
620 static void dbwrap_watched_watch_done(struct tevent_req *subreq);
621 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq);
622 static int dbwrap_watched_watch_state_destructor(
623 struct dbwrap_watched_watch_state *state);
/*
 * Start watching a locked record for changes.
 *
 * Visible steps: build the watchers key for rec, start a filtered
 * messaging read that matches on this key, append our own server id to
 * the record's watcher array and save it, install a destructor on the
 * request state, and, if blocker.pid is non-zero, also watch the
 * blocker process.
 *
 * rec must come from this watched db: its private_data is asserted to
 * be a db_watched_subrec. Fails with NT_STATUS_NOT_SUPPORTED when the
 * db has no messaging context.
 *
 * NOTE(review): the check after tevent_req_create(), the assignment of
 * state->db, the overflow checks on "needed" and on the new watcher
 * count, and the final return are not visible in this listing.
 */
625 struct tevent_req *dbwrap_watched_watch_send(TALLOC_CTX *mem_ctx,
626 struct tevent_context *ev,
627 struct db_record *rec,
628 struct server_id blocker)
630 struct db_watched_subrec *subrec = talloc_get_type_abort(
631 rec->private_data, struct db_watched_subrec);
632 struct db_context *db = dbwrap_record_get_db(rec);
633 struct db_watched_ctx *ctx = talloc_get_type_abort(
634 db->private_data, struct db_watched_ctx);
636 struct tevent_req *req, *subreq;
637 struct dbwrap_watched_watch_state *state;
640 struct server_id *tmp;
643 req = tevent_req_create(mem_ctx, &state,
644 struct dbwrap_watched_watch_state);
649 state->blocker = blocker;
/* Without a messaging context no wakeup could ever be received. */
651 if (ctx->msg == NULL) {
652 tevent_req_nterror(req, NT_STATUS_NOT_SUPPORTED);
653 return tevent_req_post(req, ev);
656 state->me = messaging_server_id(ctx->msg);
/* Sizing call: no buffer is passed, only the key length is computed. */
658 needed = dbwrap_record_watchers_key(db, rec, NULL, 0);
660 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
661 return tevent_req_post(req, ev);
663 state->w_key.dsize = needed;
665 state->w_key.dptr = talloc_array(state, uint8_t, state->w_key.dsize);
666 if (tevent_req_nomem(state->w_key.dptr, req)) {
667 return tevent_req_post(req, ev);
669 dbwrap_record_watchers_key(db, rec, state->w_key.dptr,
/* The read is set up before we are stored as a watcher below. */
672 subreq = messaging_filtered_read_send(
673 state, ev, ctx->msg, dbwrap_watched_msg_filter, state);
674 if (tevent_req_nomem(subreq, req)) {
675 return tevent_req_post(req, ev);
677 tevent_req_set_callback(subreq, dbwrap_watched_watch_done, req);
679 num_watchers = talloc_array_length(subrec->watchers);
/* Grow via a temporary so the old array stays valid if realloc fails. */
681 tmp = talloc_realloc(subrec, subrec->watchers, struct server_id,
683 if (tevent_req_nomem(tmp, req)) {
684 return tevent_req_post(req, ev);
686 subrec->watchers = tmp;
687 subrec->watchers[num_watchers] = state->me;
/* Persist the extended watcher list; the user data is unchanged. */
689 status = dbwrap_watched_save(subrec, rec->value, 0);
690 if (tevent_req_nterror(req, status)) {
691 return tevent_req_post(req, ev);
/* The destructor is only installed once we have been saved as a watcher. */
694 talloc_set_destructor(state, dbwrap_watched_watch_state_destructor);
696 if (blocker.pid != 0) {
697 subreq = server_id_watch_send(state, ev, ctx->msg, blocker);
698 if (tevent_req_nomem(subreq, req)) {
699 return tevent_req_post(req, ev);
701 tevent_req_set_callback(
702 subreq, dbwrap_watched_watch_blocker_died, req);
/*
 * Completion callback of the server_id_watch request on the blocker.
 * A failure is mapped from the unix error to an NTSTATUS and set on
 * the request; otherwise the request finishes with blockerdead set.
 *
 * NOTE(review): the declaration of ret, the condition guarding the
 * error path and any freeing of subreq are not visible in this
 * listing.
 */
708 static void dbwrap_watched_watch_blocker_died(struct tevent_req *subreq)
710 struct tevent_req *req = tevent_req_callback_data(
711 subreq, struct tevent_req);
712 struct dbwrap_watched_watch_state *state = tevent_req_data(
713 req, struct dbwrap_watched_watch_state);
716 ret = server_id_watch_recv(subreq, NULL);
719 tevent_req_nterror(req, map_nt_error_from_unix(ret));
722 state->blockerdead = true;
723 tevent_req_done(req);
/*
 * Remove one server id from subrec->watchers. The array is unordered:
 * the found slot is overwritten with the last element and the array
 * is resized to one element fewer.
 *
 * NOTE(review): the second parameter line (declaring "id"), the loop
 * exit on a match and the return statements are not visible in this
 * listing. Callers use the bool result as "ok"; presumably false is
 * returned when the id is not found -- confirm.
 */
726 static bool dbwrap_watched_remove_waiter(struct db_watched_subrec *subrec,
729 size_t i, num_watchers;
731 num_watchers = talloc_array_length(subrec->watchers);
/* Linear search for the id. */
733 for (i=0; i<num_watchers; i++) {
734 if (server_id_equal(&id, &subrec->watchers[i])) {
/* i reached the end: the id is not in the array. */
739 if (i == num_watchers) {
740 struct server_id_buf buf;
741 DBG_WARNING("Did not find %s in state->watchers\n",
742 server_id_str_buf(id, &buf));
746 subrec->watchers[i] = subrec->watchers[num_watchers-1];
747 subrec->watchers = talloc_realloc(subrec, subrec->watchers,
748 struct server_id, num_watchers-1);
/*
 * talloc destructor of the watch request state. Re-fetches and locks
 * the watched record (the key is recovered from state->w_key), removes
 * our own server id from its watcher array and saves the record.
 * Failures are logged with DBG_WARNING.
 *
 * NOTE(review): the failure conditions, the handling of the
 * remove_waiter result, the freeing of rec and the return values are
 * not visible in this listing.
 */
753 static int dbwrap_watched_watch_state_destructor(
754 struct dbwrap_watched_watch_state *state)
756 struct db_record *rec;
757 struct db_watched_subrec *subrec;
/* Only the record key part of the watchers key is needed here. */
761 ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
763 DBG_WARNING("dbwrap_record_watchers_key_parse failed\n");
767 rec = dbwrap_fetch_locked(state->db, state, key);
769 DBG_WARNING("dbwrap_fetch_locked failed\n");
773 subrec = talloc_get_type_abort(
774 rec->private_data, struct db_watched_subrec);
776 ok = dbwrap_watched_remove_waiter(subrec, state->me);
/* Write the shortened watcher list back; the user data is unchanged. */
779 status = dbwrap_watched_save(subrec, rec->value, 0);
780 if (!NT_STATUS_IS_OK(status)) {
781 DBG_WARNING("dbwrap_watched_save failed: %s\n",
/*
 * Filter for messaging_filtered_read_send(). The visible checks are:
 * message type MSG_DBWRAP_MODIFIED, no file descriptors attached,
 * payload length equal to our watchers key, and a memcmp of the
 * payload against that key.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the filter returns true only when all checks pass and the
 * bytes are equal -- confirm.
 */
790 static bool dbwrap_watched_msg_filter(struct messaging_rec *rec,
793 struct dbwrap_watched_watch_state *state = talloc_get_type_abort(
794 private_data, struct dbwrap_watched_watch_state);
797 if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
800 if (rec->num_fds != 0) {
/* Lengths are compared first, so the memcmp below stays within both buffers. */
803 if (rec->buf.length != state->w_key.dsize) {
807 cmp = memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length);
/*
 * Completion callback of the filtered messaging read: a matching
 * message arrived. A receive failure is mapped from the unix error to
 * an NTSTATUS and set on the request; otherwise the request is marked
 * done.
 *
 * NOTE(review): the declaration of ret, the condition guarding the
 * error path and any freeing of subreq are not visible in this
 * listing.
 */
812 static void dbwrap_watched_watch_done(struct tevent_req *subreq)
814 struct tevent_req *req = tevent_req_callback_data(
815 subreq, struct tevent_req);
816 struct messaging_rec *rec;
/* The received message is allocated on talloc_tos(). */
819 ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
822 tevent_req_nterror(req, map_nt_error_from_unix(ret));
825 tevent_req_done(req);
/*
 * Collect the result of dbwrap_watched_watch_send().
 *
 * Visible steps: propagate a request error; report blockerdead and
 * blocker if the caller asked for them (both pointers may be NULL);
 * re-fetch and lock the record on mem_ctx; disable the state
 * destructor; remove our own id from the watcher array and save the
 * record.
 *
 * Returns NT_STATUS_INTERNAL_DB_ERROR if the watchers key cannot be
 * parsed or the record cannot be fetched.
 *
 * NOTE(review): the mem_ctx and blockerdead parameter lines, the
 * handling of prec, several conditions and the end of this function
 * are not visible in this listing; the comments here cover the visible
 * lines only.
 */
828 NTSTATUS dbwrap_watched_watch_recv(struct tevent_req *req,
830 struct db_record **prec,
832 struct server_id *blocker)
834 struct dbwrap_watched_watch_state *state = tevent_req_data(
835 req, struct dbwrap_watched_watch_state);
836 struct db_watched_subrec *subrec;
839 struct db_record *rec;
842 if (tevent_req_is_nterror(req, &status)) {
845 if (blockerdead != NULL) {
846 *blockerdead = state->blockerdead;
848 if (blocker != NULL) {
849 *blocker = state->blocker;
/* Only the record key part of the watchers key is needed here. */
855 ok = dbwrap_record_watchers_key_parse(state->w_key, NULL, NULL, &key);
857 return NT_STATUS_INTERNAL_DB_ERROR;
860 rec = dbwrap_fetch_locked(state->db, mem_ctx, key);
862 return NT_STATUS_INTERNAL_DB_ERROR;
/* The destructor does this same watcher removal; it is done right here instead. */
865 talloc_set_destructor(state, NULL);
867 subrec = talloc_get_type_abort(
868 rec->private_data, struct db_watched_subrec);
870 ok = dbwrap_watched_remove_waiter(subrec, state->me);
872 status = dbwrap_watched_save(subrec, rec->value, 0);
873 if (!NT_STATUS_IS_OK(status)) {
874 DBG_WARNING("dbwrap_watched_save failed: %s\n",