Fixed import data column order

This was a nasty bug.
pull/304/head
Neeraj Kashyap 2021-10-04 08:26:33 -07:00
parent f8fb803fd4
commit 6794cbf74b
3 changed files with 71 additions and 16 deletions

View file

@ -9,7 +9,7 @@ from moonstreamdb.db import yield_db_session_ctx
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data
from .derive import current_owners
from .derive import current_owners, current_market_values
from .materialize import create_dataset, EthereumBatchloader
@ -58,7 +58,8 @@ def handle_materialize(args: argparse.Namespace) -> None:
def handle_derive(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
results = current_owners(moonstream_datastore)
current_owners(moonstream_datastore)
current_market_values(moonstream_datastore)
logger.info("Done!")

View file

@ -70,12 +70,12 @@ def select_events_table_query(event_type: EventType) -> str:
SELECT
event_id,
transaction_hash,
block_number,
nft_address,
token_id,
from_address,
to_address,
transaction_value,
block_number,
timestamp
FROM {event_tables[event_type]};
"""
@ -266,13 +266,14 @@ def import_data(
if event_type == EventType.ERC721:
batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
else:
# Order matches select query returned by select_events_table_query
(
event_id,
transaction_hash,
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
@ -291,16 +292,16 @@ def import_data(
row,
)
event = NFTEvent(
event_id,
event_type, # Original argument to this function
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
event_id=event_id,
event_type=event_type, # Original argument to this function
nft_address=nft_address,
token_id=token_id,
from_address=from_address,
to_address=to_address,
transaction_hash=transaction_hash,
value=value,
block_number=block_number,
timestamp=timestamp,
)
batch.append(event)

View file

@ -30,14 +30,33 @@ class LastValue:
return self.value
class LastNonzeroValue:
    """
    Stores the last non-zero value in a given column. This is meant to be used as an aggregate
    function. We use it, for example, to get the current market value of an NFT (inside a given
    window of time).
    """

    def __init__(self):
        # 0 doubles as the "no non-zero value seen yet" sentinel.
        self.value = 0

    def step(self, value):
        # Skip SQL NULLs (surfaced to Python as None) in addition to zeros.
        # Previously `value != 0` was the only guard, and since None != 0 is
        # True, a NULL transaction_value would clobber a real market value.
        if value is not None and value != 0:
            self.value = value

    def finalize(self):
        # Called once per group by SQLite to produce the aggregate result.
        return self.value
def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
    """
    Loads custom aggregate functions to an active SQLite3 connection.
    """
    # Every custom aggregate here takes exactly one argument, hence the
    # fixed arity of 1 passed to create_aggregate.
    custom_aggregates = {
        "last_value": LastValue,
        "last_nonzero_value": LastNonzeroValue,
    }
    for function_name, aggregate_class in custom_aggregates.items():
        conn.create_aggregate(function_name, 1, aggregate_class)
def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
def current_owners(conn: sqlite3.Connection) -> None:
"""
Requires a connection to a dataset in which the raw data (esp. transfers) has already been
loaded.
@ -46,7 +65,12 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
drop_existing_current_owners_query = "DROP TABLE IF EXISTS current_owners;"
current_owners_query = """
CREATE TABLE current_owners AS
SELECT nft_address, token_id, CAST(last_value(to_address) AS TEXT) AS owner FROM transfers
SELECT nft_address, token_id, last_value(to_address) AS owner FROM
(
SELECT * FROM mints
UNION ALL
SELECT * FROM transfers
)
GROUP BY nft_address, token_id;"""
cur = conn.cursor()
try:
@ -57,3 +81,32 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
conn.rollback()
logger.error("Could not create derived dataset: current_owners")
logger.error(e)
def current_market_values(conn: sqlite3.Connection) -> None:
    """
    Requires a connection to a dataset in which the raw data (esp. transfers) has already been
    loaded.

    Rebuilds the current_market_values table: for each (nft_address, token_id) pair it stores
    the last non-zero transaction_value observed across mints and transfers combined.
    """
    # last_nonzero_value below is a custom aggregate; make sure it is registered.
    ensure_custom_aggregate_functions(conn)

    drop_existing_current_market_values_query = (
        "DROP TABLE IF EXISTS current_market_values;"
    )
    current_market_values_query = """
CREATE TABLE current_market_values AS
    SELECT nft_address, token_id, last_nonzero_value(transaction_value) AS market_value FROM
    (
    SELECT * FROM mints
    UNION ALL
    SELECT * FROM transfers
    )
    GROUP BY nft_address, token_id;"""

    cursor = conn.cursor()
    try:
        # Drop-then-create so reruns fully replace any stale derived table.
        for statement in (
            drop_existing_current_market_values_query,
            current_market_values_query,
        ):
            cursor.execute(statement)
        conn.commit()
    except Exception as error:
        # Best-effort derivation: roll back and log rather than crash the CLI.
        conn.rollback()
        logger.error("Could not create derived dataset: current_market_values")
        logger.error(error)