pull/304/head
yhtiyar 2021-09-29 15:41:50 +03:00
rodzic 405aa3871e
commit fe4fa6c6fd
4 zmienionych plików z 20 dodań i 22 usunięć

Wyświetl plik

@ -61,9 +61,9 @@ def handle_materialize(args: argparse.Namespace) -> None:
db_session,
args.web3,
event_type,
batch_loader,
bounds,
args.batch_size,
batch_loader
)

Wyświetl plik

@ -80,7 +80,9 @@ INSERT INTO {event_tables[event_type]}(
return query
def nft_event_to_tuple(event: NFTEvent) -> Tuple[Any]:
def nft_event_to_tuple(
event: NFTEvent,
) -> Tuple[str, str, str, str, str, str, str, str]:
"""
Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes
dropping e.g. the event_type field.
@ -121,6 +123,7 @@ def insert_checkpoint(
"""
cur = conn.cursor()
cur.execute(query, [event_type.value, offset, transaction_hash])
conn.commit()
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
@ -136,19 +139,13 @@ def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
for event in events
if event.event_type == EventType.TRANSFER
]
cur.executemany(insert_events_query(EventType.TRANSFER), transfers)
mints = [
nft_event_to_tuple(event)
for event in events
if event.event_type == EventType.MINT
]
# transfers = []
# mints = []
# for event in events:
# if event.event_type == EventType.TRANSFER:
# transfers.append(nft_event_to_tuple(event))
# elif event.event_type == EventType.MINT:
# mints.append(nft_event_to_tuple(event))
cur.executemany(insert_events_query(EventType.TRANSFER), transfers)
cur.executemany(insert_events_query(EventType.MINT), mints)

Wyświetl plik

@ -1,6 +1,6 @@
import logging
import sqlite3
from typing import Any, Iterator, List, Optional, Set
from typing import Any, cast, Iterator, List, Optional, Set
import json
from moonstreamdb.models import (
@ -82,7 +82,9 @@ class EthereumBatchloader:
def enrich_from_web3(
web3_client: Web3, nft_events: List[NFTEvent], batch_loader: EthereumBatchloader
web3_client: Web3,
nft_events: List[NFTEvent],
batch_loader: EthereumBatchloader,
) -> List[NFTEvent]:
"""
Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)
@ -101,6 +103,7 @@ def enrich_from_web3(
if len(transactions_to_query) == 0:
return nft_events
jrpc_response = batch_loader.load_transactions(list(transactions_to_query))
breakpoint()
transactions_map = {
result["result"]["hash"]: (
int(result["result"]["value"], 16),
@ -114,7 +117,7 @@ def enrich_from_web3(
nft_events[index].value, nft_events[index].block_number = transactions_map[
nft_events[index].transaction_hash
]
blocks_to_query.add(nft_events[index].block_number)
blocks_to_query.add(cast(int, nft_events[index].block_number))
if len(blocks_to_query) == 0:
return nft_events
@ -124,7 +127,7 @@ def enrich_from_web3(
for result in jrpc_response
}
for index in indices_to_update:
nft_events[index].timestamp = blocks_map[nft_event.block_number]
nft_events[index].timestamp = blocks_map[cast(int, nft_event.block_number)]
return nft_events
@ -132,7 +135,7 @@ def get_events(
db_session: Session,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
offset: int = 0,
initial_offset: int = 0,
batch_size: int = 1000,
) -> Iterator[NFTEvent]:
query = (
@ -167,7 +170,7 @@ def get_events(
EthereumTransaction.block_number <= bounds.ending_block
)
query = query.filter(or_(*bounds_filters))
offset = initial_offset
while True:
events = query.offset(offset).limit(batch_size).all()
if not events:
@ -201,9 +204,9 @@ def create_dataset(
db_session: Session,
web3_client: Web3,
event_type: EventType,
batch_loader: EthereumBatchloader,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_loader: EthereumBatchloader = None,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
@ -223,10 +226,7 @@ def create_dataset(
if len(raw_events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(
datastore_conn,
raw_events,
)
insert_events(datastore_conn, raw_events_batch)
offset += batch_size
insert_checkpoint(

Wyświetl plik

@ -35,9 +35,10 @@ setup(
"humbug",
"tqdm",
"web3",
"requests",
],
extras_require={
"dev": ["black", "mypy"],
"dev": ["black", "mypy", "types-requests"],
"distribute": ["setuptools", "twine", "wheel"],
},
entry_points={"console_scripts": []},