moonstream/engineapi/engineapi/signatures.py

326 wiersze
11 KiB
Python
Czysty Zwykły widok Historia

2023-06-06 12:11:49 +00:00
"""
Signing and signature verification functionality and interfaces.
"""
import abc
import logging
import json
from typing import Any, List, Optional, Union
import boto3
from web3 import Web3
from eth_account import Account
from eth_account.messages import encode_defunct
from eth_account._utils.signing import sign_message_hash
import eth_keys
import requests
from hexbytes import HexBytes
from .settings import (
SIGNER_KEYSTORE,
SIGNER_PASSWORD,
MOONSTREAM_SIGNING_SERVER_IP,
AWS_DEFAULT_REGION,
MOONSTREAM_AWS_SIGNER_LAUNCH_TEMPLATE_ID,
MOONSTREAM_AWS_SIGNER_IMAGE_ID,
MOONSTREAM_AWS_SIGNER_INSTANCE_PORT,
)
logger = logging.getLogger(__name__)
aws_client = boto3.client("ec2", region_name=AWS_DEFAULT_REGION)
class AWSDescribeInstancesFail(Exception):
"""
Raised when AWS describe instances command failed.
"""
class AWSRunInstancesFail(Exception):
"""
Raised when AWS run instances command failed.
"""
class AWSTerminateInstancesFail(Exception):
"""
Raised when AWS terminate instances command failed.
"""
class SigningInstancesNotFound(Exception):
"""
Raised when signing instances with the given ids is not found in at AWS.
"""
class SigningInstancesTerminationLimitExceeded(Exception):
"""
Raised when provided several instances to termination.
"""
class SignWithInstanceFail(Exception):
"""
Raised when failed signing of message with instance server.
"""
class Signer:
@abc.abstractmethod
def sign_message(self, message):
pass
@abc.abstractmethod
def refresh_signer(self):
pass
@abc.abstractmethod
def batch_sign_message(self, messages_list):
pass
class AccountSigner(Signer):
"""
Simple implementation of a signer that uses a Brownie account to sign messages.
"""
def __init__(self, private_key: HexBytes) -> None:
self.private_key = private_key
def sign_message(self, message):
eth_private_key = eth_keys.keys.PrivateKey(self.private_key)
message_hash_bytes = HexBytes(message)
_, _, _, signed_message_bytes = sign_message_hash(
eth_private_key, message_hash_bytes
)
return signed_message_bytes.hex()
def batch_sign_message(self, messages_list: List[str]):
signed_messages_list = {}
for message in messages_list:
eth_private_key = eth_keys.keys.PrivateKey(self.private_key)
message_hash_bytes = HexBytes(message)
_, _, _, signed_message_bytes = sign_message_hash(
eth_private_key, message_hash_bytes
)
signed_messages_list[message.hex()] = signed_message_bytes.hex()
return signed_messages_list
def create_account_signer(keystore: str, password: str) -> AccountSigner:
with open(keystore) as keystore_file:
keystore_data = json.load(keystore_file)
private_key = Account.decrypt(keystore_data, password)
signer = AccountSigner(private_key)
return signer
class InstanceSigner(Signer):
"""
AWS instance server signer.
"""
def __init__(self, ip: Optional[str] = None) -> None:
self.current_signer_uri = None
if ip is not None:
self.current_signer_uri = (
f"http://{ip}:{MOONSTREAM_AWS_SIGNER_INSTANCE_PORT}/sign"
)
self.current_signer_batch_uri = (
f"http://{ip}:{MOONSTREAM_AWS_SIGNER_INSTANCE_PORT}/batchsign"
)
def clean_signer(self) -> None:
self.current_signer_uri = None
self.current_signer_batch_uri = None
def refresh_signer(self) -> None:
try:
instances = list_signing_instances([])
except AWSDescribeInstancesFail:
raise AWSDescribeInstancesFail("AWS describe instances command failed")
except Exception as err:
logger.error(f"AWS describe instances command failed: {err}")
raise SignWithInstanceFail("AWS describe instances command failed")
if len(instances) != 1:
raise SignWithInstanceFail("Unsupported number of signing instances")
self.current_signer_uri = f"http://{instances[0]['private_ip_address']}:{MOONSTREAM_AWS_SIGNER_INSTANCE_PORT}/sign"
self.current_signer_batch_uri = f"http://{instances[0]['private_ip_address']}:{MOONSTREAM_AWS_SIGNER_INSTANCE_PORT}/batchsign"
def sign_message(self, message: str):
# TODO(kompotkot): What to do if self.current_signer_uri is not None but the signing server went down?
if self.current_signer_uri is None:
self.refresh_signer()
signed_message = ""
try:
resp = requests.post(
self.current_signer_uri,
headers={"Content-Type": "application/json"},
json={"unsigned_data": str(message)},
)
resp.raise_for_status()
body = resp.json()
signed_message = body["signed_data"]
except Exception as err:
logger.error(f"Failed signing of message with instance server, {err}")
raise SignWithInstanceFail("Failed signing of message with instance server")
# Hack as per: https://medium.com/@yaoshiang/ethereums-ecrecover-openzeppelin-s-ecdsa-and-web3-s-sign-8ff8d16595e1
signature = signed_message[2:]
if signature[-2:] == "00":
signature = f"{signature[:-2]}1b"
elif signature[-2:] == "01":
signature = f"{signature[:-2]}1c"
else:
raise SignWithInstanceFail(
f"Unexpected v-value on signed message: {signed_message[-2:]}"
)
return signature
def batch_sign_message(self, messages_list: List[str]):
if self.current_signer_uri is None:
self.refresh_signer()
try:
resp = requests.post(
self.current_signer_batch_uri,
headers={"Content-Type": "application/json"},
json={"unsigned_data": [str(message) for message in messages_list]},
)
resp.raise_for_status()
signed_messages = resp.json()["signed_data"]
except Exception as err:
logger.error(f"Failed signing of message with instance server, {err}")
raise SignWithInstanceFail("Failed signing of message with instance server")
results = {}
# Hack as per: https://medium.com/@yaoshiang/ethereums-ecrecover-openzeppelin-s-ecdsa-and-web3-s-sign-8ff8d16595e1
for unsigned_message, signed_message in signed_messages.items():
signature = signed_message[2:]
if signature[-2:] == "00":
signature = f"{signature[:-2]}1b"
elif signature[-2:] == "01":
signature = f"{signature[:-2]}1c"
else:
raise SignWithInstanceFail(
f"Unexpected v-value on signed message: {signed_message[-2:]}"
)
results[unsigned_message] = signature
return results
DROP_SIGNER: Optional[Signer] = None
if SIGNER_KEYSTORE is not None and SIGNER_PASSWORD is not None:
DROP_SIGNER = create_account_signer(SIGNER_KEYSTORE, SIGNER_PASSWORD)
if DROP_SIGNER is None:
DROP_SIGNER = InstanceSigner(MOONSTREAM_SIGNING_SERVER_IP)
def list_signing_instances(
signing_instances: List[str],
) -> List[Any]:
"""
Return a list of signing instances with IPs.
"""
described_instances = []
try:
described_instances_response = aws_client.describe_instances(
Filters=[
{"Name": "image-id", "Values": [MOONSTREAM_AWS_SIGNER_IMAGE_ID]},
{"Name": "tag:Application", "Values": ["signer"]},
],
InstanceIds=signing_instances,
)
for r in described_instances_response["Reservations"]:
for i in r["Instances"]:
described_instances.append(
{
"instance_id": i["InstanceId"],
"private_ip_address": i["PrivateIpAddress"],
}
)
except Exception as err:
logger.error(f"AWS describe instances command failed: {err}")
raise AWSDescribeInstancesFail("AWS describe instances command failed.")
return described_instances
def wakeup_signing_instances(run_confirmed=False, dry_run=True) -> List[str]:
"""
Run new signing instances.
"""
run_instances = []
if run_confirmed:
try:
run_instances_response = aws_client.run_instances(
LaunchTemplate={
"LaunchTemplateId": MOONSTREAM_AWS_SIGNER_LAUNCH_TEMPLATE_ID
},
MinCount=1,
MaxCount=1,
DryRun=dry_run,
)
for i in run_instances_response["Instances"]:
run_instances.append(i["InstanceId"])
except Exception as err:
logger.error(f"AWS run instances command failed: {err}")
raise AWSRunInstancesFail("AWS run instances command failed")
return run_instances
def sleep_signing_instances(
signing_instances: List[str], termination_confirmed=False, dry_run=True
) -> List[str]:
"""
Fetch, describe, verify signing instances and terminate them.
"""
if len(signing_instances) == 0:
raise SigningInstancesNotFound("There are no signing instances to describe")
described_instances = []
try:
described_instances_response = list_signing_instances(signing_instances)
for i in described_instances_response:
described_instances.append(i["instance_id"])
except Exception as err:
logger.error(f"AWS describe instances command failed: {err}")
raise AWSDescribeInstancesFail("AWS describe instances command failed.")
if len(described_instances) == 0:
raise SigningInstancesNotFound(
"Signing instances with the given ids is not found in at AWS."
)
if len(described_instances) > 1:
raise SigningInstancesTerminationLimitExceeded(
f"Provided {len(described_instances)} instances to termination"
)
terminated_instances = []
if termination_confirmed:
try:
terminated_instances_response = aws_client.terminate_instances(
InstanceIds=described_instances,
DryRun=dry_run,
)
for i in terminated_instances_response["TerminatingInstances"]:
terminated_instances.append(i["InstanceId"])
except Exception as err:
logger.error(
f"Unable to terminate instance {described_instances}, error: {err}"
)
raise AWSTerminateInstancesFail("AWS terminate instances command failed")
return terminated_instances