Add init version off jrps requests class.

pull/304/head
Andrey Dolgolev 2021-09-28 16:50:02 +03:00
rodzic 897bfda684
commit e4f1fc2d15
2 zmienionych plików z 97 dodań i 2 usunięć

Wyświetl plik

@ -10,7 +10,7 @@ from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database
from .materialize import create_dataset
from .materialize import create_dataset, EthereumBatchloader
logging.basicConfig(level=logging.INFO)
@ -48,6 +48,8 @@ def handle_materialize(args: argparse.Namespace) -> None:
elif args.end is not None:
raise ValueError("You cannot set --end unless you also set --start")
batch_loader = EthereumBatchloader(jrpc_url=args.jrpc)
logger.info(f"Materializing NFT events to datastore: {args.datastore}")
logger.info(f"Block bounds: {bounds}")
@ -61,6 +63,7 @@ def handle_materialize(args: argparse.Namespace) -> None:
event_type,
bounds,
args.batch_size,
batch_loader
)
@ -103,6 +106,12 @@ def main() -> None:
type=web3_connection,
help=f"Web3 provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"--jrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"-t",
"--type",

Wyświetl plik

@ -1,6 +1,9 @@
import logging
import sqlite3
from typing import Iterator, List, Optional
from typing import Any, Iterator, List, Optional
import json
from sqlalchemy.sql.expression import select
from moonstreamdb.models import (
EthereumAddress,
@ -12,6 +15,7 @@ from sqlalchemy import or_
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import insert_events
@ -20,6 +24,86 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EthereumBatchloader:
def __init__(self, jrpc_url) -> None:
self.jrpc_url = jrpc_url
self.message_number = 0
self.commands: List[Any] = []
self.requests_banch: List[Any] = []
def load_blocks(self, block_list: List[int], with_transactions: bool):
"""
Request list of blocks
"""
rpc = [
{
"jsonrpc": "2.0",
"id": index,
"method": "eth_getBlockByNumber",
"params": params_single,
}
for index, params_single in enumerate(
[[hex(block_number), with_transactions] for block_number in block_list]
)
]
response = self.send_json_message(rpc)
return response
def put_to_batch(self, command: str, *params, **kwargs):
"""
Put command request to batch
"""
self.message_number += 1
rpc = {
"jsonrpc": "2.0",
"id": self.message_number,
"method": command,
"params": [i for i in params],
}
self.requests_banch.append(rpc)
self.commands.append(command)
def send_message(self, payload):
headers = {"Content-Type": "application/json"}
try:
r = requests.post(self.jrpc_url, headers=headers, data=payload)
except Exception as e:
print(e)
return r
def send_json_message(self, message):
encoded_json = json.dumps(message)
response = self.send_message(encoded_json.encode("utf8")).json()
return response
def process_batch(self):
"""
Start processing batch of requests
return dict with banches of responces
"""
responses = self.send_json_message(self.requests_banch)
returned_objects = {}
for i, response in enumerate(responses):
if not returned_objects.get(self.commands[i]):
returned_objects[self.commands[i]] = [response]
else:
returned_objects[self.commands[i]].append(response)
self.commands.clear()
self.requests_banch.clear()
self.message_number = 0
return returned_objects
def enrich_from_web3(web3_client: Web3, nft_event: NFTEvent) -> NFTEvent:
"""
Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)
@ -114,6 +198,7 @@ def create_dataset(
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_loader: EthereumBatchloader = None,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
@ -128,3 +213,4 @@ def create_dataset(
events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(datastore_conn, events_batch)