From 1265346f6c8ada27f2f0cf29a4f21a457fb6af00 Mon Sep 17 00:00:00 2001 From: Andrew Bartlett Date: Thu, 1 Feb 2018 17:16:13 +1300 Subject: [PATCH] partition: Use a transaction to write and a read lock to read the LDB_METADATA_SEQ_NUM This is critical as otherwise we can read a sequence number in advance of the data that it represents and so have a false cache. Signed-off-by: Andrew Bartlett Reviewed-by: Garming Sam --- source4/dsdb/samdb/ldb_modules/partition.c | 12 ++-- .../samdb/ldb_modules/partition_metadata.c | 65 +++++++++++++++---- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c index 426fce36e4a2..2cb05f9ef3a7 100644 --- a/source4/dsdb/samdb/ldb_modules/partition.c +++ b/source4/dsdb/samdb/ldb_modules/partition.c @@ -861,7 +861,7 @@ static int partition_rename(struct ldb_module *module, struct ldb_request *req) } /* start a transaction */ -static int partition_start_trans(struct ldb_module *module) +int partition_start_trans(struct ldb_module *module) { int i; int ret; @@ -923,7 +923,7 @@ static int partition_start_trans(struct ldb_module *module) } /* prepare for a commit */ -static int partition_prepare_commit(struct ldb_module *module) +int partition_prepare_commit(struct ldb_module *module) { unsigned int i; struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module), @@ -960,7 +960,7 @@ static int partition_prepare_commit(struct ldb_module *module) /* end a transaction */ -static int partition_end_trans(struct ldb_module *module) +int partition_end_trans(struct ldb_module *module) { int ret, ret2; unsigned int i; @@ -1006,7 +1006,7 @@ static int partition_end_trans(struct ldb_module *module) } /* delete a transaction */ -static int partition_del_trans(struct ldb_module *module) +int partition_del_trans(struct ldb_module *module) { int ret, final_ret = LDB_SUCCESS; unsigned int i; @@ -1205,7 +1205,7 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque } /* lock all the backends */ -static int partition_read_lock(struct ldb_module *module) +int partition_read_lock(struct ldb_module *module) { int i; int ret; @@ -1309,7 +1309,7 @@ static int partition_read_lock(struct ldb_module *module) } /* unlock all the backends */ -static int partition_read_unlock(struct ldb_module *module) +int partition_read_unlock(struct ldb_module *module) { int i; int ret = LDB_SUCCESS; diff --git a/source4/dsdb/samdb/ldb_modules/partition_metadata.c b/source4/dsdb/samdb/ldb_modules/partition_metadata.c index 3c5180b7ab7a..e3ad0d8c6c21 100644 --- a/source4/dsdb/samdb/ldb_modules/partition_metadata.c +++ b/source4/dsdb/samdb/ldb_modules/partition_metadata.c @@ -300,7 +300,8 @@ int partition_metadata_init(struct ldb_module *module) ret = partition_metadata_open(module, false); if (ret == LDB_SUCCESS) { - goto end; + /* Great, we got the DB open */ + return LDB_SUCCESS; } /* metadata.tdb does not exist, create it */ @@ -314,18 +315,42 @@ int partition_metadata_init(struct ldb_module *module) "Migrating partition metadata: " "create of metadata.tdb gave: %s\n", ldb_errstring(ldb_module_get_ctx(module))); - talloc_free(data->metadata); - data->metadata = NULL; - goto end; + TALLOC_FREE(data->metadata); + return ret; + } + + /* + * We need to fill in the sequence number from the DB, so we + * need to get a lock over all the databases. We only read + * from the main partitions, but write to metadata so to avoid + * lock ordering we just get a transaction over the lot. + */ + ret = partition_start_trans(module); + if (ret != LDB_SUCCESS) { + TALLOC_FREE(data->metadata); + return ret; } ret = partition_metadata_set_sequence_number(module); if (ret != LDB_SUCCESS) { - talloc_free(data->metadata); - data->metadata = NULL; + TALLOC_FREE(data->metadata); + partition_del_trans(module); + return ret; + } + + ret = partition_prepare_commit(module); + if (ret != LDB_SUCCESS) { + TALLOC_FREE(data->metadata); + partition_del_trans(module); + return ret; + } + + ret = partition_end_trans(module); + if (ret != LDB_SUCCESS) { + /* Nothing much we can do */ + TALLOC_FREE(data->metadata); } -end: return ret; } @@ -335,10 +360,28 @@ end: */ int partition_metadata_sequence_number(struct ldb_module *module, uint64_t *value) { - return partition_metadata_get_uint64(module, - LDB_METADATA_SEQ_NUM, - value, - 0); + + /* We have to lock all the databases as otherwise we can + * return a sequence number that is higher than the DB values + * that we can see, as those transactions close after the + * metadata.tdb transaction closes */ + int ret = partition_read_lock(module); + if (ret != LDB_SUCCESS) { + return ret; + } + + ret = partition_metadata_get_uint64(module, + LDB_METADATA_SEQ_NUM, + value, + 0); + if (ret == LDB_SUCCESS) { + ret = partition_read_unlock(module); + } else { + /* Don't overwrite the error code */ + partition_read_unlock(module); + } + return ret; + } -- 2.34.1