CVE-2022-37966 python:/tests/krb5: call sys.path.insert(0, "bin/python") before any...
[samba.git] / python / samba / tests / krb5 / lockout_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
25
26 from concurrent import futures
27 from enum import Enum
28 from functools import partial
29 from multiprocessing import Pipe
30 import time
31
32 from cryptography.hazmat.backends import default_backend
33 from cryptography.hazmat.primitives.ciphers.base import Cipher
34 from cryptography.hazmat.primitives.ciphers import algorithms
35
36 import ldb
37
38 from samba import (
39     NTSTATUSError,
40     dsdb,
41     generate_random_bytes,
42     generate_random_password,
43     ntstatus,
44     unix2nttime,
45     werror,
46 )
47 from samba.credentials import DONT_USE_KERBEROS, MUST_USE_KERBEROS
48 from samba.crypto import (
49     aead_aes_256_cbc_hmac_sha512_blob,
50     des_crypt_blob_16,
51     md4_hash_blob,
52     sha512_pbkdf2,
53 )
54 from samba.dcerpc import lsa, samr
55 from samba.samdb import SamDB
56
57 from samba.tests import connect_samdb, env_get_var_value, env_loadparm
58
59 from samba.tests.krb5.as_req_tests import AsReqBaseTest
60 from samba.tests.krb5 import kcrypto
61 from samba.tests.krb5.kdc_base_test import KDCBaseTest
62 from samba.tests.krb5.raw_testcase import KerberosCredentials
63 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
64 from samba.tests.krb5.rfc4120_constants import (
65     KDC_ERR_CLIENT_REVOKED,
66     KDC_ERR_PREAUTH_FAILED,
67     KRB_AS_REP,
68     KRB_ERROR,
69     NT_PRINCIPAL,
70     NT_SRV_INST,
71 )
72
73 global_asn1_print = False
74 global_hexdump = False
75
76
77 class ConnectionResult(Enum):
78     LOCKED_OUT = 1
79     WRONG_PASSWORD = 2
80     SUCCESS = 3
81
82
83 def connect_kdc(pipe,
84                 url,
85                 hostname,
86                 username,
87                 password,
88                 domain,
89                 realm,
90                 workstation,
91                 dn,
92                 expect_error=True):
93     AsReqBaseTest.setUpClass()
94     as_req_base = AsReqBaseTest()
95     as_req_base.setUp()
96
97     user_creds = KerberosCredentials()
98     user_creds.set_username(username)
99     user_creds.set_password(password)
100     user_creds.set_domain(domain)
101     user_creds.set_realm(realm)
102     user_creds.set_workstation(workstation)
103     user_creds.set_kerberos_state(DONT_USE_KERBEROS)
104
105     user_name = user_creds.get_username()
106     cname = as_req_base.PrincipalName_create(name_type=NT_PRINCIPAL,
107                                              names=user_name.split('/'))
108
109     krbtgt_creds = as_req_base.get_krbtgt_creds()
110     krbtgt_supported_etypes = krbtgt_creds.tgs_supported_enctypes
111     realm = krbtgt_creds.get_realm()
112
113     krbtgt_account = krbtgt_creds.get_username()
114     sname = as_req_base.PrincipalName_create(name_type=NT_SRV_INST,
115                                              names=[krbtgt_account, realm])
116
117     expected_salt = user_creds.get_salt()
118
119     till = as_req_base.get_KerberosTime(offset=36000)
120
121     kdc_options = krb5_asn1.KDCOptions('postdated')
122
123     preauth_key = as_req_base.PasswordKey_from_creds(user_creds,
124                                                      kcrypto.Enctype.AES256)
125
126     ts_enc_padata = as_req_base.get_enc_timestamp_pa_data_from_key(preauth_key)
127     padata = [ts_enc_padata]
128
129     krbtgt_decryption_key = (
130         as_req_base.TicketDecryptionKey_from_creds(krbtgt_creds))
131
132     etypes = as_req_base.get_default_enctypes()
133
134     if expect_error:
135         expected_error_modes = (KDC_ERR_CLIENT_REVOKED,
136                                 KDC_ERR_PREAUTH_FAILED)
137     else:
138         expected_error_modes = 0
139
140     # Remove the LDAP connection.
141     del type(as_req_base)._ldb
142
143     # Indicate that we're ready. This ensures we hit the right transaction
144     # lock.
145     pipe.send_bytes(b'0')
146
147     # Wait for the main process to take out a transaction lock.
148     if not pipe.poll(timeout=5):
149         raise AssertionError('main process failed to indicate readiness')
150
151     # Try making a Kerberos AS-REQ to the KDC. This should fail, either due to
152     # the user's account being locked out or due to using the wrong password.
153     as_rep, kdc_exchange_dict = as_req_base._test_as_exchange(
154         cname=cname,
155         realm=realm,
156         sname=sname,
157         till=till,
158         client_as_etypes=etypes,
159         expected_error_mode=expected_error_modes,
160         expected_crealm=realm,
161         expected_cname=cname,
162         expected_srealm=realm,
163         expected_sname=sname,
164         expected_salt=expected_salt,
165         etypes=etypes,
166         padata=padata,
167         kdc_options=kdc_options,
168         expected_supported_etypes=krbtgt_supported_etypes,
169         expected_account_name=user_name,
170         preauth_key=preauth_key,
171         ticket_decryption_key=krbtgt_decryption_key,
172         pac_request=True)
173     as_req_base.assertIsNotNone(as_rep)
174
175     msg_type = as_rep['msg-type']
176     if expect_error and msg_type != KRB_ERROR or (
177             not expect_error and msg_type != KRB_AS_REP):
178         raise AssertionError(f'wrong message type {msg_type}')
179
180     if not expect_error:
181         return ConnectionResult.SUCCESS
182
183     error_code = as_rep['error-code']
184     if error_code == KDC_ERR_CLIENT_REVOKED:
185         return ConnectionResult.LOCKED_OUT
186     elif error_code == KDC_ERR_PREAUTH_FAILED:
187         return ConnectionResult.WRONG_PASSWORD
188     else:
189         raise AssertionError(f'wrong error code {error_code}')
190
191
192 def connect_ntlm(pipe,
193                  url,
194                  hostname,
195                  username,
196                  password,
197                  domain,
198                  realm,
199                  workstation,
200                  dn):
201     user_creds = KerberosCredentials()
202     user_creds.set_username(username)
203     user_creds.set_password(password)
204     user_creds.set_domain(domain)
205     user_creds.set_workstation(workstation)
206     user_creds.set_kerberos_state(DONT_USE_KERBEROS)
207
208     # Indicate that we're ready. This ensures we hit the right transaction
209     # lock.
210     pipe.send_bytes(b'0')
211
212     # Wait for the main process to take out a transaction lock.
213     if not pipe.poll(timeout=5):
214         raise AssertionError('main process failed to indicate readiness')
215
216     try:
217         # Try connecting to SamDB. This should fail, either due to our
218         # account being locked out or due to using the wrong password.
219         SamDB(url=url,
220               credentials=user_creds,
221               lp=env_loadparm())
222     except ldb.LdbError as err:
223         num, estr = err.args
224
225         if num != ldb.ERR_INVALID_CREDENTIALS:
226             raise AssertionError(f'connection raised wrong error code '
227                                  f'({err})')
228
229         if f'data {werror.WERR_ACCOUNT_LOCKED_OUT:x},' in estr:
230             return ConnectionResult.LOCKED_OUT
231         elif f'data {werror.WERR_LOGON_FAILURE:x},' in estr:
232             return ConnectionResult.WRONG_PASSWORD
233         else:
234             raise AssertionError(f'connection raised wrong error code '
235                                  f'({estr})')
236     else:
237         return ConnectionResult.SUCCESS
238
239
240 def connect_samr(pipe,
241                  url,
242                  hostname,
243                  username,
244                  password,
245                  domain,
246                  realm,
247                  workstation,
248                  dn):
249     # Get the user's NT hash.
250     user_creds = KerberosCredentials()
251     user_creds.set_password(password)
252     nt_hash = user_creds.get_nt_hash()
253
254     # Generate a new UTF-16 password.
255     new_password = generate_random_password(32, 32)
256     new_password = new_password.encode('utf-16le')
257
258     # Generate the MD4 hash of the password.
259     new_password_md4 = md4_hash_blob(new_password)
260
261     # Prefix the password with padding so it is 512 bytes long.
262     new_password_len = len(new_password)
263     remaining_len = 512 - new_password_len
264     new_password = bytes(remaining_len) + new_password
265
266     # Append the 32-bit length of the password..
267     new_password += int.to_bytes(new_password_len,
268                                  length=4,
269                                  byteorder='little')
270
271     # Encrypt the password with RC4 and the existing NT hash.
272     encryptor = Cipher(algorithms.ARC4(nt_hash),
273                        None,
274                        default_backend()).encryptor()
275     new_password = encryptor.update(new_password)
276
277     # Create a key from the MD4 hash of the new password.
278     key = new_password_md4[:14]
279
280     # Encrypt the old NT hash with DES to obtain the verifier.
281     verifier = des_crypt_blob_16(nt_hash, key)
282
283     server = lsa.String()
284     server.string = hostname
285
286     account = lsa.String()
287     account.string = username
288
289     nt_password = samr.CryptPassword()
290     nt_password.data = list(new_password)
291
292     nt_verifier = samr.Password()
293     nt_verifier.hash = list(verifier)
294
295     conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
296
297     # Indicate that we're ready. This ensures we hit the right transaction
298     # lock.
299     pipe.send_bytes(b'0')
300
301     # Wait for the main process to take out a transaction lock.
302     if not pipe.poll(timeout=5):
303         raise AssertionError('main process failed to indicate readiness')
304
305     try:
306         # Try changing the password. This should fail, either due to our
307         # account being locked out or due to using the wrong password.
308         conn.ChangePasswordUser3(server=server,
309                                  account=account,
310                                  nt_password=nt_password,
311                                  nt_verifier=nt_verifier,
312                                  lm_change=True,
313                                  lm_password=None,
314                                  lm_verifier=None,
315                                  password3=None)
316     except NTSTATUSError as err:
317         num, estr = err.args
318
319         if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
320             return ConnectionResult.LOCKED_OUT
321         elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
322             return ConnectionResult.WRONG_PASSWORD
323         else:
324             raise AssertionError(f'pwd change raised wrong error code '
325                                  f'({num:08X})')
326     else:
327         return ConnectionResult.SUCCESS
328
329
330 def connect_samr_aes(pipe,
331                      url,
332                      hostname,
333                      username,
334                      password,
335                      domain,
336                      realm,
337                      workstation,
338                      dn):
339     # Get the user's NT hash.
340     user_creds = KerberosCredentials()
341     user_creds.set_password(password)
342     nt_hash = user_creds.get_nt_hash()
343
344     # Generate a new UTF-16 password.
345     new_password = generate_random_password(32, 32)
346     new_password = new_password.encode('utf-16le')
347
348     # Prepend the 16-bit length of the password..
349     new_password_len = int.to_bytes(len(new_password),
350                                     length=2,
351                                     byteorder='little')
352     new_password = new_password_len + new_password
353
354     server = lsa.String()
355     server.string = hostname
356
357     account = lsa.String()
358     account.string = username
359
360     # Derive a key from the user's NT hash.
361     iv = generate_random_bytes(16)
362     iterations = 5555
363     cek = sha512_pbkdf2(nt_hash, iv, iterations)
364
365     enc_key_salt = (b'Microsoft SAM encryption key '
366                     b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
367     mac_key_salt = (b'Microsoft SAM MAC key '
368                     b'AEAD-AES-256-CBC-HMAC-SHA512 16\0')
369
370     # Encrypt the new password.
371     ciphertext, auth_data = aead_aes_256_cbc_hmac_sha512_blob(new_password,
372                                                               cek,
373                                                               enc_key_salt,
374                                                               mac_key_salt,
375                                                               iv)
376
377     # Create the new password structure
378     pwd_buf = samr.EncryptedPasswordAES()
379     pwd_buf.auth_data = list(auth_data)
380     pwd_buf.salt = list(iv)
381     pwd_buf.cipher_len = len(ciphertext)
382     pwd_buf.cipher = list(ciphertext)
383     pwd_buf.PBKDF2Iterations = iterations
384
385     conn = samr.samr(f'ncacn_np:{hostname}[krb5,seal,smb2]')
386
387     # Indicate that we're ready. This ensures we hit the right transaction
388     # lock.
389     pipe.send_bytes(b'0')
390
391     # Wait for the main process to take out a transaction lock.
392     if not pipe.poll(timeout=5):
393         raise AssertionError('main process failed to indicate readiness')
394
395     try:
396         # Try changing the password. This should fail, either due to our
397         # account being locked out or due to using the wrong password.
398         conn.ChangePasswordUser4(server=server,
399                                  account=account,
400                                  password=pwd_buf)
401     except NTSTATUSError as err:
402         num, estr = err.args
403
404         if num == ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT:
405             return ConnectionResult.LOCKED_OUT
406         elif num == ntstatus.NT_STATUS_WRONG_PASSWORD:
407             return ConnectionResult.WRONG_PASSWORD
408         else:
409             raise AssertionError(f'pwd change raised wrong error code '
410                                  f'({num:08X})')
411     else:
412         return ConnectionResult.SUCCESS
413
414
415 def ldap_pwd_change(pipe,
416                     url,
417                     hostname,
418                     username,
419                     password,
420                     domain,
421                     realm,
422                     workstation,
423                     dn):
424     lp = env_loadparm()
425
426     admin_creds = KerberosCredentials()
427     admin_creds.guess(lp)
428     admin_creds.set_username(env_get_var_value('ADMIN_USERNAME'))
429     admin_creds.set_password(env_get_var_value('ADMIN_PASSWORD'))
430     admin_creds.set_kerberos_state(MUST_USE_KERBEROS)
431
432     samdb = SamDB(url=url,
433                   credentials=admin_creds,
434                   lp=lp)
435
436     old_utf16pw = f'"{password}"'.encode('utf-16le')
437
438     new_password = generate_random_password(32, 32)
439     new_utf16pw = f'"{new_password}"'.encode('utf-16le')
440
441     msg = ldb.Message(ldb.Dn(samdb, dn))
442     msg['0'] = ldb.MessageElement(old_utf16pw,
443                                   ldb.FLAG_MOD_DELETE,
444                                   'unicodePwd')
445     msg['1'] = ldb.MessageElement(new_utf16pw,
446                                   ldb.FLAG_MOD_ADD,
447                                   'unicodePwd')
448
449     # Indicate that we're ready. This ensures we hit the right transaction
450     # lock.
451     pipe.send_bytes(b'0')
452
453     # Wait for the main process to take out a transaction lock.
454     if not pipe.poll(timeout=5):
455         raise AssertionError('main process failed to indicate readiness')
456
457     # Try changing the user's password. This should fail, either due to the
458     # user's account being locked out or due to specifying the wrong password.
459     try:
460         samdb.modify(msg)
461     except ldb.LdbError as err:
462         num, estr = err.args
463         if num != ldb.ERR_CONSTRAINT_VIOLATION:
464             raise AssertionError(f'pwd change raised wrong error code ({err})')
465
466         if f'<{werror.WERR_ACCOUNT_LOCKED_OUT:08X}:' in estr:
467             return ConnectionResult.LOCKED_OUT
468         elif f'<{werror.WERR_INVALID_PASSWORD:08X}:' in estr:
469             return ConnectionResult.WRONG_PASSWORD
470         else:
471             raise AssertionError(f'pwd change raised wrong error code '
472                                  f'({estr})')
473     else:
474         return ConnectionResult.SUCCESS
475
476
477 class LockoutTests(KDCBaseTest):
478
479     def setUp(self):
480         super().setUp()
481         self.do_asn1_print = global_asn1_print
482         self.do_hexdump = global_hexdump
483
484         samdb = self.get_samdb()
485         base_dn = ldb.Dn(samdb, samdb.domain_dn())
486
487         def modify_attr(attr, value):
488             if value is None:
489                 value = []
490                 flag = ldb.FLAG_MOD_DELETE
491             else:
492                 value = str(value)
493                 flag = ldb.FLAG_MOD_REPLACE
494
495                 msg = ldb.Message(base_dn)
496                 msg[attr] = ldb.MessageElement(
497                     value, flag, attr)
498                 samdb.modify(msg)
499
500         res = samdb.search(base_dn,
501                            scope=ldb.SCOPE_BASE,
502                            attrs=['lockoutDuration',
503                                   'lockoutThreshold',
504                                   'msDS-LogonTimeSyncInterval'])
505         self.assertEqual(1, len(res))
506
507         # Reset the lockout duration as it was before.
508         lockout_duration = res[0].get('lockoutDuration', idx=0)
509         self.addCleanup(modify_attr, 'lockoutDuration', lockout_duration)
510
511         # Set the new lockout duration: locked out accounts now stay locked
512         # out.
513         modify_attr('lockoutDuration', 0)
514
515         # Reset the lockout threshold as it was before.
516         lockout_threshold = res[0].get('lockoutThreshold', idx=0)
517         self.addCleanup(modify_attr, 'lockoutThreshold', lockout_threshold)
518
519         # Set the new lockout threshold.
520         self.lockout_threshold = 3
521         modify_attr('lockoutThreshold', self.lockout_threshold)
522
523         # Reset the logon time sync interval as it was before.
524         sync_interval = res[0].get('msDS-LogonTimeSyncInterval', idx=0)
525         self.addCleanup(modify_attr,
526                         'msDS-LogonTimeSyncInterval',
527                         sync_interval)
528
529         # Set the new logon time sync interval. Setting it to 0 eliminates the
530         # need for this attribute to be updated on logon, and thus the
531         # requirement to take out a transaction.
532         modify_attr('msDS-LogonTimeSyncInterval', 0)
533
534         # Get the old 'minPwdAge'.
535         minPwdAge = samdb.get_minPwdAge()
536
537         # Reset the 'minPwdAge' as it was before.
538         self.addCleanup(samdb.set_minPwdAge, minPwdAge)
539
540         # Set it temporarily to '0'.
541         samdb.set_minPwdAge('0')
542
543     def assertLocalSamDB(self, samdb):
544         if samdb.url.startswith('tdb://'):
545             return
546         if samdb.url.startswith('mdb://'):
547             return
548
549         self.fail(f'connection to {samdb.url} is not local!')
550
551     def wait_for_ready(self, pipe, future):
552         if pipe.poll(timeout=5):
553             return
554
555         # We failed to read a response from the pipe, so see if the test raised
556         # an exception with more information.
557         if future.done():
558             exception = future.exception(timeout=0)
559             if exception is not None:
560                 raise exception
561
562         self.fail('test failed to indicate readiness')
563
564     def test_lockout_transaction_kdc(self):
565         self.do_lockout_transaction(connect_kdc)
566
567     def test_lockout_transaction_ntlm(self):
568         self.do_lockout_transaction(connect_ntlm)
569
570     def test_lockout_transaction_samr(self):
571         self.do_lockout_transaction(connect_samr)
572
573     def test_lockout_transaction_samr_aes(self):
574         if not self.gnutls_pbkdf2_support:
575             self.skipTest('gnutls_pbkdf2() is not available')
576         self.do_lockout_transaction(connect_samr_aes)
577
578     def test_lockout_transaction_ldap_pw_change(self):
579         self.do_lockout_transaction(ldap_pwd_change)
580
581     # Tests to ensure we can handle the account being renamed. We do not test
582     # renames with SAMR password changes, because in that case the entire
583     # process happens inside a transaction, and the password change method only
584     # receives the account username. By the time it searches for the account,
585     # it will have already been renamed, and so it will always fail to find the
586     # account.
587
588     def test_lockout_transaction_rename_kdc(self):
589         self.do_lockout_transaction(connect_kdc, rename=True)
590
591     def test_lockout_transaction_rename_ntlm(self):
592         self.do_lockout_transaction(connect_ntlm, rename=True)
593
594     def test_lockout_transaction_rename_ldap_pw_change(self):
595         self.do_lockout_transaction(ldap_pwd_change, rename=True)
596
597     def test_lockout_transaction_bad_pwd_kdc(self):
598         self.do_lockout_transaction(connect_kdc, correct_pw=False)
599
600     def test_lockout_transaction_bad_pwd_ntlm(self):
601         self.do_lockout_transaction(connect_ntlm, correct_pw=False)
602
603     def test_lockout_transaction_bad_pwd_samr(self):
604         self.do_lockout_transaction(connect_samr, correct_pw=False)
605
606     def test_lockout_transaction_bad_pwd_samr_aes(self):
607         if not self.gnutls_pbkdf2_support:
608             self.skipTest('gnutls_pbkdf2() is not available')
609         self.do_lockout_transaction(connect_samr_aes, correct_pw=False)
610
611     def test_lockout_transaction_bad_pwd_ldap_pw_change(self):
612         self.do_lockout_transaction(ldap_pwd_change, correct_pw=False)
613
614     def test_bad_pwd_count_transaction_kdc(self):
615         self.do_bad_pwd_count_transaction(connect_kdc)
616
617     def test_bad_pwd_count_transaction_ntlm(self):
618         self.do_bad_pwd_count_transaction(connect_ntlm)
619
620     def test_bad_pwd_count_transaction_samr(self):
621         self.do_bad_pwd_count_transaction(connect_samr)
622
623     def test_bad_pwd_count_transaction_samr_aes(self):
624         if not self.gnutls_pbkdf2_support:
625             self.skipTest('gnutls_pbkdf2() is not available')
626         self.do_bad_pwd_count_transaction(connect_samr_aes)
627
628     def test_bad_pwd_count_transaction_ldap_pw_change(self):
629         self.do_bad_pwd_count_transaction(ldap_pwd_change)
630
631     def test_bad_pwd_count_transaction_rename_kdc(self):
632         self.do_bad_pwd_count_transaction(connect_kdc, rename=True)
633
634     def test_bad_pwd_count_transaction_rename_ntlm(self):
635         self.do_bad_pwd_count_transaction(connect_ntlm, rename=True)
636
637     def test_bad_pwd_count_transaction_rename_ldap_pw_change(self):
638         self.do_bad_pwd_count_transaction(ldap_pwd_change, rename=True)
639
640     def test_lockout_race_kdc(self):
641         self.do_lockout_race(connect_kdc)
642
643     def test_lockout_race_ntlm(self):
644         self.do_lockout_race(connect_ntlm)
645
646     def test_lockout_race_samr(self):
647         self.do_lockout_race(connect_samr)
648
649     def test_lockout_race_samr_aes(self):
650         if not self.gnutls_pbkdf2_support:
651             self.skipTest('gnutls_pbkdf2() is not available')
652         self.do_lockout_race(connect_samr_aes)
653
654     def test_lockout_race_ldap_pw_change(self):
655         self.do_lockout_race(ldap_pwd_change)
656
657     def test_logon_without_transaction_ntlm(self):
658         self.do_logon_without_transaction(connect_ntlm)
659
660     # Tests to ensure that the connection functions work correctly in the happy
661     # path.
662
663     def test_logon_kdc(self):
664         self.do_logon(partial(connect_kdc, expect_error=False))
665
666     def test_logon_ntlm(self):
667         self.do_logon(connect_ntlm)
668
669     def test_logon_samr(self):
670         self.do_logon(connect_samr)
671
672     def test_logon_samr_aes(self):
673         if not self.gnutls_pbkdf2_support:
674             self.skipTest('gnutls_pbkdf2() is not available')
675         self.do_logon(connect_samr_aes)
676
677     def test_logon_ldap_pw_change(self):
678         self.do_logon(ldap_pwd_change)
679
680     # Test that connection without a correct password works.
681     def do_logon(self, connect_fn):
682         # Create the user account for testing.
683         user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
684                                            use_cache=False)
685         user_dn = user_creds.get_dn()
686
687         admin_creds = self.get_admin_creds()
688         lp = self.get_lp()
689
690         # Get a connection to our local SamDB.
691         samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
692                               credentials=admin_creds)
693         self.assertLocalSamDB(samdb)
694
695         password = user_creds.get_password()
696
697         # Prepare to connect to the server with a valid password.
698         our_pipe, their_pipe = Pipe(duplex=True)
699
700         # Inform the test function that it may proceed.
701         our_pipe.send_bytes(b'0')
702
703         result = connect_fn(pipe=their_pipe,
704                             url=f'ldap://{samdb.host_dns_name()}',
705                             hostname=samdb.host_dns_name(),
706                             username=user_creds.get_username(),
707                             password=password,
708                             domain=user_creds.get_domain(),
709                             realm=user_creds.get_realm(),
710                             workstation=user_creds.get_workstation(),
711                             dn=str(user_dn))
712
713         # The connection should succeed.
714         self.assertEqual(result, ConnectionResult.SUCCESS)
715
716     # Lock out the account while holding a transaction lock, then release the
717     # lock. A logon attempt already in progress should reread the account
718     # details and recognise the account is locked out. The account can
719     # additionally be renamed within the transaction to ensure that, by using
720     # the GUID, rereading the account's details still succeeds.
721     def do_lockout_transaction(self, connect_fn,
722                                rename=False,
723                                correct_pw=True):
724         # Create the user account for testing.
725         user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
726                                            use_cache=False)
727         user_dn = user_creds.get_dn()
728
729         admin_creds = self.get_admin_creds()
730         lp = self.get_lp()
731
732         # Get a connection to our local SamDB.
733         samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
734                               credentials=admin_creds)
735         self.assertLocalSamDB(samdb)
736
737         password = user_creds.get_password()
738         if not correct_pw:
739             password = password[:-1]
740
741         # Prepare to connect to the server.
742         with futures.ProcessPoolExecutor(max_workers=1) as executor:
743             our_pipe, their_pipe = Pipe(duplex=True)
744             connect_future = executor.submit(
745                 connect_fn,
746                 pipe=their_pipe,
747                 url=f'ldap://{samdb.host_dns_name()}',
748                 hostname=samdb.host_dns_name(),
749                 username=user_creds.get_username(),
750                 password=password,
751                 domain=user_creds.get_domain(),
752                 realm=user_creds.get_realm(),
753                 workstation=user_creds.get_workstation(),
754                 dn=str(user_dn))
755
756             # Wait until the test process indicates it's ready.
757             self.wait_for_ready(our_pipe, connect_future)
758
759             # Take out a transaction.
760             samdb.transaction_start()
761             try:
762                 # Lock out the account. We must do it using an actual password
763                 # check like so, rather than directly with a database
764                 # modification, so that the account is also added to the
765                 # auxiliary bad password database.
766
767                 old_utf16pw = f'"Secret007"'.encode('utf-16le')  # invalid pwd
768                 new_utf16pw = f'"Secret008"'.encode('utf-16le')
769
770                 msg = ldb.Message(user_dn)
771                 msg['0'] = ldb.MessageElement(old_utf16pw,
772                                               ldb.FLAG_MOD_DELETE,
773                                               'unicodePwd')
774                 msg['1'] = ldb.MessageElement(new_utf16pw,
775                                               ldb.FLAG_MOD_ADD,
776                                               'unicodePwd')
777
778                 for i in range(self.lockout_threshold):
779                     try:
780                         samdb.modify(msg)
781                     except ldb.LdbError as err:
782                         num, estr = err.args
783
784                         # We get an error, but the bad password count should
785                         # still be updated.
786                         self.assertEqual(num, ldb.ERR_OPERATIONS_ERROR)
787                         self.assertEqual('Failed to obtain remote address for '
788                                          'the LDAP client while changing the '
789                                          'password',
790                                          estr)
791                     else:
792                         self.fail('pwd change should have failed')
793
794                 # Ensure the account is locked out.
795
796                 res = samdb.search(
797                     user_dn, scope=ldb.SCOPE_BASE,
798                     attrs=['msDS-User-Account-Control-Computed'])
799                 self.assertEqual(1, len(res))
800
801                 uac = int(res[0].get('msDS-User-Account-Control-Computed',
802                                      idx=0))
803                 self.assertTrue(uac & dsdb.UF_LOCKOUT)
804
805                 # Now the bad password database has been updated, inform the
806                 # test process that it may proceed.
807                 our_pipe.send_bytes(b'0')
808
809                 # Wait one second to ensure the test process hits the
810                 # transaction lock.
811                 time.sleep(1)
812
813                 if rename:
814                     # While we're at it, rename the account to ensure that is
815                     # also safe if a race occurs.
816                     msg = ldb.Message(user_dn)
817                     new_username = self.get_new_username()
818                     msg['sAMAccountName'] = ldb.MessageElement(
819                         new_username,
820                         ldb.FLAG_MOD_REPLACE,
821                         'sAMAccountName')
822                     samdb.modify(msg)
823
824             except Exception:
825                 samdb.transaction_cancel()
826                 raise
827
828             # Commit the local transaction.
829             samdb.transaction_commit()
830
831             result = connect_future.result(timeout=5)
832             self.assertEqual(result, ConnectionResult.LOCKED_OUT)
833
834     # Update the bad password count while holding a transaction lock, then
835     # release the lock. A logon attempt already in progress should reread the
836     # account details and ensure the bad password count is atomically
837     # updated. The account can additionally be renamed within the transaction
838     # to ensure that, by using the GUID, rereading the account's details still
839     # succeeds.
840     def do_bad_pwd_count_transaction(self, connect_fn, rename=False):
841         # Create the user account for testing.
842         user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
843                                            use_cache=False)
844         user_dn = user_creds.get_dn()
845
846         admin_creds = self.get_admin_creds()
847         lp = self.get_lp()
848
849         # Get a connection to our local SamDB.
850         samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
851                               credentials=admin_creds)
852         self.assertLocalSamDB(samdb)
853
854         # Prepare to connect to the server with an invalid password.
855         with futures.ProcessPoolExecutor(max_workers=1) as executor:
856             our_pipe, their_pipe = Pipe(duplex=True)
857             connect_future = executor.submit(
858                 connect_fn,
859                 pipe=their_pipe,
860                 url=f'ldap://{samdb.host_dns_name()}',
861                 hostname=samdb.host_dns_name(),
862                 username=user_creds.get_username(),
863                 password=user_creds.get_password()[:-1],  # invalid password
864                 domain=user_creds.get_domain(),
865                 realm=user_creds.get_realm(),
866                 workstation=user_creds.get_workstation(),
867                 dn=str(user_dn))
868
869             # Wait until the test process indicates it's ready.
870             self.wait_for_ready(our_pipe, connect_future)
871
872             # Take out a transaction.
873             samdb.transaction_start()
874             try:
875                 # Inform the test process that it may proceed.
876                 our_pipe.send_bytes(b'0')
877
878                 # Wait one second to ensure the test process hits the
879                 # transaction lock.
880                 time.sleep(1)
881
882                 # Set badPwdCount to 1.
883                 msg = ldb.Message(user_dn)
884                 now = int(time.time())
885                 bad_pwd_time = unix2nttime(now)
886                 msg['badPwdCount'] = ldb.MessageElement(
887                     '1',
888                     ldb.FLAG_MOD_REPLACE,
889                     'badPwdCount')
890                 msg['badPasswordTime'] = ldb.MessageElement(
891                     str(bad_pwd_time),
892                     ldb.FLAG_MOD_REPLACE,
893                     'badPasswordTime')
894                 if rename:
895                     # While we're at it, rename the account to ensure that is
896                     # also safe if a race occurs.
897                     new_username = self.get_new_username()
898                     msg['sAMAccountName'] = ldb.MessageElement(
899                         new_username,
900                         ldb.FLAG_MOD_REPLACE,
901                         'sAMAccountName')
902                 samdb.modify(msg)
903
904                 # Ensure the account is not yet locked out.
905
906                 res = samdb.search(
907                     user_dn, scope=ldb.SCOPE_BASE,
908                     attrs=['msDS-User-Account-Control-Computed'])
909                 self.assertEqual(1, len(res))
910
911                 uac = int(res[0].get('msDS-User-Account-Control-Computed',
912                                      idx=0))
913                 self.assertFalse(uac & dsdb.UF_LOCKOUT)
914             except Exception:
915                 samdb.transaction_cancel()
916                 raise
917
918             # Commit the local transaction.
919             samdb.transaction_commit()
920
921             result = connect_future.result(timeout=5)
922             self.assertEqual(result, ConnectionResult.WRONG_PASSWORD, result)
923
924         # Check that badPwdCount has now increased to 2.
925
926         res = samdb.search(user_dn,
927                            scope=ldb.SCOPE_BASE,
928                            attrs=['badPwdCount'])
929         self.assertEqual(1, len(res))
930
931         bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
932         self.assertEqual(2, bad_pwd_count)
933
934     # Attempt to log in to the account with an incorrect password, using
935     # lockoutThreshold+1 simultaneous attempts. We should get three 'wrong
936     # password' errors and one 'locked out' error, showing that the bad
937     # password count is checked and incremented atomically.
938     def do_lockout_race(self, connect_fn):
939         # Create the user account for testing.
940         user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
941                                            use_cache=False)
942         user_dn = user_creds.get_dn()
943
944         admin_creds = self.get_admin_creds()
945         lp = self.get_lp()
946
947         # Get a connection to our local SamDB.
948         samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
949                               credentials=admin_creds)
950         self.assertLocalSamDB(samdb)
951
952         # Prepare to connect to the server with an invalid password, using four
953         # simultaneous requests. Only three of those attempts should get
954         # through before the account is locked out.
955         num_attempts = self.lockout_threshold + 1
956         with futures.ProcessPoolExecutor(max_workers=num_attempts) as executor:
957             connect_futures = []
958             our_pipes = []
959             for i in range(num_attempts):
960                 our_pipe, their_pipe = Pipe(duplex=True)
961                 our_pipes.append(our_pipe)
962
963                 connect_future = executor.submit(
964                     connect_fn,
965                     pipe=their_pipe,
966                     url=f'ldap://{samdb.host_dns_name()}',
967                     hostname=samdb.host_dns_name(),
968                     username=user_creds.get_username(),
969                     password=user_creds.get_password()[:-1],  # invalid pw
970                     domain=user_creds.get_domain(),
971                     realm=user_creds.get_realm(),
972                     workstation=user_creds.get_workstation(),
973                     dn=str(user_dn))
974                 connect_futures.append(connect_future)
975
976                 # Wait until the test process indicates it's ready.
977                 self.wait_for_ready(our_pipe, connect_future)
978
979             # Take out a transaction.
980             samdb.transaction_start()
981             try:
982                 # Inform the test processes that they may proceed.
983                 for our_pipe in our_pipes:
984                     our_pipe.send_bytes(b'0')
985
986                 # Wait one second to ensure the test processes hit the
987                 # transaction lock.
988                 time.sleep(1)
989             except Exception:
990                 samdb.transaction_cancel()
991                 raise
992
993             # Commit the local transaction.
994             samdb.transaction_commit()
995
996             lockouts = 0
997             wrong_passwords = 0
998             for i, connect_future in enumerate(connect_futures):
999                 result = connect_future.result(timeout=5)
1000                 if result == ConnectionResult.LOCKED_OUT:
1001                     lockouts += 1
1002                 elif result == ConnectionResult.WRONG_PASSWORD:
1003                     wrong_passwords += 1
1004                 else:
1005                     self.fail(f'process {i} gave an unexpected result '
1006                               f'{result}')
1007
1008             self.assertEqual(wrong_passwords, self.lockout_threshold)
1009             self.assertEqual(lockouts, num_attempts - self.lockout_threshold)
1010
1011         # Ensure the account is now locked out.
1012
1013         res = samdb.search(
1014             user_dn, scope=ldb.SCOPE_BASE,
1015             attrs=['badPwdCount',
1016                    'msDS-User-Account-Control-Computed'])
1017         self.assertEqual(1, len(res))
1018
1019         bad_pwd_count = int(res[0].get('badPwdCount', idx=0))
1020         self.assertEqual(self.lockout_threshold, bad_pwd_count)
1021
1022         uac = int(res[0].get('msDS-User-Account-Control-Computed',
1023                              idx=0))
1024         self.assertTrue(uac & dsdb.UF_LOCKOUT)
1025
1026     # Test that logon is possible even while we locally hold a transaction
1027     # lock. This test only works with NTLM authentication; Kerberos
1028     # authentication must take out a transaction to update the logonCount
1029     # attribute, and LDAP and SAMR password changes both take out a transaction
1030     # to effect the password change. NTLM is the only logon method that does
1031     # not require a transaction, and can thus be performed while we're holding
1032     # the lock.
1033     def do_logon_without_transaction(self, connect_fn):
1034         # Create the user account for testing.
1035         user_creds = self.get_cached_creds(account_type=self.AccountType.USER,
1036                                            use_cache=False)
1037         user_dn = user_creds.get_dn()
1038
1039         admin_creds = self.get_admin_creds()
1040         lp = self.get_lp()
1041
1042         # Get a connection to our local SamDB.
1043         samdb = connect_samdb(samdb_url=lp.samdb_url(), lp=lp,
1044                               credentials=admin_creds)
1045         self.assertLocalSamDB(samdb)
1046
1047         password = user_creds.get_password()
1048
1049         # Prepare to connect to the server with a valid password.
1050         with futures.ProcessPoolExecutor(max_workers=1) as executor:
1051             our_pipe, their_pipe = Pipe(duplex=True)
1052             connect_future = executor.submit(
1053                 connect_fn,
1054                 pipe=their_pipe,
1055                 url=f'ldap://{samdb.host_dns_name()}',
1056                 hostname=samdb.host_dns_name(),
1057                 username=user_creds.get_username(),
1058                 password=password,
1059                 domain=user_creds.get_domain(),
1060                 realm=user_creds.get_realm(),
1061                 workstation=user_creds.get_workstation(),
1062                 dn=str(user_dn))
1063
1064             # Wait until the test process indicates it's ready.
1065             self.wait_for_ready(our_pipe, connect_future)
1066
1067             # Take out a transaction.
1068             samdb.transaction_start()
1069             try:
1070                 # Inform the test process that it may proceed.
1071                 our_pipe.send_bytes(b'0')
1072
1073                 # The connection should succeed, despite our holding a
1074                 # transaction.
1075                 result = connect_future.result(timeout=5)
1076                 self.assertEqual(result, ConnectionResult.SUCCESS)
1077             except Exception:
1078                 samdb.transaction_cancel()
1079                 raise
1080
1081             # Commit the local transaction.
1082             samdb.transaction_commit()
1083
1084
1085 if __name__ == '__main__':
1086     global_asn1_print = False
1087     global_hexdump = False
1088     import unittest
1089     unittest.main()