Added "nfts import-data" command

Tested on small databases. Yet to test on real datasets.

The first test will be a merge of @Yhtiyar's transfers dataset with my mints dataset.
pull/304/head
Neeraj Kashyap 2021-10-02 11:16:16 -07:00
parent 3dec9fc5f6
commit 2f3978d416
2 changed files with 151 additions and 15 deletions
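
The merge described in the commit message could look roughly like the following sketch. It is not part of the commit; the file names are placeholders and the absolute import path for the package is an assumption (the diff below only shows relative imports):

import contextlib
import sqlite3

from nfts.data import EventType        # assumed package path
from nfts.datastore import import_data  # assumed package path

# Copy the transfers table from the source datastore into the target datastore,
# backing up and replacing any existing transfers table in the target.
with contextlib.closing(
    sqlite3.connect("mints-dataset.sqlite")  # hypothetical target (mints dataset)
) as target_conn, contextlib.closing(
    sqlite3.connect("transfers-dataset.sqlite")  # hypothetical source (transfers dataset)
) as source_conn:
    import_data(target_conn, source_conn, EventType.TRANSFER, batch_size=10000)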

View file

@@ -3,13 +3,12 @@ import contextlib
import logging
import os
import sqlite3
-from typing import Optional, Union
+from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event, BlockBounds
-from .datastore import setup_database
+from .datastore import setup_database, import_data
from .derive import current_owners
from .materialize import create_dataset, EthereumBatchloader
@@ -23,6 +22,14 @@ def handle_initdb(args: argparse.Namespace) -> None:
        setup_database(conn)


def handle_import_data(args: argparse.Namespace) -> None:
    event_type = nft_event(args.type)
    with contextlib.closing(
        sqlite3.connect(args.target)
    ) as target_conn, contextlib.closing(sqlite3.connect(args.source)) as source_conn:
        import_data(target_conn, source_conn, event_type, args.batch_size)


def handle_materialize(args: argparse.Namespace) -> None:
    event_type = nft_event(args.type)
    bounds: Optional[BlockBounds] = None
@@ -132,6 +139,33 @@ def main() -> None:
    )
    parser_derive.set_defaults(func=handle_derive)

    parser_import_data = subcommands.add_parser(
        "import-data",
        description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
    )
    parser_import_data.add_argument(
        "--target",
        required=True,
        help="Datastore into which you want to import data",
    )
    parser_import_data.add_argument(
        "--source", required=True, help="Datastore from which you want to import data"
    )
    parser_import_data.add_argument(
        "--type",
        required=True,
        choices=event_types,
        help="Type of data you would like to import from source to target",
    )
    parser_import_data.add_argument(
        "-N",
        "--batch-size",
        type=int,
        default=10000,
        help="Batch size for database commits into target datastore.",
    )
    parser_import_data.set_defaults(func=handle_import_data)

    args = parser.parse_args()
    args.func(args)
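
Once this parser is wired up, the handler can also be driven directly with an argparse.Namespace, which is convenient in tests. A minimal sketch, assuming the module is importable as nfts.cli and that "transfer" is one of the strings in event_types (the accepted values are defined elsewhere):

import argparse

from nfts.cli import handle_import_data  # assumed import path

args = argparse.Namespace(
    target="mints-dataset.sqlite",       # hypothetical file names
    source="transfers-dataset.sqlite",
    type="transfer",                     # assumed member of event_types
    batch_size=10000,
)
handle_import_data(args)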

View file

@@ -4,7 +4,7 @@ a datastore for a Moonstream NFTs dataset.
"""
import logging
import sqlite3
-from typing import Any, List, Tuple, Optional
+from typing import Any, cast, List, Tuple, Optional, Union
from .data import EventType, NFTEvent, NFTMetadata
@@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}

CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
    (
        address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
        name TEXT,
        symbol TEXT,
@@ -23,6 +23,10 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
    );
"""

BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;"
DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;"
SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;"

CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
    (
        event_type STRING,
@@ -34,7 +38,7 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
def create_events_table_query(event_type: EventType) -> str:
    creation_query = f"""
CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
    (
        event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
        transaction_hash TEXT,
        block_number INTEGER,
@@ -49,6 +53,34 @@ CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
    return creation_query


def backup_events_table_query(event_type: EventType) -> str:
    backup_query = f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;"
    return backup_query


def drop_backup_events_table_query(event_type: EventType) -> str:
    drop_query = f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;"
    return drop_query


def select_events_table_query(event_type: EventType) -> str:
    selection_query = f"""
    SELECT
        event_id,
        transaction_hash,
        block_number,
        nft_address,
        token_id,
        from_address,
        to_address,
        transaction_value,
        timestamp
    FROM {event_tables[event_type]};
    """
    return selection_query


def setup_database(conn: sqlite3.Connection) -> None:
    """
    Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
@@ -117,10 +149,10 @@ def get_checkpoint_offset(
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
    query = f"""
    INSERT INTO checkpoint (
        event_type,
        offset
    ) VALUES (?, ?)
    """
    cur = conn.cursor()
    cur.execute(query, [event_type.value, offset])
@@ -132,11 +164,11 @@ def insert_address_metadata(
) -> None:
    cur = conn.cursor()
    query = f"""
    INSERT INTO nfts (
        address,
        name,
        symbol
    ) VALUES (?, ?, ?)
    """
    try:
        nfts = [
@@ -179,3 +211,73 @@ def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
        logger.error(f"FAILED TO SAVE :{events}")
        conn.rollback()
        raise e


def import_data(
    target_conn: sqlite3.Connection,
    source_conn: sqlite3.Connection,
    event_type: EventType,
    batch_size: int = 1000,
) -> None:
    """
    Imports the data corresponding to the given event type from the source database into the target
    database.

    Any existing data of that type in the target database is first deleted. It is a good idea to
    create a backup copy of your target database before performing this operation.
    """
    target_cur = target_conn.cursor()

    # Default to the nfts table (contract address metadata); event types use the
    # per-event-type table helpers instead.
    drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY
    backup_table_query = BACKUP_NFTS_TABLE_QUERY
    create_table_query = CREATE_NFTS_TABLE_QUERY
    source_selection_query = SELECT_NFTS_QUERY
    if event_type != EventType.ERC721:
        drop_backup_query = drop_backup_events_table_query(event_type)
        backup_table_query = backup_events_table_query(event_type)
        create_table_query = create_events_table_query(event_type)
        source_selection_query = select_events_table_query(event_type)

    # Replace the existing table in the target: drop any stale backup, rename the
    # current table to <table>_backup, and recreate an empty table to import into.
    target_cur.execute(drop_backup_query)
    target_cur.execute(backup_table_query)
    target_cur.execute(create_table_query)
    target_conn.commit()

    source_cur = source_conn.cursor()
    source_cur.execute(source_selection_query)

    batch: List[Any] = []
    for row in source_cur:
        if event_type == EventType.ERC721:
            batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
        else:
            batch.append(
                NFTEvent(
                    *cast(
                        Tuple[
                            str,
                            EventType,
                            str,
                            str,
                            str,
                            str,
                            str,
                            Optional[int],
                            Optional[int],
                            Optional[int],
                        ],
                        row,
                    )
                )
            )

        if len(batch) == batch_size:
            if event_type == EventType.ERC721:
                insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
            else:
                insert_events(target_conn, cast(List[NFTEvent], batch))
            # Reset the batch once it has been committed, otherwise the rows already
            # inserted would be re-inserted by the final flush below.
            batch = []

    # Flush whatever is left over from the last, possibly incomplete, batch.
    if event_type == EventType.ERC721:
        insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
    else:
        insert_events(target_conn, cast(List[NFTEvent], batch))
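
As a sanity check before running this against a real dataset, one might compare row counts between source and target after an import. A rough sketch, not part of the commit (file names are placeholders; the table name matches event_tables[EventType.TRANSFER] above):

import sqlite3

for path in ("transfers-dataset.sqlite", "mints-dataset.sqlite"):
    with sqlite3.connect(path) as conn:
        # Count the imported transfer events in each datastore.
        (count,) = conn.execute("SELECT COUNT(*) FROM transfers").fetchone()
        print(f"{path}: {count} rows in transfers")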