import ldb
import os
import samba
-
+import gc
+import time
class DsdbTests(TestCase):
version = self.samdb.get_attribute_replmetadata_version(dn, "description")
self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
+
+ def test_db_lock(self):
+ basedn = self.samdb.get_default_basedn()
+ (r1, w1) = os.pipe()
+
+ pid = os.fork()
+ if pid == 0:
+ # In the child, close the main DB, re-open just one DB
+ del(self.samdb)
+ gc.collect()
+ self.samdb = SamDB(session_info=self.session,
+ credentials=self.creds,
+ lp=self.lp)
+
+ self.samdb.transaction_start()
+
+ self.samdb.add({
+ "dn": "cn=test_db_lock_user,cn=users," + str(basedn),
+ "objectclass": "user",
+ })
+
+ # Obtain a write lock
+ self.samdb.transaction_prepare_commit()
+ os.write(w1, b"added")
+ time.sleep(2)
+
+ # Drop the write lock
+ self.samdb.transaction_cancel()
+ os._exit(0)
+
+ self.assertEqual(os.read(r1, 5), b"added")
+
+ start = time.time()
+
+ # We need to hold this iterator open to hold the all-record lock.
+ res = self.samdb.search_iterator()
+
+ # This should take at least 2 seconds because the transaction
+ # has a write lock on one backend db open
+
+ # Release the locks
+ for l in res:
+ pass
+
+ end = time.time()
+ self.assertGreater(end - start, 1.9)
+
+ (got_pid, status) = os.waitpid(pid, 0)
+ self.assertEqual(got_pid, pid)
+ self.assertTrue(os.WIFEXITED(status))
+ self.assertEqual(os.WEXITSTATUS(status), 0)