# --- crawlers/mooncrawl/ethcrawler.py ---


def ethcrawler_trending_handler(args: argparse.Namespace) -> None:
    """
    Handler for the "trending" subcommand.

    Builds a DateRange from the parsed arguments, computes the trending addresses for it,
    optionally publishes the results as a Humbug report, and writes them as JSON to the
    requested output file.

    Attributes read from args:
    - start, end: boundaries of the time window
    - include_start, include_end: whether each boundary belongs to the window
    - humbug: Humbug token; nothing is published when it is empty or None
    - outfile: writable file object that receives the JSON results
    """
    date_range = DateRange(
        start_time=args.start,
        end_time=args.end,
        include_start=args.include_start,
        include_end=args.include_end,
    )
    results = trending(date_range)

    if args.humbug:
        # Interval notation in the title mirrors the inclusion flags: [ ] closed, ( ) open.
        opening_bracket = "[" if args.include_start else "("
        closing_bracket = "]" if args.include_end else ")"
        title = f"Ethereum trending addresses: {opening_bracket}{args.start}, {args.end}{closing_bracket}"
        publish_json("ethereum_trending", args.humbug, title, results)

    try:
        json.dump(results, args.outfile)
    finally:
        # Close files that were opened for this command, but never sys.stdout: it is the
        # default for --outfile (and what "-" maps to), and closing it would break any
        # later write to standard output in this process.
        if args.outfile is not sys.stdout:
            args.outfile.close()


# --- crawlers/mooncrawl/ethereum.py ---


@dataclass
class DateRange:
    """
    Time window described by two datetimes and a pair of flags saying whether each
    boundary is itself part of the window.
    """

    # Lower and upper boundary of the window.
    start_time: datetime
    end_time: datetime
    # True when the corresponding boundary is included (closed end), False when excluded.
    include_start: bool
    include_end: bool
# --- crawlers/mooncrawl/ethereum.py ---


def trending(
    date_range: DateRange, db_session: Optional[Session] = None
) -> Dict[str, Any]:
    """
    Computes the top 10 addresses inside the given time window for four statistics.

    Arguments:
    - date_range: window to look at; its inclusion flags decide whether blocks whose
      timestamp equals a boundary are counted
    - db_session: session to run the queries on. When None, a session is obtained from
      yield_db_session() and closed before returning; a session passed in by the caller
      is left open.

    Returns a dictionary with the keys "transactions_out", "transactions_in", "value_out"
    and "value_in". Each value is a list of {"address": ..., "statistic": ...} items
    ordered by descending statistic.
    """
    # Keep a reference to the generator for the lifetime of this call.
    # NOTE(review): yield_db_session is presumably a generator with its own cleanup; if so,
    # discarding it right after next() lets that cleanup run before the first query.
    # Confirm against moonstreamdb.db.
    session_generator = None
    if db_session is None:
        session_generator = yield_db_session()
        db_session = next(session_generator)

    # NOTE(review): datetime.timestamp() interprets naive datetimes in the local timezone
    # of the machine, so the window depends on where this runs unless callers pass
    # timezone-aware datetimes. Confirm the intended convention.
    start_timestamp = int(date_range.start_time.timestamp())
    end_timestamp = int(date_range.end_time.timestamp())

    def make_query(
        identifying_column: Column,
        statistic_column: Column,
        aggregate_func: Callable,
        aggregate_label: str,
    ) -> Query:
        """
        Builds the top-10 query that groups transactions in the window by
        identifying_column and ranks the groups by aggregate_func(statistic_column).
        """
        query = db_session.query(
            identifying_column, aggregate_func(statistic_column).label(aggregate_label)
        ).join(
            EthereumBlock,
            EthereumTransaction.block_number == EthereumBlock.block_number,
        )

        if date_range.include_start:
            query = query.filter(EthereumBlock.timestamp >= start_timestamp)
        else:
            query = query.filter(EthereumBlock.timestamp > start_timestamp)

        if date_range.include_end:
            query = query.filter(EthereumBlock.timestamp <= end_timestamp)
        else:
            query = query.filter(EthereumBlock.timestamp < end_timestamp)

        return (
            query.group_by(identifying_column).order_by(desc(aggregate_label)).limit(10)
        )

    def unchanged(value: Any) -> Any:
        """Counts are reported exactly as the database returns them."""
        return value

    def sum_to_int(total: Any) -> int:
        """
        Sums are converted to int before being reported. SUM is NULL when every value
        in the group is NULL; that case is reported as 0 instead of raising TypeError.
        """
        return 0 if total is None else int(total)

    # One entry per reported statistic:
    # (result key / SQL label, grouping column, aggregated column, aggregate, conversion)
    statistics = [
        (
            "transactions_out",
            EthereumTransaction.from_address,
            EthereumTransaction.hash,
            func.count,
            unchanged,
        ),
        (
            "transactions_in",
            EthereumTransaction.to_address,
            EthereumTransaction.hash,
            func.count,
            unchanged,
        ),
        (
            "value_out",
            EthereumTransaction.from_address,
            EthereumTransaction.value,
            func.sum,
            sum_to_int,
        ),
        (
            "value_in",
            EthereumTransaction.to_address,
            EthereumTransaction.value,
            func.sum,
            sum_to_int,
        ),
    ]

    results: Dict[str, Any] = {}

    try:
        for label, identifying_column, statistic_column, aggregate, convert in statistics:
            rows = make_query(
                identifying_column, statistic_column, aggregate, label
            ).all()
            results[label] = [
                {"address": address, "statistic": convert(statistic)}
                for address, statistic in rows
            ]
    finally:
        # Only close sessions created here; a caller-provided session stays usable.
        if session_generator is not None:
            db_session.close()

    return results


