diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index 32a2d61c..f8fc0b14 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -10,7 +10,7 @@ from web3 import Web3, IPCProvider, HTTPProvider from .data import event_types, nft_event, BlockBounds from .datastore import setup_database -from .derive import current_owners +from .derive import current_owners, current_values_distribution from .materialize import create_dataset, EthereumBatchloader @@ -18,6 +18,12 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +derive_functions = { + "current_owners": current_owners, + "current_values_distribution": current_values_distribution, +} + + def handle_initdb(args: argparse.Namespace) -> None: with contextlib.closing(sqlite3.connect(args.datastore)) as conn: setup_database(conn) @@ -51,7 +57,15 @@ def handle_materialize(args: argparse.Namespace) -> None: def handle_derive(args: argparse.Namespace) -> None: with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore: - results = current_owners(moonstream_datastore) + calling_functions = [] + if not args.derive_functions: + calling_functions.extend(derive_functions.keys()) + else: + calling_functions.extend(args.derive_functions) + + for function_name in calling_functions: + if function_name in calling_functions: + derive_functions[function_name](moonstream_datastore) logger.info("Done!") @@ -130,6 +144,13 @@ def main() -> None: required=True, help="Path to SQLite database representing the dataset", ) + parser_derive.add_argument( + "-f", + "--derive_functions", + required=False, + nargs="+", + help=f"Functions wich will call from derive module availabel {list(derive_functions.keys())}", + ) parser_derive.set_defaults(func=handle_derive) args = parser.parse_args() diff --git a/datasets/nfts/nfts/derive.py b/datasets/nfts/nfts/derive.py index dc13c7c4..ee0632d7 100644 --- a/datasets/nfts/nfts/derive.py +++ b/datasets/nfts/nfts/derive.py @@ -57,3 +57,26 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]: conn.rollback() logger.error("Could not create derived dataset: current_owners") logger.error(e) + + +def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]: + """ + Requires a connection to a dataset in which the raw data (esp. transfers) has already been + loaded. + """ + ensure_custom_aggregate_functions(conn) + drop_existing_values_distribution_query = ( + "DROP TABLE IF EXISTS market_values_distribution;" + ) + current_values_distribution_query = """ + CREATE TABLE market_values_distribution AS + select nft_address as address, market_value as value, CUME_DIST() over (PARTITION BY nft_address ORDER BY market_value) as cumulate_value from current_market_values;""" + cur = conn.cursor() + try: + cur.execute(drop_existing_values_distribution_query) + cur.execute(current_values_distribution_query) + conn.commit() + except Exception as e: + conn.rollback() + logger.error("Could not create derived dataset: current_owners") + logger.error(e)