PY3: change shebang to python3 in source4/dsdb dir
[metze/samba/wip.git] / source4 / dsdb / tests / python / tombstone_reanimation.py
1 #!/usr/bin/env python3
2 #
3 # Tombstone reanimation tests
4 #
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2014
6 # Copyright (C) Nadezhda Ivanova <nivanova@symas.com> 2014
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20
21 from __future__ import print_function
22 import sys
23 import unittest
24
25 sys.path.insert(0, "bin/python")
26 import samba
27
28 from samba.ndr import ndr_unpack, ndr_print
29 from samba.dcerpc import misc
30 from samba.dcerpc import security
31 from samba.dcerpc import drsblobs
32 from samba.dcerpc.drsuapi import *
33 from samba.tests.password_test import PasswordCommon
34 from samba.compat import get_string
35
36 import samba.tests
37 from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
38                  MessageElement, LdbError,
39                  ERR_ATTRIBUTE_OR_VALUE_EXISTS, ERR_NO_SUCH_OBJECT, ERR_ENTRY_ALREADY_EXISTS,
40                  ERR_OPERATIONS_ERROR, ERR_UNWILLING_TO_PERFORM)
41
42
43 class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase):
44     """ verify Samba restores required attributes when
45         user restores a Deleted object
46     """
47
48     def setUp(self):
49         super(RestoredObjectAttributesBaseTestCase, self).setUp()
50         self.samdb = samba.tests.connect_samdb_env("TEST_SERVER", "TEST_USERNAME", "TEST_PASSWORD")
51         self.base_dn = self.samdb.domain_dn()
52         self.schema_dn = self.samdb.get_schema_basedn().get_linearized()
53         self.configuration_dn = self.samdb.get_config_basedn().get_linearized()
54
55         # permit password changes during this test
56         PasswordCommon.allow_password_changes(self, self.samdb)
57
58     def tearDown(self):
59         super(RestoredObjectAttributesBaseTestCase, self).tearDown()
60
61     def GUID_string(self, guid):
62         return get_string(self.samdb.schema_format_value("objectGUID", guid))
63
64     def search_guid(self, guid, attrs=["*"]):
65         res = self.samdb.search(base="<GUID=%s>" % self.GUID_string(guid),
66                                 scope=SCOPE_BASE, attrs=attrs,
67                                 controls=["show_deleted:1"])
68         self.assertEquals(len(res), 1)
69         return res[0]
70
71     def search_dn(self, dn):
72         res = self.samdb.search(expression="(objectClass=*)",
73                                 base=dn,
74                                 scope=SCOPE_BASE,
75                                 controls=["show_recycled:1"])
76         self.assertEquals(len(res), 1)
77         return res[0]
78
79     def _create_object(self, msg):
80         """:param msg: dict with dn and attributes to create an object from"""
81         # delete an object if leftover from previous test
82         samba.tests.delete_force(self.samdb, msg['dn'])
83         self.samdb.add(msg)
84         return self.search_dn(msg['dn'])
85
86     def assertNamesEqual(self, attrs_expected, attrs_extra):
87         self.assertEqual(attrs_expected, attrs_extra,
88                          "Actual object does not have expected attributes, missing from expected (%s), extra (%s)"
89                          % (str(attrs_expected.difference(attrs_extra)), str(attrs_extra.difference(attrs_expected))))
90
91     def assertAttributesEqual(self, obj_orig, attrs_orig, obj_restored, attrs_rest):
92         self.assertNamesEqual(attrs_orig, attrs_rest)
93         # remove volatile attributes, they can't be equal
94         attrs_orig -= set(["uSNChanged", "dSCorePropagationData", "whenChanged"])
95         for attr in attrs_orig:
96             # convert original attr value to ldif
97             orig_val = obj_orig.get(attr)
98             if orig_val is None:
99                 continue
100             if not isinstance(orig_val, MessageElement):
101                 orig_val = MessageElement(str(orig_val), 0, attr)
102             m = Message()
103             m.add(orig_val)
104             orig_ldif = self.samdb.write_ldif(m, 0)
105             # convert restored attr value to ldif
106             rest_val = obj_restored.get(attr)
107             self.assertFalse(rest_val is None)
108             m = Message()
109             if not isinstance(rest_val, MessageElement):
110                 rest_val = MessageElement(str(rest_val), 0, attr)
111             m.add(rest_val)
112             rest_ldif = self.samdb.write_ldif(m, 0)
113             # compare generated ldif's
114             self.assertEqual(orig_ldif, rest_ldif)
115
116     def assertAttributesExists(self, attr_expected, obj_msg):
117         """Check object contains at least expected attrbigutes
118         :param attr_expected: dict of expected attributes with values. ** is any value
119         :param obj_msg: Ldb.Message for the object under test
120         """
121         actual_names = set(obj_msg.keys())
122         # Samba does not use 'dSCorePropagationData', so skip it
123         actual_names -= set(['dSCorePropagationData'])
124         expected_names = set(attr_expected.keys())
125         self.assertNamesEqual(expected_names, actual_names)
126         for name in attr_expected.keys():
127             expected_val = attr_expected[name]
128             actual_val = obj_msg.get(name)
129             self.assertFalse(actual_val is None, "No value for attribute '%s'" % name)
130             if expected_val == "**":
131                 # "**" values means "any"
132                 continue
133             # if expected_val is e.g. ldb.bytes we can't depend on
134             # str(actual_value) working, we may just get a decoding
135             # error. Better to just compare raw values
136             if not isinstance(expected_val, str):
137                 actual_val = actual_val[0]
138             else:
139                 actual_val = str(actual_val)
140             self.assertEqual(expected_val, actual_val,
141                              "Unexpected value (%s) for '%s', expected (%s)" % (
142                              repr(actual_val), name, repr(expected_val)))
143
144     def _check_metadata(self, metadata, expected):
145         repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, metadata[0])
146
147         repl_array = []
148         for o in repl.ctr.array:
149             repl_array.append((o.attid, o.version))
150         repl_set = set(repl_array)
151
152         expected_set = set(expected)
153         self.assertEqual(len(repl_set), len(expected),
154                          "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % (
155                          str(expected_set.difference(repl_set)),
156                          str(repl_set.difference(expected_set)),
157                          ndr_print(repl)))
158
159         i = 0
160         for o in repl.ctr.array:
161             e = expected[i]
162             (attid, version) = e
163             self.assertEquals(attid, o.attid,
164                               "(LDAP) Wrong attid "
165                               "for expected value %d, wanted 0x%08x got 0x%08x, "
166                               "repl: \n%s"
167                               % (i, attid, o.attid, ndr_print(repl)))
168             # Allow version to be skipped when it does not matter
169             if version is not None:
170                 self.assertEquals(o.version, version,
171                                   "(LDAP) Wrong version for expected value %d, "
172                                   "attid 0x%08x, "
173                                   "wanted %d got %d, repl: \n%s"
174                                   % (i, o.attid,
175                                      version, o.version, ndr_print(repl)))
176             i = i + 1
177
178     @staticmethod
179     def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None):
180         """Restores a deleted object
181         :param samdb: SamDB connection to SAM
182         :param del_dn: str Deleted object DN
183         :param new_dn: str Where to restore the object
184         :param new_attrs: dict Additional attributes to set
185         """
186         msg = Message()
187         msg.dn = Dn(samdb, str(del_dn))
188         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
189         msg["distinguishedName"] = MessageElement([str(new_dn)], FLAG_MOD_REPLACE, "distinguishedName")
190         if new_attrs is not None:
191             assert isinstance(new_attrs, dict)
192             for attr in new_attrs:
193                 msg[attr] = MessageElement(new_attrs[attr], FLAG_MOD_REPLACE, attr)
194         samdb.modify(msg, ["show_deleted:1"])
195
196
197 class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase):
198     def setUp(self):
199         super(BaseRestoreObjectTestCase, self).setUp()
200
201     def enable_recycle_bin(self):
202         msg = Message()
203         msg.dn = Dn(self.samdb, "")
204         msg["enableOptionalFeature"] = MessageElement(
205             "CN=Partitions," + self.configuration_dn + ":766ddcd8-acd0-445e-f3b9-a7f9b6744f2a",
206             FLAG_MOD_ADD, "enableOptionalFeature")
207         try:
208             self.samdb.modify(msg)
209         except LdbError as e:
210             (num, _) = e.args
211             self.assertEquals(num, ERR_ATTRIBUTE_OR_VALUE_EXISTS)
212
213     def test_undelete(self):
214         print("Testing standard undelete operation")
215         usr1 = "cn=testuser,cn=users," + self.base_dn
216         samba.tests.delete_force(self.samdb, usr1)
217         self.samdb.add({
218             "dn": usr1,
219             "objectclass": "user",
220             "description": "test user description",
221             "samaccountname": "testuser"})
222         objLive1 = self.search_dn(usr1)
223         guid1 = objLive1["objectGUID"][0]
224         self.samdb.delete(usr1)
225         objDeleted1 = self.search_guid(guid1)
226         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
227         objLive2 = self.search_dn(usr1)
228         self.assertEqual(str(objLive2.dn).lower(), str(objLive1.dn).lower())
229         samba.tests.delete_force(self.samdb, usr1)
230
231     def test_rename(self):
232         print("Testing attempt to rename deleted object")
233         usr1 = "cn=testuser,cn=users," + self.base_dn
234         self.samdb.add({
235             "dn": usr1,
236             "objectclass": "user",
237             "description": "test user description",
238             "samaccountname": "testuser"})
239         objLive1 = self.search_dn(usr1)
240         guid1 = objLive1["objectGUID"][0]
241         self.samdb.delete(usr1)
242         objDeleted1 = self.search_guid(guid1)
243         # just to make sure we get the correct error if the show deleted is missing
244         try:
245             self.samdb.rename(str(objDeleted1.dn), usr1)
246             self.fail()
247         except LdbError as e1:
248             (num, _) = e1.args
249             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
250
251         try:
252             self.samdb.rename(str(objDeleted1.dn), usr1, ["show_deleted:1"])
253             self.fail()
254         except LdbError as e2:
255             (num, _) = e2.args
256             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
257
258     def test_undelete_with_mod(self):
259         print("Testing standard undelete operation with modification of additional attributes")
260         usr1 = "cn=testuser,cn=users," + self.base_dn
261         self.samdb.add({
262             "dn": usr1,
263             "objectclass": "user",
264             "description": "test user description",
265             "samaccountname": "testuser"})
266         objLive1 = self.search_dn(usr1)
267         guid1 = objLive1["objectGUID"][0]
268         self.samdb.delete(usr1)
269         objDeleted1 = self.search_guid(guid1)
270         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1, {"url": "www.samba.org"})
271         objLive2 = self.search_dn(usr1)
272         self.assertEqual(str(objLive2["url"][0]), "www.samba.org")
273         samba.tests.delete_force(self.samdb, usr1)
274
275     def test_undelete_newuser(self):
276         print("Testing undelete user with a different dn")
277         usr1 = "cn=testuser,cn=users," + self.base_dn
278         usr2 = "cn=testuser2,cn=users," + self.base_dn
279         samba.tests.delete_force(self.samdb, usr1)
280         self.samdb.add({
281             "dn": usr1,
282             "objectclass": "user",
283             "description": "test user description",
284             "samaccountname": "testuser"})
285         objLive1 = self.search_dn(usr1)
286         guid1 = objLive1["objectGUID"][0]
287         self.samdb.delete(usr1)
288         objDeleted1 = self.search_guid(guid1)
289         self.restore_deleted_object(self.samdb, objDeleted1.dn, usr2)
290         objLive2 = self.search_dn(usr2)
291         samba.tests.delete_force(self.samdb, usr1)
292         samba.tests.delete_force(self.samdb, usr2)
293
294     def test_undelete_existing(self):
295         print("Testing undelete user after a user with the same dn has been created")
296         usr1 = "cn=testuser,cn=users," + self.base_dn
297         self.samdb.add({
298             "dn": usr1,
299             "objectclass": "user",
300             "description": "test user description",
301             "samaccountname": "testuser"})
302         objLive1 = self.search_dn(usr1)
303         guid1 = objLive1["objectGUID"][0]
304         self.samdb.delete(usr1)
305         self.samdb.add({
306             "dn": usr1,
307             "objectclass": "user",
308             "description": "test user description",
309             "samaccountname": "testuser"})
310         objDeleted1 = self.search_guid(guid1)
311         try:
312             self.restore_deleted_object(self.samdb, objDeleted1.dn, usr1)
313             self.fail()
314         except LdbError as e3:
315             (num, _) = e3.args
316             self.assertEquals(num, ERR_ENTRY_ALREADY_EXISTS)
317
318     def test_undelete_cross_nc(self):
319         print("Cross NC undelete")
320         c1 = "cn=ldaptestcontainer," + self.base_dn
321         c2 = "cn=ldaptestcontainer2," + self.configuration_dn
322         c3 = "cn=ldaptestcontainer," + self.configuration_dn
323         c4 = "cn=ldaptestcontainer2," + self.base_dn
324         samba.tests.delete_force(self.samdb, c1)
325         samba.tests.delete_force(self.samdb, c2)
326         samba.tests.delete_force(self.samdb, c3)
327         samba.tests.delete_force(self.samdb, c4)
328         self.samdb.add({
329             "dn": c1,
330             "objectclass": "container"})
331         self.samdb.add({
332             "dn": c2,
333             "objectclass": "container"})
334         objLive1 = self.search_dn(c1)
335         objLive2 = self.search_dn(c2)
336         guid1 = objLive1["objectGUID"][0]
337         guid2 = objLive2["objectGUID"][0]
338         self.samdb.delete(c1)
339         self.samdb.delete(c2)
340         objDeleted1 = self.search_guid(guid1)
341         objDeleted2 = self.search_guid(guid2)
342         # try to undelete from base dn to config
343         try:
344             self.restore_deleted_object(self.samdb, objDeleted1.dn, c3)
345             self.fail()
346         except LdbError as e4:
347             (num, _) = e4.args
348             self.assertEquals(num, ERR_OPERATIONS_ERROR)
349         # try to undelete from config to base dn
350         try:
351             self.restore_deleted_object(self.samdb, objDeleted2.dn, c4)
352             self.fail()
353         except LdbError as e5:
354             (num, _) = e5.args
355             self.assertEquals(num, ERR_OPERATIONS_ERROR)
356         # assert undeletion will work in same nc
357         self.restore_deleted_object(self.samdb, objDeleted1.dn, c4)
358         self.restore_deleted_object(self.samdb, objDeleted2.dn, c3)
359
360
361 class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase):
362     """Test cases for delete/reanimate user objects"""
363
364     def _expected_user_add_attributes(self, username, user_dn, category):
365         return {'dn': user_dn,
366                 'objectClass': '**',
367                 'cn': username,
368                 'distinguishedName': user_dn,
369                 'instanceType': '4',
370                 'whenCreated': '**',
371                 'whenChanged': '**',
372                 'uSNCreated': '**',
373                 'uSNChanged': '**',
374                 'name': username,
375                 'objectGUID': '**',
376                 'userAccountControl': '546',
377                 'badPwdCount': '0',
378                 'badPasswordTime': '0',
379                 'codePage': '0',
380                 'countryCode': '0',
381                 'lastLogon': '0',
382                 'lastLogoff': '0',
383                 'pwdLastSet': '0',
384                 'primaryGroupID': '513',
385                 'objectSid': '**',
386                 'accountExpires': '9223372036854775807',
387                 'logonCount': '0',
388                 'sAMAccountName': username,
389                 'sAMAccountType': '805306368',
390                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
391                 }
392
393     def _expected_user_add_metadata(self):
394         return [
395             (DRSUAPI_ATTID_objectClass, 1),
396             (DRSUAPI_ATTID_cn, 1),
397             (DRSUAPI_ATTID_instanceType, 1),
398             (DRSUAPI_ATTID_whenCreated, 1),
399             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
400             (DRSUAPI_ATTID_name, 1),
401             (DRSUAPI_ATTID_userAccountControl, None),
402             (DRSUAPI_ATTID_codePage, 1),
403             (DRSUAPI_ATTID_countryCode, 1),
404             (DRSUAPI_ATTID_dBCSPwd, 1),
405             (DRSUAPI_ATTID_logonHours, 1),
406             (DRSUAPI_ATTID_unicodePwd, 1),
407             (DRSUAPI_ATTID_ntPwdHistory, 1),
408             (DRSUAPI_ATTID_pwdLastSet, 1),
409             (DRSUAPI_ATTID_primaryGroupID, 1),
410             (DRSUAPI_ATTID_objectSid, 1),
411             (DRSUAPI_ATTID_accountExpires, 1),
412             (DRSUAPI_ATTID_lmPwdHistory, 1),
413             (DRSUAPI_ATTID_sAMAccountName, 1),
414             (DRSUAPI_ATTID_sAMAccountType, 1),
415             (DRSUAPI_ATTID_objectCategory, 1)]
416
417     def _expected_user_del_attributes(self, username, _guid, _sid):
418         guid = ndr_unpack(misc.GUID, _guid)
419         dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
420         cn = "%s\nDEL:%s" % (username, guid)
421         return {'dn': dn,
422                 'objectClass': '**',
423                 'cn': cn,
424                 'distinguishedName': dn,
425                 'isDeleted': 'TRUE',
426                 'isRecycled': 'TRUE',
427                 'instanceType': '4',
428                 'whenCreated': '**',
429                 'whenChanged': '**',
430                 'uSNCreated': '**',
431                 'uSNChanged': '**',
432                 'name': cn,
433                 'objectGUID': _guid,
434                 'userAccountControl': '546',
435                 'objectSid': _sid,
436                 'sAMAccountName': username,
437                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
438                 }
439
440     def _expected_user_del_metadata(self):
441         return [
442             (DRSUAPI_ATTID_objectClass, 1),
443             (DRSUAPI_ATTID_cn, 2),
444             (DRSUAPI_ATTID_instanceType, 1),
445             (DRSUAPI_ATTID_whenCreated, 1),
446             (DRSUAPI_ATTID_isDeleted, 1),
447             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
448             (DRSUAPI_ATTID_name, 2),
449             (DRSUAPI_ATTID_userAccountControl, None),
450             (DRSUAPI_ATTID_codePage, 2),
451             (DRSUAPI_ATTID_countryCode, 2),
452             (DRSUAPI_ATTID_dBCSPwd, 1),
453             (DRSUAPI_ATTID_logonHours, 1),
454             (DRSUAPI_ATTID_unicodePwd, 1),
455             (DRSUAPI_ATTID_ntPwdHistory, 1),
456             (DRSUAPI_ATTID_pwdLastSet, 2),
457             (DRSUAPI_ATTID_primaryGroupID, 2),
458             (DRSUAPI_ATTID_objectSid, 1),
459             (DRSUAPI_ATTID_accountExpires, 2),
460             (DRSUAPI_ATTID_lmPwdHistory, 1),
461             (DRSUAPI_ATTID_sAMAccountName, 1),
462             (DRSUAPI_ATTID_sAMAccountType, 2),
463             (DRSUAPI_ATTID_lastKnownParent, 1),
464             (DRSUAPI_ATTID_objectCategory, 2),
465             (DRSUAPI_ATTID_isRecycled, 1)]
466
467     def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category):
468         return {'dn': user_dn,
469                 'objectClass': '**',
470                 'cn': username,
471                 'distinguishedName': user_dn,
472                 'instanceType': '4',
473                 'whenCreated': '**',
474                 'whenChanged': '**',
475                 'uSNCreated': '**',
476                 'uSNChanged': '**',
477                 'name': username,
478                 'objectGUID': guid,
479                 'userAccountControl': '546',
480                 'badPwdCount': '0',
481                 'badPasswordTime': '0',
482                 'codePage': '0',
483                 'countryCode': '0',
484                 'lastLogon': '0',
485                 'lastLogoff': '0',
486                 'pwdLastSet': '0',
487                 'primaryGroupID': '513',
488                 'operatorCount': '0',
489                 'objectSid': sid,
490                 'adminCount': '0',
491                 'accountExpires': '0',
492                 'logonCount': '0',
493                 'sAMAccountName': username,
494                 'sAMAccountType': '805306368',
495                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
496                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
497                 }
498
499     def _expected_user_restore_metadata(self):
500         return [
501             (DRSUAPI_ATTID_objectClass, 1),
502             (DRSUAPI_ATTID_cn, 3),
503             (DRSUAPI_ATTID_instanceType, 1),
504             (DRSUAPI_ATTID_whenCreated, 1),
505             (DRSUAPI_ATTID_isDeleted, 2),
506             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
507             (DRSUAPI_ATTID_name, 3),
508             (DRSUAPI_ATTID_userAccountControl, None),
509             (DRSUAPI_ATTID_codePage, 3),
510             (DRSUAPI_ATTID_countryCode, 3),
511             (DRSUAPI_ATTID_dBCSPwd, 1),
512             (DRSUAPI_ATTID_logonHours, 1),
513             (DRSUAPI_ATTID_unicodePwd, 1),
514             (DRSUAPI_ATTID_ntPwdHistory, 1),
515             (DRSUAPI_ATTID_pwdLastSet, 3),
516             (DRSUAPI_ATTID_primaryGroupID, 3),
517             (DRSUAPI_ATTID_operatorCount, 1),
518             (DRSUAPI_ATTID_objectSid, 1),
519             (DRSUAPI_ATTID_adminCount, 1),
520             (DRSUAPI_ATTID_accountExpires, 3),
521             (DRSUAPI_ATTID_lmPwdHistory, 1),
522             (DRSUAPI_ATTID_sAMAccountName, 1),
523             (DRSUAPI_ATTID_sAMAccountType, 3),
524             (DRSUAPI_ATTID_lastKnownParent, 1),
525             (DRSUAPI_ATTID_objectCategory, 3),
526             (DRSUAPI_ATTID_isRecycled, 2)]
527
528     def test_restore_user(self):
529         print("Test restored user attributes")
530         username = "restore_user"
531         usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
532         samba.tests.delete_force(self.samdb, usr_dn)
533         self.samdb.add({
534             "dn": usr_dn,
535             "objectClass": "user",
536             "sAMAccountName": username})
537         obj = self.search_dn(usr_dn)
538         guid = obj["objectGUID"][0]
539         sid = obj["objectSID"][0]
540         obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
541         self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj)
542         self._check_metadata(obj_rmd["replPropertyMetaData"],
543                              self._expected_user_add_metadata())
544         self.samdb.delete(usr_dn)
545         obj_del = self.search_guid(guid)
546         obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
547         orig_attrs = set(obj.keys())
548         del_attrs = set(obj_del.keys())
549         self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del)
550         self._check_metadata(obj_del_rmd["replPropertyMetaData"],
551                              self._expected_user_del_metadata())
552         # restore the user and fetch what's restored
553         self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn)
554         obj_restore = self.search_guid(guid)
555         obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
556         # check original attributes and restored one are same
557         orig_attrs = set(obj.keys())
558         # windows restore more attributes that originally we have
559         orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
560         rest_attrs = set(obj_restore.keys())
561         self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
562         self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
563                              self._expected_user_restore_metadata())
564
565
566 class RestoreUserPwdObjectTestCase(RestoredObjectAttributesBaseTestCase):
567     """Test cases for delete/reanimate user objects with password"""
568
569     def _expected_userpw_add_attributes(self, username, user_dn, category):
570         return {'dn': user_dn,
571                 'objectClass': '**',
572                 'cn': username,
573                 'distinguishedName': user_dn,
574                 'instanceType': '4',
575                 'whenCreated': '**',
576                 'whenChanged': '**',
577                 'uSNCreated': '**',
578                 'uSNChanged': '**',
579                 'name': username,
580                 'objectGUID': '**',
581                 'userAccountControl': '546',
582                 'badPwdCount': '0',
583                 'badPasswordTime': '0',
584                 'codePage': '0',
585                 'countryCode': '0',
586                 'lastLogon': '0',
587                 'lastLogoff': '0',
588                 'pwdLastSet': '**',
589                 'primaryGroupID': '513',
590                 'objectSid': '**',
591                 'accountExpires': '9223372036854775807',
592                 'logonCount': '0',
593                 'sAMAccountName': username,
594                 'sAMAccountType': '805306368',
595                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
596                 }
597
598     def _expected_userpw_add_metadata(self):
599         return [
600             (DRSUAPI_ATTID_objectClass, 1),
601             (DRSUAPI_ATTID_cn, 1),
602             (DRSUAPI_ATTID_instanceType, 1),
603             (DRSUAPI_ATTID_whenCreated, 1),
604             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
605             (DRSUAPI_ATTID_name, 1),
606             (DRSUAPI_ATTID_userAccountControl, None),
607             (DRSUAPI_ATTID_codePage, 1),
608             (DRSUAPI_ATTID_countryCode, 1),
609             (DRSUAPI_ATTID_dBCSPwd, 1),
610             (DRSUAPI_ATTID_logonHours, 1),
611             (DRSUAPI_ATTID_unicodePwd, 1),
612             (DRSUAPI_ATTID_ntPwdHistory, 1),
613             (DRSUAPI_ATTID_pwdLastSet, 1),
614             (DRSUAPI_ATTID_primaryGroupID, 1),
615             (DRSUAPI_ATTID_supplementalCredentials, 1),
616             (DRSUAPI_ATTID_objectSid, 1),
617             (DRSUAPI_ATTID_accountExpires, 1),
618             (DRSUAPI_ATTID_lmPwdHistory, 1),
619             (DRSUAPI_ATTID_sAMAccountName, 1),
620             (DRSUAPI_ATTID_sAMAccountType, 1),
621             (DRSUAPI_ATTID_objectCategory, 1)]
622
623     def _expected_userpw_del_attributes(self, username, _guid, _sid):
624         guid = ndr_unpack(misc.GUID, _guid)
625         dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn)
626         cn = "%s\nDEL:%s" % (username, guid)
627         return {'dn': dn,
628                 'objectClass': '**',
629                 'cn': cn,
630                 'distinguishedName': dn,
631                 'isDeleted': 'TRUE',
632                 'isRecycled': 'TRUE',
633                 'instanceType': '4',
634                 'whenCreated': '**',
635                 'whenChanged': '**',
636                 'uSNCreated': '**',
637                 'uSNChanged': '**',
638                 'name': cn,
639                 'objectGUID': _guid,
640                 'userAccountControl': '546',
641                 'objectSid': _sid,
642                 'sAMAccountName': username,
643                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
644                 }
645
646     def _expected_userpw_del_metadata(self):
647         return [
648             (DRSUAPI_ATTID_objectClass, 1),
649             (DRSUAPI_ATTID_cn, 2),
650             (DRSUAPI_ATTID_instanceType, 1),
651             (DRSUAPI_ATTID_whenCreated, 1),
652             (DRSUAPI_ATTID_isDeleted, 1),
653             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
654             (DRSUAPI_ATTID_name, 2),
655             (DRSUAPI_ATTID_userAccountControl, None),
656             (DRSUAPI_ATTID_codePage, 2),
657             (DRSUAPI_ATTID_countryCode, 2),
658             (DRSUAPI_ATTID_dBCSPwd, 1),
659             (DRSUAPI_ATTID_logonHours, 1),
660             (DRSUAPI_ATTID_unicodePwd, 2),
661             (DRSUAPI_ATTID_ntPwdHistory, 2),
662             (DRSUAPI_ATTID_pwdLastSet, 2),
663             (DRSUAPI_ATTID_primaryGroupID, 2),
664             (DRSUAPI_ATTID_supplementalCredentials, 2),
665             (DRSUAPI_ATTID_objectSid, 1),
666             (DRSUAPI_ATTID_accountExpires, 2),
667             (DRSUAPI_ATTID_lmPwdHistory, 2),
668             (DRSUAPI_ATTID_sAMAccountName, 1),
669             (DRSUAPI_ATTID_sAMAccountType, 2),
670             (DRSUAPI_ATTID_lastKnownParent, 1),
671             (DRSUAPI_ATTID_objectCategory, 2),
672             (DRSUAPI_ATTID_isRecycled, 1)]
673
674     def _expected_userpw_restore_attributes(self, username, guid, sid, user_dn, category):
675         return {'dn': user_dn,
676                 'objectClass': '**',
677                 'cn': username,
678                 'distinguishedName': user_dn,
679                 'instanceType': '4',
680                 'whenCreated': '**',
681                 'whenChanged': '**',
682                 'uSNCreated': '**',
683                 'uSNChanged': '**',
684                 'name': username,
685                 'objectGUID': guid,
686                 'userAccountControl': '546',
687                 'badPwdCount': '0',
688                 'badPasswordTime': '0',
689                 'codePage': '0',
690                 'countryCode': '0',
691                 'lastLogon': '0',
692                 'lastLogoff': '0',
693                 'pwdLastSet': '**',
694                 'primaryGroupID': '513',
695                 'operatorCount': '0',
696                 'objectSid': sid,
697                 'adminCount': '0',
698                 'accountExpires': '0',
699                 'logonCount': '0',
700                 'sAMAccountName': username,
701                 'sAMAccountType': '805306368',
702                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
703                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn)
704                 }
705
706     def _expected_userpw_restore_metadata(self):
707         return [
708             (DRSUAPI_ATTID_objectClass, 1),
709             (DRSUAPI_ATTID_cn, 3),
710             (DRSUAPI_ATTID_instanceType, 1),
711             (DRSUAPI_ATTID_whenCreated, 1),
712             (DRSUAPI_ATTID_isDeleted, 2),
713             (DRSUAPI_ATTID_ntSecurityDescriptor, 1),
714             (DRSUAPI_ATTID_name, 3),
715             (DRSUAPI_ATTID_userAccountControl, None),
716             (DRSUAPI_ATTID_codePage, 3),
717             (DRSUAPI_ATTID_countryCode, 3),
718             (DRSUAPI_ATTID_dBCSPwd, 2),
719             (DRSUAPI_ATTID_logonHours, 1),
720             (DRSUAPI_ATTID_unicodePwd, 3),
721             (DRSUAPI_ATTID_ntPwdHistory, 3),
722             (DRSUAPI_ATTID_pwdLastSet, 4),
723             (DRSUAPI_ATTID_primaryGroupID, 3),
724             (DRSUAPI_ATTID_supplementalCredentials, 3),
725             (DRSUAPI_ATTID_operatorCount, 1),
726             (DRSUAPI_ATTID_objectSid, 1),
727             (DRSUAPI_ATTID_adminCount, 1),
728             (DRSUAPI_ATTID_accountExpires, 3),
729             (DRSUAPI_ATTID_lmPwdHistory, 3),
730             (DRSUAPI_ATTID_sAMAccountName, 1),
731             (DRSUAPI_ATTID_sAMAccountType, 3),
732             (DRSUAPI_ATTID_lastKnownParent, 1),
733             (DRSUAPI_ATTID_objectCategory, 3),
734             (DRSUAPI_ATTID_isRecycled, 2)]
735
736     def test_restorepw_user(self):
737         print("Test restored user attributes")
738         username = "restorepw_user"
739         usr_dn = "CN=%s,CN=Users,%s" % (username, self.base_dn)
740         samba.tests.delete_force(self.samdb, usr_dn)
741         self.samdb.add({
742             "dn": usr_dn,
743             "objectClass": "user",
744             "userPassword": "thatsAcomplPASS0",
745             "sAMAccountName": username})
746         obj = self.search_dn(usr_dn)
747         guid = obj["objectGUID"][0]
748         sid = obj["objectSID"][0]
749         obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
750         self.assertAttributesExists(self._expected_userpw_add_attributes(username, usr_dn, "Person"), obj)
751         self._check_metadata(obj_rmd["replPropertyMetaData"],
752                              self._expected_userpw_add_metadata())
753         self.samdb.delete(usr_dn)
754         obj_del = self.search_guid(guid)
755         obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
756         orig_attrs = set(obj.keys())
757         del_attrs = set(obj_del.keys())
758         self.assertAttributesExists(self._expected_userpw_del_attributes(username, guid, sid), obj_del)
759         self._check_metadata(obj_del_rmd["replPropertyMetaData"],
760                              self._expected_userpw_del_metadata())
761         # restore the user and fetch what's restored
762         self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn, {"userPassword": ["thatsAcomplPASS1"]})
763         obj_restore = self.search_guid(guid)
764         obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"])
765         # check original attributes and restored one are same
766         orig_attrs = set(obj.keys())
767         # windows restore more attributes that originally we have
768         orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent'])
769         rest_attrs = set(obj_restore.keys())
770         self.assertAttributesExists(self._expected_userpw_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore)
771         self._check_metadata(obj_restore_rmd["replPropertyMetaData"],
772                              self._expected_userpw_restore_metadata())
773
774
775 class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase):
776     """Test different scenarios for delete/reanimate group objects"""
777
778     def _make_object_dn(self, name):
779         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
780
781     def _create_test_user(self, user_name):
782         user_dn = self._make_object_dn(user_name)
783         ldif = {
784             "dn": user_dn,
785             "objectClass": "user",
786             "sAMAccountName": user_name,
787         }
788         # delete an object if leftover from previous test
789         samba.tests.delete_force(self.samdb, user_dn)
790         # finally, create the group
791         self.samdb.add(ldif)
792         return self.search_dn(user_dn)
793
794     def _create_test_group(self, group_name, members=None):
795         group_dn = self._make_object_dn(group_name)
796         ldif = {
797             "dn": group_dn,
798             "objectClass": "group",
799             "sAMAccountName": group_name,
800         }
801         try:
802             ldif["member"] = [str(usr_dn) for usr_dn in members]
803         except TypeError:
804             pass
805         # delete an object if leftover from previous test
806         samba.tests.delete_force(self.samdb, group_dn)
807         # finally, create the group
808         self.samdb.add(ldif)
809         return self.search_dn(group_dn)
810
811     def _expected_group_attributes(self, groupname, group_dn, category):
812         return {'dn': group_dn,
813                 'groupType': '-2147483646',
814                 'distinguishedName': group_dn,
815                 'sAMAccountName': groupname,
816                 'name': groupname,
817                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
818                 'objectClass': '**',
819                 'objectGUID': '**',
820                 'lastKnownParent': 'CN=Users,%s' % self.base_dn,
821                 'whenChanged': '**',
822                 'sAMAccountType': '268435456',
823                 'objectSid': '**',
824                 'whenCreated': '**',
825                 'uSNCreated': '**',
826                 'operatorCount': '0',
827                 'uSNChanged': '**',
828                 'instanceType': '4',
829                 'adminCount': '0',
830                 'cn': groupname}
831
832     def test_plain_group(self):
833         print("Test restored Group attributes")
834         # create test group
835         obj = self._create_test_group("r_group")
836         guid = obj["objectGUID"][0]
837         # delete the group
838         self.samdb.delete(str(obj.dn))
839         obj_del = self.search_guid(guid)
840         # restore the Group and fetch what's restored
841         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
842         obj_restore = self.search_guid(guid)
843         # check original attributes and restored one are same
844         attr_orig = set(obj.keys())
845         # windows restore more attributes that originally we have
846         attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
847         attr_rest = set(obj_restore.keys())
848         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
849         self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
850
851     def test_group_with_members(self):
852         print("Test restored Group with members attributes")
853         # create test group
854         usr1 = self._create_test_user("r_user_1")
855         usr2 = self._create_test_user("r_user_2")
856         obj = self._create_test_group("r_group", [usr1.dn, usr2.dn])
857         guid = obj["objectGUID"][0]
858         # delete the group
859         self.samdb.delete(str(obj.dn))
860         obj_del = self.search_guid(guid)
861         # restore the Group and fetch what's restored
862         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
863         obj_restore = self.search_guid(guid)
864         # check original attributes and restored one are same
865         attr_orig = set(obj.keys())
866         # windows restore more attributes that originally we have
867         attr_orig.update(['adminCount', 'operatorCount', 'lastKnownParent'])
868         # and does not restore following attributes
869         attr_orig.remove("member")
870         attr_rest = set(obj_restore.keys())
871         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
872         self.assertAttributesExists(self._expected_group_attributes("r_group", str(obj.dn), "Group"), obj_restore)
873
874
875 class RestoreContainerObjectTestCase(RestoredObjectAttributesBaseTestCase):
876     """Test different scenarios for delete/reanimate OU/container objects"""
877
878     def _expected_container_attributes(self, rdn, name, dn, category):
879         if rdn == 'OU':
880             lastKnownParent = '%s' % self.base_dn
881         else:
882             lastKnownParent = 'CN=Users,%s' % self.base_dn
883         return {'dn': dn,
884                 'distinguishedName': dn,
885                 'name': name,
886                 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn),
887                 'objectClass': '**',
888                 'objectGUID': '**',
889                 'lastKnownParent': lastKnownParent,
890                 'whenChanged': '**',
891                 'whenCreated': '**',
892                 'uSNCreated': '**',
893                 'uSNChanged': '**',
894                 'instanceType': '4',
895                 rdn.lower(): name}
896
897     def _create_test_ou(self, rdn, name=None, description=None):
898         ou_dn = "OU=%s,%s" % (rdn, self.base_dn)
899         # delete an object if leftover from previous test
900         samba.tests.delete_force(self.samdb, ou_dn)
901         # create ou and return created object
902         self.samdb.create_ou(ou_dn, name=name, description=description)
903         return self.search_dn(ou_dn)
904
905     def test_ou_with_name_description(self):
906         print("Test OU reanimation")
907         # create OU to test with
908         obj = self._create_test_ou(rdn="r_ou",
909                                    name="r_ou name",
910                                    description="r_ou description")
911         guid = obj["objectGUID"][0]
912         # delete the object
913         self.samdb.delete(str(obj.dn))
914         obj_del = self.search_guid(guid)
915         # restore the Object and fetch what's restored
916         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
917         obj_restore = self.search_guid(guid)
918         # check original attributes and restored one are same
919         attr_orig = set(obj.keys())
920         attr_rest = set(obj_restore.keys())
921         # windows restore more attributes that originally we have
922         attr_orig.update(["lastKnownParent"])
923         # and does not restore following attributes
924         attr_orig -= set(["description"])
925         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
926         expected_attrs = self._expected_container_attributes("OU", "r_ou", str(obj.dn), "Organizational-Unit")
927         self.assertAttributesExists(expected_attrs, obj_restore)
928
929     def test_container(self):
930         print("Test Container reanimation")
931         # create test Container
932         obj = self._create_object({
933             "dn": "CN=r_container,CN=Users,%s" % self.base_dn,
934             "objectClass": "container"
935         })
936         guid = obj["objectGUID"][0]
937         # delete the object
938         self.samdb.delete(str(obj.dn))
939         obj_del = self.search_guid(guid)
940         # restore the Object and fetch what's restored
941         self.restore_deleted_object(self.samdb, obj_del.dn, obj.dn)
942         obj_restore = self.search_guid(guid)
943         # check original attributes and restored one are same
944         attr_orig = set(obj.keys())
945         attr_rest = set(obj_restore.keys())
946         # windows restore more attributes that originally we have
947         attr_orig.update(["lastKnownParent"])
948         # and does not restore following attributes
949         attr_orig -= set(["showInAdvancedViewOnly"])
950         self.assertAttributesEqual(obj, attr_orig, obj_restore, attr_rest)
951         expected_attrs = self._expected_container_attributes("CN", "r_container",
952                                                              str(obj.dn), "Container")
953         self.assertAttributesExists(expected_attrs, obj_restore)
954
955
956 if __name__ == '__main__':
957     unittest.main()