diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py
index ff9466fc..b0d15be9 100644
--- a/datasets/nfts/nfts/datastore.py
+++ b/datasets/nfts/nfts/datastore.py
@@ -42,8 +42,7 @@ CREATE TABLE {event_tables[event_type]}
         from_address TEXT,
         to_address TEXT,
         transaction_value INTEGER,
-        timestamp INTEGER,
-        UNIQUE (transaction_hash, nft_address, from_address, to_address, token_id)
+        timestamp INTEGER
     );
     """
     return creation_query
diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py
index 408c0fd7..c1ecdff4 100644
--- a/datasets/nfts/nfts/materialize.py
+++ b/datasets/nfts/nfts/materialize.py
@@ -9,13 +9,13 @@ from moonstreamdb.models import (
     EthereumTransaction,
     EthereumBlock,
 )
-from sqlalchemy import or_
+from sqlalchemy import or_, and_
 from sqlalchemy.orm import Session
 from tqdm import tqdm
 from web3 import Web3
 import requests
 
-from .data import BlockBounds, EventType, NFTEvent, event_types, nft_event
+from .data import BlockBounds, EventType, NFTEvent, event_types
 from .datastore import get_checkpoint_offset, insert_checkpoint, insert_events
 
 logging.basicConfig(level=logging.INFO)
@@ -102,6 +102,7 @@ def enrich_from_web3(
     if len(transactions_to_query) == 0:
         return nft_events
 
+    logger.info("Calling jrpc")
     jrpc_response = batch_loader.load_transactions(list(transactions_to_query))
     breakpoint()
     transactions_map = {
@@ -158,17 +159,18 @@ def get_events(
             EthereumTransaction.block_number == EthereumBlock.block_number,
         )
         .filter(EthereumLabel.label == event_type.value)
-        .order_by(EthereumLabel.created_at)
+        .order_by(
+            EthereumLabel.created_at.asc(),
+            EthereumLabel.transaction_hash.asc(),
+            EthereumLabel.address_id.asc(),
+        )
     )
     if bounds is not None:
-        bounds_filters = [
-            EthereumTransaction.hash == None,
-            EthereumTransaction.block_number >= bounds.starting_block,
-        ]
+        time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
         if bounds.ending_block is not None:
-            bounds_filters.append(
-                EthereumTransaction.block_number <= bounds.ending_block
-            )
+            time_filters.append(EthereumTransaction.block_number <= bounds.ending_block)
+        bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
+
         query = query.filter(or_(*bounds_filters))
     offset = initial_offset
     while True:
@@ -218,7 +220,7 @@ def create_dataset(
             offset = 0
             logger.info(f"Did not found any checkpoint for {event_type.value}")
 
-        raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
+        raw_events = get_events(db_session, event_type, None, offset, batch_size)
         raw_events_batch: List[NFTEvent] = []
 
         for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
@@ -226,14 +228,27 @@
             if len(raw_events_batch) == batch_size:
                 logger.info("Writing batch of events to datastore")
-                insert_events(datastore_conn, raw_events_batch)
+                insert_events(
+                    datastore_conn,
+                    enrich_from_web3(web3_client, raw_events_batch, batch_loader),
+                )
                 offset += batch_size
                 insert_checkpoint(
-                    datastore_conn, event_type, offset, event.transaction_hash
+                    datastore_conn,
+                    event_type,
+                    offset,
+                    event.transaction_hash,
                 )
                 raw_events_batch = []
 
         logger.info("Writing remaining events to datastore")
         insert_events(
             datastore_conn, enrich_from_web3(web3_client, raw_events_batch, batch_loader)
         )
+        offset += len(raw_events_batch)
+        insert_checkpoint(
+            datastore_conn,
+            event_type,
+            offset,
+            raw_events_batch[-1].transaction_hash,
+        )
 