Merge pull request #39 from zomglings/crawlers-ethcrawler-sync

Crawlers ethcrawler sync
pull/23/head
Neeraj Kashyap 2021-08-02 08:36:46 -07:00 zatwierdzone przez GitHub
commit 6d4b4ddef8
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
3 zmienionych plików z 60 dodań i 15 usunięć

Wyświetl plik

@ -54,11 +54,12 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
"""
Synchronize latest Ethereum blocks with database.
"""
starting_block: int = args.start
while True:
bottom_block_number, top_block_number = get_latest_blocks(
bool(strtobool(args.transactions))
)
bottom_block_number = bottom_block_number + 1
bottom_block_number = max(bottom_block_number + 1, starting_block)
if bottom_block_number >= top_block_number:
print(
f"Synchronization is unnecessary for blocks {bottom_block_number}-{top_block_number - 1}"
@ -178,6 +179,13 @@ def main() -> None:
default="True",
help="Add or not block transactions",
)
parser_ethcrawler_blocks_sync.add_argument(
"-s",
"--start",
type=int,
default=0,
help="(Optional) Block to start synchronization from. Default: 0",
)
parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler)
parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser(

Wyświetl plik

@ -18,13 +18,18 @@ DB_MODELS = {
"events": ESDEventSignature,
}
def crawl_step(db_session: Session, crawl_url: str, db_model: Union[ESDEventSignature, ESDFunctionSignature]) -> Optional[str]:
def crawl_step(
db_session: Session,
crawl_url: str,
db_model: Union[ESDEventSignature, ESDFunctionSignature],
) -> Optional[str]:
attempt = 0
current_interval = 2
success = False
response: Optional[requests.Response] = None
while (not success) and attempt < 3:
while (not success) and attempt < 3:
attempt += 1
try:
response = requests.get(crawl_url)
@ -41,12 +46,21 @@ def crawl_step(db_session: Session, crawl_url: str, db_model: Union[ESDEventSign
page = response.json()
results = page.get("results", [])
rows = [db_model(id=row.get("id"), text_signature=row.get("text_signature"), hex_signature=row.get("hex_signature"), created_at=row.get("created_at")) for row in results]
rows = [
db_model(
id=row.get("id"),
text_signature=row.get("text_signature"),
hex_signature=row.get("hex_signature"),
created_at=row.get("created_at"),
)
for row in results
]
db_session.bulk_save_objects(rows)
db_session.commit()
return page.get("next")
def crawl(crawl_type: str, interval: float) -> None:
crawl_url: Optional[str] = CRAWL_URLS[crawl_type]
db_model = DB_MODELS[crawl_type]
@ -56,13 +70,26 @@ def crawl(crawl_type: str, interval: float) -> None:
crawl_url = crawl_step(db_session, crawl_url, db_model)
time.sleep(interval)
def main():
parser = argparse.ArgumentParser(description="Crawls function and event signatures from the Ethereum Signature Database (https://www.4byte.directory/)")
parser.add_argument("crawl_type", choices=CRAWL_URLS, help="Specifies whether to crawl function signatures or event signatures")
parser.add_argument("--interval", type=float, default=0.1, help="Number of seconds to wait between requests to the Ethereum Signature Database API")
parser = argparse.ArgumentParser(
description="Crawls function and event signatures from the Ethereum Signature Database (https://www.4byte.directory/)"
)
parser.add_argument(
"crawl_type",
choices=CRAWL_URLS,
help="Specifies whether to crawl function signatures or event signatures",
)
parser.add_argument(
"--interval",
type=float,
default=0.1,
help="Number of seconds to wait between requests to the Ethereum Signature Database API",
)
args = parser.parse_args()
crawl(args.crawl_type, args.interval)
if __name__ == "__main__":
main()
main()

Wyświetl plik

@ -1,8 +1,8 @@
from concurrent.futures import Future, ProcessPoolExecutor, wait
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, Union
from sqlalchemy import desc
from web3 import Web3
from web3 import Web3, IPCProvider, HTTPProvider
from web3.types import BlockData
from .settings import MOONSTREAM_IPC_PATH, MOONSTREAM_CRAWL_WORKERS
@ -14,9 +14,14 @@ from moonstreamdb.models import (
)
# TODO(kompotkot): Write logic to chose between http and ipc
def connect(ipc_path: str = MOONSTREAM_IPC_PATH):
web3_client = Web3(Web3.IPCProvider(ipc_path))
def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
if web3_uri.startswith("http://") or web3_uri.startswith("https://"):
web3_provider = Web3.HTTPProvider(web3_uri)
else:
web3_provider = Web3.IPCProvider(web3_uri)
web3_client = Web3(web3_provider)
return web3_client
@ -66,17 +71,22 @@ def add_block_transactions(db_session, block: BlockData) -> None:
db_session.add(tx_obj)
def get_latest_blocks(with_transactions: bool = False) -> None:
def get_latest_blocks(with_transactions: bool = False) -> Tuple[Optional[int], int]:
web3_client = connect()
block_latest: BlockData = web3_client.eth.get_block(
"latest", full_transactions=with_transactions
)
with yield_db_session_ctx() as db_session:
block_number_latest_exist = (
block_number_latest_exist_row = (
db_session.query(EthereumBlock.block_number)
.order_by(EthereumBlock.block_number.desc())
.first()
)
block_number_latest_exist = (
None
if block_number_latest_exist_row is None
else block_number_latest_exist_row[0]
)
return block_number_latest_exist, block_latest.number