dsdb: lock metadata.tdb during lock_read in partitions module
authorAndrew Bartlett <abartlet@samba.org>
Wed, 15 May 2019 22:44:42 +0000 (10:44 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 20 May 2019 04:01:10 +0000 (04:01 +0000)
metadata.tdb was being locked during transactions, but not during read, and
we should ensure we take all our locks in order for consistency

BUG: https://bugzilla.samba.org/show_bug.cgi?id=13950

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
source4/dsdb/samdb/ldb_modules/partition.c
source4/dsdb/samdb/ldb_modules/partition.h
source4/dsdb/samdb/ldb_modules/partition_metadata.c

index dca4b7f7993ce983df9c8f2c68a37eb01fd1b2bd..c9d815b4fb0e22b69765f83d90fa72342a0cdcc8 100644 (file)
@@ -1435,7 +1435,7 @@ int partition_read_lock(struct ldb_module *module)
 
        /*
         * This order must match that in prepare_commit(), start with
-        * the metadata partition (sam.ldb) lock
+        * the top level DB (sam.ldb) lock
         */
        ret = ldb_next_read_lock(module);
        if (ret != LDB_SUCCESS) {
@@ -1449,7 +1449,7 @@ int partition_read_lock(struct ldb_module *module)
        }
 
        /*
-        * The metadata partition (sam.ldb) lock is not
+        * The top level DB (sam.ldb) lock is not
         * enough to block another process in prepare_commit(),
         * because prepare_commit() is a no-op, if nothing
         * was changed in the specific backend.
@@ -1476,18 +1476,24 @@ int partition_read_lock(struct ldb_module *module)
                              ldb_dn_get_linearized(
                                      data->partitions[i]->ctrl->dn));
 
-               /* Back it out, if it fails on one */
-               for (i--; i >= 0; i--) {
-                       ret2 = ldb_next_read_unlock(data->partitions[i]->module);
-                       if (ret2 != LDB_SUCCESS) {
-                               ldb_debug(ldb,
-                                         LDB_DEBUG_FATAL,
-                                         "Failed to unlock db: %s / %s",
-                                         ldb_errstring(ldb),
-                                         ldb_strerror(ret2));
-                       }
-               }
-               ret2 = ldb_next_read_unlock(module);
+               goto failed;
+       }
+
+       /*
+        * Because in prepare_commit this must come last, to ensure
+        * lock ordering we have to do this last here also
+        */
+       ret = partition_metadata_read_lock(module);
+       if (ret != LDB_SUCCESS) {
+               goto failed;
+       }
+
+       return LDB_SUCCESS;
+
+failed:
+       /* Back it out, if it fails on one */
+       for (i--; i >= 0; i--) {
+               ret2 = ldb_next_read_unlock(data->partitions[i]->module);
                if (ret2 != LDB_SUCCESS) {
                        ldb_debug(ldb,
                                  LDB_DEBUG_FATAL,
@@ -1495,10 +1501,16 @@ int partition_read_lock(struct ldb_module *module)
                                  ldb_errstring(ldb),
                                  ldb_strerror(ret2));
                }
-               return ret;
        }
-
-       return LDB_SUCCESS;
+       ret2 = ldb_next_read_unlock(module);
+       if (ret2 != LDB_SUCCESS) {
+               ldb_debug(ldb,
+                         LDB_DEBUG_FATAL,
+                         "Failed to unlock db: %s / %s",
+                         ldb_errstring(ldb),
+                         ldb_strerror(ret2));
+       }
+       return ret;
 }
 
 /* unlock all the backends */
@@ -1566,6 +1578,20 @@ int partition_read_unlock(struct ldb_module *module)
                }
        }
 
+       /*
+        * Because in prepare_commit this must come last, to ensure
+        * lock ordering we have to do this last here also
+        */
+       ret = partition_metadata_read_unlock(module);
+
+       /*
+        * Don't overwrite the original failure code
+        * if there was one
+        */
+       if (ret == LDB_SUCCESS) {
+               ret = ret2;
+       }
+
        return ret;
 }
 
index 1bf213f9767706a47fafa436697e00c260a4d5ed..5790838e31853e6c5aa88324b2a91d7414a0cbc0 100644 (file)
@@ -43,6 +43,7 @@ struct partition_module {
 struct partition_metadata {
        struct tdb_wrap *db;
        int in_transaction;
+       int read_lock_count;
 };
 
 struct partition_private_data {
index 02e56e175a36f323bfd0ab771d07960d86d84ee3..349f79c26457925e7b66f57dfe01b6121818ab26 100644 (file)
@@ -421,6 +421,69 @@ int partition_metadata_sequence_number_increment(struct ldb_module *module, uint
        ret = partition_metadata_set_uint64(module, LDB_METADATA_SEQ_NUM, *value, false);
        return ret;
 }
+/*
+  lock the database for read - use by partition_lock_read
+*/
+int partition_metadata_read_lock(struct ldb_module *module)
+{
+       struct partition_private_data *data
+               = talloc_get_type_abort(ldb_module_get_private(module),
+                                       struct partition_private_data);
+       struct tdb_context *tdb = NULL;
+       int tdb_ret = 0;
+       int ret;
+
+       if (!data || !data->metadata || !data->metadata->db) {
+               return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
+                                       "partition_metadata: metadata not initialized");
+       }
+       tdb = data->metadata->db->tdb;
+
+       if (tdb_transaction_active(tdb) == false &&
+           data->metadata->read_lock_count == 0) {
+               tdb_ret = tdb_lockall_read(tdb);
+       }
+       if (tdb_ret == 0) {
+               data->metadata->read_lock_count++;
+               return LDB_SUCCESS;
+       } else {
+               /* Sadly we can't call ltdb_err_map(tdb_error(tdb)); */
+               ret = LDB_ERR_BUSY;
+       }
+       ldb_debug_set(ldb_module_get_ctx(module),
+                     LDB_DEBUG_FATAL,
+                     "Failure during partition_metadata_read_lock(): %s",
+                     tdb_errorstr(tdb));
+       return ret;
+}
+
+/*
+  unlock the database after a partition_metadata_lock_read()
+*/
+int partition_metadata_read_unlock(struct ldb_module *module)
+{
+       struct partition_private_data *data
+               = talloc_get_type_abort(ldb_module_get_private(module),
+                                       struct partition_private_data);
+       struct tdb_context *tdb = NULL;
+
+       data = talloc_get_type_abort(ldb_module_get_private(module),
+                                    struct partition_private_data);
+       if (!data || !data->metadata || !data->metadata->db) {
+               return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
+                                       "partition_metadata: metadata not initialized");
+       }
+       tdb = data->metadata->db->tdb;
+
+       if (!tdb_transaction_active(tdb) &&
+           data->metadata->read_lock_count == 1) {
+               tdb_unlockall_read(tdb);
+               data->metadata->read_lock_count--;
+               return 0;
+       }
+       data->metadata->read_lock_count--;
+       return 0;
+}
 
 
 /*