tests samr: Extra tests for samr_EnumDomainUserss
authorGary Lockyer <gary@catalyst.net.nz>
Thu, 18 Oct 2018 00:53:55 +0000 (13:53 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 20 Nov 2018 21:14:17 +0000 (22:14 +0100)
Add extra tests to test the content returned by samr_EnumDomainUsers,
and tests for the result caching added in the following commit.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dcerpc/sam.py
selftest/knownfail.d/samr [new file with mode: 0644]

index 11ed9b9287ff560688e0c95cd1ad048c5e7bfae4..ab710861383e241ecb8ca583c21a1041da87dbc6 100644 (file)
@@ -604,3 +604,147 @@ class SamrTests(RpcInterfaceTestCase):
         check_results(expected, actual)
 
         self.delete_dns(dns)
+
+    def test_EnumDomainUsers(self):
+        def check_results(expected, actual):
+            for (e, a) in zip(expected, actual):
+                self.assertTrue(isinstance(a, samr.SamEntry))
+                self.assertEquals(
+                    str(e["sAMAccountName"]), str(a.name.string))
+
+        # Create four users
+        # to ensure that we have the minimum needed for the tests.
+        dns = self.create_users([1, 2, 3, 4])
+
+        #
+        # Get the expected results by querying the samdb database directly.
+        # We do this rather than use a list of expected results as this runs
+        # with other tests so we do not have a known fixed list of elements
+        select = "(objectClass=user)"
+        attributes = ["sAMAccountName", "objectSID", "userAccountConrol"]
+        unfiltered = self.samdb.search(expression=select, attrs=attributes)
+        filtered = self.filter_domain(unfiltered)
+        self.assertTrue(len(filtered) > 4)
+
+        # Sort the expected results by rid
+        expected = sorted(list(filtered), key=rid)
+
+        #
+        # Perform EnumDomainUsers with max_size greater than required for the
+        # expected number of results. We should get all the results.
+        #
+        max_size = calc_max_size(len(expected) + 10)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        self.assertEquals(len(expected), num_entries)
+        check_results(expected, actual.entries)
+
+        #
+        # Perform EnumDomainUsers with size set to so that it contains
+        # 4 entries.
+        max_size = calc_max_size(4)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        self.assertEquals(4, num_entries)
+        check_results(expected[:4], actual.entries)
+
+        #
+        # Try calling with resume_handle greater than number of entries
+        # Should return no results and a resume handle of 0
+        rh = len(expected)
+        max_size = calc_max_size(1)
+        self.conn.Close(self.handle)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, rh, 0, max_size)
+
+        self.assertEquals(0, num_entries)
+        self.assertEquals(0, resume_handle)
+
+        #
+        # Enumerate through the domain users one element at a time.
+        # We should get all the results.
+        #
+        actual = []
+        max_size = calc_max_size(1)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        while resume_handle:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+                self.domain_handle, resume_handle, 0, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        self.assertEquals(len(expected), len(actual))
+        check_results(expected, actual)
+
+        #
+        # Check that the cached results are being returned.
+        # Obtain a new resume_handle and insert new entries into the
+        # into the DB. As the entries were added after the results were cached
+        # they should not show up in the returned results.
+        #
+        actual = []
+        max_size = calc_max_size(1)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        extra_dns = self.create_users([1000, 1002, 1003, 1004])
+        while resume_handle:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+                self.domain_handle, resume_handle, 0, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        self.assertEquals(len(expected), len(actual))
+        check_results(expected, actual)
+
+        #
+        # Perform EnumDomainUsers, we should read the newly added groups
+        # As resume_handle is zero, the results will be read from disk.
+        #
+        max_size = calc_max_size(len(expected) + len(extra_dns) + 10)
+        (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        self.assertEquals(len(expected) + len(extra_dns), num_entries)
+
+        #
+        # Get a new expected result set by querying the database directly
+        unfiltered01 = self.samdb.search(expression=select, attrs=attributes)
+        filtered01 = self.filter_domain(unfiltered01)
+        self.assertTrue(len(filtered01) > len(expected))
+
+        # Sort the expected results by rid
+        expected01 = sorted(list(filtered01), key=rid)
+
+        #
+        # Now check that we read the new entries.
+        #
+        self.assertEquals(len(expected01), num_entries)
+        check_results(expected01, actual.entries)
+
+        #
+        # Check that deleted results are handled correctly.
+        # Obtain a new resume_handle and delete entries from the DB.
+        # We will not see the deleted entries in the result set, as details
+        # need to be read from disk. Only the object GUID's are cached.
+        #
+        actual = []
+        max_size = calc_max_size(1)
+        (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+            self.domain_handle, 0, 0, max_size)
+        self.delete_dns(extra_dns)
+        while resume_handle and num_entries:
+            self.assertEquals(1, num_entries)
+            actual.append(a.entries[0])
+            (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+                self.domain_handle, resume_handle, 0, max_size)
+        if num_entries:
+            actual.append(a.entries[0])
+
+        self.assertEquals(len(expected), len(actual))
+        check_results(expected, actual)
+
+        self.delete_dns(dns)
diff --git a/selftest/knownfail.d/samr b/selftest/knownfail.d/samr
new file mode 100644 (file)
index 0000000..4e95d3c
--- /dev/null
@@ -0,0 +1,2 @@
+^samba.tests.dcerpc.sam.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainUsers
+^samba.tests.dcerpc.sam.python3.samba.tests.dcerpc.sam.SamrTests.test_EnumDomainUsers