added batching support for request to node

pull/304/head
yhtiyar 2021-09-29 00:25:01 +03:00
rodzic c4db62be65
commit 9a16ad207b
1 zmienionych plików z 74 dodań i 65 usunięć

Wyświetl plik

@ -1,6 +1,6 @@
import logging
import sqlite3
from typing import Any, Iterator, List, Optional
from typing import Any, Iterator, List, Optional, Set
import json
from moonstreamdb.models import (
@ -15,7 +15,7 @@ from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types
from .data import BlockBounds, EventType, NFTEvent, event_types, nft_event
from .datastore import insert_events
logging.basicConfig(level=logging.INFO)
@ -32,7 +32,7 @@ class EthereumBatchloader:
def load_blocks(self, block_list: List[int], with_transactions: bool):
"""
Request list of blocks
Request list of blocks
"""
rpc = [
{
@ -48,22 +48,22 @@ class EthereumBatchloader:
response = self.send_json_message(rpc)
return response
def put_to_batch(self, command: str, *params, **kwargs):
def load_transactions(self, transaction_hashes: List[str]):
"""
Put command request to batch
Request list of transactions
"""
self.message_number += 1
rpc = {
"jsonrpc": "2.0",
"id": self.message_number,
"method": command,
"params": [i for i in params],
}
self.requests_banch.append(rpc)
self.commands.append(command)
rpc = [
{
"jsonrpc": "2.0",
"method": "eth_getTransactionByHash",
"id": index,
"params": [tx_hash],
}
for index, tx_hash in enumerate(transaction_hashes)
]
response = self.send_json_message(rpc)
return response
def send_message(self, payload):
headers = {"Content-Type": "application/json"}
@ -76,48 +76,53 @@ class EthereumBatchloader:
def send_json_message(self, message):
encoded_json = json.dumps(message)
response = self.send_message(encoded_json.encode("utf8")).json()
raw_response = self.send_message(encoded_json.encode("utf8"))
response = raw_response.json()
return response
def process_batch(self):
"""
Start processing batch of requests
return dict with banches of responces
"""
responses = self.send_json_message(self.requests_banch)
returned_objects = {}
for i, response in enumerate(responses):
if not returned_objects.get(self.commands[i]):
returned_objects[self.commands[i]] = [response]
else:
returned_objects[self.commands[i]].append(response)
self.commands.clear()
self.requests_banch.clear()
self.message_number = 0
return returned_objects
def enrich_from_web3(web3_client: Web3, nft_event: NFTEvent) -> NFTEvent:
def enrich_from_web3(
web3_client: Web3, nft_events: NFTEvent, batch_loader: EthereumBatchloader
) -> NFTEvent:
"""
Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)
"""
if (
nft_event.block_number is None
or nft_event.value is None
or nft_event.timestamp is None
):
logger.info("Enriching from web3")
transaction = web3_client.eth.get_transaction(nft_event.transaction_hash)
nft_event.value = transaction["value"]
nft_event.block_number = transaction["blockNumber"]
block = web3_client.eth.get_block(transaction["blockNumber"])
nft_event.timestamp = block["timestamp"]
return nft_event
transactions_to_query = {
nft_event.transaction_hash
for nft_event in nft_events
if (nft_event.value is None or nft_event.block_number is None)
}
jrpc_response = batch_loader.load_transactions(list(transactions_to_query))
transactions_map = {
result["result"]["hash"]: (
int(result["result"]["value"], 16),
int(result["result"]["blockNumber"], 16),
)
for result in jrpc_response
}
blocks_to_query: Set[int] = set()
indices_to_update: List[int] = []
for index, nft_event in enumerate(nft_events):
if (
nft_event.block_number is None
or nft_event.value is None
or nft_event.timestamp is None
):
nft_event.value, nft_event.block_number = transactions_map[
nft_event.transaction_hash
]
blocks_to_query.add(nft_event.block_number)
indices_to_update.append(index)
jrpc_response = batch_loader.load_blocks(list(blocks_to_query), False)
blocks_map = {
int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16)
for result in jrpc_response
}
for index in indices_to_update:
nft_events[index].timestamp = blocks_map[nft_event.block_number]
return nft_events
def get_events(
@ -125,7 +130,7 @@ def get_events(
web3_client: Web3,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_size: int = 200,
) -> Iterator[NFTEvent]:
query = (
db_session.query(
@ -147,6 +152,7 @@ def get_events(
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == event_type.value)
.order_by(EthereumLabel.created_at)
)
if bounds is not None:
bounds_filters = [
@ -185,8 +191,7 @@ def get_events(
block_number=block_number,
timestamp=timestamp,
)
event = enrich_from_web3(web3_client, raw_event)
yield event
yield raw_event
def create_dataset(
@ -195,20 +200,24 @@ def create_dataset(
web3_client: Web3,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_size: int = 200,
batch_loader: EthereumBatchloader = None,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
"""
events = get_events(db_session, web3_client, event_type, bounds, batch_size)
events_batch: List[NFTEvent] = []
for event in tqdm(events, desc="Events processed", colour="#DD6E0F"):
events_batch.append(event)
if len(events_batch) == batch_size:
raw_events = get_events(db_session, web3_client, event_type, bounds, batch_size)
raw_events_batch: List[NFTEvent] = []
for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
raw_events_batch.append(event)
if len(raw_events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(datastore_conn, events_batch)
events_batch = []
insert_events(
datastore_conn,
enrich_from_web3(web3_client, raw_events_batch, batch_loader),
)
raw_events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(datastore_conn, events_batch)
insert_events(
datastore_conn, enrich_from_web3(web3_client, raw_events_batch, batch_loader)
)