From f8fb803fd4e5208835fead57b6ad948dcc9351c2 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Sat, 2 Oct 2021 12:22:24 -0700 Subject: [PATCH] Handle checkpoints correctly when importing data Also added "nfts" command as a console script in setup.py --- datasets/nfts/nfts/datastore.py | 81 +++++++++++++++++++++++++-------- datasets/nfts/setup.py | 6 ++- 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index acbf9658..066090f4 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -4,7 +4,9 @@ a datastore for a Moonstream NFTs dataset. """ import logging import sqlite3 -from typing import Any, cast, List, Tuple, Optional, Union +from typing import Any, cast, List, Tuple, Optional + +from tqdm import tqdm from .data import EventType, NFTEvent, NFTMetadata @@ -147,6 +149,19 @@ def get_checkpoint_offset( return None +def delete_checkpoints( + conn: sqlite3.Connection, event_type: EventType, commit: bool = True +) -> None: + cur = conn.cursor() + cur.execute(f"DELETE FROM checkpoint where event_type='{event_type.value}';") + if commit: + try: + conn.commit() + except: + conn.rollback() + raise + + def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int): query = f""" INSERT INTO checkpoint ( @@ -247,29 +262,47 @@ def import_data( batch: List[Any] = [] - for row in source_cur: + for row in tqdm(source_cur, desc="Rows processed"): if event_type == EventType.ERC721: batch.append(NFTMetadata(*cast(Tuple[str, str, str], row))) else: - batch.append( - NFTEvent( - *cast( - Tuple[ - str, - EventType, - str, - str, - str, - str, - str, - Optional[int], - Optional[int], - Optional[int], - ], - row, - ) - ) + ( + event_id, + nft_address, + token_id, + from_address, + to_address, + transaction_hash, + value, + block_number, + timestamp, + ) = cast( + Tuple[ + str, + str, + str, + str, + str, + str, + Optional[int], + Optional[int], + Optional[int], + ], + row, ) + event = NFTEvent( + event_id, + event_type, # Original argument to this function + nft_address, + token_id, + from_address, + to_address, + transaction_hash, + value, + block_number, + timestamp, + ) + batch.append(event) if len(batch) == batch_size: if event_type == EventType.ERC721: @@ -281,3 +314,11 @@ def import_data( insert_address_metadata(target_conn, cast(List[NFTMetadata], batch)) else: insert_events(target_conn, cast(List[NFTEvent], batch)) + + target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY) + target_conn.commit() + + source_offset = get_checkpoint_offset(source_conn, event_type) + if source_offset is not None: + delete_checkpoints(target_conn, event_type, commit=False) + insert_checkpoint(target_conn, event_type, source_offset) diff --git a/datasets/nfts/setup.py b/datasets/nfts/setup.py index 23c5542a..d0b5a435 100644 --- a/datasets/nfts/setup.py +++ b/datasets/nfts/setup.py @@ -41,5 +41,9 @@ setup( "dev": ["black", "mypy", "types-requests"], "distribute": ["setuptools", "twine", "wheel"], }, - entry_points={"console_scripts": []}, + entry_points={ + "console_scripts": [ + "nfts=nfts.cli:main", + ] + }, )