Working implementation of "ethcrawlers trending"

The 24 hour queries take a LONG time but the 1 hour queries execute in 4
seconds.
pull/92/head
Neeraj Kashyap 2021-08-09 05:55:10 -07:00
rodzic ea1a714e9b
commit e5226b8a8b
2 zmienionych plików z 121 dodań i 5 usunięć

Wyświetl plik

@ -16,6 +16,8 @@ from .ethereum import (
check_missing_blocks,
get_latest_blocks,
process_contract_deployments,
DateRange,
trending,
)
from .settings import MOONSTREAM_CRAWL_WORKERS
@ -167,8 +169,15 @@ def ethcrawler_contracts_update_handler(args: argparse.Namespace) -> None:
def ethcrawler_trending_handler(args: argparse.Namespace) -> None:
with args.outfile:
print(args)
date_range = DateRange(
start_time=args.start,
end_time=args.end,
include_start=args.include_start,
include_end=args.include_end,
)
results = trending(date_range)
with args.outfile as ofp:
json.dump(results, ofp)
def main() -> None:

Wyświetl plik

@ -1,12 +1,17 @@
from concurrent.futures import Future, ProcessPoolExecutor, wait
from typing import List, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime
from os import close
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from sqlalchemy import desc
from sqlalchemy import desc, Column
from sqlalchemy import func
from sqlalchemy.orm import Session, Query
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
from .settings import MOONSTREAM_IPC_PATH, MOONSTREAM_CRAWL_WORKERS
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
EthereumBlock,
EthereumSmartContract,
@ -20,6 +25,14 @@ class EthereumBlockCrawlError(Exception):
"""
@dataclass
class DateRange:
start_time: datetime
end_time: datetime
include_start: bool
include_end: bool
def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
@ -263,3 +276,97 @@ def process_contract_deployments() -> List[Tuple[str, str]]:
current_offset += limit
return results
def trending(
date_range: DateRange, db_session: Optional[Session] = None
) -> Dict[str, Any]:
close_db_session = False
if db_session is None:
close_db_session = True
db_session = next(yield_db_session())
start_timestamp = int(date_range.start_time.timestamp())
end_timestamp = int(date_range.end_time.timestamp())
def make_query(
transaction_column: Column,
aggregate_column: Column,
aggregate_func: Callable,
aggregate_label: str,
) -> Query:
query = db_session.query(
transaction_column, aggregate_func(aggregate_column).label(aggregate_label)
).join(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
if date_range.include_start:
query = query.filter(EthereumBlock.timestamp >= start_timestamp)
else:
query = query.filter(EthereumBlock.timestamp > start_timestamp)
if date_range.include_end:
query = query.filter(EthereumBlock.timestamp <= end_timestamp)
else:
query = query.filter(EthereumBlock.timestamp < end_timestamp)
query = (
query.group_by(transaction_column).order_by(desc(aggregate_label)).limit(10)
)
return query
results: Dict[str, Any] = {}
try:
transactions_out_query = make_query(
EthereumTransaction.from_address,
EthereumTransaction.hash,
func.count,
"transactions_out",
)
transactions_out = transactions_out_query.all()
results["transactions_out"] = [
{"address": row[0], "metric": row[1]} for row in transactions_out
]
transactions_in_query = make_query(
EthereumTransaction.to_address,
EthereumTransaction.hash,
func.count,
"transactions_in",
)
transactions_in = transactions_in_query.all()
results["transactions_in"] = [
{"address": row[0], "metric": row[1]} for row in transactions_in
]
value_out_query = make_query(
EthereumTransaction.from_address,
EthereumTransaction.value,
func.sum,
"value_out",
)
value_out = value_out_query.all()
results["value_out"] = [
{"address": row[0], "metric": float(row[1])} for row in value_out
]
value_in_query = make_query(
EthereumTransaction.to_address,
EthereumTransaction.value,
func.sum,
"value_in",
)
value_in = value_in_query.all()
results["value_in"] = [
{"address": row[0], "metric": float(row[1])} for row in value_in
]
pass
finally:
if close_db_session:
db_session.close()
return results