Working version of crawler: added checkpointing

pull/17/head
yhtiyar 2021-11-15 14:41:28 +03:00
rodzic cb0a0f6652
commit 5a681244a1
3 zmienionych plików z 240 dodań i 10 usunięć

Wyświetl plik

@ -9,7 +9,7 @@ from web3.middleware import geth_poa_middleware
from moonworm.watch import watch_contract
from moonworm.cu_watch import watch_cu_contract
from moonstreamdb.db import yield_db_session_ctx
from .contracts import CU, ERC20, ERC721
from .generator import (
@ -100,11 +100,14 @@ def handle_watch(args: argparse.Namespace) -> None:
def handle_watch_cu(args: argparse.Namespace) -> None:
MOONSTREAM_DB_URI = os.environ.get("MOONSTREAM_DB_URI")
if not MOONSTREAM_DB_URI:
print("Please set MOONSTREAM_DB_URI environment variable")
return
from moonstreamdb.db import yield_db_session_ctx
if args.abi is not None:
with open(args.abi, "r") as ifp:
contract_abi = json.load(ifp)

Wyświetl plik

@ -1,3 +1,4 @@
from os import stat
import pprint as pp
from re import S
import time
@ -23,6 +24,25 @@ from .crawler.log_scanner import _fetch_events_chunk
from sqlalchemy.orm import Query, Session
def _get_last_crawled_block(
session: Session, contract_address: ChecksumAddress
) -> Optional[int]:
"""
Gets the last block that was crawled.
"""
query = (
session.query(PolygonLabel)
.filter(
PolygonLabel.label == "moonworm",
PolygonLabel.address == contract_address,
)
.order_by(PolygonLabel.block_number.desc())
)
if query.count():
return query.first().block_number
return None
def _add_function_call_labels(
session: Session,
function_calls: List[ContractFunctionCall],
@ -30,6 +50,33 @@ def _add_function_call_labels(
"""
Adds a label to a function call.
"""
existing_function_call_labels = (
session.query(PolygonLabel)
.filter(
PolygonLabel.label == "moonworm",
PolygonLabel.log_index == None,
PolygonLabel.transaction_hash.in_(
[call.transaction_hash for call in function_calls]
),
)
.all()
)
print(f"{len(existing_function_call_labels)} existing labels")
# deletin existing labels
for label in existing_function_call_labels:
session.delete(label)
try:
if existing_function_call_labels:
print(
f"Deleting {len(existing_function_call_labels)} existing event labels"
)
session.commit()
except Exception as e:
print(f"Failed!!!\n{e}")
session.rollback()
for function_call in function_calls:
label = PolygonLabel(
label="moonworm",
@ -59,6 +106,30 @@ def _add_event_labels(session: Session, events: List[Dict[str, Any]]) -> None:
"""
Adds events to database.
"""
transactions = [event["transactionHash"] for event in events]
existing_event_labels = (
session.query(PolygonLabel)
.filter(
PolygonLabel.label == "moonworm",
PolygonLabel.transaction_hash.in_(transactions),
PolygonLabel.log_index != None,
)
.all()
)
# deletin existing labels
for label in existing_event_labels:
session.delete(label)
try:
if existing_event_labels:
print(f"Deleting {len(existing_event_labels)} existing event labels")
session.commit()
except Exception as e:
print(f"Failed!!!\n{e}")
session.rollback()
for event in events:
label = PolygonLabel(
label="moonworm",
@ -75,7 +146,6 @@ def _add_event_labels(session: Session, events: List[Dict[str, Any]]) -> None:
)
session.add(label)
try:
print("Committing event labels to database...")
session.commit()
except Exception as e:
print(f"Failed!!!\n{e}")
@ -125,12 +195,26 @@ def watch_cu_contract(
[web3.toChecksumAddress(contract_address)],
)
event_abis = [item for item in contract_abi if item["type"] == "event"]
last_crawled_block = _get_last_crawled_block(session, contract_address)
if start_block is None:
current_block = web3.eth.blockNumber - num_confirmations * 2
if last_crawled_block is not None:
current_block = last_crawled_block
print(f"Starting from block {current_block}, last crawled block")
else:
current_block = web3.eth.blockNumber - num_confirmations * 2
print(f"Starting from block {current_block}, current block")
else:
current_block = start_block
if last_crawled_block is not None:
if start_block > last_crawled_block:
print(
f"Starting from block {start_block}, last crawled block {last_crawled_block}"
)
else:
current_block = last_crawled_block
print(f"Starting from last crawled block {start_block}")
event_abis = [item for item in contract_abi if item["type"] == "event"]
progress_bar = tqdm(unit=" blocks")
progress_bar.set_description(f"Current block {current_block}")
@ -146,9 +230,7 @@ def watch_cu_contract(
crawler.crawl(current_block, end_block)
if state.state:
_add_function_call_labels(session, state.state)
print("Got transaction calls:")
for call in state.state:
pp.pprint(call, width=200, indent=4)
print(f"Got {len(state.state)} transaction calls:")
state.flush()
for event_abi in event_abis:
@ -161,7 +243,6 @@ def watch_cu_contract(
)
all_events = []
for raw_event in raw_events:
print("Got event:")
event = {
"event": raw_event["event"],
"args": json.loads(Web3.toJSON(raw_event["args"])),
@ -174,7 +255,6 @@ def watch_cu_contract(
"logIndex": raw_event["logIndex"],
}
all_events.append(event)
pp.pprint(event, width=200, indent=4)
if all_events:
_add_event_labels(session, all_events)

Wyświetl plik

@ -717,6 +717,77 @@
"stateMutability": "nonpayable",
"type": "function"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "previousOwner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "OwnershipTransferred",
"type": "event"
},
{
"inputs": [
{
"internalType": "address",
"name": "_newAddress",
"type": "address"
}
],
"name": "changeGameServerAddress",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "gameServer",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "owner",
"outputs": [
{
"internalType": "address",
"name": "owner_",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "_newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"anonymous": false,
"inputs": [
@ -735,5 +806,81 @@
],
"name": "DNAUpdated",
"type": "event"
},
{
"inputs": [
{
"internalType": "bytes4",
"name": "_functionSelector",
"type": "bytes4"
}
],
"name": "facetAddress",
"outputs": [
{
"internalType": "address",
"name": "facetAddress_",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "facetAddresses",
"outputs": [
{
"internalType": "address[]",
"name": "facetAddresses_",
"type": "address[]"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "_facet",
"type": "address"
}
],
"name": "facetFunctionSelectors",
"outputs": [
{
"internalType": "bytes4[]",
"name": "facetFunctionSelectors_",
"type": "bytes4[]"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "facets",
"outputs": [
{
"components": [
{
"internalType": "address",
"name": "facetAddress",
"type": "address"
},
{
"internalType": "bytes4[]",
"name": "functionSelectors",
"type": "bytes4[]"
}
],
"internalType": "struct IDiamondLoupe.Facet[]",
"name": "facets_",
"type": "tuple[]"
}
],
"stateMutability": "view",
"type": "function"
}
]