kopia lustrzana https://github.com/bugout-dev/moonstream
287 wiersze
8.9 KiB
Python
287 wiersze
8.9 KiB
Python
"""
|
|
Login functionality for Moonstream Engine.
|
|
|
|
Login flow relies on an Authorization header passed to Moonstream Engine of the form:
|
|
Authorization: moonstream <base64-encoded JSON>
|
|
|
|
The schema for the JSON object will be as follows:
|
|
{
|
|
"address": "<address of account which signed the message>",
|
|
"deadline": <epoch timestamp after which this header becomes invalid>,
|
|
"signature": "<signed authorization message>"
|
|
}
|
|
|
|
Authorization messages will be generated pursuant to EIP712 using the following parameters:
|
|
Domain separator - name: MoonstreamAuthorization, version: <Engine API version>
|
|
Fields - address ("address" type), deadline: ("uint256" type)
|
|
"""
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import time
|
|
from typing import Any, Dict, Optional, cast
|
|
|
|
import eth_keys
|
|
from eip712.messages import EIP712Message, _hash_eip191_message
|
|
from eth_account import Account
|
|
from eth_account._utils.signing import sign_message_hash
|
|
from eth_typing import ChecksumAddress
|
|
from hexbytes import HexBytes
|
|
from web3 import Web3
|
|
|
|
|
|
class MoonstreamAuthorizationVerificationError(Exception):
|
|
"""
|
|
Raised when invalid signer is provided.
|
|
"""
|
|
|
|
|
|
class MoonstreamAuthorizationExpired(Exception):
|
|
"""
|
|
Raised when signature is expired by time.
|
|
"""
|
|
|
|
|
|
class MoonstreamAuthorizationStructureError(Exception):
|
|
"""
|
|
Raised when signature has incorrect structure.
|
|
"""
|
|
|
|
|
|
class MoonstreamAuthorization(EIP712Message):
|
|
_name_: "string" # type: ignore
|
|
_version_: "string" # type: ignore
|
|
|
|
address: "address" # type: ignore
|
|
deadline: "uint256" # type: ignore
|
|
|
|
|
|
class MetaTXAuthorization(EIP712Message):
|
|
_name_: "string" # type: ignore
|
|
_version_: "string" # type: ignore
|
|
|
|
caller: "address" # type: ignore
|
|
expires_at: "uint256" # type: ignore
|
|
|
|
|
|
EIP712_AUTHORIZATION_TYPES = {
|
|
"MoonstreamAuthorization": {
|
|
"name": "MoonstreamAuthorization",
|
|
"version": "1",
|
|
"eip712_message_class": MoonstreamAuthorization,
|
|
"primary_types": [
|
|
{"name": "address", "type": "address"},
|
|
{"name": "deadline", "type": int},
|
|
],
|
|
},
|
|
"MetaTXAuthorization": {
|
|
"name": "MetaTXAuthorization",
|
|
"version": "1",
|
|
"eip712_message_class": MetaTXAuthorization,
|
|
"primary_types": [
|
|
{"name": "caller", "type": "address"},
|
|
{"name": "expires_at", "type": int},
|
|
],
|
|
},
|
|
}
|
|
|
|
|
|
def sign_message(message_hash_bytes: HexBytes, private_key: HexBytes) -> HexBytes:
|
|
eth_private_key = eth_keys.keys.PrivateKey(private_key)
|
|
_, _, _, signed_message_bytes = sign_message_hash(
|
|
eth_private_key, message_hash_bytes
|
|
)
|
|
return signed_message_bytes
|
|
|
|
|
|
def authorize(
|
|
authorization_type: Dict[str, Any],
|
|
primary_types: Dict[str, Any],
|
|
private_key: HexBytes,
|
|
signature_name_output: str,
|
|
) -> Dict[str, Any]:
|
|
# Initializing instance of EIP712Message class
|
|
attrs: Dict[str, Any] = {
|
|
"_name_": authorization_type["name"],
|
|
"_version_": authorization_type["version"],
|
|
}
|
|
attrs.update(primary_types)
|
|
message = authorization_type["eip712_message_class"](**attrs)
|
|
|
|
# Generating message hash and signature
|
|
msg_hash_bytes = HexBytes(_hash_eip191_message(message.signable_message))
|
|
|
|
signed_message = sign_message(msg_hash_bytes, private_key)
|
|
|
|
api_payload: Dict[str, Any] = {signature_name_output: signed_message.hex()}
|
|
api_payload.update(primary_types)
|
|
|
|
return api_payload
|
|
|
|
|
|
def verify(
|
|
authorization_type: Dict[str, Any],
|
|
authorization_payload: Dict[str, Any],
|
|
signature_name_input: str,
|
|
) -> bool:
|
|
"""
|
|
Verifies provided signature signer by correct address.
|
|
|
|
**Important** Assume that not address field is timefield (live_at, expires_at, deadline, etc)
|
|
"""
|
|
# Initializing instance of EIP712Message class
|
|
attrs: Dict[str, Any] = {
|
|
"_name_": authorization_type["name"],
|
|
"_version_": authorization_type["version"],
|
|
}
|
|
|
|
time_now = int(time.time())
|
|
web3_client = Web3()
|
|
|
|
address: Optional[ChecksumAddress] = None
|
|
time_field: Optional[int] = None
|
|
|
|
for pt in authorization_type["primary_types"]:
|
|
pt_name = pt["name"]
|
|
pt_type = pt["type"]
|
|
if pt_type == "address":
|
|
address = Web3.toChecksumAddress(cast(str, authorization_payload[pt_name]))
|
|
attrs[pt_name] = address
|
|
else:
|
|
time_field = cast(pt_type, authorization_payload[pt_name])
|
|
attrs[pt_name] = time_field
|
|
if address is None or time_field is None:
|
|
raise MoonstreamAuthorizationStructureError(
|
|
"Field address or time_field could not be None"
|
|
)
|
|
|
|
message = authorization_type["eip712_message_class"](**attrs)
|
|
signature = cast(str, authorization_payload[signature_name_input])
|
|
signer_address = web3_client.eth.account.recover_message(
|
|
message.signable_message, signature=signature
|
|
)
|
|
if signer_address != address:
|
|
raise MoonstreamAuthorizationVerificationError("Invalid signer")
|
|
|
|
if time_field < time_now:
|
|
raise MoonstreamAuthorizationExpired("Time field exceeded")
|
|
|
|
return True
|
|
|
|
|
|
def decrypt_keystore(keystore_path: str, password: str) -> HexBytes:
|
|
with open(keystore_path) as keystore_file:
|
|
keystore_data = json.load(keystore_file)
|
|
return keystore_data["address"], Account.decrypt(keystore_data, password)
|
|
|
|
|
|
def handle_authorize(args: argparse.Namespace) -> None:
|
|
if args.authorization_type not in EIP712_AUTHORIZATION_TYPES:
|
|
raise Exception("Provided unsupported EIP712 Authorization type")
|
|
|
|
authorization_type = EIP712_AUTHORIZATION_TYPES[args.authorization_type]
|
|
primary_types = json.loads(args.primary_types)
|
|
for ptk in authorization_type["primary_types"]:
|
|
if ptk["name"] not in primary_types:
|
|
raise Exception(f"Lost primary type: {ptk}")
|
|
|
|
_, private_key = decrypt_keystore(args.signer, args.password)
|
|
authorization = authorize(
|
|
authorization_type=authorization_type,
|
|
primary_types=primary_types,
|
|
private_key=private_key,
|
|
signature_name_output=args.signature_name_output,
|
|
)
|
|
print(json.dumps(authorization))
|
|
|
|
|
|
def handle_verify(args: argparse.Namespace) -> None:
|
|
if args.authorization_type not in EIP712_AUTHORIZATION_TYPES:
|
|
raise Exception("Provided unsupported EIP712 Authorization type")
|
|
|
|
authorization_type = EIP712_AUTHORIZATION_TYPES[args.authorization_type]
|
|
payload_json = base64.decodebytes(args.payload).decode("utf-8")
|
|
payload = json.loads(payload_json)
|
|
verify(
|
|
authorization_type=authorization_type,
|
|
authorization_payload=payload,
|
|
signature_name_input=args.signature_name_input,
|
|
)
|
|
print("Verified!")
|
|
|
|
|
|
def generate_cli() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="Moonstream Engine authorization module"
|
|
)
|
|
subcommands = parser.add_subparsers()
|
|
|
|
authorize_parser = subcommands.add_parser("authorize")
|
|
authorize_parser.add_argument(
|
|
"-s",
|
|
"--signer",
|
|
required=True,
|
|
help="Path to signer keyfile (or brownie account name).",
|
|
)
|
|
authorize_parser.add_argument(
|
|
"-p",
|
|
"--password",
|
|
required=False,
|
|
help="(Optional) password for signing account. If you don't provide it here, you will be prompte for it.",
|
|
)
|
|
authorize_parser.add_argument(
|
|
"-t",
|
|
"--authorization-type",
|
|
required=True,
|
|
choices=[k for k in EIP712_AUTHORIZATION_TYPES.keys()],
|
|
help="One of supported EIP712 Message authorization types",
|
|
)
|
|
authorize_parser.add_argument(
|
|
"--primary-types",
|
|
required=True,
|
|
help="Primary types for specified EIP712 Message authorization in JSON format {0}. Available keys: {1}".format(
|
|
{"name_1": "value", "name_2": "value"},
|
|
[
|
|
f"{v['primary_types']} for {k}"
|
|
for k, v in EIP712_AUTHORIZATION_TYPES.items()
|
|
],
|
|
),
|
|
)
|
|
authorize_parser.add_argument(
|
|
"--signature-name-output",
|
|
type=str,
|
|
default="signed_message",
|
|
help="Key in output dictionary of signature",
|
|
)
|
|
authorize_parser.set_defaults(func=handle_authorize)
|
|
|
|
verify_parser = subcommands.add_parser("verify")
|
|
verify_parser.add_argument(
|
|
"--payload",
|
|
type=lambda s: s.encode(),
|
|
required=True,
|
|
help="Base64-encoded payload to verify",
|
|
)
|
|
verify_parser.add_argument(
|
|
"-t",
|
|
"--authorization-type",
|
|
required=True,
|
|
choices=[k for k in EIP712_AUTHORIZATION_TYPES.keys()],
|
|
help="One of supported EIP712 Message authorization types",
|
|
)
|
|
verify_parser.add_argument(
|
|
"--signature-name-input",
|
|
type=str,
|
|
default="signed_message",
|
|
help="Key for signature in payload",
|
|
)
|
|
verify_parser.set_defaults(func=handle_verify)
|
|
|
|
return parser
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = generate_cli()
|
|
args = parser.parse_args()
|
|
args.func(args)
|