1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
3 # Copyright (C) Stefan Metzmacher 2014,2015
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Samba Python tests."""
26 from samba import param
27 from samba import credentials
28 from samba.credentials import Credentials
33 from enum import IntEnum, unique
36 import samba.dcerpc.base
37 from random import randint
38 from random import SystemRandom
39 from contextlib import contextmanager
43 from samba.samdb import SamDB
45 # We are built without samdb support,
46 # imitate it so that connect_samdb() can recover
47 def SamDB(*args, **kwargs):
51 import samba.dcerpc.dcerpc
52 import samba.dcerpc.epmapper
54 from unittest import SkipTest
57 BINDIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
60 HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
62 LDB_ERR_LUT = {v: k for k, v in vars(ldb).items() if k.startswith('ERR_')}
64 RE_CAMELCASE = re.compile(r"([_\-])+")
68 if isinstance(v, ldb.LdbError):
75 return f"[{', '.join(LDB_ERR_LUT.get(x, x) for x in v)}]"
76 except TypeError as e:
81 def DynamicTestCase(cls):
82 cls.setUpDynamicTestCases()
86 class TestCase(unittest.TestCase):
87 """A Samba test case."""
89 # Re-implement addClassCleanup to support Python versions older than 3.8.
90 # Can be removed once these older Python versions are no longer needed.
91 if sys.version_info.major == 3 and sys.version_info.minor < 8:
95 def addClassCleanup(cls, function, *args, **kwargs):
96 cls._class_cleanups.append((function, args, kwargs))
99 def tearDownClass(cls):
100 teardown_exceptions = []
102 while cls._class_cleanups:
103 function, args, kwargs = cls._class_cleanups.pop()
105 function(*args, **kwargs)
107 teardown_exceptions.append(traceback.format_exc())
109 # ExceptionGroup would be better but requires Python 3.11
110 if teardown_exceptions:
111 raise ValueError("tearDownClass failed:\n\n" +
112 "\n".join(teardown_exceptions))
117 Call setUpTestData, ensure tearDownClass is called on exceptions.
119 This is only required on Python versions older than 3.8.
130 setUpClass only needs to call setUpTestData.
132 On Python 3.8 and above unittest will always call tearDownClass,
133 even if an exception was raised in setUpClass.
138 def setUpTestData(cls):
139 """Create class level test fixtures here."""
143 def generate_dynamic_test(cls, fnname, suffix, *args, doc=None):
145 fnname is something like "test_dynamic_sum"
146 suffix is something like "1plus2"
147 args could be (1, 2)
149 This would generate a test case called
150 "test_dynamic_sum_1plus2(self)" that
152 self._test_dynamic_sum_with_args(1, 2)
155 getattr(self, "_%s_with_args" % fnname)(*args)
157 attr = "%s_%s" % (fnname, suffix)
158 if hasattr(cls, attr):
159 raise RuntimeError(f"Dynamic test {attr} already exists!")
160 setattr(cls, attr, fn)
163 def setUpDynamicTestCases(cls):
164 """Subclasses implement this to call cls.generate_dynamic_test()
165 and so create autogenerated testcase permutations.
167 msg = "%s needs setUpDynamicTestCases() if @DynamicTestCase is used!" % (cls)
168 raise NotImplementedError(msg)
170 def unique_name(self):
171 """Generate a unique name from within a test for creating objects.
173 Used to ensure each test generates uniquely named objects that don't
174 interfere with other tests.
176 # name of calling function
177 name = self.id().rsplit(".", 1)[1]
179 # remove test_ prefix
180 if name.startswith("test_"):
183 # finally, convert to camelcase
184 name = RE_CAMELCASE.sub(" ", name).title().replace(" ", "")
185 return "".join([name[0].lower(), name[1:]])
# Optionally override the Samba debug level for the duration of a test,
# driven by the TEST_DEBUG_LEVEL environment variable.
test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
if test_debug_level is not None:
    test_debug_level = int(test_debug_level)
    # Remember the level in force before the override so it can be restored.
    self._old_debug_level = samba.get_debug_level()
    samba.set_debug_level(test_debug_level)
    # Restore the *previous* level on cleanup; re-applying test_debug_level
    # here would leave the override active after the test has finished.
    self.addCleanup(samba.set_debug_level, self._old_debug_level)
197 def get_loadparm(cls):
198 return env_loadparm()
200 def get_credentials(self):
201 return cmdline_credentials
204 def get_env_credentials(cls, *, lp, env_username, env_password,
205 env_realm=None, env_domain=None):
206 creds = credentials.Credentials()
208 # guess Credentials parameters here. Otherwise, workstation
209 # and domain fields are NULL and gencache code segfaults
211 creds.set_username(env_get_var_value(env_username))
212 creds.set_password(env_get_var_value(env_password))
214 if env_realm is not None:
215 creds.set_realm(env_get_var_value(env_realm))
217 if env_domain is not None:
218 creds.set_domain(env_get_var_value(env_domain))
222 def get_creds_ccache_name(self):
223 creds = self.get_credentials()
224 ccache = creds.get_named_ccache(self.get_loadparm())
225 ccache_name = ccache.get_name()
229 def hexdump(self, src):
232 is_string = isinstance(src, str)
238 hl = ' '.join(["%02X" % ord(x) for x in ll])
239 hr = ' '.join(["%02X" % ord(x) for x in lr])
240 ll = ll.translate(HEXDUMP_FILTER)
241 lr = lr.translate(HEXDUMP_FILTER)
243 hl = ' '.join(["%02X" % x for x in ll])
244 hr = ' '.join(["%02X" % x for x in lr])
245 ll = ll.translate(HEXDUMP_FILTER).decode('utf8')
246 lr = lr.translate(HEXDUMP_FILTER).decode('utf8')
247 result += "[%04X] %-*s %-*s %s %s\n" % (N, 8 * 3, hl, 8 * 3, hr, ll, lr)
251 def insta_creds(self, template=None, username=None, userpass=None, kerberos_state=None):
254 raise ValueError("you need to supply a Credentials template")
256 if username is not None and userpass is None:
258 "you cannot set creds username without setting a password")
261 assert userpass is None
263 username = template.get_username()
264 userpass = template.get_password()
266 simple_bind_dn = template.get_bind_dn()
268 if kerberos_state is None:
269 kerberos_state = template.get_kerberos_state()
271 # get a copy of the global creds or the passed in creds
273 c.set_username(username)
274 c.set_password(userpass)
275 c.set_domain(template.get_domain())
276 c.set_realm(template.get_realm())
277 c.set_workstation(template.get_workstation())
278 c.set_gensec_features(c.get_gensec_features()
279 | samba.gensec.FEATURE_SEAL)
280 c.set_kerberos_state(kerberos_state)
282 c.set_bind_dn(simple_bind_dn)
285 def assertStringsEqual(self, a, b, msg=None, strip=False):
286 """Assert equality between two strings and highlight any differences.
287 If strip is true, leading and trailing whitespace is ignored."""
293 sys.stderr.write("The strings differ %s(lengths %d vs %d); "
295 % ('when stripped ' if strip else '',
299 from difflib import unified_diff
300 diff = unified_diff(a.splitlines(True),
304 sys.stderr.write(line)
308 def assertRaisesLdbError(self, errcode, message, f, *args, **kwargs):
309 """Assert a function raises a particular LdbError."""
311 message = f"{f.__name__}(*{args}, **{kwargs})"
314 except ldb.LdbError as e:
316 if isinstance(errcode, collections.abc.Container):
317 found = num in errcode
319 found = num == errcode
321 lut = {v: k for k, v in vars(ldb).items()
322 if k.startswith('ERR_') and isinstance(v, int)}
323 if isinstance(errcode, collections.abc.Container):
324 errcode_name = ' '.join(lut.get(x) for x in errcode)
326 errcode_name = lut.get(errcode)
327 self.fail(f"{message}, expected "
328 f"LdbError {errcode_name}, {errcode} "
329 f"got {lut.get(num)} ({num}) "
332 lut = {v: k for k, v in vars(ldb).items()
333 if k.startswith('ERR_') and isinstance(v, int)}
334 if isinstance(errcode, collections.abc.Container):
335 errcode_name = ' '.join(lut.get(x) for x in errcode)
337 errcode_name = lut.get(errcode)
338 self.fail("%s, expected "
340 "but we got success" % (message,
345 class LdbTestCase(TestCase):
346 """Trivial test case for running tests against a LDB."""
350 self.tempfile = tempfile.NamedTemporaryFile(delete=False)
351 self.filename = self.tempfile.name
352 self.ldb = samba.Ldb(self.filename)
354 def set_modules(self, modules=None):
355 """Change the modules for this Ldb."""
359 m.dn = ldb.Dn(self.ldb, "@MODULES")
360 m["@LIST"] = ",".join(modules)
362 self.ldb = samba.Ldb(self.filename)
365 class TestCaseInTempDir(TestCase):
369 self.tempdir = tempfile.mkdtemp()
370 self.addCleanup(self._remove_tempdir)
372 def _remove_tempdir(self):
373 # Note asserting here is treated as an error rather than a test failure
374 self.assertEqual([], os.listdir(self.tempdir))
375 os.rmdir(self.tempdir)
380 """Yield a temporary filename in the tempdir."""
382 fd, fn = tempfile.mkstemp(dir=self.tempdir)
388 except (OSError, IOError) as e:
389 print("could not remove temporary file: %s" % e,
392 def rm_files(self, *files, allow_missing=False, _rm=os.remove):
393 """Remove listed files from the temp directory.
395 The files must be true files in the directory itself, not in
398 By default a non-existent file will cause a test failure (or
399 error if used outside a test in e.g. tearDown), but if
400 allow_missing is true, the absence will be ignored.
403 path = os.path.join(self.tempdir, f)
405 # os.path.join will happily step out of the tempdir,
406 # so let's just check.
407 if os.path.dirname(path) != self.tempdir:
408 raise ValueError(f"{path} might be outside {self.tempdir}")
412 except FileNotFoundError as e:
413 if not allow_missing:
414 raise AssertionError(f"{f} not in {self.tempdir}: {e}")
416 print(f"{f} not in {self.tempdir}")
418 def rm_dirs(self, *dirs, allow_missing=False):
419 """Remove listed directories from temp directory.
421 This works like rm_files, but only removes directories,
422 including their contents.
424 self.rm_files(*dirs, allow_missing=allow_missing, _rm=shutil.rmtree)
428 lp = param.LoadParm()
430 lp.load(os.environ["SMB_CONF_PATH"])
432 raise KeyError("SMB_CONF_PATH not set")
436 def env_get_var_value(var_name, allow_missing=False):
437 """Returns value for variable in os.environ
439 Function throws AssertionError if variable is undefined.
440 Unit-test based python tests require certain input params
441 to be set in environment, otherwise they can't be run
444 if var_name not in os.environ.keys():
446 assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
447 return os.environ[var_name]
450 cmdline_credentials = None
453 class RpcInterfaceTestCase(TestCase):
454 """DCE/RPC Test case."""
457 class BlackboxProcessError(Exception):
458 """This is raised when check_output() process returns a non-zero exit status
460 Exception instance should contain the exact exit code (S.returncode),
461 command line (S.cmd), process output (S.stdout) and process error stream
465 def __init__(self, returncode, cmd, stdout, stderr, msg=None):
466 self.returncode = returncode
467 if isinstance(cmd, list):
468 self.cmd = ' '.join(cmd)
478 s = ("Command '%s'; shell %s; exit status %d; "
479 "stdout: '%s'; stderr: '%s'" %
480 (self.cmd, self.shell, self.returncode, self.stdout, self.stderr))
481 if self.msg is not None:
482 s = "%s; message: %s" % (s, self.msg)
487 class BlackboxTestCase(TestCaseInTempDir):
488 """Base test case for blackbox tests."""
491 def _make_cmdline(line):
492 """Expand the called script into a fully resolved path in the bin
494 if isinstance(line, list):
497 parts = line.split(" ", 1)
499 exe = os.path.join(BINDIR, cmd)
501 python_cmds = ["samba-tool",
504 "script/traffic_replay",
505 "script/traffic_learner"]
507 if os.path.exists(exe):
509 if cmd in python_cmds and os.getenv("PYTHON", False):
510 parts.insert(0, os.environ["PYTHON"])
512 if not isinstance(line, list):
513 line = " ".join(parts)
518 def check_run(cls, line, msg=None):
519 cls.check_exit_code(line, 0, msg=msg)
522 def check_exit_code(cls, line, expected, msg=None):
523 line = cls._make_cmdline(line)
524 use_shell = not isinstance(line, list)
525 p = subprocess.Popen(line,
526 stdout=subprocess.PIPE,
527 stderr=subprocess.PIPE,
529 stdoutdata, stderrdata = p.communicate()
530 retcode = p.returncode
531 if retcode != expected:
533 msg = "expected return code %s; got %s" % (expected, retcode)
534 raise BlackboxProcessError(retcode,
542 def check_output(cls, line):
543 use_shell = not isinstance(line, list)
544 line = cls._make_cmdline(line)
545 p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
546 shell=use_shell, close_fds=True)
547 stdoutdata, stderrdata = p.communicate()
548 retcode = p.returncode
550 raise BlackboxProcessError(retcode, line, stdoutdata, stderrdata)
554 # Run a command without checking the return code, returns the tuple
555 # (ret, stdout, stderr)
556 # where ret is the return code
557 # stdout is a string containing the command's stdout
558 # stderr is a string containing the command's stderr
560 def run_command(cls, line):
561 line = cls._make_cmdline(line)
562 use_shell = not isinstance(line, list)
563 p = subprocess.Popen(line,
564 stdout=subprocess.PIPE,
565 stderr=subprocess.PIPE,
567 stdoutdata, stderrdata = p.communicate()
568 retcode = p.returncode
569 return (retcode, stdoutdata.decode('UTF8'), stderrdata.decode('UTF8'))
571 # Generate a random password that can be safely passed on the command line,
572 # i.e. it does not contain any shell metacharacters.
573 def random_password(self, count=32):
574 password = SystemRandom().choice(string.ascii_uppercase)
575 password += SystemRandom().choice(string.digits)
576 password += SystemRandom().choice(string.ascii_lowercase)
577 password += ''.join(SystemRandom().choice(string.ascii_uppercase +
578 string.ascii_lowercase +
579 string.digits) for x in range(count - 3))
583 def connect_samdb(samdb_url, *, lp=None, session_info=None, credentials=None,
584 flags=0, ldb_options=None, ldap_only=False, global_schema=True):
585 """Create SamDB instance and connects to samdb_url database.
587 :param samdb_url: Url for database to connect to.
588 :param lp: Optional loadparm object
589 :param session_info: Optional session information
590 :param credentials: Optional credentials, defaults to cmdline_credentials.
591 :param flags: Optional LDB flags
592 :param ldap_only: If set, only remote LDAP connection will be created.
593 :param global_schema: Whether to use global schema.
595 The added value for tests is a shorthand function that
596 builds a proper URL for ldb.connect() while using default
597 connection parameters based on the test environment.
599 if "://" not in samdb_url:
600 if not ldap_only and os.path.isfile(samdb_url):
601 samdb_url = "tdb://%s" % samdb_url
603 samdb_url = "ldap://%s" % samdb_url
604 # use 'paged_searches' module when connecting remotely
605 if samdb_url.startswith("ldap://"):
606 ldb_options = ["modules:paged_searches"]
608 raise AssertionError("Trying to connect to %s while remote "
609 "connection is required" % samdb_url)
611 # set defaults for test environment
614 if session_info is None:
615 session_info = samba.auth.system_session(lp)
616 if credentials is None:
617 credentials = cmdline_credentials
619 return SamDB(url=samdb_url,
621 session_info=session_info,
622 credentials=credentials,
625 global_schema=global_schema)
628 def connect_samdb_ex(samdb_url, *, lp=None, session_info=None, credentials=None,
629 flags=0, ldb_options=None, ldap_only=False):
630 """Connects to samdb_url database
632 :param samdb_url: Url for database to connect to.
633 :param lp: Optional loadparm object
634 :param session_info: Optional session information
635 :param credentials: Optional credentials, defaults to cmdline_credentials.
636 :param flags: Optional LDB flags
637 :param ldap_only: If set, only remote LDAP connection will be created.
638 :return: (sam_db_connection, rootDse_record) tuple
640 sam_db = connect_samdb(samdb_url, lp=lp, session_info=session_info,
641 credentials=credentials, flags=flags,
642 ldb_options=ldb_options, ldap_only=ldap_only)
644 res = sam_db.search(base="", expression="", scope=ldb.SCOPE_BASE,
646 return (sam_db, res[0])
649 def connect_samdb_env(env_url, env_username, env_password, lp=None):
650 """Connect to SamDB by getting URL and Credentials from environment
652 :param env_url: Environment variable name to get ldb url from
653 :param env_username: Username environment variable
654 :param env_password: Password environment variable
655 :return: sam_db_connection
657 samdb_url = env_get_var_value(env_url)
658 creds = credentials.Credentials()
660 # guess Credentials parameters here. Otherwise workstation
661 # and domain fields are NULL and gencache code segfaults
662 lp = param.LoadParm()
664 creds.set_username(env_get_var_value(env_username))
665 creds.set_password(env_get_var_value(env_password))
666 return connect_samdb(samdb_url, credentials=creds, lp=lp)
669 def delete_force(samdb, dn, **kwargs):
671 samdb.delete(dn, **kwargs)
672 except ldb.LdbError as error:
673 (num, errstr) = error.args
674 assert num == ldb.ERR_NO_SUCH_OBJECT, "ldb.delete() failed: %s" % errstr
677 def create_test_ou(samdb, name):
678 """Creates a unique OU for the test"""
680 # Add some randomness to the test OU. Replication between the testenvs is
681 # constantly happening in the background. Deletion of the last test's
682 # objects can be slow to replicate out. So the OU created by a previous
683 # testenv may still exist at the point that tests start on another testenv.
684 rand = randint(1, 10000000)
685 dn = ldb.Dn(samdb, "OU=%s%d,%s" % (name, rand, samdb.get_default_basedn()))
686 samdb.add({"dn": dn, "objectclass": "organizationalUnit"})
691 class OptState(IntEnum):
698 def parse_help_consistency(out,
702 max_leading_spaces=10):
703 if options_start is None:
708 for raw_line in out.split('\n'):
709 line = raw_line.lstrip()
712 if opt_lines is None:
713 if line == options_start:
717 if len(line) < len(raw_line) - max_leading_spaces:
718 # for the case where we have:
720 # --foo frobnicate or barlify depending on
723 # where we want to ignore the --bar.
726 opt_lines.append(line)
727 if line == options_end:
730 if opt_lines is None:
731 # No --help options is not an error in *this* test.
734 is_longname_char = re.compile(r'^[\w-]$').match
735 for line in opt_lines:
736 state = OptState.NOOPT
740 if state == OptState.NOOPT:
741 if c == '-' and prev.isspace():
742 state = OptState.HYPHEN1
745 if state == OptState.HYPHEN1:
748 state = OptState.NAME
750 state = OptState.HYPHEN2
752 if state == OptState.HYPHEN2:
755 state = OptState.NAME
756 else: # WTF, perhaps '--' ending option list.
757 state = OptState.NOOPT
760 if state == OptState.NAME:
761 if is_longname_char(c):
764 optmap.setdefault(name, []).append(line)
765 state = OptState.NOOPT
768 if state == OptState.NAME:
769 optmap.setdefault(name, []).append(line)
772 def check_help_consistency(out,
775 """Ensure that options are not repeated and redefined in --help
778 Returns None if everything is OK, otherwise a string indicating
781 If options_start and/or options_end are provided, only the bit in
782 the output between these two lines is considered. For example,
785 options_start='Options:', options_end='Available subcommands:'
787 will prevent the test looking at the preamble which may contain
788 examples using options.
790 # Silly test, you might think, but this happens
792 parse_help_consistency(out,
798 for k, values in sorted(optmap.items()):
801 errors.append("%s: %s" % (k, v))
804 return "\n".join(errors)
807 def get_env_dir(key):
808 """A helper to pull a directory name from the environment, used in
809 some tests that optionally write e.g. fuzz seeds into a directory
810 named in an environment variable.
812 dir = os.environ.get(key)
816 if not os.path.isdir(dir):
818 f"{key} should name an existing directory (got '{dir}')")