added sync command, that will continiously work

pull/226/head
yhtiyar 2021-09-08 17:00:02 +03:00
rodzic ed8096b813
commit e51e008164
2 zmienionych plików z 88 dodań i 6 usunięć

Wyświetl plik

@ -4,11 +4,15 @@ A command line tool to crawl information about NFTs from various sources.
import argparse
from datetime import datetime, timedelta, timezone
import json
import os
import sys
from typing import cast
import time
from typing import Any, Dict, cast
import dateutil.parser
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumBlock
from sqlalchemy.orm.session import Session
from web3 import Web3
from ..ethereum import connect
@ -34,12 +38,77 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
return connect(web3_connection_string)
def get_latest_block_from_db(db_session: Session):
return (
db_session.query(EthereumBlock)
.order_by(EthereumBlock.timestamp.desc())
.limit(1)
.one()
)
def ethereum_sync_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
if humbug_token is None:
raise ValueError("MOONSTREAM_HUMBUG_TOKEN env variable is not set")
with yield_db_session_ctx() as db_session:
start = args.start
if start is None:
time_now = datetime.now(timezone.utc)
week_ago = time_now - timedelta(weeks=1)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= week_ago.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
assert (
start <= end
), f"Start block {start} is greater than latest_block {end} in db"
while True:
print(f"Labeling blocks {start}-{end}")
add_labels(web3_client, db_session, start, end)
end_time = datetime.fromtimestamp(latest_block.timestamp, timezone.utc)
print(f"Creating summary with endtime={end_time}")
result = ethereum_summary(db_session, end_time)
push_summary(result, end, humbug_token)
sleep_time = 60 * 60
print(f"Going to sleep for:{sleep_time}s")
time.sleep(sleep_time)
start = end + 1
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
def ethereum_label_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
with yield_db_session_ctx() as db_session:
add_labels(web3_client, db_session, args.start, args.end, args.address)
def push_summary(result: Dict[str, Any], end_block_no: int, humbug_token: str):
title = f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] } (block {end_block_no})"
publish_json(
"nft_ethereum",
humbug_token,
title,
result,
tags=[f"crawler_version:{MOONCRAWL_VERSION}", f"end_block:{end_block_no}"],
wait=False,
)
def ethereum_summary_handler(args: argparse.Namespace) -> None:
with yield_db_session_ctx() as db_session:
result = ethereum_summary(db_session, args.end)
@ -136,6 +205,19 @@ def main() -> None:
)
parser_ethereum_summary.set_defaults(func=ethereum_summary_handler)
parser_ethereum_sync = subparsers_ethereum.add_parser(
"sync",
description="Label addresses and transactions in databse using crawled NFT transfer information, sync mode",
)
parser_ethereum_sync.add_argument(
"-s",
"--start",
type=int,
required=False,
help="Starting block number (inclusive if block available)",
)
parser_ethereum_sync.set_defaults(func=ethereum_sync_handler)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -388,7 +388,7 @@ def add_labels(
from_block: Optional[int] = None,
to_block: Optional[int] = None,
contract_address: Optional[str] = None,
batch_size: int = 50,
batch_size: int = 100,
) -> None:
"""
Crawls blocks between from_block and to_block checking for NFT mints and transfers.
@ -439,12 +439,12 @@ def add_labels(
start, end = get_block_bounds(w3, from_block, to_block)
batch_start = start
batch_end = start + batch_size - 1
batch_end = min(start + batch_size - 1, end)
address_ids: Dict[str, int] = {}
pbar = tqdm(total=(end - start + 1))
pbar.set_description("Processing blocks")
pbar.set_description(f"Labeling blocks {start}-{end}")
while batch_start <= batch_end:
job = get_nft_transfers(
w3,
@ -598,7 +598,7 @@ def time_bounded_summary(
func.sum(transfer_query.subquery().c.value)
).scalar()
num_minted = mint_query.distinct(EthereumTransaction.hash).count()
num_minted = mint_query.count()
num_purchasers = (
db_session.query(purchaser_query.subquery())
@ -652,7 +652,7 @@ def summary(db_session: Session, end_time: datetime) -> Dict[str, Any]:
def aggregate_summary(key: str) -> Dict[str, Any]:
return {period: summary.get(key) for period, summary in summaries.items()}
result = {
result: Dict[str, Any] = {
summary_key: aggregate_summary(summary_key) for summary_key in SUMMARY_KEYS
}
result["crawled_at"] = end_time.isoformat()