diff --git a/backend/moonstreamapi/actions.py b/backend/moonstreamapi/actions.py
index a00c716f..836688bc 100644
--- a/backend/moonstreamapi/actions.py
+++ b/backend/moonstreamapi/actions.py
@@ -502,21 +502,26 @@ def apply_moonworm_tasks(
     subscription_type: str,
     abi: Any,
     address: str,
+    entries_limit: int = 100,
 ) -> None:
     """
     Get list of subscriptions loads abi and apply them as moonworm tasks if it not exist
     """
-    entries_pack = []
+    moonworm_abi_tasks_entries_pack = []
     try:
         entries = get_all_entries_from_search(
             journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
             search_query=f"tag:address:{address} tag:subscription_type:{subscription_type}",
-            limit=100,
+            limit=entries_limit,  # load per request
             token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
         )

+        # Create historical crawl tasks in the journal.
+        # Entries are created in one batch via create_entries_pack below.
+
         existing_tags = [entry.tags for entry in entries]

         existing_hashes = [
@@ -534,7 +539,16 @@ def apply_moonworm_tasks(

         for hash in abi_hashes_dict:
             if hash not in existing_hashes:
-                entries_pack.append(
+                abi_selector = Web3.keccak(
+                    text=abi_hashes_dict[hash]["name"]
+                    + "("
+                    + ",".join(
+                        map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"])
+                    )
+                    + ")"
+                )[:4].hex()
+
+                moonworm_abi_tasks_entries_pack.append(
                     {
                         "title": address,
                         "content": json.dumps(abi_hashes_dict[hash], indent=4),
@@ -542,22 +556,32 @@ def apply_moonworm_tasks(
                             f"address:{address}",
                             f"type:{abi_hashes_dict[hash]['type']}",
                             f"abi_method_hash:{hash}",
+                            f"abi_selector:{abi_selector}",
                             f"subscription_type:{subscription_type}",
                             f"abi_name:{abi_hashes_dict[hash]['name']}",
                             f"status:active",
+                            f"task_type:moonworm",
+                            f"moonworm_task_pickedup:False",  # True once the task is picked up by moonworm-crawler (jobs refetched every 120 sec by default)
+                            f"historical_crawl_status:pending",  # pending, in_progress, finished
+                            f"progress:0",  # 0-100 %
                         ],
                     }
                 )
     except Exception as e:
+        logger.error(f"Error getting moonworm tasks: {str(e)}")
         reporter.error_report(e)

-    if len(entries_pack) > 0:
-        bc.create_entries_pack(
-            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
-            journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
-            entries=entries_pack,
-            timeout=15,
-        )
+    if len(moonworm_abi_tasks_entries_pack) > 0:
+        try:
+            bc.create_entries_pack(
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                entries=moonworm_abi_tasks_entries_pack,
+                timeout=25,
+            )
+        except Exception as e:
+            logger.error(f"Error creating moonworm tasks: {str(e)}")
+            reporter.error_report(e)


 def name_normalization(query_name: str) -> str:
@@ -754,3 +778,18 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
     ).hexdigest()

     return hash
+
+
+def get_moonworm_jobs(
+    address: str,
+    subscription_type_id: str,
+    entries_limit: int = 100,
+):
+    entries = get_all_entries_from_search(
+        journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+        search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}",
+        limit=entries_limit,  # load per request
+        token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    )
+
+    return entries
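The abi_selector tag computed above is the standard 4-byte EVM selector: the keccak-256 hash of the canonical signature name(type1,...,typeN), truncated to four bytes. A minimal standalone sketch of the same computation (the example ABI entry is illustrative, not part of this changeset):

    from web3 import Web3

    def abi_selector(abi_entry: dict) -> str:
        """Compute the 4-byte selector for `name(type1,type2,...)`."""
        signature = (
            abi_entry["name"]
            + "("
            + ",".join(arg["type"] for arg in abi_entry["inputs"])
            + ")"
        )
        # Web3.keccak returns HexBytes; the first 4 bytes are the selector.
        return Web3.keccak(text=signature)[:4].hex()

    transfer_abi = {"name": "transfer", "inputs": [{"type": "address"}, {"type": "uint256"}]}
    print(abi_selector(transfer_abi))  # 0xa9059cbb -- the well-known ERC-20 transfer selector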
diff --git a/backend/moonstreamapi/routes/subscriptions.py b/backend/moonstreamapi/routes/subscriptions.py
index 836c03e2..2ca44fce 100644
--- a/backend/moonstreamapi/routes/subscriptions.py
+++ b/backend/moonstreamapi/routes/subscriptions.py
@@ -15,6 +15,7 @@ from ..actions import (
     apply_moonworm_tasks,
     get_entity_subscription_collection_id,
     EntityCollectionNotFoundException,
+    get_moonworm_jobs,
 )
 from ..admin import subscription_types
 from .. import data
@@ -22,7 +23,7 @@ from ..admin import subscription_types
 from ..middleware import MoonstreamHTTPException
 from ..reporter import reporter
 from ..settings import bugout_client as bc, entity_client as ec
-from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN
+from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
 from ..web3_provider import yield_web3_provider


@@ -484,6 +485,59 @@ async def get_subscription_abi_handler(
     )


+@router.get(
+    "/{subscription_id}/jobs",
+    tags=["subscriptions"],
+    response_model=data.SubdcriptionsAbiResponse,
+)
+async def get_subscription_jobs_handler(
+    request: Request,
+    subscription_id: str,
+) -> Any:
+    token = request.state.token
+    user = request.state.user
+
+    try:
+        collection_id = get_entity_subscription_collection_id(
+            resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
+            token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+            user_id=user.id,
+        )
+
+        # get subscription entity
+        subscription_resource = ec.get_entity(
+            token=token,
+            collection_id=collection_id,
+            entity_id=subscription_id,
+        )
+
+    except EntityCollectionNotFoundException as e:
+        raise MoonstreamHTTPException(
+            status_code=404,
+            detail="User subscriptions collection not found",
+            internal_error=e,
+        )
+    except Exception as e:
+        logger.error(
+            f"Error getting subscriptions for user ({user}) with token ({token}), error: {str(e)}"
+        )
+        raise MoonstreamHTTPException(status_code=500, internal_error=e)
+
+    for field in subscription_resource.required_fields:
+        if "subscription_type_id" in field:
+            subscription_type_id = field["subscription_type_id"]
+
+        if "address" in field:
+            subscription_address = field["address"]
+
+    get_moonworm_jobs_response = get_moonworm_jobs(
+        subscription_type_id=subscription_type_id,
+        address=subscription_address,
+    )
+
+    return get_moonworm_jobs_response
+
+
 @router.get(
     "/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse
 )
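For reference, a minimal client-side sketch of the new jobs endpoint. The base URL, token, and subscription id are illustrative, and the handler returns the raw journal search results for the subscription's moonworm tasks (whose tags carry abi_selector, moonworm_task_pickedup, historical_crawl_status, and progress):

    import requests

    MOONSTREAM_API_URL = "https://api.moonstream.to"  # or your own moonstreamapi deployment
    ACCESS_TOKEN = "<moonstream access token>"  # illustrative
    subscription_id = "<subscription entity id>"  # illustrative

    response = requests.get(
        f"{MOONSTREAM_API_URL}/subscriptions/{subscription_id}/jobs",
        headers={"Authorization": f"Bearer {ACCESS_TOKEN}"},
        timeout=10,
    )
    response.raise_for_status()
    for entry in response.json():
        print(entry)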
diff --git a/crawlers/deploy/deploy.bash b/crawlers/deploy/deploy.bash
index fab521a6..907f0046 100755
--- a/crawlers/deploy/deploy.bash
+++ b/crawlers/deploy/deploy.bash
@@ -37,6 +37,10 @@ ETHEREUM_MISSING_TIMER_FILE="ethereum-missing.timer"
 ETHEREUM_MOONWORM_CRAWLER_SERVICE_FILE="ethereum-moonworm-crawler.service"
 ETHEREUM_ORANGE_DAO_REPORTS_TOKENONOMICS_SERVICE_FILE="ethereum-orange-dao-reports-tokenonomics.service"
 ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE="ethereum-orange-dao-reports-tokenonomics.timer"
+ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="ethereum-historical-crawl-transactions.service"
+ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="ethereum-historical-crawl-transactions.timer"
+ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="ethereum-historical-crawl-events.service"
+ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="ethereum-historical-crawl-events.timer"

 # Polygon service files
 POLYGON_SYNCHRONIZE_SERVICE="polygon-synchronize.service"
@@ -56,6 +60,10 @@ POLYGON_CU_REPORTS_TOKENONOMICS_SERVICE_FILE="polygon-cu-reports-tokenonomics.se
 POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE="polygon-cu-reports-tokenonomics.timer"
 POLYGON_CU_NFT_DASHBOARD_SERVICE_FILE="polygon-cu-nft-dashboard.service"
 POLYGON_CU_NFT_DASHBOARD_TIMER_FILE="polygon-cu-nft-dashboard.timer"
+POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="polygon-historical-crawl-transactions.service"
+POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="polygon-historical-crawl-transactions.timer"
+POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="polygon-historical-crawl-events.service"
+POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="polygon-historical-crawl-events.timer"

 # Mumbai service files
 MUMBAI_SYNCHRONIZE_SERVICE="mumbai-synchronize.service"
@@ -68,6 +76,11 @@ MUMBAI_STATE_CLEAN_SERVICE_FILE="mumbai-state-clean.service"
 MUMBAI_STATE_CLEAN_TIMER_FILE="mumbai-state-clean.timer"
 MUMBAI_METADATA_SERVICE_FILE="mumbai-metadata.service"
 MUMBAI_METADATA_TIMER_FILE="mumbai-metadata.timer"
+MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE="mumbai-history-crawl-transactions.service"
+MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE="mumbai-history-crawl-transactions.timer"
+MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE="mumbai-history-crawl-events.service"
+MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE="mumbai-history-crawl-events.timer"
+

 # XDai service files
 XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service"
@@ -76,6 +89,10 @@ XDAI_MISSING_TIMER_FILE="xdai-missing.timer"
 XDAI_STATISTICS_SERVICE_FILE="xdai-statistics.service"
 XDAI_STATISTICS_TIMER_FILE="xdai-statistics.timer"
 XDAI_MOONWORM_CRAWLER_SERVICE_FILE="xdai-moonworm-crawler.service"
+XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="xdai-historical-crawl-transactions.service"
+XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="xdai-historical-crawl-transactions.timer"
+XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="xdai-historical-crawl-events.service"
+XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="xdai-historical-crawl-events.timer"

 # Wyrm service files
 WYRM_SYNCHRONIZE_SERVICE="wyrm-synchronize.service"
@@ -84,6 +101,10 @@ WYRM_MISSING_TIMER_FILE="wyrm-missing.timer"
 WYRM_STATISTICS_SERVICE_FILE="wyrm-statistics.service"
 WYRM_STATISTICS_TIMER_FILE="wyrm-statistics.timer"
 WYRM_MOONWORM_CRAWLER_SERVICE_FILE="wyrm-moonworm-crawler.service"
+WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="wyrm-historical-crawl-transactions.service"
+WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="wyrm-historical-crawl-transactions.timer"
+WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="wyrm-historical-crawl-events.service"
+WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="wyrm-historical-crawl-events.timer"

 set -eu

@@ -181,6 +202,24 @@ cp "${SCRIPT_DIR}/${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}" "/home/ubuntu/
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}"

+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Ethereum historical transactions crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Ethereum historical events crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+
 echo
 echo
@@ -269,6 +308,24 @@ cp "${SCRIPT_DIR}/${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}" "/home/ubuntu/.config/
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}"

+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Polygon historical transactions crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Polygon historical events crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+
 echo
 echo
 echo -e "${PREFIX_INFO} Replacing existing Mumbai block with transactions syncronizer service definition with ${MUMBAI_SYNCHRONIZE_SERVICE}"
@@ -321,6 +378,24 @@ cp "${SCRIPT_DIR}/${MUMBAI_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/u
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${MUMBAI_METADATA_TIMER_FILE}"

+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Mumbai historical transactions crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Mumbai historical events crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
+
 echo
 echo
 echo -e "${PREFIX_INFO} Replacing existing XDai block with transactions syncronizer service definition with ${XDAI_SYNCHRONIZE_SERVICE}"
@@ -355,6 +430,24 @@ cp "${SCRIPT_DIR}/${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/s
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}"

+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing xDai historical transactions crawler service and timer with: ${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing xDai historical events crawler service and timer with: ${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+
 echo
 echo
 echo -e "${PREFIX_INFO} Replacing existing Wyrm block with transactions syncronizer service definition with ${WYRM_SYNCHRONIZE_SERVICE}"
@@ -387,4 +480,22 @@ echo -e "${PREFIX_INFO} Replacing existing Wyrm moonworm crawler service definit
 chmod 644 "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
 cp "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
 XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
-XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
\ No newline at end of file
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Wyrm historical transactions crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
+
+echo
+echo
+echo -e "${PREFIX_INFO} Replacing existing Wyrm historical events crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
+cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
+XDG_RUNTIME_DIR="/run/user/1000" systemctl --user enable "${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
diff --git a/crawlers/deploy/ethereum-historical-crawl-events.service b/crawlers/deploy/ethereum-historical-crawl-events.service
new file mode 100644
index 00000000..006a03c4
--- /dev/null
+++ b/crawlers/deploy/ethereum-historical-crawl-events.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Ethereum historical events crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-events
+CPUWeight=70
+SyslogIdentifier=ethereum-historical-crawler-events
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/ethereum-historical-crawl-events.timer b/crawlers/deploy/ethereum-historical-crawl-events.timer
new file mode 100644
index 00000000..57801df3
--- /dev/null
+++ b/crawlers/deploy/ethereum-historical-crawl-events.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical events crawler on ethereum
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/ethereum-historical-crawl-transactions.service b/crawlers/deploy/ethereum-historical-crawl-transactions.service
new file mode 100644
index 00000000..1b6b4c2a
--- /dev/null
+++ b/crawlers/deploy/ethereum-historical-crawl-transactions.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Ethereum historical transactions crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-transactions
+CPUWeight=70
+SyslogIdentifier=ethereum-historical-crawler-transactions
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/ethereum-historical-crawl-transactions.timer b/crawlers/deploy/ethereum-historical-crawl-transactions.timer
new file mode 100644
index 00000000..4038c649
--- /dev/null
+++ b/crawlers/deploy/ethereum-historical-crawl-transactions.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical transactions crawler on ethereum
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/mumbai-history-crawl-events.service b/crawlers/deploy/mumbai-history-crawl-events.service
new file mode 100644
index 00000000..488a1a94
--- /dev/null
+++ b/crawlers/deploy/mumbai-history-crawl-events.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Mumbai historical events crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-events
+CPUWeight=70
+SyslogIdentifier=mumbai-historical-crawler-events
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/mumbai-history-crawl-events.timer b/crawlers/deploy/mumbai-history-crawl-events.timer
new file mode 100644
index 00000000..8fd46b46
--- /dev/null
+++ b/crawlers/deploy/mumbai-history-crawl-events.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical events crawler on mumbai
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/mumbai-history-crawl-transactions.service b/crawlers/deploy/mumbai-history-crawl-transactions.service
new file mode 100644
index 00000000..1efb0082
--- /dev/null
+++ b/crawlers/deploy/mumbai-history-crawl-transactions.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Mumbai historical transactions crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-transactions
+CPUWeight=70
+SyslogIdentifier=mumbai-historical-crawler-transactions
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/mumbai-history-crawl-transactions.timer b/crawlers/deploy/mumbai-history-crawl-transactions.timer
new file mode 100644
index 00000000..b1c8d824
--- /dev/null
+++ b/crawlers/deploy/mumbai-history-crawl-transactions.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical transactions crawler on mumbai
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/polygon-historical-crawl-events.service b/crawlers/deploy/polygon-historical-crawl-events.service
new file mode 100644
index 00000000..c8017a93
--- /dev/null
+++ b/crawlers/deploy/polygon-historical-crawl-events.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Polygon historical events crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-events
+CPUWeight=70
+SyslogIdentifier=polygon-historical-crawler-events
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/polygon-historical-crawl-events.timer b/crawlers/deploy/polygon-historical-crawl-events.timer
new file mode 100644
index 00000000..f0560ac8
--- /dev/null
+++ b/crawlers/deploy/polygon-historical-crawl-events.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical events crawler on polygon
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.service b/crawlers/deploy/polygon-historical-crawl-transactions.service
new file mode 100644
index 00000000..30a245b9
--- /dev/null
+++ b/crawlers/deploy/polygon-historical-crawl-transactions.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Polygon historical transactions crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-transactions
+CPUWeight=70
+SyslogIdentifier=polygon-historical-crawler-transactions
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/polygon-historical-crawl-transactions.timer b/crawlers/deploy/polygon-historical-crawl-transactions.timer
new file mode 100644
index 00000000..4a2cdd21
--- /dev/null
+++ b/crawlers/deploy/polygon-historical-crawl-transactions.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical transactions crawler on polygon
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/wyrm-historical-crawl-events.service b/crawlers/deploy/wyrm-historical-crawl-events.service
new file mode 100644
index 00000000..cd73d8c0
--- /dev/null
+++ b/crawlers/deploy/wyrm-historical-crawl-events.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Wyrm historical events crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-events
+CPUWeight=70
+SyslogIdentifier=wyrm-historical-crawler-events
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/wyrm-historical-crawl-events.timer b/crawlers/deploy/wyrm-historical-crawl-events.timer
new file mode 100644
index 00000000..bd402c01
--- /dev/null
+++ b/crawlers/deploy/wyrm-historical-crawl-events.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical events crawler on wyrm
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/wyrm-historical-crawl-transactions.service b/crawlers/deploy/wyrm-historical-crawl-transactions.service
new file mode 100644
index 00000000..5c496583
--- /dev/null
+++ b/crawlers/deploy/wyrm-historical-crawl-transactions.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=Wyrm historical transactions crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-transactions
+CPUWeight=70
+SyslogIdentifier=wyrm-historical-crawler-transactions
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/wyrm-historical-crawl-transactions.timer b/crawlers/deploy/wyrm-historical-crawl-transactions.timer
new file mode 100644
index 00000000..40ea12bd
--- /dev/null
+++ b/crawlers/deploy/wyrm-historical-crawl-transactions.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical transactions crawler on wyrm
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/xdai-historical-crawl-events.service b/crawlers/deploy/xdai-historical-crawl-events.service
new file mode 100644
index 00000000..11b0fa5a
--- /dev/null
+++ b/crawlers/deploy/xdai-historical-crawl-events.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=XDai historical events crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-events
+CPUWeight=70
+SyslogIdentifier=xdai-historical-crawler-events
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/xdai-historical-crawl-events.timer b/crawlers/deploy/xdai-historical-crawl-events.timer
new file mode 100644
index 00000000..153cfaa8
--- /dev/null
+++ b/crawlers/deploy/xdai-historical-crawl-events.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical events crawler on xdai
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/deploy/xdai-historical-crawl-transactions.service b/crawlers/deploy/xdai-historical-crawl-transactions.service
new file mode 100644
index 00000000..8fb6303b
--- /dev/null
+++ b/crawlers/deploy/xdai-historical-crawl-transactions.service
@@ -0,0 +1,17 @@
+[Unit]
+Description=XDai historical transactions crawler
+After=network.target
+StartLimitIntervalSec=300
+StartLimitBurst=3
+
+[Service]
+WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
+EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
+Restart=on-failure
+RestartSec=15s
+ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-transactions
+CPUWeight=70
+SyslogIdentifier=xdai-historical-crawler-transactions
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/crawlers/deploy/xdai-historical-crawl-transactions.timer b/crawlers/deploy/xdai-historical-crawl-transactions.timer
new file mode 100644
index 00000000..89dbcb8f
--- /dev/null
+++ b/crawlers/deploy/xdai-historical-crawl-transactions.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Runs historical transactions crawler on xdai
+
+[Timer]
+OnBootSec=60s
+OnUnitActiveSec=10m
+
+[Install]
+WantedBy=timers.target
diff --git a/crawlers/mooncrawl/mooncrawl/crawler.py b/crawlers/mooncrawl/mooncrawl/crawler.py
index b733ad89..0dcd7f69 100644
--- a/crawlers/mooncrawl/mooncrawl/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/crawler.py
@@ -13,7 +13,7 @@ from typing import Iterator, List
 from uuid import UUID

 from moonstreamdb.blockchain import AvailableBlockchainType
-import dateutil.parser
+import dateutil.parser  # type: ignore

 from .blockchain import (
     DateRange,
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
index 43d435d9..d83e41a0 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
@@ -8,7 +8,12 @@ from web3 import Web3
 from web3.middleware import geth_poa_middleware

 from ..db import yield_db_session_ctx
-from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID
+from ..settings import (
+    MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+    NB_CONTROLLER_ACCESS_ID,
+    HISTORICAL_CRAWLER_STATUSES,
+    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
+)
 from .continuous_crawler import _retry_connect_web3, continuous_crawler
 from .crawler import (
     SubscriptionTypes,
@@ -16,6 +21,9 @@ from .crawler import (
     get_crawl_job_entries,
     make_event_crawl_jobs,
     make_function_call_crawl_jobs,
+    find_all_deployed_blocks,
+    update_job_state_with_filters,
+    moonworm_crawler_update_job_as_pickedup,
 )
 from .db import get_first_labeled_block_number, get_last_labeled_block_number
 from .historical_crawler import historical_crawler
@@ -48,6 +56,14 @@ def handle_crawl(args: argparse.Namespace) -> None:
         f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
     )

+    (
+        initial_event_jobs,
+        initial_function_call_jobs,
+    ) = moonworm_crawler_update_job_as_pickedup(
+        event_crawl_jobs=initial_event_jobs,
+        function_call_crawl_jobs=initial_function_call_jobs,
+    )
+
     logger.info(f"Blockchain type: {blockchain_type.value}")
     with yield_db_session_ctx() as db_session:
         web3: Optional[Web3] = None
@@ -125,20 +141,33 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
     blockchain_type = AvailableBlockchainType(args.blockchain_type)
     subscription_type = blockchain_type_to_subscription_type(blockchain_type)

+    extend_tags = []
+
     addresses_filter = []
     if args.address is not None:
         addresses_filter = [Web3.toChecksumAddress(args.address)]

+    if args.tasks_journal:
+        addresses_filter = []
+        extend_tags.extend(
+            [
+                "#moonworm_task_pickedup:True",
+                f"!#{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}",
+            ]
+        )
+
     all_event_jobs = make_event_crawl_jobs(
         get_crawl_job_entries(
             subscription_type,
             "event",
             MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+            extend_tags=extend_tags,
         )
     )
+
     filtered_event_jobs = []
     for job in all_event_jobs:
-        if addresses_filter:
+        if addresses_filter and not args.tasks_journal:
             intersection = [
                 address for address in job.contracts if address in addresses_filter
             ]
@@ -155,21 +184,57 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
             subscription_type,
             "function",
             MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+            extend_tags=extend_tags,
         )
     )
+
     if addresses_filter:
-        filtered_function_call_jobs = [
-            job
-            for job in all_function_call_jobs
-            if job.contract_address in addresses_filter
-        ]
+        filtered_function_call_jobs = [
+            job for job in all_function_call_jobs if job.contract_address in addresses_filter
+        ]
     else:
         filtered_function_call_jobs = all_function_call_jobs

     if args.only_events:
         filtered_function_call_jobs = []
         logger.info(f"Removing function call crawl jobs since --only-events is set")

+    if args.only_functions:
+        filtered_event_jobs = []
+        logger.info(
+            f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
+        )
+
+    if args.only_events and args.only_functions:
+        raise ValueError(
+            "--only-events and --only-functions cannot be set at the same time"
+        )
+
+    if args.tasks_journal:
+        if len(filtered_event_jobs) > 0:
+            filtered_event_jobs = update_job_state_with_filters(  # type: ignore
+                events=filtered_event_jobs,
+                address_filter=[],
+                required_tags=[
+                    "historical_crawl_status:pending",
+                    "moonworm_task_pickedup:True",
+                ],
+                tags_to_add=["historical_crawl_status:in_progress"],
+                tags_to_delete=["historical_crawl_status:pending"],
+            )
+
+        if len(filtered_function_call_jobs) > 0:
+            filtered_function_call_jobs = update_job_state_with_filters(  # type: ignore
+                events=filtered_function_call_jobs,
+                address_filter=[],
+                required_tags=[
+                    "historical_crawl_status:pending",
+                    "moonworm_task_pickedup:True",
+                ],
+                tags_to_add=["historical_crawl_status:in_progress"],
+                tags_to_delete=["historical_crawl_status:pending"],
+            )
+
     logger.info(
         f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
     )
@@ -198,7 +263,30 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
     )
     logger.info(f"Last labeled block: {last_labeled_block}")

     start_block = args.start
+    addresses_deployment_blocks = None
+
+    end_block = args.end
+
+    # get set of addresses from event jobs and function call jobs
+    if args.find_deployed_blocks:
+        addresses_set = set()
+        for job in filtered_event_jobs:
+            addresses_set.update(job.contracts)
+        for function_job in filtered_function_call_jobs:
+            addresses_set.add(function_job.contract_address)
+
+        if args.start is None:
+            start_block = web3.eth.blockNumber - 1
+
+        addresses_deployment_blocks = find_all_deployed_blocks(
+            web3, list(addresses_set)
+        )
+        if len(addresses_deployment_blocks) == 0:
+            raise ValueError(
+                "No addresses found in the blockchain. Please check your addresses and try again"
+            )
+
+        end_block = min(addresses_deployment_blocks.values())
+
     if start_block is None:
         logger.info("No start block provided")
         if last_labeled_block is not None:
@@ -226,9 +314,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
     else:
         logger.info(f"Using start block: {start_block}")

-    if start_block < args.end:
+    if start_block < end_block:
         raise ValueError(
-            f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction."
+            f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction."
         )

     historical_crawler(
@@ -238,10 +326,11 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
         filtered_event_jobs,
         filtered_function_call_jobs,
         start_block,
-        args.end,
+        end_block,
         args.max_blocks_batch,
         args.min_sleep_time,
         access_id=args.access_id,
+        addresses_deployment_blocks=addresses_deployment_blocks,
     )


@@ -371,7 +460,7 @@ def main() -> None:
         "--end",
         "-e",
         type=int,
-        required=True,
+        required=False,
     )
     historical_crawl_parser.add_argument(
         "--blockchain-type",
@@ -420,6 +509,24 @@ def main() -> None:
         default=False,
         help="Only crawl events",
     )
+    historical_crawl_parser.add_argument(
+        "--only-functions",
+        action="store_true",
+        default=False,
+        help="Only crawl function calls",
+    )
+    historical_crawl_parser.add_argument(
+        "--find-deployed-blocks",
+        action="store_true",
+        default=False,
+        help="Find the deployment block for each crawled address",
+    )
+    historical_crawl_parser.add_argument(
+        "--tasks-journal",
+        action="store_true",
+        default=False,
+        help="Use the tasks journal, which fills all required fields for the historical crawl",
+    )
     historical_crawl_parser.set_defaults(func=handle_historical_crawl)

     args = parser.parse_args()
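To make the --tasks-journal filtering concrete, here is a sketch of the search query that get_crawl_job_entries ends up sending to the tasks journal (the subscription type value is illustrative; in the journal search syntax used by this diff, #tag requires a tag and !#tag excludes entries carrying it):

    subscription_type_value = "ethereum_blockchain"  # illustrative
    crawler_type = "event"

    # Base query, as built by get_crawl_job_entries:
    query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type_value}"

    # Tags appended via extend_tags when --tasks-journal is set:
    extend_tags = [
        "#moonworm_task_pickedup:True",        # only tasks the moonworm crawler has picked up
        "!#historical_crawl_status:finished",  # skip tasks whose historical crawl is done
    ]
    for tag in extend_tags:
        query += f" {tag.rstrip()}"

    print(query)
    # #status:active #type:event #subscription_type:ethereum_blockchain #moonworm_task_pickedup:True !#historical_crawl_status:finished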
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
index d33c8b35..fa8249b6 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/continuous_crawler.py
@@ -24,10 +24,15 @@ from .crawler import (
     make_function_call_crawl_jobs,
     merge_event_crawl_jobs,
     merge_function_call_crawl_jobs,
+    moonworm_crawler_update_job_as_pickedup,
 )
 from .db import add_events_to_session, add_function_calls_to_session, commit_session
 from .event_crawler import _crawl_events
 from .function_call_crawler import _crawl_functions
+from ..settings import (
+    HISTORICAL_CRAWLER_STATUSES,
+    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
+)

 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -220,6 +225,15 @@ def continuous_crawler(
                 event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
                     event_crawl_jobs, function_call_crawl_jobs, blockchain_type
                 )
+
+                (
+                    event_crawl_jobs,
+                    function_call_crawl_jobs,
+                ) = moonworm_crawler_update_job_as_pickedup(
+                    event_crawl_jobs=event_crawl_jobs,
+                    function_call_crawl_jobs=function_call_crawl_jobs,
+                )
+
                 jobs_refetchet_time = current_time

             if current_time - last_heartbeat_time > timedelta(
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
index 9f21fefe..3418959d 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/crawler.py
@@ -5,13 +5,14 @@ import time
 from dataclasses import dataclass
 from datetime import datetime
 from enum import Enum
-from typing import Any, Callable, Dict, List, Optional, cast
+from typing import Any, Callable, Dict, List, Optional, cast, Union, Tuple
 from uuid import UUID

-from bugout.data import BugoutSearchResult
+from bugout.data import BugoutSearchResult, BugoutJournalEntries
 from eth_typing.evm import ChecksumAddress
 from moonstreamdb.blockchain import AvailableBlockchainType
 from web3.main import Web3
+from moonworm.deployment import find_deployment_block  # type: ignore

 from ..blockchain import connect
 from ..reporter import reporter
@@ -20,6 +21,8 @@ from ..settings import (
     MOONSTREAM_ADMIN_ACCESS_TOKEN,
     MOONSTREAM_MOONWORM_TASKS_JOURNAL,
     bugout_client,
+    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
+    HISTORICAL_CRAWLER_STATUSES,
 )

 logging.basicConfig(level=logging.INFO)
@@ -145,6 +148,7 @@ class EventCrawlJob:
     event_abi_hash: str
     event_abi: Dict[str, Any]
     contracts: List[ChecksumAddress]
+    address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
     created_at: int


@@ -152,6 +156,7 @@ class FunctionCallCrawlJob:
     contract_abi: List[Dict[str, Any]]
     contract_address: ChecksumAddress
+    entries_tags: Dict[UUID, List[str]]
     created_at: int


@@ -161,6 +166,7 @@ def get_crawl_job_entries(
     journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
     created_at_filter: Optional[int] = None,
     limit: int = 200,
+    extend_tags: Optional[List[str]] = None,
 ) -> List[BugoutSearchResult]:
     """
     Get all event ABIs from bugout journal
@@ -172,6 +178,10 @@ def get_crawl_job_entries(
     """
     query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}"

+    if extend_tags is not None:
+        for tag in extend_tags:
+            query += f" {tag.rstrip()}"
+
     if created_at_filter is not None:
         # Filtering by created_at
         # Filtering not by strictly greater than
@@ -201,6 +211,32 @@ def get_crawl_job_entries(
     return entries


+def find_all_deployed_blocks(
+    web3: Web3, addresses_set: List[ChecksumAddress]
+) -> Dict[ChecksumAddress, int]:
+    """
+    Find the deployment block for each of the given addresses.
+    """
+
+    all_deployed_blocks = {}
+    for address in addresses_set:
+        try:
+            code = web3.eth.getCode(address)
+            if code.hex() != "0x":
+                block = find_deployment_block(
+                    web3_client=web3,
+                    contract_address=address,
+                    web3_interval=0.5,
+                )
+                if block is not None:
+                    all_deployed_blocks[address] = block
+                if block is None:
+                    logger.error(f"Failed to get deployment block for {address}")
+        except Exception as e:
+            logger.error(f"Failed to get code for {address}: {e}")
+    return all_deployed_blocks
+
+
 def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
     for entry_tag in entry.tags:
         if entry_tag.startswith(tag):
@@ -219,16 +255,23 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
         abi_hash = _get_tag(entry, "abi_method_hash")
         contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))

+        entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji
+
         existing_crawl_job = crawl_job_by_hash.get(abi_hash)
         if existing_crawl_job is not None:
             if contract_address not in existing_crawl_job.contracts:
                 existing_crawl_job.contracts.append(contract_address)
+                existing_crawl_job.address_entries[contract_address] = {
+                    entry_id: entry.tags
+                }
+
         else:
             abi = cast(str, entry.content)
             new_crawl_job = EventCrawlJob(
                 event_abi_hash=abi_hash,
                 event_abi=json.loads(abi),
                 contracts=[contract_address],
+                address_entries={contract_address: {entry_id: entry.tags}},
                 created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
             )
             crawl_job_by_hash[abi_hash] = new_crawl_job
@@ -247,15 +290,18 @@ def make_function_call_crawl_jobs(
     method_signature_by_address: Dict[str, List[str]] = {}

     for entry in entries:
+        entry_id = UUID(entry.entry_url.split("/")[-1])  # crying emoji
         contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
         abi = json.loads(cast(str, entry.content))
         method_signature = encode_function_signature(abi)
         if method_signature is None:
             raise ValueError(f"{abi} is not a function ABI")
+
         if contract_address not in crawl_job_by_address:
             crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
                 contract_abi=[abi],
                 contract_address=contract_address,
+                entries_tags={entry_id: entry.tags},
                 created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
             )
             method_signature_by_address[contract_address] = [method_signature]
@@ -264,6 +310,9 @@ def make_function_call_crawl_jobs(
             if method_signature not in method_signature_by_address[contract_address]:
                 crawl_job_by_address[contract_address].contract_abi.append(abi)
                 method_signature_by_address[contract_address].append(method_signature)
+                crawl_job_by_address[contract_address].entries_tags[
+                    entry_id
+                ] = entry.tags

     return [crawl_job for crawl_job in crawl_job_by_address.values()]

@@ -392,3 +441,268 @@ def heartbeat(
             tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
             timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
         )
+
+
+def bugout_state_update(
+    entries_tags_add: List[Dict[str, Any]],
+    entries_tags_delete: List[Dict[str, Any]],
+) -> BugoutJournalEntries:
+    """
+    Update entries' tags in Bugout: first add tags to entries, then delete tags
+    from entries. If the add step fails, the delete step is not executed.
+    """
+
+    new_entries_state = BugoutJournalEntries(entries=[])
+
+    if len(entries_tags_add) > 0:
+        try:
+            new_entries_state = bugout_client.create_entries_tags(
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                entries_tags=entries_tags_add,
+                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+            )
+        except Exception as e:
+            logger.error(f"Failed to add tags to entries: {e}")
+
+    if len(entries_tags_delete) > 0 and (
+        len(entries_tags_add) == 0 or len(new_entries_state.entries) > 0
+    ):
+        try:
+            new_entries_state = bugout_client.delete_entries_tags(
+                token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
+                journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
+                entries_tags=entries_tags_delete,
+                timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
+            )
+        except Exception as e:
+            logger.error(f"Failed to delete tags from entries: {e}")
+
+    return new_entries_state
+
+
+def moonworm_crawler_update_job_as_pickedup(
+    event_crawl_jobs: List[EventCrawlJob],
+    function_call_crawl_jobs: List[FunctionCallCrawlJob],
+) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
+    """
+    Mark moonworm jobs as picked up for processing.
+    """
+
+    if len(event_crawl_jobs) > 0:
+        event_crawl_jobs = update_job_state_with_filters(  # type: ignore
+            events=event_crawl_jobs,
+            address_filter=[],
+            required_tags=[
+                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
+                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
+            ],
+            tags_to_add=[
+                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True"
+            ],
+            tags_to_delete=[
+                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False"
+            ],
+        )
+
+    if len(function_call_crawl_jobs) > 0:
+        function_call_crawl_jobs = update_job_state_with_filters(  # type: ignore
+            events=function_call_crawl_jobs,
+            address_filter=[],
+            required_tags=[
+                f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False", + ], + tags_to_add=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True" + ], + tags_to_delete=[ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False" + ], + ) + + return event_crawl_jobs, function_call_crawl_jobs + + +def update_job_tags( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + new_entreis_state: BugoutJournalEntries, +): + """ + Update tags of the jobs in job object + """ + entry_tags_by_id = {entry.id: entry.tags for entry in new_entreis_state.entries} + + for event in events: + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.address_entries.items(): + for entry_id in entries_ids.keys(): + if entry_id in entry_tags_by_id: + event.address_entries[contract_address][ + entry_id + ] = entry_tags_by_id[entry_id] + + if isinstance(event, FunctionCallCrawlJob): + for entry_id in event.entries_tags.keys(): + if entry_id in entry_tags_by_id: + event.entries_tags[entry_id] = entry_tags_by_id[entry_id] + + return events + + +def update_job_state_with_filters( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + address_filter: List[ChecksumAddress], + required_tags: List[str], + tags_to_add: List[str] = [], + tags_to_delete: List[str] = [], +) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]: + """ + Function that updates the state of the job in bugout. + """ + + entries_ids_to_update: List[UUID] = [] + + ### TODO: refactor this function + + if len(tags_to_add) == 0 and len(tags_to_delete) == 0: + return events + + for event in events: + # events + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.address_entries.items(): + if address_filter and contract_address not in address_filter: + continue + for entry_id, tags in entries_ids.items(): + if set(required_tags).issubset(set(tags)): + entries_ids_to_update.append(entry_id) + + # functions + if isinstance(event, FunctionCallCrawlJob): + if address_filter and event.contract_address not in address_filter: + continue + for entry_id, tags in event.entries_tags.items(): + if set(required_tags).issubset(set(tags)): + entries_ids_to_update.append(entry_id) + + if len(entries_ids_to_update) == 0: + return events + + new_entries_state = bugout_state_update( + entries_tags_add=[ + {"entry_id": entry_id, "tags": tags_to_add} + for entry_id in entries_ids_to_update + ], + entries_tags_delete=[ + {"entry_id": entry_id, "tags": tags_to_delete} + for entry_id in entries_ids_to_update + ], + ) + + events = update_job_tags(events, new_entries_state) + + return events + + +def update_entries_status_and_progress( + events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]], + progess_map: Dict[ChecksumAddress, float], +) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]: + """ + Update entries status and progress in mooncrawl bugout journal + """ + + entries_tags_delete: List[Dict[str, Any]] = [] + + entries_tags_add: List[Dict[str, Any]] = [] + + for event in events: + if isinstance(event, EventCrawlJob): + for contract_address, entries_ids in event.address_entries.items(): + progress = round(progess_map.get(contract_address, 0), 4) * 100 + + ( + entries_tags_delete, + entries_tags_add, + ) = add_progress_to_tags( + entries=entries_ids, + contract_progress=progress, + entries_tags_delete=entries_tags_delete, + entries_tags_add=entries_tags_add, + ) + + if isinstance(event, FunctionCallCrawlJob): + progress = 
round(progess_map.get(event.contract_address, 0), 4) * 100 + + ( + entries_tags_delete, + entries_tags_add, + ) = add_progress_to_tags( + entries=entries_ids, + contract_progress=progress, + entries_tags_delete=entries_tags_delete, + entries_tags_add=entries_tags_add, + ) + + new_entries_state = bugout_state_update( + entries_tags_add=entries_tags_add, + entries_tags_delete=entries_tags_delete, + ) + + events = update_job_tags(events, new_entries_state) + + return events + + +def add_progress_to_tags( + entries: Dict[UUID, List[str]], + contract_progress: float, + entries_tags_delete: List[Dict[str, Any]], + entries_tags_add: List[Dict[str, Any]], +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Calculate progress and add finished tag if progress is 100 + """ + + new_progress = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{contract_progress}" + + for entry_id, tags in entries.items(): + # progress + + if ( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + in tags + ): + continue + + if new_progress not in tags: + entries_tags_delete.append( + { + "entry_id": entry_id, + "tags": [ + tag + for tag in tags + if tag.startswith( + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}" + ) + ], + } + ) + + entries_tags_add.append( + { + "entry_id": entry_id, + "tags": [new_progress], + } + ) + + if contract_progress >= 100: + entries_tags_add.append( + { + "entry_id": entry_id, + "tags": [ + f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}" + ], + } + ) + + return entries_tags_delete, entries_tags_add diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py index 564f1dab..0cedf625 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py @@ -3,6 +3,7 @@ import time from typing import Dict, List, Optional, Tuple from uuid import UUID +from eth_typing.evm import ChecksumAddress from moonstreamdb.blockchain import AvailableBlockchainType from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore MoonstreamEthereumStateProvider, @@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network # type: ignore from sqlalchemy.orm.session import Session from web3 import Web3 -from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3 +from .crawler import ( + EventCrawlJob, + FunctionCallCrawlJob, + _retry_connect_web3, + update_entries_status_and_progress, +) from .db import add_events_to_session, add_function_calls_to_session, commit_session from .event_crawler import _crawl_events, _autoscale_crawl_events from .function_call_crawler import _crawl_functions @@ -31,6 +37,7 @@ def historical_crawler( max_blocks_batch: int = 100, min_sleep_time: float = 0.1, access_id: Optional[UUID] = None, + addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None, ): assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0" assert min_sleep_time > 0, "min_sleep_time must be greater than 0" @@ -60,6 +67,10 @@ def historical_crawler( blocks_cache: Dict[int, int] = {} failed_count = 0 + original_start_block = start_block + + progess_map: Dict[ChecksumAddress, float] = {} + while start_block >= end_block: try: time.sleep(min_sleep_time) @@ -119,6 +130,27 @@ def historical_crawler( db_session, 
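A worked example of the progress bookkeeping: the historical crawler walks backwards from start_block toward each contract's deployment block, so progress is the fraction of that span already covered (the block numbers here are illustrative; the formula matches historical_crawler below):

    original_start_block = 17_000_000  # where the reverse crawl began
    deployment_block = 16_000_000      # contract deployment block
    batch_end_block = 16_750_000       # current batch position

    progress = (original_start_block - batch_end_block) / (
        original_start_block - deployment_block
    )
    print(round(progress, 4) * 100)  # 25.0 -> stored on the entry as the tag "progress:25.0"

    # When batch_end_block reaches deployment_block, progress hits 100.0 and
    # add_progress_to_tags also attaches "historical_crawl_status:finished".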
diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
index 564f1dab..0cedf625 100644
--- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
+++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/historical_crawler.py
@@ -3,6 +3,7 @@ import time
 from typing import Dict, List, Optional, Tuple
 from uuid import UUID

+from eth_typing.evm import ChecksumAddress
 from moonstreamdb.blockchain import AvailableBlockchainType
 from moonworm.crawler.moonstream_ethereum_state_provider import (  # type: ignore
     MoonstreamEthereumStateProvider,
@@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network  # type: ignore
 from sqlalchemy.orm.session import Session
 from web3 import Web3

-from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3
+from .crawler import (
+    EventCrawlJob,
+    FunctionCallCrawlJob,
+    _retry_connect_web3,
+    update_entries_status_and_progress,
+)
 from .db import add_events_to_session, add_function_calls_to_session, commit_session
 from .event_crawler import _crawl_events, _autoscale_crawl_events
 from .function_call_crawler import _crawl_functions
@@ -31,6 +37,7 @@ def historical_crawler(
     max_blocks_batch: int = 100,
     min_sleep_time: float = 0.1,
     access_id: Optional[UUID] = None,
+    addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
 ):
     assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
     assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@@ -60,6 +67,10 @@ def historical_crawler(
     blocks_cache: Dict[int, int] = {}
     failed_count = 0

+    original_start_block = start_block
+
+    progress_map: Dict[ChecksumAddress, float] = {}
+
     while start_block >= end_block:
         try:
             time.sleep(min_sleep_time)
@@ -119,6 +130,27 @@ def historical_crawler(
                     db_session, all_function_calls, blockchain_type
                 )

+            if addresses_deployment_blocks:
+                for address, deployment_block in addresses_deployment_blocks.items():
+                    current_position = batch_end_block
+
+                    progress = (original_start_block - current_position) / (
+                        original_start_block - deployment_block
+                    )
+                    progress_map[address] = progress
+
+                if len(function_call_crawl_jobs) > 0:
+                    function_call_crawl_jobs = update_entries_status_and_progress(  # type: ignore
+                        events=function_call_crawl_jobs,
+                        progress_map=progress_map,
+                    )
+
+                if len(event_crawl_jobs) > 0:
+                    event_crawl_jobs = update_entries_status_and_progress(  # type: ignore
+                        events=event_crawl_jobs,
+                        progress_map=progress_map,
+                    )
+
             # Commiting to db
             commit_session(db_session)
diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py
index cd697b0f..d9a24f70 100644
--- a/crawlers/mooncrawl/mooncrawl/settings.py
+++ b/crawlers/mooncrawl/mooncrawl/settings.py
@@ -279,3 +279,26 @@ infura_networks = {
 BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
 BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
 BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
+
+
+# Historical crawler status config
+
+HISTORICAL_CRAWLER_STATUSES = {
+    "pending": "pending",
+    "running": "running",
+    "finished": "finished",
+}
+
+# Historical crawler moonworm status config
+
+HISTORICAL_CRAWLER_MOONWORM_STATUSES = {
+    "pickedup": True,
+}
+
+# Status tag prefixes
+
+HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
+    "moonworm_status": "moonworm_task_pickedup",
+    "historical_crawl_status": "historical_crawl_status",
+    "progress_status": "progress",
+}
diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py
index 47f45764..581a985f 100644
--- a/crawlers/mooncrawl/mooncrawl/version.py
+++ b/crawlers/mooncrawl/mooncrawl/version.py
@@ -2,4 +2,4 @@
 Moonstream crawlers version.
 """

-MOONCRAWL_VERSION = "0.3.0"
+MOONCRAWL_VERSION = "0.3.1"
diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py
index 96271e8d..8104d35c 100644
--- a/crawlers/mooncrawl/setup.py
+++ b/crawlers/mooncrawl/setup.py
@@ -34,7 +34,7 @@ setup(
     zip_safe=False,
     install_requires=[
         "boto3",
-        "bugout>=0.1.19",
+        "bugout>=0.2.8",
         "chardet",
         "fastapi",
         "moonstreamdb>=0.3.3",
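Taken together, the new settings constants define the tag lifecycle of a moonworm task entry. A sketch of how the prefixes compose into journal tags (status_tag is a hypothetical helper, not part of this changeset):

    HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
        "moonworm_status": "moonworm_task_pickedup",
        "historical_crawl_status": "historical_crawl_status",
        "progress_status": "progress",
    }

    def status_tag(prefix_key: str, value: str) -> str:
        """Compose a journal tag from a configured prefix and a value."""
        return f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES[prefix_key]}:{value}"

    # Lifecycle of one task entry:
    # 1. apply_moonworm_tasks creates it with:
    #    moonworm_task_pickedup:False, historical_crawl_status:pending, progress:0
    # 2. moonworm_crawler_update_job_as_pickedup flips it to moonworm_task_pickedup:True
    # 3. the historical crawl (--tasks-journal) moves it to historical_crawl_status:in_progress
    #    and update_entries_status_and_progress keeps progress:<0-100> current
    # 4. at 100% add_progress_to_tags attaches historical_crawl_status:finished
    print(status_tag("historical_crawl_status", "finished"))  # historical_crawl_status:finished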