Merge pull request #481 from bugout-dev/moonworm-crawler

Moonworm Crawler
pull/500/head
Neeraj Kashyap 2021-12-16 13:53:55 -08:00 committed by GitHub
commit c5dd5e27cc
No key for this signature was found in the database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 1564 additions and 30 deletions

View file

@ -6,6 +6,7 @@ export BUGOUT_SPIRE_URL="https://spire.bugout.dev"
export MOONSTREAM_APPLICATION_ID="<issued_bugout_application_id>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Access_token_to_application_resources>"
export MOONSTREAM_POOL_SIZE=0
export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<Bugout journal with tasks for moonworm>"
# Variables related to blockchain, txpool, and whalewatch data
export MOONSTREAM_DATA_JOURNAL_ID="<bugout_journal_id_to_store_blockchain_data>"

View file

@ -1,11 +1,14 @@
import hashlib
import json
from itertools import chain
import logging
from typing import Optional, Dict, Any, Union
from typing import List, Optional, Dict, Any, Union
import time
from enum import Enum
import uuid
import boto3 # type: ignore
from bugout.data import BugoutSearchResults
from bugout.data import BugoutSearchResults, BugoutSearchResult
from bugout.journal import SearchOrder
from ens.utils import is_valid_ens_name # type: ignore
from eth_utils.address import is_address # type: ignore
@ -31,6 +34,7 @@ from .settings import (
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_BUCKET,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
from web3 import Web3
@ -432,3 +436,107 @@ def upload_abi_to_s3(
update["s3_path"] = result_key
return update
def get_all_entries_from_search(
journal_id: str, search_query: str, limit: int, token: str
) -> List[BugoutSearchResult]:
"""
Get all required entries from journal using search interface
"""
offset = 0
results: List[BugoutSearchResult] = []
try:
existing_metods = bc.search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
timeout=10.0,
limit=limit,
offset=offset,
)
results.extend(existing_metods.results)
except Exception as e:
reporter.error_report(e)
if len(results) != existing_metods.total_results:
for offset in range(limit, existing_metods.total_results, limit):
existing_metods = bc.search(
token=token,
journal_id=journal_id,
query=search_query,
content=False,
timeout=10.0,
limit=limit,
offset=offset,
)
results.extend(existing_metods.results)
return results
def apply_moonworm_tasks(
subscription_type: str,
abi: Any,
address: str,
) -> None:
"""
Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist
"""
entries_pack = []
try:
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type}",
limit=100,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
existing_tags = [entry.tags for entry in entries]
existing_hashes = [
tag.split(":")[-1]
for tag in chain(*existing_tags)
if "abi_method_hash" in tag
]
abi_hashes_dict = {
hashlib.md5(json.dumps(method).encode("utf-8")).hexdigest(): method
for method in abi
if (method["type"] in ("event", "function"))
and (method.get("stateMutability", "") != "view")
}
        for abi_hash, abi_method in abi_hashes_dict.items():
            if abi_hash not in existing_hashes:
                entries_pack.append(
                    {
                        "title": address,
                        "content": json.dumps(abi_method, indent=4),
                        "tags": [
                            f"address:{address}",
                            f"type:{abi_method['type']}",
                            f"abi_method_hash:{abi_hash}",
                            f"subscription_type:{subscription_type}",
                            f"abi_name:{abi_method['name']}",
                            "status:active",
                        ],
                    }
                )
except Exception as e:
reporter.error_report(e)
if len(entries_pack) > 0:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=entries_pack,
timeout=15,
)
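For orientation, a minimal usage sketch of the two helpers added above; the import path, journal ID, token, ABI, and address are placeholder assumptions, not values from this change:

# Hypothetical usage sketch (import path and all values are placeholders):
from moonstreamapi.actions import apply_moonworm_tasks, get_all_entries_from_search

entries = get_all_entries_from_search(
    journal_id="<moonworm tasks journal id>",
    search_query="tag:subscription_type:polygon_smartcontract",
    limit=100,
    token="<admin access token>",
)

apply_moonworm_tasks(
    subscription_type="polygon_smartcontract",
    abi=[{"type": "event", "name": "Transfer", "inputs": [], "anonymous": False}],
    address="0x<contract address>",
)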

View file

@ -11,13 +11,14 @@ from typing import Optional
from moonstreamdb.db import SessionLocal
from sqlalchemy.orm import with_expression
from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID
from ..web3_provider import yield_web3_provider
from . import subscription_types, subscriptions
from . import subscription_types, subscriptions, moonworm_tasks
from .migrations import checksum_address, update_dashboard_subscription_key
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -117,6 +118,16 @@ def migrations_run(args: argparse.Namespace) -> None:
db_session.close()
def moonworm_tasks_list_handler(args: argparse.Namespace) -> None:
moonworm_tasks.get_list_of_addresses()
def moonworm_tasks_add_subscription_handler(args: argparse.Namespace) -> None:
moonworm_tasks.add_subscription(args.id)
def main() -> None:
cli_description = f"""Moonstream Admin CLI
@ -337,6 +348,33 @@ This CLI is configured to work with the following API URLs:
)
parser_migrations_run.set_defaults(func=migrations_run)
parser_moonworm_tasks = subcommands.add_parser(
"moonworm-tasks", description="Manage tasks for moonworm journal."
)
    parser_moonworm_tasks.set_defaults(func=lambda _: parser_moonworm_tasks.print_help())
    subcommands_moonworm_tasks = parser_moonworm_tasks.add_subparsers(
        description="Moonworm tasks commands"
    )
parser_moonworm_tasks_list = subcommands_moonworm_tasks.add_parser(
"list", description="Return list of addresses in moonworm journal."
)
parser_moonworm_tasks_list.set_defaults(func=moonworm_tasks_list_handler)
    parser_moonworm_tasks_add = subcommands_moonworm_tasks.add_parser(
        "add_subscription",
        description="Add a subscription's ABI to the moonworm tasks journal.",
    )
    parser_moonworm_tasks_add.add_argument(
        "-i",
        "--id",
        type=str,
        help="ID of the subscription to add to moonworm tasks.",
    )
parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler)
args = parser.parse_args()
args.func(args)
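Assuming the backend's admin CLI module is invoked roughly as below (the module path is an assumption; adjust to how the admin CLI is normally run), the new subcommands would be used like this:

# Hypothetical invocations (module path is an assumption):
python -m moonstreamapi.admin.cli moonworm-tasks list
python -m moonstreamapi.admin.cli moonworm-tasks add_subscription --id <subscription_resource_id>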

View file

@ -0,0 +1,87 @@
import logging
import json
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from ..actions import get_all_entries_from_search, apply_moonworm_tasks
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
from ..settings import bugout_client as bc
logger = logging.getLogger(__name__)
def get_list_of_addresses():
"""
Return list of addresses of tasks
"""
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"?tag:type:event ?tag:type:function",
limit=100,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
addresses = set()
for entry in entries:
addresses.add(entry.title)
print(addresses)
def add_subscription(id: str):
    """
    Load the subscription's ABI from S3 and apply it as moonworm tasks.
    """
    try:
        subscription_resource: BugoutResource = bc.get_resource(
            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
            resource_id=id,
        )
    except BugoutResponseException as e:
        logger.error(f"Bugout error: {str(e)}")
        return
    except Exception as e:
        logger.error(f"Error getting resource: {str(e)}")
        return
    s3_client = boto3.client("s3")
    if subscription_resource.resource_data["abi"] is not None:
        bucket = subscription_resource.resource_data["bucket"]
        key = subscription_resource.resource_data["s3_path"]
        if bucket is None or key is None:
            logger.error(f"Subscription {id} does not have an S3 path to its ABI")
            return
        s3_path = f"s3://{bucket}/{key}"
        try:
            response = s3_client.get_object(
                Bucket=bucket,
                Key=key,
            )
        except s3_client.exceptions.NoSuchKey as e:
            logger.error(
                f"Error getting ABI for subscription {str(id)}: S3 object {s3_path} does not exist: {str(e)}"
            )
            return
        abi = json.loads(response["Body"].read())
        apply_moonworm_tasks(
            subscription_type=subscription_resource.resource_data[
                "subscription_type_id"
            ],
            abi=abi,
            address=subscription_resource.resource_data["address"],
        )
    else:
        logger.info("Subscription must have an ABI before it can be added to moonworm tasks.")

View file

@ -42,7 +42,8 @@ blockchain_by_subscription_id = {
@router.post("/", tags=["dashboards"], response_model=BugoutResource)
async def add_dashboard_handler(
request: Request, dashboard: data.DashboardCreate = Body(...)
request: Request,
dashboard: data.DashboardCreate = Body(...),
) -> BugoutResource:
"""
    Add a dashboard resource for the user.
@ -74,7 +75,7 @@ async def add_dashboard_handler(
s3_client = boto3.client("s3")
available_subscriptions = {
available_subscriptions: Dict[UUID, Dict[str, Any]] = {
resource.id: resource.resource_data for resource in resources.resources
}
@ -232,7 +233,9 @@ async def get_dashboard_handler(
@router.put("/{dashboard_id}", tags=["dashboards"], response_model=BugoutResource)
async def update_dashboard_handler(
request: Request, dashboard_id: str, dashboard: data.DashboardUpdate = Body(...)
request: Request,
dashboard_id: str,
dashboard: data.DashboardUpdate = Body(...),
) -> BugoutResource:
"""
    Update a dashboard; this fully overwrites its name and subscription metadata.

View file

@ -10,13 +10,10 @@ from typing import List, Optional, Dict, Any
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from bugout.exceptions import BugoutResponseException
from fastapi import APIRouter, Depends, Request, Form
from fastapi import APIRouter, Depends, Request, Form, BackgroundTasks
from web3 import Web3
from ..actions import (
validate_abi_json,
upload_abi_to_s3,
)
from ..actions import validate_abi_json, upload_abi_to_s3, apply_moonworm_tasks
from ..admin import subscription_types
from .. import data
from ..middleware import MoonstreamHTTPException
@ -41,6 +38,7 @@ BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
@router.post("/", tags=["subscriptions"], response_model=data.SubscriptionResourceData)
async def add_subscription_handler(
request: Request, # subscription_data: data.CreateSubscriptionRequest = Body(...)
background_tasks: BackgroundTasks,
address: str = Form(...),
color: str = Form(...),
label: str = Form(...),
@ -143,6 +141,13 @@ async def add_subscription_handler(
logger.error(f"Error getting user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
background_tasks.add_task(
apply_moonworm_tasks,
subscription_type_id,
json_abi,
address,
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],
@ -234,6 +239,7 @@ async def get_subscriptions_handler(request: Request) -> data.SubscriptionsListR
async def update_subscriptions_handler(
request: Request,
subscription_id: str,
background_tasks: BackgroundTasks,
color: Optional[str] = Form(None),
label: Optional[str] = Form(None),
abi: Optional[str] = Form(None),
@ -301,6 +307,14 @@ async def update_subscriptions_handler(
logger.error(f"Error getting user subscriptions: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
if abi:
background_tasks.add_task(
apply_moonworm_tasks,
subscription_resource.resource_data["subscription_type_id"],
json_abi,
subscription_resource.resource_data["address"],
)
return data.SubscriptionResourceData(
id=str(resource.id),
user_id=resource.resource_data["user_id"],

View file

@ -84,3 +84,11 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX = (
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX.rstrip("/")
)
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)
if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
raise ValueError(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)

View file

@ -2,4 +2,4 @@
Moonstream library and API version.
"""
MOONSTREAMAPI_VERSION = "0.1.1"
MOONSTREAMAPI_VERSION = "0.1.2"

View file

@ -45,7 +45,7 @@ POLYGON_MISSING_TIMER_FILE="polygon-missing.timer"
POLYGON_STATISTICS_SERVICE_FILE="polygon-statistics.service"
POLYGON_STATISTICS_TIMER_FILE="polygon-statistics.timer"
POLYGON_TXPOOL_SERVICE_FILE="polygon-txpool.service"
POLYGON_MOONWORM_CRAWLER_SERVICE_FILE="polygon-moonworm-crawler.service"
set -eu
@ -150,3 +150,11 @@ systemctl restart "${POLYGON_STATISTICS_TIMER_FILE}"
# cp "${SCRIPT_DIR}/${POLYGON_TXPOOL_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_TXPOOL_SERVICE_FILE}"
# systemctl daemon-reload
# systemctl restart "${POLYGON_TXPOOL_SERVICE_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Polygon moonworm crawler service definition with ${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}"
chmod 644 "${SCRIPT_DIR}/${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}"
systemctl daemon-reload
systemctl restart "${POLYGON_MOONWORM_CRAWLER_SERVICE_FILE}"

View file

@ -0,0 +1,18 @@
[Unit]
Description=Polygon moonworm crawler
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli crawl -b polygon
SyslogIdentifier=polygon-moonworm-crawler
[Install]
WantedBy=multi-user.target
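Once deployed, the unit can be inspected with standard systemd tooling, for example:

systemctl status polygon-moonworm-crawler.service
journalctl -u polygon-moonworm-crawler.service -f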

View file

@ -0,0 +1,212 @@
import argparse
import logging
from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3
from web3.middleware import geth_poa_middleware
from ..blockchain import AvailableBlockchainType
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, bugout_client
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (
SubscriptionTypes,
get_crawl_job_entries,
make_event_crawl_jobs,
make_function_call_crawl_jobs,
)
from .db import get_last_labeled_block_number
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_crawl(args: argparse.Namespace) -> None:
initial_event_jobs = make_event_crawl_jobs(
get_crawl_job_entries(
SubscriptionTypes.POLYGON_BLOCKCHAIN,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
)
logger.info(f"Initial event crawl jobs count: {len(initial_event_jobs)}")
initial_function_call_jobs = make_function_call_crawl_jobs(
get_crawl_job_entries(
SubscriptionTypes.POLYGON_BLOCKCHAIN,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
)
)
logger.info(
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
    # argparse restricts --blockchain-type to AvailableBlockchainType values,
    # so constructing the enum from the string is safe here.
    blockchain_type = AvailableBlockchainType(args.blockchain_type)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
if args.web3 is None:
logger.info(
"No web3 provider URL provided, using default (blockchan.py: connect())"
)
web3 = _retry_connect_web3(blockchain_type)
else:
logger.info(f"Using web3 provider URL: {args.web3}")
web3 = Web3(
Web3.HTTPProvider(
args.web3,
)
)
if args.poa:
logger.info("Using PoA middleware")
web3.middleware_onion.inject(geth_poa_middleware, layer=0)
last_labeled_block = get_last_labeled_block_number(db_session, blockchain_type)
logger.info(f"Last labeled block: {last_labeled_block}")
start_block = args.start
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
start_block = last_labeled_block - 1
logger.info(f"Using last labeled block as start: {start_block}")
else:
logger.info(
"No last labeled block found, using start block (web3.eth.blockNumber - 300)"
)
start_block = web3.eth.blockNumber - 10000
logger.info(f"Starting from block: {start_block}")
elif last_labeled_block is not None:
if start_block < last_labeled_block and not args.force:
logger.info(
f"Start block is less than last labeled block, using last labeled block: {last_labeled_block}"
)
logger.info(
f"Use --force to override this and start from the start block: {start_block}"
)
start_block = last_labeled_block
else:
logger.info(f"Using start block: {start_block}")
else:
logger.info(f"Using start block: {start_block}")
continuous_crawler(
db_session,
blockchain_type,
web3,
initial_event_jobs,
initial_function_call_jobs,
start_block,
args.max_blocks_batch,
args.min_blocks_batch,
args.confirmations,
args.min_sleep_time,
args.heartbeat_interval,
args.new_jobs_refetch_interval,
)
def main() -> None:
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers()
crawl_parser = subparsers.add_parser("crawl")
crawl_parser.add_argument(
"--start",
"-s",
type=int,
default=None,
)
crawl_parser.add_argument(
"--blockchain-type",
"-b",
type=str,
choices=[
AvailableBlockchainType.ETHEREUM.value,
AvailableBlockchainType.POLYGON.value,
],
required=True,
)
crawl_parser.add_argument(
"--web3",
type=str,
default=None,
help="Web3 provider URL",
)
crawl_parser.add_argument(
"--poa",
action="store_true",
default=False,
help="Use PoA middleware",
)
crawl_parser.add_argument(
"--max-blocks-batch",
"-m",
type=int,
default=100,
help="Maximum number of blocks to crawl in a single batch",
)
crawl_parser.add_argument(
"--min-blocks-batch",
"-n",
type=int,
default=10,
help="Minimum number of blocks to crawl in a single batch",
)
crawl_parser.add_argument(
"--confirmations",
"-c",
type=int,
default=100,
help="Number of confirmations to wait for",
)
crawl_parser.add_argument(
"--min-sleep-time",
"-t",
type=float,
default=0.01,
help="Minimum time to sleep between crawl step",
)
crawl_parser.add_argument(
"--heartbeat-interval",
"-i",
type=float,
default=60,
help="Heartbeat interval in seconds",
)
crawl_parser.add_argument(
"--new-jobs-refetch-interval",
"-r",
type=float,
default=120,
help="Time to wait before refetching new jobs",
)
crawl_parser.add_argument(
"--force",
action="store_true",
default=False,
help="Force start from the start block",
)
crawl_parser.set_defaults(func=handle_crawl)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
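A typical invocation mirrors the ExecStart line in the systemd unit above; the extra flag values here are illustrative, not recommended settings:

python -m mooncrawl.moonworm_crawler.cli crawl -b polygon
python -m mooncrawl.moonworm_crawler.cli crawl -b polygon --web3 https://<node url> --poa --confirmations 60 --max-blocks-batch 80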

View file

@ -0,0 +1,312 @@
import logging
import time
import traceback
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.networks import Network # type: ignore
from sqlalchemy.orm.session import Session
from web3 import Web3
from ..blockchain import connect
from ..data import AvailableBlockchainType
from .crawler import (
EventCrawlJob,
FunctionCallCrawlJob,
blockchain_type_to_subscription_type,
get_crawl_job_entries,
heartbeat,
make_event_crawl_jobs,
make_function_call_crawl_jobs,
merge_event_crawl_jobs,
merge_function_call_crawl_jobs,
)
from .db import save_events, save_function_calls
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _date_to_str(date: datetime) -> str:
return date.strftime("%Y-%m-%d %H:%M:%S")
def _refetch_new_jobs(
old_event_jobs: List[EventCrawlJob],
old_function_call_jobs: List[FunctionCallCrawlJob],
blockchain_type: AvailableBlockchainType,
) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Refetches new jobs from bugout journal, merges, and returns new jobs.
"""
max_created_at_event_job = max(job.created_at for job in old_event_jobs)
max_created_at_function_call_job = max(
job.created_at for job in old_function_call_jobs
)
logger.info("Looking for new event crawl jobs.")
old_event_jobs_length = len(old_event_jobs)
new_event_entries = get_crawl_job_entries(
subscription_type=blockchain_type_to_subscription_type(blockchain_type),
crawler_type="event",
created_at_filter=max_created_at_event_job,
)
new_event_jobs = make_event_crawl_jobs(new_event_entries)
event_crawl_jobs = merge_event_crawl_jobs(old_event_jobs, new_event_jobs)
logger.info(
f"Found {len(event_crawl_jobs) - old_event_jobs_length} new event crawl jobs. "
)
logger.info("Looking for new function call crawl jobs.")
old_function_call_jobs_length = len(old_function_call_jobs)
new_function_entries = get_crawl_job_entries(
subscription_type=blockchain_type_to_subscription_type(blockchain_type),
crawler_type="function",
created_at_filter=max_created_at_function_call_job,
)
new_function_call_jobs = make_function_call_crawl_jobs(new_function_entries)
function_call_crawl_jobs = merge_function_call_crawl_jobs(
old_function_call_jobs, new_function_call_jobs
)
logger.info(
f"Found {len(function_call_crawl_jobs) - old_function_call_jobs_length} new function call crawl jobs. "
)
return event_crawl_jobs, function_call_crawl_jobs
def _retry_connect_web3(
blockchain_type: AvailableBlockchainType,
retry_count: int = 10,
sleep_time: float = 5,
) -> Web3:
"""
Retry connecting to the blockchain.
"""
while retry_count > 0:
retry_count -= 1
try:
web3 = connect(blockchain_type)
web3.eth.block_number
logger.info(f"Connected to {blockchain_type}")
return web3
except Exception as e:
if retry_count == 0:
error = e
break
logger.error(f"Failed to connect to {blockchain_type} blockchain: {e}")
logger.info(f"Retrying in {sleep_time} seconds")
time.sleep(sleep_time)
raise Exception(
f"Failed to connect to {blockchain_type} blockchain after {retry_count} retries: {error}"
)
def continuous_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Optional[Web3],
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
start_block: int,
max_blocks_batch: int = 100,
min_blocks_batch: int = 10,
confirmations: int = 60,
min_sleep_time: float = 0.1,
heartbeat_interval: float = 60,
new_jobs_refetch_interval: float = 120,
):
assert (
min_blocks_batch < max_blocks_batch
), "min_blocks_batch must be less than max_blocks_batch"
assert min_blocks_batch > 0, "min_blocks_batch must be greater than 0"
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert confirmations > 0, "confirmations must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
assert heartbeat_interval > 0, "heartbeat_interval must be greater than 0"
assert (
new_jobs_refetch_interval > 0
), "new_jobs_refetch_interval must be greater than 0"
crawl_start_time = datetime.utcnow()
jobs_refetchet_time = crawl_start_time
if web3 is None:
web3 = _retry_connect_web3(blockchain_type)
network = (
Network.ethereum
if blockchain_type == AvailableBlockchainType.ETHEREUM
else Network.polygon
)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
heartbeat_template = {
"status": "crawling",
"start_block": start_block,
"last_block": start_block,
"crawl_start_time": _date_to_str(crawl_start_time),
"current_time": _date_to_str(crawl_start_time),
"current_event_jobs_length": len(event_crawl_jobs),
"current_function_call_jobs_length": len(function_call_crawl_jobs),
"jobs_last_refetched_at": _date_to_str(jobs_refetchet_time),
}
logger.info(f"Starting continuous event crawler start_block={start_block}")
logger.info("Sending initial heartbeat")
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
last_heartbeat_time = datetime.utcnow()
blocks_cache: Dict[int, int] = {}
failed_count = 0
try:
while True:
try:
# query db with limit 1, to avoid session closing
db_session.execute("SELECT 1")
time.sleep(min_sleep_time)
end_block = min(
web3.eth.blockNumber - confirmations,
start_block + max_blocks_batch,
)
if start_block + min_blocks_batch > end_block:
min_sleep_time *= 2
logger.info(
f"Sleeping for {min_sleep_time} seconds because of low block count"
)
continue
min_sleep_time = max(min_sleep_time, min_sleep_time / 2)
logger.info(f"Crawling events from {start_block} to {end_block}")
all_events = _crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,
web3=web3,
jobs=event_crawl_jobs,
from_block=start_block,
to_block=end_block,
blocks_cache=blocks_cache,
db_block_query_batch=min_blocks_batch * 2,
)
logger.info(
f"Crawled {len(all_events)} events from {start_block} to {end_block}."
)
save_events(db_session, all_events, blockchain_type)
logger.info(
f"Crawling function calls from {start_block} to {end_block}"
)
all_function_calls = _crawl_functions(
blockchain_type,
ethereum_state_provider,
function_call_crawl_jobs,
start_block,
end_block,
)
logger.info(
f"Crawled {len(all_function_calls)} function calls from {start_block} to {end_block}."
)
save_function_calls(db_session, all_function_calls, blockchain_type)
current_time = datetime.utcnow()
if current_time - jobs_refetchet_time > timedelta(
seconds=new_jobs_refetch_interval
):
logger.info(
f"Refetching new jobs from bugout journal since {jobs_refetchet_time}"
)
event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
event_crawl_jobs, function_call_crawl_jobs, blockchain_type
)
jobs_refetchet_time = current_time
if current_time - last_heartbeat_time > timedelta(
seconds=heartbeat_interval
):
# Update heartbeat
heartbeat_template["last_block"] = end_block
heartbeat_template["current_time"] = _date_to_str(current_time)
heartbeat_template["current_event_jobs_length"] = len(
event_crawl_jobs
)
heartbeat_template["jobs_last_refetched_at"] = _date_to_str(
jobs_refetchet_time
)
heartbeat_template["current_function_call_jobs_length"] = len(
function_call_crawl_jobs
)
heartbeat_template[
"function_call metrics"
] = ethereum_state_provider.metrics
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
)
logger.info("Sending heartbeat.", heartbeat_template)
last_heartbeat_time = datetime.utcnow()
start_block = end_block + 1
failed_count = 0
except Exception as e:
logger.error(f"Internal error: {e}")
logger.exception(e)
failed_count += 1
if failed_count > 10:
logger.error("Too many failures, exiting")
raise e
try:
web3 = _retry_connect_web3(blockchain_type)
except Exception as err:
logger.error(f"Failed to reconnect: {err}")
logger.exception(err)
raise err
except BaseException as e:
logger.error(f"!!!!Crawler Died!!!!")
heartbeat_template["status"] = "dead"
heartbeat_template["current_time"] = _date_to_str(datetime.utcnow())
heartbeat_template["current_event_jobs_length"] = len(event_crawl_jobs)
heartbeat_template["jobs_last_refetched_at"] = _date_to_str(jobs_refetchet_time)
        error_summary = repr(e)
        error_traceback = "".join(
            traceback.format_exception(
                etype=type(e),
                value=e,
                tb=e.__traceback__,
            )
        )
heartbeat_template[
"die_reason"
] = f"{e.__class__.__name__}: {e}\n error_summary: {error_summary}\n error_traceback: {error_traceback}"
heartbeat_template["last_block"] = end_block
heartbeat(
crawler_type="event",
blockchain_type=blockchain_type,
crawler_status=heartbeat_template,
is_dead=True,
)
logger.exception(e)

View file

@ -0,0 +1,285 @@
import json
import logging
import re
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, cast
from bugout.data import BugoutSearchResult
from eth_typing.evm import ChecksumAddress
from moonstreamdb.models import Base
from sqlalchemy.orm.session import Session
from web3.main import Web3
from mooncrawl.data import AvailableBlockchainType
from ..reporter import reporter
from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
bugout_client,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SubscriptionTypes(Enum):
POLYGON_BLOCKCHAIN = "polygon_smartcontract"
ETHEREUM_BLOCKCHAIN = "ethereum_smartcontract"
def _generate_reporter_callback(
crawler_type: str, blockchain_type: AvailableBlockchainType
) -> Callable[[Exception], None]:
def reporter_callback(error: Exception) -> None:
reporter.error_report(
error,
[
"moonworm",
"crawler",
"decode_error",
crawler_type,
blockchain_type.value,
],
)
return reporter_callback
def blockchain_type_to_subscription_type(
blockchain_type: AvailableBlockchainType,
) -> SubscriptionTypes:
if blockchain_type == AvailableBlockchainType.ETHEREUM:
return SubscriptionTypes.ETHEREUM_BLOCKCHAIN
elif blockchain_type == AvailableBlockchainType.POLYGON:
return SubscriptionTypes.POLYGON_BLOCKCHAIN
else:
raise ValueError(f"Unknown blockchain type: {blockchain_type}")
@dataclass
class EventCrawlJob:
event_abi_hash: str
event_abi: Dict[str, Any]
contracts: List[ChecksumAddress]
created_at: int
@dataclass
class FunctionCallCrawlJob:
contract_abi: List[Dict[str, Any]]
contract_address: ChecksumAddress
created_at: int
def get_crawl_job_entries(
subscription_type: SubscriptionTypes,
crawler_type: str,
journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
    created_at_filter: Optional[int] = None,
limit: int = 200,
) -> List[BugoutSearchResult]:
"""
Get all event ABIs from bugout journal
where tags are:
- #crawler_type:crawler_type (either event or function)
- #status:active
- #subscription_type:subscription_type (either polygon_blockchain or ethereum_blockchain)
"""
query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}"
    if created_at_filter is not None:
        # Filter by created_at using >= rather than a strict >, because a strict
        # filter could miss jobs whose created_at equals the last value we saw
        # (the previous query may not have returned all of them).
        # The trade-off is that we may re-fetch some duplicate jobs, which are
        # merged out later.
        query += f" #created_at:>={created_at_filter}"
current_offset = 0
entries = []
while True:
search_result = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=journal_id,
query=query,
offset=current_offset,
limit=limit,
)
entries.extend(search_result.results)
# if len(entries) >= search_result.total_results:
if len(search_result.results) == 0:
break
current_offset += limit
return entries
def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
for entry_tag in entry.tags:
if entry_tag.startswith(tag):
return entry_tag.split(":")[1]
raise ValueError(f"Tag {tag} not found in {entry}")
def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJob]:
"""
Create EventCrawlJob objects from bugout entries.
"""
crawl_job_by_hash: Dict[str, EventCrawlJob] = {}
for entry in entries:
abi_hash = _get_tag(entry, "abi_method_hash")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
existing_crawl_job = crawl_job_by_hash.get(abi_hash)
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
existing_crawl_job.contracts.append(contract_address)
else:
abi = cast(str, entry.content)
new_crawl_job = EventCrawlJob(
event_abi_hash=abi_hash,
event_abi=json.loads(abi),
contracts=[contract_address],
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
crawl_job_by_hash[abi_hash] = new_crawl_job
return [crawl_job for crawl_job in crawl_job_by_hash.values()]
def make_function_call_crawl_jobs(
entries: List[BugoutSearchResult],
) -> List[FunctionCallCrawlJob]:
"""
Create FunctionCallCrawlJob objects from bugout entries.
"""
crawl_job_by_address: Dict[str, FunctionCallCrawlJob] = {}
for entry in entries:
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
abi = cast(str, entry.content)
if contract_address not in crawl_job_by_address:
crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
contract_abi=[json.loads(abi)],
contract_address=contract_address,
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
else:
crawl_job_by_address[contract_address].contract_abi.append(json.loads(abi))
return [crawl_job for crawl_job in crawl_job_by_address.values()]
def merge_event_crawl_jobs(
old_crawl_jobs: List[EventCrawlJob], new_event_crawl_jobs: List[EventCrawlJob]
) -> List[EventCrawlJob]:
"""
Merge new event crawl jobs with old ones.
If there is a new event crawl job with the same event_abi_hash
then we will merge the contracts to one job.
Othervise new job will be created
Important:
old_crawl_jobs will be modified
Returns:
Merged list of event crawl jobs
"""
for new_crawl_job in new_event_crawl_jobs:
for old_crawl_job in old_crawl_jobs:
if new_crawl_job.event_abi_hash == old_crawl_job.event_abi_hash:
old_crawl_job.contracts.extend(new_crawl_job.contracts)
break
else:
old_crawl_jobs.append(new_crawl_job)
return old_crawl_jobs
def merge_function_call_crawl_jobs(
old_crawl_jobs: List[FunctionCallCrawlJob],
new_function_call_crawl_jobs: List[FunctionCallCrawlJob],
) -> List[FunctionCallCrawlJob]:
"""
Merge new function call crawl jobs with old ones.
If there is a new function call crawl job with the same contract_address
then we will merge the contracts to one job.
Othervise new job will be created
Important:
old_crawl_jobs will be modified
Returns:
Merged list of function call crawl jobs
"""
for new_crawl_job in new_function_call_crawl_jobs:
for old_crawl_job in old_crawl_jobs:
if new_crawl_job.contract_address == old_crawl_job.contract_address:
old_crawl_job.contract_abi.extend(new_crawl_job.contract_abi)
break
else:
old_crawl_jobs.append(new_crawl_job)
return old_crawl_jobs
def _get_heartbeat_entry_id(
crawler_type: str, blockchain_type: AvailableBlockchainType
) -> str:
entries = bugout_client.search(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
query=f"#{crawler_type} #heartbeat #{blockchain_type.value} !#dead",
limit=1,
)
if entries.results:
return entries.results[0].entry_url.split("/")[-1]
else:
logger.info(f"No {crawler_type} heartbeat entry found, creating one")
entry = bugout_client.create_entry(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
title=f"{crawler_type} Heartbeat - {blockchain_type.value}",
tags=[crawler_type, "heartbeat", blockchain_type.value],
content="",
)
return str(entry.id)
def heartbeat(
crawler_type: str,
blockchain_type: AvailableBlockchainType,
crawler_status: Dict[str, Any],
is_dead: bool = False,
) -> None:
"""
Periodically crawler will update the status in bugout entry:
- Started at timestamp
- Started at block number
- Status: Running/Dead
- Last crawled block number
- Number of current jobs
- Time taken to crawl last crawl_step and speed per block
and other information later will be added.
"""
heartbeat_entry_id = _get_heartbeat_entry_id(crawler_type, blockchain_type)
bugout_client.update_entry_content(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=heartbeat_entry_id,
title=f"{crawler_type} Heartbeat - {blockchain_type.value}",
content=f"{json.dumps(crawler_status, indent=2)}",
)
if is_dead:
bugout_client.update_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=heartbeat_entry_id,
tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
)
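For reference, a sketch of the task-entry shape these helpers assume, derived from apply_moonworm_tasks in the API and the search query above; the address, hash, and event name are made-up placeholders:

# Hypothetical shape of a single moonworm task entry in the tasks journal:
task_entry = {
    "title": "0x<contract address>",
    "content": "<JSON of one ABI item: an event or a non-view function>",
    "tags": [
        "address:0x<contract address>",
        "type:event",
        "abi_method_hash:<md5 of the ABI item>",
        "subscription_type:polygon_smartcontract",
        "abi_name:Transfer",
        "status:active",
    ],
}
# get_crawl_job_entries then selects such entries with a query like:
# "#status:active #type:event #subscription_type:polygon_smartcontract"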

View file

@ -0,0 +1,169 @@
import logging
from typing import Any, Dict, List, Optional, Union
from eth_typing.evm import ChecksumAddress
from hexbytes.main import HexBytes
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import (
Base,
EthereumLabel,
EthereumTransaction,
PolygonLabel,
PolygonTransaction,
)
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import label
from ..blockchain import connect, get_block_model, get_label_model
from ..data import AvailableBlockchainType
from ..settings import CRAWLER_LABEL
from .crawler import FunctionCallCrawlJob, _generate_reporter_callback
from .event_crawler import Event
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label = label_model(
label=CRAWLER_LABEL,
label_data={
"type": "event",
"name": event.event_name,
"args": event.args,
},
address=event.address,
block_number=event.block_number,
block_timestamp=event.block_timestamp,
transaction_hash=event.transaction_hash,
log_index=event.log_index,
)
return label
def _function_call_to_label(
blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall
) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label = label_model(
label=CRAWLER_LABEL,
label_data={
"type": "tx_call",
"name": function_call.function_name,
"caller": function_call.caller_address,
"args": function_call.function_args,
"status": function_call.status,
"gasUsed": function_call.gas_used,
},
address=function_call.contract_address,
block_number=function_call.block_number,
transaction_hash=function_call.transaction_hash,
block_timestamp=function_call.block_timestamp,
)
return label
def get_last_labeled_block_number(
db_session: Session, blockchain_type: AvailableBlockchainType
) -> Optional[int]:
label_model = get_label_model(blockchain_type)
block_number = (
db_session.query(label_model.block_number)
.filter(label_model.label == CRAWLER_LABEL)
.order_by(label_model.block_number.desc())
.limit(1)
.one_or_none()
)
return block_number[0] if block_number else None
def save_labels(db_session: Session, labels: List[Base]) -> None:
"""
Save labels in the database.
"""
try:
db_session.add_all(labels)
db_session.commit()
except Exception as e:
logger.error(f"Failed to save labels: {e}")
db_session.rollback()
raise e
def save_events(
db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType
) -> None:
label_model = get_label_model(blockchain_type)
events_hashes_to_save = [event.transaction_hash for event in events]
existing_labels = (
db_session.query(label_model.transaction_hash, label_model.log_index)
.filter(
label_model.label == CRAWLER_LABEL,
label_model.log_index != None,
label_model.transaction_hash.in_(events_hashes_to_save),
)
.all()
)
existing_labels_transactions = []
existing_log_index_by_tx_hash: Dict[str, List[int]] = {}
for label in existing_labels:
if label[0] not in existing_labels_transactions:
existing_labels_transactions.append(label[0])
existing_log_index_by_tx_hash[label[0]] = []
existing_log_index_by_tx_hash[label[0]].append(label[1])
labels_to_save = []
for event in events:
if event.transaction_hash not in existing_labels_transactions:
labels_to_save.append(_event_to_label(blockchain_type, event))
elif (
event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
):
labels_to_save.append(_event_to_label(blockchain_type, event))
save_labels(db_session, labels_to_save)
def save_function_calls(
db_session: Session,
function_calls: List[ContractFunctionCall],
blockchain_type: AvailableBlockchainType,
) -> None:
label_model = get_label_model(blockchain_type)
transactions_hashes_to_save = [
function_call.transaction_hash for function_call in function_calls
]
existing_labels = (
db_session.query(label_model.transaction_hash)
.filter(
label_model.label == CRAWLER_LABEL,
label_model.log_index == None,
label_model.transaction_hash.in_(transactions_hashes_to_save),
)
.all()
)
existing_labels_transactions = [label[0] for label in existing_labels]
labels_to_save = [
_function_call_to_label(blockchain_type, function_call)
for function_call in function_calls
if function_call.transaction_hash not in existing_labels_transactions
]
save_labels(db_session, labels_to_save)

View file

@ -0,0 +1,146 @@
import logging
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union, cast
from eth_typing.evm import ChecksumAddress
from moonstreamdb.models import Base
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import and_
from web3 import Web3
from ..blockchain import connect, get_block_model, get_label_model
from ..data import AvailableBlockchainType
from ..settings import CRAWLER_LABEL
from .crawler import EventCrawlJob
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Event:
event_name: str
args: Dict[str, Any]
address: str
block_number: int
block_timestamp: int
transaction_hash: str
log_index: int
def _get_block_timestamp_from_web3(
web3: Web3,
block_number: int,
) -> int:
"""
Gets the timestamp of a block from the blockchain.
will raise an exception if the block is not found.
"""
return web3.eth.getBlock(block_number).timestamp
# blocks_cache is passed in as an argument so that this function can be reused by the
# tx_call crawler and share one cache between tx_call and event crawling.
def get_block_timestamp(
db_session: Session,
web3: Web3,
blockchain_type: AvailableBlockchainType,
block_number: int,
blocks_cache: Dict[int, int],
max_blocks_batch: int = 30,
) -> int:
"""
Get the timestamp of a block.
First tries to get the block from the cache,
then tries to get the block from the db,
then tries to get it from the blockchain.
After the call cache is updated.
If the cache grows too large, it is cleared.
:param block_number: The block number.
:param max_blocks_batch: The maximum number of blocks to fetch in a single batch from db query.
:param blocks_cache: The cache of blocks.
:return: The timestamp of the block.
"""
assert max_blocks_batch > 0
if block_number in blocks_cache:
return blocks_cache[block_number]
block_model = get_block_model(blockchain_type)
blocks = (
db_session.query(block_model)
.filter(
and_(
block_model.block_number >= block_number,
block_model.block_number <= block_number + max_blocks_batch - 1,
)
)
.order_by(block_model.block_number.asc())
.all()
)
target_block_timestamp: Optional[int] = None
if blocks and blocks[0].block_number == block_number:
target_block_timestamp = blocks[0].timestamp
if target_block_timestamp is None:
target_block_timestamp = _get_block_timestamp_from_web3(web3, block_number)
if len(blocks_cache) > max_blocks_batch * 2:
blocks_cache.clear()
blocks_cache[block_number] = target_block_timestamp
for block in blocks:
blocks_cache[block.block_number] = block.timestamp
return target_block_timestamp
def _crawl_events(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
jobs: List[EventCrawlJob],
from_block: int,
to_block: int,
blocks_cache: Dict[int, int] = {},
db_block_query_batch=10,
) -> List[Event]:
all_events = []
for job in jobs:
raw_events = _fetch_events_chunk(
web3,
job.event_abi,
from_block,
to_block,
job.contracts,
on_decode_error=lambda e: print(
f"Error decoding event: {e}"
), # TODO report via humbug
)
for raw_event in raw_events:
raw_event["blockTimestamp"] = get_block_timestamp(
db_session,
web3,
blockchain_type,
raw_event["blockNumber"],
blocks_cache,
db_block_query_batch,
)
event = Event(
event_name=raw_event["event"],
args=raw_event["args"],
address=raw_event["address"],
block_number=raw_event["blockNumber"],
block_timestamp=raw_event["blockTimestamp"],
transaction_hash=raw_event["transactionHash"],
log_index=raw_event["logIndex"],
)
all_events.append(event)
return all_events
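A small sketch of how the shared timestamp cache is meant to be reused across calls; the db_session, web3 client, blockchain_type, and block numbers below are placeholders:

# Hypothetical usage: one cache dict shared across event and function-call crawling.
blocks_cache: Dict[int, int] = {}
ts_a = get_block_timestamp(db_session, web3, blockchain_type, 25_000_000, blocks_cache)
# Neighbouring blocks found in the database on the first call are now cached,
# so a follow-up lookup may be answered without touching the database or node again.
ts_b = get_block_timestamp(db_session, web3, blockchain_type, 25_000_001, blocks_cache)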

View file

@ -0,0 +1,96 @@
import logging
from typing import Any, Dict, List, Optional, Union
from eth_typing.evm import ChecksumAddress
from hexbytes.main import HexBytes
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import (
Base,
EthereumLabel,
EthereumTransaction,
PolygonLabel,
PolygonTransaction,
)
from moonworm.crawler.function_call_crawler import ( # type: ignore
ContractFunctionCall,
FunctionCallCrawler,
)
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
)
from moonworm.crawler.networks import Network # type: ignore
from moonworm.cu_watch import MockState # type: ignore
from sqlalchemy.orm import Session
from web3 import Web3
from ..blockchain import connect, get_block_model, get_label_model
from ..data import AvailableBlockchainType
from ..settings import CRAWLER_LABEL
from .crawler import FunctionCallCrawlJob, _generate_reporter_callback
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _crawl_functions(
blockchain_type: AvailableBlockchainType,
ethereum_state_provider: MoonstreamEthereumStateProvider,
jobs: List[FunctionCallCrawlJob],
from_block: int,
to_block: int,
) -> List[ContractFunctionCall]:
shared_state = MockState()
crawled_functions = []
for job in jobs:
function_call_crawler = FunctionCallCrawler(
shared_state,
ethereum_state_provider,
job.contract_abi,
[job.contract_address],
on_decode_error=_generate_reporter_callback(
"function_call", blockchain_type
),
)
function_call_crawler.crawl(
from_block,
to_block,
)
crawled_functions = shared_state.state
return crawled_functions
def function_call_crawler(
db_session: Session,
blockchain_type: AvailableBlockchainType,
web3: Web3,
jobs: List[FunctionCallCrawlJob],
start_block: int,
end_block: int,
batch_size: int,
):
network = (
Network.ethereum
if blockchain_type == AvailableBlockchainType.ETHEREUM
else Network.polygon
)
ethereum_state_provider = MoonstreamEthereumStateProvider(
web3,
network,
db_session,
)
for i in range(start_block, end_block + 1, batch_size):
logger.info(f"Crawling from block {i} to {i + batch_size - 1}")
crawled_functions = _crawl_functions(
blockchain_type,
ethereum_state_provider,
jobs,
i,
min(i + batch_size - 1, end_block),
)
logger.info(f"Crawled {len(crawled_functions)} functions")
for function_call in crawled_functions:
print(function_call)

View file

@ -23,7 +23,7 @@ DOCS_TARGET_PATH = "docs"
# Crawler label
CRAWLER_LABEL = "moonworm"
CRAWLER_LABEL = "moonworm-alpha"
# Geth connection address
MOONSTREAM_NODE_ETHEREUM_IPC_ADDR = os.environ.get(
@ -79,3 +79,11 @@ if MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX is None:
raise ValueError(
"MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX environment variable must be set"
)
MOONSTREAM_MOONWORM_TASKS_JOURNAL = os.environ.get(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL", ""
)
if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
raise ValueError(
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)

View file

@ -14,27 +14,26 @@ from uuid import UUID
import boto3 # type: ignore
from bugout.data import BugoutResource, BugoutResources
from moonstreamdb.db import yield_db_session_ctx
from sqlalchemy import Column, and_, func, text, distinct
from sqlalchemy import Column, and_, distinct, func, text
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.operators import in_op
from web3 import Web3
from ..blockchain import (
connect,
get_block_model,
get_label_model,
get_transaction_model,
connect,
)
from ..data import AvailableBlockchainType
from ..reporter import reporter
from ..settings import (
CRAWLER_LABEL,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
CRAWLER_LABEL,
)
from ..reporter import reporter
from ..settings import bugout_client as bc
from web3 import Web3
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@ -259,7 +258,9 @@ def generate_data(
functions: List[str],
start: Any,
metric_type: str,
crawler_label: str,
):
label_model = get_label_model(blockchain_type)
# create empty time series
@ -286,7 +287,7 @@ def generate_data(
label_requested = (
db_session.query(label_model.label_data["name"].astext.label("label"))
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label == crawler_label)
.filter(
and_(
label_model.label_data["type"].astext == metric_type,
@ -329,7 +330,7 @@ def generate_data(
label_model.label_data["name"].astext.label("label"),
)
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label == crawler_label)
.filter(
and_(
label_model.label_data["type"].astext == metric_type,
@ -404,14 +405,17 @@ def cast_to_python_type(evm_type: str) -> Callable:
def get_unique_address(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
db_session: Session,
blockchain_type: AvailableBlockchainType,
address: str,
crawler_label: str,
):
label_model = get_label_model(blockchain_type)
return (
db_session.query(label_model.label_data["args"]["to"])
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label == crawler_label)
.filter(label_model.label_data["type"].astext == "event")
.filter(label_model.label_data["name"].astext == "Transfer")
.distinct()
@ -552,6 +556,7 @@ def get_count(
select_expression: Any,
blockchain_type: AvailableBlockchainType,
address: str,
crawler_label: str,
):
"""
Return count of event from database.
@ -561,7 +566,7 @@ def get_count(
return (
db_session.query(select_expression)
.filter(label_model.address == address)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label == crawler_label)
.filter(label_model.label_data["type"].astext == type)
.filter(label_model.label_data["name"].astext == name)
.count()
@ -639,6 +644,14 @@ def stats_generate_handler(args: argparse.Namespace):
"address"
]
crawler_label = CRAWLER_LABEL
if address in (
"0xdC0479CC5BbA033B3e7De9F178607150B3AbCe1f",
"0xA2a13cE1824F3916fC84C65e559391fc6674e6e8",
):
crawler_label = "moonworm"
generic = dashboard_subscription_filters["generic"]
generic_metrics_names = [item["name"] for item in generic]
@ -693,6 +706,7 @@ def stats_generate_handler(args: argparse.Namespace):
db_session=db_session,
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
@ -709,6 +723,7 @@ def stats_generate_handler(args: argparse.Namespace):
select_expression=get_label_model(blockchain_type),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
@ -729,6 +744,7 @@ def stats_generate_handler(args: argparse.Namespace):
),
blockchain_type=blockchain_type,
address=address,
crawler_label=crawler_label,
),
}
)
@ -757,6 +773,7 @@ def stats_generate_handler(args: argparse.Namespace):
functions=methods,
start=start_date,
metric_type="tx_call",
crawler_label=crawler_label,
)
s3_data_object["functions"] = functions_calls_data
@ -769,6 +786,7 @@ def stats_generate_handler(args: argparse.Namespace):
functions=events,
start=start_date,
metric_type="event",
crawler_label=crawler_label,
)
s3_data_object["events"] = events_data

View file

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.1.1"
MOONCRAWL_VERSION = "0.1.2"

View file

@ -16,3 +16,4 @@ export HUMBUG_REPORTER_CRAWLERS_TOKEN="<Bugout Humbug token for crash reports>"
export MOONSTREAM_DATA_JOURNAL_ID="<Bugout journal id for moonstream>"
export MOONSTREAM_ADMIN_ACCESS_TOKEN="<Bugout access token for moonstream>"
export NFT_HUMBUG_TOKEN="<Token for nft crawler>"
export MOONSTREAM_MOONWORM_TASKS_JOURNAL="<journal_with_tasks_for_moonworm_crawler>"

View file

@ -34,17 +34,18 @@ setup(
zip_safe=False,
install_requires=[
"boto3",
"bugout",
"bugout>=0.1.19",
"chardet",
"fastapi",
"moonstreamdb",
"moonstreamdb>=0.2.2",
"moonworm>=0.1.7",
"humbug",
"pydantic",
"python-dateutil",
"requests",
"tqdm",
"uvicorn",
"web3",
"web3[tester]",
],
extras_require={
"dev": ["black", "isort", "mypy", "types-requests", "types-python-dateutil"],
@ -57,6 +58,7 @@ setup(
"esd=mooncrawl.esd:main",
"etherscan=mooncrawl.etherscan:main",
"identity=mooncrawl.identity:main",
"moonworm-crawler=mooncrawl.moonworm_crawler.cli:main",
"nft=mooncrawl.nft.cli:main",
"statistics=mooncrawl.stats_worker.dashboard:main",
]
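With the new console script entry point, the crawler can also be launched by name after installing the package, for example:

pip install -e .
moonworm-crawler crawl --help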