82807e7ba6f718c6f0b19b07b9f1a6e9ec404b9d
[samba.git] / python / samba / tests / kcc / ldif_import_export.py
1 # Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
2 # Copyright (C) Andrew Bartlett 2015
3 #
4 # Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Tests for samba.kcc.ldif_import_export"""
21
22 import samba
23 import os
24 import time
25 import shutil
26 import sys
27 import subprocess
28
29 import samba.tests
30 from samba.kcc import ldif_import_export, KCC
31 from samba import ldb
32 from samba.dcerpc import misc
33
34
35 from samba.param import LoadParm
36 from samba.credentials import Credentials
37 from samba.samdb import SamDB
38
39 unix_now = int(time.time())
40
41 MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
42                               "testdata/ldif-utils-test-multisite.ldif")
43
44 MULTISITE_LDIF_DSAS = (
45     ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
46      "Site-4"),
47     ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
48      "Site-4"),
49     ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
50      "Site-3"),
51     ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
52      "Site-5"),
53     ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
54      "Site-5"),
55     ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
56      "Site-2"),
57     ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
58      "Site-2"),
59     ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
60      "Site-2"),
61     ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
62      "Site-2"),
63     ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
64      "Default-First-Site-Name"),
65 )
66
67
68 class LdifImportExportTests(samba.tests.TestCaseInTempDir):
69     def setUp(self):
70         super(LdifImportExportTests, self).setUp()
71         self.lp = LoadParm()
72         self.creds = Credentials()
73         self.creds.guess(self.lp)
74
75     def remove_files(self, *files):
76         for f in files:
77             assert(f.startswith(self.tempdir))
78             os.unlink(f)
79
80     def test_write_search_url(self):
81         pass
82
83     def test_ldif_to_samdb(self):
84         dburl = os.path.join(self.tempdir, "ldap")
85         samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
86                                                  MULTISITE_LDIF)
87         self.assertIsInstance(samdb, SamDB)
88
89         dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
90                "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
91         res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
92                            scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
93
94         ntds_guid = misc.GUID(samdb.get_ntds_GUID())
95         self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
96
97         service_name_res = samdb.search(base="",
98                                         scope=ldb.SCOPE_BASE,
99                                         attrs=["dsServiceName"])
100         dn = ldb.Dn(samdb,
101                     service_name_res[0]["dsServiceName"][0])
102         self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
103         self.remove_files(dburl)
104
105     def test_ldif_to_samdb_forced_local_dsa(self):
106         for dsa, site in MULTISITE_LDIF_DSAS:
107             dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
108                                  "-%s" % dsa)
109             samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
110                                                      MULTISITE_LDIF,
111                                                      forced_local_dsa=dsa)
112             self.assertIsInstance(samdb, SamDB)
113             self.assertEqual(samdb.server_site_name(), site)
114
115             res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
116                                scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
117
118             ntds_guid = misc.GUID(samdb.get_ntds_GUID())
119             self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)
120
121             service_name_res = samdb.search(base="",
122                                             scope=ldb.SCOPE_BASE,
123                                             attrs=["dsServiceName"])
124             dn = ldb.Dn(samdb,
125                         service_name_res[0]["dsServiceName"][0])
126             self.assertEqual(dn, ldb.Dn(samdb, "CN=NTDS Settings," + dsa))
127             self.remove_files(dburl)
128
129
130     def test_samdb_to_ldif_file(self):
131         dburl = os.path.join(self.tempdir, "ldap")
132         dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
133         ldif_file = os.path.join(self.tempdir, "ldif")
134         samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
135                                                  MULTISITE_LDIF)
136         self.assertIsInstance(samdb, SamDB)
137         ldif_import_export.samdb_to_ldif_file(samdb, dburl,
138                                               lp=self.lp, creds=None,
139                                               ldif_file=ldif_file)
140         self.assertGreater(os.path.getsize(ldif_file), 1000,
141                            "LDIF should be larger than 1000 bytes")
142         samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
143                                                  ldif_file)
144         self.assertIsInstance(samdb, SamDB)
145         dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
146                "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
147         res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
148                            scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
149         self.remove_files(dburl)
150         self.remove_files(dburl2)
151         self.remove_files(ldif_file)
152
153
154 class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
155     def setUp(self):
156         super(KCCMultisiteLdifTests, self).setUp()
157         self.lp = LoadParm()
158         self.creds = Credentials()
159         self.creds.guess(self.lp)
160
161     def remove_files(self, *files):
162         for f in files:
163             assert(f.startswith(self.tempdir))
164             os.unlink(f)
165
166     def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
167         # Note that setting read-only to False won't affect the ldif,
168         # only the temporary database that is created from it.
169         my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
170                      dot_file_dir=dot_file_dir)
171         tmpdb = os.path.join(self.tempdir, 'tmpdb')
172         my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
173         self.remove_files(tmpdb)
174         return my_kcc
175
176     def test_list_dsas(self):
177         my_kcc = self._get_kcc('test-list')
178         dsas = set(my_kcc.list_dsas())
179         expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS)
180         self.assertEqual(dsas, expected_dsas)
181
182     def test_verify(self):
183         """Check that the KCC generates graphs that pass its own verify
184         option.
185         """
186         my_kcc = self._get_kcc('test-verify', verify=True)
187         tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
188         my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
189
190         my_kcc.run(None,
191                    self.lp, self.creds,
192                    attempt_live_connections=False)
193         self.remove_files(tmpdb)
194
195     def test_dotfiles(self):
196         """Check that KCC writes dot_files when asked.
197         """
198         my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
199         tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
200         files = [tmpdb]
201         my_kcc.import_ldif(tmpdb, self.lp, self.creds, MULTISITE_LDIF)
202         my_kcc.run(None,
203                    self.lp, self.creds,
204                    attempt_live_connections=False)
205
206         dot = '/usr/bin/dot'
207         for fn in os.listdir(self.tempdir):
208             if fn.endswith('.dot'):
209                 ffn = os.path.join(self.tempdir, fn)
210                 if os.path.exists(dot) and subprocess.call([dot, '-?']) == 0:
211                     r = subprocess.call([dot, '-Tcanon', ffn])
212                     self.assertEqual(r, 0)
213
214                 #even if dot is not there, at least check the file is non-empty
215                 size = os.stat(ffn).st_size
216                 self.assertNotEqual(size, 0)
217                 files.append(ffn)
218
219         self.remove_files(*files)