diff --git a/nkeys/__init__.py b/nkeys/__init__.py index 7b13d4a..58d8568 100644 --- a/nkeys/__init__.py +++ b/nkeys/__init__.py @@ -79,6 +79,49 @@ def decode_seed(src): return (prefix, result) +def encode_seed(src, prefix): + """ + :param src: A bytestring of length 32, used as the seed + for an nkey. + :param prefix: A prefix describing the nkey roll, one of: + PREFIX_BYTE_SERVER, PREFIX_BYTE_CLUSTER, + PREFIX_BYTE_OPERATOR, PREFIX_BYTE_ACCOUNT, + or PREFIX_BYTE_USER + + :rtype bytestring: + :return: nkey-encoded seed + """ + + if not valid_public_prefix_byte(prefix): + raise ErrInvalidPrefixByte() + + if len(src) != 32: + raise ErrInvalidSeedLen() + + # The first five bits of the first byte + # contain the first base32 character, + # encoding 'S' for seed. + # The last three bytes of the first byte + # contain the first three bits of the second + # base32 character, which encodes the roll. + first_byte = PREFIX_BYTE_SEED | prefix >> 5 + + # The forth and fifth bits of the the second byte + # contain the last two bits of the second base32 + # charater, which encodes the roll. + # Note that: + # 31 decimal == 00011111 binary + # and is therefore a mask for the last 5 bits. + # Note that the last three bits of the second byte + # are entirely unused. + second_byte = (31 & prefix) << 3 + + header = bytearray([first_byte, second_byte]) + checksum = crc16_checksum(header + bytearray(src)) + final_bytes = bytes(header) + src + bytes(checksum) + return base64.b32encode(final_bytes).rstrip(b'=') + + def valid_public_prefix_byte(prefix): if prefix == PREFIX_BYTE_OPERATOR \ or prefix == PREFIX_BYTE_SERVER \ @@ -482,6 +525,11 @@ def crc16(data): return crc +def crc16_checksum(data): + crc = crc16(data) + return crc.to_bytes(2, byteorder='little') + + class NkeysError(Exception): pass diff --git a/tests/nkeys_test.py b/tests/nkeys_test.py index c5de403..3bbefb7 100644 --- a/tests/nkeys_test.py +++ b/tests/nkeys_test.py @@ -17,6 +17,15 @@ import nkeys import binascii import base64 +import os + +PREFIXES = [ + nkeys.PREFIX_BYTE_OPERATOR, + nkeys.PREFIX_BYTE_SERVER, + nkeys.PREFIX_BYTE_CLUSTER, + nkeys.PREFIX_BYTE_ACCOUNT, + nkeys.PREFIX_BYTE_USER +] class NatsTestCase(unittest.TestCase): @@ -171,6 +180,21 @@ def test_keypair_private_key(self): with self.assertRaises(AttributeError): kp._private_key + def test_roundtrip_seed_encoding(self): + # This test is a low-tech property test in disguise, + # testing the property: + # decode . encode == identity + # Using a proper framework like hypothesis might be preferable. + num_trials = 500 + raw_seeds = [os.urandom(32) for _ in range(num_trials)] + for raw_seed in raw_seeds: + for prefix in PREFIXES: + with self.subTest(rawseed=raw_seed, prefix=prefix): + encoded_seed = nkeys.encode_seed(raw_seed, prefix) + decoded_prefix, decoded_seed = nkeys.decode_seed(encoded_seed) + self.assertEqual(prefix, decoded_prefix) + self.assertEqual(raw_seed, decoded_seed) + if __name__ == '__main__': runner = unittest.TextTestRunner(stream=sys.stdout)