changed summary schema, now it requires start and end blocks

pull/263/head
yhtiyar 2021-09-16 10:32:49 +03:00
rodzic 5b6d06cf99
commit a093946a7a
2 zmienionych plików z 157 dodań i 103 usunięć

Wyświetl plik

@ -3,13 +3,16 @@ A command line tool to crawl information about NFTs from various sources.
"""
import argparse
from datetime import datetime, timedelta, timezone
import dateutil.parser
from dateutil.relativedelta import relativedelta
import json
import logging
import os
import sys
import time
from typing import Any, Dict, cast
from typing import Any, Dict, cast, Optional
import dateutil.parser
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumBlock
from sqlalchemy.orm.session import Session
@ -21,6 +24,11 @@ from ..publish import publish_json
from ..settings import MOONSTREAM_IPC_PATH
from ..version import MOONCRAWL_VERSION
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
BLOCKS_PER_SUMMARY = 40
def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
"""
@ -47,35 +55,86 @@ def get_latest_block_from_db(db_session: Session):
)
def get_latest_summary_block() -> Optional[int]:
pass
def get_latest_nft_event_date() -> Optional[datetime]:
pass
def sync_labels(
db_session: Session, web3_client: Web3, start: Optional[int]
) -> EthereumBlock:
if start is None:
logger.info(
"Syncing start block is not given, getting it from latest nft label in db"
)
start_date = get_latest_nft_event_date()
if start_date is None:
logger.warning(
"Didn't find any nft labels in db, starting sync from 3 month before now"
)
time_now = datetime.now(timezone.utc)
start_date = time_now - relativedelta(months=3)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= start_date.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
logger.info(f"Start block: {start}, date: {start_date}")
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
assert start <= end, f"Start block {start} is greater than latest_block {end} in db"
logger.info(f"Labeling blocks {start}-{end}")
add_labels(web3_client, db_session, start, end)
return latest_block
def sync_summaries(db_session: Session, end: int, start: Optional[int]):
if start is None:
logger.info(
"Syncing start time is not given, getting it from latest nft label in db"
)
start = get_latest_summary_block()
if start is None:
time_now = datetime.now(timezone.utc)
start_date = time_now - relativedelta(months=3)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= start_date.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
start += 1
while start < end:
current_end = start + BLOCKS_PER_SUMMARY - 1
current_end_time = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number <= current_end)
.order_by(EthereumBlock.block_number.desc())
.limit(1)
.one()
).timestamp
summary_result = ethereum_summary(
db_session, datetime.fromtimestamp(current_end_time, timezone.utc)
)
def ethereum_sync_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
if humbug_token is None:
raise ValueError("MOONSTREAM_HUMBUG_TOKEN env variable is not set")
with yield_db_session_ctx() as db_session:
start = args.start
if start is None:
time_now = datetime.now(timezone.utc)
week_ago = time_now - timedelta(weeks=1)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= week_ago.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
assert (
start <= end
), f"Start block {start} is greater than latest_block {end} in db"
while True:
print(f"Labeling blocks {start}-{end}")
add_labels(web3_client, db_session, start, end)
end_time = datetime.fromtimestamp(latest_block.timestamp, timezone.utc)
print(f"Creating summary with endtime={end_time}")
result = ethereum_summary(db_session, end_time)
@ -96,36 +155,30 @@ def ethereum_label_handler(args: argparse.Namespace) -> None:
add_labels(web3_client, db_session, args.start, args.end, args.address)
def push_summary(result: Dict[str, Any], end_block_no: int, humbug_token: str):
def push_summary(result: Dict[str, Any], humbug_token: str):
title = f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] } (block {end_block_no})"
title = (
f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] })"
)
publish_json(
"nft_ethereum",
humbug_token,
title,
result,
tags=[f"crawler_version:{MOONCRAWL_VERSION}", f"end_block:{end_block_no}"],
tags=[f"crawler_version:{MOONCRAWL_VERSION}"],
wait=False,
)
def ethereum_summary_handler(args: argparse.Namespace) -> None:
with yield_db_session_ctx() as db_session:
result = ethereum_summary(db_session, args.end)
# humbug_token = args.humbug
# if humbug_token is None:
# humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
# if humbug_token:
# title = f"NFT activity on the Ethereum blockchain: {start_time} (block {start_block}) to {end_time} (block {end_block})"
# publish_json(
# "nft_ethereum",
# humbug_token,
# title,
# result,
# tags=[f"crawler_version:{MOONCRAWL_VERSION}"],
# wait=False,
# )
with yield_db_session_ctx() as db_session:
result = ethereum_summary(db_session, args.start, args.end)
humbug_token = args.humbug
if humbug_token is None:
humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
if humbug_token:
push_summary(result, humbug_token)
with args.outfile as ofp:
json.dump(result, ofp)
@ -181,20 +234,18 @@ def main() -> None:
"summary", description="Generate Ethereum NFT summary"
)
parser_ethereum_summary.add_argument(
"-e",
"--end",
type=dateutil.parser.parse,
default=time_now.isoformat(),
help=f"End time for window to calculate NFT statistics (default: {time_now.isoformat()})",
"-s",
"--start",
type=int,
required=True,
help=f"Start block for window to calculate NFT statistics",
)
parser_ethereum_summary.add_argument(
"--humbug",
default=None,
help=(
"If you would like to write this data to a Moonstream journal, please provide a Humbug "
"token for that here. (This argument overrides any value set in the "
"MOONSTREAM_HUMBUG_TOKEN environment variable)"
),
"-e",
"--end",
type=int,
required=True,
help=f"End block for window to calculate NFT statistics",
)
parser_ethereum_summary.add_argument(
"-o",
@ -203,6 +254,13 @@ def main() -> None:
default=sys.stdout,
help="Optional file to write output to. By default, prints to stdout.",
)
parser_ethereum_summary.add_argument(
"-humbug",
"--humbug",
default=None,
help="Humbug token, if given summary will send there",
)
parser_ethereum_summary.set_defaults(func=ethereum_summary_handler)
parser_ethereum_sync = subparsers_ethereum.add_parser(

Wyświetl plik

@ -1,5 +1,5 @@
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import json
import logging
from hexbytes.main import HexBytes
@ -31,7 +31,9 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Summary keys
SUMMARY_KEY_BLOCKS = "blocks"
SUMMARY_KEY_START_BLOCK = "start_block"
SUMMARY_KEY_END_BLOCK = "end_block"
SUMMARY_KEY_NUM_BLOCKS = "num_blocks"
SUMMARY_KEY_NUM_TRANSACTIONS = "num_transactions"
SUMMARY_KEY_TOTAL_VALUE = "total_value"
SUMMARY_KEY_NFT_TRANSFERS = "nft_transfers"
@ -41,7 +43,9 @@ SUMMARY_KEY_NFT_PURCHASERS = "nft_owners"
SUMMARY_KEY_NFT_MINTERS = "nft_minters"
SUMMARY_KEYS = [
SUMMARY_KEY_BLOCKS,
SUMMARY_KEY_START_BLOCK,
SUMMARY_KEY_END_BLOCK,
SUMMARY_KEY_NUM_BLOCKS,
SUMMARY_KEY_NUM_TRANSACTIONS,
SUMMARY_KEY_TOTAL_VALUE,
SUMMARY_KEY_NFT_TRANSFERS,
@ -485,20 +489,17 @@ def add_labels(
pbar.close()
def time_bounded_summary(
def block_bounded_summary(
db_session: Session,
start_time: datetime,
end_time: datetime,
start_block: int,
end_block: int,
) -> Dict[str, Any]:
"""
Produces a summary of Ethereum NFT activity between the given start_time and end_time (inclusive).
"""
start_timestamp = int(start_time.timestamp())
end_timestamp = int(end_time.timestamp())
time_filter = and_(
EthereumBlock.timestamp >= start_timestamp,
EthereumBlock.timestamp <= end_timestamp,
block_filter = and_(
EthereumBlock.block_number >= start_block,
EthereumBlock.block_number <= end_block,
)
transactions_query = (
@ -507,7 +508,7 @@ def time_bounded_summary(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(time_filter)
.filter(block_filter)
)
def nft_query(label: str) -> Query:
@ -529,7 +530,7 @@ def time_bounded_summary(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(time_filter)
.filter(block_filter)
.filter(EthereumLabel.label == label)
)
return query
@ -556,7 +557,7 @@ def time_bounded_summary(
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == label)
.filter(time_filter)
.filter(block_filter)
.order_by(
# Without "transfer_value" and "owner_address" as sort keys, the final distinct query
# does not seem to be deterministic.
@ -572,21 +573,30 @@ def time_bounded_summary(
purchaser_query = holder_query(TRANSFER_LABEL)
minter_query = holder_query(MINT_LABEL)
blocks_result: Dict[str, int] = {}
min_block = (
db_session.query(func.min(EthereumBlock.block_number))
.filter(time_filter)
.scalar()
)
max_block = (
db_session.query(func.max(EthereumBlock.block_number))
.filter(time_filter)
.scalar()
blocks = (
db_session.query(EthereumBlock)
.filter(block_filter)
.order_by(EthereumBlock.block_number.asc())
)
first_block = None
last_block = None
num_blocks = 0
for block in blocks:
if num_blocks == 0:
min_block = block
max_block = block
num_blocks += 1
start_time = None
end_time = None
if min_block is not None:
blocks_result["start"] = min_block
first_block = min_block.block_number
start_time = datetime.fromtimestamp(
min_block.timestamp, timezone.utc
).isoformat()
if max_block is not None:
blocks_result["end"] = max_block
last_block = max_block.block_number
end_time = datetime.fromtimestamp(max_block.timestamp, timezone.utc).isoformat()
num_transactions = transactions_query.distinct(EthereumTransaction.hash).count()
num_transfers = transfer_query.distinct(EthereumTransaction.hash).count()
@ -614,12 +624,14 @@ def time_bounded_summary(
result = {
"date_range": {
"start_time": start_time.isoformat(),
"start_time": start_time,
"include_start": True,
"end_time": end_time.isoformat(),
"end_time": end_time,
"include_end": True,
},
SUMMARY_KEY_BLOCKS: blocks_result,
SUMMARY_KEY_START_BLOCK: first_block,
SUMMARY_KEY_END_BLOCK: last_block,
SUMMARY_KEY_NUM_BLOCKS: num_blocks,
SUMMARY_KEY_NUM_TRANSACTIONS: f"{num_transactions}",
SUMMARY_KEY_TOTAL_VALUE: f"{total_value}",
SUMMARY_KEY_NFT_TRANSFERS: f"{num_transfers}",
@ -632,28 +644,12 @@ def time_bounded_summary(
return result
def summary(db_session: Session, end_time: datetime) -> Dict[str, Any]:
def summary(db_session: Session, start_block: int, end_block: int) -> Dict[str, Any]:
"""
Produces a summary of all Ethereum NFT activity:
1. From 1 hour before end_time to end_time
2. From 1 day before end_time to end_time
3. From 1 week before end_time to end_time
From 1 hour before end_time to end_time
"""
start_times = {
"hour": end_time - timedelta(hours=1),
"day": end_time - timedelta(days=1),
"week": end_time - timedelta(weeks=1),
}
summaries = {
period: time_bounded_summary(db_session, start_time, end_time)
for period, start_time in start_times.items()
}
def aggregate_summary(key: str) -> Dict[str, Any]:
return {period: summary.get(key) for period, summary in summaries.items()}
result: Dict[str, Any] = {
summary_key: aggregate_summary(summary_key) for summary_key in SUMMARY_KEYS
}
result["crawled_at"] = end_time.isoformat()
result = block_bounded_summary(db_session, start_block, end_block)
result["crawled_at"] = datetime.utcnow().isoformat()
return result