s4-tests: Added DsdbTestCase class, containing some common methods used by the dsdb...
[nivanova/samba.git] / source4 / scripting / python / samba / tests / __init__.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008
5 #   
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #   
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #   
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Samba Python tests."""
21
22 import os
23 import ldb
24 import samba
25 import samba.auth
26 from samba import param
27 import subprocess
28 import tempfile
29
30 # Other modules import these two classes from here, for convenience:
31 from testtools.testcase import TestCase, TestSkipped
32
33
34 class LdbTestCase(TestCase):
35     """Trivial test case for running tests against a LDB."""
36
37     def setUp(self):
38         super(LdbTestCase, self).setUp()
39         self.filename = os.tempnam()
40         self.ldb = samba.Ldb(self.filename)
41
42     def set_modules(self, modules=[]):
43         """Change the modules for this Ldb."""
44         m = ldb.Message()
45         m.dn = ldb.Dn(self.ldb, "@MODULES")
46         m["@LIST"] = ",".join(modules)
47         self.ldb.add(m)
48         self.ldb = samba.Ldb(self.filename)
49
50
51 class TestCaseInTempDir(TestCase):
52
53     def setUp(self):
54         super(TestCaseInTempDir, self).setUp()
55         self.tempdir = tempfile.mkdtemp()
56
57     def tearDown(self):
58         super(TestCaseInTempDir, self).tearDown()
59         self.assertEquals([], os.listdir(self.tempdir))
60         os.rmdir(self.tempdir)
61
62
63 def env_loadparm():
64     lp = param.LoadParm()
65     try:
66         lp.load(os.environ["SMB_CONF_PATH"])
67     except KeyError:
68         raise Exception("SMB_CONF_PATH not set")
69     return lp
70
71 def env_get_var_value(var_name):
72     """Returns value for variable in os.environ
73
74     Function throws AssertionError if variable is defined.
75     Unit-test based python tests require certain input params
76     to be set in environment, otherwise they can't be run
77     """
78     assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name
79     return os.environ[var_name]
80
81
82 cmdline_credentials = None
83
84 class RpcInterfaceTestCase(TestCase):
85
86     def get_loadparm(self):
87         return env_loadparm()
88
89     def get_credentials(self):
90         return cmdline_credentials
91
92
93 class ValidNetbiosNameTests(TestCase):
94
95     def test_valid(self):
96         self.assertTrue(samba.valid_netbios_name("FOO"))
97
98     def test_too_long(self):
99         self.assertFalse(samba.valid_netbios_name("FOO"*10))
100
101     def test_invalid_characters(self):
102         self.assertFalse(samba.valid_netbios_name("*BLA"))
103
104
105 class BlackboxTestCase(TestCase):
106     """Base test case for blackbox tests."""
107
108     def check_run(self, line):
109         bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin"))
110         parts = line.split(" ")
111         if os.path.exists(os.path.join(bindir, parts[0])):
112             parts[0] = os.path.join(bindir, parts[0])
113         line = " ".join(parts)
114         subprocess.check_call(line, shell=True)
115
116 def connect_samdb(samdb_url, lp=None, session_info=None,
117                   credentials=None, flags=0, ldb_options=None, ldap_only=False):
118     """Creates SamDB instance and connects to samdb_url database.
119
120     :param samdb_url: Url for database to connect to.
121     :param lp: Optional loadparm object
122     :param session_info: Optional session information
123     :param credentials: Optional credentials, defaults to anonymous.
124     :param flags: Optional LDB flags
125     :param ldap_only: If set, only remote LDAP connection will be created.
126
127     Added value for tests is that we have a shorthand function
128     to make proper URL for ldb.connect() while using default
129     parameters for connection based on test environment
130     """
131     samdb_url = samdb_url.lower()
132     if not "://" in samdb_url:
133         if not ldap_only and os.path.isfile(samdb_url):
134             samdb_url = "tdb://%s" % samdb_url
135         else:
136             samdb_url = "ldap://%s" % samdb_url
137     # use 'paged_search' module when connecting remotely
138     if samdb_url.startswith("ldap://"):
139         ldb_options = ["modules:paged_searches"]
140     else:
141         assert not ldap_only, \
142                "Trying to connect to %s while remote connection is required" % samdb_url
143
144     # set defaults for test environment
145     if not lp:
146         lp=env_loadparm()
147     if not session_info:
148         session_info=samba.auth.system_session()
149     if not credentials:
150         credentials=cmdline_credentials
151
152     from samba.samdb import SamDB
153     return SamDB(url=samdb_url,
154                  lp=lp,
155                  session_info=session_info,
156                  credentials=credentials,
157                  flags=flags,
158                  options=ldb_options)
159
160 from ldb import (
161     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT)
162 from ldb import Message, MessageElement, Dn
163 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_DELETE
164 from samba.ndr import ndr_unpack
165 from samba.dcerpc import security
166
167 class DsdbTestCase(TestCase):
168     """Base test case for dsdb tests."""
169
170     def delete_force(self, ldb, dn):
171         try:
172             ldb.delete(dn)
173         except LdbError, (num, _):
174             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
175
176     def find_domain_sid(self, ldb):
177         res = ldb.search(base=ldb.domain_dn(), expression="(objectClass=*)", scope=SCOPE_BASE)
178         return ndr_unpack(security.dom_sid,res[0]["objectSid"][0])
179
180     def set_dsheuristics(self, ldb, dsheuristics):
181         configuration_dn = ldb.get_config_basedn().get_linearized()
182         m = Message()
183         m.dn = Dn(ldb, "CN=Directory Service, CN=Windows NT, CN=Services, "
184                   + configuration_dn)
185         if dsheuristics is not None:
186             m["dSHeuristics"] = MessageElement(dsheuristics, FLAG_MOD_REPLACE,
187                                                "dSHeuristics")
188         else:
189             m["dSHeuristics"] = MessageElement([], FLAG_MOD_DELETE, "dsHeuristics")
190         ldb.modify(m)
191
192     def set_minPwdAge(self, ldb, value):
193         m = Message()
194         m.dn = Dn(ldb, ldb.domain_dn())
195         m["minPwdAge"] = MessageElement(value, FLAG_MOD_REPLACE, "minPwdAge")
196         ldb.modify(m)