self.assertEqual(got_pid, pid)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(os.WEXITSTATUS(status), 0)
+
+
+ def test_full_db_lock(self):
+ basedn = self.samdb.get_default_basedn()
+ backend_filename = "%s.ldb" % basedn.get_casefold()
+ backend_subpath = os.path.join("sam.ldb.d",
+ backend_filename)
+ backend_path = self.lp.private_path(backend_subpath)
+ (r1, w1) = os.pipe()
+
+ pid = os.fork()
+ if pid == 0:
+ # In the child, close the main DB, re-open just one DB
+ del(self.samdb)
+ gc.collect()
+
+ backenddb = ldb.Ldb(backend_path)
+
+
+ backenddb.transaction_start()
+
+ backenddb.add({"dn":"@DSDB_LOCK_TEST"})
+ backenddb.delete("@DSDB_LOCK_TEST")
+
+ # Obtain a write lock
+ backenddb.transaction_prepare_commit()
+ os.write(w1, b"added")
+ time.sleep(2)
+
+ # Drop the write lock
+ backenddb.transaction_cancel()
+ os._exit(0)
+
+ self.assertEqual(os.read(r1, 5), b"added")
+
+ start = time.time()
+
+ # We need to hold this iterator open to hold the all-record lock.
+ res = self.samdb.search_iterator()
+
+ # This should take at least 2 seconds because the transaction
+ # has a write lock on one backend db open
+
+ end = time.time()
+ self.assertGreater(end - start, 1.9)
+
+ # Release the locks
+ for l in res:
+ pass
+
+ (got_pid, status) = os.waitpid(pid, 0)
+ self.assertEqual(got_pid, pid)
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), 0)