From fe4fa6c6fde55f87d978c11a1cbbe611fb24eb43 Mon Sep 17 00:00:00 2001 From: yhtiyar Date: Wed, 29 Sep 2021 15:41:50 +0300 Subject: [PATCH] temporary fix --- datasets/nfts/nfts/cli.py | 2 +- datasets/nfts/nfts/datastore.py | 15 ++++++--------- datasets/nfts/nfts/materialize.py | 22 +++++++++++----------- datasets/nfts/setup.py | 3 ++- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index 86f4527e..21b704bb 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -61,9 +61,9 @@ def handle_materialize(args: argparse.Namespace) -> None: db_session, args.web3, event_type, + batch_loader, bounds, args.batch_size, - batch_loader ) diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index 2f0552b9..ff9466fc 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -80,7 +80,9 @@ INSERT INTO {event_tables[event_type]}( return query -def nft_event_to_tuple(event: NFTEvent) -> Tuple[Any]: +def nft_event_to_tuple( + event: NFTEvent, +) -> Tuple[str, str, str, str, str, str, str, str]: """ Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes dropping e.g. the event_type field. @@ -121,6 +123,7 @@ def insert_checkpoint( """ cur = conn.cursor() cur.execute(query, [event_type.value, offset, transaction_hash]) + conn.commit() def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None: @@ -136,19 +139,13 @@ def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None: for event in events if event.event_type == EventType.TRANSFER ] - cur.executemany(insert_events_query(EventType.TRANSFER), transfers) + mints = [ nft_event_to_tuple(event) for event in events if event.event_type == EventType.MINT ] - # transfers = [] - # mints = [] - # for event in events: - # if event.event_type == EventType.TRANSFER: - # transfers.append(nft_event_to_tuple(event)) - # elif event.event_type == EventType.MINT: - # mints.append(nft_event_to_tuple(event)) + cur.executemany(insert_events_query(EventType.TRANSFER), transfers) cur.executemany(insert_events_query(EventType.MINT), mints) diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 107c2dcd..408c0fd7 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -1,6 +1,6 @@ import logging import sqlite3 -from typing import Any, Iterator, List, Optional, Set +from typing import Any, cast, Iterator, List, Optional, Set import json from moonstreamdb.models import ( @@ -82,7 +82,9 @@ class EthereumBatchloader: def enrich_from_web3( - web3_client: Web3, nft_events: List[NFTEvent], batch_loader: EthereumBatchloader + web3_client: Web3, + nft_events: List[NFTEvent], + batch_loader: EthereumBatchloader, ) -> List[NFTEvent]: """ Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db) @@ -101,6 +103,7 @@ def enrich_from_web3( if len(transactions_to_query) == 0: return nft_events jrpc_response = batch_loader.load_transactions(list(transactions_to_query)) + breakpoint() transactions_map = { result["result"]["hash"]: ( int(result["result"]["value"], 16), @@ -114,7 +117,7 @@ def enrich_from_web3( nft_events[index].value, nft_events[index].block_number = transactions_map[ nft_events[index].transaction_hash ] - blocks_to_query.add(nft_events[index].block_number) + blocks_to_query.add(cast(int, nft_events[index].block_number)) if len(blocks_to_query) == 0: return nft_events @@ -124,7 +127,7 @@ def enrich_from_web3( for result in jrpc_response } for index in indices_to_update: - nft_events[index].timestamp = blocks_map[nft_event.block_number] + nft_events[index].timestamp = blocks_map[cast(int, nft_event.block_number)] return nft_events @@ -132,7 +135,7 @@ def get_events( db_session: Session, event_type: EventType, bounds: Optional[BlockBounds] = None, - offset: int = 0, + initial_offset: int = 0, batch_size: int = 1000, ) -> Iterator[NFTEvent]: query = ( @@ -167,7 +170,7 @@ def get_events( EthereumTransaction.block_number <= bounds.ending_block ) query = query.filter(or_(*bounds_filters)) - + offset = initial_offset while True: events = query.offset(offset).limit(batch_size).all() if not events: @@ -201,9 +204,9 @@ def create_dataset( db_session: Session, web3_client: Web3, event_type: EventType, + batch_loader: EthereumBatchloader, bounds: Optional[BlockBounds] = None, batch_size: int = 1000, - batch_loader: EthereumBatchloader = None, ) -> None: """ Creates Moonstream NFTs dataset in the given SQLite datastore. @@ -223,10 +226,7 @@ def create_dataset( if len(raw_events_batch) == batch_size: logger.info("Writing batch of events to datastore") - insert_events( - datastore_conn, - raw_events, - ) + insert_events(datastore_conn, raw_events_batch) offset += batch_size insert_checkpoint( diff --git a/datasets/nfts/setup.py b/datasets/nfts/setup.py index 6cf9ef9f..23c5542a 100644 --- a/datasets/nfts/setup.py +++ b/datasets/nfts/setup.py @@ -35,9 +35,10 @@ setup( "humbug", "tqdm", "web3", + "requests", ], extras_require={ - "dev": ["black", "mypy"], + "dev": ["black", "mypy", "types-requests"], "distribute": ["setuptools", "twine", "wheel"], }, entry_points={"console_scripts": []},