tests/krb5: Test that root key data is the correct length in bytes
[bjacke/samba-autobuild/.git] / python / samba / tests / gkdi.py
1 #
2 # Helper classes for testing the Group Key Distribution Service.
3 #
4 # Copyright (C) Catalyst.Net Ltd 2023
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, "bin/python")
24 os.environ["PYTHONUNBUFFERED"] = "1"
25
26 import datetime
27 import secrets
28 from typing import Final, NewType, Optional, Tuple, Union
29
30 import ldb
31
32 from cryptography.hazmat.backends import default_backend
33 from cryptography.hazmat.primitives import hashes
34 from cryptography.hazmat.primitives.kdf.kbkdf import CounterLocation, KBKDFHMAC, Mode
35
36 from samba import (
37     HRES_E_INVALIDARG,
38     HRES_NTE_BAD_KEY,
39     HRES_NTE_NO_KEY,
40     ntstatus,
41     NTSTATUSError,
42     werror,
43 )
44 from samba.credentials import Credentials
45 from samba.dcerpc import gkdi, misc
46 from samba.gkdi import (
47     Algorithm,
48     Gkid,
49     GkidType,
50     GroupKey,
51     KEY_CYCLE_DURATION,
52     KEY_LEN_BYTES,
53     MAX_CLOCK_SKEW,
54     SeedKeyPair,
55 )
56 from samba.ndr import ndr_pack, ndr_unpack
57 from samba.nt_time import (
58     nt_time_from_datetime,
59     NtTime,
60     NtTimeDelta,
61     timedelta_from_nt_time_delta,
62 )
63 from samba.param import LoadParm
64 from samba.samdb import SamDB
65
66 from samba.tests import delete_force, TestCase
67
68
# Distinct static type for HRESULT status codes; at run time these are plain
# integers.
HResult = NewType("HResult", int)
# Distinct static type for an ldb.Message that holds a root key object.
RootKey = NewType("RootKey", ldb.Message)


# The sum of KEY_CYCLE_DURATION and MAX_CLOCK_SKEW, expressed as an NtTime.
ROOT_KEY_START_TIME: Final = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
74
75
class GetKeyError(Exception):
    """Error raised when a GetKey request cannot be satisfied.

    The status code and the human-readable message are stored, in that order,
    in the exception's ``args`` tuple.
    """

    def __init__(self, status: HResult, message: str):
        super(GetKeyError, self).__init__(status, message)
79
80
81 class GkdiBaseTest(TestCase):
    # This is the NDR‐encoded security descriptor O:SYD:(A;;FRFW;;;S-1-5-9),
    # stored as raw bytes. It is presumably meant as a ready‐made *target_sd*
    # argument for the key retrieval helpers in this class; verify against
    # the callers.
    gmsa_sd = (
        b"\x01\x00\x04\x800\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
        b"\x14\x00\x00\x00\x02\x00\x1c\x00\x01\x00\x00\x00\x00\x00\x14\x00"
        b"\x9f\x01\x12\x00\x01\x01\x00\x00\x00\x00\x00\x05\t\x00\x00\x00"
        b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00"
    )
