python:tests: Catch failures to authenticate with gMSA managed passwords
[cs/samba-autobuild/.git] / python / samba / tests / samba_tool / user_getpassword_gmsa.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Blackbox tests for reading Group Managed Service Account passwords
4 #
5 # Copyright (C) Catalyst.Net Ltd. 2023
6 #
7 # Written by Rob van der Linde <rob@catalyst.net.nz>
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 import os
24 import sys
25
26 sys.path.insert(0, "bin/python")
27 os.environ["PYTHONUNBUFFERED"] = "1"
28
29 import datetime
30 import shlex
31
32 from ldb import ERR_INVALID_CREDENTIALS, LdbError, SCOPE_BASE
33
34 from samba.credentials import MUST_USE_KERBEROS
35 from samba.dcerpc import samr, security
36 from samba.domain.models import GroupManagedServiceAccount, User
37 from samba.ndr import ndr_unpack
38 from samba.nt_time import nt_time_from_datetime
39 from samba.tests import BlackboxTestCase, connect_samdb
40
41 DC_SERVER = os.environ["SERVER"]
42 SERVER = os.environ["SERVER"]
43 SERVER_USERNAME = os.environ["USERNAME"]
44 SERVER_PASSWORD = os.environ["PASSWORD"]
45
46 HOST = f"ldap://{SERVER}"
47 CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
48
49
50 class GMSAPasswordTest(BlackboxTestCase):
51     """Blackbox tests for GMSA getpassword and connecting as that user."""
52
53     @classmethod
54     def setUpClass(cls):
55         cls.lp = cls.get_loadparm()
56         cls.env_creds = cls.get_env_credentials(lp=cls.lp,
57                                                 env_username="USERNAME",
58                                                 env_password="PASSWORD",
59                                                 env_domain="DOMAIN",
60                                                 env_realm="REALM")
61         cls.samdb = connect_samdb(HOST, lp=cls.lp, credentials=cls.env_creds)
62         super().setUpClass()
63
64     @classmethod
65     def setUpTestData(cls):
66         cls.gmsa = GroupManagedServiceAccount.create(
67             cls.samdb,
68             name="GMSA_Test_User",
69             dns_host_name="samba.example.com",
70             managed_password_interval=1,
71             group_msa_membership=f"O:SYD:(A;;RP;;;{cls.samdb.connecting_user_sid})")
72
73         cls.addClassCleanup(cls.gmsa.delete, cls.samdb)
74
75     def getpassword(self, attrs):
76         shattrs = shlex.quote(attrs)
77         cmd = f"user getpassword --attributes={shattrs} {self.gmsa.account_name}"
78
79         ldif = self.check_output(cmd).decode()
80         res = self.samdb.parse_ldif(ldif)
81         _, user_message = next(res)
82
83         # check each attr is returned
84         for attr in attrs.split(","):
85             self.assertIn(attr, user_message)
86
87         return user_message
88
89     def test_getpassword(self):
90         self.getpassword("virtualClearTextUTF16,unicodePwd")
91         self.getpassword("virtualClearTextUTF16")
92         self.getpassword("unicodePwd")
93
94     def test_utf16_password(self):
95         user_msg = self.getpassword("virtualClearTextUTF16")
96         password = user_msg["virtualClearTextUTF16"][0]
97
98         creds = self.insta_creds(template=self.env_creds)
99         creds.set_username(self.gmsa.account_name)
100         creds.set_utf16_password(password)
101         try:
102             db = connect_samdb(HOST, credentials=creds, lp=self.lp)
103         except LdbError as err:
104             num, _ = err.args
105             if num == ERR_INVALID_CREDENTIALS:
106                 self.fail('failed to authenticate using credentials')
107
108             raise
109
110         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
111         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
112
113         self.assertEqual(self.gmsa.object_sid, connecting_user_sid)
114
115     def test_utf8_password(self):
116         user_msg = self.getpassword("virtualClearTextUTF8")
117         password = str(user_msg["virtualClearTextUTF8"][0])
118
119         creds = self.insta_creds(template=self.env_creds)
120         # Because the password has been converted to utf-8 via UTF16_MUNGED
121         # the nthash is no longer valid. We need to use AES kerberos ciphers
122         # for this to work.
123         creds.set_kerberos_state(MUST_USE_KERBEROS)
124         creds.set_username(self.gmsa.account_name)
125         creds.set_password(password)
126         try:
127             db = connect_samdb(HOST, credentials=creds, lp=self.lp)
128         except LdbError as err:
129             num, _ = err.args
130             if num == ERR_INVALID_CREDENTIALS:
131                 self.fail('failed to authenticate using credentials')
132
133             raise
134
135         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
136         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
137
138         self.assertEqual(self.gmsa.object_sid, connecting_user_sid)
139
140     def test_unicode_pwd(self):
141         user_msg = self.getpassword("unicodePwd")
142
143         creds = self.insta_creds(template=self.env_creds)
144         creds.set_username(self.gmsa.account_name)
145         nt_pass = samr.Password()
146         nt_pass.hash = list(user_msg["unicodePwd"][0])
147         creds.set_nt_hash(nt_pass)
148         try:
149             db = connect_samdb(HOST, credentials=creds, lp=self.lp)
150         except LdbError as err:
151             num, _ = err.args
152             if num == ERR_INVALID_CREDENTIALS:
153                 self.fail('failed to authenticate using credentials')
154
155             raise
156
157         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
158         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
159
160         self.assertEqual(self.gmsa.object_sid, connecting_user_sid)
161
162     def test_querytime(self):
163         user_msg = self.getpassword("virtualManagedPasswordQueryTime")
164         querytime = int(user_msg["virtualManagedPasswordQueryTime"][0])
165
166         # Just assert the number makes sense
167         self.assertGreater(querytime, nt_time_from_datetime(datetime.datetime.now(tz=datetime.timezone.utc)))
168         self.assertLess(querytime, nt_time_from_datetime(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=21)))
169
170     def test_querytime_unixtime(self):
171         user_msg = self.getpassword("virtualManagedPasswordQueryTime;format=UnixTime")
172         querytime = int(user_msg["virtualManagedPasswordQueryTime;format=UnixTime"][0])
173
174         # Just assert the number makes sense
175         self.assertGreater(querytime, datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
176         self.assertLess(querytime, (datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=21)).timestamp())
177
178     @classmethod
179     def _make_cmdline(cls, line):
180         """Override to pass line as samba-tool subcommand instead.
181
182         Automatically fills in HOST and CREDS as well.
183         """
184         if isinstance(line, list):
185             cmd = ["samba-tool"] + line + ["-H", SERVER, CREDS]
186         else:
187             cmd = f"samba-tool {line} -H {HOST} {CREDS}"
188
189         return super()._make_cmdline(cmd)
190
191
192 if __name__ == "__main__":
193     import unittest
194     unittest.main()