From 6794cbf74b92bca4f9e51c3b71fbced782fc70e9 Mon Sep 17 00:00:00 2001
From: Neeraj Kashyap
Date: Mon, 4 Oct 2021 08:26:33 -0700
Subject: [PATCH] Fixed import data column order

This was a nasty bug.
---
 datasets/nfts/nfts/cli.py       |  5 +--
 datasets/nfts/nfts/datastore.py | 25 ++++++++-------
 datasets/nfts/nfts/derive.py    | 57 +++++++++++++++++++++++++++++++--
 3 files changed, 71 insertions(+), 16 deletions(-)

diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py
index fbb58590..b20d44f0 100644
--- a/datasets/nfts/nfts/cli.py
+++ b/datasets/nfts/nfts/cli.py
@@ -9,7 +9,7 @@ from moonstreamdb.db import yield_db_session_ctx

 from .data import event_types, nft_event, BlockBounds
 from .datastore import setup_database, import_data
-from .derive import current_owners
+from .derive import current_owners, current_market_values
 from .materialize import create_dataset, EthereumBatchloader


@@ -58,7 +58,8 @@ def handle_materialize(args: argparse.Namespace) -> None:

 def handle_derive(args: argparse.Namespace) -> None:
     with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
-        results = current_owners(moonstream_datastore)
+        current_owners(moonstream_datastore)
+        current_market_values(moonstream_datastore)
     logger.info("Done!")


diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py
index 066090f4..a9896294 100644
--- a/datasets/nfts/nfts/datastore.py
+++ b/datasets/nfts/nfts/datastore.py
@@ -70,12 +70,12 @@ def select_events_table_query(event_type: EventType) -> str:
 SELECT
     event_id,
     transaction_hash,
-    block_number,
     nft_address,
     token_id,
     from_address,
     to_address,
     transaction_value,
+    block_number,
     timestamp
 FROM {event_tables[event_type]};
     """
@@ -266,13 +266,14 @@ def import_data(
             if event_type == EventType.ERC721:
                 batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
             else:
+                # Order matches select query returned by select_events_table_query
                 (
                     event_id,
+                    transaction_hash,
                     nft_address,
                     token_id,
                     from_address,
                     to_address,
-                    transaction_hash,
                     value,
                     block_number,
                     timestamp,
@@ -291,16 +292,16 @@ def import_data(
                     row,
                 )
                 event = NFTEvent(
-                    event_id,
-                    event_type,  # Original argument to this function
-                    nft_address,
-                    token_id,
-                    from_address,
-                    to_address,
-                    transaction_hash,
-                    value,
-                    block_number,
-                    timestamp,
+                    event_id=event_id,
+                    event_type=event_type,  # Original argument to this function
+                    nft_address=nft_address,
+                    token_id=token_id,
+                    from_address=from_address,
+                    to_address=to_address,
+                    transaction_hash=transaction_hash,
+                    value=value,
+                    block_number=block_number,
+                    timestamp=timestamp,
                 )
                 batch.append(event)

diff --git a/datasets/nfts/nfts/derive.py b/datasets/nfts/nfts/derive.py
index dc13c7c4..53f50637 100644
--- a/datasets/nfts/nfts/derive.py
+++ b/datasets/nfts/nfts/derive.py
@@ -30,14 +30,33 @@ class LastValue:
         return self.value


+class LastNonzeroValue:
+    """
+    Stores the last non-zero value in a given column. This is meant to be used as an aggregate
+    function. We use it, for example, to get the current market value of an NFT (inside a given
+    window of time).
+    """
+
+    def __init__(self):
+        self.value = 0
+
+    def step(self, value):
+        if value != 0:
+            self.value = value
+
+    def finalize(self):
+        return self.value
+
+
 def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
     """
     Loads custom aggregate functions to an active SQLite3 connection.
     """
     conn.create_aggregate("last_value", 1, LastValue)
+    conn.create_aggregate("last_nonzero_value", 1, LastNonzeroValue)


-def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
+def current_owners(conn: sqlite3.Connection) -> None:
     """
     Requires a connection to a dataset in which the raw data (esp. transfers) has already been
     loaded.
@@ -46,7 +65,12 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
     drop_existing_current_owners_query = "DROP TABLE IF EXISTS current_owners;"
     current_owners_query = """
     CREATE TABLE current_owners AS
-        SELECT nft_address, token_id, CAST(last_value(to_address) AS TEXT) AS owner FROM transfers
+        SELECT nft_address, token_id, last_value(to_address) AS owner FROM
+        (
+            SELECT * FROM mints
+            UNION ALL
+            SELECT * FROM transfers
+        )
         GROUP BY nft_address, token_id;"""
     cur = conn.cursor()
     try:
@@ -57,3 +81,32 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
         conn.rollback()
         logger.error("Could not create derived dataset: current_owners")
         logger.error(e)
+
+
+def current_market_values(conn: sqlite3.Connection) -> None:
+    """
+    Requires a connection to a dataset in which the raw data (esp. transfers) has already been
+    loaded.
+    """
+    ensure_custom_aggregate_functions(conn)
+    drop_existing_current_market_values_query = (
+        "DROP TABLE IF EXISTS current_market_values;"
+    )
+    current_market_values_query = """
+    CREATE TABLE current_market_values AS
+        SELECT nft_address, token_id, last_nonzero_value(transaction_value) AS market_value FROM
+        (
+            SELECT * FROM mints
+            UNION ALL
+            SELECT * FROM transfers
+        )
+        GROUP BY nft_address, token_id;"""
+    cur = conn.cursor()
+    try:
+        cur.execute(drop_existing_current_market_values_query)
+        cur.execute(current_market_values_query)
+        conn.commit()
+    except Exception as e:
+        conn.rollback()
+        logger.error("Could not create derived dataset: current_market_values")
+        logger.error(e)