s3:g_lock: try to keep the watch instance during g_lock_watch_data()
author Stefan Metzmacher <metze@samba.org>
Sun, 26 Jun 2022 16:16:38 +0000 (16:16 +0000)
committer Ralph Boehme <slow@samba.org>
Tue, 26 Jul 2022 13:40:34 +0000 (13:40 +0000)
Unless the unique_lock_epoch changes via g_lock_lock()/g_lock_unlock(),
we try to keep our existing watch instance alive while waiting
for unique_data_epoch to change.

This will become important in the following commits, when the
dbwrap_watch layer will only wake up one watcher at a time
and each woken watcher will wake up the next one. Without this
commit we would trigger an endless loop, as none of the watchers
would ever change unique_data_epoch.
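
As a rough standalone sketch of that bookkeeping (all names here are
hypothetical stand-ins, not Samba's dbwrap API; the real logic lives in
g_lock_watch_data_done_fn() below):

    #include <stdbool.h>
    #include <stdint.h>

    struct watch_model {
    	uint64_t unique_lock_epoch;  /* bumped by g_lock_lock()/g_lock_unlock() */
    	uint64_t unique_data_epoch;  /* bumped only when the stored data changes */
    	uint64_t watch_instance;     /* our slot in the record's waiter list */
    };

    static uint64_t next_instance = 2;

    static uint64_t model_add_instance(void) { return next_instance++; }
    static void model_remove_instance(uint64_t i) { (void)i; }

    /*
     * Called after every wakeup; returns true when the watch is finished.
     * A mere lock/unlock cycle does not finish the watch, it only rotates
     * the watcher to the end of the waiter list.
     */
    static bool model_check(struct watch_model *w,
    			uint64_t cur_lock_epoch, uint64_t cur_data_epoch)
    {
    	if (cur_data_epoch != w->unique_data_epoch) {
    		model_remove_instance(w->watch_instance);
    		return true;                    /* the data really changed */
    	}
    	if (cur_lock_epoch != w->unique_lock_epoch) {
    		model_remove_instance(w->watch_instance);
    		w->watch_instance = model_add_instance();
    		w->unique_lock_epoch = cur_lock_epoch;
    	}
    	return false;                           /* keep (or rotate) our slot */
    }

    int main(void)
    {
    	struct watch_model w = {
    		.unique_lock_epoch = 1, .unique_data_epoch = 1,
    		.watch_instance = 1,
    	};
    	bool done = model_check(&w, 2, 1);      /* lock cycled: keep waiting */
    	done = model_check(&w, 2, 2);           /* data changed: finished */
    	return done ? 0 : 1;
    }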

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15125

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source3/lib/g_lock.c

index 3e063b4985194f9310a568bf05fbe18c86381621..52a1cf9fa4677624e8fd79c8b42b46d5343a6a07 100644 (file)
@@ -1366,7 +1366,9 @@ struct g_lock_watch_data_state {
        TDB_DATA key;
        struct server_id blocker;
        bool blockerdead;
+       uint64_t unique_lock_epoch;
        uint64_t unique_data_epoch;
+       uint64_t watch_instance;
        NTSTATUS status;
 };
 
@@ -1390,6 +1392,7 @@ static void g_lock_watch_data_send_fn(
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }
+       state->unique_lock_epoch = lck.unique_lock_epoch;
        state->unique_data_epoch = lck.unique_data_epoch;
 
        DBG_DEBUG("state->unique_data_epoch=%"PRIu64"\n", state->unique_data_epoch);
@@ -1460,11 +1463,13 @@ static void g_lock_watch_data_done_fn(
 
        ok = g_lock_parse(value.dptr, value.dsize, &lck);
        if (!ok) {
+               dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }
 
        if (lck.unique_data_epoch != state->unique_data_epoch) {
+               dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                DBG_DEBUG("lck.unique_data_epoch=%"PRIu64", "
                          "state->unique_data_epoch=%"PRIu64"\n",
                          lck.unique_data_epoch,
@@ -1473,9 +1478,28 @@ static void g_lock_watch_data_done_fn(
                return;
        }
 
+       /*
+        * The lock epoch changed, so we'd better
+        * remove ourselves from the waiter list
+        * (most likely from the first position)
+        * and re-add ourselves at the end of the list.
+        *
+        * This gives other lock waiters a chance
+        * to make progress.
+        *
+        * Otherwise we keep our waiter instance alive and
+        * keep waiting (most likely at the first position).
+        */
+       if (lck.unique_lock_epoch != state->unique_lock_epoch) {
+               dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
+               state->watch_instance = dbwrap_watched_watch_add_instance(rec);
+               state->unique_lock_epoch = lck.unique_lock_epoch;
+       }
+
        subreq = dbwrap_watched_watch_send(
-               state, state->ev, rec, 0, state->blocker);
+               state, state->ev, rec, state->watch_instance, state->blocker);
        if (subreq == NULL) {
+               dbwrap_watched_watch_remove_instance(rec, state->watch_instance);
                state->status = NT_STATUS_NO_MEMORY;
                return;
        }
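
One detail worth noting in the hunk above: every early-return path inside
the locked callback now drops the watch instance first, so neither an error
nor a completed watch leaves a stale slot behind in the waiter list. A
self-contained sketch of that pattern (hypothetical names, error codes
simplified):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    /* stand-in for dbwrap_watched_watch_remove_instance(rec, instance) */
    static void sketch_remove_instance(uint64_t *instance)
    {
    	*instance = 0;
    }

    /* stand-in for g_lock_parse(); rejects an empty record */
    static bool sketch_parse(const uint8_t *buf, size_t len)
    {
    	return (buf != NULL) && (len > 0);
    }

    /* Returns <0 on error, 0 when the watch completed, >0 to keep waiting. */
    static int sketch_done_fn(uint64_t *instance,
    			  const uint8_t *buf, size_t len,
    			  bool data_changed)
    {
    	if (!sketch_parse(buf, len)) {
    		sketch_remove_instance(instance);  /* error: drop our slot */
    		return -1;      /* ~ NT_STATUS_INTERNAL_DB_CORRUPTION */
    	}
    	if (data_changed) {
    		sketch_remove_instance(instance);  /* done: drop it as well */
    		return 0;
    	}
    	/* otherwise keep (or rotate) the instance and re-arm the watch */
    	return 1;
    }

    int main(void)
    {
    	uint64_t instance = 1;
    	uint8_t rec[1] = { 0 };
    	return sketch_done_fn(&instance, rec, sizeof(rec), true);
    }
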
@@ -1491,9 +1515,10 @@ static void g_lock_watch_data_done(struct tevent_req *subreq)
        struct g_lock_watch_data_state *state = tevent_req_data(
                req, struct g_lock_watch_data_state);
        NTSTATUS status;
+       uint64_t instance = 0;
 
        status = dbwrap_watched_watch_recv(
-               subreq, NULL, &state->blockerdead, &state->blocker);
+               subreq, &instance, &state->blockerdead, &state->blocker);
        TALLOC_FREE(subreq);
        if (tevent_req_nterror(req, status)) {
                DBG_DEBUG("dbwrap_watched_watch_recv returned %s\n",
@@ -1501,6 +1526,8 @@ static void g_lock_watch_data_done(struct tevent_req *subreq)
                return;
        }
 
+       state->watch_instance = instance;
+
        status = dbwrap_do_locked(
                state->ctx->db, state->key, g_lock_watch_data_done_fn, req);
        if (tevent_req_nterror(req, status)) {
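
Taken together, the async flow now threads the instance through every
cycle: dbwrap_watched_watch_recv() hands back the instance that is still
registered, the handler stashes it in the state, and dbwrap_do_locked()
re-runs the epoch checks, which either complete the request or re-arm the
watch with the kept instance. A synchronous toy model of that loop
(hypothetical names; the real code is event-driven):

    #include <stdint.h>

    struct loop_state {
    	uint64_t watch_instance;
    	uint64_t lock_epoch;     /* last seen unique_lock_epoch */
    	uint64_t data_epoch;     /* last seen unique_data_epoch */
    };

    /* Toy "database" epochs, advanced by other (simulated) processes. */
    static uint64_t db_lock_epoch;
    static uint64_t db_data_epoch;

    /* stand-in for dbwrap_watched_watch_recv(): wakes us up once and
     * simulates first a pure lock/unlock cycle, then a real data change */
    static uint64_t toy_watch_recv(uint64_t instance)
    {
    	if (db_lock_epoch == 0) {
    		db_lock_epoch = 1;       /* somebody locked and unlocked */
    	} else {
    		db_data_epoch = 1;       /* somebody changed the data */
    	}
    	return instance;
    }

    static void toy_watch_data_loop(struct loop_state *s)
    {
    	for (;;) {
    		/* the woken watcher learns which instance is still armed */
    		s->watch_instance = toy_watch_recv(s->watch_instance);

    		/* this part runs under the record lock in the real code */
    		if (db_data_epoch != s->data_epoch) {
    			return;          /* data epoch moved: we are done */
    		}
    		if (db_lock_epoch != s->lock_epoch) {
    			s->lock_epoch = db_lock_epoch;
    			s->watch_instance += 1;  /* rotate to the tail */
    		}
    		/* re-arm: stand-in for dbwrap_watched_watch_send() */
    	}
    }

    int main(void)
    {
    	struct loop_state s = { .watch_instance = 1 };
    	toy_watch_data_loop(&s);
    	return 0;
    }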