1 # Unix SMB/CIFS implementation. Tests for dsdb
2 # Copyright (C) Matthieu Patou <mat@matws.net> 2010
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for samba.dsdb."""
20 from samba.credentials import Credentials
21 from samba.samdb import SamDB
22 from samba.auth import system_session
23 from samba.tests import TestCase
24 from samba.ndr import ndr_unpack, ndr_pack
25 from samba.dcerpc import drsblobs
32 class DsdbTests(TestCase):
35 super(DsdbTests, self).setUp()
36 self.lp = samba.tests.env_loadparm()
37 self.creds = Credentials()
38 self.creds.guess(self.lp)
39 self.session = system_session()
40 self.samdb = SamDB(session_info=self.session,
41 credentials=self.creds,
44 def test_get_oid_from_attrid(self):
45 oid = self.samdb.get_oid_from_attid(591614)
46 self.assertEquals(oid, "1.2.840.113556.1.4.1790")
48 def test_error_replpropertymetadata(self):
49 res = self.samdb.search(expression="cn=Administrator",
50 scope=ldb.SCOPE_SUBTREE,
51 attrs=["replPropertyMetaData"])
52 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
53 str(res[0]["replPropertyMetaData"]))
56 # Search for Description
58 old_version = o.version
59 o.version = o.version + 1
60 replBlob = ndr_pack(repl)
63 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
64 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
66 def test_error_replpropertymetadata_nochange(self):
67 res = self.samdb.search(expression="cn=Administrator",
68 scope=ldb.SCOPE_SUBTREE,
69 attrs=["replPropertyMetaData"])
70 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
71 str(res[0]["replPropertyMetaData"]))
72 replBlob = ndr_pack(repl)
75 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
76 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
78 def test_error_replpropertymetadata_allow_sort(self):
79 res = self.samdb.search(expression="cn=Administrator",
80 scope=ldb.SCOPE_SUBTREE,
81 attrs=["replPropertyMetaData"])
82 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
83 str(res[0]["replPropertyMetaData"]))
84 replBlob = ndr_pack(repl)
87 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
88 self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0", "local_oid:1.3.6.1.4.1.7165.4.3.25:0"])
90 def test_twoatt_replpropertymetadata(self):
91 res = self.samdb.search(expression="cn=Administrator",
92 scope=ldb.SCOPE_SUBTREE,
93 attrs=["replPropertyMetaData", "uSNChanged"])
94 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
95 str(res[0]["replPropertyMetaData"]))
98 # Search for Description
100 old_version = o.version
101 o.version = o.version + 1
102 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
103 replBlob = ndr_pack(repl)
106 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
107 msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description")
108 self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
110 def test_set_replpropertymetadata(self):
111 res = self.samdb.search(expression="cn=Administrator",
112 scope=ldb.SCOPE_SUBTREE,
113 attrs=["replPropertyMetaData", "uSNChanged"])
114 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob,
115 str(res[0]["replPropertyMetaData"]))
118 # Search for Description
120 old_version = o.version
121 o.version = o.version + 1
122 o.local_usn = long(str(res[0]["uSNChanged"])) + 1
123 o.originating_usn = long(str(res[0]["uSNChanged"])) + 1
124 replBlob = ndr_pack(repl)
127 msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData")
128 self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"])
130 def test_ok_get_attribute_from_attid(self):
131 self.assertEquals(self.samdb.get_attribute_from_attid(13), "description")
133 def test_ko_get_attribute_from_attid(self):
134 self.assertEquals(self.samdb.get_attribute_from_attid(11979), None)
136 def test_get_attribute_replmetadata_version(self):
137 res = self.samdb.search(expression="cn=Administrator",
138 scope=ldb.SCOPE_SUBTREE,
140 self.assertEquals(len(res), 1)
142 self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1)
144 def test_set_attribute_replmetadata_version(self):
145 res = self.samdb.search(expression="cn=Administrator",
146 scope=ldb.SCOPE_SUBTREE,
148 self.assertEquals(len(res), 1)
150 version = self.samdb.get_attribute_replmetadata_version(dn, "description")
151 self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2)
152 self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2)
154 def test_db_lock(self):
155 basedn = self.samdb.get_default_basedn()
160 # In the child, close the main DB, re-open just one DB
163 self.samdb = SamDB(session_info=self.session,
164 credentials=self.creds,
167 self.samdb.transaction_start()
170 "dn": "cn=test_db_lock_user,cn=users," + str(basedn),
171 "objectclass": "user",
174 # Obtain a write lock
175 self.samdb.transaction_prepare_commit()
176 os.write(w1, b"added")
179 # Drop the write lock
180 self.samdb.transaction_cancel()
183 self.assertEqual(os.read(r1, 5), b"added")
187 # We need to hold this iterator open to hold the all-record lock.
188 res = self.samdb.search_iterator()
190 # This should take at least 2 seconds because the transaction
191 # has a write lock on one backend db open
198 self.assertGreater(end - start, 1.9)
200 (got_pid, status) = os.waitpid(pid, 0)
201 self.assertEqual(got_pid, pid)
202 self.assertTrue(os.WIFEXITED(status))
203 self.assertEqual(os.WEXITSTATUS(status), 0)