check_results(expected, actual)
self.delete_dns(dns)
+
+ def test_EnumDomainUsers(self):
+ def check_results(expected, actual):
+ for (e, a) in zip(expected, actual):
+ self.assertTrue(isinstance(a, samr.SamEntry))
+ self.assertEquals(
+ str(e["sAMAccountName"]), str(a.name.string))
+
+ # Create four users
+ # to ensure that we have the minimum needed for the tests.
+ dns = self.create_users([1, 2, 3, 4])
+
+ #
+ # Get the expected results by querying the samdb database directly.
+ # We do this rather than use a list of expected results as this runs
+ # with other tests so we do not have a known fixed list of elements
+ select = "(objectClass=user)"
+ attributes = ["sAMAccountName", "objectSID", "userAccountConrol"]
+ unfiltered = self.samdb.search(expression=select, attrs=attributes)
+ filtered = self.filter_domain(unfiltered)
+ self.assertTrue(len(filtered) > 4)
+
+ # Sort the expected results by rid
+ expected = sorted(list(filtered), key=rid)
+
+ #
+ # Perform EnumDomainUsers with max_size greater than required for the
+ # expected number of results. We should get all the results.
+ #
+ max_size = calc_max_size(len(expected) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(len(expected), num_entries)
+ check_results(expected, actual.entries)
+
+ #
+ # Perform EnumDomainUsers with size set to so that it contains
+ # 4 entries.
+ max_size = calc_max_size(4)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(4, num_entries)
+ check_results(expected[:4], actual.entries)
+
+ #
+ # Try calling with resume_handle greater than number of entries
+ # Should return no results and a resume handle of 0
+ rh = len(expected)
+ max_size = calc_max_size(1)
+ self.conn.Close(self.handle)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, rh, 0, max_size)
+
+ self.assertEquals(0, num_entries)
+ self.assertEquals(0, resume_handle)
+
+ #
+ # Enumerate through the domain users one element at a time.
+ # We should get all the results.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ #
+ # Check that the cached results are being returned.
+ # Obtain a new resume_handle and insert new entries into the
+ # into the DB. As the entries were added after the results were cached
+ # they should not show up in the returned results.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ extra_dns = self.create_users([1000, 1002, 1003, 1004])
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ #
+ # Perform EnumDomainUsers, we should read the newly added groups
+ # As resume_handle is zero, the results will be read from disk.
+ #
+ max_size = calc_max_size(len(expected) + len(extra_dns) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(len(expected) + len(extra_dns), num_entries)
+
+ #
+ # Get a new expected result set by querying the database directly
+ unfiltered01 = self.samdb.search(expression=select, attrs=attributes)
+ filtered01 = self.filter_domain(unfiltered01)
+ self.assertTrue(len(filtered01) > len(expected))
+
+ # Sort the expected results by rid
+ expected01 = sorted(list(filtered01), key=rid)
+
+ #
+ # Now check that we read the new entries.
+ #
+ self.assertEquals(len(expected01), num_entries)
+ check_results(expected01, actual.entries)
+
+ #
+ # Check that deleted results are handled correctly.
+ # Obtain a new resume_handle and delete entries from the DB.
+ # We will not see the deleted entries in the result set, as details
+ # need to be read from disk. Only the object GUID's are cached.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.delete_dns(extra_dns)
+ while resume_handle and num_entries:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ self.delete_dns(dns)