# --- crawlers/mooncrawl/publish.py ---


def publish_json(
    crawl_type: str,
    humbug_token: str,
    title: str,
    content: Dict[str, Any],
    tags: Optional[List[str]] = None,
    timeout: float = 30.0,
) -> None:
    """
    Publishes a JSON document as a Humbug report.

    The report is POSTed to "<base>/humbug/reports", where <base> is taken from the
    MOONSTREAM_SPIRE_API_URL environment variable (default: https://spire.bugout.dev).

    Arguments:
    - crawl_type: added to the report tags as "crawl_type:<crawl_type>"
    - humbug_token: sent as a Bearer token in the Authorization header
    - title: title of the report
    - content: report content; serialized with json.dumps
    - tags: optional extra tags; the list passed in is not modified
    - timeout: seconds to wait for the server before giving up

    Raises whatever requests raises for connection problems or timeouts, and
    requests.HTTPError (via raise_for_status) when the server answers with an error status.
    """
    spire_api_url = os.environ.get(
        "MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev"
    ).rstrip("/")
    report_url = f"{spire_api_url}/humbug/reports"

    # Build a new list instead of appending to the argument, so that a list owned by the
    # caller does not grow by one tag on every call.
    report_tags = list(tags) if tags is not None else []
    report_tags.append(f"crawl_type:{crawl_type}")

    headers = {
        "Authorization": f"Bearer {humbug_token}",
    }
    request_body = {
        "title": title,
        "content": json.dumps(content),
        "tags": report_tags,
    }
    query_parameters = {"sync": True}

    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(
        report_url,
        headers=headers,
        json=request_body,
        params=query_parameters,
        timeout=timeout,
    )
    response.raise_for_status()
-34,6 +34,7 @@ setup( zip_safe=False, install_requires=[ "moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@39d2b8e36a49958a9ae085ec2cc1be3fc732b9d0#egg=moonstreamdb&subdirectory=db", + "python-dateutil", "requests", "tqdm", "web3", @@ -43,7 +44,7 @@ setup( "console_scripts": [ "ethcrawler=mooncrawl.ethcrawler:main", "esd=mooncrawl.esd:main", - "identity=mooncrawl.identity:main" + "identity=mooncrawl.identity:main", ] }, )