ldb_tdb: Do not make search or DB modifications without a lock
authorGary Lockyer <gary@catalyst.net.nz>
Thu, 22 Mar 2018 22:27:10 +0000 (11:27 +1300)
committerStefan Metzmacher <metze@samba.org>
Thu, 12 Apr 2018 13:05:41 +0000 (15:05 +0200)
The ldb_cache startup code would previously not take a read lock
nor a sufficiently wide write transaction.

The new code takes a read lock, and if it needs to write takes a
write lock (transaction) and re-reads before continuing.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/ldb_tdb/ldb_cache.c

index 4790bcd7e531fa9980f8bfd3da74a73ed500144b..a9d2113e707b783e48d2a999caf9e77df0cddc98 100644 (file)
@@ -386,6 +386,7 @@ int ltdb_cache_load(struct ldb_module *module)
        uint64_t seq;
        struct ldb_message *baseinfo = NULL, *options = NULL;
        const struct ldb_schema_attribute *a;
+       bool have_write_txn = false;
        int r;
 
        ldb = ldb_module_get_ctx(module);
@@ -406,29 +407,38 @@ int ltdb_cache_load(struct ldb_module *module)
        baseinfo_dn = ldb_dn_new(baseinfo, ldb, LTDB_BASEINFO);
        if (baseinfo_dn == NULL) goto failed;
 
+       r = ltdb->kv_ops->lock_read(module);
+       if (r != LDB_SUCCESS) {
+               goto failed;
+       }
        r= ltdb_search_dn1(module, baseinfo_dn, baseinfo, 0);
        if (r != LDB_SUCCESS && r != LDB_ERR_NO_SUCH_OBJECT) {
-               goto failed;
+               goto failed_and_unlock;
        }
-       
+
        /* possibly initialise the baseinfo */
        if (r == LDB_ERR_NO_SUCH_OBJECT) {
 
+               /* Give up the read lock, try again with a write lock */
+               r = ltdb->kv_ops->unlock_read(module);
+               if (r != LDB_SUCCESS) {
+                       goto failed;
+               }
+
                if (ltdb->kv_ops->begin_write(ltdb) != 0) {
                        goto failed;
                }
 
+               have_write_txn = true;
+
                /* error handling for ltdb_baseinfo_init() is by
                   looking for the record again. */
                ltdb_baseinfo_init(module);
 
-               if (ltdb->kv_ops->finish_write(ltdb) != 0) {
-                       goto failed;
-               }
-
                if (ltdb_search_dn1(module, baseinfo_dn, baseinfo, 0) != LDB_SUCCESS) {
-                       goto failed;
+                       goto failed_and_unlock;
                }
+
        }
 
        /* Ignore the result, and update the sequence number */
@@ -443,16 +453,17 @@ int ltdb_cache_load(struct ldb_module *module)
        ltdb->sequence_number = seq;
 
        /* Read an interpret database options */
+
        options = ldb_msg_new(ltdb->cache);
-       if (options == NULL) goto failed;
+       if (options == NULL) goto failed_and_unlock;
 
        options_dn = ldb_dn_new(options, ldb, LTDB_OPTIONS);
-       if (options_dn == NULL) goto failed;
+       if (options_dn == NULL) goto failed_and_unlock;
 
        r= ltdb_search_dn1(module, options_dn, options, 0);
        talloc_free(options_dn);
        if (r != LDB_SUCCESS && r != LDB_ERR_NO_SUCH_OBJECT) {
-               goto failed;
+               goto failed_and_unlock;
        }
        
        /* set flags if they do exist */
@@ -479,7 +490,7 @@ int ltdb_cache_load(struct ldb_module *module)
        ltdb_attributes_unload(module);
 
        if (ltdb_index_load(module, ltdb) == -1) {
-               goto failed;
+               goto failed_and_unlock;
        }
 
        /*
@@ -488,7 +499,7 @@ int ltdb_cache_load(struct ldb_module *module)
         * partition module.
         */
        if (ltdb_attributes_load(module) == -1) {
-               goto failed;
+               goto failed_and_unlock;
        }
 
        ltdb->GUID_index_syntax = NULL;
@@ -503,10 +514,25 @@ int ltdb_cache_load(struct ldb_module *module)
        }
 
 done:
+       if (have_write_txn) {
+               if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+                       goto failed;
+               }
+       } else {
+               ltdb->kv_ops->unlock_read(module);
+       }
+
        talloc_free(options);
        talloc_free(baseinfo);
        return 0;
 
+failed_and_unlock:
+       if (have_write_txn) {
+               ltdb->kv_ops->abort_write(ltdb);
+       } else {
+               ltdb->kv_ops->unlock_read(module);
+       }
+
 failed:
        talloc_free(options);
        talloc_free(baseinfo);