kopia lustrzana https://github.com/bugout-dev/moonstream
commit
439d019164
|
@ -12,7 +12,7 @@ h11==0.12.0
|
|||
idna==3.2
|
||||
jmespath==0.10.0
|
||||
humbug==0.2.7
|
||||
-e git+https://git@github.com/bugout-dev/moonstream.git@94135b054cabb9dc11b0a2406431619279979469#egg=moonstreamdb&subdirectory=db
|
||||
-e git+https://git@github.com/bugout-dev/moonstream.git@a4fff6498f66789934d4af26fd42a8cfb6e5eed5#egg=moonstreamdb&subdirectory=db
|
||||
mypy==0.910
|
||||
mypy-extensions==0.4.3
|
||||
pathspec==0.9.0
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
.venv/
|
||||
.mooncrawl/
|
|
@ -22,7 +22,7 @@ from .ethereum import (
|
|||
trending,
|
||||
)
|
||||
from .publish import publish_json
|
||||
from .settings import MOONSTREAM_CRAWL_WORKERS
|
||||
from .settings import MOONSTREAM_CRAWL_WORKERS, MOONSTREAM_IPC_PATH
|
||||
from .version import MOONCRAWL_VERSION
|
||||
|
||||
|
||||
|
@ -92,7 +92,7 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
|
|||
while True:
|
||||
bottom_block_number, top_block_number = get_latest_blocks(args.confirmations)
|
||||
if bottom_block_number is None:
|
||||
raise ValueError("Variable bottom_block_number can't be None")
|
||||
bottom_block_number = 0
|
||||
bottom_block_number = max(bottom_block_number + 1, starting_block)
|
||||
if bottom_block_number >= top_block_number:
|
||||
print(
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
from concurrent.futures import Future, ProcessPoolExecutor, wait
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from os import close
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from sqlalchemy import desc, Column
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
"""
|
||||
A command line tool to crawl information about NFTs from various sources.
|
||||
"""
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Any, Dict, cast
|
||||
|
||||
import dateutil.parser
|
||||
from moonstreamdb.db import yield_db_session_ctx
|
||||
from moonstreamdb.models import EthereumBlock
|
||||
from sqlalchemy.orm.session import Session
|
||||
from web3 import Web3
|
||||
|
||||
from ..ethereum import connect
|
||||
from .ethereum import summary as ethereum_summary, add_labels
|
||||
from ..publish import publish_json
|
||||
from ..settings import MOONSTREAM_IPC_PATH
|
||||
from ..version import MOONCRAWL_VERSION
|
||||
|
||||
|
||||
def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
    """
    Build a Web3 client for the crawler.

    The connection string is read from the "web3" attribute of the parsed arguments when that
    attribute exists and is not None. Otherwise MOONSTREAM_IPC_PATH is used.

    Raises ValueError if neither source yields a connection string.
    """
    cli_value = vars(args).get("web3")
    if cli_value is None:
        connection_string = MOONSTREAM_IPC_PATH
    else:
        connection_string = cast(str, cli_value)

    if connection_string is None:
        raise ValueError(
            "Could not find Web3 connection information in arguments or in MOONSTREAM_IPC_PATH environment variable"
        )
    return connect(connection_string)
|
||||
|
||||
|
||||
def get_latest_block_from_db(db_session: Session):
    """
    Return the EthereumBlock row with the greatest timestamp.

    The row is fetched with Query.one(), so this raises if the query yields no row.
    """
    newest_first = db_session.query(EthereumBlock).order_by(
        EthereumBlock.timestamp.desc()
    )
    return newest_first.limit(1).one()
|
||||
|
||||
|
||||
def ethereum_sync_handler(args: argparse.Namespace) -> None:
    """
    Handler for the "sync" subcommand: labels NFT activity and publishes summaries forever.

    On each iteration it labels the blocks in [start, end], builds a summary ending at the
    timestamp of the latest block in the database, publishes it with push_summary, and sleeps
    for one hour. The next iteration starts right after the previous end block.

    If args.start is None, the first block in the database whose timestamp is within the last
    week is used as the starting block.

    Raises:
    - ValueError if MOONSTREAM_HUMBUG_TOKEN is not set, or if the starting block is greater than
      the latest block in the database.
    """
    web3_client = web3_client_from_cli_or_env(args)
    humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
    if humbug_token is None:
        raise ValueError("MOONSTREAM_HUMBUG_TOKEN env variable is not set")

    with yield_db_session_ctx() as db_session:
        start = args.start
        if start is None:
            time_now = datetime.now(timezone.utc)
            week_ago = time_now - timedelta(weeks=1)
            start = (
                db_session.query(EthereumBlock)
                .filter(EthereumBlock.timestamp >= week_ago.timestamp())
                .order_by(EthereumBlock.timestamp.asc())
                .limit(1)
                .one()
            ).block_number

        latest_block = get_latest_block_from_db(db_session)
        end = latest_block.block_number
        # Explicit check instead of `assert`: assertions are stripped when Python runs with -O.
        if start > end:
            raise ValueError(
                f"Start block {start} is greater than latest_block {end} in db"
            )

        while True:
            print(f"Labeling blocks {start}-{end}")
            add_labels(web3_client, db_session, start, end)

            end_time = datetime.fromtimestamp(latest_block.timestamp, timezone.utc)
            print(f"Creating summary with endtime={end_time}")
            result = ethereum_summary(db_session, end_time)
            push_summary(result, end, humbug_token)

            sleep_time = 60 * 60
            print(f"Going to sleep for:{sleep_time}s")
            time.sleep(sleep_time)

            # Continue from the block after the last one labeled in this iteration.
            start = end + 1
            latest_block = get_latest_block_from_db(db_session)
            end = latest_block.block_number
|
||||
|
||||
|
||||
def ethereum_label_handler(args: argparse.Namespace) -> None:
    """
    Handler for the "label" subcommand: runs add_labels once over the block range and optional
    contract address given on the command line.
    """
    client = web3_client_from_cli_or_env(args)
    with yield_db_session_ctx() as session:
        add_labels(
            client,
            session,
            from_block=args.start,
            to_block=args.end,
            contract_address=args.address,
        )
|
||||
|
||||
|
||||
def push_summary(result: Dict[str, Any], end_block_no: int, humbug_token: str):
    """
    Publish an NFT summary through publish_json under the "nft_ethereum" name.

    The title carries result["crawled_at"] and the end block number. The entry is tagged with
    the crawler version and the end block, and publish_json is called with wait=False.
    """
    title = f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] } (block {end_block_no})"
    report_tags = [
        f"crawler_version:{MOONCRAWL_VERSION}",
        f"end_block:{end_block_no}",
    ]
    publish_json(
        "nft_ethereum",
        humbug_token,
        title,
        result,
        tags=report_tags,
        wait=False,
    )
