1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
22 from samba.dcerpc import srvsvc, dnsserver
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
34 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
37 super(AuthLogTests, self).setUp()
38 self.remoteAddress = os.environ["CLIENT_IP"]
41 super(AuthLogTests, self).tearDown()
43 def _test_rpc_ncacn_np(self, authTypes, creds, service,
44 binding, protection, checkFunction):
45 def isLastExpectedMessage(msg):
46 return (msg["type"] == "Authorization" and
47 (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
48 msg["Authorization"]["serviceDescription"] == service) and
49 msg["Authorization"]["authType"] == authTypes[0] and
50 msg["Authorization"]["transportProtection"] == protection)
53 binding = "[%s]" % binding
55 if service == "dnsserver":
56 x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
59 elif service == "srvsvc":
60 x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
64 # The connection is passed to ensure the server
65 # messaging context stays up until all the messages have been received.
66 messages = self.waitForMessages(isLastExpectedMessage, x)
67 checkFunction(messages, authTypes, service, binding, protection)
69 def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
70 # Turn "[foo,bar]" into a list ("foo", "bar") to test
71 # lambda x: x removes anything that evaluates to False,
72 # including empty strings, so we handle "" as well
73 binding_list = filter(lambda x: x, re.compile('[\[,\]]').split(binding))
75 # Handle explicit smb2, smb1 or auto negotiation
76 if "smb2" in binding_list:
77 self.assertEquals(serviceDescription, "SMB2")
78 elif "smb1" in binding_list:
79 self.assertEquals(serviceDescription, "SMB")
81 self.assertIn(serviceDescription, ["SMB", "SMB2"])
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the messages logged for an NTLM authenticated ncacn_np bind.

    :param messages: the logged messages, in the order received.
    :param authTypes: expected auth strings; its length is the expected
        number of messages, ``authTypes[1]`` and ``authTypes[3]`` are
        authDescription values and ``authTypes[2]`` is the authType of the
        SMB Authorization.
    :param service: RPC service name, accepted as an alternative to
        "DCE/RPC" in the third message.
    :param binding: bracketed binding options, used to pick the expected
        SMB dialect.
    :param protection: unused here; accepted so that all check functions
        share one signature.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    authentication = messages[0]
    self.assertEqual("Authentication", authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     authentication["Authentication"]["status"])
    self._assert_ncacn_np_serviceDescription(
        binding, authentication["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     authentication["Authentication"]["authDescription"])

    # Check the second message it should be an Authorization
    authorization = messages[1]
    self.assertEqual("Authorization", authorization["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     authorization["Authorization"]["authType"])
    self.assertEqual("SMB",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authorization["Authorization"]["sessionId"]))

    # Check the third message it should be an Authentication
    # if we are expecting 4 messages
    if expected_messages == 4:
        rpc_authentication = messages[2]
        self.assertEqual("Authentication", rpc_authentication["type"])
        self.assertEqual("NT_STATUS_OK",
                         rpc_authentication["Authentication"]["status"])
        # assertIn reports the offending value on failure.
        self.assertIn(
            rpc_authentication["Authentication"]["serviceDescription"],
            ("DCE/RPC", service))
        self.assertEqual(
            authTypes[3],
            rpc_authentication["Authentication"]["authDescription"])
def rpc_ncacn_np_krb5_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the messages logged for a Kerberos authenticated ncacn_np bind.

    NOTE(review): the parameter list was not visible in the reviewed
    listing; it is assumed to match the positional call
    ``checkFunction(messages, authTypes, service, binding, protection)``
    made by the ncacn_np test driver - confirm.

    :param messages: the logged messages, in the order received.
    :param authTypes: expected auth strings; its length is the expected
        number of messages, ``authTypes[1]`` and ``authTypes[2]`` are the
        KDC authDescription values and ``authTypes[3]`` is the authType of
        the SMB Authorization.
    :param service: unused here.
    :param binding: bracketed binding options, used to pick the expected
        SMB dialect.
    :param protection: unused here.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    # This is almost certainly Authentication over UDP, and is probably
    # returning message too big,
    udp_authentication = messages[0]
    self.assertEqual("Authentication", udp_authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     udp_authentication["Authentication"]["status"])
    self.assertEqual(
        "Kerberos KDC",
        udp_authentication["Authentication"]["serviceDescription"])
    self.assertEqual(
        authTypes[1],
        udp_authentication["Authentication"]["authDescription"])

    # Check the second message it should be an Authentication
    # This is the TCP Authentication in response to the message too big
    # response to the UDP Authentication
    tcp_authentication = messages[1]
    self.assertEqual("Authentication", tcp_authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     tcp_authentication["Authentication"]["status"])
    self.assertEqual(
        "Kerberos KDC",
        tcp_authentication["Authentication"]["serviceDescription"])
    self.assertEqual(
        authTypes[2],
        tcp_authentication["Authentication"]["authDescription"])

    # Check the third message it should be an Authorization
    authorization = messages[2]
    self.assertEqual("Authorization", authorization["type"])
    self._assert_ncacn_np_serviceDescription(
        binding, authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[3],
                     authorization["Authorization"]["authType"])
    self.assertEqual("SMB",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authorization["Authorization"]["sessionId"]))
169 def test_rpc_ncacn_np_ntlm_dns_sign(self):
170 creds = self.insta_creds(template=self.get_credentials(),
171 kerberos_state=DONT_USE_KERBEROS)
172 self._test_rpc_ncacn_np(["NTLMSSP",
176 creds, "dnsserver", "sign", "SIGN",
177 self.rpc_ncacn_np_ntlm_check)
179 def test_rpc_ncacn_np_ntlm_srv_sign(self):
180 creds = self.insta_creds(template=self.get_credentials(),
181 kerberos_state=DONT_USE_KERBEROS)
182 self._test_rpc_ncacn_np(["NTLMSSP",
186 creds, "srvsvc", "sign", "SIGN",
187 self.rpc_ncacn_np_ntlm_check)
189 def test_rpc_ncacn_np_ntlm_dns(self):
190 creds = self.insta_creds(template=self.get_credentials(),
191 kerberos_state=DONT_USE_KERBEROS)
192 self._test_rpc_ncacn_np(["ncacn_np",
195 creds, "dnsserver", "", "SMB",
196 self.rpc_ncacn_np_ntlm_check)
198 def test_rpc_ncacn_np_ntlm_srv(self):
199 creds = self.insta_creds(template=self.get_credentials(),
200 kerberos_state=DONT_USE_KERBEROS)
201 self._test_rpc_ncacn_np(["ncacn_np",
204 creds, "srvsvc", "", "SMB",
205 self.rpc_ncacn_np_ntlm_check)
207 def test_rpc_ncacn_np_krb_dns_sign(self):
208 creds = self.insta_creds(template=self.get_credentials(),
209 kerberos_state=MUST_USE_KERBEROS)
210 self._test_rpc_ncacn_np(["krb5",
211 "ENC-TS Pre-authentication",
212 "ENC-TS Pre-authentication",
214 creds, "dnsserver", "sign", "SIGN",
215 self.rpc_ncacn_np_krb5_check)
217 def test_rpc_ncacn_np_krb_srv_sign(self):
218 creds = self.insta_creds(template=self.get_credentials(),
219 kerberos_state=MUST_USE_KERBEROS)
220 self._test_rpc_ncacn_np(["krb5",
221 "ENC-TS Pre-authentication",
222 "ENC-TS Pre-authentication",
224 creds, "srvsvc", "sign", "SIGN",
225 self.rpc_ncacn_np_krb5_check)
227 def test_rpc_ncacn_np_krb_dns(self):
228 creds = self.insta_creds(template=self.get_credentials(),
229 kerberos_state=MUST_USE_KERBEROS)
230 self._test_rpc_ncacn_np(["ncacn_np",
231 "ENC-TS Pre-authentication",
232 "ENC-TS Pre-authentication",
234 creds, "dnsserver", "", "SMB",
235 self.rpc_ncacn_np_krb5_check)
237 def test_rpc_ncacn_np_krb_dns_smb2(self):
238 creds = self.insta_creds(template=self.get_credentials(),
239 kerberos_state=MUST_USE_KERBEROS)
240 self._test_rpc_ncacn_np(["ncacn_np",
241 "ENC-TS Pre-authentication",
242 "ENC-TS Pre-authentication",
244 creds, "dnsserver", "smb2", "SMB",
245 self.rpc_ncacn_np_krb5_check)
247 def test_rpc_ncacn_np_krb_srv(self):
248 creds = self.insta_creds(template=self.get_credentials(),
249 kerberos_state=MUST_USE_KERBEROS)
250 self._test_rpc_ncacn_np(["ncacn_np",
251 "ENC-TS Pre-authentication",
252 "ENC-TS Pre-authentication",
254 creds, "srvsvc", "", "SMB",
255 self.rpc_ncacn_np_krb5_check)
257 def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
258 binding, protection, checkFunction):
259 def isLastExpectedMessage(msg):
260 return (msg["type"] == "Authorization" and
261 msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
262 msg["Authorization"]["authType"] == authTypes[0] and
263 msg["Authorization"]["transportProtection"] == protection)
266 binding = "[%s]" % binding
268 if service == "dnsserver":
269 conn = dnsserver.dnsserver(
270 "ncacn_ip_tcp:%s%s" % (self.server, binding),
273 elif service == "srvsvc":
274 conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
278 messages = self.waitForMessages(isLastExpectedMessage, conn)
279 checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for an NTLM authenticated ncacn_ip_tcp bind.

    :param messages: the logged messages, in the order received.
    :param authTypes: expected auth strings; its length is the expected
        number of messages, ``authTypes[1]`` is the authType of the first
        Authorization and ``authTypes[2]`` the authDescription of the
        Authentication that follows it.
    :param service: unused here.
    :param binding: unused here.
    :param protection: unused here; the first Authorization is always
        expected to carry "NONE".
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    authorization = messages[0]
    self.assertEqual("Authorization", authorization["type"])
    self.assertEqual("DCE/RPC",
                     authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     authorization["Authorization"]["authType"])
    self.assertEqual("NONE",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authorization["Authorization"]["sessionId"]))

    # Check the second message it should be an Authentication
    authentication = messages[1]
    self.assertEqual("Authentication", authentication["type"])
    self.assertEqual("NT_STATUS_OK",
                     authentication["Authentication"]["status"])
    self.assertEqual("DCE/RPC",
                     authentication["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     authentication["Authentication"]["authDescription"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for a Kerberos authenticated ncacn_ip_tcp bind.

    :param messages: the logged messages, in the order received.
    :param authTypes: expected auth strings; its length is the expected
        number of messages, ``authTypes[1]`` is the authType of the first
        Authorization and ``authTypes[2]`` the authDescription expected on
        both KDC Authentications.
    :param service: unused here.
    :param binding: unused here.
    :param protection: unused here; the first Authorization is always
        expected to carry "NONE".
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    authorization = messages[0]
    self.assertEqual("Authorization", authorization["type"])
    self.assertEqual("DCE/RPC",
                     authorization["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     authorization["Authorization"]["authType"])
    self.assertEqual("NONE",
                     authorization["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authorization["Authorization"]["sessionId"]))

    # Check the second and third messages, both should be Kerberos KDC
    # Authentications.  Both are compared against authTypes[2], exactly as
    # the original code did.
    for kdc_authentication in (messages[1], messages[2]):
        self.assertEqual("Authentication", kdc_authentication["type"])
        self.assertEqual("NT_STATUS_OK",
                         kdc_authentication["Authentication"]["status"])
        self.assertEqual(
            "Kerberos KDC",
            kdc_authentication["Authentication"]["serviceDescription"])
        self.assertEqual(
            authTypes[2],
            kdc_authentication["Authentication"]["authDescription"])
342 def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
343 creds = self.insta_creds(template=self.get_credentials(),
344 kerberos_state=DONT_USE_KERBEROS)
345 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
348 creds, "dnsserver", "sign", "SIGN",
349 self.rpc_ncacn_ip_tcp_ntlm_check)
351 def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
352 creds = self.insta_creds(template=self.get_credentials(),
353 kerberos_state=MUST_USE_KERBEROS)
354 self._test_rpc_ncacn_ip_tcp(["krb5",
356 "ENC-TS Pre-authentication",
357 "ENC-TS Pre-authentication"],
358 creds, "dnsserver", "sign", "SIGN",
359 self.rpc_ncacn_ip_tcp_krb5_check)
361 def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
362 creds = self.insta_creds(template=self.get_credentials(),
363 kerberos_state=DONT_USE_KERBEROS)
364 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
367 creds, "dnsserver", "", "SIGN",
368 self.rpc_ncacn_ip_tcp_ntlm_check)
370 def test_rpc_ncacn_ip_tcp_krb5_dns(self):
371 creds = self.insta_creds(template=self.get_credentials(),
372 kerberos_state=MUST_USE_KERBEROS)
373 self._test_rpc_ncacn_ip_tcp(["krb5",
375 "ENC-TS Pre-authentication",
376 "ENC-TS Pre-authentication"],
377 creds, "dnsserver", "", "SIGN",
378 self.rpc_ncacn_ip_tcp_krb5_check)
380 def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
381 creds = self.insta_creds(template=self.get_credentials(),
382 kerberos_state=DONT_USE_KERBEROS)
383 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
386 creds, "dnsserver", "connect", "NONE",
387 self.rpc_ncacn_ip_tcp_ntlm_check)
389 def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
390 creds = self.insta_creds(template=self.get_credentials(),
391 kerberos_state=MUST_USE_KERBEROS)
392 self._test_rpc_ncacn_ip_tcp(["krb5",
394 "ENC-TS Pre-authentication",
395 "ENC-TS Pre-authentication"],
396 creds, "dnsserver", "connect", "NONE",
397 self.rpc_ncacn_ip_tcp_krb5_check)
399 def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
400 creds = self.insta_creds(template=self.get_credentials(),
401 kerberos_state=DONT_USE_KERBEROS)
402 self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
405 creds, "dnsserver", "seal", "SEAL",
406 self.rpc_ncacn_ip_tcp_ntlm_check)
408 def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
409 creds = self.insta_creds(template=self.get_credentials(),
410 kerberos_state=MUST_USE_KERBEROS)
411 self._test_rpc_ncacn_ip_tcp(["krb5",
413 "ENC-TS Pre-authentication",
414 "ENC-TS Pre-authentication"],
415 creds, "dnsserver", "seal", "SEAL",
416 self.rpc_ncacn_ip_tcp_krb5_check)
420 def isLastExpectedMessage(msg):
421 return (msg["type"] == "Authorization" and
422 msg["Authorization"]["serviceDescription"] == "LDAP" and
423 msg["Authorization"]["transportProtection"] == "SIGN" and
424 msg["Authorization"]["authType"] == "krb5")
426 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
427 lp=self.get_loadparm(),
428 credentials=self.get_credentials())
430 messages = self.waitForMessages(isLastExpectedMessage)
433 "Did not receive the expected number of messages")
435 # Check the first message it should be an Authentication
437 self.assertEquals("Authentication", msg["type"])
438 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
439 self.assertEquals("Kerberos KDC",
440 msg["Authentication"]["serviceDescription"])
441 self.assertEquals("ENC-TS Pre-authentication",
442 msg["Authentication"]["authDescription"])
443 self.assertTrue(msg["Authentication"]["duration"] > 0)
445 # Check the second message it should be an Authentication
447 self.assertEquals("Authentication", msg["type"])
448 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
449 self.assertEquals("Kerberos KDC",
450 msg["Authentication"]["serviceDescription"])
451 self.assertEquals("ENC-TS Pre-authentication",
452 msg["Authentication"]["authDescription"])
453 self.assertTrue(msg["Authentication"]["duration"] > 0)
455 def test_ldap_ntlm(self):
457 def isLastExpectedMessage(msg):
458 return (msg["type"] == "Authorization" and
459 msg["Authorization"]["serviceDescription"] == "LDAP" and
460 msg["Authorization"]["transportProtection"] == "SEAL" and
461 msg["Authorization"]["authType"] == "NTLMSSP")
463 self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
464 lp=self.get_loadparm(),
465 credentials=self.get_credentials())
467 messages = self.waitForMessages(isLastExpectedMessage)
470 "Did not receive the expected number of messages")
471 # Check the first message it should be an Authentication
473 self.assertEquals("Authentication", msg["type"])
474 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
475 self.assertEquals("LDAP",
476 msg["Authentication"]["serviceDescription"])
477 self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
478 self.assertTrue(msg["Authentication"]["duration"] > 0)
480 def test_ldap_simple_bind(self):
481 def isLastExpectedMessage(msg):
482 return (msg["type"] == "Authorization" and
483 msg["Authorization"]["serviceDescription"] == "LDAP" and
484 msg["Authorization"]["transportProtection"] == "TLS" and
485 msg["Authorization"]["authType"] == "simple bind")
487 creds = self.insta_creds(template=self.get_credentials())
488 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
489 creds.get_username()))
491 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
492 lp=self.get_loadparm(),
495 messages = self.waitForMessages(isLastExpectedMessage)
498 "Did not receive the expected number of messages")
500 # Check the first message it should be an Authentication
502 self.assertEquals("Authentication", msg["type"])
503 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
504 self.assertEquals("LDAP",
505 msg["Authentication"]["serviceDescription"])
506 self.assertEquals("simple bind",
507 msg["Authentication"]["authDescription"])
509 def test_ldap_simple_bind_bad_password(self):
510 def isLastExpectedMessage(msg):
511 return (msg["type"] == "Authentication" and
512 msg["Authentication"]["serviceDescription"] == "LDAP" and
513 (msg["Authentication"]["status"] ==
514 "NT_STATUS_WRONG_PASSWORD") and
515 msg["Authentication"]["authDescription"] == "simple bind")
517 creds = self.insta_creds(template=self.get_credentials())
518 creds.set_password("badPassword")
519 creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
520 creds.get_username()))
524 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
525 lp=self.get_loadparm(),
529 self.assertEquals(thrown, True)
531 messages = self.waitForMessages(isLastExpectedMessage)
534 "Did not receive the expected number of messages")
536 def test_ldap_simple_bind_bad_user(self):
537 def isLastExpectedMessage(msg):
538 return (msg["type"] == "Authentication" and
539 msg["Authentication"]["serviceDescription"] == "LDAP" and
540 (msg["Authentication"]["status"] ==
541 "NT_STATUS_NO_SUCH_USER") and
542 msg["Authentication"]["authDescription"] == "simple bind")
544 creds = self.insta_creds(template=self.get_credentials())
545 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
549 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
550 lp=self.get_loadparm(),
554 self.assertEquals(thrown, True)
556 messages = self.waitForMessages(isLastExpectedMessage)
559 "Did not receive the expected number of messages")
561 def test_ldap_simple_bind_unparseable_user(self):
562 def isLastExpectedMessage(msg):
563 return (msg["type"] == "Authentication" and
564 msg["Authentication"]["serviceDescription"] == "LDAP" and
565 (msg["Authentication"]["status"] ==
566 "NT_STATUS_NO_SUCH_USER") and
567 msg["Authentication"]["authDescription"] == "simple bind")
569 creds = self.insta_creds(template=self.get_credentials())
570 creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
574 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
575 lp=self.get_loadparm(),
579 self.assertEquals(thrown, True)
581 messages = self.waitForMessages(isLastExpectedMessage)
584 "Did not receive the expected number of messages")
587 # Note: as this test does not expect any messages it will
588 # time out in the call to self.waitForMessages.
589 # This is expected, but it will slow this test.
590 def test_ldap_anonymous_access_bind_only(self):
591 # Should be no logging for anonymous bind
592 # so receiving any message indicates a failure.
593 def isLastExpectedMessage(msg):
596 creds = self.insta_creds(template=self.get_credentials())
597 creds.set_anonymous()
599 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
600 lp=self.get_loadparm(),
603 messages = self.waitForMessages(isLastExpectedMessage)
606 "Did not receive the expected number of messages")
608 def test_ldap_anonymous_access(self):
609 def isLastExpectedMessage(msg):
610 return (msg["type"] == "Authorization" and
611 msg["Authorization"]["serviceDescription"] == "LDAP" and
612 msg["Authorization"]["transportProtection"] == "TLS" and
613 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
614 msg["Authorization"]["authType"] == "no bind")
616 creds = self.insta_creds(template=self.get_credentials())
617 creds.set_anonymous()
619 self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
620 lp=self.get_loadparm(),
624 self.samdb.search(base=self.samdb.domain_dn())
625 self.fail("Expected an LdbError exception")
629 messages = self.waitForMessages(isLastExpectedMessage)
632 "Did not receive the expected number of messages")
635 def isLastExpectedMessage(msg):
636 return (msg["type"] == "Authorization" and
637 msg["Authorization"]["serviceDescription"] == "SMB" and
638 msg["Authorization"]["authType"] == "krb5" and
639 msg["Authorization"]["transportProtection"] == "SMB")
641 creds = self.insta_creds(template=self.get_credentials())
644 lp=self.get_loadparm(),
647 messages = self.waitForMessages(isLastExpectedMessage)
650 "Did not receive the expected number of messages")
651 # Check the first message it should be an Authentication
653 self.assertEquals("Authentication", msg["type"])
654 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
655 self.assertEquals("Kerberos KDC",
656 msg["Authentication"]["serviceDescription"])
657 self.assertEquals("ENC-TS Pre-authentication",
658 msg["Authentication"]["authDescription"])
660 # Check the second message it should be an Authentication
662 self.assertEquals("Authentication", msg["type"])
663 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
664 self.assertEquals("Kerberos KDC",
665 msg["Authentication"]["serviceDescription"])
666 self.assertEquals("ENC-TS Pre-authentication",
667 msg["Authentication"]["authDescription"])
669 def test_smb_bad_password(self):
670 def isLastExpectedMessage(msg):
671 return (msg["type"] == "Authentication" and
672 (msg["Authentication"]["serviceDescription"] ==
674 (msg["Authentication"]["status"] ==
675 "NT_STATUS_WRONG_PASSWORD") and
676 (msg["Authentication"]["authDescription"] ==
677 "ENC-TS Pre-authentication"))
679 creds = self.insta_creds(template=self.get_credentials())
680 creds.set_password("badPassword")
686 lp=self.get_loadparm(),
688 except NTSTATUSError:
690 self.assertEquals(thrown, True)
692 messages = self.waitForMessages(isLastExpectedMessage)
695 "Did not receive the expected number of messages")
697 def test_smb_bad_user(self):
698 def isLastExpectedMessage(msg):
699 return (msg["type"] == "Authentication" and
700 (msg["Authentication"]["serviceDescription"] ==
702 (msg["Authentication"]["status"] ==
703 "NT_STATUS_NO_SUCH_USER") and
704 (msg["Authentication"]["authDescription"] ==
705 "ENC-TS Pre-authentication"))
707 creds = self.insta_creds(template=self.get_credentials())
708 creds.set_username("badUser")
714 lp=self.get_loadparm(),
716 except NTSTATUSError:
718 self.assertEquals(thrown, True)
720 messages = self.waitForMessages(isLastExpectedMessage)
723 "Did not receive the expected number of messages")
725 def test_smb1_anonymous(self):
726 def isLastExpectedMessage(msg):
727 return (msg["type"] == "Authorization" and
728 msg["Authorization"]["serviceDescription"] == "SMB" and
729 msg["Authorization"]["authType"] == "NTLMSSP" and
730 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
731 msg["Authorization"]["transportProtection"] == "SMB")
733 server = os.environ["SERVER"]
735 path = "//%s/IPC$" % server
737 call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
739 messages = self.waitForMessages(isLastExpectedMessage)
742 "Did not receive the expected number of messages")
744 # Check the first message it should be an Authentication
746 self.assertEquals("Authentication", msg["type"])
747 self.assertEquals("NT_STATUS_NO_SUCH_USER",
748 msg["Authentication"]["status"])
749 self.assertEquals("SMB",
750 msg["Authentication"]["serviceDescription"])
751 self.assertEquals("NTLMSSP",
752 msg["Authentication"]["authDescription"])
753 self.assertEquals("No-Password",
754 msg["Authentication"]["passwordType"])
756 # Check the second message it should be an Authentication
758 self.assertEquals("Authentication", msg["type"])
759 self.assertEquals("NT_STATUS_OK",
760 msg["Authentication"]["status"])
761 self.assertEquals("SMB",
762 msg["Authentication"]["serviceDescription"])
763 self.assertEquals("NTLMSSP",
764 msg["Authentication"]["authDescription"])
765 self.assertEquals("No-Password",
766 msg["Authentication"]["passwordType"])
767 self.assertEquals("ANONYMOUS LOGON",
768 msg["Authentication"]["becameAccount"])
770 def test_smb2_anonymous(self):
771 def isLastExpectedMessage(msg):
772 return (msg["type"] == "Authorization" and
773 msg["Authorization"]["serviceDescription"] == "SMB2" and
774 msg["Authorization"]["authType"] == "NTLMSSP" and
775 msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
776 msg["Authorization"]["transportProtection"] == "SMB")
778 server = os.environ["SERVER"]
780 path = "//%s/IPC$" % server
782 call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
784 messages = self.waitForMessages(isLastExpectedMessage)
787 "Did not receive the expected number of messages")
789 # Check the first message it should be an Authentication
791 self.assertEquals("Authentication", msg["type"])
792 self.assertEquals("NT_STATUS_NO_SUCH_USER",
793 msg["Authentication"]["status"])
794 self.assertEquals("SMB2",
795 msg["Authentication"]["serviceDescription"])
796 self.assertEquals("NTLMSSP",
797 msg["Authentication"]["authDescription"])
798 self.assertEquals("No-Password",
799 msg["Authentication"]["passwordType"])
801 # Check the second message it should be an Authentication
803 self.assertEquals("Authentication", msg["type"])
804 self.assertEquals("NT_STATUS_OK",
805 msg["Authentication"]["status"])
806 self.assertEquals("SMB2",
807 msg["Authentication"]["serviceDescription"])
808 self.assertEquals("NTLMSSP",
809 msg["Authentication"]["authDescription"])
810 self.assertEquals("No-Password",
811 msg["Authentication"]["passwordType"])
812 self.assertEquals("ANONYMOUS LOGON",
813 msg["Authentication"]["becameAccount"])
815 def test_smb_no_krb_spnego(self):
816 def isLastExpectedMessage(msg):
817 return (msg["type"] == "Authorization" and
818 msg["Authorization"]["serviceDescription"] == "SMB" and
819 msg["Authorization"]["authType"] == "NTLMSSP" and
820 msg["Authorization"]["transportProtection"] == "SMB")
822 creds = self.insta_creds(template=self.get_credentials(),
823 kerberos_state=DONT_USE_KERBEROS)
826 lp=self.get_loadparm(),
829 messages = self.waitForMessages(isLastExpectedMessage)
832 "Did not receive the expected number of messages")
833 # Check the first message it should be an Authentication
835 self.assertEquals("Authentication", msg["type"])
836 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
837 self.assertEquals("SMB",
838 msg["Authentication"]["serviceDescription"])
839 self.assertEquals("NTLMSSP",
840 msg["Authentication"]["authDescription"])
841 self.assertEquals("NTLMv2",
842 msg["Authentication"]["passwordType"])
844 def test_smb_no_krb_spnego_bad_password(self):
845 def isLastExpectedMessage(msg):
846 return (msg["type"] == "Authentication" and
847 msg["Authentication"]["serviceDescription"] == "SMB" and
848 msg["Authentication"]["authDescription"] == "NTLMSSP" and
849 msg["Authentication"]["passwordType"] == "NTLMv2" and
850 (msg["Authentication"]["status"] ==
851 "NT_STATUS_WRONG_PASSWORD"))
853 creds = self.insta_creds(template=self.get_credentials(),
854 kerberos_state=DONT_USE_KERBEROS)
855 creds.set_password("badPassword")
861 lp=self.get_loadparm(),
863 except NTSTATUSError:
865 self.assertEquals(thrown, True)
867 messages = self.waitForMessages(isLastExpectedMessage)
870 "Did not receive the expected number of messages")
872 def test_smb_no_krb_spnego_bad_user(self):
873 def isLastExpectedMessage(msg):
874 return (msg["type"] == "Authentication" and
875 msg["Authentication"]["serviceDescription"] == "SMB" and
876 msg["Authentication"]["authDescription"] == "NTLMSSP" and
877 msg["Authentication"]["passwordType"] == "NTLMv2" and
878 (msg["Authentication"]["status"] ==
879 "NT_STATUS_NO_SUCH_USER"))
881 creds = self.insta_creds(template=self.get_credentials(),
882 kerberos_state=DONT_USE_KERBEROS)
883 creds.set_username("badUser")
889 lp=self.get_loadparm(),
891 except NTSTATUSError:
893 self.assertEquals(thrown, True)
895 messages = self.waitForMessages(isLastExpectedMessage)
898 "Did not receive the expected number of messages")
900 def test_smb_no_krb_no_spnego_no_ntlmv2(self):
901 def isLastExpectedMessage(msg):
902 return (msg["type"] == "Authorization" and
903 msg["Authorization"]["serviceDescription"] == "SMB" and
904 msg["Authorization"]["authType"] == "bare-NTLM" and
905 msg["Authorization"]["transportProtection"] == "SMB")
907 creds = self.insta_creds(template=self.get_credentials(),
908 kerberos_state=DONT_USE_KERBEROS)
911 lp=self.get_loadparm(),
916 messages = self.waitForMessages(isLastExpectedMessage)
919 "Did not receive the expected number of messages")
920 # Check the first message it should be an Authentication
922 self.assertEquals("Authentication", msg["type"])
923 self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
924 self.assertEquals("SMB",
925 msg["Authentication"]["serviceDescription"])
926 self.assertEquals("bare-NTLM",
927 msg["Authentication"]["authDescription"])
928 self.assertEquals("NTLMv1",
929 msg["Authentication"]["passwordType"])
931 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
932 def isLastExpectedMessage(msg):
933 return (msg["type"] == "Authentication" and
934 msg["Authentication"]["serviceDescription"] == "SMB" and
935 msg["Authentication"]["authDescription"] == "bare-NTLM" and
936 msg["Authentication"]["passwordType"] == "NTLMv1" and
937 (msg["Authentication"]["status"] ==
938 "NT_STATUS_WRONG_PASSWORD"))
940 creds = self.insta_creds(template=self.get_credentials(),
941 kerberos_state=DONT_USE_KERBEROS)
942 creds.set_password("badPassword")
948 lp=self.get_loadparm(),
952 except NTSTATUSError:
954 self.assertEquals(thrown, True)
956 messages = self.waitForMessages(isLastExpectedMessage)
959 "Did not receive the expected number of messages")
961 def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
962 def isLastExpectedMessage(msg):
963 return (msg["type"] == "Authentication" and
964 msg["Authentication"]["serviceDescription"] == "SMB" and
965 msg["Authentication"]["authDescription"] == "bare-NTLM" and
966 msg["Authentication"]["passwordType"] == "NTLMv1" and
967 (msg["Authentication"]["status"] ==
968 "NT_STATUS_NO_SUCH_USER"))
970 creds = self.insta_creds(template=self.get_credentials(),
971 kerberos_state=DONT_USE_KERBEROS)
972 creds.set_username("badUser")
978 lp=self.get_loadparm(),
982 except NTSTATUSError:
984 self.assertEquals(thrown, True)
986 messages = self.waitForMessages(isLastExpectedMessage)
989 "Did not receive the expected number of messages")
def test_samlogon_interactive(self):
    """Check the auth log for a successful interactive SamLogon.

    Runs rpcclient's samlogon command (final argument 1) with the
    USERNAME and PASSWORD from the environment against SERVER, waits
    for an Authentication message with status NT_STATUS_OK for this
    workstation, and accepts either 5 or 6 messages once the netlogon
    messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literals "SamLogon" and "interactive" were
        # not legible in the reviewed text; restored from the test
        # name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "interactive" and
                event["status"] == "NT_STATUS_OK" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_password(self):
    """Check the auth log for an interactive SamLogon with a bad password.

    Runs rpcclient's samlogon command (final argument 1) with the
    USERNAME from the environment and the password "badPassword",
    waits for an Authentication message with status
    NT_STATUS_WRONG_PASSWORD for this workstation, and accepts either
    5 or 6 messages once the netlogon messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literals "SamLogon" and "interactive" were
        # not legible in the reviewed text; restored from the test
        # name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "interactive" and
                event["status"] == "NT_STATUS_WRONG_PASSWORD" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_interactive_bad_user(self):
    """Check the auth log for an interactive SamLogon by an unknown user.

    Runs rpcclient's samlogon command (final argument 1) with a user
    name that should not exist and the PASSWORD from the environment,
    waits for an Authentication message with status
    NT_STATUS_NO_SUCH_USER for this workstation, and accepts either 5
    or 6 messages once the netlogon messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literals "SamLogon" and "interactive" were
        # not legible in the reviewed text; restored from the test
        # name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "interactive" and
                event["status"] == "NT_STATUS_NO_SUCH_USER" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was not legible in the
    # reviewed text; "badUser" is restored by analogy with the
    # "badPassword" tests -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network(self):
    """Check the auth log for a successful network SamLogon.

    Runs rpcclient's samlogon command (final argument 2) with the
    USERNAME and PASSWORD from the environment against SERVER, waits
    for an Authentication message with authDescription "network" and
    status NT_STATUS_OK for this workstation, and accepts either 5 or
    6 messages once the netlogon messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_OK" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_password(self):
    """Check the auth log for a network SamLogon with a bad password.

    Runs rpcclient's samlogon command (final argument 2) with the
    USERNAME from the environment and the password "badPassword",
    waits for an Authentication message with authDescription "network"
    and status NT_STATUS_WRONG_PASSWORD for this workstation, and
    accepts either 5 or 6 messages once the netlogon messages have
    been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_WRONG_PASSWORD" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_bad_user(self):
    """Check the auth log for a network SamLogon by an unknown user.

    Runs rpcclient's samlogon command (final argument 2) with a user
    name that should not exist and the PASSWORD from the environment,
    waits for an Authentication message with authDescription "network"
    and status NT_STATUS_NO_SUCH_USER for this workstation, and accepts
    either 5 or 6 messages once the netlogon messages have been
    filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_NO_SUCH_USER" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was not legible in the
    # reviewed text; "badUser" is restored by analogy with the
    # "badPassword" tests -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap(self):
    """Check the auth log for a successful MSCHAPv2 network SamLogon.

    Runs rpcclient's samlogon command (arguments 2 and 0x00010000)
    with the USERNAME and PASSWORD from the environment, waits for an
    Authentication message with authDescription "network", passwordType
    "MSCHAPv2" and status NT_STATUS_OK for this workstation, and
    accepts either 5 or 6 messages once the netlogon messages have
    been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_OK" and
                event["passwordType"] == "MSCHAPv2" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    # NOTE(review): 0x00010000 is presumably the logon-parameter flag
    # that makes the logon use MSCHAPv2 -- confirm against rpcclient's
    # samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_password(self):
    """Check the auth log for an MSCHAPv2 SamLogon with a bad password.

    Runs rpcclient's samlogon command (arguments 2 and 0x00010000)
    with the USERNAME from the environment and the password
    "badPassword", waits for an Authentication message with
    authDescription "network", passwordType "MSCHAPv2" and status
    NT_STATUS_WRONG_PASSWORD for this workstation, and accepts either
    5 or 6 messages once the netlogon messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_WRONG_PASSWORD" and
                event["passwordType"] == "MSCHAPv2" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = "badPassword"
    # NOTE(review): 0x00010000 is presumably the logon-parameter flag
    # that makes the logon use MSCHAPv2 -- confirm against rpcclient's
    # samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_network_mschap_bad_user(self):
    """Check the auth log for an MSCHAPv2 SamLogon by an unknown user.

    Runs rpcclient's samlogon command (arguments 2 and 0x00010000)
    with a user name that should not exist and the PASSWORD from the
    environment, waits for an Authentication message with
    authDescription "network", passwordType "MSCHAPv2" and status
    NT_STATUS_NO_SUCH_USER for this workstation, and accepts either 5
    or 6 messages once the netlogon messages have been filtered out.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_NO_SUCH_USER" and
                event["passwordType"] == "MSCHAPv2" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    # NOTE(review): the assignment of `user` was not legible in the
    # reviewed text; "badUser" is restored by analogy with the
    # "badPassword" tests -- confirm.
    user = "badUser"
    password = os.environ["PASSWORD"]
    # NOTE(review): 0x00010000 is presumably the logon-parameter flag
    # that makes the logon use MSCHAPv2 -- confirm against rpcclient's
    # samlogon usage.
    samlogon = "samlogon %s %s %s %d 0x00010000" % (
        user, password, workstation, 2)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")
def test_samlogon_schannel_seal(self):
    """Check auth logging for a SamLogon issued over schannel.

    Runs rpcclient with the command "schannel;samlogon ..." using the
    USERNAME and PASSWORD from the environment, waits for an
    Authentication message with authDescription "network" and status
    NT_STATUS_OK for this workstation, accepts either 5 or 6 messages
    once the netlogon messages have been filtered out, and then checks
    that the second to last message is a DCE/RPC Authorization with
    authType "schannel", transportProtection "SEAL" and a GUID
    sessionId.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_OK" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")

    # The second to last message should be the Authorization.
    # NOTE(review): the `msg` assignment was not legible in the reviewed
    # text; messages[-2] is restored from the comment above -- confirm.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))
