Merge pull request #86 from zomglings/crawlers-improve-synchronization

"--confirmations" argument for Ethereum crawler
pull/85/head
Neeraj Kashyap 2021-08-06 10:37:06 -07:00 zatwierdzone przez GitHub
commit 9d41941403
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
9 zmienionych plików z 74 dodań i 42 usunięć

Wyświetl plik

@ -1,4 +1,4 @@
# moonstream crawlers
# Moonstream Crawlers
## Installation
@ -24,13 +24,13 @@ This crawler retrieves Ethereum function signatures from the Ethereum Signature
#### Crawling ESD function signatures
```bash
python -m moonstreamcrawlers.esd --interval 0.3 functions
python -m mooncrawl.esd --interval 0.3 functions
```
#### Crawling ESD event signatures
```bash
python -m moonstreamcrawlers.esd --interval 0.3 events
python -m mooncrawl.esd --interval 0.3 events
```
### Ethereum contract registrar
@ -41,17 +41,17 @@ addresses from transaction receipts.
To run this crawler:
```bash
python -m moonstreamcrawlers.cli ethcrawler contracts update
python -m mooncrawl.cli ethcrawler contracts update
```
Output is JSON list of pairs `[..., (<transaction_hash>, <contract_address>), ...]`, so you can pipe to `jq`:
```bash
python -m moonstreamcrawlers.cli ethcrawler contracts update | jq .
python -m mooncrawl.cli ethcrawler contracts update | jq .
```
You can also specify an output file:
```bash
python -m moonstreamcrawlers.cli ethcrawler contracts update -o new_contracts.json
python -m mooncrawl.cli ethcrawler contracts update -o new_contracts.json
```

Wyświetl plik

