b418a087df808d4e58745ea867be8c63157aefb2
[kseeger/samba-autobuild-v4-16-test/.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 import ldb
24
25
26 from samba import dsdb
27
28 from samba.dcerpc import krb5pac, security
29
30 sys.path.insert(0, "bin/python")
31 os.environ["PYTHONUNBUFFERED"] = "1"
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.raw_testcase import Krb5EncryptionKey
36 from samba.tests.krb5.rfc4120_constants import (
37     AES256_CTS_HMAC_SHA1_96,
38     ARCFOUR_HMAC_MD5,
39     KRB_ERROR,
40     KRB_TGS_REP,
41     KDC_ERR_BADMATCH,
42     KDC_ERR_GENERIC,
43     KDC_ERR_MODIFIED,
44     KDC_ERR_NOT_US,
45     KDC_ERR_POLICY,
46     KDC_ERR_C_PRINCIPAL_UNKNOWN,
47     KDC_ERR_S_PRINCIPAL_UNKNOWN,
48     KDC_ERR_TGT_REVOKED,
49     KRB_ERR_TKT_NYV,
50     KDC_ERR_WRONG_REALM,
51     NT_ENTERPRISE_PRINCIPAL,
52     NT_PRINCIPAL,
53     NT_SRV_INST,
54 )
55 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
56
# Module-wide debug switches. KdcTgsTests.setUp() copies them onto every
# test instance as do_asn1_print and do_hexdump respectively.
global_asn1_print = False
global_hexdump = False
59
60
61 class KdcTgsTests(KDCBaseTest):
62
63     def setUp(self):
64         super().setUp()
65         self.do_asn1_print = global_asn1_print
66         self.do_hexdump = global_hexdump
67
68     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
69         ''' Try and obtain a ticket from the TGS, but supply a cname
70             that differs from that provided to the krbtgt
71         '''
72         # Create the user account
73         samdb = self.get_samdb()
74         user_name = "tsttktusr"
75         (uc, _) = self.create_account(samdb, user_name)
76         realm = uc.get_realm().lower()
77
78         # Do the initial AS-REQ, should get a pre-authentication required
79         # response
80         etype = (AES256_CTS_HMAC_SHA1_96,)
81         cname = self.PrincipalName_create(
82             name_type=NT_PRINCIPAL, names=[user_name])
83         sname = self.PrincipalName_create(
84             name_type=NT_SRV_INST, names=["krbtgt", realm])
85
86         rep = self.as_req(cname, sname, realm, etype)
87         self.check_pre_authentication(rep)
88
89         # Do the next AS-REQ
90         padata = self.get_enc_timestamp_pa_data(uc, rep)
91         key = self.get_as_rep_key(uc, rep)
92         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
93         self.check_as_reply(rep)
94
95         # Request a service ticket, but use a cname that does not match
96         # that in the original AS-REQ
97         enc_part2 = self.get_as_rep_enc_data(key, rep)
98         key = self.EncryptionKey_import(enc_part2['key'])
99         ticket = rep['ticket']
100
101         cname = self.PrincipalName_create(
102             name_type=NT_PRINCIPAL,
103             names=["Administrator"])
104         sname = self.PrincipalName_create(
105             name_type=NT_PRINCIPAL,
106             names=["host", samdb.host_dns_name()])
107
108         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
109                                        expected_error_mode=KDC_ERR_BADMATCH,
110                                        expect_edata=False)
111
112         self.assertIsNone(
113             enc_part,
114             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
115         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
116         self.assertEqual(
117             KDC_ERR_BADMATCH,
118             rep['error-code'],
119             "rep = {%s}" % rep)
120
121     def test_ldap_service_ticket(self):
122         '''Get a ticket to the ldap service
123         '''
124         # Create the user account
125         samdb = self.get_samdb()
126         user_name = "tsttktusr"
127         (uc, _) = self.create_account(samdb, user_name)
128         realm = uc.get_realm().lower()
129
130         # Do the initial AS-REQ, should get a pre-authentication required
131         # response
132         etype = (AES256_CTS_HMAC_SHA1_96,)
133         cname = self.PrincipalName_create(
134             name_type=NT_PRINCIPAL, names=[user_name])
135         sname = self.PrincipalName_create(
136             name_type=NT_SRV_INST, names=["krbtgt", realm])
137
138         rep = self.as_req(cname, sname, realm, etype)
139         self.check_pre_authentication(rep)
140
141         # Do the next AS-REQ
142         padata = self.get_enc_timestamp_pa_data(uc, rep)
143         key = self.get_as_rep_key(uc, rep)
144         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
145         self.check_as_reply(rep)
146
147         enc_part2 = self.get_as_rep_enc_data(key, rep)
148         key = self.EncryptionKey_import(enc_part2['key'])
149         ticket = rep['ticket']
150
151         # Request a ticket to the ldap service
152         sname = self.PrincipalName_create(
153             name_type=NT_SRV_INST,
154             names=["ldap", samdb.host_dns_name()])
155
156         (rep, _) = self.tgs_req(
157             cname, sname, uc.get_realm(), ticket, key, etype,
158             service_creds=self.get_dc_creds())
159
160         self.check_tgs_reply(rep)
161
162     def test_get_ticket_for_host_service_of_machine_account(self):
163
164         # Create a user and machine account for the test.
165         #
166         samdb = self.get_samdb()
167         user_name = "tsttktusr"
168         (uc, dn) = self.create_account(samdb, user_name)
169         (mc, _) = self.create_account(samdb, "tsttktmac",
170                                       account_type=self.AccountType.COMPUTER)
171         realm = uc.get_realm().lower()
172
173         # Do the initial AS-REQ, should get a pre-authentication required
174         # response
175         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
176         cname = self.PrincipalName_create(
177             name_type=NT_PRINCIPAL, names=[user_name])
178         sname = self.PrincipalName_create(
179             name_type=NT_SRV_INST, names=["krbtgt", realm])
180
181         rep = self.as_req(cname, sname, realm, etype)
182         self.check_pre_authentication(rep)
183
184         # Do the next AS-REQ
185         padata = self.get_enc_timestamp_pa_data(uc, rep)
186         key = self.get_as_rep_key(uc, rep)
187         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
188         self.check_as_reply(rep)
189
190         # Request a ticket to the host service on the machine account
191         ticket = rep['ticket']
192         enc_part2 = self.get_as_rep_enc_data(key, rep)
193         key = self.EncryptionKey_import(enc_part2['key'])
194         cname = self.PrincipalName_create(
195             name_type=NT_PRINCIPAL,
196             names=[user_name])
197         sname = self.PrincipalName_create(
198             name_type=NT_PRINCIPAL,
199             names=[mc.get_username()])
200
201         (rep, enc_part) = self.tgs_req(
202             cname, sname, uc.get_realm(), ticket, key, etype,
203             service_creds=mc)
204         self.check_tgs_reply(rep)
205
206         # Check the contents of the service ticket
207         ticket = rep['ticket']
208         enc_part = self.decode_service_ticket(mc, ticket)
209
210         pac_data = self.get_pac_data(enc_part['authorization-data'])
211         sid = self.get_objectSid(samdb, dn)
212         upn = "%s@%s" % (uc.get_username(), realm)
213         self.assertEqual(
214             uc.get_username(),
215             str(pac_data.account_name),
216             "rep = {%s},%s" % (rep, pac_data))
217         self.assertEqual(
218             uc.get_username(),
219             pac_data.logon_name,
220             "rep = {%s},%s" % (rep, pac_data))
221         self.assertEqual(
222             uc.get_realm(),
223             pac_data.domain_name,
224             "rep = {%s},%s" % (rep, pac_data))
225         self.assertEqual(
226             upn,
227             pac_data.upn,
228             "rep = {%s},%s" % (rep, pac_data))
229         self.assertEqual(
230             sid,
231             pac_data.account_sid,
232             "rep = {%s},%s" % (rep, pac_data))
233
234     def _make_tgs_request(self, client_creds, service_creds, tgt,
235                           client_account=None,
236                           client_name_type=NT_PRINCIPAL,
237                           kdc_options=None,
238                           pac_request=None, expect_pac=True,
239                           expect_error=False,
240                           expected_cname=None,
241                           expected_account_name=None,
242                           expected_upn_name=None,
243                           expected_sid=None):
244         if client_account is None:
245             client_account = client_creds.get_username()
246         cname = self.PrincipalName_create(name_type=client_name_type,
247                                           names=client_account.split('/'))
248
249         service_account = service_creds.get_username()
250         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
251                                           names=[service_account])
252
253         realm = service_creds.get_realm()
254
255         expected_crealm = realm
256         if expected_cname is None:
257             expected_cname = cname
258         expected_srealm = realm
259         expected_sname = sname
260
261         expected_supported_etypes = service_creds.tgs_supported_enctypes
262
263         etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
264
265         if kdc_options is None:
266             kdc_options = 'canonicalize'
267         kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
268
269         target_decryption_key = self.TicketDecryptionKey_from_creds(
270             service_creds)
271
272         authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
273
274         if expect_error:
275             expected_error_mode = KDC_ERR_TGT_REVOKED
276             check_error_fn = self.generic_check_kdc_error
277             check_rep_fn = None
278         else:
279             expected_error_mode = 0
280             check_error_fn = None
281             check_rep_fn = self.generic_check_kdc_rep
282
283         kdc_exchange_dict = self.tgs_exchange_dict(
284             expected_crealm=expected_crealm,
285             expected_cname=expected_cname,
286             expected_srealm=expected_srealm,
287             expected_sname=expected_sname,
288             expected_account_name=expected_account_name,
289             expected_upn_name=expected_upn_name,
290             expected_sid=expected_sid,
291             expected_supported_etypes=expected_supported_etypes,
292             ticket_decryption_key=target_decryption_key,
293             check_error_fn=check_error_fn,
294             check_rep_fn=check_rep_fn,
295             check_kdc_private_fn=self.generic_check_kdc_private,
296             expected_error_mode=expected_error_mode,
297             tgt=tgt,
298             authenticator_subkey=authenticator_subkey,
299             kdc_options=kdc_options,
300             pac_request=pac_request,
301             expect_pac=expect_pac,
302             expect_edata=False)
303
304         rep = self._generic_kdc_exchange(kdc_exchange_dict,
305                                          cname=cname,
306                                          realm=realm,
307                                          sname=sname,
308                                          etypes=etypes)
309         if expect_error:
310             self.check_error_rep(rep, expected_error_mode)
311
312             return None
313         else:
314             self.check_reply(rep, KRB_TGS_REP)
315
316             return kdc_exchange_dict['rep_ticket_creds']
317
318     def test_request(self):
319         client_creds = self.get_client_creds()
320         service_creds = self.get_service_creds()
321
322         tgt = self.get_tgt(client_creds)
323
324         pac = self.get_ticket_pac(tgt)
325         self.assertIsNotNone(pac)
326
327         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
328
329         pac = self.get_ticket_pac(ticket)
330         self.assertIsNotNone(pac)
331
332     def test_request_no_pac(self):
333         client_creds = self.get_client_creds()
334         service_creds = self.get_service_creds()
335
336         tgt = self.get_tgt(client_creds, pac_request=False)
337
338         pac = self.get_ticket_pac(tgt)
339         self.assertIsNotNone(pac)
340
341         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
342                                         pac_request=False, expect_pac=False)
343
344         pac = self.get_ticket_pac(ticket, expect_pac=False)
345         self.assertIsNone(pac)
346
347     def test_request_enterprise_canon(self):
348         upn = self.get_new_username()
349         client_creds = self.get_cached_creds(
350             account_type=self.AccountType.USER,
351             opts={'upn': upn})
352         service_creds = self.get_service_creds()
353
354         user_name = client_creds.get_username()
355         realm = client_creds.get_realm()
356         client_account = f'{user_name}@{realm}'
357
358         expected_cname = self.PrincipalName_create(
359             name_type=NT_PRINCIPAL,
360             names=[user_name])
361
362         kdc_options = 'canonicalize'
363
364         tgt = self.get_tgt(client_creds,
365                            client_account=client_account,
366                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
367                            expected_cname=expected_cname,
368                            expected_account_name=user_name,
369                            kdc_options=kdc_options)
370
371         self._make_tgs_request(
372             client_creds, service_creds, tgt,
373             client_account=client_account,
374             client_name_type=NT_ENTERPRISE_PRINCIPAL,
375             expected_cname=expected_cname,
376             expected_account_name=user_name,
377             kdc_options=kdc_options)
378
379     def test_request_enterprise_canon_case(self):
380         upn = self.get_new_username()
381         client_creds = self.get_cached_creds(
382             account_type=self.AccountType.USER,
383             opts={'upn': upn})
384         service_creds = self.get_service_creds()
385
386         user_name = client_creds.get_username()
387         realm = client_creds.get_realm().lower()
388         client_account = f'{user_name}@{realm}'
389
390         expected_cname = self.PrincipalName_create(
391             name_type=NT_PRINCIPAL,
392             names=[user_name])
393
394         kdc_options = 'canonicalize'
395
396         tgt = self.get_tgt(client_creds,
397                            client_account=client_account,
398                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
399                            expected_cname=expected_cname,
400                            expected_account_name=user_name,
401                            kdc_options=kdc_options)
402
403         self._make_tgs_request(
404             client_creds, service_creds, tgt,
405             client_account=client_account,
406             client_name_type=NT_ENTERPRISE_PRINCIPAL,
407             expected_cname=expected_cname,
408             expected_account_name=user_name,
409             kdc_options=kdc_options)
410
411     def test_request_enterprise_canon_mac(self):
412         upn = self.get_new_username()
413         client_creds = self.get_cached_creds(
414             account_type=self.AccountType.COMPUTER,
415             opts={'upn': upn})
416         service_creds = self.get_service_creds()
417
418         user_name = client_creds.get_username()
419         realm = client_creds.get_realm()
420         client_account = f'{user_name}@{realm}'
421
422         expected_cname = self.PrincipalName_create(
423             name_type=NT_PRINCIPAL,
424             names=[user_name])
425
426         kdc_options = 'canonicalize'
427
428         tgt = self.get_tgt(client_creds,
429                            client_account=client_account,
430                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
431                            expected_cname=expected_cname,
432                            expected_account_name=user_name,
433                            kdc_options=kdc_options)
434
435         self._make_tgs_request(
436             client_creds, service_creds, tgt,
437             client_account=client_account,
438             client_name_type=NT_ENTERPRISE_PRINCIPAL,
439             expected_cname=expected_cname,
440             expected_account_name=user_name,
441             kdc_options=kdc_options)
442
443     def test_request_enterprise_canon_case_mac(self):
444         upn = self.get_new_username()
445         client_creds = self.get_cached_creds(
446             account_type=self.AccountType.COMPUTER,
447             opts={'upn': upn})
448         service_creds = self.get_service_creds()
449
450         user_name = client_creds.get_username()
451         realm = client_creds.get_realm().lower()
452         client_account = f'{user_name}@{realm}'
453
454         expected_cname = self.PrincipalName_create(
455             name_type=NT_PRINCIPAL,
456             names=[user_name])
457
458         kdc_options = 'canonicalize'
459
460         tgt = self.get_tgt(client_creds,
461                            client_account=client_account,
462                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
463                            expected_cname=expected_cname,
464                            expected_account_name=user_name,
465                            kdc_options=kdc_options)
466
467         self._make_tgs_request(
468             client_creds, service_creds, tgt,
469             client_account=client_account,
470             client_name_type=NT_ENTERPRISE_PRINCIPAL,
471             expected_cname=expected_cname,
472             expected_account_name=user_name,
473             kdc_options=kdc_options)
474
475     def test_request_enterprise_no_canon(self):
476         upn = self.get_new_username()
477         client_creds = self.get_cached_creds(
478             account_type=self.AccountType.USER,
479             opts={'upn': upn})
480         service_creds = self.get_service_creds()
481
482         user_name = client_creds.get_username()
483         realm = client_creds.get_realm()
484         client_account = f'{user_name}@{realm}'
485
486         kdc_options = '0'
487
488         tgt = self.get_tgt(client_creds,
489                            client_account=client_account,
490                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
491                            expected_account_name=user_name,
492                            kdc_options=kdc_options)
493
494         self._make_tgs_request(
495             client_creds, service_creds, tgt,
496             client_account=client_account,
497             client_name_type=NT_ENTERPRISE_PRINCIPAL,
498             expected_account_name=user_name,
499             kdc_options=kdc_options)
500
501     def test_request_enterprise_no_canon_case(self):
502         upn = self.get_new_username()
503         client_creds = self.get_cached_creds(
504             account_type=self.AccountType.USER,
505             opts={'upn': upn})
506         service_creds = self.get_service_creds()
507
508         user_name = client_creds.get_username()
509         realm = client_creds.get_realm().lower()
510         client_account = f'{user_name}@{realm}'
511
512         kdc_options = '0'
513
514         tgt = self.get_tgt(client_creds,
515                            client_account=client_account,
516                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
517                            expected_account_name=user_name,
518                            kdc_options=kdc_options)
519
520         self._make_tgs_request(
521             client_creds, service_creds, tgt,
522             client_account=client_account,
523             client_name_type=NT_ENTERPRISE_PRINCIPAL,
524             expected_account_name=user_name,
525             kdc_options=kdc_options)
526
527     def test_request_enterprise_no_canon_mac(self):
528         upn = self.get_new_username()
529         client_creds = self.get_cached_creds(
530             account_type=self.AccountType.COMPUTER,
531             opts={'upn': upn})
532         service_creds = self.get_service_creds()
533
534         user_name = client_creds.get_username()
535         realm = client_creds.get_realm()
536         client_account = f'{user_name}@{realm}'
537
538         kdc_options = '0'
539
540         tgt = self.get_tgt(client_creds,
541                            client_account=client_account,
542                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
543                            expected_account_name=user_name,
544                            kdc_options=kdc_options)
545
546         self._make_tgs_request(
547             client_creds, service_creds, tgt,
548             client_account=client_account,
549             client_name_type=NT_ENTERPRISE_PRINCIPAL,
550             expected_account_name=user_name,
551             kdc_options=kdc_options)
552
553     def test_request_enterprise_no_canon_case_mac(self):
554         upn = self.get_new_username()
555         client_creds = self.get_cached_creds(
556             account_type=self.AccountType.COMPUTER,
557             opts={'upn': upn})
558         service_creds = self.get_service_creds()
559
560         user_name = client_creds.get_username()
561         realm = client_creds.get_realm().lower()
562         client_account = f'{user_name}@{realm}'
563
564         kdc_options = '0'
565
566         tgt = self.get_tgt(client_creds,
567                            client_account=client_account,
568                            client_name_type=NT_ENTERPRISE_PRINCIPAL,
569                            expected_account_name=user_name,
570                            kdc_options=kdc_options)
571
572         self._make_tgs_request(
573             client_creds, service_creds, tgt,
574             client_account=client_account,
575             client_name_type=NT_ENTERPRISE_PRINCIPAL,
576             expected_account_name=user_name,
577             kdc_options=kdc_options)
578
579     def test_client_no_auth_data_required(self):
580         client_creds = self.get_cached_creds(
581             account_type=self.AccountType.USER,
582             opts={'no_auth_data_required': True})
583         service_creds = self.get_service_creds()
584
585         tgt = self.get_tgt(client_creds)
586
587         pac = self.get_ticket_pac(tgt)
588         self.assertIsNotNone(pac)
589
590         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
591
592         pac = self.get_ticket_pac(ticket)
593         self.assertIsNotNone(pac)
594
595     def test_no_pac_client_no_auth_data_required(self):
596         client_creds = self.get_cached_creds(
597             account_type=self.AccountType.USER,
598             opts={'no_auth_data_required': True})
599         service_creds = self.get_service_creds()
600
601         tgt = self.get_tgt(client_creds)
602
603         pac = self.get_ticket_pac(tgt)
604         self.assertIsNotNone(pac)
605
606         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
607                                         pac_request=False, expect_pac=True)
608
609         pac = self.get_ticket_pac(ticket)
610         self.assertIsNotNone(pac)
611
612     def test_service_no_auth_data_required(self):
613         client_creds = self.get_client_creds()
614         service_creds = self.get_cached_creds(
615             account_type=self.AccountType.COMPUTER,
616             opts={'no_auth_data_required': True})
617
618         tgt = self.get_tgt(client_creds)
619
620         pac = self.get_ticket_pac(tgt)
621         self.assertIsNotNone(pac)
622
623         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
624                                         expect_pac=False)
625
626         pac = self.get_ticket_pac(ticket, expect_pac=False)
627         self.assertIsNone(pac)
628
629     def test_no_pac_service_no_auth_data_required(self):
630         client_creds = self.get_client_creds()
631         service_creds = self.get_cached_creds(
632             account_type=self.AccountType.COMPUTER,
633             opts={'no_auth_data_required': True})
634
635         tgt = self.get_tgt(client_creds, pac_request=False)
636
637         pac = self.get_ticket_pac(tgt)
638         self.assertIsNotNone(pac)
639
640         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
641                                         pac_request=False, expect_pac=False)
642
643         pac = self.get_ticket_pac(ticket, expect_pac=False)
644         self.assertIsNone(pac)
645
646     def test_remove_pac_service_no_auth_data_required(self):
647         client_creds = self.get_client_creds()
648         service_creds = self.get_cached_creds(
649             account_type=self.AccountType.COMPUTER,
650             opts={'no_auth_data_required': True})
651
652         tgt = self.modified_ticket(self.get_tgt(client_creds),
653                                    exclude_pac=True)
654
655         pac = self.get_ticket_pac(tgt, expect_pac=False)
656         self.assertIsNone(pac)
657
658         self._make_tgs_request(client_creds, service_creds, tgt,
659                                expect_error=True)
660
661     def test_remove_pac_client_no_auth_data_required(self):
662         client_creds = self.get_cached_creds(
663             account_type=self.AccountType.USER,
664             opts={'no_auth_data_required': True})
665         service_creds = self.get_service_creds()
666
667         tgt = self.modified_ticket(self.get_tgt(client_creds),
668                                    exclude_pac=True)
669
670         pac = self.get_ticket_pac(tgt, expect_pac=False)
671         self.assertIsNone(pac)
672
673         self._make_tgs_request(client_creds, service_creds, tgt,
674                                expect_error=True)
675
676     def test_remove_pac(self):
677         client_creds = self.get_client_creds()
678         service_creds = self.get_service_creds()
679
680         tgt = self.modified_ticket(self.get_tgt(client_creds),
681                                    exclude_pac=True)
682
683         pac = self.get_ticket_pac(tgt, expect_pac=False)
684         self.assertIsNone(pac)
685
686         self._make_tgs_request(client_creds, service_creds, tgt,
687                                expect_error=True)
688
689     def test_upn_dns_info_ex_user(self):
690         client_creds = self.get_client_creds()
691         self._run_upn_dns_info_ex_test(client_creds)
692
693     def test_upn_dns_info_ex_mac(self):
694         mach_creds = self.get_mach_creds()
695         self._run_upn_dns_info_ex_test(mach_creds)
696
697     def test_upn_dns_info_ex_upn_user(self):
698         client_creds = self.get_cached_creds(
699             account_type=self.AccountType.USER,
700             opts={'upn': 'upn_dns_info_test_upn0@bar'})
701         self._run_upn_dns_info_ex_test(client_creds)
702
703     def test_upn_dns_info_ex_upn_mac(self):
704         mach_creds = self.get_cached_creds(
705             account_type=self.AccountType.COMPUTER,
706             opts={'upn': 'upn_dns_info_test_upn1@bar'})
707         self._run_upn_dns_info_ex_test(mach_creds)
708
709     def _run_upn_dns_info_ex_test(self, client_creds):
710         service_creds = self.get_service_creds()
711
712         samdb = self.get_samdb()
713         dn = client_creds.get_dn()
714
715         account_name = client_creds.get_username()
716         upn_name = client_creds.get_upn()
717         if upn_name is None:
718             realm = client_creds.get_realm().lower()
719             upn_name = f'{account_name}@{realm}'
720         sid = self.get_objectSid(samdb, dn)
721
722         tgt = self.get_tgt(client_creds,
723                            expected_account_name=account_name,
724                            expected_upn_name=upn_name,
725                            expected_sid=sid)
726
727         self._make_tgs_request(client_creds, service_creds, tgt,
728                                expected_account_name=account_name,
729                                expected_upn_name=upn_name,
730                                expected_sid=sid)
731
732     # Test making a TGS request.
733     def test_tgs_req(self):
734         creds = self._get_creds()
735         tgt = self._get_tgt(creds)
736         self._run_tgs(tgt, expected_error=0)
737
738     def test_renew_req(self):
739         creds = self._get_creds()
740         tgt = self._get_tgt(creds, renewable=True)
741         self._renew_tgt(tgt, expected_error=0,
742                         expect_pac_attrs=True,
743                         expect_pac_attrs_pac_request=True,
744                         expect_requester_sid=True)
745
746     def test_validate_req(self):
747         creds = self._get_creds()
748         tgt = self._get_tgt(creds, invalid=True)
749         self._validate_tgt(tgt, expected_error=0,
750                            expect_pac_attrs=True,
751                            expect_pac_attrs_pac_request=True,
752                            expect_requester_sid=True)
753
754     def test_s4u2self_req(self):
755         creds = self._get_creds()
756         tgt = self._get_tgt(creds)
757         self._s4u2self(tgt, creds, expected_error=0)
758
759     def test_user2user_req(self):
760         creds = self._get_creds()
761         tgt = self._get_tgt(creds)
762         self._user2user(tgt, creds, expected_error=0)
763
764     def test_fast_req(self):
765         creds = self._get_creds()
766         tgt = self._get_tgt(creds)
767         self._fast(tgt, creds, expected_error=0)
768
769     def test_tgs_req_invalid(self):
770         creds = self._get_creds()
771         tgt = self._get_tgt(creds, invalid=True)
772         self._run_tgs(tgt, expected_error=KRB_ERR_TKT_NYV)
773
774     def test_s4u2self_req_invalid(self):
775         creds = self._get_creds()
776         tgt = self._get_tgt(creds, invalid=True)
777         self._s4u2self(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
778
779     def test_user2user_req_invalid(self):
780         creds = self._get_creds()
781         tgt = self._get_tgt(creds, invalid=True)
782         self._user2user(tgt, creds, expected_error=KRB_ERR_TKT_NYV)
783
784     def test_fast_req_invalid(self):
785         creds = self._get_creds()
786         tgt = self._get_tgt(creds, invalid=True)
787         self._fast(tgt, creds, expected_error=KRB_ERR_TKT_NYV,
788                    expected_sname=self.get_krbtgt_sname())
789
790     def test_tgs_req_no_requester_sid(self):
791         creds = self._get_creds()
792         tgt = self._get_tgt(creds, remove_requester_sid=True)
793
794         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
795
796     def test_tgs_req_no_pac_attrs(self):
797         creds = self._get_creds()
798         tgt = self._get_tgt(creds, remove_pac_attrs=True)
799
800         self._run_tgs(tgt, expected_error=0, expect_pac=True,
801                       expect_pac_attrs=False)
802
803     def test_tgs_req_from_rodc_no_requester_sid(self):
804         creds = self._get_creds(replication_allowed=True,
805                                 revealed_to_rodc=True)
806         tgt = self._get_tgt(creds, from_rodc=True, remove_requester_sid=True)
807
808         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
809
810     def test_tgs_req_from_rodc_no_pac_attrs(self):
811         creds = self._get_creds(replication_allowed=True,
812                                 revealed_to_rodc=True)
813         tgt = self._get_tgt(creds, from_rodc=True, remove_pac_attrs=True)
814         self._run_tgs(tgt, expected_error=0, expect_pac=True,
815                       expect_pac_attrs=False)
816
817     # Test making a request without a PAC.
818     def test_tgs_no_pac(self):
819         creds = self._get_creds()
820         tgt = self._get_tgt(creds, remove_pac=True)
821         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
822
823     def test_renew_no_pac(self):
824         creds = self._get_creds()
825         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
826         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
827
828     def test_validate_no_pac(self):
829         creds = self._get_creds()
830         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
831         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
832
833     def test_s4u2self_no_pac(self):
834         creds = self._get_creds()
835         tgt = self._get_tgt(creds, remove_pac=True)
836         self._s4u2self(tgt, creds,
837                        expected_error=KDC_ERR_TGT_REVOKED,
838                        expect_edata=False)
839
840     def test_user2user_no_pac(self):
841         creds = self._get_creds()
842         tgt = self._get_tgt(creds, remove_pac=True)
843         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
844
845     def test_fast_no_pac(self):
846         creds = self._get_creds()
847         tgt = self._get_tgt(creds, remove_pac=True)
848         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
849                    expected_sname=self.get_krbtgt_sname())
850
851     # Test making a request with authdata and without a PAC.
852     def test_tgs_authdata_no_pac(self):
853         creds = self._get_creds()
854         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
855         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
856
857     def test_renew_authdata_no_pac(self):
858         creds = self._get_creds()
859         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
860                             allow_empty_authdata=True)
861         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
862
863     def test_validate_authdata_no_pac(self):
864         creds = self._get_creds()
865         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
866                             allow_empty_authdata=True)
867         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
868
869     def test_s4u2self_authdata_no_pac(self):
870         creds = self._get_creds()
871         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
872         self._s4u2self(tgt, creds,
873                        expected_error=KDC_ERR_TGT_REVOKED,
874                        expect_edata=False)
875
876     def test_user2user_authdata_no_pac(self):
877         creds = self._get_creds()
878         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
879         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
880
881     def test_fast_authdata_no_pac(self):
882         creds = self._get_creds()
883         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
884         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
885                    expected_sname=self.get_krbtgt_sname())
886
887     # Test changing the SID in the PAC to that of another account.
888     def test_tgs_sid_mismatch_existing(self):
889         creds = self._get_creds()
890         existing_rid = self._get_existing_rid()
891         tgt = self._get_tgt(creds, new_rid=existing_rid)
892         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
893
894     def test_renew_sid_mismatch_existing(self):
895         creds = self._get_creds()
896         existing_rid = self._get_existing_rid()
897         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
898         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
899
900     def test_validate_sid_mismatch_existing(self):
901         creds = self._get_creds()
902         existing_rid = self._get_existing_rid()
903         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
904         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
905
906     def test_s4u2self_sid_mismatch_existing(self):
907         creds = self._get_creds()
908         existing_rid = self._get_existing_rid()
909         tgt = self._get_tgt(creds, new_rid=existing_rid)
910         self._s4u2self(tgt, creds,
911                        expected_error=KDC_ERR_TGT_REVOKED)
912
913     def test_user2user_sid_mismatch_existing(self):
914         creds = self._get_creds()
915         existing_rid = self._get_existing_rid()
916         tgt = self._get_tgt(creds, new_rid=existing_rid)
917         self._user2user(tgt, creds,
918                         expected_error=KDC_ERR_TGT_REVOKED)
919
920     def test_fast_sid_mismatch_existing(self):
921         creds = self._get_creds()
922         existing_rid = self._get_existing_rid()
923         tgt = self._get_tgt(creds, new_rid=existing_rid)
924         self._fast(tgt, creds,
925                    expected_error=KDC_ERR_TGT_REVOKED,
926                    expected_sname=self.get_krbtgt_sname())
927
928     def test_requester_sid_mismatch_existing(self):
929         creds = self._get_creds()
930         existing_rid = self._get_existing_rid()
931         tgt = self._get_tgt(creds, new_rid=existing_rid,
932                             can_modify_logon_info=False)
933         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
934
935     def test_logon_info_sid_mismatch_existing(self):
936         creds = self._get_creds()
937         existing_rid = self._get_existing_rid()
938         tgt = self._get_tgt(creds, new_rid=existing_rid,
939                             can_modify_requester_sid=False)
940         self._run_tgs(tgt, expected_error=0)
941
942     def test_logon_info_only_sid_mismatch_existing(self):
943         creds = self._get_creds()
944         existing_rid = self._get_existing_rid()
945         tgt = self._get_tgt(creds, new_rid=existing_rid,
946                             remove_requester_sid=True)
947         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
948
949     # Test changing the SID in the PAC to a non-existent one.
950     def test_tgs_sid_mismatch_nonexisting(self):
951         creds = self._get_creds()
952         nonexistent_rid = self._get_non_existent_rid()
953         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
954         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
955
956     def test_renew_sid_mismatch_nonexisting(self):
957         creds = self._get_creds()
958         nonexistent_rid = self._get_non_existent_rid()
959         tgt = self._get_tgt(creds, renewable=True,
960                             new_rid=nonexistent_rid)
961         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
962
963     def test_validate_sid_mismatch_nonexisting(self):
964         creds = self._get_creds()
965         nonexistent_rid = self._get_non_existent_rid()
966         tgt = self._get_tgt(creds, invalid=True,
967                             new_rid=nonexistent_rid)
968         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
969
970     def test_s4u2self_sid_mismatch_nonexisting(self):
971         creds = self._get_creds()
972         nonexistent_rid = self._get_non_existent_rid()
973         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
974         self._s4u2self(tgt, creds,
975                        expected_error=KDC_ERR_TGT_REVOKED)
976
977     def test_user2user_sid_mismatch_nonexisting(self):
978         creds = self._get_creds()
979         nonexistent_rid = self._get_non_existent_rid()
980         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
981         self._user2user(tgt, creds,
982                         expected_error=KDC_ERR_TGT_REVOKED)
983
984     def test_fast_sid_mismatch_nonexisting(self):
985         creds = self._get_creds()
986         nonexistent_rid = self._get_non_existent_rid()
987         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
988         self._fast(tgt, creds,
989                    expected_error=KDC_ERR_TGT_REVOKED,
990                    expected_sname=self.get_krbtgt_sname())
991
992     def test_requester_sid_mismatch_nonexisting(self):
993         creds = self._get_creds()
994         nonexistent_rid = self._get_non_existent_rid()
995         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
996                             can_modify_logon_info=False)
997         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
998
999     def test_logon_info_sid_mismatch_nonexisting(self):
1000         creds = self._get_creds()
1001         nonexistent_rid = self._get_non_existent_rid()
1002         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1003                             can_modify_requester_sid=False)
1004         self._run_tgs(tgt, expected_error=0)
1005
1006     def test_logon_info_only_sid_mismatch_nonexisting(self):
1007         creds = self._get_creds()
1008         nonexistent_rid = self._get_non_existent_rid()
1009         tgt = self._get_tgt(creds, new_rid=nonexistent_rid,
1010                             remove_requester_sid=True)
1011         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1012
1013     # Test with an RODC-issued ticket where the client is revealed to the RODC.
1014     def test_tgs_rodc_revealed(self):
1015         creds = self._get_creds(replication_allowed=True,
1016                                 revealed_to_rodc=True)
1017         tgt = self._get_tgt(creds, from_rodc=True)
1018         self._run_tgs(tgt, expected_error=0)
1019
1020     def test_renew_rodc_revealed(self):
1021         creds = self._get_creds(replication_allowed=True,
1022                                 revealed_to_rodc=True)
1023         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1024         self._renew_tgt(tgt, expected_error=0,
1025                         expect_pac_attrs=False,
1026                         expect_requester_sid=True)
1027
1028     def test_validate_rodc_revealed(self):
1029         creds = self._get_creds(replication_allowed=True,
1030                                 revealed_to_rodc=True)
1031         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1032         self._validate_tgt(tgt, expected_error=0,
1033                            expect_pac_attrs=False,
1034                            expect_requester_sid=True)
1035
1036     # This test fails on Windows, which gives KDC_ERR_C_PRINCIPAL_UNKNOWN when
1037     # attempting to use S4U2Self with a TGT from an RODC.
1038     def test_s4u2self_rodc_revealed(self):
1039         creds = self._get_creds(replication_allowed=True,
1040                                 revealed_to_rodc=True)
1041         tgt = self._get_tgt(creds, from_rodc=True)
1042         self._s4u2self(tgt, creds, expected_error=0)
1043
1044     def test_user2user_rodc_revealed(self):
1045         creds = self._get_creds(replication_allowed=True,
1046                                 revealed_to_rodc=True)
1047         tgt = self._get_tgt(creds, from_rodc=True)
1048         self._user2user(tgt, creds, expected_error=0)
1049
1050     # Test with an RODC-issued ticket where the SID in the PAC is changed to
1051     # that of another account.
1052     def test_tgs_rodc_sid_mismatch_existing(self):
1053         creds = self._get_creds(replication_allowed=True,
1054                                 revealed_to_rodc=True)
1055         existing_rid = self._get_existing_rid(replication_allowed=True,
1056                                               revealed_to_rodc=True)
1057         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1058         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1059
1060     def test_renew_rodc_sid_mismatch_existing(self):
1061         creds = self._get_creds(replication_allowed=True,
1062                                 revealed_to_rodc=True)
1063         existing_rid = self._get_existing_rid(replication_allowed=True,
1064                                               revealed_to_rodc=True)
1065         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1066                             new_rid=existing_rid)
1067         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1068
1069     def test_validate_rodc_sid_mismatch_existing(self):
1070         creds = self._get_creds(replication_allowed=True,
1071                                 revealed_to_rodc=True)
1072         existing_rid = self._get_existing_rid(replication_allowed=True,
1073                                        revealed_to_rodc=True)
1074         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1075                             new_rid=existing_rid)
1076         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1077
1078     def test_s4u2self_rodc_sid_mismatch_existing(self):
1079         creds = self._get_creds(replication_allowed=True,
1080                                 revealed_to_rodc=True)
1081         existing_rid = self._get_existing_rid(replication_allowed=True,
1082                                               revealed_to_rodc=True)
1083         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1084         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1085
1086     def test_user2user_rodc_sid_mismatch_existing(self):
1087         creds = self._get_creds(replication_allowed=True,
1088                                 revealed_to_rodc=True)
1089         existing_rid = self._get_existing_rid(replication_allowed=True,
1090                                               revealed_to_rodc=True)
1091         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1092         self._user2user(tgt, creds,
1093                         expected_error=KDC_ERR_TGT_REVOKED)
1094
1095     def test_fast_rodc_sid_mismatch_existing(self):
1096         creds = self._get_creds(replication_allowed=True,
1097                                 revealed_to_rodc=True)
1098         existing_rid = self._get_existing_rid(replication_allowed=True,
1099                                               revealed_to_rodc=True)
1100         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
1101         self._fast(tgt, creds,
1102                    expected_error=KDC_ERR_TGT_REVOKED,
1103                    expected_sname=self.get_krbtgt_sname())
1104
1105     def test_tgs_rodc_requester_sid_mismatch_existing(self):
1106         creds = self._get_creds(replication_allowed=True,
1107                                 revealed_to_rodc=True)
1108         existing_rid = self._get_existing_rid(replication_allowed=True,
1109                                               revealed_to_rodc=True)
1110         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1111                             can_modify_logon_info=False)
1112         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1113
1114     def test_tgs_rodc_logon_info_sid_mismatch_existing(self):
1115         creds = self._get_creds(replication_allowed=True,
1116                                 revealed_to_rodc=True)
1117         existing_rid = self._get_existing_rid(replication_allowed=True,
1118                                               revealed_to_rodc=True)
1119         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1120                             can_modify_requester_sid=False)
1121         self._run_tgs(tgt, expected_error=0)
1122
1123     def test_tgs_rodc_logon_info_only_sid_mismatch_existing(self):
1124         creds = self._get_creds(replication_allowed=True,
1125                                 revealed_to_rodc=True)
1126         existing_rid = self._get_existing_rid(replication_allowed=True,
1127                                               revealed_to_rodc=True)
1128         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid,
1129                             remove_requester_sid=True)
1130         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1131
1132     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
1133     # non-existent one.
1134     def test_tgs_rodc_sid_mismatch_nonexisting(self):
1135         creds = self._get_creds(replication_allowed=True,
1136                                 revealed_to_rodc=True)
1137         nonexistent_rid = self._get_non_existent_rid()
1138         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1139         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1140
1141     def test_renew_rodc_sid_mismatch_nonexisting(self):
1142         creds = self._get_creds(replication_allowed=True,
1143                                 revealed_to_rodc=True)
1144         nonexistent_rid = self._get_non_existent_rid()
1145         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
1146                             new_rid=nonexistent_rid)
1147         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1148
1149     def test_validate_rodc_sid_mismatch_nonexisting(self):
1150         creds = self._get_creds(replication_allowed=True,
1151                                 revealed_to_rodc=True)
1152         nonexistent_rid = self._get_non_existent_rid()
1153         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
1154                             new_rid=nonexistent_rid)
1155         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1156
1157     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
1158         creds = self._get_creds(replication_allowed=True,
1159                                 revealed_to_rodc=True)
1160         nonexistent_rid = self._get_non_existent_rid()
1161         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1162         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1163
1164     def test_user2user_rodc_sid_mismatch_nonexisting(self):
1165         creds = self._get_creds(replication_allowed=True,
1166                                 revealed_to_rodc=True)
1167         nonexistent_rid = self._get_non_existent_rid()
1168         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1169         self._user2user(tgt, creds,
1170                         expected_error=KDC_ERR_TGT_REVOKED)
1171
1172     def test_fast_rodc_sid_mismatch_nonexisting(self):
1173         creds = self._get_creds(replication_allowed=True,
1174                                 revealed_to_rodc=True)
1175         nonexistent_rid = self._get_non_existent_rid()
1176         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
1177         self._fast(tgt, creds,
1178                    expected_error=KDC_ERR_TGT_REVOKED,
1179                    expected_sname=self.get_krbtgt_sname())
1180
1181     def test_tgs_rodc_requester_sid_mismatch_nonexisting(self):
1182         creds = self._get_creds(replication_allowed=True,
1183                                 revealed_to_rodc=True)
1184         nonexistent_rid = self._get_non_existent_rid()
1185         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1186                             can_modify_logon_info=False)
1187         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1188
1189     def test_tgs_rodc_logon_info_sid_mismatch_nonexisting(self):
1190         creds = self._get_creds(replication_allowed=True,
1191                                 revealed_to_rodc=True)
1192         nonexistent_rid = self._get_non_existent_rid()
1193         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1194                             can_modify_requester_sid=False)
1195         self._run_tgs(tgt, expected_error=0)
1196
1197     def test_tgs_rodc_logon_info_only_sid_mismatch_nonexisting(self):
1198         creds = self._get_creds(replication_allowed=True,
1199                                 revealed_to_rodc=True)
1200         nonexistent_rid = self._get_non_existent_rid()
1201         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid,
1202                             remove_requester_sid=True)
1203         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1204
1205     # Test with an RODC-issued ticket where the client is not revealed to the
1206     # RODC.
1207     def test_tgs_rodc_not_revealed(self):
1208         creds = self._get_creds(replication_allowed=True)
1209         tgt = self._get_tgt(creds, from_rodc=True)
1210         # TODO: error code
1211         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1212
1213     def test_renew_rodc_not_revealed(self):
1214         creds = self._get_creds(replication_allowed=True)
1215         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1216         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1217
1218     def test_validate_rodc_not_revealed(self):
1219         creds = self._get_creds(replication_allowed=True)
1220         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1221         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1222
1223     def test_s4u2self_rodc_not_revealed(self):
1224         creds = self._get_creds(replication_allowed=True)
1225         tgt = self._get_tgt(creds, from_rodc=True)
1226         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1227
1228     def test_user2user_rodc_not_revealed(self):
1229         creds = self._get_creds(replication_allowed=True)
1230         tgt = self._get_tgt(creds, from_rodc=True)
1231         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1232
1233     # Test with an RODC-issued ticket where the RODC account does not have the
1234     # PARTIAL_SECRETS bit set.
1235     def test_tgs_rodc_no_partial_secrets(self):
1236         creds = self._get_creds(replication_allowed=True,
1237                                 revealed_to_rodc=True)
1238         tgt = self._get_tgt(creds, from_rodc=True)
1239         self._remove_rodc_partial_secrets()
1240         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1241
1242     def test_renew_rodc_no_partial_secrets(self):
1243         creds = self._get_creds(replication_allowed=True,
1244                                 revealed_to_rodc=True)
1245         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1246         self._remove_rodc_partial_secrets()
1247         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1248
1249     def test_validate_rodc_no_partial_secrets(self):
1250         creds = self._get_creds(replication_allowed=True,
1251                                 revealed_to_rodc=True)
1252         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1253         self._remove_rodc_partial_secrets()
1254         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1255
1256     def test_s4u2self_rodc_no_partial_secrets(self):
1257         creds = self._get_creds(replication_allowed=True,
1258                                 revealed_to_rodc=True)
1259         tgt = self._get_tgt(creds, from_rodc=True)
1260         self._remove_rodc_partial_secrets()
1261         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1262
1263     def test_user2user_rodc_no_partial_secrets(self):
1264         creds = self._get_creds(replication_allowed=True,
1265                                 revealed_to_rodc=True)
1266         tgt = self._get_tgt(creds, from_rodc=True)
1267         self._remove_rodc_partial_secrets()
1268         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1269
1270     def test_fast_rodc_no_partial_secrets(self):
1271         creds = self._get_creds(replication_allowed=True,
1272                                 revealed_to_rodc=True)
1273         tgt = self._get_tgt(creds, from_rodc=True)
1274         self._remove_rodc_partial_secrets()
1275         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1276                    expected_sname=self.get_krbtgt_sname())
1277
1278     # Test with an RODC-issued ticket where the RODC account does not have an
1279     # msDS-KrbTgtLink.
1280     def test_tgs_rodc_no_krbtgt_link(self):
1281         creds = self._get_creds(replication_allowed=True,
1282                                 revealed_to_rodc=True)
1283         tgt = self._get_tgt(creds, from_rodc=True)
1284         self._remove_rodc_krbtgt_link()
1285         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
1286
1287     def test_renew_rodc_no_krbtgt_link(self):
1288         creds = self._get_creds(replication_allowed=True,
1289                                 revealed_to_rodc=True)
1290         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1291         self._remove_rodc_krbtgt_link()
1292         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
1293
1294     def test_validate_rodc_no_krbtgt_link(self):
1295         creds = self._get_creds(replication_allowed=True,
1296                                 revealed_to_rodc=True)
1297         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1298         self._remove_rodc_krbtgt_link()
1299         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
1300
1301     def test_s4u2self_rodc_no_krbtgt_link(self):
1302         creds = self._get_creds(replication_allowed=True,
1303                                 revealed_to_rodc=True)
1304         tgt = self._get_tgt(creds, from_rodc=True)
1305         self._remove_rodc_krbtgt_link()
1306         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
1307
1308     def test_user2user_rodc_no_krbtgt_link(self):
1309         creds = self._get_creds(replication_allowed=True,
1310                                 revealed_to_rodc=True)
1311         tgt = self._get_tgt(creds, from_rodc=True)
1312         self._remove_rodc_krbtgt_link()
1313         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
1314
1315     def test_fast_rodc_no_krbtgt_link(self):
1316         creds = self._get_creds(replication_allowed=True,
1317                                 revealed_to_rodc=True)
1318         tgt = self._get_tgt(creds, from_rodc=True)
1319         self._remove_rodc_krbtgt_link()
1320         self._fast(tgt, creds, expected_error=KDC_ERR_POLICY,
1321                    expected_sname=self.get_krbtgt_sname())
1322
1323     # Test with an RODC-issued ticket where the client is not allowed to
1324     # replicate to the RODC.
1325     def test_tgs_rodc_not_allowed(self):
1326         creds = self._get_creds(revealed_to_rodc=True)
1327         tgt = self._get_tgt(creds, from_rodc=True)
1328         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1329
1330     def test_renew_rodc_not_allowed(self):
1331         creds = self._get_creds(revealed_to_rodc=True)
1332         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1333         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1334
1335     def test_validate_rodc_not_allowed(self):
1336         creds = self._get_creds(revealed_to_rodc=True)
1337         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1338         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1339
1340     def test_s4u2self_rodc_not_allowed(self):
1341         creds = self._get_creds(revealed_to_rodc=True)
1342         tgt = self._get_tgt(creds, from_rodc=True)
1343         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1344
1345     def test_user2user_rodc_not_allowed(self):
1346         creds = self._get_creds(revealed_to_rodc=True)
1347         tgt = self._get_tgt(creds, from_rodc=True)
1348         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1349
1350     def test_fast_rodc_not_allowed(self):
1351         creds = self._get_creds(revealed_to_rodc=True)
1352         tgt = self._get_tgt(creds, from_rodc=True)
1353         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1354                    expected_sname=self.get_krbtgt_sname())
1355
1356     # Test with an RODC-issued ticket where the client is denied from
1357     # replicating to the RODC.
1358     def test_tgs_rodc_denied(self):
1359         creds = self._get_creds(replication_denied=True,
1360                                 revealed_to_rodc=True)
1361         tgt = self._get_tgt(creds, from_rodc=True)
1362         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1363
1364     def test_renew_rodc_denied(self):
1365         creds = self._get_creds(replication_denied=True,
1366                                 revealed_to_rodc=True)
1367         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1368         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1369
1370     def test_validate_rodc_denied(self):
1371         creds = self._get_creds(replication_denied=True,
1372                                 revealed_to_rodc=True)
1373         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1374         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1375
1376     def test_s4u2self_rodc_denied(self):
1377         creds = self._get_creds(replication_denied=True,
1378                                 revealed_to_rodc=True)
1379         tgt = self._get_tgt(creds, from_rodc=True)
1380         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1381
1382     def test_user2user_rodc_denied(self):
1383         creds = self._get_creds(replication_denied=True,
1384                                 revealed_to_rodc=True)
1385         tgt = self._get_tgt(creds, from_rodc=True)
1386         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1387
1388     def test_fast_rodc_denied(self):
1389         creds = self._get_creds(replication_denied=True,
1390                                 revealed_to_rodc=True)
1391         tgt = self._get_tgt(creds, from_rodc=True)
1392         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1393                    expected_sname=self.get_krbtgt_sname())
1394
1395     # Test with an RODC-issued ticket where the client is both allowed and
1396     # denied replicating to the RODC.
1397     def test_tgs_rodc_allowed_denied(self):
1398         creds = self._get_creds(replication_allowed=True,
1399                                 replication_denied=True,
1400                                 revealed_to_rodc=True)
1401         tgt = self._get_tgt(creds, from_rodc=True)
1402         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1403
1404     def test_renew_rodc_allowed_denied(self):
1405         creds = self._get_creds(replication_allowed=True,
1406                                 replication_denied=True,
1407                                 revealed_to_rodc=True)
1408         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
1409         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1410
1411     def test_validate_rodc_allowed_denied(self):
1412         creds = self._get_creds(replication_allowed=True,
1413                                 replication_denied=True,
1414                                 revealed_to_rodc=True)
1415         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
1416         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1417
1418     def test_s4u2self_rodc_allowed_denied(self):
1419         creds = self._get_creds(replication_allowed=True,
1420                                 replication_denied=True,
1421                                 revealed_to_rodc=True)
1422         tgt = self._get_tgt(creds, from_rodc=True)
1423         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1424
1425     def test_user2user_rodc_allowed_denied(self):
1426         creds = self._get_creds(replication_allowed=True,
1427                                 replication_denied=True,
1428                                 revealed_to_rodc=True)
1429         tgt = self._get_tgt(creds, from_rodc=True)
1430         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
1431
1432     def test_fast_rodc_allowed_denied(self):
1433         creds = self._get_creds(replication_allowed=True,
1434                                 replication_denied=True,
1435                                 revealed_to_rodc=True)
1436         tgt = self._get_tgt(creds, from_rodc=True)
1437         self._fast(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED,
1438                    expected_sname=self.get_krbtgt_sname())
1439
1440     # Test user-to-user with incorrect service principal names.
1441     def test_user2user_matching_sname_host(self):
1442         creds = self._get_creds()
1443         tgt = self._get_tgt(creds)
1444
1445         user_name = creds.get_username()
1446         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1447                                           names=['host', user_name])
1448
1449         self._user2user(tgt, creds, sname=sname,
1450                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1451
1452     def test_user2user_matching_sname_no_host(self):
1453         creds = self._get_creds()
1454         tgt = self._get_tgt(creds)
1455
1456         user_name = creds.get_username()
1457         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1458                                           names=[user_name])
1459
1460         self._user2user(tgt, creds, sname=sname, expected_error=0)
1461
1462     def test_user2user_wrong_sname(self):
1463         creds = self._get_creds()
1464         tgt = self._get_tgt(creds)
1465
1466         other_creds = self._get_mach_creds()
1467         user_name = other_creds.get_username()
1468         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1469                                           names=[user_name])
1470
1471         self._user2user(tgt, creds, sname=sname,
1472                         expected_error=KDC_ERR_BADMATCH)
1473
1474     def test_user2user_other_sname(self):
1475         other_name = self.get_new_username()
1476         spn = f'host/{other_name}'
1477         creds = self.get_cached_creds(
1478             account_type=self.AccountType.COMPUTER,
1479             opts={'spn': spn})
1480         tgt = self._get_tgt(creds)
1481
1482         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1483                                           names=['host', other_name])
1484
1485         self._user2user(tgt, creds, sname=sname, expected_error=0)
1486
1487     def test_user2user_wrong_sname_krbtgt(self):
1488         creds = self._get_creds()
1489         tgt = self._get_tgt(creds)
1490
1491         sname = self.get_krbtgt_sname()
1492
1493         self._user2user(tgt, creds, sname=sname,
1494                         expected_error=KDC_ERR_BADMATCH)
1495
1496     def test_user2user_wrong_srealm(self):
1497         creds = self._get_creds()
1498         tgt = self._get_tgt(creds)
1499
1500         self._user2user(tgt, creds, srealm='OTHER.REALM',
1501                         expected_error=(KDC_ERR_WRONG_REALM,
1502                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1503
1504     def test_user2user_tgt_correct_realm(self):
1505         creds = self._get_creds()
1506         tgt = self._get_tgt(creds)
1507
1508         realm = creds.get_realm().encode('utf-8')
1509         tgt = self._modify_tgt(tgt, realm)
1510
1511         self._user2user(tgt, creds,
1512                         expected_error=0)
1513
1514     def test_user2user_tgt_wrong_realm(self):
1515         creds = self._get_creds()
1516         tgt = self._get_tgt(creds)
1517
1518         tgt = self._modify_tgt(tgt, b'OTHER.REALM')
1519
1520         self._user2user(tgt, creds,
1521                         expected_error=0)
1522
1523     def test_user2user_tgt_correct_cname(self):
1524         creds = self._get_creds()
1525         tgt = self._get_tgt(creds)
1526
1527         user_name = creds.get_username()
1528         user_name = user_name.encode('utf-8')
1529         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1530                                           names=[user_name])
1531
1532         tgt = self._modify_tgt(tgt, cname=cname)
1533
1534         self._user2user(tgt, creds, expected_error=0)
1535
1536     def test_user2user_tgt_other_cname(self):
1537         samdb = self.get_samdb()
1538
1539         other_name = self.get_new_username()
1540         upn = f'{other_name}@{samdb.domain_dns_name()}'
1541
1542         creds = self.get_cached_creds(
1543             account_type=self.AccountType.COMPUTER,
1544             opts={'upn': upn})
1545         tgt = self._get_tgt(creds)
1546
1547         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1548                                           names=[other_name.encode('utf-8')])
1549
1550         tgt = self._modify_tgt(tgt, cname=cname)
1551
1552         self._user2user(tgt, creds, expected_error=0)
1553
1554     def test_user2user_tgt_cname_host(self):
1555         creds = self._get_creds()
1556         tgt = self._get_tgt(creds)
1557
1558         user_name = creds.get_username()
1559         user_name = user_name.encode('utf-8')
1560         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1561                                           names=[b'host', user_name])
1562
1563         tgt = self._modify_tgt(tgt, cname=cname)
1564
1565         self._user2user(tgt, creds,
1566                         expected_error=(KDC_ERR_TGT_REVOKED,
1567                                         KDC_ERR_C_PRINCIPAL_UNKNOWN))
1568
1569     def test_user2user_non_existent_sname(self):
1570         creds = self._get_creds()
1571         tgt = self._get_tgt(creds)
1572
1573         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1574                                           names=['host', 'non_existent_user'])
1575
1576         self._user2user(tgt, creds, sname=sname,
1577                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1578
1579     def test_user2user_no_sname(self):
1580         creds = self._get_creds()
1581         tgt = self._get_tgt(creds)
1582
1583         self._user2user(tgt, creds, sname=False,
1584                         expected_error=(KDC_ERR_GENERIC,
1585                                         KDC_ERR_S_PRINCIPAL_UNKNOWN))
1586
1587     def test_tgs_service_ticket(self):
1588         creds = self._get_creds()
1589         tgt = self._get_tgt(creds)
1590
1591         service_creds = self.get_service_creds()
1592         service_ticket = self.get_service_ticket(tgt, service_creds)
1593
1594         self._run_tgs(service_ticket,
1595                       expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1596
1597     def test_renew_service_ticket(self):
1598         creds = self._get_creds()
1599         tgt = self._get_tgt(creds)
1600
1601         service_creds = self.get_service_creds()
1602         service_ticket = self.get_service_ticket(tgt, service_creds)
1603
1604         service_ticket = self.modified_ticket(
1605             service_ticket,
1606             modify_fn=self._modify_renewable,
1607             checksum_keys=self.get_krbtgt_checksum_key())
1608
1609         self._renew_tgt(service_ticket,
1610                         expected_error=KDC_ERR_POLICY)
1611
1612     def test_validate_service_ticket(self):
1613         creds = self._get_creds()
1614         tgt = self._get_tgt(creds)
1615
1616         service_creds = self.get_service_creds()
1617         service_ticket = self.get_service_ticket(tgt, service_creds)
1618
1619         service_ticket = self.modified_ticket(
1620             service_ticket,
1621             modify_fn=self._modify_invalid,
1622             checksum_keys=self.get_krbtgt_checksum_key())
1623
1624         self._validate_tgt(service_ticket,
1625                            expected_error=KDC_ERR_POLICY)
1626
1627     def test_s4u2self_service_ticket(self):
1628         creds = self._get_creds()
1629         tgt = self._get_tgt(creds)
1630
1631         service_creds = self.get_service_creds()
1632         service_ticket = self.get_service_ticket(tgt, service_creds)
1633
1634         self._s4u2self(service_ticket, creds,
1635                        expected_error=(KDC_ERR_NOT_US, KDC_ERR_POLICY))
1636
1637     def test_user2user_service_ticket(self):
1638         creds = self._get_creds()
1639         tgt = self._get_tgt(creds)
1640
1641         service_creds = self.get_service_creds()
1642         service_ticket = self.get_service_ticket(tgt, service_creds)
1643
1644         self._user2user(service_ticket, creds,
1645                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1646
1647     # Expected to fail against Windows, which does not produce a policy error.
1648     def test_fast_service_ticket(self):
1649         creds = self._get_creds()
1650         tgt = self._get_tgt(creds)
1651
1652         service_creds = self.get_service_creds()
1653         service_ticket = self.get_service_ticket(tgt, service_creds)
1654
1655         self._fast(service_ticket, creds,
1656                    expected_error=KDC_ERR_POLICY)
1657
1658     def test_pac_attrs_none(self):
1659         creds = self._get_creds()
1660         self.get_tgt(creds, pac_request=None,
1661                      expect_pac=True,
1662                      expect_pac_attrs=True,
1663                      expect_pac_attrs_pac_request=None)
1664
1665     def test_pac_attrs_false(self):
1666         creds = self._get_creds()
1667         self.get_tgt(creds, pac_request=False,
1668                      expect_pac=True,
1669                      expect_pac_attrs=True,
1670                      expect_pac_attrs_pac_request=False)
1671
1672     def test_pac_attrs_true(self):
1673         creds = self._get_creds()
1674         self.get_tgt(creds, pac_request=True,
1675                      expect_pac=True,
1676                      expect_pac_attrs=True,
1677                      expect_pac_attrs_pac_request=True)
1678
1679     def test_pac_attrs_renew_none(self):
1680         creds = self._get_creds()
1681         tgt = self.get_tgt(creds, pac_request=None,
1682                            expect_pac=True,
1683                            expect_pac_attrs=True,
1684                            expect_pac_attrs_pac_request=None)
1685         tgt = self._modify_tgt(tgt, renewable=True)
1686
1687         self._renew_tgt(tgt, expected_error=0,
1688                         expect_pac=True,
1689                         expect_pac_attrs=True,
1690                         expect_pac_attrs_pac_request=None,
1691                         expect_requester_sid=True)
1692
1693     def test_pac_attrs_renew_false(self):
1694         creds = self._get_creds()
1695         tgt = self.get_tgt(creds, pac_request=False,
1696                            expect_pac=True,
1697                            expect_pac_attrs=True,
1698                            expect_pac_attrs_pac_request=False)
1699         tgt = self._modify_tgt(tgt, renewable=True)
1700
1701         self._renew_tgt(tgt, expected_error=0,
1702                         expect_pac=True,
1703                         expect_pac_attrs=True,
1704                         expect_pac_attrs_pac_request=False,
1705                         expect_requester_sid=True)
1706
1707     def test_pac_attrs_renew_true(self):
1708         creds = self._get_creds()
1709         tgt = self.get_tgt(creds, pac_request=True,
1710                            expect_pac=True,
1711                            expect_pac_attrs=True,
1712                            expect_pac_attrs_pac_request=True)
1713         tgt = self._modify_tgt(tgt, renewable=True)
1714
1715         self._renew_tgt(tgt, expected_error=0,
1716                         expect_pac=True,
1717                         expect_pac_attrs=True,
1718                         expect_pac_attrs_pac_request=True,
1719                         expect_requester_sid=True)
1720
1721     def test_pac_attrs_rodc_renew_none(self):
1722         creds = self._get_creds(replication_allowed=True,
1723                                 revealed_to_rodc=True)
1724         tgt = self.get_tgt(creds, pac_request=None,
1725                            expect_pac=True,
1726                            expect_pac_attrs=True,
1727                            expect_pac_attrs_pac_request=None)
1728         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1729
1730         self._renew_tgt(tgt, expected_error=0,
1731                         expect_pac=True,
1732                         expect_pac_attrs=False,
1733                         expect_requester_sid=True)
1734
1735     def test_pac_attrs_rodc_renew_false(self):
1736         creds = self._get_creds(replication_allowed=True,
1737                                 revealed_to_rodc=True)
1738         tgt = self.get_tgt(creds, pac_request=False,
1739                            expect_pac=True,
1740                            expect_pac_attrs=True,
1741                            expect_pac_attrs_pac_request=False)
1742         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1743
1744         self._renew_tgt(tgt, expected_error=0,
1745                         expect_pac=True,
1746                         expect_pac_attrs=False,
1747                         expect_requester_sid=True)
1748
1749     def test_pac_attrs_rodc_renew_true(self):
1750         creds = self._get_creds(replication_allowed=True,
1751                                 revealed_to_rodc=True)
1752         tgt = self.get_tgt(creds, pac_request=True,
1753                            expect_pac=True,
1754                            expect_pac_attrs=True,
1755                            expect_pac_attrs_pac_request=True)
1756         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1757
1758         self._renew_tgt(tgt, expected_error=0,
1759                         expect_pac=True,
1760                         expect_pac_attrs=False,
1761                         expect_requester_sid=True)
1762
1763     def test_pac_attrs_missing_renew_none(self):
1764         creds = self._get_creds()
1765         tgt = self.get_tgt(creds, pac_request=None,
1766                            expect_pac=True,
1767                            expect_pac_attrs=True,
1768                            expect_pac_attrs_pac_request=None)
1769         tgt = self._modify_tgt(tgt, renewable=True,
1770                                remove_pac_attrs=True)
1771
1772         self._renew_tgt(tgt, expected_error=0,
1773                         expect_pac=True,
1774                         expect_pac_attrs=False,
1775                         expect_requester_sid=True)
1776
1777     def test_pac_attrs_missing_renew_false(self):
1778         creds = self._get_creds()
1779         tgt = self.get_tgt(creds, pac_request=False,
1780                            expect_pac=True,
1781                            expect_pac_attrs=True,
1782                            expect_pac_attrs_pac_request=False)
1783         tgt = self._modify_tgt(tgt, renewable=True,
1784                                remove_pac_attrs=True)
1785
1786         self._renew_tgt(tgt, expected_error=0,
1787                         expect_pac=True,
1788                         expect_pac_attrs=False,
1789                         expect_requester_sid=True)
1790
1791     def test_pac_attrs_missing_renew_true(self):
1792         creds = self._get_creds()
1793         tgt = self.get_tgt(creds, pac_request=True,
1794                            expect_pac=True,
1795                            expect_pac_attrs=True,
1796                            expect_pac_attrs_pac_request=True)
1797         tgt = self._modify_tgt(tgt, renewable=True,
1798                                remove_pac_attrs=True)
1799
1800         self._renew_tgt(tgt, expected_error=0,
1801                         expect_pac=True,
1802                         expect_pac_attrs=False,
1803                         expect_requester_sid=True)
1804
1805     def test_pac_attrs_missing_rodc_renew_none(self):
1806         creds = self._get_creds(replication_allowed=True,
1807                                 revealed_to_rodc=True)
1808         tgt = self.get_tgt(creds, pac_request=None,
1809                            expect_pac=True,
1810                            expect_pac_attrs=True,
1811                            expect_pac_attrs_pac_request=None)
1812         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
1813                                remove_pac_attrs=True)
1814
1815         self._renew_tgt(tgt, expected_error=0,
1816                         expect_pac=True,
1817                         expect_pac_attrs=False,
1818                         expect_requester_sid=True)
1819
1820     def test_pac_attrs_missing_rodc_renew_false(self):
1821         creds = self._get_creds(replication_allowed=True,
1822                                 revealed_to_rodc=True)
1823         tgt = self.get_tgt(creds, pac_request=False,
1824                            expect_pac=True,
1825                            expect_pac_attrs=True,
1826                            expect_pac_attrs_pac_request=False)
1827         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
1828                                remove_pac_attrs=True)
1829
1830         self._renew_tgt(tgt, expected_error=0,
1831                         expect_pac=True,
1832                         expect_pac_attrs=False,
1833                         expect_requester_sid=True)
1834
1835     def test_pac_attrs_missing_rodc_renew_true(self):
1836         creds = self._get_creds(replication_allowed=True,
1837                                 revealed_to_rodc=True)
1838         tgt = self.get_tgt(creds, pac_request=True,
1839                            expect_pac=True,
1840                            expect_pac_attrs=True,
1841                            expect_pac_attrs_pac_request=True)
1842         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
1843                                remove_pac_attrs=True)
1844
1845         self._renew_tgt(tgt, expected_error=0,
1846                         expect_pac=True,
1847                         expect_pac_attrs=False,
1848                         expect_requester_sid=True)
1849
1850     def test_tgs_pac_attrs_none(self):
1851         creds = self._get_creds()
1852         tgt = self.get_tgt(creds, pac_request=None,
1853                            expect_pac=True,
1854                            expect_pac_attrs=True,
1855                            expect_pac_attrs_pac_request=None)
1856
1857         self._run_tgs(tgt, expected_error=0, expect_pac=True,
1858                       expect_pac_attrs=False)
1859
1860     def test_tgs_pac_attrs_false(self):
1861         creds = self._get_creds()
1862         tgt = self.get_tgt(creds, pac_request=False,
1863                            expect_pac=True,
1864                            expect_pac_attrs=True,
1865                            expect_pac_attrs_pac_request=False)
1866
1867         self._run_tgs(tgt, expected_error=0, expect_pac=False,
1868                       expect_pac_attrs=False)
1869
1870     def test_tgs_pac_attrs_true(self):
1871         creds = self._get_creds()
1872         tgt = self.get_tgt(creds, pac_request=True,
1873                            expect_pac=True,
1874                            expect_pac_attrs=True,
1875                            expect_pac_attrs_pac_request=True)
1876
1877         self._run_tgs(tgt, expected_error=0, expect_pac=True,
1878                       expect_pac_attrs=False)
1879
1880     def test_as_requester_sid(self):
1881         creds = self._get_creds()
1882
1883         samdb = self.get_samdb()
1884         sid = self.get_objectSid(samdb, creds.get_dn())
1885
1886         self.get_tgt(creds, pac_request=None,
1887                      expect_pac=True,
1888                      expected_sid=sid,
1889                      expect_requester_sid=True)
1890
1891     def test_tgs_requester_sid(self):
1892         creds = self._get_creds()
1893
1894         samdb = self.get_samdb()
1895         sid = self.get_objectSid(samdb, creds.get_dn())
1896
1897         tgt = self.get_tgt(creds, pac_request=None,
1898                            expect_pac=True,
1899                            expected_sid=sid,
1900                            expect_requester_sid=True)
1901
1902         self._run_tgs(tgt, expected_error=0, expect_pac=True,
1903                       expect_requester_sid=False)
1904
1905     def test_tgs_requester_sid_renew(self):
1906         creds = self._get_creds()
1907
1908         samdb = self.get_samdb()
1909         sid = self.get_objectSid(samdb, creds.get_dn())
1910
1911         tgt = self.get_tgt(creds, pac_request=None,
1912                            expect_pac=True,
1913                            expected_sid=sid,
1914                            expect_requester_sid=True)
1915         tgt = self._modify_tgt(tgt, renewable=True)
1916
1917         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
1918                         expect_pac_attrs=True,
1919                         expect_pac_attrs_pac_request=None,
1920                         expected_sid=sid,
1921                         expect_requester_sid=True)
1922
1923     def test_tgs_requester_sid_rodc_renew(self):
1924         creds = self._get_creds(replication_allowed=True,
1925                                 revealed_to_rodc=True)
1926
1927         samdb = self.get_samdb()
1928         sid = self.get_objectSid(samdb, creds.get_dn())
1929
1930         tgt = self.get_tgt(creds, pac_request=None,
1931                            expect_pac=True,
1932                            expected_sid=sid,
1933                            expect_requester_sid=True)
1934         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True)
1935
1936         self._renew_tgt(tgt, expected_error=0, expect_pac=True,
1937                         expect_pac_attrs=False,
1938                         expected_sid=sid,
1939                         expect_requester_sid=True)
1940
1941     def test_tgs_requester_sid_missing_renew(self):
1942         creds = self._get_creds()
1943
1944         samdb = self.get_samdb()
1945         sid = self.get_objectSid(samdb, creds.get_dn())
1946
1947         tgt = self.get_tgt(creds, pac_request=None,
1948                            expect_pac=True,
1949                            expected_sid=sid,
1950                            expect_requester_sid=True)
1951         tgt = self._modify_tgt(tgt, renewable=True,
1952                                remove_requester_sid=True)
1953
1954         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1955
1956     def test_tgs_requester_sid_missing_rodc_renew(self):
1957         creds = self._get_creds(replication_allowed=True,
1958                                 revealed_to_rodc=True)
1959
1960         samdb = self.get_samdb()
1961         sid = self.get_objectSid(samdb, creds.get_dn())
1962
1963         tgt = self.get_tgt(creds, pac_request=None,
1964                            expect_pac=True,
1965                            expected_sid=sid,
1966                            expect_requester_sid=True)
1967         tgt = self._modify_tgt(tgt, from_rodc=True, renewable=True,
1968                                remove_requester_sid=True)
1969
1970         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
1971
1972     def test_tgs_requester_sid_validate(self):
1973         creds = self._get_creds()
1974
1975         samdb = self.get_samdb()
1976         sid = self.get_objectSid(samdb, creds.get_dn())
1977
1978         tgt = self.get_tgt(creds, pac_request=None,
1979                            expect_pac=True,
1980                            expected_sid=sid,
1981                            expect_requester_sid=True)
1982         tgt = self._modify_tgt(tgt, invalid=True)
1983
1984         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
1985                            expect_pac_attrs=True,
1986                            expect_pac_attrs_pac_request=None,
1987                            expected_sid=sid,
1988                            expect_requester_sid=True)
1989
1990     def test_tgs_requester_sid_rodc_validate(self):
1991         creds = self._get_creds(replication_allowed=True,
1992                                 revealed_to_rodc=True)
1993
1994         samdb = self.get_samdb()
1995         sid = self.get_objectSid(samdb, creds.get_dn())
1996
1997         tgt = self.get_tgt(creds, pac_request=None,
1998                            expect_pac=True,
1999                            expected_sid=sid,
2000                            expect_requester_sid=True)
2001         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True)
2002
2003         self._validate_tgt(tgt, expected_error=0, expect_pac=True,
2004                            expect_pac_attrs=False,
2005                            expected_sid=sid,
2006                            expect_requester_sid=True)
2007
2008     def test_tgs_requester_sid_missing_validate(self):
2009         creds = self._get_creds()
2010
2011         samdb = self.get_samdb()
2012         sid = self.get_objectSid(samdb, creds.get_dn())
2013
2014         tgt = self.get_tgt(creds, pac_request=None,
2015                            expect_pac=True,
2016                            expected_sid=sid,
2017                            expect_requester_sid=True)
2018         tgt = self._modify_tgt(tgt, invalid=True,
2019                                remove_requester_sid=True)
2020
2021         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2022
2023     def test_tgs_requester_sid_missing_rodc_validate(self):
2024         creds = self._get_creds(replication_allowed=True,
2025                                 revealed_to_rodc=True)
2026
2027         samdb = self.get_samdb()
2028         sid = self.get_objectSid(samdb, creds.get_dn())
2029
2030         tgt = self.get_tgt(creds, pac_request=None,
2031                            expect_pac=True,
2032                            expected_sid=sid,
2033                            expect_requester_sid=True)
2034         tgt = self._modify_tgt(tgt, from_rodc=True, invalid=True,
2035                                remove_requester_sid=True)
2036
2037         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
2038
2039     def test_tgs_pac_request_none(self):
2040         creds = self._get_creds()
2041         tgt = self.get_tgt(creds, pac_request=None)
2042
2043         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2044
2045         pac = self.get_ticket_pac(ticket)
2046         self.assertIsNotNone(pac)
2047
2048     def test_tgs_pac_request_false(self):
2049         creds = self._get_creds()
2050         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2051
2052         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2053
2054         pac = self.get_ticket_pac(ticket, expect_pac=False)
2055         self.assertIsNone(pac)
2056
2057     def test_tgs_pac_request_true(self):
2058         creds = self._get_creds()
2059         tgt = self.get_tgt(creds, pac_request=True)
2060
2061         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2062
2063         pac = self.get_ticket_pac(ticket)
2064         self.assertIsNotNone(pac)
2065
2066     def test_renew_pac_request_none(self):
2067         creds = self._get_creds()
2068         tgt = self.get_tgt(creds, pac_request=None)
2069         tgt = self._modify_tgt(tgt, renewable=True)
2070
2071         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2072                               expect_pac_attrs=True,
2073                               expect_pac_attrs_pac_request=None,
2074                               expect_requester_sid=True)
2075
2076         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2077
2078         pac = self.get_ticket_pac(ticket)
2079         self.assertIsNotNone(pac)
2080
2081     def test_renew_pac_request_false(self):
2082         creds = self._get_creds()
2083         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2084         tgt = self._modify_tgt(tgt, renewable=True)
2085
2086         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2087                               expect_pac_attrs=True,
2088                               expect_pac_attrs_pac_request=False,
2089                               expect_requester_sid=True)
2090
2091         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2092
2093         pac = self.get_ticket_pac(ticket, expect_pac=False)
2094         self.assertIsNone(pac)
2095
2096     def test_renew_pac_request_true(self):
2097         creds = self._get_creds()
2098         tgt = self.get_tgt(creds, pac_request=True)
2099         tgt = self._modify_tgt(tgt, renewable=True)
2100
2101         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2102                               expect_pac_attrs=True,
2103                               expect_pac_attrs_pac_request=True,
2104                               expect_requester_sid=True)
2105
2106         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2107
2108         pac = self.get_ticket_pac(ticket)
2109         self.assertIsNotNone(pac)
2110
2111     def test_rodc_renew_pac_request_none(self):
2112         creds = self._get_creds(replication_allowed=True,
2113                                 revealed_to_rodc=True)
2114         tgt = self.get_tgt(creds, pac_request=None)
2115         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2116
2117         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2118                               expect_pac_attrs=False,
2119                               expect_requester_sid=True)
2120
2121         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2122
2123         pac = self.get_ticket_pac(ticket)
2124         self.assertIsNotNone(pac)
2125
2126     def test_rodc_renew_pac_request_false(self):
2127         creds = self._get_creds(replication_allowed=True,
2128                                 revealed_to_rodc=True)
2129         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2130         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2131
2132         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2133                               expect_pac_attrs=False,
2134                               expect_requester_sid=True)
2135
2136         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2137
2138         pac = self.get_ticket_pac(ticket)
2139         self.assertIsNotNone(pac)
2140
2141     def test_rodc_renew_pac_request_true(self):
2142         creds = self._get_creds(replication_allowed=True,
2143                                 revealed_to_rodc=True)
2144         tgt = self.get_tgt(creds, pac_request=True)
2145         tgt = self._modify_tgt(tgt, renewable=True, from_rodc=True)
2146
2147         tgt = self._renew_tgt(tgt, expected_error=0, expect_pac=None,
2148                               expect_pac_attrs=False,
2149                               expect_requester_sid=True)
2150
2151         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2152
2153         pac = self.get_ticket_pac(ticket)
2154         self.assertIsNotNone(pac)
2155
2156     def test_validate_pac_request_none(self):
2157         creds = self._get_creds()
2158         tgt = self.get_tgt(creds, pac_request=None)
2159         tgt = self._modify_tgt(tgt, invalid=True)
2160
2161         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2162                                  expect_pac_attrs=True,
2163                                  expect_pac_attrs_pac_request=None,
2164                                  expect_requester_sid=True)
2165
2166         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2167
2168         pac = self.get_ticket_pac(ticket)
2169         self.assertIsNotNone(pac)
2170
2171     def test_validate_pac_request_false(self):
2172         creds = self._get_creds()
2173         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2174         tgt = self._modify_tgt(tgt, invalid=True)
2175
2176         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2177                                  expect_pac_attrs=True,
2178                                  expect_pac_attrs_pac_request=False,
2179                                  expect_requester_sid=True)
2180
2181         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=False)
2182
2183         pac = self.get_ticket_pac(ticket, expect_pac=False)
2184         self.assertIsNone(pac)
2185
2186     def test_validate_pac_request_true(self):
2187         creds = self._get_creds()
2188         tgt = self.get_tgt(creds, pac_request=True)
2189         tgt = self._modify_tgt(tgt, invalid=True)
2190
2191         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2192                                  expect_pac_attrs=True,
2193                                  expect_pac_attrs_pac_request=True,
2194                                  expect_requester_sid=True)
2195
2196         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2197
2198         pac = self.get_ticket_pac(ticket)
2199         self.assertIsNotNone(pac)
2200
2201     def test_rodc_validate_pac_request_none(self):
2202         creds = self._get_creds(replication_allowed=True,
2203                                 revealed_to_rodc=True)
2204         tgt = self.get_tgt(creds, pac_request=None)
2205         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2206
2207         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2208                                  expect_pac_attrs=False,
2209                                  expect_requester_sid=True)
2210
2211         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2212
2213         pac = self.get_ticket_pac(ticket)
2214         self.assertIsNotNone(pac)
2215
2216     def test_rodc_validate_pac_request_false(self):
2217         creds = self._get_creds(replication_allowed=True,
2218                                 revealed_to_rodc=True)
2219         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2220         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2221
2222         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2223                                  expect_pac_attrs=False,
2224                                  expect_requester_sid=True)
2225
2226         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2227
2228         pac = self.get_ticket_pac(ticket)
2229         self.assertIsNotNone(pac)
2230
2231     def test_rodc_validate_pac_request_true(self):
2232         creds = self._get_creds(replication_allowed=True,
2233                                 revealed_to_rodc=True)
2234         tgt = self.get_tgt(creds, pac_request=True)
2235         tgt = self._modify_tgt(tgt, invalid=True, from_rodc=True)
2236
2237         tgt = self._validate_tgt(tgt, expected_error=0, expect_pac=None,
2238                                  expect_pac_attrs=False,
2239                                  expect_requester_sid=True)
2240
2241         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2242
2243         pac = self.get_ticket_pac(ticket)
2244         self.assertIsNotNone(pac)
2245
2246     def test_s4u2self_pac_request_none(self):
2247         creds = self._get_creds()
2248         tgt = self.get_tgt(creds, pac_request=None)
2249
2250         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2251
2252         pac = self.get_ticket_pac(ticket)
2253         self.assertIsNotNone(pac)
2254
2255     def test_s4u2self_pac_request_false(self):
2256         creds = self._get_creds()
2257         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2258
2259         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2260
2261         pac = self.get_ticket_pac(ticket)
2262         self.assertIsNotNone(pac)
2263
2264     def test_s4u2self_pac_request_true(self):
2265         creds = self._get_creds()
2266         tgt = self.get_tgt(creds, pac_request=True)
2267
2268         ticket = self._s4u2self(tgt, creds, expected_error=0, expect_pac=True)
2269
2270         pac = self.get_ticket_pac(ticket)
2271         self.assertIsNotNone(pac)
2272
2273     def test_user2user_pac_request_none(self):
2274         creds = self._get_creds()
2275         tgt = self.get_tgt(creds, pac_request=None)
2276
2277         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2278
2279         pac = self.get_ticket_pac(ticket)
2280         self.assertIsNotNone(pac)
2281
2282     def test_user2user_pac_request_false(self):
2283         creds = self._get_creds()
2284         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2285
2286         ticket = self._user2user(tgt, creds, expected_error=0,
2287                                  expect_pac=True)
2288
2289         pac = self.get_ticket_pac(ticket, expect_pac=True)
2290         self.assertIsNotNone(pac)
2291
2292     def test_user2user_pac_request_true(self):
2293         creds = self._get_creds()
2294         tgt = self.get_tgt(creds, pac_request=True)
2295
2296         ticket = self._user2user(tgt, creds, expected_error=0, expect_pac=True)
2297
2298         pac = self.get_ticket_pac(ticket)
2299         self.assertIsNotNone(pac)
2300
2301     def test_user2user_user_pac_request_none(self):
2302         creds = self._get_creds()
2303         tgt = self.get_tgt(creds)
2304
2305         user_creds = self._get_mach_creds()
2306         user_tgt = self.get_tgt(user_creds, pac_request=None)
2307
2308         ticket = self._user2user(tgt, creds, expected_error=0,
2309                                  user_tgt=user_tgt, expect_pac=True)
2310
2311         pac = self.get_ticket_pac(ticket)
2312         self.assertIsNotNone(pac)
2313
2314     def test_user2user_user_pac_request_false(self):
2315         creds = self._get_creds()
2316         tgt = self.get_tgt(creds)
2317
2318         user_creds = self._get_mach_creds()
2319         user_tgt = self.get_tgt(user_creds, pac_request=False, expect_pac=None)
2320
2321         ticket = self._user2user(tgt, creds, expected_error=0,
2322                                  user_tgt=user_tgt, expect_pac=False)
2323
2324         pac = self.get_ticket_pac(ticket, expect_pac=False)
2325         self.assertIsNone(pac)
2326
2327     def test_user2user_user_pac_request_true(self):
2328         creds = self._get_creds()
2329         tgt = self.get_tgt(creds)
2330
2331         user_creds = self._get_mach_creds()
2332         user_tgt = self.get_tgt(user_creds, pac_request=True)
2333
2334         ticket = self._user2user(tgt, creds, expected_error=0,
2335                                  user_tgt=user_tgt, expect_pac=True)
2336
2337         pac = self.get_ticket_pac(ticket)
2338         self.assertIsNotNone(pac)
2339
2340     def test_fast_pac_request_none(self):
2341         creds = self._get_creds()
2342         tgt = self.get_tgt(creds, pac_request=None)
2343
2344         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2345
2346         pac = self.get_ticket_pac(ticket)
2347         self.assertIsNotNone(pac)
2348
2349     def test_fast_pac_request_false(self):
2350         creds = self._get_creds()
2351         tgt = self.get_tgt(creds, pac_request=False)
2352
2353         ticket = self._fast(tgt, creds, expected_error=0,
2354                             expect_pac=True)
2355
2356         pac = self.get_ticket_pac(ticket, expect_pac=True)
2357         self.assertIsNotNone(pac)
2358
2359     def test_fast_pac_request_true(self):
2360         creds = self._get_creds()
2361         tgt = self.get_tgt(creds, pac_request=True)
2362
2363         ticket = self._fast(tgt, creds, expected_error=0, expect_pac=True)
2364
2365         pac = self.get_ticket_pac(ticket)
2366         self.assertIsNotNone(pac)
2367
2368     def test_tgs_rodc_pac_request_none(self):
2369         creds = self._get_creds(replication_allowed=True,
2370                                 revealed_to_rodc=True)
2371         tgt = self.get_tgt(creds, pac_request=None)
2372         tgt = self._modify_tgt(tgt, from_rodc=True)
2373
2374         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2375
2376         pac = self.get_ticket_pac(ticket)
2377         self.assertIsNotNone(pac)
2378
2379     def test_tgs_rodc_pac_request_false(self):
2380         creds = self._get_creds(replication_allowed=True,
2381                                 revealed_to_rodc=True)
2382         tgt = self.get_tgt(creds, pac_request=False, expect_pac=None)
2383         tgt = self._modify_tgt(tgt, from_rodc=True)
2384
2385         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2386
2387         pac = self.get_ticket_pac(ticket)
2388         self.assertIsNotNone(pac)
2389
2390     def test_tgs_rodc_pac_request_true(self):
2391         creds = self._get_creds(replication_allowed=True,
2392                                 revealed_to_rodc=True)
2393         tgt = self.get_tgt(creds, pac_request=True)
2394         tgt = self._modify_tgt(tgt, from_rodc=True)
2395
2396         ticket = self._run_tgs(tgt, expected_error=0, expect_pac=True)
2397
2398         pac = self.get_ticket_pac(ticket)
2399         self.assertIsNotNone(pac)
2400
2401     def test_tgs_rename(self):
2402         creds = self.get_cached_creds(account_type=self.AccountType.USER,
2403                                       use_cache=False)
2404         tgt = self.get_tgt(creds)
2405
2406         # Rename the account.
2407         new_name = self.get_new_username()
2408
2409         samdb = self.get_samdb()
2410         msg = ldb.Message(creds.get_dn())
2411         msg['sAMAccountName'] = ldb.MessageElement(new_name,
2412                                                    ldb.FLAG_MOD_REPLACE,
2413                                                    'sAMAccountName')
2414         samdb.modify(msg)
2415
2416         self._run_tgs(tgt, expected_error=(KDC_ERR_TGT_REVOKED,
2417                                            KDC_ERR_C_PRINCIPAL_UNKNOWN))
2418
2419     def _modify_renewable(self, enc_part):
2420         # Set the renewable flag.
2421         renewable_flag = krb5_asn1.TicketFlags('renewable')
2422         pos = len(tuple(renewable_flag)) - 1
2423
2424         flags = enc_part['flags']
2425         self.assertLessEqual(pos, len(flags))
2426
2427         new_flags = flags[:pos] + '1' + flags[pos + 1:]
2428         enc_part['flags'] = new_flags
2429
2430         # Set the renew-till time to be in the future.
2431         renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
2432         enc_part['renew-till'] = renew_till
2433
2434         return enc_part
2435
2436     def _modify_invalid(self, enc_part):
2437         # Set the invalid flag.
2438         invalid_flag = krb5_asn1.TicketFlags('invalid')
2439         pos = len(tuple(invalid_flag)) - 1
2440
2441         flags = enc_part['flags']
2442         self.assertLessEqual(pos, len(flags))
2443
2444         new_flags = flags[:pos] + '1' + flags[pos + 1:]
2445         enc_part['flags'] = new_flags
2446
2447         # Set the ticket start time to be in the past.
2448         past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
2449         enc_part['starttime'] = past_time
2450
2451         return enc_part
2452
2453     def _get_tgt(self,
2454                  client_creds,
2455                  renewable=False,
2456                  invalid=False,
2457                  from_rodc=False,
2458                  new_rid=None,
2459                  remove_pac=False,
2460                  allow_empty_authdata=False,
2461                  can_modify_logon_info=True,
2462                  can_modify_requester_sid=True,
2463                  remove_pac_attrs=False,
2464                  remove_requester_sid=False):
2465         self.assertFalse(renewable and invalid)
2466
2467         if remove_pac:
2468             self.assertIsNone(new_rid)
2469
2470         tgt = self.get_tgt(client_creds)
2471
2472         return self._modify_tgt(
2473             tgt=tgt,
2474             renewable=renewable,
2475             invalid=invalid,
2476             from_rodc=from_rodc,
2477             new_rid=new_rid,
2478             remove_pac=remove_pac,
2479             allow_empty_authdata=allow_empty_authdata,
2480             can_modify_logon_info=can_modify_logon_info,
2481             can_modify_requester_sid=can_modify_requester_sid,
2482             remove_pac_attrs=remove_pac_attrs,
2483             remove_requester_sid=remove_requester_sid)
2484
2485     def _modify_tgt(self,
2486                     tgt,
2487                     renewable=False,
2488                     invalid=False,
2489                     from_rodc=False,
2490                     new_rid=None,
2491                     remove_pac=False,
2492                     allow_empty_authdata=False,
2493                     cname=None,
2494                     crealm=None,
2495                     can_modify_logon_info=True,
2496                     can_modify_requester_sid=True,
2497                     remove_pac_attrs=False,
2498                     remove_requester_sid=False):
2499         if from_rodc:
2500             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2501         else:
2502             krbtgt_creds = self.get_krbtgt_creds()
2503
2504         if new_rid is not None or remove_requester_sid or remove_pac_attrs:
2505             def change_sid_fn(pac):
2506                 pac_buffers = pac.buffers
2507                 for pac_buffer in pac_buffers:
2508                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2509                         if new_rid is not None and can_modify_logon_info:
2510                             logon_info = pac_buffer.info.info
2511
2512                             logon_info.info3.base.rid = new_rid
2513                     elif pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID:
2514                         if remove_requester_sid:
2515                             pac.num_buffers -= 1
2516                             pac_buffers.remove(pac_buffer)
2517                         elif new_rid is not None and can_modify_requester_sid:
2518                             requester_sid = pac_buffer.info
2519
2520                             samdb = self.get_samdb()
2521                             domain_sid = samdb.get_domain_sid()
2522
2523                             new_sid = f'{domain_sid}-{new_rid}'
2524
2525                             requester_sid.sid = security.dom_sid(new_sid)
2526                     elif pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO:
2527                         if remove_pac_attrs:
2528                             pac.num_buffers -= 1
2529                             pac_buffers.remove(pac_buffer)
2530
2531                 pac.buffers = pac_buffers
2532
2533                 return pac
2534         else:
2535             change_sid_fn = None
2536
2537         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2538
2539         if remove_pac:
2540             checksum_keys = None
2541         else:
2542             checksum_keys = {
2543                 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
2544             }
2545
2546         if renewable:
2547             flags_modify_fn = self._modify_renewable
2548         elif invalid:
2549             flags_modify_fn = self._modify_invalid
2550         else:
2551             flags_modify_fn = None
2552
2553         if cname is not None or crealm is not None:
2554             def modify_fn(enc_part):
2555                 if flags_modify_fn is not None:
2556                     enc_part = flags_modify_fn(enc_part)
2557
2558                 if cname is not None:
2559                     enc_part['cname'] = cname
2560
2561                 if crealm is not None:
2562                     enc_part['crealm'] = crealm
2563
2564                 return enc_part
2565         else:
2566             modify_fn = flags_modify_fn
2567
2568         if cname is not None:
2569             def modify_pac_fn(pac):
2570                 if change_sid_fn is not None:
2571                     pac = change_sid_fn(pac)
2572
2573                 for pac_buffer in pac.buffers:
2574                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2575                         logon_info = pac_buffer.info
2576
2577                         logon_info.account_name = (
2578                             cname['name-string'][0].decode('utf-8'))
2579
2580                 return pac
2581         else:
2582             modify_pac_fn = change_sid_fn
2583
2584         return self.modified_ticket(
2585             tgt,
2586             new_ticket_key=krbtgt_key,
2587             modify_fn=modify_fn,
2588             modify_pac_fn=modify_pac_fn,
2589             exclude_pac=remove_pac,
2590             allow_empty_authdata=allow_empty_authdata,
2591             update_pac_checksums=not remove_pac,
2592             checksum_keys=checksum_keys)
2593
2594     def _remove_rodc_partial_secrets(self):
2595         samdb = self.get_samdb()
2596
2597         rodc_ctx = self.get_mock_rodc_ctx()
2598         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2599
2600         def add_rodc_partial_secrets():
2601             msg = ldb.Message()
2602             msg.dn = rodc_dn
2603             msg['userAccountControl'] = ldb.MessageElement(
2604                 str(rodc_ctx.userAccountControl),
2605                 ldb.FLAG_MOD_REPLACE,
2606                 'userAccountControl')
2607             samdb.modify(msg)
2608
2609         self.addCleanup(add_rodc_partial_secrets)
2610
2611         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
2612
2613         msg = ldb.Message()
2614         msg.dn = rodc_dn
2615         msg['userAccountControl'] = ldb.MessageElement(
2616             str(uac),
2617             ldb.FLAG_MOD_REPLACE,
2618             'userAccountControl')
2619         samdb.modify(msg)
2620
2621     def _remove_rodc_krbtgt_link(self):
2622         samdb = self.get_samdb()
2623
2624         rodc_ctx = self.get_mock_rodc_ctx()
2625         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
2626
2627         def add_rodc_krbtgt_link():
2628             msg = ldb.Message()
2629             msg.dn = rodc_dn
2630             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2631                 rodc_ctx.new_krbtgt_dn,
2632                 ldb.FLAG_MOD_ADD,
2633                 'msDS-KrbTgtLink')
2634             samdb.modify(msg)
2635
2636         self.addCleanup(add_rodc_krbtgt_link)
2637
2638         msg = ldb.Message()
2639         msg.dn = rodc_dn
2640         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
2641             [],
2642             ldb.FLAG_MOD_DELETE,
2643             'msDS-KrbTgtLink')
2644         samdb.modify(msg)
2645
2646     def _get_creds(self,
2647                    replication_allowed=False,
2648                    replication_denied=False,
2649                    revealed_to_rodc=False):
2650         return self.get_cached_creds(
2651             account_type=self.AccountType.COMPUTER,
2652             opts={
2653                 'allowed_replication_mock': replication_allowed,
2654                 'denied_replication_mock': replication_denied,
2655                 'revealed_to_mock_rodc': revealed_to_rodc,
2656                 'id': 0
2657             })
2658
2659     def _get_existing_rid(self,
2660                           replication_allowed=False,
2661                           replication_denied=False,
2662                           revealed_to_rodc=False):
2663         other_creds = self.get_cached_creds(
2664             account_type=self.AccountType.COMPUTER,
2665             opts={
2666                 'allowed_replication_mock': replication_allowed,
2667                 'denied_replication_mock': replication_denied,
2668                 'revealed_to_mock_rodc': revealed_to_rodc,
2669                 'id': 1
2670             })
2671
2672         samdb = self.get_samdb()
2673
2674         other_dn = other_creds.get_dn()
2675         other_sid = self.get_objectSid(samdb, other_dn)
2676
2677         other_rid = int(other_sid.rsplit('-', 1)[1])
2678
2679         return other_rid
2680
2681     def _get_mach_creds(self):
2682         return self.get_cached_creds(
2683             account_type=self.AccountType.COMPUTER,
2684             opts={
2685                 'allowed_replication_mock': True,
2686                 'denied_replication_mock': False,
2687                 'revealed_to_mock_rodc': True,
2688                 'id': 2
2689             })
2690
2691     def _get_non_existent_rid(self):
2692         return (1 << 30) - 1
2693
2694     def _run_tgs(self, tgt, expected_error, expect_pac=True,
2695                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2696                  expect_requester_sid=None, expected_sid=None):
2697         target_creds = self.get_service_creds()
2698         return self._tgs_req(
2699             tgt, expected_error, target_creds,
2700             expect_pac=expect_pac,
2701             expect_pac_attrs=expect_pac_attrs,
2702             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2703             expect_requester_sid=expect_requester_sid,
2704             expected_sid=expected_sid)
2705
2706     # These tests fail against Windows, which does not implement ticket
2707     # renewal.
2708     def _renew_tgt(self, tgt, expected_error, expect_pac=True,
2709                    expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
2710                    expect_requester_sid=None, expected_sid=None):
2711         krbtgt_creds = self.get_krbtgt_creds()
2712         kdc_options = str(krb5_asn1.KDCOptions('renew'))
2713         return self._tgs_req(
2714             tgt, expected_error, krbtgt_creds,
2715             kdc_options=kdc_options,
2716             expect_pac=expect_pac,
2717             expect_pac_attrs=expect_pac_attrs,
2718             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2719             expect_requester_sid=expect_requester_sid,
2720             expected_sid=expected_sid)
2721
2722     # These tests fail against Windows, which does not implement ticket
2723     # validation.
2724     def _validate_tgt(self, tgt, expected_error, expect_pac=True,
2725                       expect_pac_attrs=None,
2726                       expect_pac_attrs_pac_request=None,
2727                       expect_requester_sid=None,
2728                       expected_sid=None):
2729         krbtgt_creds = self.get_krbtgt_creds()
2730         kdc_options = str(krb5_asn1.KDCOptions('validate'))
2731         return self._tgs_req(
2732             tgt, expected_error, krbtgt_creds,
2733             kdc_options=kdc_options,
2734             expect_pac=expect_pac,
2735             expect_pac_attrs=expect_pac_attrs,
2736             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
2737             expect_requester_sid=expect_requester_sid,
2738             expected_sid=expected_sid)
2739
2740     def _s4u2self(self, tgt, tgt_creds, expected_error, expect_pac=True,
2741                   expect_edata=False, expected_status=None):
2742         user_creds = self._get_mach_creds()
2743
2744         user_name = user_creds.get_username()
2745         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2746                                                names=[user_name])
2747         user_realm = user_creds.get_realm()
2748
2749         def generate_s4u2self_padata(_kdc_exchange_dict,
2750                                      _callback_dict,
2751                                      req_body):
2752             padata = self.PA_S4U2Self_create(
2753                 name=user_cname,
2754                 realm=user_realm,
2755                 tgt_session_key=tgt.session_key,
2756                 ctype=None)
2757
2758             return [padata], req_body
2759
2760         return self._tgs_req(tgt, expected_error, tgt_creds,
2761                              expected_cname=user_cname,
2762                              generate_padata_fn=generate_s4u2self_padata,
2763                              expect_claims=False, expect_edata=expect_edata,
2764                              expected_status=expected_status,
2765                              expect_pac=expect_pac)
2766
2767     def _user2user(self, tgt, tgt_creds, expected_error, sname=None,
2768                    srealm=None, user_tgt=None, expect_pac=True):
2769         if user_tgt is None:
2770             user_creds = self._get_mach_creds()
2771             user_tgt = self.get_tgt(user_creds)
2772
2773         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
2774         return self._tgs_req(user_tgt, expected_error, tgt_creds,
2775                              kdc_options=kdc_options,
2776                              additional_ticket=tgt,
2777                              sname=sname,
2778                              srealm=srealm,
2779                              expect_pac=expect_pac)
2780
2781     def _fast(self, armor_tgt, armor_tgt_creds, expected_error,
2782               expected_sname=None, expect_pac=True):
2783         user_creds = self._get_mach_creds()
2784         user_tgt = self.get_tgt(user_creds)
2785
2786         target_creds = self.get_service_creds()
2787
2788         return self._tgs_req(user_tgt, expected_error, target_creds,
2789                              armor_tgt=armor_tgt,
2790                              expected_sname=expected_sname,
2791                              expect_pac=expect_pac)
2792
    def _tgs_req(self, tgt, expected_error, target_creds,
                 armor_tgt=None,
                 kdc_options='0',
                 expected_cname=None,
                 expected_sname=None,
                 additional_ticket=None,
                 generate_padata_fn=None,
                 sname=None,
                 srealm=None,
                 use_fast=False,
                 expect_claims=True,
                 expect_pac=True,
                 expect_pac_attrs=None,
                 expect_pac_attrs_pac_request=None,
                 expect_requester_sid=None,
                 expect_edata=False,
                 expected_sid=None,
                 expected_status=None):
        """Perform a TGS exchange with tgt and check the reply.

        :param tgt: ticket used to make the request.
        :param expected_error: falsy if the request should succeed;
            otherwise the error the reply is checked against.
        :param target_creds: credentials of the target; supply the default
            realm and service name, and the ticket decryption key unless
            'additional_ticket' is given.
        :param armor_tgt: if not None, the request is FAST-armored with
            this ticket.
        :param kdc_options: KDC options passed to the exchange.
        :param expected_cname: expected client name; defaults to
            tgt.cname.
        :param expected_sname: expected service name; defaults to the
            requested sname (or the krbtgt sname when sname is False).
        :param additional_ticket: if not None, sent as an additional
            ticket, and its session key is used to decrypt the reply
            ticket.
        :param generate_padata_fn: optional padata generator passed to
            the exchange.
        :param sname: service name to request; None derives it from
            target_creds, False sends no sname.
        :param srealm: realm to request; None uses target_creds' realm,
            False sends no realm.
        :param use_fast: not read anywhere in this method.
        :param expect_claims, expect_pac, expect_pac_attrs,
            expect_pac_attrs_pac_request, expect_requester_sid,
            expect_edata, expected_sid, expected_status: expectations
            passed unchanged into the exchange dictionary.
        :return: None if an error was expected, otherwise the
            'rep_ticket_creds' entry of the exchange dictionary.
        """
        # False means "send nothing"; None means "use the default".
        if srealm is False:
            srealm = None
        elif srealm is None:
            srealm = target_creds.get_realm()

        if sname is False:
            sname = None
            if expected_sname is None:
                expected_sname = self.get_krbtgt_sname()
        else:
            if sname is None:
                # Derive the service name from the target account name.
                target_name = target_creds.get_username()
                if target_name == 'krbtgt':
                    sname = self.PrincipalName_create(
                        name_type=NT_SRV_INST,
                        names=[target_name, srealm])
                else:
                    # Strip a trailing '$' from the account name.
                    if target_name[-1] == '$':
                        target_name = target_name[:-1]
                    sname = self.PrincipalName_create(
                        name_type=NT_PRINCIPAL,
                        names=['host', target_name])

            if expected_sname is None:
                expected_sname = sname

        # With an additional ticket, the reply ticket is decrypted with
        # that ticket's session key rather than the target's key.
        if additional_ticket is not None:
            additional_tickets = [additional_ticket.ticket]
            decryption_key = additional_ticket.session_key
        else:
            additional_tickets = None
            decryption_key = self.TicketDecryptionKey_from_creds(
                target_creds)

        # Authenticator subkey, using the TGT session key's etype.
        subkey = self.RandomKey(tgt.session_key.etype)

        if armor_tgt is not None:
            # Derive the FAST armor key from the explicit armor key and
            # the authenticator subkey.
            armor_subkey = self.RandomKey(subkey.etype)
            explicit_armor_key = self.generate_armor_key(armor_subkey,
                                                         armor_tgt.session_key)
            armor_key = kcrypto.cf2(explicit_armor_key.key,
                                    subkey.key,
                                    b'explicitarmor',
                                    b'tgsarmor')
            armor_key = Krb5EncryptionKey(armor_key, None)

            generate_fast_fn = self.generate_simple_fast
            generate_fast_armor_fn = self.generate_ap_req

            pac_options = '1'  # claims support
        else:
            armor_subkey = None
            armor_key = None
            generate_fast_fn = None
            generate_fast_armor_fn = None

            pac_options = None

        etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)

        # Install either the error checker or the reply checker, depending
        # on whether an error is expected.
        if expected_error:
            check_error_fn = self.generic_check_kdc_error
            check_rep_fn = None
        else:
            check_error_fn = None
            check_rep_fn = self.generic_check_kdc_rep

        if expected_cname is None:
            expected_cname = tgt.cname

        kdc_exchange_dict = self.tgs_exchange_dict(
            expected_crealm=tgt.crealm,
            expected_cname=expected_cname,
            expected_srealm=srealm,
            expected_sname=expected_sname,
            ticket_decryption_key=decryption_key,
            generate_padata_fn=generate_padata_fn,
            generate_fast_fn=generate_fast_fn,
            generate_fast_armor_fn=generate_fast_armor_fn,
            check_error_fn=check_error_fn,
            check_rep_fn=check_rep_fn,
            check_kdc_private_fn=self.generic_check_kdc_private,
            expected_error_mode=expected_error,
            expected_status=expected_status,
            tgt=tgt,
            armor_key=armor_key,
            armor_tgt=armor_tgt,
            armor_subkey=armor_subkey,
            pac_options=pac_options,
            authenticator_subkey=subkey,
            kdc_options=kdc_options,
            expect_edata=expect_edata,
            expect_pac=expect_pac,
            expect_pac_attrs=expect_pac_attrs,
            expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
            expect_requester_sid=expect_requester_sid,
            expected_sid=expected_sid,
            expect_claims=expect_claims)

        rep = self._generic_kdc_exchange(kdc_exchange_dict,
                                         cname=None,
                                         realm=srealm,
                                         sname=sname,
                                         etypes=etypes,
                                         additional_tickets=additional_tickets)
        if expected_error:
            self.check_error_rep(rep, expected_error)
            return None
        else:
            self.check_reply(rep, KRB_TGS_REP)
            return kdc_exchange_dict['rep_ticket_creds']
2922
2923
if __name__ == "__main__":
    # Run this module's tests directly with the debug output switches off.
    import unittest

    global_hexdump = False
    global_asn1_print = False

    unittest.main()