3 # Tombstone reanimation tests
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from __future__ import print_function
25 sys.path.insert(0, "bin/python")
28 from samba.ndr import ndr_unpack, ndr_print
29 from samba.dcerpc import misc
30 from samba.dcerpc import security
31 from samba.dcerpc import drsblobs
32 from samba.dcerpc.drsuapi import *
33 from samba.tests.password_test import PasswordCommon
34 from samba.compat import get_string
37 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
38 MessageElement, LdbError,
39 ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
40 ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
43 class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
44 """ verify Samba restores required attributes when
45 user restores a Deleted object
49 super(RestoredObjectAttributesBaseTestCase, self).setUp()
50 self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
51 self.base_dn = self.samdb.domain_dn()
52 self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
53 self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
55 # permit password changes during this test
56 PasswordCommon.allow_password_changes(self, self.samdb)
59 super(RestoredObjectAttributesBaseTestCase, self).tearDown()
61 def GUID_string(self, guid):
62 return get_string(self.samdb.schema_format_value("objectGUID", guid))
64 def search_guid(self, guid, attrs=["*"]):
65 res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
66 scope=SCOPE_BASE, attrs=attrs,
67 controls=["show_deleted:1"])
68 self.assertEquals(len(res), 1)
71 def search_dn(self, dn):
72 res = self.samdb.search(expression="(objectClass=*)",
75 controls=["show_recycled:1"])
76 self.assertEquals(len(res), 1)
79 def _create_object(self, msg):
80 """:param msg: dict with dn and attributes to create an object from"""
81 # delete an object if leftover from previous test
82 samba.tests.delete_force(self.samdb, msg['dn'])
84 return self.search_dn(msg['dn'])
86 def assertNamesEqual(self, attrs_expected, attrs_extra):
87 self.assertEqual(attrs_expected, attrs_extra,
88 "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
89 % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))
91 def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
92 self.assertNamesEqual(attrs_orig, attrs_rest)
93 # remove volatile attributes, they can't be equal
94 attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
95 for attr in attrs_orig:
96 # convert original attr value to ldif
97 orig_val = obj_orig.get(attr)
100 if not isinstance(orig_val, MessageElement):
101 orig_val = MessageElement(str(orig_val), 0, attr)
104 orig_ldif = self.samdb.write_ldif(m, 0)
105 # convert restored attr value to ldif
106 rest_val = obj_restored.get(attr)
107 self.assertFalse(rest_val is None)
109 if not isinstance(rest_val, MessageElement):
110 rest_val = MessageElement(str(rest_val), 0, attr)
112 rest_ldif = self.samdb.write_ldif(m, 0)
113 # compare generated ldif's
114 self.assertEqual(orig_ldif, rest_ldif)
116 def assertAttributesExists(self, attr_expected, obj_msg):
117 """Check object contains at least expected attrbigutes
118 :param attr_expected: dict of expected attributes with values. ** is any value
119 :param obj_msg: Ldb.Message for the object under test
121 actual_names = set(obj_msg.keys())
122 # Samba does not use 'dSCorePropagationData', so skip it
123 actual_names -= set(['dSCorePropagationData'])
124 expected_names = set(attr_expected.keys())
125 self.assertNamesEqual(expected_names, actual_names)
126 for name in attr_expected.keys():
127 expected_val = attr_expected[name]
128 actual_val = obj_msg.get(name)
129 self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
130 if expected_val == "**":
131 # "**" values means "any"
133 # if expected_val is e.g. ldb.bytes we can't depend on
134 # str(actual_value) working, we may just get a decoding
135 # error. Better to just compare raw values
136 if not isinstance(expected_val, str):
137 actual_val = actual_val[0]
139 actual_val = str(actual_val)
140 self.assertEqual(expected_val, actual_val,
141 "Unexpected value (%s) for '%s', expected (%s)" % (
142 repr(actual_val), name, repr(expected_val)))
144 def _check_metadata(self, metadata, expected):
145 repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0])
148 for o in repl.ctr.array:
149 repl_array.append((o.attid, o.version))
150 repl_set = set(repl_array)
152 expected_set = set(expected)
153 self.assertEqual(len(repl_set), len(expected),
154 "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
155 str(expected_set.difference(repl_set)),
156 str(repl_set.difference(expected_set)),
160 for o in repl.ctr.array:
163 self.assertEquals(attid, o.attid,
164 "(LDAP) Wrong attid "
165 "for expected value %d, wanted 0x%08x got 0x%08x, "
167 % (i, attid, o.attid, ndr_print(repl)))
168 # Allow version to be skipped when it does not matter
169 if version is not None:
170 self.assertEquals(o.version, version,
171 "(LDAP) Wrong version for expected value %d, "
173 "wanted %d got %d, repl: \n%s"
175 version, o.version, ndr_print(repl)))
179 def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
180 """Restores a deleted object
181 :param samdb: SamDB connection to SAM
182 :param del_dn: str Deleted object DN
183 :param new_dn: str Where to restore the object
184 :param new_attrs: dict Additional attributes to set
187 msg.dn = Dn(samdb, str(del_dn))
188 msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
189 msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
190 if new_attrs is not None:
191 assert isinstance(new_attrs, dict)
192 for attr in new_attrs:
193 msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
194 samdb.modify(msg, ["show_deleted:1"])
197 class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
199 super(BaseRestoreObjectTestCase, self).setUp()
201 def enable_recycle_bin(self):
203 msg.dn = Dn(self.samdb, "")
204 msg["enableOptionalFeature"] = MessageElement(
205 "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
206 FLAG_MOD_ADD, "enableOptionalFeature")
208 self.samdb.modify(msg)
209 except LdbError as e:
211 self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
213 def test_undelete(self):
214 print("Testing standard undelete operation")
215 usr1 = "cn=testuser,cn=users," + self.base_dn
216 samba.tests.delete_force(self.samdb, usr1)
219 "objectclass": "user",
220 "description": "test user description",
221 "samaccountname": "testuser"})
222 objLive1 = self.search_dn(usr1)
223 guid1 = objLive1["objectGUID"][0]
224 self.samdb.delete(usr1)
225 objDeleted1 = self.search_guid(guid1)
226 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
227 objLive2 = self.search_dn(usr1)
228 self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
229 samba.tests.delete_force(self.samdb, usr1)
231 def test_rename(self):
232 print("Testing attempt to rename deleted object")
233 usr1 = "cn=testuser,cn=users," + self.base_dn
236 "objectclass": "user",
237 "description": "test user description",
238 "samaccountname": "testuser"})
239 objLive1 = self.search_dn(usr1)
240 guid1 = objLive1["objectGUID"][0]
241 self.samdb.delete(usr1)
242 objDeleted1 = self.search_guid(guid1)
243 # just to make sure we get the correct error if the show deleted is missing
245 self.samdb.rename(str(objDeleted1.dn), usr1)
247 except LdbError as e1:
249 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
252 self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
254 except LdbError as e2:
256 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
258 def test_undelete_with_mod(self):
259 print("Testing standard undelete operation with modification of additional attributes")
260 usr1 = "cn=testuser,cn=users," + self.base_dn
263 "objectclass": "user",
264 "description": "test user description",
265 "samaccountname": "testuser"})
266 objLive1 = self.search_dn(usr1)
267 guid1 = objLive1["objectGUID"][0]
268 self.samdb.delete(usr1)
269 objDeleted1 = self.search_guid(guid1)
270 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
271 objLive2 = self.search_dn(usr1)
272 self.assertEqual(str(objLive2["url"][0]), "www.samba.org")
273 samba.tests.delete_force(self.samdb, usr1)
275 def test_undelete_newuser(self):
276 print("Testing undelete user with a different dn")
277 usr1 = "cn=testuser,cn=users," + self.base_dn
278 usr2 = "cn=testuser2,cn=users," + self.base_dn
279 samba.tests.delete_force(self.samdb, usr1)
282 "objectclass": "user",
283 "description": "test user description",
284 "samaccountname": "testuser"})
285 objLive1 = self.search_dn(usr1)
286 guid1 = objLive1["objectGUID"][0]
287 self.samdb.delete(usr1)
288 objDeleted1 = self.search_guid(guid1)
289 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
290 objLive2 = self.search_dn(usr2)
291 samba.tests.delete_force(self.samdb, usr1)
292 samba.tests.delete_force(self.samdb, usr2)
294 def test_undelete_existing(self):
295 print("Testing undelete user after a user with the same dn has been created")
296 usr1 = "cn=testuser,cn=users," + self.base_dn
299 "objectclass": "user",
300 "description": "test user description",
301 "samaccountname": "testuser"})
302 objLive1 = self.search_dn(usr1)
303 guid1 = objLive1["objectGUID"][0]
304 self.samdb.delete(usr1)
307 "objectclass": "user",
308 "description": "test user description",
309 "samaccountname": "testuser"})
310 objDeleted1 = self.search_guid(guid1)
312 self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
314 except LdbError as e3:
316 self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
318 def test_undelete_cross_nc(self):
319 print("Cross NC undelete")
320 c1 = "cn=ldaptestcontainer," + self.base_dn
321 c2 = "cn=ldaptestcontainer2," + self.configuration_dn
322 c3 = "cn=ldaptestcontainer," + self.configuration_dn
323 c4 = "cn=ldaptestcontainer2," + self.base_dn
324 samba.tests.delete_force(self.samdb, c1)
325 samba.tests.delete_force(self.samdb, c2)
326 samba.tests.delete_force(self.samdb, c3)
327 samba.tests.delete_force(self.samdb, c4)
330 "objectclass": "container"})
333 "objectclass": "container"})
334 objLive1 = self.search_dn(c1)
335 objLive2 = self.search_dn(c2)
336 guid1 = objLive1["objectGUID"][0]
337 guid2 = objLive2["objectGUID"][0]
338 self.samdb.delete(c1)
339 self.samdb.delete(c2)
340 objDeleted1 = self.search_guid(guid1)
341 objDeleted2 = self.search_guid(guid2)
342 # try to undelete from base dn to config
344 self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
346 except LdbError as e4:
348 self.assertEquals(num, ERR_OPERATIONS_ERROR)
349 # try to undelete from config to base dn
351 self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
353 except LdbError as e5:
355 self.assertEquals(num, ERR_OPERATIONS_ERROR)
356 # assert undeletion will work in same nc
357 self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
358 self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
361 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
362 """Test cases for delete/reanimate user objects"""
364 def _expected_user_add_attributes(self, username, user_dn, category):
365 return {'dn': user_dn,
368 'distinguishedName': user_dn,
376 'userAccountControl': '546',
378 'badPasswordTime': '0',
384 'primaryGroupID': '513',
386 'accountExpires': '9223372036854775807',
388 'sAMAccountName': username,
389 'sAMAccountType': '805306368',
390 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
393 def _expected_user_add_metadata(self):
395 (DRSUAPI_ATTID_objectClass, 1),
396 (DRSUAPI_ATTID_cn, 1),
397 (DRSUAPI_ATTID_instanceType, 1),
398 (DRSUAPI_ATTID_whenCreated, 1),
399 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
400 (DRSUAPI_ATTID_name, 1),
401 (DRSUAPI_ATTID_userAccountControl, None),
402 (DRSUAPI_ATTID_codePage, 1),
403 (DRSUAPI_ATTID_countryCode, 1),
404 (DRSUAPI_ATTID_dBCSPwd, 1),
405 (DRSUAPI_ATTID_logonHours, 1),
406 (DRSUAPI_ATTID_unicodePwd, 1),
407 (DRSUAPI_ATTID_ntPwdHistory, 1),
408 (DRSUAPI_ATTID_pwdLastSet, 1),
409 (DRSUAPI_ATTID_primaryGroupID, 1),
410 (DRSUAPI_ATTID_objectSid, 1),
411 (DRSUAPI_ATTID_accountExpires, 1),
412 (DRSUAPI_ATTID_lmPwdHistory, 1),
413 (DRSUAPI_ATTID_sAMAccountName, 1),
414 (DRSUAPI_ATTID_sAMAccountType, 1),
415 (DRSUAPI_ATTID_objectCategory, 1)]
417 def _expected_user_del_attributes(self, username, _guid, _sid):
418 guid = ndr_unpack(misc.GUID, _guid)
419 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
420 cn = "%s\nDEL:%s" % (username, guid)
424 'distinguishedName': dn,
426 'isRecycled': 'TRUE',
434 'userAccountControl': '546',
436 'sAMAccountName': username,
437 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
440 def _expected_user_del_metadata(self):
442 (DRSUAPI_ATTID_objectClass, 1),
443 (DRSUAPI_ATTID_cn, 2),
444 (DRSUAPI_ATTID_instanceType, 1),
445 (DRSUAPI_ATTID_whenCreated, 1),
446 (DRSUAPI_ATTID_isDeleted, 1),
447 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
448 (DRSUAPI_ATTID_name, 2),
449 (DRSUAPI_ATTID_userAccountControl, None),
450 (DRSUAPI_ATTID_codePage, 2),
451 (DRSUAPI_ATTID_countryCode, 2),
452 (DRSUAPI_ATTID_dBCSPwd, 1),
453 (DRSUAPI_ATTID_logonHours, 1),
454 (DRSUAPI_ATTID_unicodePwd, 1),
455 (DRSUAPI_ATTID_ntPwdHistory, 1),
456 (DRSUAPI_ATTID_pwdLastSet, 2),
457 (DRSUAPI_ATTID_primaryGroupID, 2),
458 (DRSUAPI_ATTID_objectSid, 1),
459 (DRSUAPI_ATTID_accountExpires, 2),
460 (DRSUAPI_ATTID_lmPwdHistory, 1),
461 (DRSUAPI_ATTID_sAMAccountName, 1),
462 (DRSUAPI_ATTID_sAMAccountType, 2),
463 (DRSUAPI_ATTID_lastKnownParent, 1),
464 (DRSUAPI_ATTID_objectCategory, 2),
465 (DRSUAPI_ATTID_isRecycled, 1)]
467 def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
468 return {'dn': user_dn,
471 'distinguishedName': user_dn,
479 'userAccountControl': '546',
481 'badPasswordTime': '0',
487 'primaryGroupID': '513',
488 'operatorCount': '0',
491 'accountExpires': '0',
493 'sAMAccountName': username,
494 'sAMAccountType': '805306368',
495 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
496 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
499 def _expected_user_restore_metadata(self):
501 (DRSUAPI_ATTID_objectClass, 1),
502 (DRSUAPI_ATTID_cn, 3),
503 (DRSUAPI_ATTID_instanceType, 1),
504 (DRSUAPI_ATTID_whenCreated, 1),
505 (DRSUAPI_ATTID_isDeleted, 2),
506 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
507 (DRSUAPI_ATTID_name, 3),
508 (DRSUAPI_ATTID_userAccountControl, None),
509 (DRSUAPI_ATTID_codePage, 3),
510 (DRSUAPI_ATTID_countryCode, 3),
511 (DRSUAPI_ATTID_dBCSPwd, 1),
512 (DRSUAPI_ATTID_logonHours, 1),
513 (DRSUAPI_ATTID_unicodePwd, 1),
514 (DRSUAPI_ATTID_ntPwdHistory, 1),
515 (DRSUAPI_ATTID_pwdLastSet, 3),
516 (DRSUAPI_ATTID_primaryGroupID, 3),
517 (DRSUAPI_ATTID_operatorCount, 1),
518 (DRSUAPI_ATTID_objectSid, 1),
519 (DRSUAPI_ATTID_adminCount, 1),
520 (DRSUAPI_ATTID_accountExpires, 3),
521 (DRSUAPI_ATTID_lmPwdHistory, 1),
522 (DRSUAPI_ATTID_sAMAccountName, 1),
523 (DRSUAPI_ATTID_sAMAccountType, 3),
524 (DRSUAPI_ATTID_lastKnownParent, 1),
525 (DRSUAPI_ATTID_objectCategory, 3),
526 (DRSUAPI_ATTID_isRecycled, 2)]
528 def test_restore_user(self):
529 print("Test restored user attributes")
530 username = "restore_user"
531 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
532 samba.tests.delete_force(self.samdb, usr_dn)
535 "objectClass": "user",
536 "sAMAccountName": username})
537 obj = self.search_dn(usr_dn)
538 guid = obj["objectGUID"][0]
539 sid = obj["objectSID"][0]
540 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
541 self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
542 self._check_metadata(obj_rmd["replPropertyMetaData"],
543 self._expected_user_add_metadata())
544 self.samdb.delete(usr_dn)
545 obj_del = self.search_guid(guid)
546 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
547 orig_attrs = set(obj.keys())
548 del_attrs = set(obj_del.keys())
549 self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
550 self._check_metadata(obj_del_rmd["replPropertyMetaData"],
551 self._expected_user_del_metadata())
552 # restore the user and fetch what's restored
553 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
554 obj_restore = self.search_guid(guid)
555 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
556 # check original attributes and restored one are same
557 orig_attrs = set(obj.keys())
558 # windows restore more attributes that originally we have
559 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
560 rest_attrs = set(obj_restore.keys())
561 self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
562 self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
563 self._expected_user_restore_metadata())
566 class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
567 """Test cases for delete/reanimate user objects with password"""
569 def _expected_userpw_add_attributes(self, username, user_dn, category):
570 return {'dn': user_dn,
573 'distinguishedName': user_dn,
581 'userAccountControl': '546',
583 'badPasswordTime': '0',
589 'primaryGroupID': '513',
591 'accountExpires': '9223372036854775807',
593 'sAMAccountName': username,
594 'sAMAccountType': '805306368',
595 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
598 def _expected_userpw_add_metadata(self):
600 (DRSUAPI_ATTID_objectClass, 1),
601 (DRSUAPI_ATTID_cn, 1),
602 (DRSUAPI_ATTID_instanceType, 1),
603 (DRSUAPI_ATTID_whenCreated, 1),
604 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
605 (DRSUAPI_ATTID_name, 1),
606 (DRSUAPI_ATTID_userAccountControl, None),
607 (DRSUAPI_ATTID_codePage, 1),
608 (DRSUAPI_ATTID_countryCode, 1),
609 (DRSUAPI_ATTID_dBCSPwd, 1),
610 (DRSUAPI_ATTID_logonHours, 1),
611 (DRSUAPI_ATTID_unicodePwd, 1),
612 (DRSUAPI_ATTID_ntPwdHistory, 1),
613 (DRSUAPI_ATTID_pwdLastSet, 1),
614 (DRSUAPI_ATTID_primaryGroupID, 1),
615 (DRSUAPI_ATTID_supplementalCredentials, 1),
616 (DRSUAPI_ATTID_objectSid, 1),
617 (DRSUAPI_ATTID_accountExpires, 1),
618 (DRSUAPI_ATTID_lmPwdHistory, 1),
619 (DRSUAPI_ATTID_sAMAccountName, 1),
620 (DRSUAPI_ATTID_sAMAccountType, 1),
621 (DRSUAPI_ATTID_objectCategory, 1)]
623 def _expected_userpw_del_attributes(self, username, _guid, _sid):
624 guid = ndr_unpack(misc.GUID, _guid)
625 dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
626 cn = "%s\nDEL:%s" % (username, guid)
630 'distinguishedName': dn,
632 'isRecycled': 'TRUE',
640 'userAccountControl': '546',
642 'sAMAccountName': username,
643 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
646 def _expected_userpw_del_metadata(self):
648 (DRSUAPI_ATTID_objectClass, 1),
649 (DRSUAPI_ATTID_cn, 2),
650 (DRSUAPI_ATTID_instanceType, 1),
651 (DRSUAPI_ATTID_whenCreated, 1),
652 (DRSUAPI_ATTID_isDeleted, 1),
653 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
654 (DRSUAPI_ATTID_name, 2),
655 (DRSUAPI_ATTID_userAccountControl, None),
656 (DRSUAPI_ATTID_codePage, 2),
657 (DRSUAPI_ATTID_countryCode, 2),
658 (DRSUAPI_ATTID_dBCSPwd, 1),
659 (DRSUAPI_ATTID_logonHours, 1),
660 (DRSUAPI_ATTID_unicodePwd, 2),
661 (DRSUAPI_ATTID_ntPwdHistory, 2),
662 (DRSUAPI_ATTID_pwdLastSet, 2),
663 (DRSUAPI_ATTID_primaryGroupID, 2),
664 (DRSUAPI_ATTID_supplementalCredentials, 2),
665 (DRSUAPI_ATTID_objectSid, 1),
666 (DRSUAPI_ATTID_accountExpires, 2),
667 (DRSUAPI_ATTID_lmPwdHistory, 2),
668 (DRSUAPI_ATTID_sAMAccountName, 1),
669 (DRSUAPI_ATTID_sAMAccountType, 2),
670 (DRSUAPI_ATTID_lastKnownParent, 1),
671 (DRSUAPI_ATTID_objectCategory, 2),
672 (DRSUAPI_ATTID_isRecycled, 1)]
674 def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category):
675 return {'dn': user_dn,
678 'distinguishedName': user_dn,
686 'userAccountControl': '546',
688 'badPasswordTime': '0',
694 'primaryGroupID': '513',
695 'operatorCount': '0',
698 'accountExpires': '0',
700 'sAMAccountName': username,
701 'sAMAccountType': '805306368',
702 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
703 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
706 def _expected_userpw_restore_metadata(self):
708 (DRSUAPI_ATTID_objectClass, 1),
709 (DRSUAPI_ATTID_cn, 3),
710 (DRSUAPI_ATTID_instanceType, 1),
711 (DRSUAPI_ATTID_whenCreated, 1),
712 (DRSUAPI_ATTID_isDeleted, 2),
713 (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
714 (DRSUAPI_ATTID_name, 3),
715 (DRSUAPI_ATTID_userAccountControl, None),
716 (DRSUAPI_ATTID_codePage, 3),
717 (DRSUAPI_ATTID_countryCode, 3),
718 (DRSUAPI_ATTID_dBCSPwd, 2),
719 (DRSUAPI_ATTID_logonHours, 1),
720 (DRSUAPI_ATTID_unicodePwd, 3),
721 (DRSUAPI_ATTID_ntPwdHistory, 3),
722 (DRSUAPI_ATTID_pwdLastSet, 4),
723 (DRSUAPI_ATTID_primaryGroupID, 3),
724 (DRSUAPI_ATTID_supplementalCredentials, 3),
725 (DRSUAPI_ATTID_operatorCount, 1),
726 (DRSUAPI_ATTID_objectSid, 1),
727 (DRSUAPI_ATTID_adminCount, 1),
728 (DRSUAPI_ATTID_accountExpires, 3),
729 (DRSUAPI_ATTID_lmPwdHistory, 3),
730 (DRSUAPI_ATTID_sAMAccountName, 1),
731 (DRSUAPI_ATTID_sAMAccountType, 3),
732 (DRSUAPI_ATTID_lastKnownParent, 1),
733 (DRSUAPI_ATTID_objectCategory, 3),
734 (DRSUAPI_ATTID_isRecycled, 2)]
736 def test_restorepw_user(self):
737 print("Test restored user attributes")
738 username = "restorepw_user"
739 usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
740 samba.tests.delete_force(self.samdb, usr_dn)
743 "objectClass": "user",
744 "userPassword": "thatsAcomplPASS0",
745 "sAMAccountName": username})
746 obj = self.search_dn(usr_dn)
747 guid = obj["objectGUID"][0]
748 sid = obj["objectSID"][0]
749 obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
750 self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj)
751 self._check_metadata(obj_rmd["replPropertyMetaData"],
752 self._expected_userpw_add_metadata())
753 self.samdb.delete(usr_dn)
754 obj_del = self.search_guid(guid)
755 obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
756 orig_attrs = set(obj.keys())
757 del_attrs = set(obj_del.keys())
758 self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del)
759 self._check_metadata(obj_del_rmd["replPropertyMetaData"],
760 self._expected_userpw_del_metadata())
761 # restore the user and fetch what's restored
762 self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]})
763 obj_restore = self.search_guid(guid)
764 obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
765 # check original attributes and restored one are same
766 orig_attrs = set(obj.keys())
767 # windows restore more attributes that originally we have
768 orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
769 rest_attrs = set(obj_restore.keys())
770 self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
771 self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
772 self._expected_userpw_restore_metadata())
775 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
776 """Test different scenarios for delete/reanimate group objects"""
778 def _make_object_dn(self, name):
779 return "CN=%s,CN=Users,%s" % (name, self.base_dn)
781 def _create_test_user(self, user_name):
782 user_dn = self._make_object_dn(user_name)
785 "objectClass": "user",
786 "sAMAccountName": user_name,
788 # delete an object if leftover from previous test
789 samba.tests.delete_force(self.samdb, user_dn)
790 # finally, create the group
792 return self.search_dn(user_dn)
794 def _create_test_group(self, group_name, members=None):
795 group_dn = self._make_object_dn(group_name)
798 "objectClass": "group",
799 "sAMAccountName": group_name,
802 ldif["member"] = [str(usr_dn) for usr_dn in members]
805 # delete an object if leftover from previous test
806 samba.tests.delete_force(self.samdb, group_dn)
807 # finally, create the group
809 return self.search_dn(group_dn)
811 def _expected_group_attributes(self, groupname, group_dn, category):
812 return {'dn': group_dn,
813 'groupType': '-2147483646',
814 'distinguishedName': group_dn,
815 'sAMAccountName': groupname,
817 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
820 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
822 'sAMAccountType': '268435456',
826 'operatorCount': '0',
832 def test_plain_group(self):
833 print("Test restored Group attributes")
835 obj = self._create_test_group("r_group")
836 guid = obj["objectGUID"][0]
838 self.samdb.delete(str(obj.dn))
839 obj_del = self.search_guid(guid)
840 # restore the Group and fetch what's restored
841 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
842 obj_restore = self.search_guid(guid)
843 # check original attributes and restored one are same
844 attr_orig = set(obj.keys())
845 # windows restore more attributes that originally we have
846 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
847 attr_rest = set(obj_restore.keys())
848 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
849 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
851 def test_group_with_members(self):
852 print("Test restored Group with members attributes")
854 usr1 = self._create_test_user("r_user_1")
855 usr2 = self._create_test_user("r_user_2")
856 obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
857 guid = obj["objectGUID"][0]
859 self.samdb.delete(str(obj.dn))
860 obj_del = self.search_guid(guid)
861 # restore the Group and fetch what's restored
862 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
863 obj_restore = self.search_guid(guid)
864 # check original attributes and restored one are same
865 attr_orig = set(obj.keys())
866 # windows restore more attributes that originally we have
867 attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
868 # and does not restore following attributes
869 attr_orig.remove("member")
870 attr_rest = set(obj_restore.keys())
871 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
872 self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
875 class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
876 """Test different scenarios for delete/reanimate OU/container objects"""
878 def _expected_container_attributes(self, rdn, name, dn, category):
880 lastKnownParent = '%s' % self.base_dn
882 lastKnownParent = 'CN=Users,%s' % self.base_dn
884 'distinguishedName': dn,
886 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
889 'lastKnownParent': lastKnownParent,
897 def _create_test_ou(self, rdn, name=None, description=None):
898 ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
899 # delete an object if leftover from previous test
900 samba.tests.delete_force(self.samdb, ou_dn)
901 # create ou and return created object
902 self.samdb.create_ou(ou_dn, name=name, description=description)
903 return self.search_dn(ou_dn)
905 def test_ou_with_name_description(self):
906 print("Test OU reanimation")
907 # create OU to test with
908 obj = self._create_test_ou(rdn="r_ou",
910 description="r_ou description")
911 guid = obj["objectGUID"][0]
913 self.samdb.delete(str(obj.dn))
914 obj_del = self.search_guid(guid)
915 # restore the Object and fetch what's restored
916 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
917 obj_restore = self.search_guid(guid)
918 # check original attributes and restored one are same
919 attr_orig = set(obj.keys())
920 attr_rest = set(obj_restore.keys())
921 # windows restore more attributes that originally we have
922 attr_orig.update(["lastKnownParent"])
923 # and does not restore following attributes
924 attr_orig -= set(["description"])
925 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
926 expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit")
927 self.assertAttributesExists(expected_attrs, obj_restore)
929 def test_container(self):
930 print("Test Container reanimation")
931 # create test Container
932 obj = self._create_object({
933 "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
934 "objectClass": "container"
936 guid = obj["objectGUID"][0]
938 self.samdb.delete(str(obj.dn))
939 obj_del = self.search_guid(guid)
940 # restore the Object and fetch what's restored
941 self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
942 obj_restore = self.search_guid(guid)
943 # check original attributes and restored one are same
944 attr_orig = set(obj.keys())
945 attr_rest = set(obj_restore.keys())
946 # windows restore more attributes that originally we have
947 attr_orig.update(["lastKnownParent"])
948 # and does not restore following attributes
949 attr_orig -= set(["showInAdvancedViewOnly"])
950 self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
951 expected_attrs = self._expected_container_attributes("CN", "r_container",
952 str(obj.dn), "Container")
953 self.assertAttributesExists(expected_attrs, obj_restore)
956 if __name__ == '__main__':