|
||||
|
||||
|
||||
def ethereum_summary_handler(args: argparse.Namespace) -> None:
    """
    Handler for the "summary" subcommand: computes the NFT summary ending at args.end and writes
    it as JSON to args.outfile.

    NOTE: this handler does not read args.humbug; the summary is only written to the output file.
    """
    with yield_db_session_ctx() as db_session:
        result = ethereum_summary(db_session, args.end)

    outfile = args.outfile
    try:
        json.dump(result, outfile)
    finally:
        # args.outfile may be sys.stdout (the parser default). Closing it would break any later
        # write to stdout in this process, so only close files opened for this command.
        if outfile is not sys.stdout:
            outfile.close()
|
||||
|
||||
|
||||
def main() -> None:
    """
    Entry point for the NFT crawler command line interface.

    Builds the "ethereum" command with its "label", "summary" and "sync" subcommands, parses
    sys.argv and dispatches to the handler registered for the selected subcommand. If no
    subcommand is selected, the relevant help text is printed.
    """
    time_now = datetime.now(timezone.utc)
    web3_help = "(Optional) Web3 connection string. If not provided, uses the value specified by MOONSTREAM_IPC_PATH environment variable."

    parser = argparse.ArgumentParser(description="Moonstream NFT crawlers")
    parser.set_defaults(func=lambda _: parser.print_help())
    subcommands = parser.add_subparsers(description="Subcommands")

    parser_ethereum = subcommands.add_parser(
        "ethereum",
        description="Collect information about NFTs from Ethereum blockchains",
    )
    parser_ethereum.set_defaults(func=lambda _: parser_ethereum.print_help())
    subparsers_ethereum = parser_ethereum.add_subparsers()

    parser_ethereum_label = subparsers_ethereum.add_parser(
        "label",
        description="Label addresses and transactions in database using crawled NFT transfer information",
    )
    parser_ethereum_label.add_argument(
        "-s",
        "--start",
        type=int,
        default=None,
        help="Starting block number (inclusive if block available)",
    )
    parser_ethereum_label.add_argument(
        "-e",
        "--end",
        type=int,
        default=None,
        help="Ending block number (inclusive if block available)",
    )
    parser_ethereum_label.add_argument(
        "-a",
        "--address",
        type=str,
        default=None,
        help="(Optional) NFT contract address that you want to limit the crawl to, e.g. 0x06012c8cf97BEaD5deAe237070F9587f8E7A266d for CryptoKitties.",
    )
    parser_ethereum_label.add_argument(
        "--web3",
        type=str,
        default=None,
        help=web3_help,
    )
    parser_ethereum_label.set_defaults(func=ethereum_label_handler)

    parser_ethereum_summary = subparsers_ethereum.add_parser(
        "summary", description="Generate Ethereum NFT summary"
    )
    parser_ethereum_summary.add_argument(
        "-e",
        "--end",
        # argparse applies `type` to string defaults, so the default is parsed like user input.
        type=dateutil.parser.parse,
        default=time_now.isoformat(),
        help=f"End time for window to calculate NFT statistics (default: {time_now.isoformat()})",
    )
    parser_ethereum_summary.add_argument(
        "--humbug",
        default=None,
        help=(
            "If you would like to write this data to a Moonstream journal, please provide a Humbug "
            "token for that here. (This argument overrides any value set in the "
            "MOONSTREAM_HUMBUG_TOKEN environment variable)"
        ),
    )
    parser_ethereum_summary.add_argument(
        "-o",
        "--outfile",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="Optional file to write output to. By default, prints to stdout.",
    )
    parser_ethereum_summary.set_defaults(func=ethereum_summary_handler)

    parser_ethereum_sync = subparsers_ethereum.add_parser(
        "sync",
        description="Label addresses and transactions in database using crawled NFT transfer information, sync mode",
    )
    parser_ethereum_sync.add_argument(
        "-s",
        "--start",
        type=int,
        required=False,
        help="Starting block number (inclusive if block available)",
    )
    # The sync handler resolves its client the same way as the label handler, so offer the same
    # optional override here.
    parser_ethereum_sync.add_argument(
        "--web3",
        type=str,
        default=None,
        help=web3_help,
    )
    parser_ethereum_sync.set_defaults(func=ethereum_sync_handler)

    args = parser.parse_args()
    args.func(args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,659 @@
|
|||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
import logging
|
||||
from hexbytes.main import HexBytes
|
||||
from typing import Any, cast, Dict, List, Optional, Set, Tuple
|
||||
|
||||
from eth_typing.encoding import HexStr
|
||||
from moonstreamdb.models import (
|
||||
EthereumAddress,
|
||||
EthereumBlock,
|
||||
EthereumLabel,
|
||||
EthereumTransaction,
|
||||
)
|
||||
from sqlalchemy import and_, func, text
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from sqlalchemy.orm import Session, Query
|
||||
from tqdm import tqdm
|
||||
from web3 import Web3
|
||||
from web3.types import FilterParams, LogReceipt
|
||||
from web3._utils.events import get_event_data
|
||||
|
||||
# Default length (in blocks) of an Ethereum NFT crawl.
|
||||
DEFAULT_CRAWL_LENGTH = 100
|
||||
|
||||
NFT_LABEL = "erc721"
|
||||
MINT_LABEL = "nft_mint"
|
||||
TRANSFER_LABEL = "nft_transfer"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
# Summary keys
|
||||
SUMMARY_KEY_BLOCKS = "blocks"
|
||||
SUMMARY_KEY_NUM_TRANSACTIONS = "num_transactions"
|
||||
SUMMARY_KEY_TOTAL_VALUE = "total_value"
|
||||
SUMMARY_KEY_NFT_TRANSFERS = "nft_transfers"
|
||||
SUMMARY_KEY_NFT_TRANSFER_VALUE = "nft_transfer_value"
|
||||
SUMMARY_KEY_NFT_MINTS = "nft_mints"
|
||||
SUMMARY_KEY_NFT_PURCHASERS = "nft_owners"
|
||||
SUMMARY_KEY_NFT_MINTERS = "nft_minters"
|
||||
|
||||
SUMMARY_KEYS = [
|
||||
SUMMARY_KEY_BLOCKS,
|
||||
SUMMARY_KEY_NUM_TRANSACTIONS,
|
||||
SUMMARY_KEY_TOTAL_VALUE,
|
||||
SUMMARY_KEY_NFT_TRANSFERS,
|
||||
SUMMARY_KEY_NFT_TRANSFER_VALUE,
|
||||
SUMMARY_KEY_NFT_MINTS,
|
||||
SUMMARY_KEY_NFT_PURCHASERS,
|
||||
SUMMARY_KEY_NFT_MINTERS,
|
||||
]
|
||||
|
||||
|
||||
# First abi is for old NFT's like crypto kitties
|
||||
# The ERC721 standard requires that all arguments of the Transfer event are indexed.
|
||||
# That is how ERC721 transfers are distinguished from ERC20 Transfer events.
|
||||
erc721_transfer_event_abis = [
|
||||
{
|
||||
"anonymous": False,
|
||||
"inputs": [
|
||||
{"indexed": False, "name": "from", "type": "address"},
|
||||
{"indexed": False, "name": "to", "type": "address"},
|
||||
{"indexed": False, "name": "tokenId", "type": "uint256"},
|
||||
],
|
||||
"name": "Transfer",
|
||||
"type": "event",
|
||||
},
|
||||
{
|
||||
"anonymous": False,
|
||||
"inputs": [
|
||||
{"indexed": True, "name": "from", "type": "address"},
|
||||
{"indexed": True, "name": "to", "type": "address"},
|
||||
{"indexed": True, "name": "tokenId", "type": "uint256"},
|
||||
],
|
||||
"name": "Transfer",
|
||||
"type": "event",
|
||||
},
|
||||
]
|
||||
|
||||
erc721_functions_abi = [
|
||||
{
|
||||
"inputs": [{"internalType": "address", "name": "owner", "type": "address"}],
|
||||
"name": "balanceOf",
|
||||
"outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
|
||||
"payable": False,
|
||||
"stateMutability": "view",
|
||||
"type": "function",
|
||||
"constant": True,
|
||||
},
|
||||
{
|
||||
"inputs": [],
|
||||
"name": "name",
|
||||
"outputs": [{"internalType": "string", "name": "", "type": "string"}],
|
||||
"stateMutability": "view",
|
||||
"type": "function",
|
||||
"constant": True,
|
||||
},
|
||||
{
|
||||
"inputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}],
|
||||
"name": "ownerOf",
|
||||
"outputs": [{"internalType": "address", "name": "", "type": "address"}],
|
||||
"payable": False,
|
||||
"stateMutability": "view",
|
||||
"type": "function",
|
||||
"constant": True,
|
||||
},
|
||||
{
|
||||
"inputs": [],
|
||||
"name": "symbol",
|
||||
"outputs": [{"internalType": "string", "name": "", "type": "string"}],
|
||||
"stateMutability": "view",
|
||||
"type": "function",
|
||||
"constant": True,
|
||||
},
|
||||
{
|
||||
"inputs": [],
|
||||
"name": "totalSupply",
|
||||
"outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
|
||||
"stateMutability": "view",
|
||||
"type": "function",
|
||||
"constant": True,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@dataclass
class NFTContract:
    """
    Metadata collected for a contract that may be an NFT collection.

    Only the address is required; every other field defaults to None.
    """

    # Contract address
    address: str
    # Result of the contract's name() call, if available
    name: Optional[str] = None
    # Result of the contract's symbol() call, if available
    symbol: Optional[str] = None
    # Result of the contract's totalSupply() call, if available
    total_supply: Optional[str] = None
|
||||
|
||||
|
||||
def get_erc721_contract_info(w3: Web3, address: str) -> NFTContract:
    """
    Reads name, symbol and totalSupply from the contract at the given address.

    Each value is fetched independently on a best-effort basis: if a call fails, an error is
    logged and the corresponding field of the returned NFTContract is left as None.

    Arguments:
    - w3: Web3 client
    - address: Address of the (potential) NFT contract

    Returns an NFTContract holding the given address and whichever values could be read.
    """
    contract = w3.eth.contract(
        address=w3.toChecksumAddress(address), abi=erc721_functions_abi
    )

    def call_or_none(function_name: str) -> Optional[Any]:
        # Calls the named zero-argument contract function, returning None on failure.
        try:
            return getattr(contract.functions, function_name)().call()
        except Exception:
            # `except Exception` rather than a bare `except`, so that KeyboardInterrupt and
            # SystemExit still propagate.
            logger.error(
                f"Could not get {function_name} for potential NFT contract: {address}"
            )
            return None

    return NFTContract(
        address=address,
        name=call_or_none("name"),
        symbol=call_or_none("symbol"),
        total_supply=call_or_none("totalSupply"),
    )
|
||||
|
||||
|
||||
# SHA3 hash of the string "Transfer(address,address,uint256)"
|
||||
TRANSFER_EVENT_SIGNATURE = HexBytes(
|
||||
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
|
||||
)
|
||||
|
||||
|
||||
@dataclass
class NFTTransferRaw:
    """
    A decoded Transfer event, with the transaction hash kept as HexBytes.
    """

    # Address of the contract that emitted the event
    contract_address: str
    # "from" argument of the event
    transfer_from: str
    # "to" argument of the event
    transfer_to: str
    # "tokenId" argument of the event
    tokenId: int
    # Hash of the transaction that contains the event
    transfer_tx: HexBytes
|
||||
|
||||
|
||||
@dataclass
class NFTTransfer:
    """
    An NFT transfer with the transaction hash stored as a string.
    """

    # Address of the contract that emitted the event
    contract_address: str
    # Previous owner address
    transfer_from: str
    # New owner address
    transfer_to: str
    # ID of the transferred token
    tokenId: int
    # Hash of the transaction that contains the transfer
    transfer_tx: str
    # Optional value associated with the transfer; defaults to None
    value: Optional[int] = None
    # Whether the transfer is a mint; defaults to False
    is_mint: bool = False
|
||||
|
||||
|
||||
def get_value_by_tx(w3: Web3, tx_hash: HexBytes):
    """
    Returns the "value" field of the transaction with the given hash.

    Arguments:
    - w3: Web3 client used to fetch the transaction
    - tx_hash: Hash of the transaction to look up
    """
    # Progress is reported through the module logger rather than printed to stdout.
    logger.debug(f"Trying to get tx: {tx_hash.hex()}")
    tx = w3.eth.get_transaction(tx_hash)
    logger.debug(f"Got tx: {tx_hash.hex()}")
    return tx["value"]
|
||||
|
||||
|
||||
def decode_nft_transfer_data(w3: Web3, log: LogReceipt) -> Optional[NFTTransferRaw]:
    """
    Tries to decode the given log as an NFT Transfer event.

    Each ABI in erc721_transfer_event_abis is tried in order, and the first one that decodes the
    log successfully is used.

    Returns the decoded transfer, or None if the log matches none of the ABIs.
    """
    for abi in erc721_transfer_event_abis:
        try:
            transfer_data = get_event_data(w3.codec, abi, log)
            return NFTTransferRaw(
                contract_address=transfer_data["address"],
                transfer_from=transfer_data["args"]["from"],
                transfer_to=transfer_data["args"]["to"],
                tokenId=transfer_data["args"]["tokenId"],
                transfer_tx=transfer_data["transactionHash"],
            )
        except Exception:
            # A failure here means the log does not fit this ABI, so try the next one.
            # `except Exception` rather than a bare `except`, so that KeyboardInterrupt and
            # SystemExit are not swallowed.
            continue
    return None
|
||||
|
||||
|
||||
def get_nft_transfers(
    w3: Web3,
    from_block: Optional[int] = None,
    to_block: Optional[int] = None,
    contract_address: Optional[str] = None,
) -> List[NFTTransfer]:
    """
    Fetches logs whose first topic is the Transfer event signature and returns those that decode
    as NFT transfers.

    Arguments:
    - w3: Web3 client
    - from_block, to_block: Optional block bounds added to the log filter when not None
    - contract_address: Optional contract address to restrict the log filter to

    A transfer is flagged as a mint when its "from" address is the zero address.
    """
    zero_address = "0x0000000000000000000000000000000000000000"

    filter_params = FilterParams(topics=[cast(HexStr, TRANSFER_EVENT_SIGNATURE.hex())])
    if from_block is not None:
        filter_params["fromBlock"] = from_block
    if to_block is not None:
        filter_params["toBlock"] = to_block
    if contract_address is not None:
        filter_params["address"] = w3.toChecksumAddress(contract_address)

    logs = w3.eth.get_logs(filter_params)

    nft_transfers: List[NFTTransfer] = []
    for log in tqdm(logs, desc=f"Processing logs for blocks {from_block}-{to_block}"):
        raw_transfer = decode_nft_transfer_data(w3, log)
        if raw_transfer is None:
            # Log does not decode as an NFT transfer
            continue
        nft_transfers.append(
            NFTTransfer(
                contract_address=raw_transfer.contract_address,
                transfer_from=raw_transfer.transfer_from,
                transfer_to=raw_transfer.transfer_to,
                tokenId=raw_transfer.tokenId,
                transfer_tx=raw_transfer.transfer_tx.hex(),
                is_mint=raw_transfer.transfer_from == zero_address,
            )
        )
    return nft_transfers
|
||||
|
||||
|
||||
def get_block_bounds(
    w3: Web3, from_block: Optional[int] = None, to_block: Optional[int] = None
) -> Tuple[int, int]:
    """
    Returns starting and ending blocks for an "nft ethereum" crawl subject to the following rules:
    1. Neither start nor end can be None.
    2. If to_block is None, then end is the current block number reported by the Web3 client.
    3. If from_block is None, then start = end - DEFAULT_CRAWL_LENGTH + 1, but never less than 0.
    """
    end = to_block
    if end is None:
        end = w3.eth.get_block_number()

    start = from_block
    if start is None:
        # Block numbers cannot be negative, so clamp the default start on short chains.
        start = max(end - DEFAULT_CRAWL_LENGTH + 1, 0)

    return start, end
|
||||
|
||||
|
||||
def ensure_addresses(db_session: Session, addresses: Set[str]) -> Dict[str, int]:
    """
    Registers the given addresses in the table backing EthereumAddress and returns a mapping
    from address to row id.

    Addresses that are already registered are left untouched (insert with ON CONFLICT DO
    NOTHING). The returned mapping covers *every* given address, not only the newly inserted
    ones. An empty input yields an empty mapping without touching the database.

    If the insert fails, the session is rolled back and the exception is re-raised.
    """
    if len(addresses) == 0:
        return {}

    # SQLAlchemy reference:
    # https://docs.sqlalchemy.org/en/14/orm/persistence_techniques.html#using-postgresql-on-conflict-with-returning-to-return-upserted-orm-objects
    rows_to_insert = [{"address": address} for address in addresses]
    insert_missing = (
        insert(EthereumAddress)
        .values(rows_to_insert)
        .on_conflict_do_nothing(index_elements=[EthereumAddress.address])
    )

    try:
        db_session.execute(insert_missing)
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise

    registered_rows = (
        db_session.query(EthereumAddress)
        .filter(EthereumAddress.address.in_(addresses))
        .all()
    )
    return {row.address: row.id for row in registered_rows}
|
||||
|
||||
|
||||
def label_erc721_addresses(
    w3: Web3, db_session: Session, address_ids: List[Tuple[str, int]]
) -> None:
    """
    Creates an NFT_LABEL label for each given (address, address id) pair, with the contract's
    name, symbol and totalSupply as label data, and saves all of them in a single commit.

    Failures are logged rather than raised: an address whose metadata cannot be collected is
    skipped, and a failed save is rolled back.
    """
    pending_labels: List[EthereumLabel] = []
    for address, address_id in address_ids:
        try:
            contract_info = get_erc721_contract_info(w3, address)
            metadata = {
                "name": contract_info.name,
                "symbol": contract_info.symbol,
                "totalSupply": contract_info.total_supply,
            }
            pending_labels.append(
                EthereumLabel(
                    address_id=address_id,
                    label=NFT_LABEL,
                    label_data=metadata,
                )
            )
        except Exception as e:
            logger.error(f"Failed to get metadata of contract {address}")
            logger.error(e)

    try:
        db_session.bulk_save_objects(pending_labels)
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error(f"Failed to save labels to db:\n{e}")
|
||||
|
||||
|
||||
def label_key(label: EthereumLabel) -> Tuple[str, int, int, str, str]:
    """
    Builds the tuple used to compare mint/transfer labels:
    (transaction hash, address id, token id, from address, to address).

    The label's label_data must contain the "tokenId", "from" and "to" keys.
    """
    transfer_data = label.label_data
    return (
        label.transaction_hash,
        label.address_id,
        transfer_data["tokenId"],
        transfer_data["from"],
        transfer_data["to"],
    )
|
||||
|
||||
|
||||
def label_transfers(
    db_session: Session, transfers: List[NFTTransfer], address_ids: Dict[str, int]
) -> None:
    """
    Adds "nft_mint" or "nft_transfer" to the (transaction, address) pair represented by each of the
    given NFTTransfer objects.

    Labels that already exist in the database (as compared by label_key) are not written again.
    If the write fails, the session is rolled back and the error is logged, not raised.

    Arguments:
    - db_session: Database session used to read existing labels and write new ones
    - transfers: Transfers to label
    - address_ids: Mapping from contract address to the id of its EthereumAddress row
    """
    if not transfers:
        # Nothing to label, so skip the database round trips (and the empty IN clauses).
        return

    transaction_hashes: List[str] = []
    labels: List[EthereumLabel] = []
    for transfer in transfers:
        transaction_hash = transfer.transfer_tx
        transaction_hashes.append(transaction_hash)
        labels.append(
            EthereumLabel(
                address_id=address_ids.get(transfer.contract_address),
                transaction_hash=transaction_hash,
                label=MINT_LABEL if transfer.is_mint else TRANSFER_LABEL,
                label_data={
                    "tokenId": transfer.tokenId,
                    "from": transfer.transfer_from,
                    "to": transfer.transfer_to,
                },
            )
        )

    existing_labels = (
        db_session.query(EthereumLabel)
        # Only mint/transfer labels carry the label_data keys that label_key reads. Restricting
        # the lookup keeps label_key from failing on any other label kind attached to the same
        # transaction.
        .filter(EthereumLabel.label.in_([MINT_LABEL, TRANSFER_LABEL]))
        .filter(EthereumLabel.address_id.in_(list(address_ids.values())))
        .filter(EthereumLabel.transaction_hash.in_(transaction_hashes))
        .all()
    )
    existing_label_keys = {label_key(label) for label in existing_labels}

    new_labels = [
        label for label in labels if label_key(label) not in existing_label_keys
    ]

    try:
        db_session.bulk_save_objects(new_labels)
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error("Could not write transfer/mint labels to database")
        logger.error(e)
|
||||
|
||||
|
||||
def add_labels(
    w3: Web3,
    db_session: Session,
    from_block: Optional[int] = None,
    to_block: Optional[int] = None,
    contract_address: Optional[str] = None,
    batch_size: int = 100,
) -> None:
    """
    Crawls blocks between from_block and to_block checking for NFT mints and transfers.

    For each mint/transfer, if the contract address involved in the operation has not already been
    added to the ethereum_addresses table, this method adds it and labels the address with the NFT
    collection metadata.

    It also adds mint/transfer labels to each (transaction, contract address) pair describing the
    NFT operation they represent.

    ## NFT collection metadata labels

    Label has type "erc721".

    Label data:
    {
        "name": "<name of contract>",
        "symbol": "<symbol of contract>",
        "totalSupply": "<totalSupply of contract>"
    }

    ## Mint and transfer labels
    Adds labels to the database for each transaction that involved an NFT transfer. Adds the contract
    address in the address_id column of ethereum_labels.

    Labels (transaction, contract address) pair as:
    - "nft_mint" if the transaction minted a token on the NFT contract
    - "nft_transfer" if the transaction transferred a token on the NFT contract

    Label data will always be of the form:
    {
        "tokenId": "<ID of token minted/transferred on NFT contract>",
        "from": "<previous owner address>",
        "to": "<new owner address>"
    }

    Arguments:
    - w3: Web3 client
    - db_session: Connection to Postgres database with moonstreamdb schema
    - from_block and to_block: Blocks to crawl (defaults are resolved by get_block_bounds)
    - contract_address: Optional contract address representing an NFT collection to restrict the
      crawl to
    - batch_size: Number of blocks to crawl and label at a time
    """
    assert batch_size > 0, f"Batch size must be positive (received {batch_size})"

    start, end = get_block_bounds(w3, from_block, to_block)

    batch_start = start
    batch_end = min(start + batch_size - 1, end)

    # Accumulates the address -> address id mapping over all batches processed so far.
    address_ids: Dict[str, int] = {}

    # The context manager closes the progress bar even if a batch raises.
    with tqdm(total=(end - start + 1)) as pbar:
        pbar.set_description(f"Labeling blocks {start}-{end}")
        while batch_start <= batch_end:
            job = get_nft_transfers(
                w3,
                from_block=batch_start,
                to_block=batch_end,
                contract_address=contract_address,
            )
            contract_addresses = {transfer.contract_address for transfer in job}
            updated_address_ids = ensure_addresses(db_session, contract_addresses)
            address_ids.update(updated_address_ids)

            # A set gives constant-time membership checks when filtering below.
            labelled_address_ids = {
                label.address_id
                for label in (
                    db_session.query(EthereumLabel)
                    .filter(EthereumLabel.label == NFT_LABEL)
                    .filter(EthereumLabel.address_id.in_(list(address_ids.values())))
                    .all()
                )
            }
            unlabelled_address_ids = [
                (address, address_id)
                for address, address_id in address_ids.items()
                if address_id not in labelled_address_ids
            ]

            # Add 'erc721' labels
            label_erc721_addresses(w3, db_session, unlabelled_address_ids)

            # Add mint/transfer labels to (transaction, contract_address) pairs
            label_transfers(db_session, job, updated_address_ids)

            # Update batch at end of iteration
            pbar.update(batch_end - batch_start + 1)
            batch_start = batch_end + 1
            batch_end = min(batch_end + batch_size, end)
|
||||
|
||||
|
||||
def time_bounded_summary(
    db_session: Session,
    start_time: datetime,
    end_time: datetime,
) -> Dict[str, Any]:
    """
    Summarizes Ethereum NFT activity for blocks whose timestamp lies between start_time and
    end_time (both inclusive).

    The result holds the date range, the lowest and highest block numbers in the window (when
    present) and, rendered as strings, the transaction count, total transaction value, NFT
    transfer count and value, mint count, and the numbers of distinct purchaser and minter
    addresses.
    """
    window_start = int(start_time.timestamp())
    window_end = int(end_time.timestamp())

    time_filter = and_(
        EthereumBlock.timestamp >= window_start,
        EthereumBlock.timestamp <= window_end,
    )

    # Join conditions shared by all of the queries below
    transaction_in_block = (
        EthereumTransaction.block_number == EthereumBlock.block_number
    )
    label_on_transaction = EthereumLabel.transaction_hash == EthereumTransaction.hash

    transactions_query = (
        db_session.query(EthereumTransaction)
        .join(EthereumBlock, transaction_in_block)
        .filter(time_filter)
    )

    def nft_query(label: str) -> Query:
        # Labels of the given type joined to their transactions and blocks within the window
        return (
            db_session.query(
                EthereumLabel.label,
                EthereumLabel.label_data,
                EthereumLabel.address_id,
                EthereumTransaction.hash,
                EthereumTransaction.value,
                EthereumBlock.block_number,
                EthereumBlock.timestamp,
            )
            .join(EthereumTransaction, label_on_transaction)
            .join(EthereumBlock, transaction_in_block)
            .filter(time_filter)
            .filter(EthereumLabel.label == label)
        )

    def holder_query(label: str) -> Query:
        # One row per (address_id, token_id): the first row under the ordering below
        return (
            db_session.query(
                EthereumLabel.address_id.label("address_id"),
                EthereumLabel.label_data["to"].astext.label("owner_address"),
                EthereumLabel.label_data["tokenId"].astext.label("token_id"),
                EthereumTransaction.block_number.label("block_number"),
                EthereumTransaction.transaction_index.label("transaction_index"),
                EthereumTransaction.value.label("transfer_value"),
            )
            .join(EthereumTransaction, label_on_transaction)
            .join(EthereumBlock, transaction_in_block)
            .filter(EthereumLabel.label == label)
            .filter(time_filter)
            .order_by(
                # Without "transfer_value" and "owner_address" as sort keys, the final distinct query
                # does not seem to be deterministic.
                # Maybe relevant Stackoverflow post: https://stackoverflow.com/a/59410440
                text(
                    "address_id, token_id, block_number desc, transaction_index desc, transfer_value, owner_address"
                )
            )
            .distinct("address_id", "token_id")
        )

    transfer_query = nft_query(TRANSFER_LABEL)
    mint_query = nft_query(MINT_LABEL)
    purchaser_query = holder_query(TRANSFER_LABEL)
    minter_query = holder_query(MINT_LABEL)

    min_block = (
        db_session.query(func.min(EthereumBlock.block_number))
        .filter(time_filter)
        .scalar()
    )
    max_block = (
        db_session.query(func.max(EthereumBlock.block_number))
        .filter(time_filter)
        .scalar()
    )
    # Report only the bounds that exist in the window
    blocks_result: Dict[str, int] = {
        bound: block_number
        for bound, block_number in (("start", min_block), ("end", max_block))
        if block_number is not None
    }

    num_transactions = transactions_query.distinct(EthereumTransaction.hash).count()
    num_transfers = transfer_query.distinct(EthereumTransaction.hash).count()

    transactions_subquery = transactions_query.subquery()
    total_value = db_session.query(func.sum(transactions_subquery.c.value)).scalar()
    transfers_subquery = transfer_query.subquery()
    transfer_value = db_session.query(func.sum(transfers_subquery.c.value)).scalar()

    num_minted = mint_query.count()

    num_purchasers = (
        db_session.query(purchaser_query.subquery())
        .distinct(text("owner_address"))
        .count()
    )
    num_minters = (
        db_session.query(minter_query.subquery())
        .distinct(text("owner_address"))
        .count()
    )

    date_range = {
        "start_time": start_time.isoformat(),
        "include_start": True,
        "end_time": end_time.isoformat(),
        "include_end": True,
    }
    return {
        "date_range": date_range,
        SUMMARY_KEY_BLOCKS: blocks_result,
        SUMMARY_KEY_NUM_TRANSACTIONS: f"{num_transactions}",
        SUMMARY_KEY_TOTAL_VALUE: f"{total_value}",
        SUMMARY_KEY_NFT_TRANSFERS: f"{num_transfers}",
        SUMMARY_KEY_NFT_TRANSFER_VALUE: f"{transfer_value}",
        SUMMARY_KEY_NFT_MINTS: f"{num_minted}",
        SUMMARY_KEY_NFT_PURCHASERS: f"{num_purchasers}",
        SUMMARY_KEY_NFT_MINTERS: f"{num_minters}",
    }
|
||||
|
||||
|
||||
def summary(db_session: Session, end_time: datetime) -> Dict[str, Any]:
    """
    Summarizes Ethereum NFT activity over three windows that all end at end_time:
    1. "hour": the preceding hour
    2. "day": the preceding day
    3. "week": the preceding week

    For every key in SUMMARY_KEYS, the result maps each window name to that window's value. The
    "crawled_at" key holds end_time in ISO format.
    """
    lookbacks = {
        "hour": timedelta(hours=1),
        "day": timedelta(days=1),
        "week": timedelta(weeks=1),
    }

    per_period: Dict[str, Dict[str, Any]] = {}
    for period, lookback in lookbacks.items():
        per_period[period] = time_bounded_summary(
            db_session, end_time - lookback, end_time
        )

    result: Dict[str, Any] = {}
    for summary_key in SUMMARY_KEYS:
        result[summary_key] = {
            period: period_summary.get(summary_key)
            for period, period_summary in per_period.items()
        }
    result["crawled_at"] = end_time.isoformat()
    return result
|
|
@ -11,6 +11,7 @@ def publish_json(
|
|||
title: str,
|
||||
content: Dict[str, Any],
|
||||
tags: Optional[List[str]] = None,
|
||||
wait: bool = True,
|
||||
) -> None:
|
||||
spire_api_url = os.environ.get(
|
||||
"MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev"
|
||||
|
@ -26,7 +27,7 @@ def publish_json(
|
|||
"Authorization": f"Bearer {humbug_token}",
|
||||
}
|
||||
request_body = {"title": title, "content": json.dumps(content), "tags": tags}
|
||||
query_parameters = {"sync": True}
|
||||
query_parameters = {"sync": wait}
|
||||
response = requests.post(
|
||||
report_url, headers=headers, json=request_body, params=query_parameters
|
||||
)
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream crawlers version.
|
||||
"""
|
||||
|
||||
MOONCRAWL_VERSION = "0.0.3"
|
||||
MOONCRAWL_VERSION = "0.0.4"
|
||||
|
|
|
@ -8,3 +8,6 @@ ignore_missing_imports = True
|
|||
|
||||
[mypy-pyevmasm.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-tqdm.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
@ -32,7 +32,7 @@ setup(
|
|||
package_data={"mooncrawl": ["py.typed"]},
|
||||
zip_safe=False,
|
||||
install_requires=[
|
||||
"moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@39d2b8e36a49958a9ae085ec2cc1be3fc732b9d0#egg=moonstreamdb&subdirectory=db",
|
||||
"moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@a4fff6498f66789934d4af26fd42a8cfb6e5eed5#egg=moonstreamdb&subdirectory=db",
|
||||
"humbug",
|
||||
"python-dateutil",
|
||||
"requests",
|
||||
|
@ -49,6 +49,7 @@ setup(
|
|||
"esd=mooncrawl.esd:main",
|
||||
"identity=mooncrawl.identity:main",
|
||||
"etherscan=mooncrawl.etherscan:main",
|
||||
"nft=mooncrawl.nft.cli:main",
|
||||
]
|
||||
},
|
||||
)
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
"""Support labels on transactions
|
||||
|
||||
Revision ID: 72f1ad512b2e
|
||||
Revises: ecb7817db377
|
||||
Create Date: 2021-09-02 22:59:46.408595
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "72f1ad512b2e"
|
||||
down_revision = "ecb7817db377"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade():
    """
    Allow labels to be attached to transactions.

    Adds a nullable, indexed transaction_hash column to ethereum_labels with a cascading
    foreign key to ethereum_transactions.hash, and relaxes address_id to be nullable.
    """
    labels_table = "ethereum_labels"
    hash_column = "transaction_hash"

    # ### commands auto generated by Alembic - please adjust! ###
    new_column = sa.Column(hash_column, sa.VARCHAR(length=256), nullable=True)
    op.add_column(labels_table, new_column)

    # A label no longer has to belong to an address.
    op.alter_column(
        labels_table, "address_id", existing_type=sa.INTEGER(), nullable=True
    )

    index_name = op.f("ix_ethereum_labels_transaction_hash")
    op.create_index(index_name, labels_table, [hash_column], unique=False)

    constraint_name = op.f("fk_ethereum_labels_transaction_hash_ethereum_transactions")
    op.create_foreign_key(
        constraint_name,
        labels_table,
        "ethereum_transactions",
        [hash_column],
        ["hash"],
        ondelete="CASCADE",
    )
    # ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade():
    """
    Revert transaction label support on ethereum_labels.

    Drops the transaction_hash foreign key, index and column, and makes address_id
    non-nullable again. Rows with a NULL address_id are deleted first, so this downgrade
    discards data.
    """
    labels_table = "ethereum_labels"

    # ### commands auto generated by Alembic - please adjust! ###
    constraint_name = op.f("fk_ethereum_labels_transaction_hash_ethereum_transactions")
    op.drop_constraint(constraint_name, labels_table, type_="foreignkey")

    index_name = op.f("ix_ethereum_labels_transaction_hash")
    op.drop_index(index_name, table_name=labels_table)

    # Rows without an address would violate the NOT NULL constraint restored below.
    op.execute("DELETE FROM ethereum_labels WHERE address_id IS NULL;")
    op.alter_column(
        labels_table, "address_id", existing_type=sa.INTEGER(), nullable=False
    )

    op.drop_column(labels_table, "transaction_hash")
    # ### end Alembic commands ###
|
|
@ -147,7 +147,13 @@ class EthereumLabel(Base): # type: ignore
|
|||
address_id = Column(
|
||||
Integer,
|
||||
ForeignKey("ethereum_addresses.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
nullable=True,
|
||||
index=True,
|
||||
)
|
||||
transaction_hash = Column(
|
||||
VARCHAR(256),
|
||||
ForeignKey("ethereum_transactions.hash", ondelete="CASCADE"),
|
||||
nullable=True,
|
||||
index=True,
|
||||
)
|
||||
label_data = Column(JSONB, nullable=True)
|
||||
|
|
|
@ -2,4 +2,4 @@
|
|||
Moonstream database version.
|
||||
"""
|
||||
|
||||
MOONSTREAMDB_VERSION = "0.0.3"
|
||||
MOONSTREAMDB_VERSION = "0.1.0"
|
||||
|
|
Ładowanie…
Reference in New Issue