First version of "nft ethereum summary" crawler

pull/226/head
Neeraj Kashyap 2021-09-05 22:59:10 -07:00
rodzic 5cc7def647
commit 1013d5cb25
2 zmienionych plików z 34 dodań i 10 usunięć

Wyświetl plik

@ -42,7 +42,7 @@ def ethereum_label_handler(args: argparse.Namespace) -> None:
def ethereum_summary_handler(args: argparse.Namespace) -> None:
with yield_db_session_ctx() as db_session:
result = ethereum_summary(db_session, args.start, args.end)
result = ethereum_summary(db_session, args.end)
# humbug_token = args.humbug
# if humbug_token is None:
@ -111,13 +111,6 @@ def main() -> None:
parser_ethereum_summary = subparsers_ethereum.add_parser(
"summary", description="Generate Ethereum NFT summary"
)
parser_ethereum_summary.add_argument(
"-s",
"--start",
type=dateutil.parser.parse,
default=(time_now - timedelta(hours=1, minutes=0)).isoformat(),
help=f"Start time for window to calculate NFT statistics (default: {(time_now - timedelta(hours=1,minutes=0)).isoformat()})",
)
parser_ethereum_summary.add_argument(
"-e",
"--end",

Wyświetl plik

@ -1,5 +1,5 @@
from dataclasses import dataclass, asdict
from datetime import datetime
from datetime import datetime, time, timedelta
import logging
from hexbytes.main import HexBytes
from typing import Any, cast, Dict, List, Optional, Set, Tuple
@ -460,11 +460,14 @@ def add_labels(
pbar.close()
def summary(
def time_bounded_summary(
db_session: Session,
start_time: datetime,
end_time: datetime,
) -> Dict[str, Any]:
"""
Produces a summary of Ethereum NFT activity between the given start_time and end_time (inclusive).
"""
start_timestamp = int(start_time.timestamp())
end_timestamp = int(end_time.timestamp())
@ -559,3 +562,31 @@ def summary(
}
return result
def summary(db_session: Session, end_time: datetime) -> Dict[str, Any]:
"""
Produces a summary of all Ethereum NFT activity:
1. From 1 hour before end_time to end_time
2. From 1 day before end_time to end_time
3. From 1 week before end_time to end_time
"""
start_times = {
"hour": end_time - timedelta(hours=1),
"day": end_time - timedelta(days=1),
"week": end_time - timedelta(weeks=1),
}
summaries = {
period: time_bounded_summary(db_session, start_time, end_time)
for period, start_time in start_times.items()
}
def aggregate_summary(key: str) -> Dict[str, Any]:
return {period: summary.get(key) for period, summary in summaries.items()}
return {
"crawled_at": end_time.isoformat(),
"blocks": aggregate_summary("blocks"),
"transactions": aggregate_summary("transactions"),
"value": aggregate_summary("value"),
}