1 # Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
2 # Copyright (C) Andrew Bartlett 2015
4 # Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Tests for samba.kcc.ldif_import_export"""
30 from samba.kcc import ldif_import_export, KCC
32 from samba.dcerpc import misc
35 from samba.param import LoadParm
36 from samba.credentials import Credentials
37 from samba.samdb import SamDB
# Wall-clock time in whole seconds since the epoch, sampled once when this
# module is imported; the KCC objects built by the tests below receive it.
unix_now = int(time.time())

# Location of the multisite LDIF fixture, resolved against the source tree
# root named by the SRCDIR_ABS environment variable (a KeyError is raised
# at import time if that variable is not set).
MULTISITE_LDIF = os.path.join(
    os.environ['SRCDIR_ABS'],
    "testdata/ldif-utils-test-multisite.ldif"
)
44 MULTISITE_LDIF_DSAS = (
45 ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
47 ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
49 ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
51 ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
53 ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
55 ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
57 ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
59 ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
61 ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
63 ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
64 "Default-First-Site-Name"),
68 class LdifImportExportTests(samba.tests.TestCaseInTempDir):
70 super(LdifImportExportTests, self).setUp()
72 self.creds = Credentials()
73 self.creds.guess(self.lp)
75 def remove_files(self, *files):
77 assert(f.startswith(self.tempdir))
80 def test_write_search_url(self):
83 def test_ldif_to_samdb(self):
84 dburl = os.path.join(self.tempdir, "ldap")
85 samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
87 self.assertIsInstance(samdb, SamDB)
89 dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
90 "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
91 res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
92 scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
94 ntds_guid = misc.GUID(samdb.get_ntds_GUID())
95 self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
97 service_name_res = samdb.search(base="",
99 attrs=["dsServiceName"])
101 service_name_res[0]["dsServiceName"][0])
102 self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
103 self.remove_files(dburl)
105 def test_ldif_to_samdb_forced_local_dsa(self):
106 for dsa, site in MULTISITE_LDIF_DSAS:
107 dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
109 samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
111 forced_local_dsa=dsa)
112 self.assertIsInstance(samdb, SamDB)
113 self.assertEqual(samdb.server_site_name(), site)
115 res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
116 scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
118 ntds_guid = misc.GUID(samdb.get_ntds_GUID())
119 self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
121 service_name_res = samdb.search(base="",
122 scope=ldb.SCOPE_BASE,
123 attrs=["dsServiceName"])
125 service_name_res[0]["dsServiceName"][0])
126 self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
127 self.remove_files(dburl)
130 def test_samdb_to_ldif_file(self):
131 dburl = os.path.join(self.tempdir, "ldap")
132 dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
133 ldif_file = os.path.join(self.tempdir, "ldif")
134 samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
136 self.assertIsInstance(samdb, SamDB)
137 ldif_import_export.samdb_to_ldif_file(samdb, dburl,
138 lp=self.lp, creds=None,
140 self.assertGreater(os.path.getsize(ldif_file), 1000,
141 "LDIF should be larger than 1000 bytes")
142 samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
144 self.assertIsInstance(samdb, SamDB)
145 dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
146 "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
147 res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
148 scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
149 self.remove_files(dburl)
150 self.remove_files(dburl2)
151 self.remove_files(ldif_file)
154 class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
156 super(KCCMultisiteLdifTests, self).setUp()
158 self.creds = Credentials()
159 self.creds.guess(self.lp)
161 def remove_files(self, *files):
163 assert(f.startswith(self.tempdir))
166 def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
167 # Note that setting read-only to False won't affect the ldif,
168 # only the temporary database that is created from it.
169 my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
170 dot_file_dir=dot_file_dir)
171 tmpdb = os.path.join(self.tempdir, 'tmpdb')
172 my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
173 self.remove_files(tmpdb)
def test_list_dsas(self):
    """Check that list_dsas() reports exactly the DSA DNs that are
    recorded as the first item of each MULTISITE_LDIF_DSAS entry.
    """
    kcc = self._get_kcc('test-list')
    # Order is irrelevant, so both sides are compared as sets.
    wanted = set(entry[0] for entry in MULTISITE_LDIF_DSAS)
    found = set(kcc.list_dsas())
    self.assertEqual(found, wanted)
182 def test_verify(self):
183 """Check that the KCC generates graphs that pass its own verify
186 my_kcc = self._get_kcc('test-verify', verify=True)
187 tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
188 my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
192 attempt_live_connections=False)
193 self.remove_files(tmpdb)
195 def test_dotfiles(self):
196 """Check that KCC writes dot_files when asked.
198 my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
199 tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
201 my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
204 attempt_live_connections=False)
207 for fn in os.listdir(self.tempdir):
208 if fn.endswith('.dot'):
209 ffn = os.path.join(self.tempdir, fn)
210 if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
211 r = subprocess.call([dot, '-Tcanon', ffn])
212 self.assertEqual(r, 0)
214 #even if dot is not there, at least check the file is non-empty
215 size = os.stat(ffn).st_size
216 self.assertNotEqual(size, 0)
219 self.remove_files(*files)