Merge pull request #92 from zomglings/crawlers-ethereum-trending

Add "mooncrawl ethcrawler trending" command
pull/95/head
Neeraj Kashyap 2021-08-09 09:32:49 -07:00 zatwierdzone przez GitHub
commit 58a9268b4d
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
5 zmienionych plików z 209 dodań i 4 usunięć

Wyświetl plik

@ -8,13 +8,18 @@ import sys
import time
from typing import Iterator, List
import dateutil.parser
from .ethereum import (
crawl_blocks_executor,
crawl_blocks,
check_missing_blocks,
get_latest_blocks,
process_contract_deployments,
DateRange,
trending,
)
from .publish import publish_json
from .settings import MOONSTREAM_CRAWL_WORKERS
@ -164,6 +169,23 @@ def ethcrawler_contracts_update_handler(args: argparse.Namespace) -> None:
json.dump(results, args.outfile)
def ethcrawler_trending_handler(args: argparse.Namespace) -> None:
    """
    Handler for the "trending" subcommand.

    Builds a DateRange from the parsed arguments, computes the trending statistics for that
    window, publishes them with publish_json when a Humbug token was supplied, and finally dumps
    them as JSON to args.outfile.
    """
    window = DateRange(
        start_time=args.start,
        end_time=args.end,
        include_start=args.include_start,
        include_end=args.include_end,
    )
    results = trending(window)

    humbug_token = args.humbug
    if humbug_token:
        # Interval notation in the title: a square bracket marks an included bound, a parenthesis
        # an excluded one.
        left = "(" if not args.include_start else "["
        right = ")" if not args.include_end else "]"
        title = f"Ethereum trending addresses: {left}{args.start}, {args.end}{right}"
        publish_json("ethereum_trending", humbug_token, title, results)

    # The context manager closes the output stream once the JSON has been written.
    with args.outfile as ofp:
        json.dump(results, ofp)
