return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
len(dsrdata))
-def _find_key(keys, rrsig):
+def _find_candidate_keys(keys, rrsig):
+ candidate_keys=[]
value = keys.get(rrsig.signer)
if value is None:
return None
for rdata in rdataset:
if rdata.algorithm == rrsig.algorithm and \
key_id(rdata) == rrsig.key_tag:
- return rdata
- return None
+ candidate_keys.append(rdata)
+ return candidate_keys
def _is_rsa(algorithm):
return algorithm in (RSAMD5, RSASHA1,
if isinstance(origin, (str, unicode)):
origin = dns.name.from_text(origin, dns.name.root)
- key = _find_key(keys, rrsig)
- if not key:
- raise ValidationFailure, 'unknown key'
-
- # For convenience, allow the rrset to be specified as a (name, rdataset)
- # tuple as well as a proper rrset
- if isinstance(rrset, tuple):
- rrname = rrset[0]
- rdataset = rrset[1]
- else:
- rrname = rrset.name
- rdataset = rrset
-
- if now is None:
- now = time.time()
- if rrsig.expiration < now:
- raise ValidationFailure, 'expired'
- if rrsig.inception > now:
- raise ValidationFailure, 'not yet valid'
-
- hash = _make_hash(rrsig.algorithm)
-
- if _is_rsa(rrsig.algorithm):
- keyptr = key.key
- (bytes,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- if bytes == 0:
- (bytes,) = struct.unpack('!H', keyptr[0:2])
- keyptr = keyptr[2:]
- rsa_e = keyptr[0:bytes]
- rsa_n = keyptr[bytes:]
- keylen = len(rsa_n) * 8
- pubkey = Crypto.PublicKey.RSA.construct(
- (Crypto.Util.number.bytes_to_long(rsa_n),
- Crypto.Util.number.bytes_to_long(rsa_e)))
- sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
- elif _is_dsa(rrsig.algorithm):
- keyptr = key.key
- (t,) = struct.unpack('!B', keyptr[0:1])
- keyptr = keyptr[1:]
- octets = 64 + t * 8
- dsa_q = keyptr[0:20]
- keyptr = keyptr[20:]
- dsa_p = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_g = keyptr[0:octets]
- keyptr = keyptr[octets:]
- dsa_y = keyptr[0:octets]
- pubkey = Crypto.PublicKey.DSA.construct(
- (Crypto.Util.number.bytes_to_long(dsa_y),
- Crypto.Util.number.bytes_to_long(dsa_g),
- Crypto.Util.number.bytes_to_long(dsa_p),
- Crypto.Util.number.bytes_to_long(dsa_q)))
- (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
- sig = (Crypto.Util.number.bytes_to_long(dsa_r),
- Crypto.Util.number.bytes_to_long(dsa_s))
- else:
- raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
-
- hash.update(_to_rdata(rrsig, origin)[:18])
- hash.update(rrsig.signer.to_digestable(origin))
-
- if rrsig.labels < len(rrname) - 1:
- suffix = rrname.split(rrsig.labels + 1)[1]
- rrname = dns.name.from_text('*', suffix)
- rrnamebuf = rrname.to_digestable(origin)
- rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
- rrsig.original_ttl)
- rrlist = sorted(rdataset);
- for rr in rrlist:
- hash.update(rrnamebuf)
- hash.update(rrfixed)
- rrdata = rr.to_digestable(origin)
- rrlen = struct.pack('!H', len(rrdata))
- hash.update(rrlen)
- hash.update(rrdata)
-
- digest = hash.digest()
-
- if _is_rsa(rrsig.algorithm):
- # PKCS1 algorithm identifier goop
- digest = _make_algorithm_id(rrsig.algorithm) + digest
- padlen = keylen // 8 - len(digest) - 3
- digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
- elif _is_dsa(rrsig.algorithm):
- pass
- else:
- # Raise here for code clarity; this won't actually ever happen
- # since if the algorithm is really unknown we'd already have
- # raised an exception above
- raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+ for candidate_key in _find_candidate_keys(keys, rrsig):
+ if not candidate_key:
+ raise ValidationFailure, 'unknown key'
+
+ # For convenience, allow the rrset to be specified as a (name, rdataset)
+ # tuple as well as a proper rrset
+ if isinstance(rrset, tuple):
+ rrname = rrset[0]
+ rdataset = rrset[1]
+ else:
+ rrname = rrset.name
+ rdataset = rrset
+
+ if now is None:
+ now = time.time()
+ if rrsig.expiration < now:
+ raise ValidationFailure, 'expired'
+ if rrsig.inception > now:
+ raise ValidationFailure, 'not yet valid'
+
+ hash = _make_hash(rrsig.algorithm)
+
+ if _is_rsa(rrsig.algorithm):
+ keyptr = candidate_key.key
+ (bytes,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ if bytes == 0:
+ (bytes,) = struct.unpack('!H', keyptr[0:2])
+ keyptr = keyptr[2:]
+ rsa_e = keyptr[0:bytes]
+ rsa_n = keyptr[bytes:]
+ keylen = len(rsa_n) * 8
+ pubkey = Crypto.PublicKey.RSA.construct(
+ (Crypto.Util.number.bytes_to_long(rsa_n),
+ Crypto.Util.number.bytes_to_long(rsa_e)))
+ sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
+ elif _is_dsa(rrsig.algorithm):
+ keyptr = candidate_key.key
+ (t,) = struct.unpack('!B', keyptr[0:1])
+ keyptr = keyptr[1:]
+ octets = 64 + t * 8
+ dsa_q = keyptr[0:20]
+ keyptr = keyptr[20:]
+ dsa_p = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_g = keyptr[0:octets]
+ keyptr = keyptr[octets:]
+ dsa_y = keyptr[0:octets]
+ pubkey = Crypto.PublicKey.DSA.construct(
+ (Crypto.Util.number.bytes_to_long(dsa_y),
+ Crypto.Util.number.bytes_to_long(dsa_g),
+ Crypto.Util.number.bytes_to_long(dsa_p),
+ Crypto.Util.number.bytes_to_long(dsa_q)))
+ (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
+ sig = (Crypto.Util.number.bytes_to_long(dsa_r),
+ Crypto.Util.number.bytes_to_long(dsa_s))
+ else:
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+
+ hash.update(_to_rdata(rrsig, origin)[:18])
+ hash.update(rrsig.signer.to_digestable(origin))
+
+ if rrsig.labels < len(rrname) - 1:
+ suffix = rrname.split(rrsig.labels + 1)[1]
+ rrname = dns.name.from_text('*', suffix)
+ rrnamebuf = rrname.to_digestable(origin)
+ rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
+ rrsig.original_ttl)
+ rrlist = sorted(rdataset);
+ for rr in rrlist:
+ hash.update(rrnamebuf)
+ hash.update(rrfixed)
+ rrdata = rr.to_digestable(origin)
+ rrlen = struct.pack('!H', len(rrdata))
+ hash.update(rrlen)
+ hash.update(rrdata)
+
+ digest = hash.digest()
+
+ if _is_rsa(rrsig.algorithm):
+ # PKCS1 algorithm identifier goop
+ digest = _make_algorithm_id(rrsig.algorithm) + digest
+ padlen = keylen // 8 - len(digest) - 3
+ digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
+ elif _is_dsa(rrsig.algorithm):
+ pass
+ else:
+ # Raise here for code clarity; this won't actually ever happen
+ # since if the algorithm is really unknown we'd already have
+ # raised an exception above
+ raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
- if not pubkey.verify(digest, sig):
- raise ValidationFailure, 'verify failure'
+ if pubkey.verify(digest, sig):
+ return
+ raise ValidationFailure, 'verify failure'
def _validate(rrset, rrsigset, keys, origin=None, now=None):
"""Validate an RRset