From 500a729a5566a1c8afc36e6f3e73910b13c8b9ad Mon Sep 17 00:00:00 2001 From: Volker Lendecke Date: Wed, 24 Oct 2018 14:31:34 +0200 Subject: [PATCH] tdb: Make record deletion circular-chain safe Before this patch we had 3 loops walking a hash chain to delete records: tdb_do_delete() to find the predecessor of the record that was to be deleted. tdb_count_dead(), the name says it all and tdb_purge_dead() to give back all dead records from a chain to the freelist. This patch introduces tdb_trim_dead that walks a hash chain just once. While it does so it counts the number of dead records, and all records beyond tdb->max_dead_records are moved to the freelist. Normal record deletion now works by always marking a record as dead in step 1 and then calling tdb_trim_dead. This is made safe against circular chains by doing the slow chain walk only in the case when we did not delete a dead record during our walk. It changes our dynamics a bit: When deleting a record with non-zero max_dead_records, now we always leave that number of records around when deleting, doing a blocking lock on the freelist when we found too many dead records. Previously when exceeding max_dead_records we wiped all dead records to start accumulating them from scratch, assuming we could lock the freelist in a nonblocking fashion. The net effect for an uncontended freelist is the same: In tdb_allocate() we still completely hand over all dead records to the freelist when we could lock it, it just happens later than without this patch. This means for a lightly loaded system we will potentially leave more dead records around in databases like locking.tdb. However, on a heavily loaded system we become more predictable: If the freelist is so heavily contended that across many deletes we can't get hold of it, previously we accumulated more dead records than max_dead_records would allow. This is a really lowlevel tradeoff that is likely hard to measure, but to me becoming more deterministic without sacrificing too much parallelism (we keep more dead records around) is worth trying. Signed-off-by: Volker Lendecke Reviewed-by: Jeremy Allison Autobuild-User(master): Jeremy Allison Autobuild-Date(master): Tue Oct 30 02:48:38 CET 2018 on sn-devel-144 --- lib/tdb/common/freelist.c | 11 ++ lib/tdb/common/tdb.c | 217 +++++++++++++++++++---------------- lib/tdb/common/tdb_private.h | 3 +- 3 files changed, 129 insertions(+), 102 deletions(-) diff --git a/lib/tdb/common/freelist.c b/lib/tdb/common/freelist.c index 5221bf3679e..90643862208 100644 --- a/lib/tdb/common/freelist.c +++ b/lib/tdb/common/freelist.c @@ -555,6 +555,17 @@ static bool tdb_alloc_dead( return (tdb_ofs_write(tdb, last_ptr, &rec->next) == 0); } +static void tdb_purge_dead(struct tdb_context *tdb, uint32_t hash) +{ + uint32_t max_dead_records = tdb->max_dead_records; + + tdb->max_dead_records = 0; + + tdb_trim_dead(tdb, hash); + + tdb->max_dead_records = max_dead_records; +} + /* * Chain "hash" is assumed to be locked */ diff --git a/lib/tdb/common/tdb.c b/lib/tdb/common/tdb.c index 2a6d8977002..9c80a36e00a 100644 --- a/lib/tdb/common/tdb.c +++ b/lib/tdb/common/tdb.c @@ -357,103 +357,139 @@ _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key) return ret; } -/* actually delete an entry in the database given the offset */ -int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec) +/* + * Move a dead record to the freelist. The hash chain and freelist + * must be locked. + */ +static int tdb_del_dead(struct tdb_context *tdb, + uint32_t last_ptr, + uint32_t rec_ptr, + struct tdb_record *rec, + bool *deleted) { - tdb_off_t last_ptr, i; - struct tdb_record lastrec; - - if (tdb->read_only || tdb->traverse_read) return -1; + int ret; - if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) || - tdb_write_lock_record(tdb, rec_ptr) == -1) { - /* Someone traversing here: mark it as dead */ - rec->magic = TDB_DEAD_MAGIC; - return tdb_rec_write(tdb, rec_ptr, rec); + ret = tdb_write_lock_record(tdb, rec_ptr); + if (ret == -1) { + /* Someone traversing here: Just leave it dead */ + return 0; } - if (tdb_write_unlock_record(tdb, rec_ptr) != 0) - return -1; - - /* find previous record in hash chain */ - if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1) - return -1; - for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next) - if (tdb_rec_read(tdb, i, &lastrec) == -1) - return -1; - - /* unlink it: next ptr is at start of record. */ - if (last_ptr == 0) - last_ptr = TDB_HASH_TOP(rec->full_hash); - if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) + ret = tdb_write_unlock_record(tdb, rec_ptr); + if (ret == -1) { return -1; - - /* recover the space */ - if (tdb_free(tdb, rec_ptr, rec) == -1) + } + ret = tdb_ofs_write(tdb, last_ptr, &rec->next); + if (ret == -1) { return -1; - return 0; -} - -static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash) -{ - int res = 0; - tdb_off_t rec_ptr; - struct tdb_record rec; - - /* read in the hash top */ - if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) - return 0; + } - while (rec_ptr) { - if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) - return 0; + *deleted = true; - if (rec.magic == TDB_DEAD_MAGIC) { - res += 1; - } - rec_ptr = rec.next; - } - return res; + ret = tdb_free(tdb, rec_ptr, rec); + return ret; } /* - * Purge all DEAD records from a hash chain + * Walk the hash chain and leave tdb->max_dead_records around. Move + * the rest of dead records to the freelist. */ -int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash) +int tdb_trim_dead(struct tdb_context *tdb, uint32_t hash) { - int res = -1; + struct tdb_chainwalk_ctx chainwalk; struct tdb_record rec; - tdb_off_t rec_ptr; + tdb_off_t last_ptr, rec_ptr; + bool locked_freelist = false; + int num_dead = 0; + int ret; - if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) { - /* - * Don't block the freelist if not strictly necessary - */ + last_ptr = TDB_HASH_TOP(hash); + + /* + * Init chainwalk with the pointer to the hash top. It might + * be that the very first record in the chain is a dead one + * that we have to delete. + */ + tdb_chainwalk_init(&chainwalk, last_ptr); + + ret = tdb_ofs_read(tdb, last_ptr, &rec_ptr); + if (ret == -1) { return -1; } - /* read in the hash top */ - if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) - goto fail; - - while (rec_ptr) { - tdb_off_t next; + while (rec_ptr != 0) { + bool deleted = false; + uint32_t next; - if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) { + ret = tdb_rec_read(tdb, rec_ptr, &rec); + if (ret == -1) { goto fail; } + /* + * Make a copy of rec.next: Further down we might + * delete and put the record on the freelist. Make + * sure that modifications in that code path can't + * break the chainwalk here. + */ next = rec.next; - if (rec.magic == TDB_DEAD_MAGIC - && tdb_do_delete(tdb, rec_ptr, &rec) == -1) { - goto fail; + if (rec.magic == TDB_DEAD_MAGIC) { + num_dead += 1; + + if (num_dead > tdb->max_dead_records) { + + if (!locked_freelist) { + /* + * Lock the freelist only if + * it's really required. + */ + ret = tdb_lock(tdb, -1, F_WRLCK); + if (ret == -1) { + goto fail; + }; + locked_freelist = true; + } + + ret = tdb_del_dead( + tdb, + last_ptr, + rec_ptr, + &rec, + &deleted); + + if (ret == -1) { + goto fail; + } + } + } + + /* + * Don't do the chainwalk check if "rec_ptr" was + * deleted. We reduced the chain, and the chainwalk + * check might catch up early. Imagine a valid chain + * with just dead records: We never can bump the + * "slow" pointer in chainwalk_check, as there isn't + * anything left to jump to and compare. + */ + if (!deleted) { + bool ok; + + last_ptr = rec_ptr; + + ok = tdb_chainwalk_check(tdb, &chainwalk, next); + if (!ok) { + ret = -1; + goto fail; + } } rec_ptr = next; } - res = 0; - fail: - tdb_unlock(tdb, -1, F_WRLCK); - return res; + ret = 0; +fail: + if (locked_freelist) { + tdb_unlock(tdb, -1, F_WRLCK); + } + return ret; } /* delete an entry in the database given a key */ @@ -473,38 +509,19 @@ static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash) return -1; } - if (tdb->max_dead_records != 0) { - - uint32_t magic = TDB_DEAD_MAGIC; - - /* - * Allow for some dead records per hash chain, mainly for - * tdb's with a very high create/delete rate like locking.tdb. - */ - - if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) { - /* - * Don't let the per-chain freelist grow too large, - * delete all existing dead records - */ - tdb_purge_dead(tdb, hash); - } - - /* - * Just mark the record as dead. - */ - ret = tdb_ofs_write( - tdb, rec_ptr + offsetof(struct tdb_record, magic), - &magic); - } - else { - ret = tdb_do_delete(tdb, rec_ptr, &rec); + /* + * Mark the record dead + */ + rec.magic = TDB_DEAD_MAGIC; + ret = tdb_rec_write(tdb, rec_ptr, &rec); + if (ret == -1) { + goto done; } - if (ret == 0) { - tdb_increment_seqnum(tdb); - } + tdb_increment_seqnum(tdb); + ret = tdb_trim_dead(tdb, hash); +done: if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0) TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n")); return ret; diff --git a/lib/tdb/common/tdb_private.h b/lib/tdb/common/tdb_private.h index 307cad92c2a..42aaac62f59 100644 --- a/lib/tdb/common/tdb_private.h +++ b/lib/tdb/common/tdb_private.h @@ -311,7 +311,6 @@ int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off); bool tdb_needs_recovery(struct tdb_context *tdb); int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec); int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec); -int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec); unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len); int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key, tdb_off_t offset, tdb_len_t len, @@ -323,7 +322,7 @@ tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t has tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash, struct tdb_record *r, tdb_len_t length, tdb_off_t *p_last_ptr); -int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash); +int tdb_trim_dead(struct tdb_context *tdb, uint32_t hash); void tdb_io_init(struct tdb_context *tdb); int tdb_expand(struct tdb_context *tdb, tdb_off_t size); tdb_off_t tdb_expand_adjust(tdb_off_t map_size, tdb_off_t size, int page_size); -- 2.34.1