# Signed logons get promoted to sealed; this test ensures that
# this behaviour is not removed accidentally.
def test_samlogon_schannel_sign(self):
    """Check that a signed schannel SamLogon is logged as sealed.

    Runs rpcclient with the command "schannelsign;samlogon ..." using
    the USERNAME and PASSWORD from the environment, waits for an
    Authentication message with authDescription "network" and status
    NT_STATUS_OK for this workstation, accepts either 5 or 6 messages
    once the netlogon messages have been filtered out, and then checks
    that the second to last message is a DCE/RPC Authorization with
    authType "schannel" and transportProtection "SEAL" (not "SIGN")
    and a GUID sessionId.
    """
    workstation = "AuthLogTests"

    def isLastExpectedMessage(msg):
        # Only Authentication events carry the fields checked below.
        if msg["type"] != "Authentication":
            return False
        event = msg["Authentication"]
        # NOTE(review): the literal "SamLogon" was not legible in the
        # reviewed text; restored from the test name -- confirm.
        return (event["serviceDescription"] == "SamLogon" and
                event["authDescription"] == "network" and
                event["status"] == "NT_STATUS_OK" and
                event["workstation"] == r"\\%s" % workstation)

    server = os.environ["SERVER"]
    user = os.environ["USERNAME"]
    password = os.environ["PASSWORD"]
    samlogon = "schannelsign;samlogon %s %s %s" % (
        user, password, workstation)

    # rpcclient's exit status is not checked; the test asserts on the
    # auth log messages only.
    call(["bin/rpcclient", "-c", samlogon, "-U%", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn reports the actual count when the check fails.
    self.assertIn(len(messages), (5, 6),
                  "Did not receive the expected number of messages")

    # The second to last message should be the Authorization.
    # NOTE(review): the `msg` assignment was not legible in the reviewed
    # text; messages[-2] is restored from the comment above -- confirm.
    msg = messages[-2]
    self.assertEqual("Authorization", msg["type"])
    authz = msg["Authorization"]
    self.assertEqual("DCE/RPC", authz["serviceDescription"])
    self.assertEqual("schannel", authz["authType"])
    self.assertEqual("SEAL", authz["transportProtection"])
    self.assertTrue(self.is_guid(authz["sessionId"]))