from __future__ import print_function
import os
import re
+import json
import random
import subprocess
from samba.tests.samba_tool.base import SambaToolCmdTest
'CN=LOCALVAMPIREDC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=samba,DC=example,DC=com'],
}
+PARTITION_NAMES = [
+ "DOMAIN",
+ "CONFIGURATION",
+ "SCHEMA",
+ "DNSDOMAIN",
+ "DNSFOREST",
+]
+
def set_auto_replication(dc, allow):
credstring = '-U%s%%%s' % (os.environ["USERNAME"], os.environ["PASSWORD"])
def test_uptodateness_partitions(self):
creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
dc1 = os.environ["SERVER"]
- for part in ["CONFIGURATION",
- "SCHEMA",
- "DNSDOMAIN",
- "DNSFOREST"]:
-
+ for part in PARTITION_NAMES:
(result, out, err) = self.runsubcmd("visualize", "uptodateness",
"-r",
'-H', "ldap://%s" % dc1,
'--partition', part)
self.assertCmdSuccess(result, out, err)
+ def test_drs_uptodateness(self):
+ """
+ Test cmd `drs uptodateness`
+
+ It should print info like this:
+
+ DNSDOMAIN failure: 4 median: 1.5 maximum: 2
+ SCHEMA failure: 4 median: 220.0 maximum: 439
+ DOMAIN failure: 1 median: 25 maximum: 25
+ CONFIGURATION failure: 1 median: 25 maximum: 25
+ DNSFOREST failure: 4 median: 1.5 maximum: 2
+
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds)
+ self.assertCmdSuccess(result, out, err)
+ # each partition name should be in output
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, out, msg=out)
+
+ for line in out.splitlines():
+ # check keyword in output
+ for attr in ['maximum', 'median', 'failure']:
+ self.assertIn(attr, line)
+
+ def test_drs_uptodateness_partition(self):
+ """
+ Test cmd `drs uptodateness --partition DOMAIN`
+
+ It should print info like this:
+
+ DOMAIN failure: 1 median: 25 maximum: 25
+
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--partition', 'DOMAIN')
+ self.assertCmdSuccess(result, out, err)
+
+ lines = out.splitlines()
+ self.assertEquals(len(lines), 1)
+
+ line = lines[0]
+ self.assertTrue(line.startswith('DOMAIN'))
+
+ def test_drs_uptodateness_json(self):
+ """
+ Test cmd `drs uptodateness --json`
+
+ Example output:
+
+ {
+ "DNSDOMAIN": {
+ "failure": 0,
+ "median": 0.0,
+ "maximum": 0
+ },
+ ...
+ "SCHEMA": {
+ "failure": 0,
+ "median": 0.0,
+ "maximum": 0
+ }
+ }
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--json')
+ self.assertCmdSuccess(result, out, err)
+ # should be json format
+ obj = json.loads(out)
+ # each partition name should be in json obj
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, obj)
+ summary_obj = obj[part_name]
+ for attr in ['maximum', 'median', 'failure']:
+ self.assertIn(attr, summary_obj)
+
+ def test_drs_uptodateness_json_median(self):
+ """
+ Test cmd `drs uptodateness --json --median`
+
+ drs uptodateness --json --median
+
+ {
+ "DNSDOMAIN": {
+ "median": 0.0
+ },
+ ...
+ "SCHEMA": {
+ "median": 0.0
+ }
+ }
+ """
+ creds = "%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])
+ dc1 = os.environ["SERVER"]
+ dc2 = os.environ["DC_SERVER"]
+ for dc in [dc1, dc2]:
+ (result, out, err) = self.runsubcmd("drs", "uptodateness",
+ '-H', "ldap://%s" % dc,
+ '-U', creds,
+ '--json', '--median')
+ self.assertCmdSuccess(result, out, err)
+ # should be json format
+ obj = json.loads(out)
+ # each partition name should be in json obj
+ for part_name in PARTITION_NAMES:
+ self.assertIn(part_name, obj)
+ summary_obj = obj[part_name]
+ self.assertIn('median', summary_obj)
+ self.assertNotIn('maximum', summary_obj)
+ self.assertNotIn('failure', summary_obj)
+
def assert_matrix_validity(self, matrix, dcs=()):
for dc in dcs:
self.assertIn(dc, matrix)