Merge pull request #789 from bugout-dev/historical-crawl-tasks

Historical crawl tasks
pull/794/head
Andrey Dolgolev 2023-05-31 17:09:52 +03:00 committed by GitHub
commit 72ccc6c0e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 984 additions and 30 deletions

View File

@ -502,21 +502,26 @@ def apply_moonworm_tasks(
subscription_type: str,
abi: Any,
address: str,
entries_limit: int = 100,
) -> None:
"""
    Get a list of subscriptions, load the ABI, and apply them as moonworm tasks if they do not already exist
"""
entries_pack = []
moonworm_abi_tasks_entries_pack = []
try:
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type}",
limit=100,
limit=entries_limit, # load per request
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
        # Create historical crawl tasks in the journal,
        # using create_entries_pack to create the entries
existing_tags = [entry.tags for entry in entries]
existing_hashes = [
@ -534,7 +539,16 @@ def apply_moonworm_tasks(
for hash in abi_hashes_dict:
if hash not in existing_hashes:
entries_pack.append(
abi_selector = Web3.keccak(
text=abi_hashes_dict[hash]["name"]
+ "("
+ ",".join(
map(lambda x: x["type"], abi_hashes_dict[hash]["inputs"])
)
+ ")"
)[:4].hex()
moonworm_abi_tasks_entries_pack.append(
{
"title": address,
"content": json.dumps(abi_hashes_dict[hash], indent=4),
@ -542,22 +556,32 @@ def apply_moonworm_tasks(
f"address:{address}",
f"type:{abi_hashes_dict[hash]['type']}",
f"abi_method_hash:{hash}",
f"abi_selector:{abi_selector}",
f"subscription_type:{subscription_type}",
f"abi_name:{abi_hashes_dict[hash]['name']}",
f"status:active",
f"task_type:moonworm",
f"moonworm_task_pickedup:False", # True if task picked up by moonworm-crawler(default each 120 sec)
f"historical_crawl_status:pending", # pending, in_progress, done
f"progress:0", # 0-100 %
],
}
)
except Exception as e:
        logger.error(f"Error getting moonworm tasks: {str(e)}")
reporter.error_report(e)
if len(entries_pack) > 0:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=entries_pack,
timeout=15,
)
if len(moonworm_abi_tasks_entries_pack) > 0:
try:
bc.create_entries_pack(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries=moonworm_abi_tasks_entries_pack,
timeout=25,
)
except Exception as e:
            logger.error(f"Error creating moonworm tasks: {str(e)}")
reporter.error_report(e)
def name_normalization(query_name: str) -> str:
@ -754,3 +778,18 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
).hexdigest()
return hash
def get_moonworm_jobs(
address: str,
subscription_type_id: str,
entries_limit: int = 100,
):
entries = get_all_entries_from_search(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}",
limit=entries_limit, # load per request
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
return entries
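
For reference, the abi_selector tag added above is the standard 4-byte function selector: the keccak-256 hash of the canonical signature name(type1,...,typeN), truncated to four bytes. A minimal sketch of the computation, using a hypothetical ERC-20 transfer ABI entry:

    from web3 import Web3

    abi_entry = {
        "name": "transfer",
        "type": "function",
        "inputs": [{"type": "address"}, {"type": "uint256"}],
    }

    # Canonical signature: "transfer(address,uint256)"
    signature = (
        abi_entry["name"]
        + "("
        + ",".join(i["type"] for i in abi_entry["inputs"])
        + ")"
    )

    # First 4 bytes of the keccak-256 hash: 0xa9059cbb
    selector = Web3.keccak(text=signature)[:4].hex()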

View File

@ -15,6 +15,7 @@ from ..actions import (
apply_moonworm_tasks,
get_entity_subscription_collection_id,
EntityCollectionNotFoundException,
get_moonworm_jobs,
)
from ..admin import subscription_types
from .. import data
@ -22,7 +23,7 @@ from ..admin import subscription_types
from ..middleware import MoonstreamHTTPException
from ..reporter import reporter
from ..settings import bugout_client as bc, entity_client as ec
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN
from ..settings import MOONSTREAM_ADMIN_ACCESS_TOKEN, MOONSTREAM_MOONWORM_TASKS_JOURNAL
from ..web3_provider import yield_web3_provider
@ -484,6 +485,59 @@ async def get_subscription_abi_handler(
)
@router.get(
"/{subscription_id}/jobs",
tags=["subscriptions"],
response_model=data.SubdcriptionsAbiResponse,
)
async def get_subscription_jobs_handler(
request: Request,
subscription_id: str,
) -> Any:
token = request.state.token
user = request.state.user
try:
collection_id = get_entity_subscription_collection_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)
# get subscription entity
subscription_resource = ec.get_entity(
token=token,
collection_id=collection_id,
entity_id=subscription_id,
)
except EntityCollectionNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions collection not found",
internal_error=e,
)
except Exception as e:
        logger.error(
            f"Error getting subscriptions for user ({user}) with token ({token}), error: {str(e)}"
        )
raise MoonstreamHTTPException(status_code=500, internal_error=e)
for field in subscription_resource.required_fields:
if "subscription_type_id" in field:
subscription_type_id = field["subscription_type_id"]
if "address" in field:
subscription_address = field["address"]
get_moonworm_jobs_response = get_moonworm_jobs(
subscription_type_id=subscription_type_id,
address=subscription_address,
)
return get_moonworm_jobs_response
@router.get(
"/types", tags=["subscriptions"], response_model=data.SubscriptionTypesListResponse
)
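
A sketch of calling the new endpoint from a client; the base URL, token, and subscription id below are assumptions for illustration, not part of this diff:

    import requests

    API = "https://api.moonstream.to"  # assumed base URL
    subscription_id = "<subscription id>"  # hypothetical

    # The route added above: GET /{subscription_id}/jobs under the subscriptions router
    response = requests.get(
        f"{API}/subscriptions/{subscription_id}/jobs",
        headers={"Authorization": "Bearer <access token>"},
        timeout=10,
    )
    response.raise_for_status()
    jobs = response.json()  # journal entries for the subscription's moonworm tasks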

View File

@ -37,6 +37,10 @@ ETHEREUM_MISSING_TIMER_FILE="ethereum-missing.timer"
ETHEREUM_MOONWORM_CRAWLER_SERVICE_FILE="ethereum-moonworm-crawler.service"
ETHEREUM_ORANGE_DAO_REPORTS_TOKENONOMICS_SERVICE_FILE="ethereum-orange-dao-reports-tokenonomics.service"
ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE="ethereum-orange-dao-reports-tokenonomics.timer"
ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="ethereum-historical-crawl-transactions.service"
ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="ethereum-historical-crawl-transactions.timer"
ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="ethereum-historical-crawl-events.service"
ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="ethereum-historical-crawl-events.timer"
# Polygon service files
POLYGON_SYNCHRONIZE_SERVICE="polygon-synchronize.service"
@ -56,6 +60,10 @@ POLYGON_CU_REPORTS_TOKENONOMICS_SERVICE_FILE="polygon-cu-reports-tokenonomics.se
POLYGON_CU_REPORTS_TOKENONOMICS_TIMER_FILE="polygon-cu-reports-tokenonomics.timer"
POLYGON_CU_NFT_DASHBOARD_SERVICE_FILE="polygon-cu-nft-dashboard.service"
POLYGON_CU_NFT_DASHBOARD_TIMER_FILE="polygon-cu-nft-dashboard.timer"
POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="polygon-historical-crawl-transactions.service"
POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="polygon-historical-crawl-transactions.timer"
POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="polygon-historical-crawl-events.service"
POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="polygon-historical-crawl-events.timer"
# Mumbai service files
MUMBAI_SYNCHRONIZE_SERVICE="mumbai-synchronize.service"
@ -68,6 +76,11 @@ MUMBAI_STATE_CLEAN_SERVICE_FILE="mumbai-state-clean.service"
MUMBAI_STATE_CLEAN_TIMER_FILE="mumbai-state-clean.timer"
MUMBAI_METADATA_SERVICE_FILE="mumbai-metadata.service"
MUMBAI_METADATA_TIMER_FILE="mumbai-metadata.timer"
MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE="mumbai-history-crawl-transactions.service"
MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE="mumbai-history-crawl-transactions.timer"
MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE="mumbai-history-crawl-events.service"
MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE="mumbai-history-crawl-events.timer"
# XDai service files
XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service"
@ -76,6 +89,10 @@ XDAI_MISSING_TIMER_FILE="xdai-missing.timer"
XDAI_STATISTICS_SERVICE_FILE="xdai-statistics.service"
XDAI_STATISTICS_TIMER_FILE="xdai-statistics.timer"
XDAI_MOONWORM_CRAWLER_SERVICE_FILE="xdai-moonworm-crawler.service"
XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="xdai-historical-crawl-transactions.service"
XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="xdai-historical-crawl-transactions.timer"
XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="xdai-historical-crawl-events.service"
XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="xdai-historical-crawl-events.timer"
# Wyrm service files
WYRM_SYNCHRONIZE_SERVICE="wyrm-synchronize.service"
@ -84,6 +101,10 @@ WYRM_MISSING_TIMER_FILE="wyrm-missing.timer"
WYRM_STATISTICS_SERVICE_FILE="wyrm-statistics.service"
WYRM_STATISTICS_TIMER_FILE="wyrm-statistics.timer"
WYRM_MOONWORM_CRAWLER_SERVICE_FILE="wyrm-moonworm-crawler.service"
WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE="wyrm-historical-crawl-transactions.service"
WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE="wyrm-historical-crawl-transactions.timer"
WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE="wyrm-historical-crawl-events.service"
WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE="wyrm-historical-crawl-events.timer"
set -eu
@ -181,6 +202,24 @@ cp "${SCRIPT_DIR}/${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}" "/home/ubuntu/
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${ETHEREUM_ORANGE_DAO_TOKENONOMICS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Ethereum historical transactions crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${ETHEREUM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Ethereum historical events crawler service and timer with: ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${ETHEREUM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
echo
echo
@ -269,6 +308,24 @@ cp "${SCRIPT_DIR}/${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}" "/home/ubuntu/.config/
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${POLYGON_CU_NFT_DASHBOARD_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Polygon historical transactions crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${POLYGON_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Polygon historical events crawler service and timer with: ${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${POLYGON_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Mumbai block with transactions synchronizer service definition with ${MUMBAI_SYNCHRONIZE_SERVICE}"
@ -321,6 +378,24 @@ cp "${SCRIPT_DIR}/${MUMBAI_METADATA_TIMER_FILE}" "/home/ubuntu/.config/systemd/u
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${MUMBAI_METADATA_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Mumbai historical transactions crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${MUMBAI_HISTORY_CRAWL_TRANSACTIONS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Mumbai historical events crawler service and timer with: ${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}, ${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${MUMBAI_HISTORY_CRAWL_EVENTS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing xDai block with transactions synchronizer service definition with ${XDAI_SYNCHRONIZE_SERVICE}"
@ -355,6 +430,24 @@ cp "${SCRIPT_DIR}/${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/s
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${XDAI_MOONWORM_CRAWLER_SERVICE_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing xDai historical transactions crawler service and timer with: ${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${XDAI_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing xDai historical events crawler service and timer with: ${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${XDAI_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Wyrm block with transactions synchronizer service definition with ${WYRM_SYNCHRONIZE_SERVICE}"
@ -387,4 +480,22 @@ echo -e "${PREFIX_INFO} Replacing existing Wyrm moonworm crawler service definit
chmod 644 "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user restart --no-block "${WYRM_MOONWORM_CRAWLER_SERVICE_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Wyrm historical transactions crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${WYRM_HISTORICAL_CRAWL_TRANSACTIONS_TIMER_FILE}"
echo
echo
echo -e "${PREFIX_INFO} Replacing existing Wyrm historical events crawler service and timer with: ${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}, ${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
chmod 644 "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_SERVICE_FILE}"
cp "${SCRIPT_DIR}/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}" "/home/ubuntu/.config/systemd/user/${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"
XDG_RUNTIME_DIR="/run/user/1000" systemctl --user daemon-reload
XDG_RUNTIME_DIR="/run/user/1000" systemctl enable --user "${WYRM_HISTORICAL_CRAWL_EVENTS_TIMER_FILE}"

View File

@ -0,0 +1,17 @@
[Unit]
Description=Ethereum historical crawler events
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-events
CPUWeight=70
SyslogIdentifier=ethereum-historical-crawler-events
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs events historical crawler on ethereum
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Ethereum historical crawler transactions
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type ethereum --find-deployed-blocks --end 0 --tasks-journal --only-transactions
CPUWeight=70
SyslogIdentifier=ethereum-historical-crawler-transactions
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs transactions historical crawler on ethereum
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Mumbai historical crawler events
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-events
CPUWeight=70
SyslogIdentifier=mumbai-historical-crawler-events
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs events historical crawler on mumbai
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Mumbai historical crawler transactions
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type mumbai --find-deployed-blocks --end 0 --tasks-journal --only-transactions
CPUWeight=70
SyslogIdentifier=mumbai-historical-crawler-transactions
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs transactions historical crawler on mumbai
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Polygon historical crawler events
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-events
CPUWeight=70
SyslogIdentifier=polygon-historical-crawler-events
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs events historical crawler on polygon
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Polygon historical crawler transactions
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type polygon --find-deployed-blocks --end 0 --tasks-journal --only-transactions
CPUWeight=70
SyslogIdentifier=polygon-historical-crawler-transactions
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs transactions historical crawler on polygon
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Wyrm historical crawler events
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-events
CPUWeight=70
SyslogIdentifier=wyrm-historical-crawler-events
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs events historical crawler on wyrm
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=Wyrm historical crawler transactions
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type wyrm --find-deployed-blocks --end 0 --tasks-journal --only-transactions
CPUWeight=70
SyslogIdentifier=wyrm-historical-crawler-transactions
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs transactions historical crawler on wyrm
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=XDai historical crawler events
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-events
CPUWeight=70
SyslogIdentifier=xdai-historical-crawler-events
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs events historical crawler on xdai
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -0,0 +1,17 @@
[Unit]
Description=XDai historical crawler transactions
After=network.target
StartLimitIntervalSec=300
StartLimitBurst=3
[Service]
WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl
EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env
Restart=on-failure
RestartSec=15s
ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.moonworm_crawler.cli --access-id "${NB_CONTROLLER_ACCESS_ID}" historical-crawl --blockchain-type xdai --find-deployed-blocks --end 0 --tasks-journal --only-transactions
CPUWeight=70
SyslogIdentifier=xdai-historical-crawler-transactions
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,9 @@
[Unit]
Description=Runs transactions historical crawler on xdai
[Timer]
OnBootSec=60s
OnUnitActiveSec=10m
[Install]
WantedBy=timers.target

View File

@ -13,7 +13,7 @@ from typing import Iterator, List
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
import dateutil.parser
import dateutil.parser # type: ignore
from .blockchain import (
DateRange,

View File

@ -8,7 +8,12 @@ from web3 import Web3
from web3.middleware import geth_poa_middleware
from ..db import yield_db_session_ctx
from ..settings import MOONSTREAM_MOONWORM_TASKS_JOURNAL, NB_CONTROLLER_ACCESS_ID
from ..settings import (
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
NB_CONTROLLER_ACCESS_ID,
HISTORICAL_CRAWLER_STATUSES,
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
)
from .continuous_crawler import _retry_connect_web3, continuous_crawler
from .crawler import (
SubscriptionTypes,
@ -16,6 +21,9 @@ from .crawler import (
get_crawl_job_entries,
make_event_crawl_jobs,
make_function_call_crawl_jobs,
find_all_deployed_blocks,
update_job_state_with_filters,
moonworm_crawler_update_job_as_pickedup,
)
from .db import get_first_labeled_block_number, get_last_labeled_block_number
from .historical_crawler import historical_crawler
@ -48,6 +56,14 @@ def handle_crawl(args: argparse.Namespace) -> None:
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
(
initial_event_jobs,
initial_function_call_jobs,
) = moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs=initial_event_jobs,
function_call_crawl_jobs=initial_function_call_jobs,
)
logger.info(f"Blockchain type: {blockchain_type.value}")
with yield_db_session_ctx() as db_session:
web3: Optional[Web3] = None
@ -125,20 +141,33 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)
extend_tags = []
addresses_filter = []
if args.address is not None:
addresses_filter = [Web3.toChecksumAddress(args.address)]
if args.tasks_journal:
addresses_filter = []
extend_tags.extend(
[
"#moonworm_task_pickedup:True",
f"!#{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}",
]
)
all_event_jobs = make_event_crawl_jobs(
get_crawl_job_entries(
subscription_type,
"event",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
extend_tags=extend_tags,
)
)
filtered_event_jobs = []
for job in all_event_jobs:
if addresses_filter:
if addresses_filter and not args.tasks_journal:
intersection = [
address for address in job.contracts if address in addresses_filter
]
@ -155,21 +184,57 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
subscription_type,
"function",
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
extend_tags=extend_tags,
)
)
if addresses_filter:
filtered_function_call_jobs = [
job
for job in all_function_call_jobs
if job.contract_address in addresses_filter
]
filtered_function_call_jobs = [job for job in all_function_call_jobs]
else:
filtered_function_call_jobs = all_function_call_jobs
if args.only_events:
filtered_function_call_jobs = []
logger.info(f"Removing function call crawl jobs since --only-events is set")
if args.only_functions:
filtered_event_jobs = []
logger.info(
f"Removing event crawl jobs since --only-functions is set. Function call jobs count: {len(filtered_function_call_jobs)}"
)
if args.only_events and args.only_functions:
raise ValueError(
"--only-events and --only-functions cannot be set at the same time"
)
if args.tasks_journal:
if len(filtered_event_jobs) > 0:
filtered_event_jobs = update_job_state_with_filters( # type: ignore
events=filtered_event_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pickedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
)
if len(filtered_function_call_jobs) > 0:
filtered_function_call_jobs = update_job_state_with_filters( # type: ignore
events=filtered_function_call_jobs,
address_filter=[],
required_tags=[
"historical_crawl_status:pending",
"moonworm_task_pickedup:True",
],
tags_to_add=["historical_crawl_status:in_progress"],
tags_to_delete=["historical_crawl_status:pending"],
)
logger.info(
f"Initial function call crawl jobs count: {len(filtered_function_call_jobs)}"
)
@ -198,7 +263,30 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
logger.info(f"Last labeled block: {last_labeled_block}")
start_block = args.start
addresses_deployment_blocks = None
end_block = args.end
# get set of addresses from event jobs and function call jobs
if args.find_deployed_blocks:
addresses_set = set()
for job in filtered_event_jobs:
addresses_set.update(job.contracts)
for function_job in filtered_function_call_jobs:
addresses_set.add(function_job.contract_address)
if args.start is None:
start_block = web3.eth.blockNumber - 1
addresses_deployment_blocks = find_all_deployed_blocks(
web3, list(addresses_set)
)
if len(addresses_deployment_blocks) == 0:
raise ValueError(
"No addresses found in the blockchain. Please check your addresses and try again"
)
end_block = min(addresses_deployment_blocks.values())
if start_block is None:
logger.info("No start block provided")
if last_labeled_block is not None:
@ -226,9 +314,9 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
else:
logger.info(f"Using start block: {start_block}")
if start_block < args.end:
if start_block < end_block:
raise ValueError(
f"Start block {start_block} is less than end block {args.end}. This crawler crawls in the reverse direction."
f"Start block {start_block} is less than end block {end_block}. This crawler crawls in the reverse direction."
)
historical_crawler(
@ -238,10 +326,11 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
filtered_event_jobs,
filtered_function_call_jobs,
start_block,
args.end,
end_block,
args.max_blocks_batch,
args.min_sleep_time,
access_id=args.access_id,
addresses_deployment_blocks=addresses_deployment_blocks,
)
@ -371,7 +460,7 @@ def main() -> None:
"--end",
"-e",
type=int,
required=True,
required=False,
)
historical_crawl_parser.add_argument(
"--blockchain-type",
@ -420,6 +509,24 @@ def main() -> None:
default=False,
help="Only crawl events",
)
historical_crawl_parser.add_argument(
"--only-functions",
action="store_true",
default=False,
help="Only crawl function calls",
)
historical_crawl_parser.add_argument(
"--find-deployed-blocks",
action="store_true",
default=False,
help="Find all deployed blocks",
)
historical_crawl_parser.add_argument(
"--tasks-journal",
action="store_true",
default=False,
        help="Use tasks journal which will fill all required fields for historical crawl",
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
args = parser.parse_args()
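
With --find-deployed-blocks, the crawl bounds come from deployment blocks: the crawler starts near the chain head and walks backwards, stopping at the earliest deployment block among the crawled addresses. A small sketch with hypothetical values:

    # Hypothetical output of find_all_deployed_blocks
    addresses_deployment_blocks = {
        "0xAaaa...": 12_000_000,
        "0xBbbb...": 9_500_000,
    }

    start_block = 17_000_000  # e.g. web3.eth.blockNumber - 1
    end_block = min(addresses_deployment_blocks.values())  # 9_500_000

    # handle_historical_crawl raises if this invariant is violated:
    # the crawler crawls in the reverse direction
    assert start_block >= end_block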

View File

@ -24,10 +24,15 @@ from .crawler import (
make_function_call_crawl_jobs,
merge_event_crawl_jobs,
merge_function_call_crawl_jobs,
moonworm_crawler_update_job_as_pickedup,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events
from .function_call_crawler import _crawl_functions
from ..settings import (
HISTORICAL_CRAWLER_STATUSES,
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -220,6 +225,15 @@ def continuous_crawler(
event_crawl_jobs, function_call_crawl_jobs = _refetch_new_jobs(
event_crawl_jobs, function_call_crawl_jobs, blockchain_type
)
(
event_crawl_jobs,
function_call_crawl_jobs,
) = moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs=event_crawl_jobs,
function_call_crawl_jobs=function_call_crawl_jobs,
)
jobs_refetchet_time = current_time
if current_time - last_heartbeat_time > timedelta(

View File

@ -5,13 +5,14 @@ import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, cast
from typing import Any, Callable, Dict, List, Optional, cast, Union, Tuple
from uuid import UUID
from bugout.data import BugoutSearchResult
from bugout.data import BugoutSearchResult, BugoutJournalEntries
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from web3.main import Web3
from moonworm.deployment import find_deployment_block # type: ignore
from ..blockchain import connect
from ..reporter import reporter
@ -20,6 +21,8 @@ from ..settings import (
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
bugout_client,
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES,
HISTORICAL_CRAWLER_STATUSES,
)
logging.basicConfig(level=logging.INFO)
@ -145,6 +148,7 @@ class EventCrawlJob:
event_abi_hash: str
event_abi: Dict[str, Any]
contracts: List[ChecksumAddress]
address_entries: Dict[ChecksumAddress, Dict[UUID, List[str]]]
created_at: int
@ -152,6 +156,7 @@ class EventCrawlJob:
class FunctionCallCrawlJob:
contract_abi: List[Dict[str, Any]]
contract_address: ChecksumAddress
entries_tags: Dict[UUID, List[str]]
created_at: int
@ -161,6 +166,7 @@ def get_crawl_job_entries(
journal_id: str = MOONSTREAM_MOONWORM_TASKS_JOURNAL,
created_at_filter: Optional[int] = None,
limit: int = 200,
extend_tags: Optional[List[str]] = None,
) -> List[BugoutSearchResult]:
"""
Get all event ABIs from bugout journal
@ -172,6 +178,10 @@ def get_crawl_job_entries(
"""
query = f"#status:active #type:{crawler_type} #subscription_type:{subscription_type.value}"
if extend_tags is not None:
for tag in extend_tags:
query += f" {tag.rstrip()}"
if created_at_filter is not None:
# Filtering by created_at
# Filtering not by strictly greater than
@ -201,6 +211,32 @@ def get_crawl_job_entries(
return entries
def find_all_deployed_blocks(
web3: Web3, addresses_set: List[ChecksumAddress]
) -> Dict[ChecksumAddress, int]:
"""
    Find deployment blocks for the given addresses
"""
all_deployed_blocks = {}
for address in addresses_set:
try:
            code = web3.eth.getCode(address)
            # getCode returns HexBytes; empty code means the address is not a contract
            if code != b"":
block = find_deployment_block(
web3_client=web3,
contract_address=address,
web3_interval=0.5,
)
                if block is not None:
                    all_deployed_blocks[address] = block
                else:
                    logger.error(f"Failed to get deployment block for {address}")
except Exception as e:
logger.error(f"Failed to get code for {address}: {e}")
return all_deployed_blocks
def _get_tag(entry: BugoutSearchResult, tag: str) -> str:
for entry_tag in entry.tags:
if entry_tag.startswith(tag):
@ -219,16 +255,23 @@ def make_event_crawl_jobs(entries: List[BugoutSearchResult]) -> List[EventCrawlJ
abi_hash = _get_tag(entry, "abi_method_hash")
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
existing_crawl_job = crawl_job_by_hash.get(abi_hash)
if existing_crawl_job is not None:
if contract_address not in existing_crawl_job.contracts:
existing_crawl_job.contracts.append(contract_address)
existing_crawl_job.address_entries[contract_address] = {
entry_id: entry.tags
}
else:
abi = cast(str, entry.content)
new_crawl_job = EventCrawlJob(
event_abi_hash=abi_hash,
event_abi=json.loads(abi),
contracts=[contract_address],
address_entries={contract_address: {entry_id: entry.tags}},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
crawl_job_by_hash[abi_hash] = new_crawl_job
@ -247,15 +290,18 @@ def make_function_call_crawl_jobs(
method_signature_by_address: Dict[str, List[str]] = {}
for entry in entries:
entry_id = UUID(entry.entry_url.split("/")[-1]) # crying emoji
contract_address = Web3().toChecksumAddress(_get_tag(entry, "address"))
abi = json.loads(cast(str, entry.content))
method_signature = encode_function_signature(abi)
if method_signature is None:
raise ValueError(f"{abi} is not a function ABI")
if contract_address not in crawl_job_by_address:
crawl_job_by_address[contract_address] = FunctionCallCrawlJob(
contract_abi=[abi],
contract_address=contract_address,
entries_tags={entry_id: entry.tags},
created_at=int(datetime.fromisoformat(entry.created_at).timestamp()),
)
method_signature_by_address[contract_address] = [method_signature]
@ -264,6 +310,9 @@ def make_function_call_crawl_jobs(
if method_signature not in method_signature_by_address[contract_address]:
crawl_job_by_address[contract_address].contract_abi.append(abi)
method_signature_by_address[contract_address].append(method_signature)
crawl_job_by_address[contract_address].entries_tags[
entry_id
] = entry.tags
return [crawl_job for crawl_job in crawl_job_by_address.values()]
@ -392,3 +441,268 @@ def heartbeat(
tags=[crawler_type, "heartbeat", blockchain_type.value, "dead"],
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
def bugout_state_update(
entries_tags_add: List[Dict[str, Any]],
entries_tags_delete: List[Dict[str, Any]],
) -> BugoutJournalEntries:
"""
    Update entry tags in Bugout: first add tags to entries, then delete
    tags from entries. If the add step fails, the delete step is skipped.
"""
    new_entries_state = BugoutJournalEntries(entries=[])
if len(entries_tags_add) > 0:
try:
            new_entries_state = bugout_client.create_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_add,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e:
logger.error(f"Failed to add tags to entries: {e}")
    # Skip deletion if the add step was requested but failed
    if len(entries_tags_delete) > 0 and (
        len(entries_tags_add) == 0 or len(new_entries_state.entries) > 0
    ):
try:
            new_entries_state = bugout_client.delete_entries_tags(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entries_tags=entries_tags_delete,
timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e:
logger.error(f"Failed to delete tags from entries: {e}")
    return new_entries_state
def moonworm_crawler_update_job_as_pickedup(
event_crawl_jobs: List[EventCrawlJob],
function_call_crawl_jobs: List[FunctionCallCrawlJob],
) -> Tuple[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
    Mark moonworm jobs as picked up for processing
"""
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_job_state_with_filters( # type: ignore
events=event_crawl_jobs,
address_filter=[],
required_tags=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
],
tags_to_add=["moonworm_task_pickedup:True"],
tags_to_delete=["moonworm_task_pickedup:False"],
)
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_job_state_with_filters( # type: ignore
events=function_call_crawl_jobs,
address_filter=[],
required_tags=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['pending']}",
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False",
],
tags_to_add=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True"
],
tags_to_delete=[
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:False"
],
)
return event_crawl_jobs, function_call_crawl_jobs
def update_job_tags(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
    new_entries_state: BugoutJournalEntries,
):
    """
    Update the tags stored on each job object
    """
    entry_tags_by_id = {entry.id: entry.tags for entry in new_entries_state.entries}
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.address_entries.items():
for entry_id in entries_ids.keys():
if entry_id in entry_tags_by_id:
event.address_entries[contract_address][
entry_id
] = entry_tags_by_id[entry_id]
if isinstance(event, FunctionCallCrawlJob):
for entry_id in event.entries_tags.keys():
if entry_id in entry_tags_by_id:
event.entries_tags[entry_id] = entry_tags_by_id[entry_id]
return events
def update_job_state_with_filters(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
address_filter: List[ChecksumAddress],
required_tags: List[str],
tags_to_add: List[str] = [],
tags_to_delete: List[str] = [],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Function that updates the state of the job in bugout.
"""
entries_ids_to_update: List[UUID] = []
### TODO: refactor this function
if len(tags_to_add) == 0 and len(tags_to_delete) == 0:
return events
for event in events:
# events
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.address_entries.items():
if address_filter and contract_address not in address_filter:
continue
for entry_id, tags in entries_ids.items():
if set(required_tags).issubset(set(tags)):
entries_ids_to_update.append(entry_id)
# functions
if isinstance(event, FunctionCallCrawlJob):
if address_filter and event.contract_address not in address_filter:
continue
for entry_id, tags in event.entries_tags.items():
if set(required_tags).issubset(set(tags)):
entries_ids_to_update.append(entry_id)
if len(entries_ids_to_update) == 0:
return events
new_entries_state = bugout_state_update(
entries_tags_add=[
{"entry_id": entry_id, "tags": tags_to_add}
for entry_id in entries_ids_to_update
],
entries_tags_delete=[
{"entry_id": entry_id, "tags": tags_to_delete}
for entry_id in entries_ids_to_update
],
)
events = update_job_tags(events, new_entries_state)
return events
def update_entries_status_and_progress(
events: Union[List[EventCrawlJob], List[FunctionCallCrawlJob]],
    progress_map: Dict[ChecksumAddress, float],
) -> Union[List[EventCrawlJob], List[FunctionCallCrawlJob]]:
"""
Update entries status and progress in mooncrawl bugout journal
"""
entries_tags_delete: List[Dict[str, Any]] = []
entries_tags_add: List[Dict[str, Any]] = []
for event in events:
if isinstance(event, EventCrawlJob):
for contract_address, entries_ids in event.address_entries.items():
                progress = round(progress_map.get(contract_address, 0), 4) * 100
(
entries_tags_delete,
entries_tags_add,
) = add_progress_to_tags(
entries=entries_ids,
contract_progress=progress,
entries_tags_delete=entries_tags_delete,
entries_tags_add=entries_tags_add,
)
        if isinstance(event, FunctionCallCrawlJob):
            progress = round(progress_map.get(event.contract_address, 0), 4) * 100
            (
                entries_tags_delete,
                entries_tags_add,
            ) = add_progress_to_tags(
                entries=event.entries_tags,
                contract_progress=progress,
                entries_tags_delete=entries_tags_delete,
                entries_tags_add=entries_tags_add,
            )
new_entries_state = bugout_state_update(
entries_tags_add=entries_tags_add,
entries_tags_delete=entries_tags_delete,
)
events = update_job_tags(events, new_entries_state)
return events
def add_progress_to_tags(
entries: Dict[UUID, List[str]],
contract_progress: float,
entries_tags_delete: List[Dict[str, Any]],
entries_tags_add: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Calculate progress and add finished tag if progress is 100
"""
new_progress = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}:{contract_progress}"
for entry_id, tags in entries.items():
# progress
if (
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
in tags
):
continue
if new_progress not in tags:
entries_tags_delete.append(
{
"entry_id": entry_id,
"tags": [
tag
for tag in tags
if tag.startswith(
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['progress_status']}"
)
],
}
)
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [new_progress],
}
)
if contract_progress >= 100:
entries_tags_add.append(
{
"entry_id": entry_id,
"tags": [
f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}:{HISTORICAL_CRAWLER_STATUSES['finished']}"
],
}
)
return entries_tags_delete, entries_tags_add
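
The helpers above implement a small tag-based state machine over journal entries: required_tags gate which entries transition, and tags_to_add/tags_to_delete perform the swap. A minimal sketch of the pending -> in_progress transition, using the tag strings from the CLI above:

    # Tags on one journal entry, as created by apply_moonworm_tasks
    tags = [
        "moonworm_task_pickedup:True",
        "historical_crawl_status:pending",
        "progress:0",
    ]

    required_tags = {"historical_crawl_status:pending", "moonworm_task_pickedup:True"}

    # update_job_state_with_filters only touches entries carrying all
    # required_tags; the swap below mirrors its effect on one entry.
    if required_tags.issubset(tags):
        tags.remove("historical_crawl_status:pending")
        tags.append("historical_crawl_status:in_progress")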

View File

@ -3,6 +3,7 @@ import time
from typing import Dict, List, Optional, Tuple
from uuid import UUID
from eth_typing.evm import ChecksumAddress
from moonstreamdb.blockchain import AvailableBlockchainType
from moonworm.crawler.moonstream_ethereum_state_provider import ( # type: ignore
MoonstreamEthereumStateProvider,
@ -11,7 +12,12 @@ from moonworm.crawler.networks import Network # type: ignore
from sqlalchemy.orm.session import Session
from web3 import Web3
from .crawler import EventCrawlJob, FunctionCallCrawlJob, _retry_connect_web3
from .crawler import (
EventCrawlJob,
FunctionCallCrawlJob,
_retry_connect_web3,
update_entries_status_and_progress,
)
from .db import add_events_to_session, add_function_calls_to_session, commit_session
from .event_crawler import _crawl_events, _autoscale_crawl_events
from .function_call_crawler import _crawl_functions
@ -31,6 +37,7 @@ def historical_crawler(
max_blocks_batch: int = 100,
min_sleep_time: float = 0.1,
access_id: Optional[UUID] = None,
addresses_deployment_blocks: Optional[Dict[ChecksumAddress, int]] = None,
):
assert max_blocks_batch > 0, "max_blocks_batch must be greater than 0"
assert min_sleep_time > 0, "min_sleep_time must be greater than 0"
@ -60,6 +67,10 @@ def historical_crawler(
blocks_cache: Dict[int, int] = {}
failed_count = 0
original_start_block = start_block
    progress_map: Dict[ChecksumAddress, float] = {}
while start_block >= end_block:
try:
time.sleep(min_sleep_time)
@ -119,6 +130,27 @@ def historical_crawler(
db_session, all_function_calls, blockchain_type
)
if addresses_deployment_blocks:
for address, deployment_block in addresses_deployment_blocks.items():
current_position = batch_end_block
                    progress = (original_start_block - current_position) / (
                        original_start_block - deployment_block
                    )
                    progress_map[address] = progress
if len(function_call_crawl_jobs) > 0:
function_call_crawl_jobs = update_entries_status_and_progress( # type: ignore
events=function_call_crawl_jobs,
                    progress_map=progress_map,
)
if len(event_crawl_jobs) > 0:
event_crawl_jobs = update_entries_status_and_progress( # type: ignore
events=event_crawl_jobs,
                    progress_map=progress_map,
)
            # Committing to db
commit_session(db_session)
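
Progress here is the fraction of the reverse crawl completed for an address, relative to its deployment block. Worked numbers, assuming the hypothetical values shown:

    original_start_block = 17_000_000
    deployment_block = 9_500_000
    batch_end_block = 13_250_000  # current position of the reverse crawl

    progress = (original_start_block - batch_end_block) / (
        original_start_block - deployment_block
    )
    # (17_000_000 - 13_250_000) / (17_000_000 - 9_500_000) = 0.5, i.e. 50%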

View File

@ -279,3 +279,26 @@ infura_networks = {
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION = "entity_subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"
# Historical crawler status config
HISTORICAL_CRAWLER_STATUSES = {
"pending": "pending",
"running": "running",
"finished": "finished",
}
# Historical crawler moonworm status config
HISTORICAL_CRAWLER_MOONWORM_STATUSES = {
"pickedup": True,
}
# Statuses tags prefixes
HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES = {
"moonworm_status": "moonworm_task_pickedup",
"historical_crawl_status": "historical_crawl_status",
"progress_status": "progress",
}
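
A short sketch of how these dictionaries compose into the journal tags used by the crawler helpers:

    finished_tag = (
        f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['historical_crawl_status']}"
        f":{HISTORICAL_CRAWLER_STATUSES['finished']}"
    )
    # -> "historical_crawl_status:finished"

    pickedup_tag = f"{HISTORICAL_CRAWLER_STATUS_TAG_PREFIXES['moonworm_status']}:True"
    # -> "moonworm_task_pickedup:True"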

View File

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONCRAWL_VERSION = "0.3.0"
MOONCRAWL_VERSION = "0.3.1"

View File

@ -34,7 +34,7 @@ setup(
zip_safe=False,
install_requires=[
"boto3",
"bugout>=0.1.19",
"bugout>=0.2.8",
"chardet",
"fastapi",
"moonstreamdb>=0.3.3",