3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Samba Python tests."""
26 from samba import param
27 from samba.samdb import SamDB
31 # Other modules import these two classes from here, for convenience:
32 from testtools.testcase import (
33 TestCase as TesttoolsTestCase,
38 class TestCase(TesttoolsTestCase):
39 """A Samba test case."""
42 super(TestCase, self).setUp()
43 test_debug_level = os.getenv("TEST_DEBUG_LEVEL")
44 if test_debug_level is not None:
45 test_debug_level = int(test_debug_level)
46 self._old_debug_level = samba.get_debug_level()
47 samba.set_debug_level(test_debug_level)
48 self.addCleanup(samba.set_debug_level, test_debug_level)
50 def get_loadparm(self):
53 def get_credentials(self):
54 return cmdline_credentials
57 class LdbTestCase(TesttoolsTestCase):
58 """Trivial test case for running tests against a LDB."""
61 super(LdbTestCase, self).setUp()
62 self.filename = os.tempnam()
63 self.ldb = samba.Ldb(self.filename)
65 def set_modules(self, modules=[]):
66 """Change the modules for this Ldb."""
68 m.dn = ldb.Dn(self.ldb, "@MODULES")
69 m["@LIST"] = ",".join(modules)
71 self.ldb = samba.Ldb(self.filename)
74 class TestCaseInTempDir(TestCase):
77 super(TestCaseInTempDir, self).setUp()
78 self.tempdir = tempfile.mkdtemp()
81 super(TestCaseInTempDir, self).tearDown()
82 self.assertEquals([], os.listdir(self.tempdir))
83 os.rmdir(self.tempdir)
89 lp.load(os.environ["SMB_CONF_PATH"])
91 raise Exception("SMB_CONF_PATH not set")
95 def env_get_var_value(var_name):
96 """Returns value for variable in os.environ
98 Function throws AssertionError if variable is defined.
99 Unit-test based python tests require certain input params
100 to be set in environment, otherwise they can't be run
102 assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
103 return os.environ[var_name]
106 cmdline_credentials = None
108 class RpcInterfaceTestCase(TestCase):
109 """DCE/RPC Test case."""
112 class ValidNetbiosNameTests(TestCase):
114 def test_valid(self):
115 self.assertTrue(samba.valid_netbios_name("FOO"))
117 def test_too_long(self):
118 self.assertFalse(samba.valid_netbios_name("FOO"*10))
120 def test_invalid_characters(self):
121 self.assertFalse(samba.valid_netbios_name("*BLA"))
124 class BlackboxProcessError(subprocess.CalledProcessError):
125 """This exception is raised when a process run by check_output() returns
126 a non-zero exit status. Exception instance should contain
127 the exact exit code (S.returncode), command line (S.cmd),
128 process output (S.stdout) and process error stream (S.stderr)"""
129 def __init__(self, returncode, cmd, stdout, stderr):
130 super(BlackboxProcessError, self).__init__(returncode, cmd)
134 return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode,
135 self.stdout, self.stderr)
137 class BlackboxTestCase(TestCase):
138 """Base test case for blackbox tests."""
140 def _make_cmdline(self, line):
141 bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin"))
142 parts = line.split(" ")
143 if os.path.exists(os.path.join(bindir, parts[0])):
144 parts[0] = os.path.join(bindir, parts[0])
145 line = " ".join(parts)
148 def check_run(self, line):
149 line = self._make_cmdline(line)
150 subprocess.check_call(line, shell=True)
152 def check_output(self, line):
153 line = self._make_cmdline(line)
154 p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
157 raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read())
158 return p.stdout.read()
160 def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None,
161 flags=0, ldb_options=None, ldap_only=False):
162 """Create SamDB instance and connects to samdb_url database.
164 :param samdb_url: Url for database to connect to.
165 :param lp: Optional loadparm object
166 :param session_info: Optional session information
167 :param credentials: Optional credentials, defaults to anonymous.
168 :param flags: Optional LDB flags
169 :param ldap_only: If set, only remote LDAP connection will be created.
171 Added value for tests is that we have a shorthand function
172 to make proper URL for ldb.connect() while using default
173 parameters for connection based on test environment
175 samdb_url = samdb_url.lower()
176 if not "://" in samdb_url:
177 if not ldap_only and os.path.isfile(samdb_url):
178 samdb_url = "tdb://%s" % samdb_url
180 samdb_url = "ldap://%s" % samdb_url
181 # use 'paged_search' module when connecting remotely
182 if samdb_url.startswith("ldap://"):
183 ldb_options = ["modules:paged_searches"]
185 raise AssertionError("Trying to connect to %s while remote "
186 "connection is required" % samdb_url)
188 # set defaults for test environment
191 if session_info is None:
192 session_info = samba.auth.system_session(lp)
193 if credentials is None:
194 credentials = cmdline_credentials
196 return SamDB(url=samdb_url,
198 session_info=session_info,
199 credentials=credentials,
204 def connect_samdb_ex(samdb_url, lp=None, session_info=None, credentials=None,
205 flags=0, ldb_options=None, ldap_only=False):
206 """Connects to samdb_url database
208 :param samdb_url: Url for database to connect to.
209 :param lp: Optional loadparm object
210 :param session_info: Optional session information
211 :param credentials: Optional credentials, defaults to anonymous.
212 :param flags: Optional LDB flags
213 :param ldap_only: If set, only remote LDAP connection will be created.
214 :return: (sam_db_connection, rootDse_record) tuple
216 sam_db = connect_samdb(samdb_url, lp, session_info, credentials,
217 flags, ldb_options, ldap_only)
219 res = sam_db.search(base="", expression="", scope=ldb.SCOPE_BASE,
221 return (sam_db, res[0])
224 def delete_force(samdb, dn):
227 except ldb.LdbError, (num, _):
228 assert(num == ldb.ERR_NO_SUCH_OBJECT)