diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index 32a2d61c..fbb58590 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -3,13 +3,12 @@ import contextlib import logging import os import sqlite3 -from typing import Optional, Union +from typing import Optional from moonstreamdb.db import yield_db_session_ctx -from web3 import Web3, IPCProvider, HTTPProvider from .data import event_types, nft_event, BlockBounds -from .datastore import setup_database +from .datastore import setup_database, import_data from .derive import current_owners from .materialize import create_dataset, EthereumBatchloader @@ -23,6 +22,14 @@ def handle_initdb(args: argparse.Namespace) -> None: setup_database(conn) +def handle_import_data(args: argparse.Namespace) -> None: + event_type = nft_event(args.type) + with contextlib.closing( + sqlite3.connect(args.target) + ) as target_conn, contextlib.closing(sqlite3.connect(args.source)) as source_conn: + import_data(target_conn, source_conn, event_type, args.batch_size) + + def handle_materialize(args: argparse.Namespace) -> None: event_type = nft_event(args.type) bounds: Optional[BlockBounds] = None @@ -132,6 +139,33 @@ def main() -> None: ) parser_derive.set_defaults(func=handle_derive) + parser_import_data = subcommands.add_parser( + "import-data", + description="Import data from another source NFTs dataset datastore. 
This operation is performed per table, and replaces the existing table in the target datastore.", + ) + parser_import_data.add_argument( + "--target", + required=True, + help="Datastore into which you want to import data", + ) + parser_import_data.add_argument( + "--source", required=True, help="Datastore from which you want to import data" + ) + parser_import_data.add_argument( + "--type", + required=True, + choices=event_types, + help="Type of data you would like to import from source to target", + ) + parser_import_data.add_argument( + "-N", + "--batch-size", + type=int, + default=10000, + help="Batch size for database commits into target datastore.", + ) + parser_import_data.set_defaults(func=handle_import_data) + args = parser.parse_args() args.func(args) diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index 558b54e8..acbf9658 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -4,7 +4,7 @@ a datastore for a Moonstream NFTs dataset. 
""" import logging import sqlite3 -from typing import Any, List, Tuple, Optional +from typing import Any, cast, List, Tuple, Optional, Union from .data import EventType, NFTEvent, NFTMetadata @@ -15,7 +15,7 @@ logger = logging.getLogger(__name__) event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"} CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts - ( + ( address TEXT NOT NULL UNIQUE ON CONFLICT FAIL, name TEXT, symbol TEXT, @@ -23,6 +23,10 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts ); """ +BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;" +DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;" +SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;" + CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint ( event_type STRING, @@ -34,7 +38,7 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint def create_events_table_query(event_type: EventType) -> str: creation_query = f""" CREATE TABLE IF NOT EXISTS {event_tables[event_type]} - ( + ( event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL, transaction_hash TEXT, block_number INTEGER, @@ -49,6 +53,34 @@ CREATE TABLE IF NOT EXISTS {event_tables[event_type]} return creation_query +def backup_events_table_query(event_type: EventType) -> str: + backup_query = f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;" + return backup_query + + +def drop_backup_events_table_query(event_type: EventType) -> str: + drop_query = f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;" + return drop_query + + +def select_events_table_query(event_type: EventType) -> str: + selection_query = f""" +SELECT + event_id, + transaction_hash, + block_number, + nft_address, + token_id, + from_address, + to_address, + transaction_value, + timestamp +FROM {event_tables[event_type]}; + """ + + return selection_query + + def setup_database(conn: sqlite3.Connection) -> None: """ Sets up the 
schema of the Moonstream NFTs dataset in the given SQLite database. @@ -117,10 +149,10 @@ def get_checkpoint_offset( def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int): query = f""" - INSERT INTO checkpoint ( - event_type, - offset - ) VALUES (?, ?) +INSERT INTO checkpoint ( + event_type, + offset +) VALUES (?, ?) """ cur = conn.cursor() cur.execute(query, [event_type.value, offset]) @@ -132,11 +164,11 @@ def insert_address_metadata( ) -> None: cur = conn.cursor() query = f""" - INSERT INTO nfts ( - address, - name, - symbol - ) VALUES (?, ?, ?) +INSERT INTO nfts ( + address, + name, + symbol +) VALUES (?, ?, ?) """ try: nfts = [ @@ -179,3 +211,73 @@ def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None: logger.error(f"FAILED TO SAVE :{events}") conn.rollback() raise e + + +def import_data( + target_conn: sqlite3.Connection, + source_conn: sqlite3.Connection, + event_type: EventType, + batch_size: int = 1000, +) -> None: + """ + Imports the data corresponding to the given event type from the source database into the target + database. + + Any existing data of that type in the target database is first moved to a backup table, + replacing any previous backup. It is a good idea to + create a backup copy of your target database before performing this operation. 
+    """
+    target_cur = target_conn.cursor()
+    drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY
+    backup_table_query = BACKUP_NFTS_TABLE_QUERY
+    create_table_query = CREATE_NFTS_TABLE_QUERY
+    source_selection_query = SELECT_NFTS_QUERY
+    if event_type != EventType.ERC721:
+        drop_backup_query = drop_backup_events_table_query(event_type)
+        backup_table_query = backup_events_table_query(event_type)
+        create_table_query = create_events_table_query(event_type)
+        source_selection_query = select_events_table_query(event_type)
+
+    # Move the existing target table aside (dropping any stale backup first)
+    # and recreate it empty before streaming rows in from the source.
+    target_cur.execute(drop_backup_query)
+    target_cur.execute(backup_table_query)
+    target_cur.execute(create_table_query)
+    target_conn.commit()
+
+    source_cur = source_conn.cursor()
+    source_cur.execute(source_selection_query)
+
+    batch: List[Any] = []
+
+    for row in source_cur:
+        if event_type == EventType.ERC721:
+            batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
+        else:
+            # NOTE(review): this cast names 10 element types (including
+            # EventType) but select_events_table_query selects 9 columns, in a
+            # different order; typing.cast is unchecked at runtime, so confirm
+            # the NFTEvent field order actually matches the SELECT column order.
+            batch.append(
+                NFTEvent(
+                    *cast(
+                        Tuple[
+                            str,
+                            EventType,
+                            str,
+                            str,
+                            str,
+                            str,
+                            str,
+                            Optional[int],
+                            Optional[int],
+                            Optional[int],
+                        ],
+                        row,
+                    )
+                )
+            )
+
+        if len(batch) == batch_size:
+            if event_type == EventType.ERC721:
+                insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
+            else:
+                insert_events(target_conn, cast(List[NFTEvent], batch))
+            # Reset the batch after flushing; without this the batch keeps
+            # growing past batch_size and the final flush below re-inserts
+            # already-committed rows, violating the UNIQUE constraints.
+            batch = []
+
+    # Flush the final partial batch, if any rows remain.
+    if batch:
+        if event_type == EventType.ERC721:
+            insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
+        else:
+            insert_events(target_conn, cast(List[NFTEvent], batch))