PEP8: fix E128: continuation line under-indented for visual indent
[metze/samba/wip.git] / python / samba / tests / auth_log.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
20 """
21 import samba.tests
22 from samba.dcerpc import srvsvc, dnsserver
23 import os
24 from samba import smb
25 from samba.samdb import SamDB
26 import samba.tests.auth_log_base
27 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
28 from samba import NTSTATUSError
29 from subprocess import call
30 from ldb import LdbError
31 import re
32
33
34 class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
35
36     def setUp(self):
37         super(AuthLogTests, self).setUp()
38         self.remoteAddress = os.environ["CLIENT_IP"]
39
40     def tearDown(self):
41         super(AuthLogTests, self).tearDown()
42
43     def _test_rpc_ncacn_np(self, authTypes, creds, service,
44                            binding, protection, checkFunction):
45         def isLastExpectedMessage(msg):
46             return (msg["type"] == "Authorization" and
47                     (msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
48                      msg["Authorization"]["serviceDescription"] == service) and
49                     msg["Authorization"]["authType"] == authTypes[0] and
50                     msg["Authorization"]["transportProtection"] == protection)
51
52         if binding:
53             binding = "[%s]" % binding
54
55         if service == "dnsserver":
56             x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
57                                     self.get_loadparm(),
58                                     creds)
59         elif service == "srvsvc":
60             x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
61                               self.get_loadparm(),
62                               creds)
63
64         # The connection is passed to ensure the server
65         # messaging context stays up until all the messages have been received.
66         messages = self.waitForMessages(isLastExpectedMessage, x)
67         checkFunction(messages, authTypes, service, binding, protection)
68
69     def _assert_ncacn_np_serviceDescription(self, binding, serviceDescription):
70         # Turn "[foo,bar]" into a list ("foo", "bar") to test
71         # lambda x: x removes anything that evaluates to False,
72         # including empty strings, so we handle "" as well
73         binding_list = filter(lambda x: x, re.compile('[\[,\]]').split(binding))
74
75         # Handle explicit smb2, smb1 or auto negotiation
76         if "smb2" in binding_list:
77             self.assertEquals(serviceDescription, "SMB2")
78         elif "smb1" in binding_list:
79             self.assertEquals(serviceDescription, "SMB")
80         else:
81             self.assertIn(serviceDescription, ["SMB", "SMB2"])
82
83     def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
84                                 binding, protection):
85
86         expected_messages = len(authTypes)
87         self.assertEquals(expected_messages,
88                           len(messages),
89                           "Did not receive the expected number of messages")
90
91         # Check the first message it should be an Authentication
92         msg = messages[0]
93         self.assertEquals("Authentication", msg["type"])
94         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
95         self._assert_ncacn_np_serviceDescription(binding,
96                                                  msg["Authentication"]["serviceDescription"])
97         self.assertEquals(authTypes[1],
98                           msg["Authentication"]["authDescription"])
99
100         # Check the second message it should be an Authorization
101         msg = messages[1]
102         self.assertEquals("Authorization", msg["type"])
103         self._assert_ncacn_np_serviceDescription(binding,
104                                                  msg["Authorization"]["serviceDescription"])
105         self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
106         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
107         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
108
109         # Check the third message it should be an Authentication
110         # if we are expecting 4 messages
111         if expected_messages == 4:
112             def checkServiceDescription(desc):
113                 return (desc == "DCE/RPC" or desc == service)
114
115             msg = messages[2]
116             self.assertEquals("Authentication", msg["type"])
117             self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
118             self.assertTrue(
119                 checkServiceDescription(
120                     msg["Authentication"]["serviceDescription"]))
121
122             self.assertEquals(authTypes[3],
123                               msg["Authentication"]["authDescription"])
124
125     def rpc_ncacn_np_krb5_check(
126             self,
127             messages,
128             authTypes,
129             service,
130             binding,
131             protection):
132
133         expected_messages = len(authTypes)
134         self.assertEquals(expected_messages,
135                           len(messages),
136                           "Did not receive the expected number of messages")
137
138         # Check the first message it should be an Authentication
139         # This is almost certainly Authentication over UDP, and is probably
140         # returning message too big,
141         msg = messages[0]
142         self.assertEquals("Authentication", msg["type"])
143         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
144         self.assertEquals("Kerberos KDC",
145                           msg["Authentication"]["serviceDescription"])
146         self.assertEquals(authTypes[1],
147                           msg["Authentication"]["authDescription"])
148
149         # Check the second message it should be an Authentication
150         # This this the TCP Authentication in response to the message too big
151         # response to the UDP Authentication
152         msg = messages[1]
153         self.assertEquals("Authentication", msg["type"])
154         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
155         self.assertEquals("Kerberos KDC",
156                           msg["Authentication"]["serviceDescription"])
157         self.assertEquals(authTypes[2],
158                           msg["Authentication"]["authDescription"])
159
160         # Check the third message it should be an Authorization
161         msg = messages[2]
162         self.assertEquals("Authorization", msg["type"])
163         self._assert_ncacn_np_serviceDescription(binding,
164                                                  msg["Authorization"]["serviceDescription"])
165         self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
166         self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
167         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
168
169     def test_rpc_ncacn_np_ntlm_dns_sign(self):
170         creds = self.insta_creds(template=self.get_credentials(),
171                                  kerberos_state=DONT_USE_KERBEROS)
172         self._test_rpc_ncacn_np(["NTLMSSP",
173                                  "NTLMSSP",
174                                  "NTLMSSP",
175                                  "NTLMSSP"],
176                                 creds, "dnsserver", "sign", "SIGN",
177                                 self.rpc_ncacn_np_ntlm_check)
178
179     def test_rpc_ncacn_np_ntlm_srv_sign(self):
180         creds = self.insta_creds(template=self.get_credentials(),
181                                  kerberos_state=DONT_USE_KERBEROS)
182         self._test_rpc_ncacn_np(["NTLMSSP",
183                                  "NTLMSSP",
184                                  "NTLMSSP",
185                                  "NTLMSSP"],
186                                 creds, "srvsvc", "sign", "SIGN",
187                                 self.rpc_ncacn_np_ntlm_check)
188
189     def test_rpc_ncacn_np_ntlm_dns(self):
190         creds = self.insta_creds(template=self.get_credentials(),
191                                  kerberos_state=DONT_USE_KERBEROS)
192         self._test_rpc_ncacn_np(["ncacn_np",
193                                  "NTLMSSP",
194                                  "NTLMSSP"],
195                                 creds, "dnsserver", "", "SMB",
196                                 self.rpc_ncacn_np_ntlm_check)
197
198     def test_rpc_ncacn_np_ntlm_srv(self):
199         creds = self.insta_creds(template=self.get_credentials(),
200                                  kerberos_state=DONT_USE_KERBEROS)
201         self._test_rpc_ncacn_np(["ncacn_np",
202                                  "NTLMSSP",
203                                  "NTLMSSP"],
204                                 creds, "srvsvc", "", "SMB",
205                                 self.rpc_ncacn_np_ntlm_check)
206
207     def test_rpc_ncacn_np_krb_dns_sign(self):
208         creds = self.insta_creds(template=self.get_credentials(),
209                                  kerberos_state=MUST_USE_KERBEROS)
210         self._test_rpc_ncacn_np(["krb5",
211                                  "ENC-TS Pre-authentication",
212                                  "ENC-TS Pre-authentication",
213                                  "krb5"],
214                                 creds, "dnsserver", "sign", "SIGN",
215                                 self.rpc_ncacn_np_krb5_check)
216
217     def test_rpc_ncacn_np_krb_srv_sign(self):
218         creds = self.insta_creds(template=self.get_credentials(),
219                                  kerberos_state=MUST_USE_KERBEROS)
220         self._test_rpc_ncacn_np(["krb5",
221                                  "ENC-TS Pre-authentication",
222                                  "ENC-TS Pre-authentication",
223                                  "krb5"],
224                                 creds, "srvsvc", "sign", "SIGN",
225                                 self.rpc_ncacn_np_krb5_check)
226
227     def test_rpc_ncacn_np_krb_dns(self):
228         creds = self.insta_creds(template=self.get_credentials(),
229                                  kerberos_state=MUST_USE_KERBEROS)
230         self._test_rpc_ncacn_np(["ncacn_np",
231                                  "ENC-TS Pre-authentication",
232                                  "ENC-TS Pre-authentication",
233                                  "krb5"],
234                                 creds, "dnsserver", "", "SMB",
235                                 self.rpc_ncacn_np_krb5_check)
236
237     def test_rpc_ncacn_np_krb_dns_smb2(self):
238         creds = self.insta_creds(template=self.get_credentials(),
239                                  kerberos_state=MUST_USE_KERBEROS)
240         self._test_rpc_ncacn_np(["ncacn_np",
241                                  "ENC-TS Pre-authentication",
242                                  "ENC-TS Pre-authentication",
243                                  "krb5"],
244                                 creds, "dnsserver", "smb2", "SMB",
245                                 self.rpc_ncacn_np_krb5_check)
246
247     def test_rpc_ncacn_np_krb_srv(self):
248         creds = self.insta_creds(template=self.get_credentials(),
249                                  kerberos_state=MUST_USE_KERBEROS)
250         self._test_rpc_ncacn_np(["ncacn_np",
251                                  "ENC-TS Pre-authentication",
252                                  "ENC-TS Pre-authentication",
253                                  "krb5"],
254                                 creds, "srvsvc", "", "SMB",
255                                 self.rpc_ncacn_np_krb5_check)
256
257     def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
258                                binding, protection, checkFunction):
259         def isLastExpectedMessage(msg):
260             return (msg["type"] == "Authorization" and
261                     msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
262                     msg["Authorization"]["authType"] == authTypes[0] and
263                     msg["Authorization"]["transportProtection"] == protection)
264
265         if binding:
266             binding = "[%s]" % binding
267
268         if service == "dnsserver":
269             conn = dnsserver.dnsserver(
270                 "ncacn_ip_tcp:%s%s" % (self.server, binding),
271                 self.get_loadparm(),
272                 creds)
273         elif service == "srvsvc":
274             conn = srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
275                                  self.get_loadparm(),
276                                  creds)
277
278         messages = self.waitForMessages(isLastExpectedMessage, conn)
279         checkFunction(messages, authTypes, service, binding, protection)
280
281     def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
282                                     binding, protection):
283
284         expected_messages = len(authTypes)
285         self.assertEquals(expected_messages,
286                           len(messages),
287                           "Did not receive the expected number of messages")
288
289         # Check the first message it should be an Authorization
290         msg = messages[0]
291         self.assertEquals("Authorization", msg["type"])
292         self.assertEquals("DCE/RPC",
293                           msg["Authorization"]["serviceDescription"])
294         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
295         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
296         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
297
298         # Check the second message it should be an Authentication
299         msg = messages[1]
300         self.assertEquals("Authentication", msg["type"])
301         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
302         self.assertEquals("DCE/RPC",
303                           msg["Authentication"]["serviceDescription"])
304         self.assertEquals(authTypes[2],
305                           msg["Authentication"]["authDescription"])
306
307     def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
308                                     binding, protection):
309
310         expected_messages = len(authTypes)
311         self.assertEquals(expected_messages,
312                           len(messages),
313                           "Did not receive the expected number of messages")
314
315         # Check the first message it should be an Authorization
316         msg = messages[0]
317         self.assertEquals("Authorization", msg["type"])
318         self.assertEquals("DCE/RPC",
319                           msg["Authorization"]["serviceDescription"])
320         self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
321         self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
322         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
323
324         # Check the second message it should be an Authentication
325         msg = messages[1]
326         self.assertEquals("Authentication", msg["type"])
327         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
328         self.assertEquals("Kerberos KDC",
329                           msg["Authentication"]["serviceDescription"])
330         self.assertEquals(authTypes[2],
331                           msg["Authentication"]["authDescription"])
332
333         # Check the third message it should be an Authentication
334         msg = messages[2]
335         self.assertEquals("Authentication", msg["type"])
336         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
337         self.assertEquals("Kerberos KDC",
338                           msg["Authentication"]["serviceDescription"])
339         self.assertEquals(authTypes[2],
340                           msg["Authentication"]["authDescription"])
341
342     def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
343         creds = self.insta_creds(template=self.get_credentials(),
344                                  kerberos_state=DONT_USE_KERBEROS)
345         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
346                                      "ncacn_ip_tcp",
347                                      "NTLMSSP"],
348                                     creds, "dnsserver", "sign", "SIGN",
349                                     self.rpc_ncacn_ip_tcp_ntlm_check)
350
351     def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
352         creds = self.insta_creds(template=self.get_credentials(),
353                                  kerberos_state=MUST_USE_KERBEROS)
354         self._test_rpc_ncacn_ip_tcp(["krb5",
355                                      "ncacn_ip_tcp",
356                                      "ENC-TS Pre-authentication",
357                                      "ENC-TS Pre-authentication"],
358                                     creds, "dnsserver", "sign", "SIGN",
359                                     self.rpc_ncacn_ip_tcp_krb5_check)
360
361     def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
362         creds = self.insta_creds(template=self.get_credentials(),
363                                  kerberos_state=DONT_USE_KERBEROS)
364         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
365                                      "ncacn_ip_tcp",
366                                      "NTLMSSP"],
367                                     creds, "dnsserver", "", "SIGN",
368                                     self.rpc_ncacn_ip_tcp_ntlm_check)
369
370     def test_rpc_ncacn_ip_tcp_krb5_dns(self):
371         creds = self.insta_creds(template=self.get_credentials(),
372                                  kerberos_state=MUST_USE_KERBEROS)
373         self._test_rpc_ncacn_ip_tcp(["krb5",
374                                      "ncacn_ip_tcp",
375                                      "ENC-TS Pre-authentication",
376                                      "ENC-TS Pre-authentication"],
377                                     creds, "dnsserver", "", "SIGN",
378                                     self.rpc_ncacn_ip_tcp_krb5_check)
379
380     def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
381         creds = self.insta_creds(template=self.get_credentials(),
382                                  kerberos_state=DONT_USE_KERBEROS)
383         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
384                                      "ncacn_ip_tcp",
385                                      "NTLMSSP"],
386                                     creds, "dnsserver", "connect", "NONE",
387                                     self.rpc_ncacn_ip_tcp_ntlm_check)
388
389     def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
390         creds = self.insta_creds(template=self.get_credentials(),
391                                  kerberos_state=MUST_USE_KERBEROS)
392         self._test_rpc_ncacn_ip_tcp(["krb5",
393                                      "ncacn_ip_tcp",
394                                      "ENC-TS Pre-authentication",
395                                      "ENC-TS Pre-authentication"],
396                                     creds, "dnsserver", "connect", "NONE",
397                                     self.rpc_ncacn_ip_tcp_krb5_check)
398
399     def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
400         creds = self.insta_creds(template=self.get_credentials(),
401                                  kerberos_state=DONT_USE_KERBEROS)
402         self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
403                                      "ncacn_ip_tcp",
404                                      "NTLMSSP"],
405                                     creds, "dnsserver", "seal", "SEAL",
406                                     self.rpc_ncacn_ip_tcp_ntlm_check)
407
408     def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
409         creds = self.insta_creds(template=self.get_credentials(),
410                                  kerberos_state=MUST_USE_KERBEROS)
411         self._test_rpc_ncacn_ip_tcp(["krb5",
412                                      "ncacn_ip_tcp",
413                                      "ENC-TS Pre-authentication",
414                                      "ENC-TS Pre-authentication"],
415                                     creds, "dnsserver", "seal", "SEAL",
416                                     self.rpc_ncacn_ip_tcp_krb5_check)
417
418     def test_ldap(self):
419
420         def isLastExpectedMessage(msg):
421             return (msg["type"] == "Authorization" and
422                     msg["Authorization"]["serviceDescription"] == "LDAP" and
423                     msg["Authorization"]["transportProtection"] == "SIGN" and
424                     msg["Authorization"]["authType"] == "krb5")
425
426         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
427                            lp=self.get_loadparm(),
428                            credentials=self.get_credentials())
429
430         messages = self.waitForMessages(isLastExpectedMessage)
431         self.assertEquals(3,
432                           len(messages),
433                           "Did not receive the expected number of messages")
434
435         # Check the first message it should be an Authentication
436         msg = messages[0]
437         self.assertEquals("Authentication", msg["type"])
438         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
439         self.assertEquals("Kerberos KDC",
440                           msg["Authentication"]["serviceDescription"])
441         self.assertEquals("ENC-TS Pre-authentication",
442                           msg["Authentication"]["authDescription"])
443         self.assertTrue(msg["Authentication"]["duration"] > 0)
444
445         # Check the second message it should be an Authentication
446         msg = messages[1]
447         self.assertEquals("Authentication", msg["type"])
448         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
449         self.assertEquals("Kerberos KDC",
450                           msg["Authentication"]["serviceDescription"])
451         self.assertEquals("ENC-TS Pre-authentication",
452                           msg["Authentication"]["authDescription"])
453         self.assertTrue(msg["Authentication"]["duration"] > 0)
454
455     def test_ldap_ntlm(self):
456
457         def isLastExpectedMessage(msg):
458             return (msg["type"] == "Authorization" and
459                     msg["Authorization"]["serviceDescription"] == "LDAP" and
460                     msg["Authorization"]["transportProtection"] == "SEAL" and
461                     msg["Authorization"]["authType"] == "NTLMSSP")
462
463         self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
464                            lp=self.get_loadparm(),
465                            credentials=self.get_credentials())
466
467         messages = self.waitForMessages(isLastExpectedMessage)
468         self.assertEquals(2,
469                           len(messages),
470                           "Did not receive the expected number of messages")
471         # Check the first message it should be an Authentication
472         msg = messages[0]
473         self.assertEquals("Authentication", msg["type"])
474         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
475         self.assertEquals("LDAP",
476                           msg["Authentication"]["serviceDescription"])
477         self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
478         self.assertTrue(msg["Authentication"]["duration"] > 0)
479
480     def test_ldap_simple_bind(self):
481         def isLastExpectedMessage(msg):
482             return (msg["type"] == "Authorization" and
483                     msg["Authorization"]["serviceDescription"] == "LDAP" and
484                     msg["Authorization"]["transportProtection"] == "TLS" and
485                     msg["Authorization"]["authType"] == "simple bind")
486
487         creds = self.insta_creds(template=self.get_credentials())
488         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
489                                       creds.get_username()))
490
491         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
492                            lp=self.get_loadparm(),
493                            credentials=creds)
494
495         messages = self.waitForMessages(isLastExpectedMessage)
496         self.assertEquals(2,
497                           len(messages),
498                           "Did not receive the expected number of messages")
499
500         # Check the first message it should be an Authentication
501         msg = messages[0]
502         self.assertEquals("Authentication", msg["type"])
503         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
504         self.assertEquals("LDAP",
505                           msg["Authentication"]["serviceDescription"])
506         self.assertEquals("simple bind",
507                           msg["Authentication"]["authDescription"])
508
509     def test_ldap_simple_bind_bad_password(self):
510         def isLastExpectedMessage(msg):
511             return (msg["type"] == "Authentication" and
512                     msg["Authentication"]["serviceDescription"] == "LDAP" and
513                     (msg["Authentication"]["status"] ==
514                         "NT_STATUS_WRONG_PASSWORD") and
515                     msg["Authentication"]["authDescription"] == "simple bind")
516
517         creds = self.insta_creds(template=self.get_credentials())
518         creds.set_password("badPassword")
519         creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
520                                       creds.get_username()))
521
522         thrown = False
523         try:
524             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
525                                lp=self.get_loadparm(),
526                                credentials=creds)
527         except LdbError:
528             thrown = True
529         self.assertEquals(thrown, True)
530
531         messages = self.waitForMessages(isLastExpectedMessage)
532         self.assertEquals(1,
533                           len(messages),
534                           "Did not receive the expected number of messages")
535
536     def test_ldap_simple_bind_bad_user(self):
537         def isLastExpectedMessage(msg):
538             return (msg["type"] == "Authentication" and
539                     msg["Authentication"]["serviceDescription"] == "LDAP" and
540                     (msg["Authentication"]["status"] ==
541                         "NT_STATUS_NO_SUCH_USER") and
542                     msg["Authentication"]["authDescription"] == "simple bind")
543
544         creds = self.insta_creds(template=self.get_credentials())
545         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))
546
547         thrown = False
548         try:
549             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
550                                lp=self.get_loadparm(),
551                                credentials=creds)
552         except LdbError:
553             thrown = True
554         self.assertEquals(thrown, True)
555
556         messages = self.waitForMessages(isLastExpectedMessage)
557         self.assertEquals(1,
558                           len(messages),
559                           "Did not receive the expected number of messages")
560
561     def test_ldap_simple_bind_unparseable_user(self):
562         def isLastExpectedMessage(msg):
563             return (msg["type"] == "Authentication" and
564                     msg["Authentication"]["serviceDescription"] == "LDAP" and
565                     (msg["Authentication"]["status"] ==
566                         "NT_STATUS_NO_SUCH_USER") and
567                     msg["Authentication"]["authDescription"] == "simple bind")
568
569         creds = self.insta_creds(template=self.get_credentials())
570         creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))
571
572         thrown = False
573         try:
574             self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
575                                lp=self.get_loadparm(),
576                                credentials=creds)
577         except LdbError:
578             thrown = True
579         self.assertEquals(thrown, True)
580
581         messages = self.waitForMessages(isLastExpectedMessage)
582         self.assertEquals(1,
583                           len(messages),
584                           "Did not receive the expected number of messages")
585
586     #
587     # Note: as this test does not expect any messages it will
588     #       time out in the call to self.waitForMessages.
589     #       This is expected, but it will slow this test.
590     def test_ldap_anonymous_access_bind_only(self):
591         # Should be no logging for anonymous bind
592         # so receiving any message indicates a failure.
593         def isLastExpectedMessage(msg):
594             return True
595
596         creds = self.insta_creds(template=self.get_credentials())
597         creds.set_anonymous()
598
599         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
600                            lp=self.get_loadparm(),
601                            credentials=creds)
602
603         messages = self.waitForMessages(isLastExpectedMessage)
604         self.assertEquals(0,
605                           len(messages),
606                           "Did not receive the expected number of messages")
607
608     def test_ldap_anonymous_access(self):
609         def isLastExpectedMessage(msg):
610             return (msg["type"] == "Authorization" and
611                     msg["Authorization"]["serviceDescription"]  == "LDAP" and
612                     msg["Authorization"]["transportProtection"] == "TLS" and
613                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
614                     msg["Authorization"]["authType"] == "no bind")
615
616         creds = self.insta_creds(template=self.get_credentials())
617         creds.set_anonymous()
618
619         self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
620                            lp=self.get_loadparm(),
621                            credentials=creds)
622
623         try:
624             self.samdb.search(base=self.samdb.domain_dn())
625             self.fail("Expected an LdbError exception")
626         except LdbError:
627             pass
628
629         messages = self.waitForMessages(isLastExpectedMessage)
630         self.assertEquals(1,
631                           len(messages),
632                           "Did not receive the expected number of messages")
633
634     def test_smb(self):
635         def isLastExpectedMessage(msg):
636             return (msg["type"] == "Authorization" and
637                     msg["Authorization"]["serviceDescription"] == "SMB" and
638                     msg["Authorization"]["authType"] == "krb5" and
639                     msg["Authorization"]["transportProtection"] == "SMB")
640
641         creds = self.insta_creds(template=self.get_credentials())
642         smb.SMB(self.server,
643                 "sysvol",
644                 lp=self.get_loadparm(),
645                 creds=creds)
646
647         messages = self.waitForMessages(isLastExpectedMessage)
648         self.assertEquals(3,
649                           len(messages),
650                           "Did not receive the expected number of messages")
651         # Check the first message it should be an Authentication
652         msg = messages[0]
653         self.assertEquals("Authentication", msg["type"])
654         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
655         self.assertEquals("Kerberos KDC",
656                           msg["Authentication"]["serviceDescription"])
657         self.assertEquals("ENC-TS Pre-authentication",
658                           msg["Authentication"]["authDescription"])
659
660         # Check the second message it should be an Authentication
661         msg = messages[1]
662         self.assertEquals("Authentication", msg["type"])
663         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
664         self.assertEquals("Kerberos KDC",
665                           msg["Authentication"]["serviceDescription"])
666         self.assertEquals("ENC-TS Pre-authentication",
667                           msg["Authentication"]["authDescription"])
668
669     def test_smb_bad_password(self):
670         def isLastExpectedMessage(msg):
671             return (msg["type"] == "Authentication" and
672                     (msg["Authentication"]["serviceDescription"] ==
673                         "Kerberos KDC") and
674                     (msg["Authentication"]["status"] ==
675                         "NT_STATUS_WRONG_PASSWORD") and
676                     (msg["Authentication"]["authDescription"] ==
677                         "ENC-TS Pre-authentication"))
678
679         creds = self.insta_creds(template=self.get_credentials())
680         creds.set_password("badPassword")
681
682         thrown = False
683         try:
684             smb.SMB(self.server,
685                     "sysvol",
686                     lp=self.get_loadparm(),
687                     creds=creds)
688         except NTSTATUSError:
689             thrown = True
690         self.assertEquals(thrown, True)
691
692         messages = self.waitForMessages(isLastExpectedMessage)
693         self.assertEquals(1,
694                           len(messages),
695                           "Did not receive the expected number of messages")
696
697     def test_smb_bad_user(self):
698         def isLastExpectedMessage(msg):
699             return (msg["type"] == "Authentication" and
700                     (msg["Authentication"]["serviceDescription"] ==
701                         "Kerberos KDC") and
702                     (msg["Authentication"]["status"] ==
703                         "NT_STATUS_NO_SUCH_USER") and
704                     (msg["Authentication"]["authDescription"] ==
705                         "ENC-TS Pre-authentication"))
706
707         creds = self.insta_creds(template=self.get_credentials())
708         creds.set_username("badUser")
709
710         thrown = False
711         try:
712             smb.SMB(self.server,
713                     "sysvol",
714                     lp=self.get_loadparm(),
715                     creds=creds)
716         except NTSTATUSError:
717             thrown = True
718         self.assertEquals(thrown, True)
719
720         messages = self.waitForMessages(isLastExpectedMessage)
721         self.assertEquals(1,
722                           len(messages),
723                           "Did not receive the expected number of messages")
724
725     def test_smb1_anonymous(self):
726         def isLastExpectedMessage(msg):
727             return (msg["type"] == "Authorization" and
728                     msg["Authorization"]["serviceDescription"] == "SMB" and
729                     msg["Authorization"]["authType"] == "NTLMSSP" and
730                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
731                     msg["Authorization"]["transportProtection"] == "SMB")
732
733         server   = os.environ["SERVER"]
734
735         path = "//%s/IPC$" % server
736         auth = "-N"
737         call(["bin/smbclient", path, auth, "-mNT1", "-c quit"])
738
739         messages = self.waitForMessages(isLastExpectedMessage)
740         self.assertEquals(3,
741                           len(messages),
742                           "Did not receive the expected number of messages")
743
744         # Check the first message it should be an Authentication
745         msg = messages[0]
746         self.assertEquals("Authentication", msg["type"])
747         self.assertEquals("NT_STATUS_NO_SUCH_USER",
748                           msg["Authentication"]["status"])
749         self.assertEquals("SMB",
750                           msg["Authentication"]["serviceDescription"])
751         self.assertEquals("NTLMSSP",
752                           msg["Authentication"]["authDescription"])
753         self.assertEquals("No-Password",
754                           msg["Authentication"]["passwordType"])
755
756         # Check the second message it should be an Authentication
757         msg = messages[1]
758         self.assertEquals("Authentication", msg["type"])
759         self.assertEquals("NT_STATUS_OK",
760                           msg["Authentication"]["status"])
761         self.assertEquals("SMB",
762                           msg["Authentication"]["serviceDescription"])
763         self.assertEquals("NTLMSSP",
764                           msg["Authentication"]["authDescription"])
765         self.assertEquals("No-Password",
766                           msg["Authentication"]["passwordType"])
767         self.assertEquals("ANONYMOUS LOGON",
768                           msg["Authentication"]["becameAccount"])
769
770     def test_smb2_anonymous(self):
771         def isLastExpectedMessage(msg):
772             return (msg["type"] == "Authorization" and
773                     msg["Authorization"]["serviceDescription"] == "SMB2" and
774                     msg["Authorization"]["authType"] == "NTLMSSP" and
775                     msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
776                     msg["Authorization"]["transportProtection"] == "SMB")
777
778         server   = os.environ["SERVER"]
779
780         path = "//%s/IPC$" % server
781         auth = "-N"
782         call(["bin/smbclient", path, auth, "-mSMB3", "-c quit"])
783
784         messages = self.waitForMessages(isLastExpectedMessage)
785         self.assertEquals(3,
786                           len(messages),
787                           "Did not receive the expected number of messages")
788
789         # Check the first message it should be an Authentication
790         msg = messages[0]
791         self.assertEquals("Authentication", msg["type"])
792         self.assertEquals("NT_STATUS_NO_SUCH_USER",
793                           msg["Authentication"]["status"])
794         self.assertEquals("SMB2",
795                           msg["Authentication"]["serviceDescription"])
796         self.assertEquals("NTLMSSP",
797                           msg["Authentication"]["authDescription"])
798         self.assertEquals("No-Password",
799                           msg["Authentication"]["passwordType"])
800
801         # Check the second message it should be an Authentication
802         msg = messages[1]
803         self.assertEquals("Authentication", msg["type"])
804         self.assertEquals("NT_STATUS_OK",
805                           msg["Authentication"]["status"])
806         self.assertEquals("SMB2",
807                           msg["Authentication"]["serviceDescription"])
808         self.assertEquals("NTLMSSP",
809                           msg["Authentication"]["authDescription"])
810         self.assertEquals("No-Password",
811                           msg["Authentication"]["passwordType"])
812         self.assertEquals("ANONYMOUS LOGON",
813                           msg["Authentication"]["becameAccount"])
814
815     def test_smb_no_krb_spnego(self):
816         def isLastExpectedMessage(msg):
817             return (msg["type"] == "Authorization" and
818                     msg["Authorization"]["serviceDescription"] == "SMB" and
819                     msg["Authorization"]["authType"] == "NTLMSSP" and
820                     msg["Authorization"]["transportProtection"] == "SMB")
821
822         creds = self.insta_creds(template=self.get_credentials(),
823                                  kerberos_state=DONT_USE_KERBEROS)
824         smb.SMB(self.server,
825                 "sysvol",
826                 lp=self.get_loadparm(),
827                 creds=creds)
828
829         messages = self.waitForMessages(isLastExpectedMessage)
830         self.assertEquals(2,
831                           len(messages),
832                           "Did not receive the expected number of messages")
833         # Check the first message it should be an Authentication
834         msg = messages[0]
835         self.assertEquals("Authentication", msg["type"])
836         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
837         self.assertEquals("SMB",
838                           msg["Authentication"]["serviceDescription"])
839         self.assertEquals("NTLMSSP",
840                           msg["Authentication"]["authDescription"])
841         self.assertEquals("NTLMv2",
842                           msg["Authentication"]["passwordType"])
843
844     def test_smb_no_krb_spnego_bad_password(self):
845         def isLastExpectedMessage(msg):
846             return (msg["type"] == "Authentication" and
847                     msg["Authentication"]["serviceDescription"] == "SMB" and
848                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
849                     msg["Authentication"]["passwordType"] == "NTLMv2" and
850                     (msg["Authentication"]["status"] ==
851                         "NT_STATUS_WRONG_PASSWORD"))
852
853         creds = self.insta_creds(template=self.get_credentials(),
854                                  kerberos_state=DONT_USE_KERBEROS)
855         creds.set_password("badPassword")
856
857         thrown = False
858         try:
859             smb.SMB(self.server,
860                     "sysvol",
861                     lp=self.get_loadparm(),
862                     creds=creds)
863         except NTSTATUSError:
864             thrown = True
865         self.assertEquals(thrown, True)
866
867         messages = self.waitForMessages(isLastExpectedMessage)
868         self.assertEquals(1,
869                           len(messages),
870                           "Did not receive the expected number of messages")
871
872     def test_smb_no_krb_spnego_bad_user(self):
873         def isLastExpectedMessage(msg):
874             return (msg["type"] == "Authentication" and
875                     msg["Authentication"]["serviceDescription"] == "SMB" and
876                     msg["Authentication"]["authDescription"] == "NTLMSSP" and
877                     msg["Authentication"]["passwordType"] == "NTLMv2" and
878                     (msg["Authentication"]["status"] ==
879                         "NT_STATUS_NO_SUCH_USER"))
880
881         creds = self.insta_creds(template=self.get_credentials(),
882                                  kerberos_state=DONT_USE_KERBEROS)
883         creds.set_username("badUser")
884
885         thrown = False
886         try:
887             smb.SMB(self.server,
888                     "sysvol",
889                     lp=self.get_loadparm(),
890                     creds=creds)
891         except NTSTATUSError:
892             thrown = True
893         self.assertEquals(thrown, True)
894
895         messages = self.waitForMessages(isLastExpectedMessage)
896         self.assertEquals(1,
897                           len(messages),
898                           "Did not receive the expected number of messages")
899
900     def test_smb_no_krb_no_spnego_no_ntlmv2(self):
901         def isLastExpectedMessage(msg):
902             return (msg["type"] == "Authorization" and
903                     msg["Authorization"]["serviceDescription"] == "SMB" and
904                     msg["Authorization"]["authType"] == "bare-NTLM" and
905                     msg["Authorization"]["transportProtection"] == "SMB")
906
907         creds = self.insta_creds(template=self.get_credentials(),
908                                  kerberos_state=DONT_USE_KERBEROS)
909         smb.SMB(self.server,
910                 "sysvol",
911                 lp=self.get_loadparm(),
912                 creds=creds,
913                 ntlmv2_auth=False,
914                 use_spnego=False)
915
916         messages = self.waitForMessages(isLastExpectedMessage)
917         self.assertEquals(2,
918                           len(messages),
919                           "Did not receive the expected number of messages")
920         # Check the first message it should be an Authentication
921         msg = messages[0]
922         self.assertEquals("Authentication", msg["type"])
923         self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
924         self.assertEquals("SMB",
925                           msg["Authentication"]["serviceDescription"])
926         self.assertEquals("bare-NTLM",
927                           msg["Authentication"]["authDescription"])
928         self.assertEquals("NTLMv1",
929                           msg["Authentication"]["passwordType"])
930
931     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
932         def isLastExpectedMessage(msg):
933             return (msg["type"] == "Authentication" and
934                     msg["Authentication"]["serviceDescription"] == "SMB" and
935                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
936                     msg["Authentication"]["passwordType"] == "NTLMv1" and
937                     (msg["Authentication"]["status"] ==
938                         "NT_STATUS_WRONG_PASSWORD"))
939
940         creds = self.insta_creds(template=self.get_credentials(),
941                                  kerberos_state=DONT_USE_KERBEROS)
942         creds.set_password("badPassword")
943
944         thrown = False
945         try:
946             smb.SMB(self.server,
947                     "sysvol",
948                     lp=self.get_loadparm(),
949                     creds=creds,
950                     ntlmv2_auth=False,
951                     use_spnego=False)
952         except NTSTATUSError:
953             thrown = True
954         self.assertEquals(thrown, True)
955
956         messages = self.waitForMessages(isLastExpectedMessage)
957         self.assertEquals(1,
958                           len(messages),
959                           "Did not receive the expected number of messages")
960
961     def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
962         def isLastExpectedMessage(msg):
963             return (msg["type"] == "Authentication" and
964                     msg["Authentication"]["serviceDescription"] == "SMB" and
965                     msg["Authentication"]["authDescription"] == "bare-NTLM" and
966                     msg["Authentication"]["passwordType"] == "NTLMv1" and
967                     (msg["Authentication"]["status"] ==
968                         "NT_STATUS_NO_SUCH_USER"))
969
970         creds = self.insta_creds(template=self.get_credentials(),
971                                  kerberos_state=DONT_USE_KERBEROS)
972         creds.set_username("badUser")
973
974         thrown = False
975         try:
976             smb.SMB(self.server,
977                     "sysvol",
978                     lp=self.get_loadparm(),
979                     creds=creds,
980                     ntlmv2_auth=False,
981                     use_spnego=False)
982         except NTSTATUSError:
983             thrown = True
984         self.assertEquals(thrown, True)
985
986         messages = self.waitForMessages(isLastExpectedMessage)
987         self.assertEquals(1,
988                           len(messages),
989                           "Did not receive the expected number of messages")
990
991     def test_samlogon_interactive(self):
992
993         workstation = "AuthLogTests"
994
995         def isLastExpectedMessage(msg):
996             return (msg["type"] == "Authentication" and
997                     (msg["Authentication"]["serviceDescription"] ==
998                         "SamLogon") and
999                     (msg["Authentication"]["authDescription"] ==
1000                         "interactive") and
1001                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1002                     (msg["Authentication"]["workstation"] ==
1003                         r"\\%s" % workstation))
1004
1005         server   = os.environ["SERVER"]
1006         user     = os.environ["USERNAME"]
1007         password = os.environ["PASSWORD"]
1008         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1009
1010         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1011
1012         messages = self.waitForMessages(isLastExpectedMessage)
1013         messages = self.remove_netlogon_messages(messages)
1014         received = len(messages)
1015         self.assertIs(True,
1016                       (received == 5 or received == 6),
1017                       "Did not receive the expected number of messages")
1018
1019     def test_samlogon_interactive_bad_password(self):
1020
1021         workstation = "AuthLogTests"
1022
1023         def isLastExpectedMessage(msg):
1024             return (msg["type"] == "Authentication" and
1025                     (msg["Authentication"]["serviceDescription"] ==
1026                         "SamLogon") and
1027                     (msg["Authentication"]["authDescription"] ==
1028                         "interactive") and
1029                     (msg["Authentication"]["status"] ==
1030                         "NT_STATUS_WRONG_PASSWORD") and
1031                     (msg["Authentication"]["workstation"] ==
1032                         r"\\%s" % workstation))
1033
1034         server   = os.environ["SERVER"]
1035         user     = os.environ["USERNAME"]
1036         password = "badPassword"
1037         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1038
1039         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1040
1041         messages = self.waitForMessages(isLastExpectedMessage)
1042         messages = self.remove_netlogon_messages(messages)
1043         received = len(messages)
1044         self.assertIs(True,
1045                       (received == 5 or received == 6),
1046                       "Did not receive the expected number of messages")
1047
1048     def test_samlogon_interactive_bad_user(self):
1049
1050         workstation = "AuthLogTests"
1051
1052         def isLastExpectedMessage(msg):
1053             return (msg["type"] == "Authentication" and
1054                     (msg["Authentication"]["serviceDescription"] ==
1055                         "SamLogon") and
1056                     (msg["Authentication"]["authDescription"] ==
1057                         "interactive") and
1058                     (msg["Authentication"]["status"] ==
1059                         "NT_STATUS_NO_SUCH_USER") and
1060                     (msg["Authentication"]["workstation"] ==
1061                         r"\\%s" % workstation))
1062
1063         server   = os.environ["SERVER"]
1064         user     = "badUser"
1065         password = os.environ["PASSWORD"]
1066         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 1)
1067
1068         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1069
1070         messages = self.waitForMessages(isLastExpectedMessage)
1071         messages = self.remove_netlogon_messages(messages)
1072         received = len(messages)
1073         self.assertIs(True,
1074                       (received == 5 or received == 6),
1075                       "Did not receive the expected number of messages")
1076
1077     def test_samlogon_network(self):
1078
1079         workstation = "AuthLogTests"
1080
1081         def isLastExpectedMessage(msg):
1082             return (msg["type"] == "Authentication" and
1083                     (msg["Authentication"]["serviceDescription"] ==
1084                         "SamLogon") and
1085                     msg["Authentication"]["authDescription"] == "network" and
1086                     msg["Authentication"]["status"] == "NT_STATUS_OK" and
1087                     (msg["Authentication"]["workstation"] ==
1088                         r"\\%s" % workstation))
1089
1090         server   = os.environ["SERVER"]
1091         user     = os.environ["USERNAME"]
1092         password = os.environ["PASSWORD"]
1093         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1094
1095         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1096
1097         messages = self.waitForMessages(isLastExpectedMessage)
1098         messages = self.remove_netlogon_messages(messages)
1099         received = len(messages)
1100         self.assertIs(True,
1101                       (received == 5 or received == 6),
1102                       "Did not receive the expected number of messages")
1103
1104     def test_samlogon_network_bad_password(self):
1105
1106         workstation = "AuthLogTests"
1107
1108         def isLastExpectedMessage(msg):
1109             return (msg["type"] == "Authentication" and
1110                     (msg["Authentication"]["serviceDescription"] ==
1111                         "SamLogon") and
1112                     msg["Authentication"]["authDescription"] == "network" and
1113                     (msg["Authentication"]["status"] ==
1114                         "NT_STATUS_WRONG_PASSWORD") and
1115                     (msg["Authentication"]["workstation"] ==
1116                         r"\\%s" % workstation))
1117
1118         server   = os.environ["SERVER"]
1119         user     = os.environ["USERNAME"]
1120         password = "badPassword"
1121         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1122
1123         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1124
1125         messages = self.waitForMessages(isLastExpectedMessage)
1126         messages = self.remove_netlogon_messages(messages)
1127         received = len(messages)
1128         self.assertIs(True,
1129                       (received == 5 or received == 6),
1130                       "Did not receive the expected number of messages")
1131
1132     def test_samlogon_network_bad_user(self):
1133
1134         workstation = "AuthLogTests"
1135
1136         def isLastExpectedMessage(msg):
1137             return ((msg["type"] == "Authentication") and
1138                     (msg["Authentication"]["serviceDescription"] ==
1139                         "SamLogon") and
1140                     (msg["Authentication"]["authDescription"] == "network") and
1141                     (msg["Authentication"]["status"] ==
1142                         "NT_STATUS_NO_SUCH_USER") and
1143                     (msg["Authentication"]["workstation"] ==
1144                         r"\\%s" % workstation))
1145
1146         server   = os.environ["SERVER"]
1147         user     = "badUser"
1148         password = os.environ["PASSWORD"]
1149         samlogon = "samlogon %s %s %s %d" % (user, password, workstation, 2)
1150
1151         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1152
1153         messages = self.waitForMessages(isLastExpectedMessage)
1154         messages = self.remove_netlogon_messages(messages)
1155         received = len(messages)
1156         self.assertIs(True,
1157                       (received == 5 or received == 6),
1158                       "Did not receive the expected number of messages")
1159
1160     def test_samlogon_network_mschap(self):
1161
1162         workstation = "AuthLogTests"
1163
1164         def isLastExpectedMessage(msg):
1165             return ((msg["type"] == "Authentication") and
1166                     (msg["Authentication"]["serviceDescription"] ==
1167                         "SamLogon") and
1168                     (msg["Authentication"]["authDescription"] == "network") and
1169                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1170                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1171                     (msg["Authentication"]["workstation"] ==
1172                         r"\\%s" % workstation))
1173
1174         server   = os.environ["SERVER"]
1175         user     = os.environ["USERNAME"]
1176         password = os.environ["PASSWORD"]
1177         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1178             user, password, workstation, 2)
1179
1180         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1181
1182         messages = self.waitForMessages(isLastExpectedMessage)
1183         messages = self.remove_netlogon_messages(messages)
1184         received = len(messages)
1185         self.assertIs(True,
1186                       (received == 5 or received == 6),
1187                       "Did not receive the expected number of messages")
1188
1189     def test_samlogon_network_mschap_bad_password(self):
1190
1191         workstation = "AuthLogTests"
1192
1193         def isLastExpectedMessage(msg):
1194             return ((msg["type"] == "Authentication") and
1195                     (msg["Authentication"]["serviceDescription"] ==
1196                         "SamLogon") and
1197                     (msg["Authentication"]["authDescription"] == "network") and
1198                     (msg["Authentication"]["status"] ==
1199                         "NT_STATUS_WRONG_PASSWORD") and
1200                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1201                     (msg["Authentication"]["workstation"] ==
1202                         r"\\%s" % workstation))
1203
1204         server   = os.environ["SERVER"]
1205         user     = os.environ["USERNAME"]
1206         password = "badPassword"
1207         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1208             user, password, workstation, 2)
1209
1210         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1211
1212         messages = self.waitForMessages(isLastExpectedMessage)
1213         messages = self.remove_netlogon_messages(messages)
1214         received = len(messages)
1215         self.assertIs(True,
1216                       (received == 5 or received == 6),
1217                       "Did not receive the expected number of messages")
1218
1219     def test_samlogon_network_mschap_bad_user(self):
1220
1221         workstation = "AuthLogTests"
1222
1223         def isLastExpectedMessage(msg):
1224             return ((msg["type"] == "Authentication") and
1225                     (msg["Authentication"]["serviceDescription"] ==
1226                         "SamLogon") and
1227                     (msg["Authentication"]["authDescription"] == "network") and
1228                     (msg["Authentication"]["status"] ==
1229                         "NT_STATUS_NO_SUCH_USER") and
1230                     (msg["Authentication"]["passwordType"] == "MSCHAPv2") and
1231                     (msg["Authentication"]["workstation"] ==
1232                         r"\\%s" % workstation))
1233
1234         server   = os.environ["SERVER"]
1235         user     = "badUser"
1236         password = os.environ["PASSWORD"]
1237         samlogon = "samlogon %s %s %s %d 0x00010000" % (
1238             user, password, workstation, 2)
1239
1240         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1241
1242         messages = self.waitForMessages(isLastExpectedMessage)
1243         messages = self.remove_netlogon_messages(messages)
1244         received = len(messages)
1245         self.assertIs(True,
1246                       (received == 5 or received == 6),
1247                       "Did not receive the expected number of messages")
1248
1249     def test_samlogon_schannel_seal(self):
1250
1251         workstation = "AuthLogTests"
1252
1253         def isLastExpectedMessage(msg):
1254             return ((msg["type"] == "Authentication") and
1255                     (msg["Authentication"]["serviceDescription"] ==
1256                         "SamLogon") and
1257                     (msg["Authentication"]["authDescription"] == "network") and
1258                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1259                     (msg["Authentication"]["workstation"] ==
1260                         r"\\%s" % workstation))
1261
1262         server   = os.environ["SERVER"]
1263         user     = os.environ["USERNAME"]
1264         password = os.environ["PASSWORD"]
1265         samlogon = "schannel;samlogon %s %s %s" % (user, password, workstation)
1266
1267         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1268
1269         messages = self.waitForMessages(isLastExpectedMessage)
1270         messages = self.remove_netlogon_messages(messages)
1271         received = len(messages)
1272         self.assertIs(True,
1273                       (received == 5 or received == 6),
1274                       "Did not receive the expected number of messages")
1275
1276         # Check the second to last message it should be an Authorization
1277         msg = messages[-2]
1278         self.assertEquals("Authorization", msg["type"])
1279         self.assertEquals("DCE/RPC",
1280                           msg["Authorization"]["serviceDescription"])
1281         self.assertEquals("schannel",  msg["Authorization"]["authType"])
1282         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1283         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
1284
1285     # Signed logons get promoted to sealed, this test ensures that
1286     # this behaviour is not removed accidentally
1287     def test_samlogon_schannel_sign(self):
1288
1289         workstation = "AuthLogTests"
1290
1291         def isLastExpectedMessage(msg):
1292             return ((msg["type"] == "Authentication") and
1293                     (msg["Authentication"]["serviceDescription"] ==
1294                         "SamLogon") and
1295                     (msg["Authentication"]["authDescription"] == "network") and
1296                     (msg["Authentication"]["status"] == "NT_STATUS_OK") and
1297                     (msg["Authentication"]["workstation"] ==
1298                         r"\\%s" % workstation))
1299
1300         server   = os.environ["SERVER"]
1301         user     = os.environ["USERNAME"]
1302         password = os.environ["PASSWORD"]
1303         samlogon = "schannelsign;samlogon %s %s %s" % (
1304             user, password, workstation)
1305
1306         call(["bin/rpcclient", "-c", samlogon, "-U%", server])
1307
1308         messages = self.waitForMessages(isLastExpectedMessage)
1309         messages = self.remove_netlogon_messages(messages)
1310         received = len(messages)
1311         self.assertIs(True,
1312                       (received == 5 or received == 6),
1313                       "Did not receive the expected number of messages")
1314
1315         # Check the second to last message it should be an Authorization
1316         msg = messages[-2]
1317         self.assertEquals("Authorization", msg["type"])
1318         self.assertEquals("DCE/RPC",
1319                           msg["Authorization"]["serviceDescription"])
1320         self.assertEquals("schannel",  msg["Authorization"]["authType"])
1321         self.assertEquals("SEAL", msg["Authorization"]["transportProtection"])
1322         self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))