from abc import ABCMeta, abstractmethod
from hashlib import sha256, sha512
from hmac import new as new_hmac
from math import ceil, floor, log2
from os import urandom
from typing import Any, Callable, List, Optional
__author__ = "Harald Heckmann"
__copyright__ = "Harald Heckmann"
__license__ = "mit"
[docs]def openssl_sha256(message: bytes) -> bytes:
""" Hash function for signature and public key generation
This functions wraps a hashfunction in a way that it takes a byte-sequence
as an argument and returns the hash of that byte-sequence
Args:
message: Byte-sequence to be hashed
Returns:
Sha256 hash
"""
return sha256(message).digest()
[docs]def hmac_openssl_sha256(message: bytes, key: bytes) -> bytes:
""" Peudo random function for key and bitmask generation
This functions wraps a pseudo random function in a way that it takes a
byte-sequence as an argument and returns a value which can be used for
further generation of keys.
Args:
message: Byte-sequence to be hashed
key: key to be used
Returns:
HMAC-sha256 hash
"""
return new_hmac(key=key, msg=message, digestmod=sha256).digest()
[docs]def openssl_sha512(message: bytes) -> bytes:
""" Hash function for signature and public key generation
This functions wraps a hashfunction in a way that it takes a byte-sequence
as an argument and returns the hash of that byte-sequence
Args:
message: Byte-sequence to be hashed
Returns:
Sha512 hash
"""
return sha512(message).digest()
# Abstract definition of OTS class
[docs]class AbstractOTS(object, metaclass=ABCMeta):
""" OTS base class
Every class implementing OTS schemes in this package should implement the
functions defined in this base class
"""
[docs] @abstractmethod
def sign(message: bytes) -> dict:
""" Sign a message
This function will create a valid signature for a message on success
Args:
message: Encoded message to sign
Returns:
A dictionary containing all the information required to verify
the message. Minimal structure::
{
"w": winternitz parameter (Type: int),
"algorithm": OTS algorithm used (Type: string),
"hashalgo": hash algorithm (Type: str),
"digestsize": hash byte count (Type: int),
"pubkey": public key (Type: List[bytes]),
"signature": signature (Type: List[bytes])
}
"""
raise NotImplementedError("sign is essential for WOTS signatures")
[docs] @abstractmethod
def verify(message: bytes, signature: List[bytes]) -> bool:
""" Verify a message
Verify whether a signature is valid for a message
Args:
message: Encoded message to verify
signature: Signature that will be used to verify the message
Returns:
Whether the verification succeded
"""
raise NotImplementedError("verify is essential for WOTS signatures")
[docs] @abstractmethod
def getPubkeyFromSignature(message: bytes, signature: List[bytes]) -> bool:
""" Get pubkey derived from signature
Using this function you can get the public key from the signature. In
contrast to verify(...), this function does not match the public key
against the public key stored in the executing object. Getting the
public key without comparing it can be useful if it is verified
with another method, e.g. by deriving a XMSS root node (public key)
and comparing it against a published XMSS public key.
Args:
message: Encoded message that was signed with **signature**
signature: Signature for **message**
Returns:
WOTS public key derived from signature
"""
raise NotImplementedError("getPubkeyFromSignature is essential "
+ "for WOTS signatures")
@abstractmethod
def __eq__(self, obj: Any) -> bool:
""" Object equality check
Compare the relevant data within the called object and obj
Args:
obj: The object to compare the called object with
Returns:
Whether the the calling object and obj are equal
"""
raise NotImplementedError("Equality checks are required")
@abstractmethod
def __ne__(self, obj: Any) -> bool:
""" Object non-equality check
Compare the relevant data within the called object and obj
Args:
obj: The object to compare the called object with
Returns:
Whether the the calling object and obj are not equal
"""
raise NotImplementedError("Non-equality checks are required")
# Paper describing WOTS: https://eprint.iacr.org/2011/191.pdf
# "On the Security of the Winternitz One-Time Signature Scheme"
[docs]class WOTS(AbstractOTS):
""" Winternitz One-Time-Signature
Fully configurable class in regards to Winternitz paramter, hash function,
private key and public key
"""
slots = ["__weakref__", "__w", "__hashfunction", "__digestsize",
"__privkey", "__pubkey", "__msg_key_count", "__cs_key_count",
"__key_count"]
[docs] def __init__(self,
w: int = 16,
hashfunction: Callable = openssl_sha512,
digestsize: int = 512,
privkey: Optional[List[bytes]] = None,
pubkey: Optional[List[bytes]] = None) -> None:
""" Initialize WOTS object
Define the parameters required to sign and verify a message
Args:
w: The Winternitz parameter. A higher value reduces
the space complexity, but increases the time
complexity. It must be greater than 1 but less or
equal than :math:`2^{digestsize}`. To get the best
space to time complexity ratio, choose a value that
is a power of two.
hashfunction: The hashfunction which will be used to derive
signatures and public keys. Specify a function
which takes bytes as an argument and returns
bytes that represent the hash.
digestsize: The number of bits that will be emitted by the
specified hash function.
privkey: The private key to be used for signing operations.
Leave None if it should be generated. In this case
it will be generated when it is required.
pubkey: The public key to be used for verifying signatures.
Do not specify it if a private key was specified
or if it should be derived. It will be derived
when it is required.
"""
self.__w = w
if not (2 <= w <= (1 << digestsize)):
raise ValueError("Rule broken: 2 <= w <= 2^digestsize")
# Calculate number of message keys, checksum keys and total keys
self.__msg_key_count = int(ceil(digestsize / log2(w)))
self.__cs_key_count = int(floor(log2(self.__msg_key_count *
(w - 1)) / log2(w)) + 1)
self.__key_count = self.__msg_key_count + self.__cs_key_count
# Hashing algorithm
self.__hashfunction = hashfunction
self.__digestsize = digestsize
# Keys
self.__privkey = None
self.__pubkey = None
hash_bytes = int(ceil(digestsize / 8))
# set privkey
if privkey is not None:
if len(privkey) != self.__key_count:
raise ValueError("Provided private key length does not match "
+ "with the provided winternitz parameter")
self.__privkey = privkey.copy()
# set pubkey, but only is privkey is not set
elif pubkey is not None:
if len(pubkey) != self.__key_count:
raise ValueError("Provided public key length does not match "
+ "with the provided winternitz parameter")
for elem in filter(lambda t: len(t) != hash_bytes, pubkey):
raise ValueError("Length of public key hashes does not match "
+ "with the provided digestsize")
self.__pubkey = pubkey.copy()
[docs] def __repr__(self) -> str:
""" Get representation of the object
This function returns a string which is a line of code which can be
executed, if you have imported this module using the command
"import winternitz.signatures". This code can either be manually
executed or evaluated and executed with the command eval(code).
Returns:
A line of code which represents this object
"""
repr = "winternitz.signatures.WOTS(w={}, hashfunction={}, " + \
"digestsize={}, "
repr = repr.format(self.__w, str(self.__hashfunction.__module__) +
"." + str(self.__hashfunction.__qualname__),
self.__digestsize)
if self.__privkey is None and self.__pubkey is not None:
return repr + "pubkey=" + str(self.__pubkey) + ")"
return repr + "privkey=" + str(self.privkey) + ")"
def __str__(self) -> str:
fstr = "Package: winternitz\n: signatures\nClass: WOTS\n" + \
"Winternitz Parameter: {}\nHash algorithm: {}\n" + \
"Digest size: {}\n"
privkey = self.privkey # only copy once if at all
if privkey is not None:
fstr += "Private key:\n"
for idx, key in enumerate(self.privkey):
fstr += "\t[{}] {}\n".format(idx,
hex(int.from_bytes(key, "big")))
fstr += "Public key:\n"
for idx, key in enumerate(self.pubkey):
fstr += "\t[{}] {}\n".format(idx,
hex(int.from_bytes(key, "big")))
return fstr.format(self.__w, self.hashfunction.__qualname__,
self.digestsize)
def __eq__(self, obj: Any) -> bool:
return isinstance(obj, self.__class__) and self.__w == obj.w and \
self.__hashfunction == obj.hashfunction and self.__digestsize == \
obj.digestsize and self.privkey == obj.privkey and \
self.pubkey == obj.pubkey
def __ne__(self, obj: Any) -> bool:
return not self.__eq__(obj)
@property
def privkey(self) -> List[bytes]:
""" Private key getter
Get a copy of the private key
Returns:
Copy of the private key
"""
if self.__privkey is None:
if self.__pubkey is None:
random_bytes = int(ceil(self.__digestsize / 8))
self.__privkey = [urandom(random_bytes)
for pk in range(self.__key_count)]
else:
return None
# return a copy
return self.__privkey.copy() # note: cannot use [*list] in py < 3.5
@property
def pubkey(self) -> List[bytes]:
""" Public key getter
Get a copy of the public key
Returns:
Copy of the public key
"""
if self.__pubkey is None:
self.__pubkey = [self._chain(privkey, 0, self.__w - 1)
for privkey in self.privkey]
return self.__pubkey.copy() # note: cannot use [*list] in py < 3.5
@property
def w(self) -> int:
""" Winternitz parameter getter
Get the Winternitz parameter
Returns:
Winternitz parameter
"""
return self.__w
@property
def hashfunction(self) -> Callable:
""" Hash function getter
Get a reference to the current hash function
Returns:
Reference to hash function
"""
return self.__hashfunction
@property
def digestsize(self) -> int:
""" Digest size getter
Get the digest size of the hash function
Returns:
Digest size of the hash function
"""
return self.__digestsize
[docs] def _chain(self, value: bytes, startidx: int, endidx: int) -> bytes:
""" Chaining function
Core function. It derives hash values which could either represent
a part of a signature or a part of the public key.
Args:
value: Current hash
startidx: Current position of **value** in the hash chain
endidx: Desired position in the hash chain
Returns:
Returns the hash at the position *endidx* in the hash chain
"""
for i in range(startidx, endidx):
value = self.__hashfunction(value)
return value
[docs] def _checksum(self, values: List[int]) -> int:
""" Create checksum for a signature
This helper function is used during the generation and verification
of a signature. It calculates a checksum, which is appenede to the
signatures. It prevents man-in-the-middle attacks.
Args:
values: List containing the signatures for each bit group
Returns:
Checksum
"""
# Inverse sum checksum
result = 0
for value in values:
result += self.__w - 1 - value
return result
[docs] def _numberToBase(self, num: int, base: int) -> List[int]:
""" Base conversion
Using this function one can convert any number to another base
Args:
num: Number to convert
base: base to convert *num* in
Returns:
List containing each digit in base *base* representation. The
digits are stored as decimal numbers.
"""
if num == 0:
return [0]
digits = []
while num:
digits.append(int(num % base))
num //= base
return digits[::-1]
[docs] def _getSignatureBaseMessage(self, msghash: bytes) -> List[bytes]:
""" Get byte-sequences to sign
This functions creates a list of byte-sequences, which will be
converted to a signature or public key by the chaining function.
Args:
msghash: Fingerprint of the message which will be signed
Returns:
Blocks of the message hash which each will be signed
"""
msgnum = int.from_bytes(msghash, "big")
msg_to_sign = self._numberToBase(msgnum, self.__w)
if (len(msg_to_sign) > self.__msg_key_count):
err = "The fingerprint of the message could not be split into the"\
+ " expected amount of bitgroups. This is most likely "\
+ "because the digestsize specified does not match to the " \
+ " real digestsize of the specified hashfunction Excepted:"\
+ " {} bitgroups\nGot: {} bitgroups"
raise IndexError(err.format(self.__msg_key_count,
len(msg_to_sign)))
msg_to_sign += [0] * (self.__msg_key_count - len(msg_to_sign)) # pad
checksum = self._numberToBase(self._checksum(msg_to_sign), self.__w)
checksum += [0] * (self.__cs_key_count - len(checksum)) # pad
return msg_to_sign + checksum
[docs] def sign(self, message: bytes) -> dict:
privkey = self.privkey
if privkey is None:
raise ValueError("Unable to sign the message, only a public key "
+ "was specified")
msghash = self.__hashfunction(message)
msg_to_sign = self._getSignatureBaseMessage(msghash)
signature = [self._chain(privkey[idx], 0, val)
for idx, val in enumerate(msg_to_sign)]
# If the pubkey is not set yet, derive it from the signature
if (self.__pubkey is None):
self.__pubkey = [self._chain(signature[idx], val,
self.__w - 1)
for idx, val in enumerate(msg_to_sign)]
return {
"algorithm": "WOTS",
"signature": signature,
"pubkey": self.__pubkey.copy(),
"w": self.__w,
"hashalgo": self.__hashfunction.__qualname__,
"digestsize": self.__digestsize
}
[docs] def getPubkeyFromSignature(self, message: bytes,
signature: List[bytes]) -> List[bytes]:
if len(signature) != self.__key_count:
return False
msghash = self.__hashfunction(message)
msg_to_verify = self._getSignatureBaseMessage(msghash)
return [self._chain(signature[idx], val, self.__w - 1)
for idx, val in enumerate(msg_to_verify)]
[docs] def verify(self, message: bytes, signature: List[bytes]) -> bool:
pubkey = self.getPubkeyFromSignature(message, signature)
return True if pubkey == self.pubkey else False
# Paper descirbing WOTS+: https://eprint.iacr.org/2017/965.pdf
# "W-OTS+ – Shorter Signatures for Hash-Based Signature Schemes"
[docs]class WOTSPLUS(WOTS):
""" Winternitz One-Time-Signature Plus
Fully configurable class in regards to Winternitz paramter, hash function,
pseudo random function, seed, private key and public key
"""
slots = ["__weakref__", "__seed", "__prf"]
[docs] def __init__(self,
w: int = 16,
hashfunction: Callable = openssl_sha256,
prf: Callable = hmac_openssl_sha256,
digestsize: int = 256,
seed: Optional[bytes] = None,
privkey: Optional[List[bytes]] = None,
pubkey: Optional[List[bytes]] = None):
""" Initialize WOTS object
Define under which circumstances a message should be signed or verified
Args:
w: The Winternitz parameter. A higher value reduces
the space complexity, but increases the time
complexity. It must be greater than 1 but less than
:math: 2^{digestsize}. To get the best space to
time complexity ratio, choose a value that is a
power of two.
hashfunction: The hashfunction which will be used to derive
signatures and public keys. Specify a function
which takes bytes as an argument and returns
bytes that represent the hash.
digestsize: The number of bits that will be emitted by the
specified hash function.
privkey: The private key to be used for signing operations.
Leave None if it should be generated. In this case
it will be generated when it is required.
pubkey: The public key to be used for verifying signatures.
Do not specify it if a private key was specified
or if it should be derived. It will be derived
when it is required.
seed: Seed which is used in the pseudo random function to
generate bitmasks.
prf: Pseudo random function which is used to generate
the bitmasks.
"""
super().__init__(w=w, hashfunction=hashfunction,
digestsize=digestsize, privkey=privkey,
pubkey=pubkey)
self.__seed = seed
self.__prf = prf
[docs] def __repr__(self) -> str:
return super().__repr__().replace("WOTS", "WOTSPLUS")[:-1] + \
", seed={}, prf={})".format(str(self.seed),
str(self.prf.__module__) + "." +
str(self.prf.__qualname__))
def __str__(self) -> str:
sstr = super().__str__().replace("WOTS", "WOTSPLUS")
strsplit = sstr.split("Public key:" if self.privkey is None else
"Private key:")
result = strsplit[0] \
+ "Pseudo random function: " + str(self.prf.__qualname__) \
+ "\nSeed: " + hex(int.from_bytes(self.seed, "big")) \
+ ("\nPublic key: " if self.privkey is None else
"\nPrivate key: ") + strsplit[1]
return result
def __eq__(self, obj) -> bool:
return super().__eq__(obj) and isinstance(obj, self.__class__) and \
self.seed == obj.seed and self.prf == obj.prf
def __ne__(self, obj) -> bool:
return not self.__eq__(obj)
@property
def seed(self) -> bytes:
""" Seed getter
Get the seed which is used in the pseudo random function to generate
the bitmasks.
Returns:
Seed for pseudo random function
"""
if self.__seed is None:
self.__seed = urandom(int(ceil(self.digestsize / 8)))
return self.__seed
@property
def prf(self) -> Callable:
""" Pseudo random function getter
Get the pseudo random function. It is used to generate the bitmasks.
Returns:
Reference to the pseudo random function
"""
return self.__prf
[docs] def _chain(self, value: bytes, startidx: int, endidx: int) -> bytes:
""" Chaining function
Core function. It derives hash values which could either represent
a part of a signature or a part of the public key.
Args:
value: Current hash
startidx: Current position of **value** in the hash chain
endidx: Desired position in the hash chain
Returns:
Returns the hash at the position *endidx* in the hash chain
"""
# Generate seed if it is not set yet
_ = self.seed # noqa: F841
digestsize_bytes = int(ceil(self.digestsize / 8))
for i in range(startidx, endidx):
bm = self.__prf(message=value, key=self.__seed)
tohash_b = (int.from_bytes(value, "big") ^
int.from_bytes(bm, "big"))
tohash = tohash_b.to_bytes(digestsize_bytes, "big")
value = self.hashfunction(tohash)
return value
[docs] def sign(self, message: bytes) -> dict:
""" Sign a message
This function will create a valid signature for a message on success
Args:
message: Encoded message to sign
Returns:
A dictionary containing all the information required to verify
the message. Structure::
{
"w": winternitz parameter (Type: int),
"algorithm": "WOTS+" (Type: string),
"hashalgo": hash algorithm (Type: str),
"digestsize": hash byte count (Type: int),
"pubkey": public key (Type: List[bytes]),
"prf": pseudo random function (Type: str),
"seed": Seed used in prf (Type: bytes),
"signature": signature (Type: List[bytes])
}
"""
ret = super().sign(message)
ret["algorithm"] = "WOTS+"
ret["prf"] = self.__prf.__qualname__
ret["seed"] = self.__seed
return ret
[docs] def verify(self, message: bytes, signature: List[bytes]) -> bool:
return super().verify(message=message, signature=signature)