class SambaToolDrsShowReplTests(drs_base.DrsBaseTestCase):
"""Blackbox test case for samba-tool drs."""
- def assertRegex(self, exp, s, flags=0):
- m = re.search(exp, s, flags=flags)
- if m is None:
- self.fail("%r did not match /%s/" % (s, exp))
- return m
-
def setUp(self):
super(SambaToolDrsShowReplTests, self).setUp()
self.assertEqual(_outbound, ' OUTBOUND NEIGHBORS ')
self.assertEqual(_conn, ' KCC CONNECTION OBJECTS ')
- self.assertRegex(r'^Default-First-Site-Name\\LOCALDC\s+'
- r"DSA Options: %s\s+"
- r"DSA object GUID: %s\s+"
- r"DSA invocationId: %s" %
- (HEX8_RE, GUID_RE, GUID_RE), header)
+ self.assertRegexpMatches(header,
+ r'^Default-First-Site-Name\\LOCALDC\s+'
+ r"DSA Options: %s\s+"
+ r"DSA object GUID: %s\s+"
+ r"DSA invocationId: %s" %
+ (HEX8_RE, GUID_RE, GUID_RE))
# We don't assert the DomainDnsZones and ForestDnsZones are
# there because we don't know that they have been set up yet.
for p in ['CN=Configuration,DC=samba,DC=example,DC=com',
'DC=samba,DC=example,DC=com',
'CN=Schema,CN=Configuration,DC=samba,DC=example,DC=com']:
- self.assertRegex(r'%s\n'
- r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
- r'\t\tDSA object GUID: %s\n'
- r'\t\tLast attempt @ [^\n]+\n'
- r'\t\t\d+ consecutive failure\(s\).\n'
- r'\t\tLast success @ [^\n]+\n'
- r'\n' % (p, GUID_RE), inbound)
-
- self.assertRegex(r'%s\n'
- r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
- r'\t\tDSA object GUID: %s\n'
- r'\t\tLast attempt @ [^\n]+\n'
- r'\t\t\d+ consecutive failure\(s\).\n'
- r'\t\tLast success @ [^\n]+\n'
- r'\n' % (p, GUID_RE), outbound)
-
- self.assertRegex(r'Connection --\n'
- r'\tConnection name: %s\n'
- r'\tEnabled : TRUE\n'
- r'\tServer DNS name : \w+.samba.example.com\n'
- r'\tServer DN name : %s'
- r'\n' % (GUID_RE, DN_RE), conn)
+ self.assertRegexpMatches(
+ inbound,
+ r'%s\n'
+ r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
+ r'\t\tDSA object GUID: %s\n'
+ r'\t\tLast attempt @ [^\n]+\n'
+ r'\t\t\d+ consecutive failure\(s\).\n'
+ r'\t\tLast success @ [^\n]+\n'
+ r'\n' % (p, GUID_RE),
+ msg="%s inbound missing" % p)
+
+ self.assertRegexpMatches(
+ outbound,
+ r'%s\n'
+ r'\tDefault-First-Site-Name\\[A-Z]+ via RPC\n'
+ r'\t\tDSA object GUID: %s\n'
+ r'\t\tLast attempt @ [^\n]+\n'
+ r'\t\t\d+ consecutive failure\(s\).\n'
+ r'\t\tLast success @ [^\n]+\n'
+ r'\n' % (p, GUID_RE),
+ msg="%s outbound missing" % p)
+
+ self.assertRegexpMatches(conn,
+ r'Connection --\n'
+ r'\tConnection name: %s\n'
+ r'\tEnabled : TRUE\n'
+ r'\tServer DNS name : \w+.samba.example.com\n'
+ r'\tServer DN name : %s'
+ r'\n' % (GUID_RE, DN_RE))
def test_samba_tool_showrepl_json(self):
"""Tests 'samba-tool drs showrepl --json' command.
# dsa
for k in ["objectGUID", "invocationId"]:
- self.assertRegex('^%s$' % GUID_RE, d['dsa'][k])
+ self.assertRegexpMatches(d['dsa'][k], '^%s$' % GUID_RE)
self.assertTrue(isinstance(d['dsa']["options"], int))
# repsfrom and repsto
for reps in (d['repsFrom'], d['repsTo']):
for r in reps:
for k in ('NC dn', "NTDS DN"):
- self.assertRegex('^%s$' % DN_RE, r[k])
+ self.assertRegexpMatches(r[k], '^%s$' % DN_RE)
for k in ("last attempt time",
"last attempt message",
"last success"):
self.assertTrue(isinstance(r[k], json_str))
- self.assertRegex('^%s$' % GUID_RE, r["DSA objectGUID"])
+ self.assertRegexpMatches(r["DSA objectGUID"], '^%s$' % GUID_RE)
self.assertTrue(isinstance(r["consecutive failures"], int))
# ntdsconnection
for n in d["NTDSConnections"]:
- self.assertRegex(r'^[\w]+\.samba\.example\.com$', n["dns name"])
- self.assertRegex("^%s$" % GUID_RE, n["name"])
+ self.assertRegexpMatches(n["dns name"],
+ r'^[\w]+\.samba\.example\.com$')
+ self.assertRegexpMatches(n["name"], "^%s$" % GUID_RE)
self.assertTrue(isinstance(n['enabled'], bool))
self.assertTrue(isinstance(n['options'], int))
self.assertTrue(isinstance(n['replicates NC'], list))
- self.assertRegex("^%s$" % DN_RE, n["remote DN"])
+ self.assertRegexpMatches(n["remote DN"], "^%s$" % DN_RE)
def _force_all_reps(self, samdb, dc, direction):
if direction == 'inbound':
continue
dsa_dn = str(ldb.Dn(samdb, x.source_dsa_obj_dn).parent())
- res = samdb.search(base=dsa_dn,
- scope=ldb.SCOPE_BASE,
- attrs=['dNSHostName'])
+ try:
+ res = samdb.search(base=dsa_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['dNSHostName'])
+ except ldb.LdbError as e:
+ if e.args[0] != ldb.ERR_NO_SUCH_OBJECT:
+ raise
+ continue
+
+ if len(res) == 0:
+ print("server %s has no dNSHostName" % dsa_dn)
+ continue
+
+ remote = res[0].get('dNSHostName', [''])[0]
+ if remote:
+ self._enable_all_repl(remote)
- remote = res[0]['dNSHostName'][0]
- self._enable_all_repl(remote)
if direction == 'inbound':
src, dest = remote, dc
else:
except samba.tests.BlackboxProcessError as e:
print("Good, failed as expected after %d rounds: %r" % (i, e.cmd))
self.assertIn('There are failing connections', e.stdout)
- self.assertRegexpMatches(e.stdout,
- r'result 845[67] '
- r'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
- msg=("The process should have failed "
- "because replication was forced off, "
- "but it failed for some other reason."))
+ self.assertRegexpMatches(
+ e.stdout,
+ r'result 845[67] '
+ r'\(WERR_DS_DRA_(SINK|SOURCE)_DISABLED\)',
+ msg=("The process should have failed "
+ "because replication was forced off, "
+ "but it failed for some other reason."))
self.assertIn('consecutive failure(s).', e.stdout)
else:
self.fail("No DRS failure noticed after 100 rounds of trying")