partition: Use a transaction to write and a read lock to read the LDB_METADATA_SEQ_NUM
authorAndrew Bartlett <abartlet@samba.org>
Thu, 1 Feb 2018 04:16:13 +0000 (17:16 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 5 Mar 2018 19:50:14 +0000 (20:50 +0100)
This is critical as otherwise we can read a sequence number in advance
of the data that it represents and so have a false cache.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
source4/dsdb/samdb/ldb_modules/partition.c
source4/dsdb/samdb/ldb_modules/partition_metadata.c

index 426fce36e4a264d6993fe28b7d678adf5d100241..2cb05f9ef3a7e2b11832dd1dc51ce81038e83e34 100644 (file)
@@ -861,7 +861,7 @@ static int partition_rename(struct ldb_module *module, struct ldb_request *req)
 }
 
 /* start a transaction */
-static int partition_start_trans(struct ldb_module *module)
+int partition_start_trans(struct ldb_module *module)
 {
        int i;
        int ret;
@@ -923,7 +923,7 @@ static int partition_start_trans(struct ldb_module *module)
 }
 
 /* prepare for a commit */
-static int partition_prepare_commit(struct ldb_module *module)
+int partition_prepare_commit(struct ldb_module *module)
 {
        unsigned int i;
        struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
@@ -960,7 +960,7 @@ static int partition_prepare_commit(struct ldb_module *module)
 
 
 /* end a transaction */
-static int partition_end_trans(struct ldb_module *module)
+int partition_end_trans(struct ldb_module *module)
 {
        int ret, ret2;
        unsigned int i;
@@ -1006,7 +1006,7 @@ static int partition_end_trans(struct ldb_module *module)
 }
 
 /* delete a transaction */
-static int partition_del_trans(struct ldb_module *module)
+int partition_del_trans(struct ldb_module *module)
 {
        int ret, final_ret = LDB_SUCCESS;
        unsigned int i;
@@ -1205,7 +1205,7 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
 }
 
 /* lock all the backends */
-static int partition_read_lock(struct ldb_module *module)
+int partition_read_lock(struct ldb_module *module)
 {
        int i;
        int ret;
@@ -1309,7 +1309,7 @@ static int partition_read_lock(struct ldb_module *module)
 }
 
 /* unlock all the backends */
-static int partition_read_unlock(struct ldb_module *module)
+int partition_read_unlock(struct ldb_module *module)
 {
        int i;
        int ret = LDB_SUCCESS;
index 3c5180b7ab7a8a35328e1611b4b08d02f96ecb5a..e3ad0d8c6c213e42acd4a5434163d91951bd7915 100644 (file)
@@ -300,7 +300,8 @@ int partition_metadata_init(struct ldb_module *module)
 
        ret = partition_metadata_open(module, false);
        if (ret == LDB_SUCCESS) {
-               goto end;
+               /* Great, we got the DB open */
+               return LDB_SUCCESS;
        }
 
        /* metadata.tdb does not exist, create it */
@@ -314,18 +315,42 @@ int partition_metadata_init(struct ldb_module *module)
                                       "Migrating partition metadata: "
                                       "create of metadata.tdb gave: %s\n",
                                       ldb_errstring(ldb_module_get_ctx(module)));
-               talloc_free(data->metadata);
-               data->metadata = NULL;
-               goto end;
+               TALLOC_FREE(data->metadata);
+               return ret;
+       }
+
+       /*
+        * We need to fill in the sequence number from the DB, so we
+        * need to get a lock over all the databases.  We only read
+        * from the main partitions, but write to metadata so to avoid
+        * lock ordering we just get a transaction over the lot.
+        */
+       ret = partition_start_trans(module);
+       if (ret != LDB_SUCCESS) {
+               TALLOC_FREE(data->metadata);
+               return ret;
        }
 
        ret = partition_metadata_set_sequence_number(module);
        if (ret != LDB_SUCCESS) {
-               talloc_free(data->metadata);
-               data->metadata = NULL;
+               TALLOC_FREE(data->metadata);
+               partition_del_trans(module);
+               return ret;
+       }
+
+       ret = partition_prepare_commit(module);
+       if (ret != LDB_SUCCESS) {
+               TALLOC_FREE(data->metadata);
+               partition_del_trans(module);
+               return ret;
+       }
+
+       ret = partition_end_trans(module);
+       if (ret != LDB_SUCCESS) {
+               /* Nothing much we can do */
+               TALLOC_FREE(data->metadata);
        }
 
-end:
        return ret;
 }
 
@@ -335,10 +360,28 @@ end:
  */
 int partition_metadata_sequence_number(struct ldb_module *module, uint64_t *value)
 {
-       return partition_metadata_get_uint64(module,
-                                            LDB_METADATA_SEQ_NUM,
-                                            value,
-                                            0);
+
+       /* We have to lock all the databases as otherwise we can
+        * return a sequence number that is higher than the DB values
+        * that we can see, as those transactions close after the
+        * metadata.tdb transaction closes */
+       int ret = partition_read_lock(module);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       ret = partition_metadata_get_uint64(module,
+                                           LDB_METADATA_SEQ_NUM,
+                                           value,
+                                           0);
+       if (ret == LDB_SUCCESS) {
+               ret = partition_read_unlock(module);
+       } else {
+               /* Don't overwrite the error code */
+               partition_read_unlock(module);
+       }
+       return ret;
+
 }