def main() -> None:
parser = argparse.ArgumentParser(description="Moonstream crawlers CLI")
parser.set_defaults(func=lambda _: parser.print_help())
@ -306,6 +328,47 @@ def main() -> None:
func=ethcrawler_contracts_update_handler
)
# Register the "trending" subcommand through `subcommands.add_parser`.
parser_ethcrawler_trending = subcommands.add_parser(
"trending", description="Trending addresses on the Ethereum blockchain"
)
# Required window start; argparse passes the raw string through dateutil.parser.parse.
parser_ethcrawler_trending.add_argument(
"-s",
"--start",
type=dateutil.parser.parse,
required=True,
help="Start time for window to calculate trending addresses in",
)
# Boolean flag (False unless given): treat the start time as part of the window.
parser_ethcrawler_trending.add_argument(
"--include-start",
action="store_true",
help="Set this flag if range should include start time",
)
# Required window end; parsed the same way as --start.
parser_ethcrawler_trending.add_argument(
"-e",
"--end",
type=dateutil.parser.parse,
required=True,
help="End time for window to calculate trending addresses in",
)
# Boolean flag (False unless given): treat the end time as part of the window.
parser_ethcrawler_trending.add_argument(
"--include-end",
action="store_true",
help="Set this flag if range should include end time",
)
# Optional Humbug token; the handler only publishes the results when this is set.
parser_ethcrawler_trending.add_argument(
"--humbug",
default=None,
help="If you would like to write this data to a Moonstream journal, please provide a Humbug token for that here.",
)
# Output destination, opened for writing by argparse; falls back to sys.stdout.
parser_ethcrawler_trending.add_argument(
"-o",
"--outfile",
type=argparse.FileType("w"),
default=sys.stdout,
help="Optional file to write output to. By default, prints to stdout.",
)
# Store the handler on the namespace as `func`, which main invokes as args.func(args).
parser_ethcrawler_trending.set_defaults(func=ethcrawler_trending_handler)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -1,12 +1,17 @@
from concurrent.futures import Future, ProcessPoolExecutor, wait
from typing import List, Optional, Tuple, Union
from dataclasses import dataclass
from datetime import datetime
from os import close
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from sqlalchemy import desc
from sqlalchemy import desc, Column
from sqlalchemy import func
from sqlalchemy.orm import Session, Query
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
from .settings import MOONSTREAM_IPC_PATH, MOONSTREAM_CRAWL_WORKERS
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.db import yield_db_session, yield_db_session_ctx
from moonstreamdb.models import (
EthereumBlock,
EthereumAddress,
@ -20,6 +25,14 @@ class EthereumBlockCrawlError(Exception):
"""
@dataclass
class DateRange:
    """
    A time window plus flags stating whether each endpoint belongs to the window.
    """

    # Lower bound of the window.
    start_time: datetime
    # Upper bound of the window.
    end_time: datetime
    # True when start_time itself is inside the window (closed lower bound).
    include_start: bool
    # True when end_time itself is inside the window (closed upper bound).
    include_end: bool
def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
@ -263,3 +276,97 @@ def process_contract_deployments() -> List[Tuple[str, str]]:
current_offset += limit
return results
def trending(
    date_range: DateRange, db_session: Optional[Session] = None
) -> Dict[str, Any]:
    """
    Compute the top 10 addresses per statistic for transactions in blocks within date_range.

    Args:
        date_range: Time window; its include_start/include_end flags decide whether the
            comparison against the block timestamp is inclusive or strict at each bound.
        db_session: Session to run the queries on. When omitted, a session is obtained from
            yield_db_session() and released again before returning. A session passed in by the
            caller is never closed here.

    Returns:
        A dictionary with the keys "transactions_out", "transactions_in", "value_out" and
        "value_in". Each maps to a list of at most 10 {"address": ..., "statistic": ...} items
        in descending order of the statistic.
    """
    # Keep a reference to the generator that produced the session. Calling
    # next(yield_db_session()) without holding the generator lets it be garbage collected
    # straight away, which would run whatever cleanup it has after its yield before the session
    # is even used.
    session_generator = None
    if db_session is None:
        session_generator = yield_db_session()
        db_session = next(session_generator)

    try:
        # NOTE(review): datetime.timestamp() interprets naive datetimes in the local timezone.
        # Confirm that callers pass values matching the timezone of EthereumBlock.timestamp.
        start_timestamp = int(date_range.start_time.timestamp())
        end_timestamp = int(date_range.end_time.timestamp())

        def make_query(
            identifying_column: Column,
            statistic_column: Column,
            aggregate_func: Callable,
            aggregate_label: str,
        ) -> Query:
            """
            Build the "top 10 identifying_column values by aggregate_func(statistic_column)"
            query, restricted to transactions whose block timestamp lies in the window.
            """
            query = db_session.query(
                identifying_column,
                aggregate_func(statistic_column).label(aggregate_label),
            ).join(
                EthereumBlock,
                EthereumTransaction.block_number == EthereumBlock.block_number,
            )
            if date_range.include_start:
                query = query.filter(EthereumBlock.timestamp >= start_timestamp)
            else:
                query = query.filter(EthereumBlock.timestamp > start_timestamp)
            if date_range.include_end:
                query = query.filter(EthereumBlock.timestamp <= end_timestamp)
            else:
                query = query.filter(EthereumBlock.timestamp < end_timestamp)
            return (
                query.group_by(identifying_column)
                .order_by(desc(aggregate_label))
                .limit(10)
            )

        # (result key, grouping column, aggregated column, aggregate function, coerce to int)
        # The summed values are passed through int(), presumably because the database returns
        # the SUM as a type json cannot serialize (TODO confirm). Counts are used as returned.
        statistics = [
            (
                "transactions_out",
                EthereumTransaction.from_address,
                EthereumTransaction.hash,
                func.count,
                False,
            ),
            (
                "transactions_in",
                EthereumTransaction.to_address,
                EthereumTransaction.hash,
                func.count,
                False,
            ),
            (
                "value_out",
                EthereumTransaction.from_address,
                EthereumTransaction.value,
                func.sum,
                True,
            ),
            (
                "value_in",
                EthereumTransaction.to_address,
                EthereumTransaction.value,
                func.sum,
                True,
            ),
        ]

        results: Dict[str, Any] = {}
        for label, identifying_column, statistic_column, aggregate_func, coerce in statistics:
            rows = make_query(
                identifying_column, statistic_column, aggregate_func, label
            ).all()
            results[label] = [
                {"address": row[0], "statistic": int(row[1]) if coerce else row[1]}
                for row in rows
            ]
        return results
    finally:
        # Only release a session that was created here.
        if session_generator is not None:
            db_session.close()
            # Let the generator run its own cleanup now instead of at garbage collection time.
            session_generator.close()

Wyświetl plik

@ -0,0 +1,33 @@
import json
import os
from typing import Any, Dict, List, Optional
import requests
def publish_json(
    crawl_type: str,
    humbug_token: str,
    title: str,
    content: Dict[str, Any],
    tags: Optional[List[str]] = None,
) -> None:
    """
    Publish a JSON document as a Humbug report through the Spire API.

    The API base URL is read from the MOONSTREAM_SPIRE_API_URL environment variable and defaults
    to "https://spire.bugout.dev".

    Args:
        crawl_type: Identifier of the crawl; sent as the extra tag "crawl_type:<crawl_type>".
        humbug_token: Token sent as the Bearer credential in the Authorization header.
        title: Title of the report.
        content: Report payload; serialized with json.dumps into the report's "content" field.
        tags: Optional additional tags. The list passed in is not modified.

    Raises:
        requests.HTTPError: From response.raise_for_status() when the API answers with an error
            status.
    """
    spire_api_url = os.environ.get(
        "MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev"
    ).rstrip("/")
    report_url = f"{spire_api_url}/humbug/reports"

    # Build a new list instead of appending to `tags`: appending would leak the crawl_type tag
    # into the caller's list and pile up duplicates if the same list is reused across calls.
    report_tags = [*(tags or []), f"crawl_type:{crawl_type}"]

    headers = {
        "Authorization": f"Bearer {humbug_token}",
    }
    request_body = {
        "title": title,
        "content": json.dumps(content),
        "tags": report_tags,
    }
    query_parameters = {"sync": True}

    response = requests.post(
        report_url, headers=headers, json=request_body, params=query_parameters
    )
    response.raise_for_status()

Wyświetl plik

@ -2,3 +2,4 @@
export MOONSTREAM_IPC_PATH=null
export MOONSTREAM_CRAWL_WORKERS=4
export MOONSTREAM_DB_URI="postgresql://<username>:<password>@<db_host>:<db_port>/<db_name>"
export MOONSTREAM_HUMBUG_TOKEN="<Token for crawlers to store data via Humbug>"

Wyświetl plik

@ -34,6 +34,7 @@ setup(
zip_safe=False,
install_requires=[
"moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@39d2b8e36a49958a9ae085ec2cc1be3fc732b9d0#egg=moonstreamdb&subdirectory=db",
"python-dateutil",
"requests",
"tqdm",
"web3",
@ -43,7 +44,7 @@ setup(
"console_scripts": [
"ethcrawler=mooncrawl.ethcrawler:main",
"esd=mooncrawl.esd:main",
"identity=mooncrawl.identity:main"
"identity=mooncrawl.identity:main",
]
},
)