89
90     @staticmethod
91     def current_time(offset: Optional[datetime.timedelta] = None) -> datetime.datetime:
92         if offset is None:
93             # Allow for clock skew.
94             offset = timedelta_from_nt_time_delta(MAX_CLOCK_SKEW)
95
96         current_time = datetime.datetime.now(tz=datetime.timezone.utc)
97         return current_time + offset
98
99     def current_nt_time(self, offset: Optional[datetime.timedelta] = None) -> NtTime:
100         return nt_time_from_datetime(self.current_time(offset))
101
102     def current_gkid(self, offset: Optional[datetime.timedelta] = None) -> Gkid:
103         return Gkid.from_nt_time(self.current_nt_time(offset))
104
105     def gkdi_connect(
106         self, host: str, lp: LoadParm, server_creds: Credentials
107     ) -> gkdi.gkdi:
108         try:
109             return gkdi.gkdi(f"ncacn_ip_tcp:{host}[seal]", lp, server_creds)
110         except NTSTATUSError as err:
111             if err.args[0] == ntstatus.NT_STATUS_PORT_UNREACHABLE:
112                 self.fail(
113                     "Try starting the Microsoft Key Distribution Service (KdsSvc).\n"
114                     "In PowerShell, run:\n\tStart-Service -Name KdsSvc"
115                 )
116
117             raise
118
119     def rpc_get_key(
120         self,
121         conn: gkdi.gkdi,
122         target_sd: bytes,
123         root_key_id: Optional[misc.GUID],
124         gkid: Gkid,
125     ) -> SeedKeyPair:
126         out_len, out, result = conn.GetKey(
127             list(target_sd), root_key_id, gkid.l0_idx, gkid.l1_idx, gkid.l2_idx
128         )
129         result_code, result_string = result
130         if (
131             root_key_id is None
132             and result_code & 0xFFFF == werror.WERR_TOO_MANY_OPEN_FILES
133         ):
134             self.fail(
135                 "The server has given up selecting a root key because there are too"
136                 " many keys (more than 1000) in the Master Root Keys container. Delete"
137                 " some root keys and try again."
138             )
139         if result != (0, None):
140             raise GetKeyError(result_code, result_string)
141         self.assertEqual(len(out), out_len, "output len mismatch")
142
143         envelope = ndr_unpack(gkdi.GroupKeyEnvelope, bytes(out))
144
145         gkid = Gkid(envelope.l0_index, envelope.l1_index, envelope.l2_index)
146         l1_key = bytes(envelope.l1_key) if envelope.l1_key else None
147         l2_key = bytes(envelope.l2_key) if envelope.l2_key else None
148
149         hash_algorithm = Algorithm.from_kdf_parameters(bytes(envelope.kdf_parameters))
150
151         root_key_id = envelope.root_key_id
152
153         return SeedKeyPair(l1_key, l2_key, gkid, hash_algorithm, root_key_id)
154
155     def get_root_key_object(
156         self, samdb: SamDB, root_key_id: Optional[misc.GUID], gkid: Gkid
157     ) -> Tuple[RootKey, misc.GUID]:
158         """Return a root key object and its corresponding GUID.
159
160         *root_key_id* specifies the GUID of the root key object to return. It
161         can be ``None`` to indicate that the selected key should be the most
162         recently created key starting not after the time indicated by *gkid*.
163
164         Bear in mind as that the Microsoft Key Distribution Service caches root
165         keys, the most recently created key might not be the one that Windows
166         chooses."""
167
168         root_key_attrs = [
169             "cn",
170             "msKds-CreateTime",
171             "msKds-KDFAlgorithmID",
172             "msKds-KDFParam",
173             "msKds-RootKeyData",
174             "msKds-UseStartTime",
175             "msKds-Version",
176         ]
177
178         gkid_start_nt_time = gkid.start_nt_time()
179
180         exact_key_specified = root_key_id is not None
181         if exact_key_specified:
182             root_key_dn = self.get_root_key_container_dn(samdb)
183             root_key_dn.add_child(f"CN={root_key_id}")
184
185             try:
186                 root_key_res = samdb.search(
187                     root_key_dn, scope=ldb.SCOPE_BASE, attrs=root_key_attrs
188                 )
189             except ldb.LdbError as err:
190                 if err.args[0] == ldb.ERR_NO_SUCH_OBJECT:
191                     raise GetKeyError(HRES_NTE_NO_KEY, "no such root key exists")
192
193                 raise
194
195             root_key_object = root_key_res[0]
196         else:
197             root_keys = samdb.search(
198                 self.get_root_key_container_dn(samdb),
199                 scope=ldb.SCOPE_SUBTREE,
200                 expression=f"(msKds-UseStartTime<={gkid_start_nt_time})",
201                 attrs=root_key_attrs,
202             )
203             if not root_keys:
204                 raise GetKeyError(
205                     HRES_NTE_NO_KEY, "no root keys exist at specified time"
206                 )
207
208             def root_key_create_time(key: RootKey) -> NtTime:
209                 create_time = key.get("msKds-CreateTime", idx=0)
210                 if create_time is None:
211                     return NtTime(0)
212
213                 return NtTime(int(create_time))
214
215             root_key_object = max(root_keys, key=root_key_create_time)
216
217             root_key_cn = root_key_object.get("cn", idx=0)
218             self.assertIsNotNone(root_key_cn)
219             root_key_id = misc.GUID(root_key_cn)
220
221         use_start_nt_time = NtTime(
222             int(root_key_object.get("msKds-UseStartTime", idx=0))
223         )
224         if use_start_nt_time == 0:
225             raise GetKeyError(HRES_NTE_BAD_KEY, "root key effective time is 0")
226         use_start_nt_time = NtTime(
227             use_start_nt_time - NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
228         )
229
230         if exact_key_specified and not (0 <= use_start_nt_time <= gkid_start_nt_time):
231             raise GetKeyError(HRES_E_INVALIDARG, "root key is not yet valid")
232
233         return root_key_object, root_key_id
234
235     def validate_get_key_request(
236         self, gkid: Gkid, current_gkid: Gkid, root_key_specified: bool
237     ) -> None:
238         if gkid > current_gkid:
239             raise GetKeyError(
240                 HRES_E_INVALIDARG, "invalid request for a key from the future"
241             )
242
243         gkid_type = gkid.gkid_type()
244         if gkid_type is GkidType.DEFAULT:
245             derived_from = (
246                 " derived from the specified root key" if root_key_specified else ""
247             )
248             raise NotImplementedError(
249                 f"The latest group key{derived_from} is being requested."
250             )
251
252         if gkid_type is not GkidType.L2_SEED_KEY:
253             raise GetKeyError(
254                 HRES_E_INVALIDARG, f"invalid request for {gkid_type.description()}"
255             )
256
257     def get_key(
258         self,
259         samdb: SamDB,
260         target_sd: bytes,  # An NDR‐encoded valid security descriptor in self‐relative format.
261         root_key_id: Optional[misc.GUID],
262         gkid: Gkid,
263         *,
264         root_key_id_hint: Optional[misc.GUID] = None,
265         current_gkid: Optional[Gkid] = None,
266     ) -> SeedKeyPair:
267         """Emulate the ISDKey.GetKey() RPC method.
268
269         When passed a NULL root key ID, GetKey() may use a cached root key
270         rather than picking the most recently created applicable key as the
271         documentation implies. If it’s important to arrive at the same result as
272         Windows, pass a GUID in the *root_key_id_hint* parameter to specify a
273         particular root key to use."""
274
275         if current_gkid is None:
276             current_gkid = self.current_gkid()
277
278         root_key_specified = root_key_id is not None
279         if root_key_specified:
280             self.assertIsNone(
281                 root_key_id_hint, "don’t provide both root key ID parameters"
282             )
283
284         self.validate_get_key_request(gkid, current_gkid, root_key_specified)
285
286         root_key_object, root_key_id = self.get_root_key_object(
287             samdb, root_key_id if root_key_specified else root_key_id_hint, gkid
288         )
289
290         if root_key_specified:
291             if gkid.l0_idx < current_gkid.l0_idx:
292                 # All of the seed keys with an L0 index less than the current L0
293                 # index are from the past and thus are safe to return. If the
294                 # caller has requested a specific seed key with a past L0 index,
295                 # return the L1 seed key (L0, 31, −1), from which any L1 or L2
296                 # seed key having that L0 index can be derived.
297                 l1_gkid = Gkid(gkid.l0_idx, 31, -1)
298                 seed_key = self.compute_seed_key(
299                     target_sd, root_key_id, root_key_object, l1_gkid
300                 )
301                 return SeedKeyPair(
302                     seed_key.key,
303                     None,
304                     Gkid(gkid.l0_idx, 31, 31),
305                     seed_key.hash_algorithm,
306                     root_key_id,
307                 )
308
309             # All of the previous seed keys with an L0 index equal to the
310             # current L0 index can be derived from the current seed key or from
311             # the next older L1 seed key.
312             gkid = current_gkid
313
314         if gkid.l2_idx == 31:
315             # The current seed key, and all previous seed keys with that same L0
316             # index, can be derived from the L1 seed key (L0, L1, 31).
317             l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx, -1)
318             seed_key = self.compute_seed_key(
319                 target_sd, root_key_id, root_key_object, l1_gkid
320             )
321             return SeedKeyPair(
322                 seed_key.key, None, gkid, seed_key.hash_algorithm, root_key_id
323             )
324
325         # Compute the L2 seed key to return.
326         seed_key = self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
327
328         next_older_seed_key = None
329         if gkid.l1_idx != 0:
330             # From the current seed key can be derived only those seed keys that
331             # share its L1 and L2 indices. To be able to derive previous seed
332             # keys with older L1 indices, the caller must be given the next
333             # older L1 seed key as well.
334             next_older_l1_gkid = Gkid(gkid.l0_idx, gkid.l1_idx - 1, -1)
335             next_older_seed_key = self.compute_seed_key(
336                 target_sd, root_key_id, root_key_object, next_older_l1_gkid
337             ).key
338
339         return SeedKeyPair(
340             next_older_seed_key,
341             seed_key.key,
342             gkid,
343             seed_key.hash_algorithm,
344             root_key_id,
345         )
346
347     def get_key_exact(
348         self,
349         samdb: SamDB,
350         target_sd: bytes,  # An NDR‐encoded valid security descriptor in self‐relative format.
351         root_key_id: Optional[misc.GUID],
352         gkid: Gkid,
353         current_gkid: Optional[Gkid] = None,
354     ) -> GroupKey:
355         if current_gkid is None:
356             current_gkid = self.current_gkid()
357
358         root_key_specified = root_key_id is not None
359         self.validate_get_key_request(gkid, current_gkid, root_key_specified)
360
361         root_key_object, root_key_id = self.get_root_key_object(
362             samdb, root_key_id, gkid
363         )
364
365         return self.compute_seed_key(target_sd, root_key_id, root_key_object, gkid)
366
367     def get_root_key_data(self, root_key: RootKey) -> Tuple[bytes, Algorithm]:
368         version = root_key.get("msKds-Version", idx=0)
369         self.assertEqual(b"1", version)
370
371         algorithm_id = root_key.get("msKds-KDFAlgorithmID", idx=0)
372         self.assertEqual(b"SP800_108_CTR_HMAC", algorithm_id)
373
374         hash_algorithm = Algorithm.from_kdf_parameters(
375             root_key.get("msKds-KDFParam", idx=0)
376         )
377
378         root_key_data = root_key.get("msKds-RootKeyData", idx=0)
379         self.assertIsInstance(root_key_data, bytes)
380
381         return root_key_data, hash_algorithm
382
383     def compute_seed_key(
384         self,
385         target_sd: bytes,
386         root_key_id: misc.GUID,
387         root_key: RootKey,
388         target_gkid: Gkid,
389     ) -> GroupKey:
390         target_gkid_type = target_gkid.gkid_type()
391         self.assertIn(
392             target_gkid_type,
393             (GkidType.L1_SEED_KEY, GkidType.L2_SEED_KEY),
394             f"unexpected attempt to compute {target_gkid_type.description()}",
395         )
396
397         root_key_data, algorithm = self.get_root_key_data(root_key)
398         root_key_id_bytes = ndr_pack(root_key_id)
399
400         hash_algorithm = algorithm.algorithm()
401
402         # Derive the L0 seed key.
403         gkid = Gkid.l0_seed_key(target_gkid.l0_idx)
404         key = self.derive_key(root_key_data, root_key_id_bytes, hash_algorithm, gkid)
405
406         # Derive the L1 seed key.
407
408         gkid = gkid.derive_l1_seed_key()
409         key = self.derive_key(
410             key, root_key_id_bytes, hash_algorithm, gkid, target_sd=target_sd
411         )
412
413         while gkid.l1_idx != target_gkid.l1_idx:
414             gkid = gkid.derive_l1_seed_key()
415             key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)
416
417         # Derive the L2 seed key.
418         while gkid != target_gkid:
419             gkid = gkid.derive_l2_seed_key()
420             key = self.derive_key(key, root_key_id_bytes, hash_algorithm, gkid)
421
422         return GroupKey(key, gkid, algorithm, root_key_id)
423
424     def derive_key(
425         self,
426         key: bytes,
427         root_key_id_bytes: bytes,
428         hash_algorithm: hashes.HashAlgorithm,
429         gkid: Gkid,
430         *,
431         target_sd: Optional[bytes] = None,
432     ) -> bytes:
433         def u32_bytes(n: int) -> bytes:
434             return (n & 0xFFFF_FFFF).to_bytes(length=4, byteorder="little")
435
436         context = (
437             root_key_id_bytes
438             + u32_bytes(gkid.l0_idx)
439             + u32_bytes(gkid.l1_idx)
440             + u32_bytes(gkid.l2_idx)
441         )
442         if target_sd is not None:
443             context += target_sd
444         return self.kdf(hash_algorithm, key, context)
445
446     def kdf(
447         self,
448         hash_algorithm: hashes.HashAlgorithm,
449         key: bytes,
450         context: bytes,
451         *,
452         label="KDS service",
453         len_in_bytes=KEY_LEN_BYTES,
454     ) -> bytes:
455         label = label.encode("utf-16-le") + b"\x00\x00"
456         kdf = KBKDFHMAC(
457             algorithm=hash_algorithm,
458             mode=Mode.CounterMode,
459             length=len_in_bytes,
460             rlen=4,
461             llen=4,
462             location=CounterLocation.BeforeFixed,
463             label=label,
464             context=context,
465             fixed=None,
466             backend=default_backend(),
467         )
468         return kdf.derive(key)
469
470     def get_config_dn(self, samdb: SamDB, dn: str) -> ldb.Dn:
471         config_dn = samdb.get_config_basedn()
472         config_dn.add_child(dn)
473         return config_dn
474
475     def get_server_config_dn(self, samdb: SamDB) -> ldb.Dn:
476         # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution
477         # Service”, and “CN=SID Key Server Configuration” for “CN=Group Key
478         # Distribution Service Server Configuration”.
479         return self.get_config_dn(
480             samdb,
481             "CN=Group Key Distribution Service Server Configuration,"
482             "CN=Server Configuration,"
483             "CN=Group Key Distribution Service,"
484             "CN=Services",
485         )
486
487     def get_root_key_container_dn(self, samdb: SamDB) -> ldb.Dn:
488         # [MS-GKDI] has “CN=Sid Key Service” for “CN=Group Key Distribution Service”.
489         return self.get_config_dn(
490             samdb,
491             "CN=Master Root Keys,CN=Group Key Distribution Service,CN=Services",
492         )
493
    def create_root_key(
        self,
        samdb: SamDB,
        domain_dn: ldb.Dn,
        *,
        use_start_time: Optional[Union[datetime.datetime, NtTime]] = None,
        hash_algorithm: Optional[Algorithm] = Algorithm.SHA512,
        guid: Optional[misc.GUID] = None,
        data: Optional[bytes] = None,
    ) -> misc.GUID:
        """Add a new root key object to *samdb* and return its GUID.

        The object is created in the root key container with the object class
        ``msKds-ProvRootKey``; *domain_dn* is stored as its ``msKds-DomainID``.

        *use_start_time* may be a datetime or an NtTime. If omitted, it is the
        start of the current key interval plus KEY_CYCLE_DURATION and
        MAX_CLOCK_SKEW.

        *hash_algorithm* selects the KDF hash algorithm. If it is ``None``, no
        ``msKds-KDFParam`` attribute is written.

        *guid* names the new object. If omitted, a random GUID is generated.
        If given, a cleanup that deletes the object is registered.

        *data* is the root key data. If omitted, KEY_LEN_BYTES random bytes
        are generated."""

        # [MS-GKDI] 3.1.4.1.1, “Creating a New Root Key”, states that if the
        # server receives a GetKey request and the root keys container in Active
        # Directory is empty, the server must create a new root key object
        # based on the default Server Configuration object. Additional root keys
        # are to be created based on either the default Server Configuration
        # object or an updated one specifying optional configuration values.

        guid_specified = guid is not None
        if not guid_specified:
            guid = misc.GUID(secrets.token_bytes(16))

        if data is None:
            data = secrets.token_bytes(KEY_LEN_BYTES)

        create_time = current_nt_time = self.current_nt_time()

        if use_start_time is None:
            # Root keys created by Windows without the ‘-EffectiveImmediately’
            # parameter have an effective time of exactly ten days in the
            # future, presumably to allow time for replication.
            #
            # Microsoft’s documentation on creating a KDS root key, located at
            # https://learn.microsoft.com/en-us/windows-server/security/group-managed-service-accounts/create-the-key-distribution-services-kds-root-key,
            # claims to the contrary that domain controllers will only wait up
            # to ten hours before allowing Group Managed Service Accounts to be
            # created.
            #
            # The same page includes instructions for creating a root key with
            # an effective time of ten hours in the past (for testing purposes),
            # but I’m not sure why — the KDS will consider a key valid for use
            # immediately after its start time has passed, without bothering to
            # wait ten hours first. In fact, it will consider a key to be valid
            # a full ten hours (plus clock skew) *before* its declared start
            # time — intentional, or (conceivably) the result of an accidental
            # negation?
            current_interval_start_nt_time = Gkid.from_nt_time(
                current_nt_time
            ).start_nt_time()
            use_start_time = NtTime(
                current_interval_start_nt_time + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
            )

        # Normalise the start time to an NtTime, whichever form was supplied.
        if isinstance(use_start_time, datetime.datetime):
            use_start_nt_time = nt_time_from_datetime(use_start_time)
        else:
            self.assertIsInstance(use_start_time, int)
            use_start_nt_time = use_start_time

        # The KDF parameters are stored NDR-packed, and only when a hash
        # algorithm was requested.
        kdf_parameters = None
        if hash_algorithm is not None:
            kdf_parameters = gkdi.KdfParameters()
            kdf_parameters.hash_algorithm = hash_algorithm.value
            kdf_parameters = ndr_pack(kdf_parameters)

        # These are the encoded p and g values, respectively, of the “2048‐bit
        # MODP Group with 256‐bit Prime Order Subgroup” from RFC 5114 section
        # 2.3.
        field_order = (
            b"\x87\xa8\xe6\x1d\xb4\xb6f<\xff\xbb\xd1\x9ce\x19Y\x99\x8c\xee\xf6\x08"
            b"f\r\xd0\xf2],\xee\xd4C^;\x00\xe0\r\xf8\xf1\xd6\x19W\xd4\xfa\xf7\xdfE"
            b"a\xb2\xaa0\x16\xc3\xd9\x114\to\xaa;\xf4)m\x83\x0e\x9a|"
            b" \x9e\x0cd\x97Qz\xbd"
            b'Z\x8a\x9d0k\xcfg\xed\x91\xf9\xe6r[GX\xc0"\xe0\xb1\xefBu\xbf{l[\xfc\x11'
            b"\xd4_\x90\x88\xb9A\xf5N\xb1\xe5\x9b\xb8\xbc9\xa0\xbf\x120\x7f\\O\xdbp\xc5"
            b"\x81\xb2?v\xb6:\xca\xe1\xca\xa6\xb7\x90-RRg5H\x8a\x0e\xf1<m\x9aQ\xbf\xa4\xab"
            b":\xd84w\x96RM\x8e\xf6\xa1g\xb5\xa4\x18%\xd9g\xe1D\xe5\x14\x05d%"
            b"\x1c\xca\xcb\x83\xe6\xb4\x86\xf6\xb3\xca?yqP`&\xc0\xb8W\xf6\x89\x96(V"
            b"\xde\xd4\x01\n\xbd\x0b\xe6!\xc3\xa3\x96\nT\xe7\x10\xc3u\xf2cu\xd7\x01A\x03"
            b"\xa4\xb5C0\xc1\x98\xaf\x12a\x16\xd2'n\x11q_i8w\xfa\xd7\xef\t\xca\xdb\tJ\xe9"
            b"\x1e\x1a\x15\x97"
        )
        generator = (
            b"?\xb3,\x9bs\x13M\x0b.wPf`\xed\xbdHL\xa7\xb1\x8f!\xef T\x07\xf4y:"
            b"\x1a\x0b\xa1%\x10\xdb\xc1Pw\xbeF?\xffO\xedJ\xac\x0b\xb5U\xbe:l\x1b\x0ckG\xb1"
            b"\xbc7s\xbf~\x8cob\x90\x12(\xf8\xc2\x8c\xbb\x18\xa5Z\xe3\x13A\x00\ne"
            b"\x01\x96\xf91\xc7zW\xf2\xdd\xf4c\xe5\xe9\xec\x14Kw}\xe6*\xaa\xb8\xa8b"
            b"\x8a\xc3v\xd2\x82\xd6\xed8d\xe6y\x82B\x8e\xbc\x83\x1d\x144\x8fo/\x91\x93"
            b"\xb5\x04Z\xf2vqd\xe1\xdf\xc9g\xc1\xfb?.U\xa4\xbd\x1b\xff\xe8;\x9c\x80"
            b"\xd0R\xb9\x85\xd1\x82\xea\n\xdb*;s\x13\xd3\xfe\x14\xc8HK\x1e\x05%\x88\xb9"
            b"\xb7\xd2\xbb\xd2\xdf\x01a\x99\xec\xd0n\x15W\xcd\t\x15\xb35;\xbbd\xe0\xec7"
            b"\x7f\xd0(7\r\xf9+R\xc7\x89\x14(\xcd\xc6~\xb6\x18KR=\x1d\xb2F\xc3/c\x07\x84"
            b"\x90\xf0\x0e\xf8\xd6G\xd1H\xd4yTQ^#'\xcf\xef\x98\xc5\x82fKL\x0fl\xc4\x16Y"
        )
        # Guard against a transcription error in either constant.
        self.assertEqual(len(field_order), len(generator))
        key_length = len(field_order)

        # The secret agreement parameters are stored NDR-packed.
        ffc_dh_parameters = gkdi.FfcDhParameters()
        ffc_dh_parameters.field_order = list(field_order)
        ffc_dh_parameters.generator = list(generator)
        ffc_dh_parameters.key_length = key_length
        ffc_dh_parameters = ndr_pack(ffc_dh_parameters)

        root_key_dn = self.get_root_key_container_dn(samdb)
        root_key_dn.add_child(f"CN={guid}")

        # Avoid deleting root key objects without subsequently restarting the
        # Microsoft Key Distribution Service. This service will keep its root
        # key cached even after the corresponding AD object has been deleted,
        # breaking later tests that try to look up the root key object.

        details = {
            "dn": root_key_dn,
            "objectClass": "msKds-ProvRootKey",
            "msKds-RootKeyData": data,
            "msKds-CreateTime": str(create_time),
            "msKds-UseStartTime": str(use_start_nt_time),
            "msKds-DomainID": str(domain_dn),
            "msKds-Version": "1",  # comes from Server Configuration object.
            "msKds-KDFAlgorithmID": (
                "SP800_108_CTR_HMAC"
            ),  # comes from Server Configuration.
            "msKds-SecretAgreementAlgorithmID": (
                "DH"
            ),  # comes from Server Configuration.
            "msKds-SecretAgreementParam": (
                ffc_dh_parameters
            ),  # comes from Server Configuration.
            "msKds-PublicKeyLength": "2048",  # comes from Server Configuration.
            "msKds-PrivateKeyLength": (
                "512"
            ),  # comes from Server Configuration. [MS-GKDI] claims this defaults to ‘256’.
        }
        if kdf_parameters is not None:
            details["msKds-KDFParam"] = (
                kdf_parameters  # comes from Server Configuration.
            )

        if guid_specified:
            # A test may request that a root key have a specific GUID so that
            # results may be reproducible. Ensure these keys are cleaned up
            # afterwards.
            self.addCleanup(delete_force, samdb, root_key_dn)
        samdb.add(details)

        return guid