2 # Helper classes for testing the Group Key Distribution Service.
4 # Copyright (C) Catalyst.Net Ltd 2023
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
28 from typing import Final, NewType, Optional, Tuple, Union
32 from cryptography.hazmat.backends import default_backend
33 from cryptography.hazmat.primitives import hashes
34 from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode
44 from samba.credentials import Credentials
45 from samba.dcerpc import gkdi, misc
46 from samba.gkdi import (
56 from samba.ndr import ndr_pack, ndr_unpack
57 from samba.nt_time import (
58 nt_time_from_datetime,
61 timedelta_from_nt_time_delta,
63 from samba.param import LoadParm
64 from samba.samdb import SamDB
66 from samba.tests import delete_force, TestCase
# Distinct integer type for the status codes carried by GetKeyError.
HResult = NewType("HResult", int)
# Distinct type for an ldb.Message holding a root key object.
RootKey = NewType("RootKey", ldb.Message)

# One key cycle plus the maximum clock skew, expressed as an NtTime.
# NOTE(review): presumably the smallest root key start time that remains
# non-negative once that same amount is subtracted from it — confirm.
ROOT_KEY_START_TIME: Final = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
class GetKeyError(Exception):
    """Failure of a GetKey request.

    The status code and the human-readable message are stored, in that
    order, in the exception's ``args``.
    """

    def __init__(self, status: HResult, message: str):
        super(GetKeyError, self).__init__(status, message)
81 class GkdiBaseTest(TestCase):
82 # This is the NDR‐encoded security descriptor O:SYD:(A;;FRFW;;;S-1-5-9).
84 b"\x01\x00\x04\x800\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
85 b"\x14\x00\x00\x00\x02\x00\x1c\x00\x01\x00\x00\x00\x00\x00\x14\x00"
86 b"\x9f\x01\x12\x00\x01\x01\x00\x00\x00\x00\x00\x05\t\x00\x00\x00"
87 b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00"
91 def current_time(offset: Optional[datetime.timedelta] = None) -> datetime.datetime:
93 # Allow for clock skew.
94 offset = timedelta_from_nt_time_delta(MAX_CLOCK_SKEW)
96 current_time = datetime.datetime.now(tz=datetime.timezone.utc)
97 return current_time + offset
def current_nt_time(self, offset: Optional[datetime.timedelta] = None) -> NtTime:
    """Return the value of ``current_time(offset)`` converted to an NtTime."""
    now = self.current_time(offset)
    return nt_time_from_datetime(now)
def current_gkid(self, offset: Optional[datetime.timedelta] = None) -> Gkid:
    """Return the Gkid that ``Gkid.from_nt_time()`` yields for the current NT time.

    *offset* is passed through unchanged to ``current_nt_time()``.
    """
    nt_now = self.current_nt_time(offset)
    return Gkid.from_nt_time(nt_now)
106 self, host: str, lp: LoadParm, server_creds: Credentials
109 return gkdi.gkdi(f"ncacn_ip_tcp:{host}[seal]", lp, server_creds)
110 except NTSTATUSError as err:
111 if err.args[0] == ntstatus.NT_STATUS_PORT_UNREACHABLE:
113 "Try starting the Microsoft Key Distribution Service (KdsSvc).\n"
114 "In PowerShell, run:\n\tStart-Service -Name KdsSvc"
123 root_key_id: Optional[misc.GUID],
126 out_len, out, result = conn.GetKey(
127 list(target_sd), root_key_id, gkid.l0_idx, gkid.l1_idx, gkid.l2_idx
129 result_code, result_string = result
132 and result_code & 0xFFFF == werror.WERR_TOO_MANY_OPEN_FILES
135 "The server has given up selecting a root key because there are too"
136 " many keys (more than 1000) in the Master Root Keys container. Delete"
137 " some root keys and try again."
139 if result != (0, None):
140 raise GetKeyError(result_code, result_string)
141 self.assertEqual(len(out), out_len, "output len mismatch")
143 envelope = ndr_unpack(gkdi.GroupKeyEnvelope, bytes(out))
145 gkid = Gkid(envelope.l0_index, envelope.l1_index, envelope.l2_index)
146 l1_key = bytes(envelope.l1_key) if envelope.l1_key else None
147 l2_key = bytes(envelope.l2_key) if envelope.l2_key else None
149 hash_algorithm = Algorithm.from_kdf_parameters(bytes(envelope.kdf_parameters))
151 root_key_id = envelope.root_key_id
153 return SeedKeyPair(l1_key, l2_key, gkid, hash_algorithm, root_key_id)
155 def get_root_key_object(
156 self, samdb: SamDB, root_key_id: Optional[misc.GUID], gkid: Gkid
157 ) -> Tuple[RootKey, misc.GUID]:
158 """Return a root key object and its corresponding GUID.
160 *root_key_id* specifies the GUID of the root key object to return. It
161 can be ``None`` to indicate that the selected key should be the most
162 recently created key starting not after the time indicated by *gkid*.
Bear in mind that, as the Microsoft Key Distribution Service caches root
keys, the most recently created key might not be the one that Windows
171 "msKds-KDFAlgorithmID",
174 "msKds-UseStartTime",
178 gkid_start_nt_time = gkid.start_nt_time()
180 exact_key_specified = root_key_id is not None
181 if exact_key_specified:
182 root_key_dn = self.get_root_key_container_dn(samdb)
183 root_key_dn.add_child(f"CN={root_key_id}")
186 root_key_res = samdb.search(
187 root_key_dn, scope=ldb.SCOPE_BASE, attrs=root_key_attrs
189 except ldb.LdbError as err:
190 if err.args[0] == ldb.ERR_NO_SUCH_OBJECT:
191 raise GetKeyError(HRES_NTE_NO_KEY, "no such root key exists")
195 root_key_object = root_key_res[0]
197 root_keys = samdb.search(
198 self.get_root_key_container_dn(samdb),
199 scope=ldb.SCOPE_SUBTREE,
200 expression=f"(msKds-UseStartTime<={gkid_start_nt_time})",
201 attrs=root_key_attrs,
205 HRES_NTE_NO_KEY, "no root keys exist at specified time"
208 def root_key_create_time(key: RootKey) -> NtTime:
209 create_time = key.get("msKds-CreateTime", idx=0)
210 if create_time is None:
213 return NtTime(int(create_time))
215 root_key_object = max(root_keys, key=root_key_create_time)
217 root_key_cn = root_key_object.get("cn", idx=0)
218 self.assertIsNotNone(root_key_cn)
219 root_key_id = misc.GUID(root_key_cn)
221 use_start_nt_time = NtTime(
222 int(root_key_object.get("msKds-UseStartTime", idx=0))
224 if use_start_nt_time == 0:
225 raise GetKeyError(HRES_NTE_BAD_KEY, "root key effective time is 0")
226 use_start_nt_time = NtTime(
227 use_start_nt_time - NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
230 if exact_key_specified and not (0 <= use_start_nt_time <= gkid_start_nt_time):
231 raise GetKeyError(HRES_E_INVALIDARG, "root key is not yet valid")
233 return root_key_object, root_key_id
def validate_get_key_request(
    self, gkid: Gkid, current_gkid: Gkid, root_key_specified: bool
) -> None:
    """Reject GetKey requests that this emulation cannot or must not serve.

    Raises GetKeyError with HRES_E_INVALIDARG if *gkid* is greater than
    *current_gkid*, or if its type is neither the default type nor an L2
    seed key. Raises NotImplementedError if *gkid* is of the default type;
    *root_key_specified* only affects the wording of that message.
    """
    if gkid > current_gkid:
        raise GetKeyError(
            HRES_E_INVALIDARG, "invalid request for a key from the future"
        )

    gkid_type = gkid.gkid_type()
    if gkid_type is GkidType.DEFAULT:
        derived_from = (
            " derived from the specified root key" if root_key_specified else ""
        )
        raise NotImplementedError(
            f"The latest group key{derived_from} is being requested."
        )

    if gkid_type is not GkidType.L2_SEED_KEY:
        raise GetKeyError(
            HRES_E_INVALIDARG, f"invalid request for {gkid_type.description()}"
        )
260 target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format.
261 root_key_id: Optional[misc.GUID],
264 root_key_id_hint: Optional[misc.GUID] = None,
265 current_gkid: Optional[Gkid] = None,
267 """Emulate the ISDKey.GetKey() RPC method.
269 When passed a NULL root key ID, GetKey() may use a cached root key
270 rather than picking the most recently created applicable key as the
271 documentation implies. If it’s important to arrive at the same result as
272 Windows, pass a GUID in the *root_key_id_hint* parameter to specify a
273 particular root key to use."""
275 if current_gkid is None:
276 current_gkid = self.current_gkid()
278 root_key_specified = root_key_id is not None
279 if root_key_specified:
281 root_key_id_hint, "don’t provide both root key ID parameters"
284 self.validate_get_key_request(gkid, current_gkid, root_key_specified)
286 root_key_object, root_key_id = self.get_root_key_object(
287 samdb, root_key_id if root_key_specified else root_key_id_hint, gkid
290 if root_key_specified:
291 if gkid.l0_idx < current_gkid.l0_idx:
292 # All of the seed keys with an L0 index less than the current L0
293 # index are from the past and thus are safe to return. If the
294 # caller has requested a specific seed key with a past L0 index,
295 # return the L1 seed key (L0, 31, −1), from which any L1 or L2
296 # seed key having that L0 index can be derived.
297 l1_gkid = Gkid(gkid.l0_idx, 31, -1)
298 seed_key = self.compute_seed_key(
299 target_sd, root_key_id, root_key_object, l1_gkid
304 Gkid(gkid.l0_idx, 31, 31),
305 seed_key.hash_algorithm,
309 # All of the previous seed keys with an L0 index equal to the
310 # current L0 index can be derived from the current seed key or from
311 # the next older L1 seed key.
314 if gkid.l2_idx == 31:
# The current seed key, and all previous seed keys with that same L0
# index, can be derived from the L1 seed key (L0, L1, −1).
317 l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx, -1)
318 seed_key = self.compute_seed_key(
319 target_sd, root_key_id, root_key_object, l1_gkid
322 seed_key.key, None, gkid, seed_key.hash_algorithm, root_key_id
325 # Compute the L2 seed key to return.
326 seed_key = self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
328 next_older_seed_key = None
# From the current seed key can be derived only those seed keys that
# share its L0 and L1 indices. To be able to derive previous seed
332 # keys with older L1 indices, the caller must be given the next
333 # older L1 seed key as well.
334 next_older_l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx - 1, -1)
335 next_older_seed_key = self.compute_seed_key(
336 target_sd, root_key_id, root_key_object, next_older_l1_gkid
343 seed_key.hash_algorithm,
350 target_sd: bytes, # An NDR‐encoded valid security descriptor in self‐relative format.
351 root_key_id: Optional[misc.GUID],
353 current_gkid: Optional[Gkid] = None,
355 if current_gkid is None:
356 current_gkid = self.current_gkid()
358 root_key_specified = root_key_id is not None
359 self.validate_get_key_request(gkid, current_gkid, root_key_specified)
361 root_key_object, root_key_id = self.get_root_key_object(
362 samdb, root_key_id, gkid
365 return self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
def get_root_key_data(self, root_key: RootKey) -> Tuple[bytes, Algorithm]:
    """Return the key material and hash algorithm stored in *root_key*.

    Asserts that the object's version is ``1``, that its KDF algorithm ID is
    ``SP800_108_CTR_HMAC`` and that the root key data is a ``bytes`` value.
    The hash algorithm is obtained from the ``msKds-KDFParam`` attribute.
    """
    version = root_key.get("msKds-Version", idx=0)
    self.assertEqual(b"1", version)

    algorithm_id = root_key.get("msKds-KDFAlgorithmID", idx=0)
    self.assertEqual(b"SP800_108_CTR_HMAC", algorithm_id)

    hash_algorithm = Algorithm.from_kdf_parameters(
        root_key.get("msKds-KDFParam", idx=0)
    )

    root_key_data = root_key.get("msKds-RootKeyData", idx=0)
    self.assertIsInstance(root_key_data, bytes)

    return root_key_data, hash_algorithm
def compute_seed_key(
    self,
    target_sd: bytes,
    root_key_id: misc.GUID,
    root_key: RootKey,
    target_gkid: Gkid,
) -> GroupKey:
    """Derive the L1 or L2 seed key identified by *target_gkid*.

    Starting from the key data held in *root_key*, the L0 seed key is
    derived first, then L1 seed keys until the target L1 index is reached,
    then L2 seed keys until the target itself is reached. *target_sd* is
    supplied only to the first L1 derivation. The test fails if
    *target_gkid* is neither an L1 nor an L2 seed key.

    Returns a GroupKey holding the derived key, its Gkid, the root key's
    algorithm and *root_key_id*.
    """
    target_gkid_type = target_gkid.gkid_type()
    self.assertIn(
        target_gkid_type,
        (GkidType.L1_SEED_KEY, GkidType.L2_SEED_KEY),
        f"unexpected attempt to compute {target_gkid_type.description()}",
    )

    root_key_data, algorithm = self.get_root_key_data(root_key)
    root_key_id_bytes = ndr_pack(root_key_id)

    hash_algorithm = algorithm.algorithm()

    # Derive the L0 seed key.
    gkid = Gkid.l0_seed_key(target_gkid.l0_idx)
    key = self.derive_key(root_key_data, root_key_id_bytes, hash_algorithm, gkid)

    # Derive the L1 seed key. Only this first step takes the security
    # descriptor as an input.
    gkid = gkid.derive_l1_seed_key()
    key = self.derive_key(
        key, root_key_id_bytes, hash_algorithm, gkid, target_sd=target_sd
    )

    while gkid.l1_idx != target_gkid.l1_idx:
        gkid = gkid.derive_l1_seed_key()
        key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)

    # Derive the L2 seed key. For an L1 target the loop body never runs,
    # because the target has already been reached.
    while gkid != target_gkid:
        gkid = gkid.derive_l2_seed_key()
        key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)

    return GroupKey(key, gkid, algorithm, root_key_id)
427 root_key_id_bytes: bytes,
428 hash_algorithm: hashes.HashAlgorithm,
431 target_sd: Optional[bytes] = None,
433 def u32_bytes(n: int) -> bytes:
434 return (n & 0xFFFF_FFFF).to_bytes(length=4, byteorder="little")
438 + u32_bytes(gkid.l0_idx)
439 + u32_bytes(gkid.l1_idx)
440 + u32_bytes(gkid.l2_idx)
442 if target_sd is not None:
444 return self.kdf(hash_algorithm, key, context)
448 hash_algorithm: hashes.HashAlgorithm,
453 len_in_bytes=KEY_LEN_BYTES,
455 label = label.encode("utf-16-le") + b"\x00\x00"
457 algorithm=hash_algorithm,
458 mode=Mode.CounterMode,
462 location=CounterLocation.BeforeFixed,
466 backend=default_backend(),
468 return kdf.derive(key)
def get_config_dn(self, samdb: SamDB, dn: str) -> ldb.Dn:
    """Return *dn* added as a child of the configuration base DN of *samdb*."""
    config_dn = samdb.get_config_basedn()
    config_dn.add_child(dn)
    return config_dn
def get_server_config_dn(self, samdb: SamDB) -> ldb.Dn:
    """Return the DN of the GKDS Server Configuration object in *samdb*."""
    # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution
    # Service”, and “CN=SID Key Server Configuration” for “CN=Group Key
    # Distribution Service Server Configuration”.
    return self.get_config_dn(
        samdb,
        "CN=Group Key Distribution Service Server Configuration,"
        "CN=Server Configuration,"
        "CN=Group Key Distribution Service,"
        "CN=Services",
    )
def get_root_key_container_dn(self, samdb: SamDB) -> ldb.Dn:
    """Return the DN of the Master Root Keys container in *samdb*."""
    # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution Service”.
    return self.get_config_dn(
        samdb,
        "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services",
    )
499 use_start_time: Optional[Union[datetime.datetime, NtTime]] = None,
500 hash_algorithm: Optional[Algorithm] = Algorithm.SHA512,
501 guid: Optional[misc.GUID] = None,
502 data: Optional[bytes] = None,
504 # [MS-GKDI] 3.1.4.1.1, “Creating a New Root Key”, states that if the
505 # server receives a GetKey request and the root keys container in Active
# Directory is empty, the server must create a new root key object
507 # based on the default Server Configuration object. Additional root keys
508 # are to be created based on either the default Server Configuration
509 # object or an updated one specifying optional configuration values.
511 guid_specified = guid is not None
512 if not guid_specified:
513 guid = misc.GUID(secrets.token_bytes(16))
516 data = secrets.token_bytes(KEY_LEN_BYTES)
518 create_time = current_nt_time = self.current_nt_time()
520 if use_start_time is None:
521 # Root keys created by Windows without the ‘-EffectiveImmediately’
522 # parameter have an effective time of exactly ten days in the
523 # future, presumably to allow time for replication.
525 # Microsoft’s documentation on creating a KDS root key, located at
526 # https://learn.microsoft.com/en-us/windows-server/security/group-managed-service-accounts/create-the-key-distribution-services-kds-root-key,
527 # claims to the contrary that domain controllers will only wait up
528 # to ten hours before allowing Group Managed Service Accounts to be
531 # The same page includes instructions for creating a root key with
532 # an effective time of ten hours in the past (for testing purposes),
533 # but I’m not sure why — the KDS will consider a key valid for use
534 # immediately after its start time has passed, without bothering to
535 # wait ten hours first. In fact, it will consider a key to be valid
536 # a full ten hours (plus clock skew) *before* its declared start
537 # time — intentional, or (conceivably) the result of an accidental
539 current_interval_start_nt_time = Gkid.from_nt_time(
542 use_start_time = NtTime(
543 current_interval_start_nt_time + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
546 if isinstance(use_start_time, datetime.datetime):
547 use_start_nt_time = nt_time_from_datetime(use_start_time)
549 self.assertIsInstance(use_start_time, int)
550 use_start_nt_time = use_start_time
552 kdf_parameters = None
553 if hash_algorithm is not None:
554 kdf_parameters = gkdi.KdfParameters()
555 kdf_parameters.hash_algorithm = hash_algorithm.value
556 kdf_parameters = ndr_pack(kdf_parameters)
558 # These are the encoded p and g values, respectively, of the “2048‐bit
559 # MODP Group with 256‐bit Prime Order Subgroup” from RFC 5114 section
562 b"\x87\xa8\xe6\x1d\xb4\xb6f<\xff\xbb\xd1\x9ce\x19Y\x99\x8c\xee\xf6\x08"
563 b"f\r\xd0\xf2],\xee\xd4C^;\x00\xe0\r\xf8\xf1\xd6\x19W\xd4\xfa\xf7\xdfE"
564 b"a\xb2\xaa0\x16\xc3\xd9\x114\to\xaa;\xf4)m\x83\x0e\x9a|"
565 b" \x9e\x0cd\x97Qz\xbd"
566 b'Z\x8a\x9d0k\xcfg\xed\x91\xf9\xe6r[GX\xc0"\xe0\xb1\xefBu\xbf{l[\xfc\x11'
567 b"\xd4_\x90\x88\xb9A\xf5N\xb1\xe5\x9b\xb8\xbc9\xa0\xbf\x120\x7f\\O\xdbp\xc5"
568 b"\x81\xb2?v\xb6:\xca\xe1\xca\xa6\xb7\x90-RRg5H\x8a\x0e\xf1<m\x9aQ\xbf\xa4\xab"
569 b":\xd84w\x96RM\x8e\xf6\xa1g\xb5\xa4\x18%\xd9g\xe1D\xe5\x14\x05d%"
570 b"\x1c\xca\xcb\x83\xe6\xb4\x86\xf6\xb3\xca?yqP`&\xc0\xb8W\xf6\x89\x96(V"
571 b"\xde\xd4\x01\n\xbd\x0b\xe6!\xc3\xa3\x96\nT\xe7\x10\xc3u\xf2cu\xd7\x01A\x03"
572 b"\xa4\xb5C0\xc1\x98\xaf\x12a\x16\xd2'n\x11q_i8w\xfa\xd7\xef\t\xca\xdb\tJ\xe9"
576 b"?\xb3,\x9bs\x13M\x0b.wPf`\xed\xbdHL\xa7\xb1\x8f!\xef T\x07\xf4y:"
577 b"\x1a\x0b\xa1%\x10\xdb\xc1Pw\xbeF?\xffO\xedJ\xac\x0b\xb5U\xbe:l\x1b\x0ckG\xb1"
578 b"\xbc7s\xbf~\x8cob\x90\x12(\xf8\xc2\x8c\xbb\x18\xa5Z\xe3\x13A\x00\ne"
579 b"\x01\x96\xf91\xc7zW\xf2\xdd\xf4c\xe5\xe9\xec\x14Kw}\xe6*\xaa\xb8\xa8b"
580 b"\x8a\xc3v\xd2\x82\xd6\xed8d\xe6y\x82B\x8e\xbc\x83\x1d\x144\x8fo/\x91\x93"
581 b"\xb5\x04Z\xf2vqd\xe1\xdf\xc9g\xc1\xfb?.U\xa4\xbd\x1b\xff\xe8;\x9c\x80"
582 b"\xd0R\xb9\x85\xd1\x82\xea\n\xdb*;s\x13\xd3\xfe\x14\xc8HK\x1e\x05%\x88\xb9"
583 b"\xb7\xd2\xbb\xd2\xdf\x01a\x99\xec\xd0n\x15W\xcd\t\x15\xb35;\xbbd\xe0\xec7"
584 b"\x7f\xd0(7\r\xf9+R\xc7\x89\x14(\xcd\xc6~\xb6\x18KR=\x1d\xb2F\xc3/c\x07\x84"
585 b"\x90\xf0\x0e\xf8\xd6G\xd1H\xd4yTQ^#'\xcf\xef\x98\xc5\x82fKL\x0fl\xc4\x16Y"
587 self.assertEqual(len(field_order), len(generator))
588 key_length = len(field_order)
590 ffc_dh_parameters = gkdi.FfcDhParameters()
591 ffc_dh_parameters.field_order = list(field_order)
592 ffc_dh_parameters.generator = list(generator)
593 ffc_dh_parameters.key_length = key_length
594 ffc_dh_parameters = ndr_pack(ffc_dh_parameters)
596 root_key_dn = self.get_root_key_container_dn(samdb)
597 root_key_dn.add_child(f"CN={guid}")
599 # Avoid deleting root key objects without subsequently restarting the
600 # Microsoft Key Distribution Service. This service will keep its root
601 # key cached even after the corresponding AD object has been deleted,
602 # breaking later tests that try to look up the root key object.
606 "objectClass": "msKds-ProvRootKey",
607 "msKds-RootKeyData": data,
608 "msKds-CreateTime": str(create_time),
609 "msKds-UseStartTime": str(use_start_nt_time),
610 "msKds-DomainID": str(domain_dn),
611 "msKds-Version": "1", # comes from Server Configuration object.
612 "msKds-KDFAlgorithmID": (
614 ), # comes from Server Configuration.
615 "msKds-SecretAgreementAlgorithmID": (
617 ), # comes from Server Configuration.
618 "msKds-SecretAgreementParam": (
620 ), # comes from Server Configuration.
621 "msKds-PublicKeyLength": "2048", # comes from Server Configuration.
622 "msKds-PrivateKeyLength": (
624 ), # comes from Server Configuration. [MS-GKDI] claims this defaults to ‘256’.
626 if kdf_parameters is not None:
627 details["msKds-KDFParam"] = (
628 kdf_parameters # comes from Server Configuration.
632 # A test may request that a root key have a specific GUID so that
633 # results may be reproducible. Ensure these keys are cleaned up
635 self.addCleanup(delete_force, samdb, root_key_dn)