From b0dc2c9e9c6f34a90dc2b6caeaa55fd65c54115b Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 04:36:52 -0700 Subject: [PATCH 1/6] Added MOONSTREAM_HUMBUG_TOKEN environment variable The ethereum trending addresses crawler will be the first one to report data through a Humbug journal. This requires us to accept a Humbug token as configuration. --- crawlers/sample.env | 1 + 1 file changed, 1 insertion(+) diff --git a/crawlers/sample.env b/crawlers/sample.env index e6946b01..acbc1736 100644 --- a/crawlers/sample.env +++ b/crawlers/sample.env @@ -2,3 +2,4 @@ export MOONSTREAM_IPC_PATH=null export MOONSTREAM_CRAWL_WORKERS=4 export MOONSTREAM_DB_URI="postgresql://:@:/" +export MOONSTREAM_HUMBUG_TOKEN="" From ea1a714e9b2334e8359d88e02be6891231dc090c Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 04:58:47 -0700 Subject: [PATCH 2/6] Scaffolding for "mooncrawl ethcrawler trending" --- crawlers/mooncrawl/cli.py | 48 +++++++++++++++++++++++++++++++++++++++ crawlers/setup.py | 1 + 2 files changed, 49 insertions(+) diff --git a/crawlers/mooncrawl/cli.py b/crawlers/mooncrawl/cli.py index c37e9ff8..2673269c 100644 --- a/crawlers/mooncrawl/cli.py +++ b/crawlers/mooncrawl/cli.py @@ -8,6 +8,8 @@ import sys import time from typing import Iterator, List +import dateutil.parser + from .ethereum import ( crawl_blocks_executor, crawl_blocks, @@ -164,6 +166,11 @@ def ethcrawler_contracts_update_handler(args: argparse.Namespace) -> None: json.dump(results, args.outfile) +def ethcrawler_trending_handler(args: argparse.Namespace) -> None: + with args.outfile: + print(args) + + def main() -> None: parser = argparse.ArgumentParser(description="Moonstream crawlers CLI") parser.set_defaults(func=lambda _: parser.print_help()) @@ -314,6 +321,47 @@ def main() -> None: func=ethcrawler_contracts_update_handler ) + parser_ethcrawler_trending = subcommands_ethcrawler.add_parser( + "trending", description="Trending addresses on the Ethereum blockchain" + ) + parser_ethcrawler_trending.add_argument( + "-s", + "--start", + type=dateutil.parser.parse, + required=True, + help="Start time for window to calculate trending addresses in", + ) + parser_ethcrawler_trending.add_argument( + "--include-start", + action="store_true", + help="Set this flag if range should include start time", + ) + parser_ethcrawler_trending.add_argument( + "-e", + "--end", + type=dateutil.parser.parse, + required=True, + help="End time for window to calculate trending addresses in", + ) + parser_ethcrawler_trending.add_argument( + "--include-end", + action="store_true", + help="Set this flag if range should include end time", + ) + parser_ethcrawler_trending.add_argument( + "--humbug", + default=None, + help="If you would like to write this data to a Moonstream journal, please provide a Humbug token for that here.", + ) + parser_ethcrawler_trending.add_argument( + "-o", + "--outfile", + type=argparse.FileType("w"), + default=sys.stdout, + help="Optional file to write output to. 
By default, prints to stdout.", + ) + parser_ethcrawler_trending.set_defaults(func=ethcrawler_trending_handler) + args = parser.parse_args() args.func(args) diff --git a/crawlers/setup.py b/crawlers/setup.py index 737e8d65..29eefd31 100644 --- a/crawlers/setup.py +++ b/crawlers/setup.py @@ -34,6 +34,7 @@ setup( zip_safe=False, install_requires=[ "moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@ec3278e192119d1e8a273cfaab6cb53890d2e8e9#egg=moonstreamdb&subdirectory=db", + "python-dateutil", "requests", "tqdm", "web3", From e5226b8a8b14c85f3a43875d539ee900c15ed62e Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 05:55:10 -0700 Subject: [PATCH 3/6] Working implementation of "ethcrawlers trending" The 24 hour queries take a LONG time but the 1 hour queries execute in 4 seconds. --- crawlers/mooncrawl/cli.py | 13 +++- crawlers/mooncrawl/ethereum.py | 113 ++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 5 deletions(-) diff --git a/crawlers/mooncrawl/cli.py b/crawlers/mooncrawl/cli.py index 2673269c..44489c1d 100644 --- a/crawlers/mooncrawl/cli.py +++ b/crawlers/mooncrawl/cli.py @@ -16,6 +16,8 @@ from .ethereum import ( check_missing_blocks, get_latest_blocks, process_contract_deployments, + DateRange, + trending, ) from .settings import MOONSTREAM_CRAWL_WORKERS @@ -167,8 +169,15 @@ def ethcrawler_contracts_update_handler(args: argparse.Namespace) -> None: def ethcrawler_trending_handler(args: argparse.Namespace) -> None: - with args.outfile: - print(args) + date_range = DateRange( + start_time=args.start, + end_time=args.end, + include_start=args.include_start, + include_end=args.include_end, + ) + results = trending(date_range) + with args.outfile as ofp: + json.dump(results, ofp) def main() -> None: diff --git a/crawlers/mooncrawl/ethereum.py b/crawlers/mooncrawl/ethereum.py index a4f94c4a..a4929306 100644 --- a/crawlers/mooncrawl/ethereum.py +++ b/crawlers/mooncrawl/ethereum.py @@ -1,12 +1,17 @@ from concurrent.futures import Future, ProcessPoolExecutor, wait -from typing import List, Optional, Tuple, Union +from dataclasses import dataclass +from datetime import datetime +from os import close +from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from sqlalchemy import desc +from sqlalchemy import desc, Column +from sqlalchemy import func +from sqlalchemy.orm import Session, Query from web3 import Web3, IPCProvider, HTTPProvider from web3.types import BlockData from .settings import MOONSTREAM_IPC_PATH, MOONSTREAM_CRAWL_WORKERS -from moonstreamdb.db import yield_db_session_ctx +from moonstreamdb.db import yield_db_session, yield_db_session_ctx from moonstreamdb.models import ( EthereumBlock, EthereumSmartContract, @@ -20,6 +25,14 @@ class EthereumBlockCrawlError(Exception): """ +@dataclass +class DateRange: + start_time: datetime + end_time: datetime + include_start: bool + include_end: bool + + def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH): web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() if web3_uri is not None: @@ -263,3 +276,97 @@ def process_contract_deployments() -> List[Tuple[str, str]]: current_offset += limit return results + + +def trending( + date_range: DateRange, db_session: Optional[Session] = None +) -> Dict[str, Any]: + close_db_session = False + if db_session is None: + close_db_session = True + db_session = next(yield_db_session()) + + start_timestamp = int(date_range.start_time.timestamp()) + end_timestamp = int(date_range.end_time.timestamp()) + + def make_query( + 
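+        # Builds one "top 10" query: join transactions to blocks, keep only the
+        # requested date range, group by the given address column, aggregate the
+        # other column (count of hashes or sum of values) under the given label,
+        # and order by that aggregate descending, limited to 10 rows.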
transaction_column: Column, + aggregate_column: Column, + aggregate_func: Callable, + aggregate_label: str, + ) -> Query: + query = db_session.query( + transaction_column, aggregate_func(aggregate_column).label(aggregate_label) + ).join( + EthereumBlock, + EthereumTransaction.block_number == EthereumBlock.block_number, + ) + if date_range.include_start: + query = query.filter(EthereumBlock.timestamp >= start_timestamp) + else: + query = query.filter(EthereumBlock.timestamp > start_timestamp) + + if date_range.include_end: + query = query.filter(EthereumBlock.timestamp <= end_timestamp) + else: + query = query.filter(EthereumBlock.timestamp < end_timestamp) + + query = ( + query.group_by(transaction_column).order_by(desc(aggregate_label)).limit(10) + ) + + return query + + results: Dict[str, Any] = {} + + try: + transactions_out_query = make_query( + EthereumTransaction.from_address, + EthereumTransaction.hash, + func.count, + "transactions_out", + ) + transactions_out = transactions_out_query.all() + results["transactions_out"] = [ + {"address": row[0], "metric": row[1]} for row in transactions_out + ] + + transactions_in_query = make_query( + EthereumTransaction.to_address, + EthereumTransaction.hash, + func.count, + "transactions_in", + ) + transactions_in = transactions_in_query.all() + results["transactions_in"] = [ + {"address": row[0], "metric": row[1]} for row in transactions_in + ] + + value_out_query = make_query( + EthereumTransaction.from_address, + EthereumTransaction.value, + func.sum, + "value_out", + ) + value_out = value_out_query.all() + results["value_out"] = [ + {"address": row[0], "metric": float(row[1])} for row in value_out + ] + + value_in_query = make_query( + EthereumTransaction.to_address, + EthereumTransaction.value, + func.sum, + "value_in", + ) + value_in = value_in_query.all() + results["value_in"] = [ + {"address": row[0], "metric": float(row[1])} for row in value_in + ] + + pass + finally: + if close_db_session: + db_session.close() + + return results From 2d513800aa8e970843d5d7bc643f6dc5e601c733 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 06:01:28 -0700 Subject: [PATCH 4/6] Displaying WEI values as ints (not floats) Also changed "metric" to "statistic" in the output for `mooncrawl ethcrawler trending`. 
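For context on why the integer cast matters: WEI-denominated sums routinely
exceed 2**53, so rendering them as floats silently loses low-order digits.
A minimal illustration (the value is made up for the example):

    >>> wei = 10**20 + 1      # 100 ETH plus 1 wei
    >>> int(float(wei))       # the float round-trip drops the extra wei
    100000000000000000000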
--- crawlers/mooncrawl/ethereum.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crawlers/mooncrawl/ethereum.py b/crawlers/mooncrawl/ethereum.py index a4929306..7908e45b 100644 --- a/crawlers/mooncrawl/ethereum.py +++ b/crawlers/mooncrawl/ethereum.py @@ -328,7 +328,7 @@ def trending( ) transactions_out = transactions_out_query.all() results["transactions_out"] = [ - {"address": row[0], "metric": row[1]} for row in transactions_out + {"address": row[0], "statistic": row[1]} for row in transactions_out ] transactions_in_query = make_query( @@ -339,7 +339,7 @@ def trending( ) transactions_in = transactions_in_query.all() results["transactions_in"] = [ - {"address": row[0], "metric": row[1]} for row in transactions_in + {"address": row[0], "statistic": row[1]} for row in transactions_in ] value_out_query = make_query( @@ -350,7 +350,7 @@ def trending( ) value_out = value_out_query.all() results["value_out"] = [ - {"address": row[0], "metric": float(row[1])} for row in value_out + {"address": row[0], "statistic": int(row[1])} for row in value_out ] value_in_query = make_query( @@ -361,7 +361,7 @@ def trending( ) value_in = value_in_query.all() results["value_in"] = [ - {"address": row[0], "metric": float(row[1])} for row in value_in + {"address": row[0], "statistic": int(row[1])} for row in value_in ] pass From d9d863e74265906fe97f1ef46fce94d5837741e5 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 06:33:56 -0700 Subject: [PATCH 5/6] Functionality to publish JSON results to Humbug For `mooncrawl ethcrawler trending` --- crawlers/mooncrawl/cli.py | 6 ++++++ crawlers/mooncrawl/publish.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 crawlers/mooncrawl/publish.py diff --git a/crawlers/mooncrawl/cli.py b/crawlers/mooncrawl/cli.py index 44489c1d..deaa8932 100644 --- a/crawlers/mooncrawl/cli.py +++ b/crawlers/mooncrawl/cli.py @@ -19,6 +19,7 @@ from .ethereum import ( DateRange, trending, ) +from .publish import publish_json from .settings import MOONSTREAM_CRAWL_WORKERS @@ -176,6 +177,11 @@ def ethcrawler_trending_handler(args: argparse.Namespace) -> None: include_end=args.include_end, ) results = trending(date_range) + if args.humbug: + opening_bracket = "[" if args.include_start else "(" + closing_bracket = "]" if args.include_end else ")" + title = f"Ethereum trending addresses: {opening_bracket}{args.start}, {args.end}{closing_bracket}" + publish_json("ethereum_trending", args.humbug, title, results) with args.outfile as ofp: json.dump(results, ofp) diff --git a/crawlers/mooncrawl/publish.py b/crawlers/mooncrawl/publish.py new file mode 100644 index 00000000..c1765610 --- /dev/null +++ b/crawlers/mooncrawl/publish.py @@ -0,0 +1,33 @@ +import json +import os +from typing import Any, Dict, List, Optional + +import requests + + +def publish_json( + crawl_type: str, + humbug_token: str, + title: str, + content: Dict[str, Any], + tags: Optional[List[str]] = None, +) -> None: + spire_api_url = os.environ.get( + "MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev" + ).rstrip("/") + report_url = f"{spire_api_url}/humbug/reports" + + if tags is None: + tags = [] + + tags.append(f"crawl_type:{crawl_type}") + + headers = { + "Authorization": f"Bearer {humbug_token}", + } + request_body = {"title": title, "content": json.dumps(content), "tags": tags} + query_parameters = {"sync": True} + response = requests.post( + report_url, headers=headers, json=request_body, params=query_parameters + ) + 
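+    # The sync=True query parameter asks Spire to process the report synchronously
+    # (an assumption based on the parameter name, not confirmed here), and
+    # raise_for_status() below turns any non-2xx response into an exception so a
+    # failed publish is not silently swallowed.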
response.raise_for_status() From bd49b2956be63c7fc5a030ce230889f298fbba87 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Mon, 9 Aug 2021 09:30:08 -0700 Subject: [PATCH 6/6] Slight renaming for clarity of internal args --- crawlers/mooncrawl/ethereum.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crawlers/mooncrawl/ethereum.py b/crawlers/mooncrawl/ethereum.py index 6c845e97..942fb0a3 100644 --- a/crawlers/mooncrawl/ethereum.py +++ b/crawlers/mooncrawl/ethereum.py @@ -290,13 +290,13 @@ def trending( end_timestamp = int(date_range.end_time.timestamp()) def make_query( - transaction_column: Column, - aggregate_column: Column, + identifying_column: Column, + statistic_column: Column, aggregate_func: Callable, aggregate_label: str, ) -> Query: query = db_session.query( - transaction_column, aggregate_func(aggregate_column).label(aggregate_label) + identifying_column, aggregate_func(statistic_column).label(aggregate_label) ).join( EthereumBlock, EthereumTransaction.block_number == EthereumBlock.block_number, @@ -312,7 +312,7 @@ def trending( query = query.filter(EthereumBlock.timestamp < end_timestamp) query = ( - query.group_by(transaction_column).order_by(desc(aggregate_label)).limit(10) + query.group_by(identifying_column).order_by(desc(aggregate_label)).limit(10) ) return query
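Taken together, the series adds an Ethereum trending-addresses crawler: it
aggregates per-address transaction counts and value flows over a date range
and can publish the results to a Humbug journal. A minimal Python-level
sketch of that flow, assuming the package installs as "mooncrawl" (per
crawlers/setup.py) and taking the token from the MOONSTREAM_HUMBUG_TOKEN
variable introduced in PATCH 1/6 (the CLI itself receives it via --humbug):

    import os
    from datetime import datetime, timedelta, timezone

    from mooncrawl.ethereum import DateRange, trending
    from mooncrawl.publish import publish_json

    end = datetime.now(timezone.utc)
    window = DateRange(
        start_time=end - timedelta(hours=1),  # 1 hour windows run in seconds per PATCH 3/6
        end_time=end,
        include_start=True,
        include_end=False,
    )
    # trending() opens and closes its own DB session when none is passed in.
    results = trending(window)

    token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
    if token:
        publish_json(
            "ethereum_trending",
            token,
            f"Ethereum trending addresses: [{window.start_time}, {window.end_time})",
            results,
        )

The equivalent CLI invocation is "mooncrawl ethcrawler trending -s <start>
--include-start -e <end> --humbug <token>", which writes the JSON results to
stdout unless -o is given.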