Remove DNSKEY keytag uniqueness assumption (RFC 4034, section 8)
authorJames Dempsey <jamespd@gmail.com>
Wed, 27 Mar 2013 20:59:30 +0000 (09:59 +1300)
committerJames Dempsey <jamespd@gmail.com>
Wed, 27 Mar 2013 20:59:30 +0000 (09:59 +1300)
dns/dnssec.py
tests/dnssec.py

index dd6a27a53642fdbb8bb0b749b8510e40444db54e..4f6fc980a6260f0aac112687594f82dedbce3d80 100644 (file)
@@ -126,7 +126,8 @@ def make_ds(name, key, algorithm, origin=None):
     return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
                                len(dsrdata))
 
-def _find_key(keys, rrsig):
+def _find_candidate_keys(keys, rrsig):
+    candidate_keys=[]
     value = keys.get(rrsig.signer)
     if value is None:
         return None
@@ -141,8 +142,8 @@ def _find_key(keys, rrsig):
     for rdata in rdataset:
         if rdata.algorithm == rrsig.algorithm and \
                key_id(rdata) == rrsig.key_tag:
-            return rdata
-    return None
+            candidate_keys.append(rdata)
+    return candidate_keys
 
 def _is_rsa(algorithm):
     return algorithm in (RSAMD5, RSASHA1,
@@ -217,100 +218,101 @@ def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
     if isinstance(origin, (str, unicode)):
         origin = dns.name.from_text(origin, dns.name.root)
 
-    key = _find_key(keys, rrsig)
-    if not key:
-        raise ValidationFailure, 'unknown key'
-
-    # For convenience, allow the rrset to be specified as a (name, rdataset)
-    # tuple as well as a proper rrset
-    if isinstance(rrset, tuple):
-        rrname = rrset[0]
-        rdataset = rrset[1]
-    else:
-        rrname = rrset.name
-        rdataset = rrset
-
-    if now is None:
-        now = time.time()
-    if rrsig.expiration < now:
-        raise ValidationFailure, 'expired'
-    if rrsig.inception > now:
-        raise ValidationFailure, 'not yet valid'
-
-    hash = _make_hash(rrsig.algorithm)
-
-    if _is_rsa(rrsig.algorithm):
-        keyptr = key.key
-        (bytes,) = struct.unpack('!B', keyptr[0:1])
-        keyptr = keyptr[1:]
-        if bytes == 0:
-            (bytes,) = struct.unpack('!H', keyptr[0:2])
-            keyptr = keyptr[2:]
-        rsa_e = keyptr[0:bytes]
-        rsa_n = keyptr[bytes:]
-        keylen = len(rsa_n) * 8
-        pubkey = Crypto.PublicKey.RSA.construct(
-            (Crypto.Util.number.bytes_to_long(rsa_n),
-             Crypto.Util.number.bytes_to_long(rsa_e)))
-        sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
-    elif _is_dsa(rrsig.algorithm):
-        keyptr = key.key
-        (t,) = struct.unpack('!B', keyptr[0:1])
-        keyptr = keyptr[1:]
-        octets = 64 + t * 8
-        dsa_q = keyptr[0:20]
-        keyptr = keyptr[20:]
-        dsa_p = keyptr[0:octets]
-        keyptr = keyptr[octets:]
-        dsa_g = keyptr[0:octets]
-        keyptr = keyptr[octets:]
-        dsa_y = keyptr[0:octets]
-        pubkey = Crypto.PublicKey.DSA.construct(
-            (Crypto.Util.number.bytes_to_long(dsa_y),
-             Crypto.Util.number.bytes_to_long(dsa_g),
-             Crypto.Util.number.bytes_to_long(dsa_p),
-             Crypto.Util.number.bytes_to_long(dsa_q)))
-        (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
-        sig = (Crypto.Util.number.bytes_to_long(dsa_r),
-               Crypto.Util.number.bytes_to_long(dsa_s))
-    else:
-        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
-
-    hash.update(_to_rdata(rrsig, origin)[:18])
-    hash.update(rrsig.signer.to_digestable(origin))
-
-    if rrsig.labels < len(rrname) - 1:
-        suffix = rrname.split(rrsig.labels + 1)[1]
-        rrname = dns.name.from_text('*', suffix)
-    rrnamebuf = rrname.to_digestable(origin)
-    rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
-                          rrsig.original_ttl)
-    rrlist = sorted(rdataset);
-    for rr in rrlist:
-        hash.update(rrnamebuf)
-        hash.update(rrfixed)
-        rrdata = rr.to_digestable(origin)
-        rrlen = struct.pack('!H', len(rrdata))
-        hash.update(rrlen)
-        hash.update(rrdata)
-
-    digest = hash.digest()
-
-    if _is_rsa(rrsig.algorithm):
-        # PKCS1 algorithm identifier goop
-        digest = _make_algorithm_id(rrsig.algorithm) + digest
-        padlen = keylen // 8 - len(digest) - 3
-        digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
-    elif _is_dsa(rrsig.algorithm):
-        pass
-    else:
-        # Raise here for code clarity; this won't actually ever happen
-        # since if the algorithm is really unknown we'd already have
-        # raised an exception above
-        raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+    for candidate_key in _find_candidate_keys(keys, rrsig):
+        if not candidate_key:
+            raise ValidationFailure, 'unknown key'
+
+        # For convenience, allow the rrset to be specified as a (name, rdataset)
+        # tuple as well as a proper rrset
+        if isinstance(rrset, tuple):
+            rrname = rrset[0]
+            rdataset = rrset[1]
+        else:
+            rrname = rrset.name
+            rdataset = rrset
+
+        if now is None:
+            now = time.time()
+        if rrsig.expiration < now:
+            raise ValidationFailure, 'expired'
+        if rrsig.inception > now:
+            raise ValidationFailure, 'not yet valid'
+
+        hash = _make_hash(rrsig.algorithm)
+
+        if _is_rsa(rrsig.algorithm):
+            keyptr = candidate_key.key
+            (bytes,) = struct.unpack('!B', keyptr[0:1])
+            keyptr = keyptr[1:]
+            if bytes == 0:
+                (bytes,) = struct.unpack('!H', keyptr[0:2])
+                keyptr = keyptr[2:]
+            rsa_e = keyptr[0:bytes]
+            rsa_n = keyptr[bytes:]
+            keylen = len(rsa_n) * 8
+            pubkey = Crypto.PublicKey.RSA.construct(
+                (Crypto.Util.number.bytes_to_long(rsa_n),
+                 Crypto.Util.number.bytes_to_long(rsa_e)))
+            sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
+        elif _is_dsa(rrsig.algorithm):
+            keyptr = candidate_key.key
+            (t,) = struct.unpack('!B', keyptr[0:1])
+            keyptr = keyptr[1:]
+            octets = 64 + t * 8
+            dsa_q = keyptr[0:20]
+            keyptr = keyptr[20:]
+            dsa_p = keyptr[0:octets]
+            keyptr = keyptr[octets:]
+            dsa_g = keyptr[0:octets]
+            keyptr = keyptr[octets:]
+            dsa_y = keyptr[0:octets]
+            pubkey = Crypto.PublicKey.DSA.construct(
+                (Crypto.Util.number.bytes_to_long(dsa_y),
+                 Crypto.Util.number.bytes_to_long(dsa_g),
+                 Crypto.Util.number.bytes_to_long(dsa_p),
+                 Crypto.Util.number.bytes_to_long(dsa_q)))
+            (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
+            sig = (Crypto.Util.number.bytes_to_long(dsa_r),
+                   Crypto.Util.number.bytes_to_long(dsa_s))
+        else:
+            raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
+
+        hash.update(_to_rdata(rrsig, origin)[:18])
+        hash.update(rrsig.signer.to_digestable(origin))
+
+        if rrsig.labels < len(rrname) - 1:
+            suffix = rrname.split(rrsig.labels + 1)[1]
+            rrname = dns.name.from_text('*', suffix)
+        rrnamebuf = rrname.to_digestable(origin)
+        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
+                              rrsig.original_ttl)
+        rrlist = sorted(rdataset);
+        for rr in rrlist:
+            hash.update(rrnamebuf)
+            hash.update(rrfixed)
+            rrdata = rr.to_digestable(origin)
+            rrlen = struct.pack('!H', len(rrdata))
+            hash.update(rrlen)
+            hash.update(rrdata)
+
+        digest = hash.digest()
+
+        if _is_rsa(rrsig.algorithm):
+            # PKCS1 algorithm identifier goop
+            digest = _make_algorithm_id(rrsig.algorithm) + digest
+            padlen = keylen // 8 - len(digest) - 3
+            digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
+        elif _is_dsa(rrsig.algorithm):
+            pass
+        else:
+            # Raise here for code clarity; this won't actually ever happen
+            # since if the algorithm is really unknown we'd already have
+            # raised an exception above
+            raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
 
-    if not pubkey.verify(digest, sig):
-        raise ValidationFailure, 'verify failure'
+        if pubkey.verify(digest, sig):
+            return
+    raise ValidationFailure, 'verify failure'
 
 def _validate(rrset, rrsigset, keys, origin=None, now=None):
     """Validate an RRset
index 7b4546a08657c9d2c4c0f51fec466a7b19b711a7..b997b176b12fd5fe29808304a0c8dddec5217c61 100644 (file)
@@ -30,6 +30,13 @@ abs_keys = { abs_dnspython_org :
                                  '256 3 5 AwEAAdSSghOGjU33IQZgwZM2Hh771VGXX05olJK49FxpSyuEAjDBXY58 LGU9R2Zgeecnk/b9EAhFu/vCV9oECtiTCvwuVAkt9YEweqYDluQInmgP NGMJCKdSLlnX93DkjDw8rMYv5dqXCuSGPlKChfTJOLQxIAxGloS7lL+c 0CTZydAF')
          }
 
+abs_keys_duplicate_keytag = { abs_dnspython_org :
+             dns.rrset.from_text('dnspython.org.', 3600, 'IN', 'DNSKEY',
+                                 '257 3 5 AwEAAenVTr9L1OMlL1/N2ta0Qj9LLLnnmFWIr1dJoAsWM9BQfsbV7kFZ XbAkER/FY9Ji2o7cELxBwAsVBuWn6IUUAJXLH74YbC1anY0lifjgt29z SwDzuB7zmC7yVYZzUunBulVW4zT0tg1aePbpVL2EtTL8VzREqbJbE25R KuQYHZtFwG8S4iBxJUmT2Bbd0921LLxSQgVoFXlQx/gFV2+UERXcJ5ce iX6A6wc02M/pdg/YbJd2rBa0MYL3/Fz/Xltre0tqsImZGxzi6YtYDs45 NC8gH+44egz82e2DATCVM1ICPmRDjXYTLldQiWA2ZXIWnK0iitl5ue24 7EsWJefrIhE=',
+                                 '256 3 5 AwEAAdSSg++++THIS/IS/NOT/THE/CORRECT/KEY++++++++++++++++ ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ AaOSydAF',
+                                 '256 3 5 AwEAAdSSghOGjU33IQZgwZM2Hh771VGXX05olJK49FxpSyuEAjDBXY58 LGU9R2Zgeecnk/b9EAhFu/vCV9oECtiTCvwuVAkt9YEweqYDluQInmgP NGMJCKdSLlnX93DkjDw8rMYv5dqXCuSGPlKChfTJOLQxIAxGloS7lL+c 0CTZydAF')
+         }
+
 rel_keys = { dns.name.empty :
              dns.rrset.from_text('@', 3600, 'IN', 'DNSKEY',
                                  '257 3 5 AwEAAenVTr9L1OMlL1/N2ta0Qj9LLLnnmFWIr1dJoAsWM9BQfsbV7kFZ XbAkER/FY9Ji2o7cELxBwAsVBuWn6IUUAJXLH74YbC1anY0lifjgt29z SwDzuB7zmC7yVYZzUunBulVW4zT0tg1aePbpVL2EtTL8VzREqbJbE25R KuQYHZtFwG8S4iBxJUmT2Bbd0921LLxSQgVoFXlQx/gFV2+UERXcJ5ce iX6A6wc02M/pdg/YbJd2rBa0MYL3/Fz/Xltre0tqsImZGxzi6YtYDs45 NC8gH+44egz82e2DATCVM1ICPmRDjXYTLldQiWA2ZXIWnK0iitl5ue24 7EsWJefrIhE=',
@@ -95,6 +102,9 @@ class DNSSECValidatorTestCase(unittest.TestCase):
     def testAbsoluteRSAGood(self):
         dns.dnssec.validate(abs_soa, abs_soa_rrsig, abs_keys, None, when)
 
+    def testDuplicateKeytag(self):
+        dns.dnssec.validate(abs_soa, abs_soa_rrsig, abs_keys_duplicate_keytag, None, when)
+
     def testAbsoluteRSABad(self):
         def bad():
             dns.dnssec.validate(abs_other_soa, abs_soa_rrsig, abs_keys, None,