diff --git a/datasets/nfts/nfts/data.py b/datasets/nfts/nfts/data.py
index da70e5cb..7358dcd4 100644
--- a/datasets/nfts/nfts/data.py
+++ b/datasets/nfts/nfts/data.py
@@ -31,6 +31,7 @@ def nft_event(raw_event: str) -> EventType:
 
 @dataclass
 class NFTEvent:
+    event_id: str
     event_type: EventType
     nft_address: str
     token_id: str
diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py
index acd41964..558b54e8 100644
--- a/datasets/nfts/nfts/datastore.py
+++ b/datasets/nfts/nfts/datastore.py
@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
 event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
 
 CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
-    ( 
+    (
         address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
         name TEXT,
         symbol TEXT,
@@ -26,8 +26,7 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
 CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
     (
         event_type STRING,
-        offset INTEGER,
-        transaction_hash STRING
+        offset INTEGER
     );
 """
 
@@ -35,7 +34,8 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
 def create_events_table_query(event_type: EventType) -> str:
     creation_query = f"""
 CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
-    ( 
+    (
+        event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
         transaction_hash TEXT,
         block_number INTEGER,
         nft_address TEXT REFERENCES nfts(address),
@@ -68,7 +68,8 @@ def insert_events_query(event_type: EventType) -> str:
     Generates a query which inserts NFT events into the appropriate events table.
     """
     query = f"""
-INSERT INTO {event_tables[event_type]}(
+INSERT OR IGNORE INTO {event_tables[event_type]}(
+    event_id,
     transaction_hash,
     block_number,
     nft_address,
@@ -77,19 +78,20 @@ INSERT INTO {event_tables[event_type]}(
     to_address,
     transaction_value,
     timestamp
-) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""" return query def nft_event_to_tuple( event: NFTEvent, -) -> Tuple[str, str, str, str, str, str, str, str]: +) -> Tuple[str, str, str, str, str, str, str, str, str]: """ Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes dropping e.g. the event_type field. """ return ( + str(event.event_id), str(event.transaction_hash), str(event.block_number), str(event.nft_address), @@ -113,18 +115,15 @@ def get_checkpoint_offset( return None -def insert_checkpoint( - conn: sqlite3.Connection, event_type: EventType, offset: int, transaction_hash: str -): +def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int): query = f""" INSERT INTO checkpoint ( event_type, - offset, - transaction_hash - ) VALUES (?, ?, ?) + offset + ) VALUES (?, ?) """ cur = conn.cursor() - cur.execute(query, [event_type.value, offset, transaction_hash]) + cur.execute(query, [event_type.value, offset]) conn.commit() diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index 1464b128..29479959 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -152,15 +152,28 @@ def enrich_from_web3( return admissible_events -def get_events( +def add_events( + datastore_conn: sqlite3.Connection, db_session: Session, event_type: EventType, + batch_loader: EthereumBatchloader, + initial_offset=0, bounds: Optional[BlockBounds] = None, - initial_offset: int = 0, - batch_size: int = 1000, -) -> Iterator[NFTEvent]: + batch_size: int = 10, +) -> None: + raw_created_at_list = ( + db_session.query(EthereumLabel.created_at) + .filter(EthereumLabel.label == event_type.value) + .order_by(EthereumLabel.created_at.asc()) + .distinct(EthereumLabel.created_at) + ).all() + + created_at_list = [ + created_at[0] for created_at in raw_created_at_list[initial_offset:] + ] query = ( db_session.query( + EthereumLabel.id, EthereumLabel.label, EthereumAddress.address, EthereumLabel.label_data, @@ -181,8 +194,6 @@ 
def get_events( ) .order_by( EthereumLabel.created_at.asc(), - EthereumLabel.transaction_hash.asc(), - EthereumLabel.address_id.asc(), ) ) if bounds is not None: @@ -192,13 +203,23 @@ def get_events( bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)] query = query.filter(or_(*bounds_filters)) - offset = initial_offset - while True: - events = query.offset(offset).limit(batch_size).all() + + pbar = tqdm(total=(len(raw_created_at_list))) + pbar.set_description(f"Processing created ats") + pbar.update(initial_offset) + batch_start = 0 + batch_end = batch_start + batch_size + while batch_start <= len(created_at_list): + + events = query.filter( + EthereumLabel.created_at.in_(created_at_list[batch_start : batch_end + 1]) + ).all() if not events: - break - offset += batch_size + continue + + raw_events_batch = [] for ( + event_id, label, address, label_data, @@ -208,6 +229,7 @@ def get_events( timestamp, ) in events: raw_event = NFTEvent( + event_id=event_id, event_type=event_types[label], nft_address=address, token_id=label_data["tokenId"], @@ -218,7 +240,16 @@ def get_events( block_number=block_number, timestamp=timestamp, ) - yield raw_event + raw_events_batch.append(raw_event) + + logger.info(f"Adding {len(raw_events_batch)} to database") + insert_events( + datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds) + ) + insert_checkpoint(datastore_conn, event_type, batch_end + initial_offset) + pbar.update(batch_end - batch_start + 1) + batch_start = batch_end + 1 + batch_end = min(batch_end + batch_size, len(created_at_list)) def create_dataset( @@ -227,7 +258,7 @@ def create_dataset( event_type: EventType, batch_loader: EthereumBatchloader, bounds: Optional[BlockBounds] = None, - batch_size: int = 1000, + batch_size: int = 10, ) -> None: """ Creates Moonstream NFTs dataset in the given SQLite datastore. 
@@ -241,39 +272,16 @@ def create_dataset(
 
     if event_type == EventType.ERC721:
         add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
-        return
-    raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
-    raw_events_batch: List[NFTEvent] = []
-
-    for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
-        raw_events_batch.append(event)
-        if len(raw_events_batch) == batch_size:
-            logger.info("Writing batch of events to datastore")
-
-            insert_events(
-                datastore_conn,
-                enrich_from_web3(raw_events_batch, batch_loader, bounds),
-            )
-            offset += batch_size
-
-            insert_checkpoint(
-                datastore_conn,
-                event_type,
-                offset,
-                event.transaction_hash,
-            )
-            raw_events_batch = []
-    logger.info("Writing remaining events to datastore")
-    insert_events(
-        datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds)
-    )
-    offset += len(raw_events_batch)
-    insert_checkpoint(
-        datastore_conn,
-        event_type,
-        offset,
-        raw_events_batch[-1].transaction_hash,
-    )
+    else:
+        add_events(
+            datastore_conn,
+            db_session,
+            event_type,
+            batch_loader,
+            offset,
+            bounds,
+            batch_size,
+        )
 
 
 def add_contracts_metadata(
@@ -307,7 +315,7 @@ def add_contracts_metadata(
             )
         )
         insert_address_metadata(datastore_conn, events_batch)
-        insert_checkpoint(datastore_conn, EventType.ERC721, offset, "ERC721 checkpoint")
+        insert_checkpoint(datastore_conn, EventType.ERC721, offset)
        logger.info(f"Already added {offset}")
 
     logger.info(f"Added total of {offset-initial_offset} nfts metadata")