@ -82,9 +82,7 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
"""
starting_block: int = args.start
while True:
bottom_block_number, top_block_number = get_latest_blocks(
with_transactions=not args.notransactions
)
bottom_block_number, top_block_number = get_latest_blocks(args.confirmations)
bottom_block_number = max(bottom_block_number + 1, starting_block)
if bottom_block_number >= top_block_number:
print(
@ -218,6 +216,13 @@ def main() -> None:
default=0,
help="(Optional) Block to start synchronization from. Default: 0",
)
parser_ethcrawler_blocks_sync.add_argument(
"-c",
"--confirmations",
type=int,
default=0,
help="Number of confirmations we require before storing a block in the database. (Default: 0)",
)
parser_ethcrawler_blocks_sync.add_argument(
"--order",
type=processing_order,

Wyświetl plik

@ -14,6 +14,12 @@ from moonstreamdb.models import (
)
class EthereumBlockCrawlError(Exception):
"""
Raised when there is a problem crawling Ethereum blocks.
"""
def connect(web3_uri: Optional[str] = MOONSTREAM_IPC_PATH):
web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider()
if web3_uri is not None:
@ -71,24 +77,29 @@ def add_block_transactions(db_session, block: BlockData) -> None:
db_session.add(tx_obj)
def get_latest_blocks(with_transactions: bool = False) -> Tuple[Optional[int], int]:
def get_latest_blocks(confirmations: int = 0) -> Tuple[Optional[int], int]:
"""
Retrieve the latest block from the connected node (connection is created by the connect() method).
If confirmations > 0, and the latest block on the node has block number N, this returns the block
with block_number (N - confirmations)
"""
web3_client = connect()
block_latest: BlockData = web3_client.eth.get_block(
"latest", full_transactions=with_transactions
)
latest_block_number: int = web3_client.eth.block_number
if confirmations > 0:
latest_block_number -= confirmations
with yield_db_session_ctx() as db_session:
block_number_latest_exist_row = (
latest_stored_block_row = (
db_session.query(EthereumBlock.block_number)
.order_by(EthereumBlock.block_number.desc())
.first()
)
block_number_latest_exist = (
None
if block_number_latest_exist_row is None
else block_number_latest_exist_row[0]
latest_stored_block_number = (
None if latest_stored_block_row is None else latest_stored_block_row[0]
)
return block_number_latest_exist, block_latest.number
return latest_stored_block_number, latest_block_number
def crawl_blocks(
@ -98,20 +109,31 @@ def crawl_blocks(
Open database and geth sessions and fetch block data from blockchain.
"""
web3_client = connect()
for block_number in blocks_numbers:
with yield_db_session_ctx() as db_session:
block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions
)
add_block(db_session, block)
with yield_db_session_ctx() as db_session:
for block_number in blocks_numbers:
try:
block: BlockData = web3_client.eth.get_block(
block_number, full_transactions=with_transactions
)
add_block(db_session, block)
if with_transactions:
add_block_transactions(db_session, block)
if with_transactions:
add_block_transactions(db_session, block)
db_session.commit()
db_session.commit()
except Exception as err:
db_session.rollback()
message = f"Error adding block (number={block_number}) to database:\n{repr(err)}"
raise EthereumBlockCrawlError(message)
except:
db_session.rollback()
print(
f"Interrupted while adding block (number={block_number}) to database."
)
raise
if verbose:
print(f"Added {block_number} block")
print(f"Added block: {block_number}")
def check_missing_blocks(blocks_numbers: List[int]) -> List[int]:
@ -143,6 +165,15 @@ def crawl_blocks_executor(
) -> None:
"""
Execute crawler in processes.
Args:
block_numbers_list - List of block numbers to add to database.
with_transactions - If True, also adds transactions from those blocks to the ethereum_transactions table.
verbose - Print logs to stdout?
num_processes - Number of processes to use to feed blocks into database.
Returns nothing, but if there was an error processing the given blocks it raises an EthereumBlocksCrawlError.
The error message is a list of all the things that went wrong in the crawl.
"""
errors: List[Exception] = []
@ -173,12 +204,10 @@ def crawl_blocks_executor(
results.append(result)
wait(results)
# TODO(kompotkot): Return list of errors and colors responsible for
# handling errors
if len(errors) > 0:
print("Errors:")
for error in errors:
print(f"- {error}")
error_messages = "\n".join([f"- {error}" for error in errors])
message = f"Error processing blocks in list:\n{error_messages}"
raise EthereumBlockCrawlError(message)
def process_contract_deployments() -> List[Tuple[str, str]]:

Wyświetl plik

@ -2,4 +2,4 @@
Moonstream crawlers version.
"""
MOONSTREAMCRAWLERS_VERSION = "0.0.1"
MOONCRAWL_VERSION = "0.0.2"

Wyświetl plik

@ -1,14 +1,14 @@
from setuptools import find_packages, setup
from moonstreamcrawlers.version import MOONSTREAMCRAWLERS_VERSION
from mooncrawl.version import MOONCRAWL_VERSION
long_description = ""
with open("README.md") as ifp:
long_description = ifp.read()
setup(
name="moonstreamcrawlers",
version=MOONSTREAMCRAWLERS_VERSION,
name="mooncrawl",
version=MOONCRAWL_VERSION,
author="Bugout.dev",
author_email="engineers@bugout.dev",
license="Apache License 2.0",
@ -30,7 +30,7 @@ setup(
],
python_requires=">=3.6",
packages=find_packages(),
package_data={"moonstreamcrawlers": ["py.typed"]},
package_data={"mooncrawl": ["py.typed"]},
zip_safe=False,
install_requires=[
"moonstreamdb @ git+https://git@github.com/bugout-dev/moonstream.git@ec3278e192119d1e8a273cfaab6cb53890d2e8e9#egg=moonstreamdb&subdirectory=db",
@ -39,7 +39,5 @@ setup(
"web3",
],
extras_require={"dev": ["black", "mypy", "types-requests"]},
entry_points={
"console_scripts": ["moonstreamcrawlers=moonstreamcrawlers.cli:main"]
},
entry_points={"console_scripts": ["mooncrawl=mooncrawl.cli:main"]},
)