moonstream/engineapi/engineapi/signatures.py

326 wiersze
11 KiB
Python

"""
Signing and signature verification functionality and interfaces.
"""
import abc
import logging
import json
from typing import Any, List, Optional, Union
import boto3
from web3 import Web3
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account._utils.signing import sign_message_hash
import eth_keys
import requests
from hexbytes import HexBytes
from .settings import (
SIGNER_KEYSTORE,
SIGNER_PASSWORD,
MOONSTREAM_SIGNING_SERVER_IP,
AWS_DEFAULT_REGION,
MOONSTREAM_AWS_SIGNER_LAUNCH_TEMPLATE_ID,
MOONSTREAM_AWS_SIGNER_IMAGE_ID,
MOONSTREAM_AWS_SIGNER_INSTANCE_PORT,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# EC2 client created once at import time for the region in AWS_DEFAULT_REGION.
# The instance helper functions in this module use it to describe, run and
# terminate signing instances.
aws_client = boto3.client("ec2", region_name=AWS_DEFAULT_REGION)
class AWSDescribeInstancesFail(Exception):
    """Signals that the AWS describe instances command did not succeed."""
class AWSRunInstancesFail(Exception):
    """Signals that the AWS run instances command did not succeed."""
class AWSTerminateInstancesFail(Exception):
    """Signals that the AWS terminate instances command did not succeed."""
class SigningInstancesNotFound(Exception):
    """Signals that no signing instances with the given ids were found at AWS."""
class SigningInstancesTerminationLimitExceeded(Exception):
    """Signals that more than one instance was submitted for termination at once."""
class SignWithInstanceFail(Exception):
    """Signals that signing a message through the instance server did not succeed."""
class Signer(abc.ABC):
    """
    Abstract interface implemented by every message signer.

    The class derives from ``abc.ABC`` so that the ``@abc.abstractmethod``
    markers are actually enforced: a subclass that does not implement both
    ``sign_message`` and ``batch_sign_message`` cannot be instantiated.

    ``refresh_signer`` is an optional hook with a do-nothing default, so
    signers that have nothing to re-discover (for example a signer holding its
    key in memory) do not have to override it.
    """

    @abc.abstractmethod
    def sign_message(self, message):
        """Sign a single message and return its signature."""

    def refresh_signer(self):
        """
        Re-establish whatever backend the signer depends on.

        The default implementation does nothing and returns None.
        """
        return None

    @abc.abstractmethod
    def batch_sign_message(self, messages_list):
        """Sign every message in ``messages_list`` and return the signatures."""
class AccountSigner(Signer):
    """
    Signer that signs locally with a private key held in memory.

    Each message is converted with ``HexBytes`` and signed as a message hash
    via ``sign_message_hash`` using an ``eth_keys`` private key built from
    ``self.private_key``.
    """

    def __init__(self, private_key: HexBytes) -> None:
        """Store the raw private key used for all subsequent signatures."""
        self.private_key = private_key

    @staticmethod
    def _sign_hash(eth_private_key, message) -> str:
        """Sign one message hash with an already constructed key; return hex."""
        # NOTE: sign_message_hash comes from a private eth_account module
        # (eth_account._utils.signing); only the last element of its result,
        # the signature bytes, is used here.
        _, _, _, signed_message_bytes = sign_message_hash(
            eth_private_key, HexBytes(message)
        )
        return signed_message_bytes.hex()

    def sign_message(self, message):
        """
        Sign a single message hash.

        Returns:
            The signature as returned by ``.hex()`` on the signature bytes.
        """
        eth_private_key = eth_keys.keys.PrivateKey(self.private_key)
        return self._sign_hash(eth_private_key, message)

    def refresh_signer(self) -> None:
        """Nothing to refresh: the key lives in memory for the object's lifetime."""
        return None

    def batch_sign_message(self, messages_list: List[str]):
        """
        Sign every message hash in ``messages_list``.

        Returns:
            A dict mapping ``message.hex()`` to the hex signature of that
            message. Duplicate messages collapse into a single entry.
        """
        # NOTE(review): the annotation says List[str], but every item must
        # expose ``.hex()`` (bytes / HexBytes); a plain str raises
        # AttributeError here — confirm against callers.
        # The key object does not depend on the message, so build it once.
        eth_private_key = eth_keys.keys.PrivateKey(self.private_key)
        return {
            message.hex(): self._sign_hash(eth_private_key, message)
            for message in messages_list
        }
def create_account_signer(keystore: str, password: str) -> AccountSigner:
    """
    Build an AccountSigner from an encrypted keystore file.

    Args:
        keystore: Path to a JSON keystore file.
        password: Password passed to ``Account.decrypt`` to unlock the keystore.

    Returns:
        An AccountSigner wrapping the decrypted private key.
    """
    with open(keystore) as ifp:
        encrypted_key = json.load(ifp)
    return AccountSigner(Account.decrypt(encrypted_key, password))
class InstanceSigner(Signer):
    """
    Signer that delegates signing to a signing server over HTTP.

    The server address is either supplied to the constructor or looked up
    through ``list_signing_instances`` the first time a signature is requested
    while no address is known.
    """

    def __init__(
        self, ip: Optional[str] = None, timeout: Optional[float] = None
    ) -> None:
        """
        Args:
            ip: Address of the signing server. When None, the address is
                resolved lazily by ``refresh_signer``.
            timeout: Value forwarded as ``timeout`` to every ``requests.post``
                call. The default None keeps the previous behaviour of not
                setting any timeout.
        """
        self.timeout = timeout
        # Both URIs always exist as attributes, even before an address is known.
        self.current_signer_uri: Optional[str] = None
        self.current_signer_batch_uri: Optional[str] = None
        if ip is not None:
            self._use_signer_ip(ip)

    def _use_signer_ip(self, ip: str) -> None:
        """Point both the single and the batch endpoint at the given address."""
        base_uri = f"http://{ip}:{MOONSTREAM_AWS_SIGNER_INSTANCE_PORT}"
        self.current_signer_uri = f"{base_uri}/sign"
        self.current_signer_batch_uri = f"{base_uri}/batchsign"

    @staticmethod
    def _normalize_signature(signed_message: str) -> str:
        """
        Convert a server signature into the form returned to callers.

        The first two characters are dropped (presumably a "0x" prefix —
        confirm against the signing server) and a trailing v-value of "00" or
        "01" is rewritten to "1b" or "1c" (27 / 28 in hex).

        Raises:
            SignWithInstanceFail: If the signature ends in any other v-value.
        """
        # Hack as per: https://medium.com/@yaoshiang/ethereums-ecrecover-openzeppelin-s-ecdsa-and-web3-s-sign-8ff8d16595e1
        signature = signed_message[2:]
        v_value = signature[-2:]
        if v_value == "00":
            return f"{signature[:-2]}1b"
        if v_value == "01":
            return f"{signature[:-2]}1c"
        raise SignWithInstanceFail(
            f"Unexpected v-value on signed message: {signed_message[-2:]}"
        )

    def clean_signer(self) -> None:
        """Forget the current server address so the next call re-discovers it."""
        self.current_signer_uri = None
        self.current_signer_batch_uri = None

    def refresh_signer(self) -> None:
        """
        Discover the signing instance and store its endpoints.

        Raises:
            AWSDescribeInstancesFail: If listing the instances failed.
            SignWithInstanceFail: On any other listing error, or when the
                number of signing instances found is not exactly one.
        """
        try:
            instances = list_signing_instances([])
        except AWSDescribeInstancesFail as err:
            raise AWSDescribeInstancesFail(
                "AWS describe instances command failed"
            ) from err
        except Exception as err:
            logger.error(f"AWS describe instances command failed: {err}")
            raise SignWithInstanceFail(
                "AWS describe instances command failed"
            ) from err
        if len(instances) != 1:
            raise SignWithInstanceFail("Unsupported number of signing instances")
        self._use_signer_ip(instances[0]["private_ip_address"])

    def sign_message(self, message: str):
        """
        Sign one message through the server's ``/sign`` endpoint.

        Returns:
            The normalized signature string (see ``_normalize_signature``).

        Raises:
            SignWithInstanceFail: If the request fails, the response lacks
                ``signed_data``, or the signature has an unexpected v-value.
        """
        # TODO(kompotkot): What to do if self.current_signer_uri is not None but the signing server went down?
        if self.current_signer_uri is None:
            self.refresh_signer()
        try:
            resp = requests.post(
                self.current_signer_uri,
                headers={"Content-Type": "application/json"},
                json={"unsigned_data": str(message)},
                timeout=self.timeout,
            )
            resp.raise_for_status()
            signed_message = resp.json()["signed_data"]
        except Exception as err:
            logger.error(f"Failed signing of message with instance server, {err}")
            raise SignWithInstanceFail(
                "Failed signing of message with instance server"
            ) from err
        return self._normalize_signature(signed_message)

    def batch_sign_message(self, messages_list: List[str]):
        """
        Sign several messages through the server's ``/batchsign`` endpoint.

        Returns:
            A dict keyed by the unsigned message, exactly as returned by the
            server, whose values are the normalized signatures.

        Raises:
            SignWithInstanceFail: If the request fails, the response lacks
                ``signed_data``, or any signature has an unexpected v-value.
        """
        # The batch endpoint is what gets called, so make sure it is known too.
        if self.current_signer_uri is None or self.current_signer_batch_uri is None:
            self.refresh_signer()
        try:
            resp = requests.post(
                self.current_signer_batch_uri,
                headers={"Content-Type": "application/json"},
                json={"unsigned_data": [str(message) for message in messages_list]},
                timeout=self.timeout,
            )
            resp.raise_for_status()
            signed_messages = resp.json()["signed_data"]
        except Exception as err:
            logger.error(f"Failed signing of message with instance server, {err}")
            raise SignWithInstanceFail(
                "Failed signing of message with instance server"
            ) from err
        return {
            unsigned_message: self._normalize_signature(signed_message)
            for unsigned_message, signed_message in signed_messages.items()
        }
# Module-wide signer, chosen once at import time: a local keystore-backed
# signer when both keystore and password are configured, otherwise a signer
# that talks to the signing server.
DROP_SIGNER: Optional[Signer]
if SIGNER_KEYSTORE is None or SIGNER_PASSWORD is None:
    DROP_SIGNER = InstanceSigner(MOONSTREAM_SIGNING_SERVER_IP)
else:
    DROP_SIGNER = create_account_signer(SIGNER_KEYSTORE, SIGNER_PASSWORD)
def list_signing_instances(
    signing_instances: List[str],
) -> List[Any]:
    """
    Describe the signing instances known to AWS.

    Only instances launched from MOONSTREAM_AWS_SIGNER_IMAGE_ID and tagged
    ``Application=signer`` are considered; ``signing_instances`` is forwarded
    as the ``InstanceIds`` argument of the describe call.

    Returns:
        One dict per instance with the keys ``instance_id`` and
        ``private_ip_address``.

    Raises:
        AWSDescribeInstancesFail: If the describe call or the processing of
            its response fails for any reason.
    """
    lookup_filters = [
        {"Name": "image-id", "Values": [MOONSTREAM_AWS_SIGNER_IMAGE_ID]},
        {"Name": "tag:Application", "Values": ["signer"]},
    ]
    try:
        response = aws_client.describe_instances(
            Filters=lookup_filters,
            InstanceIds=signing_instances,
        )
        # NOTE(review): an instance entry without "PrivateIpAddress" raises
        # KeyError here, which is reported as AWSDescribeInstancesFail —
        # confirm this is intended for instances that have no private IP.
        return [
            {
                "instance_id": instance["InstanceId"],
                "private_ip_address": instance["PrivateIpAddress"],
            }
            for reservation in response["Reservations"]
            for instance in reservation["Instances"]
        ]
    except Exception as err:
        logger.error(f"AWS describe instances command failed: {err}")
        raise AWSDescribeInstancesFail("AWS describe instances command failed.")
def wakeup_signing_instances(run_confirmed=False, dry_run=True) -> List[str]:
    """
    Launch one new signing instance from the configured launch template.

    Args:
        run_confirmed: Nothing is launched unless this is truthy.
        dry_run: Forwarded as ``DryRun`` to the AWS run instances call.

    Returns:
        Ids of the launched instances; an empty list when the run was not
        confirmed.

    Raises:
        AWSRunInstancesFail: If the run call or the processing of its
            response fails for any reason.
    """
    if not run_confirmed:
        return []

    try:
        response = aws_client.run_instances(
            LaunchTemplate={
                "LaunchTemplateId": MOONSTREAM_AWS_SIGNER_LAUNCH_TEMPLATE_ID
            },
            MinCount=1,
            MaxCount=1,
            DryRun=dry_run,
        )
        return [instance["InstanceId"] for instance in response["Instances"]]
    except Exception as err:
        logger.error(f"AWS run instances command failed: {err}")
        raise AWSRunInstancesFail("AWS run instances command failed")
def sleep_signing_instances(
    signing_instances: List[str], termination_confirmed=False, dry_run=True
) -> List[str]:
    """
    Look up the given signing instances, verify them and terminate them.

    Args:
        signing_instances: Ids of the instances to terminate.
        termination_confirmed: Nothing is terminated unless this is truthy.
        dry_run: Forwarded as ``DryRun`` to the AWS terminate instances call.

    Returns:
        Ids of the instances being terminated; an empty list when termination
        was not confirmed.

    Raises:
        SigningInstancesNotFound: If no ids were given or none were found.
        AWSDescribeInstancesFail: If looking up the instances failed.
        SigningInstancesTerminationLimitExceeded: If more than one instance
            matched.
        AWSTerminateInstancesFail: If the terminate call or the processing of
            its response fails for any reason.
    """
    if len(signing_instances) == 0:
        raise SigningInstancesNotFound("There are no signing instances to describe")

    try:
        instance_ids = [
            entry["instance_id"]
            for entry in list_signing_instances(signing_instances)
        ]
    except Exception as err:
        logger.error(f"AWS describe instances command failed: {err}")
        raise AWSDescribeInstancesFail("AWS describe instances command failed.")

    found = len(instance_ids)
    if found == 0:
        raise SigningInstancesNotFound(
            "Signing instances with the given ids is not found in at AWS."
        )
    # Only a single instance may be terminated per call.
    if found > 1:
        raise SigningInstancesTerminationLimitExceeded(
            f"Provided {found} instances to termination"
        )

    if not termination_confirmed:
        return []

    try:
        response = aws_client.terminate_instances(
            InstanceIds=instance_ids,
            DryRun=dry_run,
        )
        return [item["InstanceId"] for item in response["TerminatingInstances"]]
    except Exception as err:
        logger.error(f"Unable to terminate instance {instance_ids}, error: {err}")
        raise AWSTerminateInstancesFail("AWS terminate instances command failed")