)
from samba.ndr import ndr_pack, ndr_unpack
from samba.nt_time import (
+ datetime_from_nt_time,
nt_time_from_datetime,
NtTime,
NtTimeDelta,
ROOT_KEY_START_TIME = NtTime(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
+DSDB_GMSA_TIME_OPAQUE = "dsdb_gmsa_time_opaque"
+
class GetKeyError(Exception):
def __init__(self, status: HResult, message: str):
b"\x01\x01\x00\x00\x00\x00\x00\x05\x12\x00\x00\x00"
)
- @staticmethod
- def current_time(offset: Optional[datetime.timedelta] = None) -> datetime.datetime:
- current_time = datetime.datetime.now(tz=datetime.timezone.utc)
+ def set_db_time(self, samdb: SamDB, time: Optional[NtTime]) -> None:
+ samdb.set_opaque(DSDB_GMSA_TIME_OPAQUE, time)
+
+ def get_db_time(self, samdb: SamDB) -> Optional[NtTime]:
+ return samdb.get_opaque(DSDB_GMSA_TIME_OPAQUE)
+
+ def current_time(
+ self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
+ ) -> datetime.datetime:
+ now = self.get_db_time(samdb)
+ if now is None:
+ current_time = datetime.datetime.now(tz=datetime.timezone.utc)
+ else:
+ current_time = datetime_from_nt_time(now)
if offset is not None:
current_time += offset
return current_time
- def current_nt_time(self, offset: Optional[datetime.timedelta] = None) -> NtTime:
- return nt_time_from_datetime(self.current_time(offset))
+ def current_nt_time(
+ self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
+ ) -> NtTime:
+ return nt_time_from_datetime(self.current_time(samdb, offset=offset))
- def current_gkid(self, offset: Optional[datetime.timedelta] = None) -> Gkid:
+ def current_gkid(
+ self, samdb: SamDB, *, offset: Optional[datetime.timedelta] = None
+ ) -> Gkid:
if offset is None:
# Allow for clock skew.
offset = timedelta_from_nt_time_delta(MAX_CLOCK_SKEW)
- return Gkid.from_nt_time(self.current_nt_time(offset))
+ return Gkid.from_nt_time(self.current_nt_time(samdb, offset=offset))
def gkdi_connect(
self, host: str, lp: LoadParm, server_creds: Credentials
particular root key to use."""
if current_gkid is None:
- current_gkid = self.current_gkid()
+ current_gkid = self.current_gkid(samdb)
root_key_specified = root_key_id is not None
if root_key_specified:
current_gkid: Optional[Gkid] = None,
) -> GroupKey:
if current_gkid is None:
- current_gkid = self.current_gkid()
+ current_gkid = self.current_gkid(samdb)
root_key_specified = root_key_id is not None
self.validate_get_key_request(gkid, current_gkid, root_key_specified)
samdb,
domain_dn,
current_nt_time=self.current_nt_time(
+ samdb,
# Allow for clock skew.
- timedelta_from_nt_time_delta(MAX_CLOCK_SKEW)
+ offset=timedelta_from_nt_time_delta(MAX_CLOCK_SKEW),
),
use_start_time=use_start_time,
hash_algorithm=hash_algorithm,
# It actually doesn’t matter what we specify for the L1 and L2 indices.
# We’ll get the same result regardless — they just cannot specify a key
# from the future.
- current_gkid = self.current_gkid()
+ current_gkid = self.current_gkid(self.get_samdb())
key = self.check_rpc_get_key(root_key_id, current_gkid)
self.assertEqual(current_gkid, key.gkid)
# It actually doesn’t matter what we specify for the L1 and L2 indices.
# We’ll get the same result regardless.
- previous_l0_idx = self.current_gkid().l0_idx - 1
+ previous_l0_idx = self.current_gkid(self.get_samdb()).l0_idx - 1
key = self.check_rpc_get_key(root_key_id, Gkid(previous_l0_idx, 0, 0))
# Expect to get an L1 seed key.
"""Test with the SHA1 algorithm."""
key = self.check_rpc_get_key(
self.new_root_key(hash_algorithm=Algorithm.SHA1),
- self.current_gkid(),
+ self.current_gkid(self.get_samdb()),
)
self.assertIs(Algorithm.SHA1, key.hash_algorithm)
"""Test with the SHA256 algorithm."""
key = self.check_rpc_get_key(
self.new_root_key(hash_algorithm=Algorithm.SHA256),
- self.current_gkid(),
+ self.current_gkid(self.get_samdb()),
)
self.assertIs(Algorithm.SHA256, key.hash_algorithm)
"""Test with the SHA384 algorithm."""
key = self.check_rpc_get_key(
self.new_root_key(hash_algorithm=Algorithm.SHA384),
- self.current_gkid(),
+ self.current_gkid(self.get_samdb()),
)
self.assertIs(Algorithm.SHA384, key.hash_algorithm)
"""Test with the SHA512 algorithm."""
key = self.check_rpc_get_key(
self.new_root_key(hash_algorithm=Algorithm.SHA512),
- self.current_gkid(),
+ self.current_gkid(self.get_samdb()),
)
self.assertIs(Algorithm.SHA512, key.hash_algorithm)
"""Test without a specified algorithm."""
key = self.check_rpc_get_key(
self.new_root_key(hash_algorithm=None),
- self.current_gkid(),
+ self.current_gkid(self.get_samdb()),
)
self.assertIs(Algorithm.SHA256, key.hash_algorithm)
root_key_id = self.new_root_key(use_start_time=ROOT_KEY_START_TIME)
future_gkid = self.current_gkid(
+ self.get_samdb(),
offset=timedelta_from_nt_time_delta(
NtTimeDelta(KEY_CYCLE_DURATION + MAX_CLOCK_SKEW)
)
"""Attempt to use a root key with an effective time of zero."""
root_key_id = self.new_root_key(use_start_time=NtTime(0))
- gkid = self.current_gkid()
+ gkid = self.current_gkid(self.get_samdb())
with self.assertRaises(GetKeyError) as err:
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
"""Attempt to use a root key with an effective time set too low."""
root_key_id = self.new_root_key(use_start_time=NtTime(ROOT_KEY_START_TIME - 1))
- gkid = self.current_gkid()
+ gkid = self.current_gkid(self.get_samdb())
with self.assertRaises(GetKeyError) as err:
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
def test_before_valid(self):
"""Attempt to use a key before it is valid."""
- gkid = self.current_gkid()
+ gkid = self.current_gkid(self.get_samdb())
valid_start_time = NtTime(
gkid.start_nt_time() + KEY_CYCLE_DURATION + MAX_CLOCK_SKEW
)
"""Attempt to use a non‐existent root key."""
root_key_id = misc.GUID(secrets.token_bytes(16))
- gkid = self.current_gkid()
+ gkid = self.current_gkid(self.get_samdb())
with self.assertRaises(GetKeyError) as err:
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
"""Attempt to use a root key that is the wrong length."""
root_key_id = self.new_root_key(data=bytes(KEY_LEN_BYTES // 2))
- gkid = self.current_gkid()
+ gkid = self.current_gkid(self.get_samdb())
with self.assertRaises(GetKeyError) as err:
self.get_key(self.get_samdb(), self.gmsa_sd, root_key_id, gkid)
self.gmsa_sd,
root_key_id,
gkid,
- current_gkid=self.current_gkid(),
+ current_gkid=self.current_gkid(self.get_samdb()),
)
self.assertEqual(gkid, key.gkid)