1 # Unix SMB/CIFS implementation.
3 # Copyright (C) Bjoern Baumbach <bb@sernet.de> 2018
6 # Copyright (C) Michael Adam 2012
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from samba.tests.samba_tool.base import SambaToolCmdTest
25 from samba import dsdb
26 from samba.ndr import ndr_unpack, ndr_pack
27 from samba.dcerpc import dnsp
30 class ComputerCmdTestCase(SambaToolCmdTest):
31 """Tests for samba-tool computer subcommands"""
36 super(ComputerCmdTestCase, self).setUp()
37 self.creds = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])
38 self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], self.creds)
39 # ips used to test --ip-address option
40 self.ipv4 = '10.10.10.10'
41 self.ipv6 = '2001:0db8:0a0b:12f0:0000:0000:0000:0001'
44 'name': 'testcomputer1',
45 'ip_address_list': [self.ipv4]
48 'name': 'testcomputer2',
49 'ip_address_list': [self.ipv6],
50 'service_principal_name_list': ['SPN0']
53 'name': 'testcomputer3$',
54 'ip_address_list': [self.ipv4, self.ipv6],
55 'service_principal_name_list': ['SPN0', 'SPN1']
58 'name': 'testcomputer4$',
61 self.computers = [self._randomComputer(base=item) for item in data]
63 # setup the 4 computers and ensure they are correct
64 for computer in self.computers:
65 (result, out, err) = self._create_computer(computer)
67 self.assertCmdSuccess(result, out, err)
68 self.assertEquals(err, "", "There shouldn't be any error message")
69 self.assertIn("Computer '%s' created successfully" %
70 computer["name"], out)
72 found = self._find_computer(computer["name"])
74 self.assertIsNotNone(found)
76 expectedname = computer["name"].rstrip('$')
77 expectedsamaccountname = computer["name"]
78 if not computer["name"].endswith('$'):
79 expectedsamaccountname = "%s$" % computer["name"]
80 self.assertEquals("%s" % found.get("name"), expectedname)
81 self.assertEquals("%s" % found.get("sAMAccountName"),
82 expectedsamaccountname)
83 self.assertEquals("%s" % found.get("description"),
84 computer["description"])
87 super(ComputerCmdTestCase, self).tearDown()
88 # clean up all the left over computers, just in case
89 for computer in self.computers:
90 if self._find_computer(computer["name"]):
91 (result, out, err) = self.runsubcmd("computer", "delete",
92 "%s" % computer["name"])
93 self.assertCmdSuccess(result, out, err,
94 "Failed to delete computer '%s'" %
97 def test_newcomputer_with_service_principal_name(self):
98 # Each computer should have correct servicePrincipalName as provided.
99 for computer in self.computers:
100 expected_names = computer.get('service_principal_name_list', [])
101 found = self._find_service_principal_name(computer['name'], expected_names)
102 self.assertTrue(found)
104 def test_newcomputer_with_dns_records(self):
106 # Each computer should have correct DNS record and ip address.
107 for computer in self.computers:
108 for ip_address in computer.get('ip_address_list', []):
109 found = self._find_dns_record(computer['name'], ip_address)
110 self.assertTrue(found)
112 # try to delete all the computers we just created
113 for computer in self.computers:
114 (result, out, err) = self.runsubcmd("computer", "delete",
115 "%s" % computer["name"])
116 self.assertCmdSuccess(result, out, err,
117 "Failed to delete computer '%s'" %
119 found = self._find_computer(computer["name"])
120 self.assertIsNone(found,
121 "Deleted computer '%s' still exists" %
124 # all DNS records should be gone
125 for computer in self.computers:
126 for ip_address in computer.get('ip_address_list', []):
127 found = self._find_dns_record(computer['name'], ip_address)
128 self.assertFalse(found)
130 def test_newcomputer(self):
131 """This tests the "computer create" and "computer delete" commands"""
132 # try to create all the computers again, this should fail
133 for computer in self.computers:
134 (result, out, err) = self._create_computer(computer)
135 self.assertCmdFail(result, "Succeeded to create existing computer")
136 self.assertIn("already exists", err)
138 # try to delete all the computers we just created
139 for computer in self.computers:
140 (result, out, err) = self.runsubcmd("computer", "delete", "%s" %
142 self.assertCmdSuccess(result, out, err,
143 "Failed to delete computer '%s'" %
145 found = self._find_computer(computer["name"])
146 self.assertIsNone(found,
147 "Deleted computer '%s' still exists" %
150 # test creating computers
151 for computer in self.computers:
152 (result, out, err) = self.runsubcmd(
153 "computer", "create", "%s" % computer["name"],
154 "--description=%s" % computer["description"])
156 self.assertCmdSuccess(result, out, err)
157 self.assertEquals(err, "", "There shouldn't be any error message")
158 self.assertIn("Computer '%s' created successfully" %
159 computer["name"], out)
161 found = self._find_computer(computer["name"])
163 expectedname = computer["name"].rstrip('$')
164 expectedsamaccountname = computer["name"]
165 if not computer["name"].endswith('$'):
166 expectedsamaccountname = "%s$" % computer["name"]
167 self.assertEquals("%s" % found.get("name"), expectedname)
168 self.assertEquals("%s" % found.get("sAMAccountName"),
169 expectedsamaccountname)
170 self.assertEquals("%s" % found.get("description"),
171 computer["description"])
174 (result, out, err) = self.runsubcmd("computer", "list")
175 self.assertCmdSuccess(result, out, err, "Error running list")
177 search_filter = ("(sAMAccountType=%u)" %
178 dsdb.ATYPE_WORKSTATION_TRUST)
180 computerlist = self.samdb.search(base=self.samdb.domain_dn(),
181 scope=ldb.SCOPE_SUBTREE,
182 expression=search_filter,
183 attrs=["samaccountname"])
185 self.assertTrue(len(computerlist) > 0, "no computers found in samdb")
187 for computerobj in computerlist:
188 name = computerobj.get("samaccountname", idx=0)
189 found = self.assertMatch(out, str(name),
190 "computer '%s' not found" % name)
193 parentou = self._randomOU({"name": "parentOU"})
194 (result, out, err) = self._create_ou(parentou)
195 self.assertCmdSuccess(result, out, err)
197 for computer in self.computers:
198 olddn = self._find_computer(computer["name"]).get("dn")
200 (result, out, err) = self.runsubcmd("computer", "move",
201 "%s" % computer["name"],
202 "OU=%s" % parentou["name"])
203 self.assertCmdSuccess(result, out, err,
204 "Failed to move computer '%s'" %
206 self.assertEquals(err, "", "There shouldn't be any error message")
207 self.assertIn('Moved computer "%s"' % computer["name"], out)
209 found = self._find_computer(computer["name"])
210 self.assertNotEquals(found.get("dn"), olddn,
211 ("Moved computer '%s' still exists with the "
212 "same dn" % computer["name"]))
213 computername = computer["name"].rstrip('$')
214 newexpecteddn = ldb.Dn(self.samdb,
216 (computername, parentou["name"],
217 self.samdb.domain_dn()))
218 self.assertEquals(found.get("dn"), newexpecteddn,
219 "Moved computer '%s' does not exist" %
222 (result, out, err) = self.runsubcmd("computer", "move",
223 "%s" % computer["name"],
224 "%s" % olddn.parent())
225 self.assertCmdSuccess(result, out, err,
226 "Failed to move computer '%s'" %
229 (result, out, err) = self.runsubcmd("ou", "delete",
230 "OU=%s" % parentou["name"])
231 self.assertCmdSuccess(result, out, err,
232 "Failed to delete ou '%s'" % parentou["name"])
234 def _randomComputer(self, base={}):
235 """create a computer with random attribute values, you can specify base
239 "name": self.randomName(),
240 "description": self.randomName(count=100),
242 computer.update(base)
245 def _randomOU(self, base={}):
246 """create an ou with random attribute values, you can specify base
250 "name": self.randomName(),
251 "description": self.randomName(count=100),
256 def _create_computer(self, computer):
257 args = '{0} {1} --description={2}'.format(
258 computer['name'], self.creds, computer["description"])
260 for ip_address in computer.get('ip_address_list', []):
261 args += ' --ip-address={0}'.format(ip_address)
263 for service_principal_name in computer.get('service_principal_name_list', []):
264 args += ' --service-principal-name={0}'.format(service_principal_name)
268 return self.runsubcmd('computer', 'create', *args)
270 def _create_ou(self, ou):
271 return self.runsubcmd("ou", "create", "OU=%s" % ou["name"],
272 "--description=%s" % ou["description"])
274 def _find_computer(self, name):
275 samaccountname = name
276 if not name.endswith('$'):
277 samaccountname = "%s$" % name
278 search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
279 (ldb.binary_encode(samaccountname),
280 "CN=Computer,CN=Schema,CN=Configuration",
281 self.samdb.domain_dn()))
282 computerlist = self.samdb.search(base=self.samdb.domain_dn(),
283 scope=ldb.SCOPE_SUBTREE,
284 expression=search_filter, attrs=[])
286 return computerlist[0]
290 def _find_dns_record(self, name, ip_address):
291 name = name.rstrip('$') # computername
292 records = self.samdb.search(
293 base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
294 scope=ldb.SCOPE_SUBTREE,
295 expression="(&(objectClass=dnsNode)(name={0}))".format(name),
296 attrs=['dnsRecord', 'dNSTombstoned'])
298 # unpack data and compare
299 for record in records:
300 if 'dNSTombstoned' in record and str(record['dNSTombstoned']) == 'TRUE':
301 # if a record is dNSTombstoned, ignore it.
303 for dns_record_bin in record['dnsRecord']:
304 dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
305 ip = str(dns_record_obj.data)
307 if str(ip) == str(ip_address):
312 def _find_service_principal_name(self, name, expected_service_principal_names):
313 """Find all servicePrincipalName values and compare with expected_service_principal_names"""
314 samaccountname = name.strip('$') + '$'
315 search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
316 (ldb.binary_encode(samaccountname),
317 "CN=Computer,CN=Schema,CN=Configuration",
318 self.samdb.domain_dn()))
319 computer_list = self.samdb.search(
320 base=self.samdb.domain_dn(),
321 scope=ldb.SCOPE_SUBTREE,
322 expression=search_filter,
323 attrs=['servicePrincipalName'])
325 for computer in computer_list:
326 for name in computer.get('servicePrincipalName', []):
328 return names == set(expected_service_principal_names)