diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index 21b704bb..32a2d61c 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -10,6 +10,7 @@ from web3 import Web3, IPCProvider, HTTPProvider from .data import event_types, nft_event, BlockBounds from .datastore import setup_database +from .derive import current_owners from .materialize import create_dataset, EthereumBatchloader @@ -17,24 +18,6 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def web3_connection(web3_uri: Optional[str] = None) -> Web3: - """ - Connect to the given web3 provider. You may specify a web3 provider either as a path to an IPC - socket on your filesystem or as an HTTP(S) URI to a JSON RPC provider. - - If web3_uri is not provided or is set to None, this function attempts to use the default behavior - of the web3.py IPCProvider (one of the steps is looking for .ethereum/geth.ipc, but there may be others). - """ - web3_provider: Union[IPCProvider, HTTPProvider] = Web3.IPCProvider() - if web3_uri is not None: - if web3_uri.startswith("http://") or web3_uri.startswith("https://"): - web3_provider = Web3.HTTPProvider(web3_uri) - else: - web3_provider = Web3.IPCProvider(web3_uri) - web3_client = Web3(web3_provider) - return web3_client - - def handle_initdb(args: argparse.Namespace) -> None: with contextlib.closing(sqlite3.connect(args.datastore)) as conn: setup_database(conn) @@ -48,7 +31,7 @@ def handle_materialize(args: argparse.Namespace) -> None: elif args.end is not None: raise ValueError("You cannot set --end unless you also set --start") - batch_loader = EthereumBatchloader(jrpc_url=args.jrpc) + batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc) logger.info(f"Materializing NFT events to datastore: {args.datastore}") logger.info(f"Block bounds: {bounds}") @@ -59,7 +42,6 @@ def handle_materialize(args: argparse.Namespace) -> None: create_dataset( moonstream_datastore, db_session, - args.web3, event_type, batch_loader, bounds, @@ -67,6 +49,12 @@ def handle_materialize(args: argparse.Namespace) -> None: ) +def handle_derive(args: argparse.Namespace) -> None: + with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore: + results = current_owners(moonstream_datastore) + logger.info("Done!") + + def main() -> None: """ "nfts" command handler. @@ -76,6 +64,12 @@ def main() -> None: # Command: nfts """ default_web3_provider = os.environ.get("MOONSTREAM_WEB3_PROVIDER") + if default_web3_provider is not None and not default_web3_provider.startswith( + "http" + ): + raise ValueError( + f"Please either unset MOONSTREAM_WEB3_PROVIDER environment variable or set it to an HTTP/HTTPS URL. Current value: {default_web3_provider}" + ) parser = argparse.ArgumentParser( description="Tools to work with the Moonstream NFTs dataset" @@ -101,13 +95,7 @@ def main() -> None: help="Path to SQLite database representing the dataset", ) parser_materialize.add_argument( - "--web3", - default=default_web3_provider, - type=web3_connection, - help=f"Web3 provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})", - ) - parser_materialize.add_argument( - "--jrpc", + "--jsonrpc", default=default_web3_provider, type=str, help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})", @@ -133,6 +121,17 @@ def main() -> None: ) parser_materialize.set_defaults(func=handle_materialize) + parser_derive = subcommands.add_parser( + "derive", description="Create/updated derived data in the dataset" + ) + parser_derive.add_argument( + "-d", + "--datastore", + required=True, + help="Path to SQLite database representing the dataset", + ) + parser_derive.set_defaults(func=handle_derive) + args = parser.parse_args() args.func(args) diff --git a/datasets/nfts/nfts/datastore.py b/datasets/nfts/nfts/datastore.py index b0d15be9..9dc96b4a 100644 --- a/datasets/nfts/nfts/datastore.py +++ b/datasets/nfts/nfts/datastore.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"} -CREATE_NFTS_TABLE_QUERY = """CREATE TABLE nfts +CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts ( address TEXT NOT NULL UNIQUE ON CONFLICT FAIL, name TEXT, @@ -22,8 +22,8 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE nfts ); """ -CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE checkpoint - ( +CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint + ( event_type STRING, offset INTEGER, transaction_hash STRING @@ -33,7 +33,7 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE checkpoint def create_events_table_query(event_type: EventType) -> str: creation_query = f""" -CREATE TABLE {event_tables[event_type]} +CREATE TABLE IF NOT EXISTS {event_tables[event_type]} ( transaction_hash TEXT, block_number INTEGER, @@ -53,10 +53,12 @@ def setup_database(conn: sqlite3.Connection) -> None: Sets up the schema of the Moonstream NFTs dataset in the given SQLite database. """ cur = conn.cursor() + cur.execute(CREATE_NFTS_TABLE_QUERY) cur.execute(create_events_table_query(EventType.TRANSFER)) cur.execute(create_events_table_query(EventType.MINT)) cur.execute(CREATE_CHECKPOINT_TABLE_QUERY) + conn.commit() diff --git a/datasets/nfts/nfts/derive.py b/datasets/nfts/nfts/derive.py new file mode 100644 index 00000000..dc13c7c4 --- /dev/null +++ b/datasets/nfts/nfts/derive.py @@ -0,0 +1,59 @@ +""" +Tools to build derived relations from raw data (nfts, transfers, mints relations). + +For example: +- Current owner of each token +- Current value of each token +""" +import logging +from typing import List, Tuple +import sqlite3 + + +logging.basicConfig(level=logging.ERROR) +logger = logging.getLogger(__name__) + + +class LastValue: + """ + Stores the last seen value in a given column. This is meant to be used as an aggregate function. + We use it, for example, to get the current owner of an NFT (inside a given window of time). + """ + + def __init__(self): + self.value = None + + def step(self, value): + self.value = value + + def finalize(self): + return self.value + + +def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None: + """ + Loads custom aggregate functions to an active SQLite3 connection. + """ + conn.create_aggregate("last_value", 1, LastValue) + + +def current_owners(conn: sqlite3.Connection) -> List[Tuple]: + """ + Requires a connection to a dataset in which the raw data (esp. transfers) has already been + loaded. + """ + ensure_custom_aggregate_functions(conn) + drop_existing_current_owners_query = "DROP TABLE IF EXISTS current_owners;" + current_owners_query = """ + CREATE TABLE current_owners AS + SELECT nft_address, token_id, CAST(last_value(to_address) AS TEXT) AS owner FROM transfers + GROUP BY nft_address, token_id;""" + cur = conn.cursor() + try: + cur.execute(drop_existing_current_owners_query) + cur.execute(current_owners_query) + conn.commit() + except Exception as e: + conn.rollback() + logger.error("Could not create derived dataset: current_owners") + logger.error(e) diff --git a/datasets/nfts/nfts/materialize.py b/datasets/nfts/nfts/materialize.py index c1ecdff4..664f4ce0 100644 --- a/datasets/nfts/nfts/materialize.py +++ b/datasets/nfts/nfts/materialize.py @@ -1,3 +1,4 @@ +from dataclasses import is_dataclass import logging import sqlite3 from typing import Any, cast, Iterator, List, Optional, Set @@ -23,9 +24,8 @@ logger = logging.getLogger(__name__) class EthereumBatchloader: - def __init__(self, jrpc_url) -> None: - - self.jrpc_url = jrpc_url + def __init__(self, jsonrpc_url) -> None: + self.jsonrpc_url = jsonrpc_url self.message_number = 0 self.commands: List[Any] = [] self.requests_banch: List[Any] = [] @@ -69,7 +69,9 @@ class EthereumBatchloader: headers = {"Content-Type": "application/json"} try: - r = requests.post(self.jrpc_url, headers=headers, data=payload, timeout=300) + r = requests.post( + self.jsonrpc_url, headers=headers, data=payload, timeout=300 + ) except Exception as e: print(e) return r @@ -82,9 +84,9 @@ class EthereumBatchloader: def enrich_from_web3( - web3_client: Web3, nft_events: List[NFTEvent], batch_loader: EthereumBatchloader, + bounds: Optional[BlockBounds] = None, ) -> List[NFTEvent]: """ Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db) @@ -102,15 +104,16 @@ def enrich_from_web3( if len(transactions_to_query) == 0: return nft_events - logger.info("Calling jrpc") - jrpc_response = batch_loader.load_transactions(list(transactions_to_query)) - breakpoint() + logger.info("Calling JSON RPC API") + jsonrpc_transactions_response = batch_loader.load_transactions( + list(transactions_to_query) + ) transactions_map = { result["result"]["hash"]: ( int(result["result"]["value"], 16), int(result["result"]["blockNumber"], 16), ) - for result in jrpc_response + for result in jsonrpc_transactions_response } blocks_to_query: Set[int] = set() @@ -122,14 +125,26 @@ def enrich_from_web3( if len(blocks_to_query) == 0: return nft_events - jrpc_response = batch_loader.load_blocks(list(blocks_to_query), False) + jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False) blocks_map = { int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16) - for result in jrpc_response + for result in jsonrpc_blocks_response } for index in indices_to_update: nft_events[index].timestamp = blocks_map[cast(int, nft_event.block_number)] - return nft_events + + def check_bounds(event: NFTEvent) -> bool: + if bounds is None: + return True + is_admissible = True + if event.block_number < bounds.starting_block: + is_admissible = False + if bounds.ending_block is not None and event.block_number > bounds.ending_block: + is_admissible = False + return is_admissible + + admissible_events = [event for event in nft_events if check_bounds(event)] + return admissible_events def get_events( @@ -204,7 +219,6 @@ def get_events( def create_dataset( datastore_conn: sqlite3.Connection, db_session: Session, - web3_client: Web3, event_type: EventType, batch_loader: EthereumBatchloader, bounds: Optional[BlockBounds] = None, @@ -220,7 +234,7 @@ def create_dataset( offset = 0 logger.info(f"Did not found any checkpoint for {event_type.value}") - raw_events = get_events(db_session, event_type, None, offset, batch_size) + raw_events = get_events(db_session, event_type, bounds, offset, batch_size) raw_events_batch: List[NFTEvent] = [] for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"): @@ -230,7 +244,7 @@ def create_dataset( insert_events( datastore_conn, - enrich_from_web3(web3_client, raw_events_batch, batch_loader), + enrich_from_web3(raw_events_batch, batch_loader, bounds), ) offset += batch_size @@ -243,7 +257,7 @@ def create_dataset( raw_events_batch = [] logger.info("Writing remaining events to datastore") insert_events( - datastore_conn, enrich_from_web3(web3_client, raw_events_batch, batch_loader) + datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds) ) offset += len(raw_events_batch) insert_checkpoint(