From 841189319b1aa9f24c0eea99174f7889aad37a13 Mon Sep 17 00:00:00 2001 From: Neeraj Kashyap Date: Thu, 7 Oct 2021 19:30:41 -0700 Subject: [PATCH] quartile -> quantile --- datasets/nfts/nfts/cli.py | 45 ++++++++++++++++++------ datasets/nfts/nfts/derive.py | 68 ++++++++++++++++++------------------ 2 files changed, 68 insertions(+), 45 deletions(-) diff --git a/datasets/nfts/nfts/cli.py b/datasets/nfts/nfts/cli.py index fc60dcf2..63fe9be9 100644 --- a/datasets/nfts/nfts/cli.py +++ b/datasets/nfts/nfts/cli.py @@ -16,7 +16,7 @@ from .derive import ( current_market_values, current_values_distribution, transfer_statistics_by_address, - quartile_generating, + quantile_generating, mint_holding_times, transfer_holding_times, transfers_mints_connection_table, @@ -33,7 +33,7 @@ derive_functions = { "current_market_values": current_market_values, "current_values_distribution": current_values_distribution, "transfer_statistics_by_address": transfer_statistics_by_address, - "quartile_generating": quartile_generating, + "quantile_generating": quantile_generating, "transfers_mints_connection_table": transfers_mints_connection_table, "mint_holding_times": mint_holding_times, "transfer_holding_times": transfer_holding_times, @@ -70,7 +70,9 @@ def handle_filter_data(args: argparse.Namespace) -> None: with contextlib.closing(sqlite3.connect(sqlite_path)) as source_conn: print("Start filtering") filter_data( - source_conn, start_time=args.start_time, end_time=args.end_time, + source_conn, + start_time=args.start_time, + end_time=args.end_time, ) print("Filtering end.") for index, function_name in enumerate(derive_functions.keys()): @@ -99,7 +101,11 @@ def handle_materialize(args: argparse.Namespace) -> None: sqlite3.connect(args.datastore) ) as moonstream_datastore: create_dataset( - moonstream_datastore, db_session, event_type, bounds, args.batch_size, + moonstream_datastore, + db_session, + event_type, + bounds, + args.batch_size, ) @@ -111,11 +117,17 @@ def handle_enrich(args: argparse.Namespace) -> None: with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore: enrich( - moonstream_datastore, EventType.TRANSFER, batch_loader, args.batch_size, + moonstream_datastore, + EventType.TRANSFER, + batch_loader, + args.batch_size, ) enrich( - moonstream_datastore, EventType.MINT, batch_loader, args.batch_size, + moonstream_datastore, + EventType.MINT, + batch_loader, + args.batch_size, ) @@ -222,7 +234,9 @@ def main() -> None: description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.", ) parser_import_data.add_argument( - "--target", required=True, help="Datastore into which you want to import data", + "--target", + required=True, + help="Datastore into which you want to import data", ) parser_import_data.add_argument( "--source", required=True, help="Datastore from which you want to import data" @@ -245,19 +259,28 @@ def main() -> None: # Create dump of filtered data parser_filtered_copy = subcommands.add_parser( - "filter-data", description="Create copy of database with applied filters.", + "filter-data", + description="Create copy of database with applied filters.", ) parser_filtered_copy.add_argument( - "--target", required=True, help="Datastore into which you want to import data", + "--target", + required=True, + help="Datastore into which you want to import data", ) parser_filtered_copy.add_argument( "--source", required=True, help="Datastore from which you want to import data" ) parser_filtered_copy.add_argument( - "--start-time", required=False, type=int, help="Start timestamp.", + "--start-time", + required=False, + type=int, + help="Start timestamp.", ) parser_filtered_copy.add_argument( - "--end-time", required=False, type=int, help="End timestamp.", + "--end-time", + required=False, + type=int, + help="End timestamp.", ) parser_filtered_copy.set_defaults(func=handle_filter_data) diff --git a/datasets/nfts/nfts/derive.py b/datasets/nfts/nfts/derive.py index 8a09d6fb..4c1cf3ea 100644 --- a/datasets/nfts/nfts/derive.py +++ b/datasets/nfts/nfts/derive.py @@ -48,24 +48,24 @@ class LastNonzeroValue: return self.value -class QuartileFunction: - """ Split vlues to quartiles """ +class QuantileFunction: + """Split vlues to quantiles""" - def __init__(self, num_quartiles) -> None: - self.divider = 1 / num_quartiles + def __init__(self, num_quantiles) -> None: + self.divider = 1 / num_quantiles def __call__(self, value): if value is None or value == "None": value = 0 - quartile = self.divider + quantile = self.divider try: - while value > quartile: - quartile += self.divider + while value > quantile: + quantile += self.divider - if quartile > 1: - quartile = 1 + if quantile > 1: + quantile = 1 - return quartile + return quantile except Exception as err: print(err) @@ -78,8 +78,8 @@ def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None: """ conn.create_aggregate("last_value", 1, LastValue) conn.create_aggregate("last_nonzero_value", 1, LastNonzeroValue) - conn.create_function("quartile_10", 1, QuartileFunction(10)) - conn.create_function("quartile_25", 1, QuartileFunction(25)) + conn.create_function("quantile_10", 1, QuantileFunction(10)) + conn.create_function("quantile_25", 1, QuantileFunction(25)) def current_owners(conn: sqlite3.Connection) -> None: @@ -218,19 +218,19 @@ def transfer_statistics_by_address(conn: sqlite3.Connection) -> None: logger.error(e) -def quartile_generating(conn: sqlite3.Connection): +def quantile_generating(conn: sqlite3.Connection): """ - Create quartile wich depends on setted on class defenition + Create quantile wich depends on setted on class defenition """ ensure_custom_aggregate_functions(conn) - drop_calculate_10_quartiles = ( - "DROP TABLE IF EXISTS transfer_values_quartile_10_distribution_per_address;" + drop_calculate_10_quantiles = ( + "DROP TABLE IF EXISTS transfer_values_quantile_10_distribution_per_address;" ) - calculate_10_quartiles = """ - CREATE TABLE transfer_values_quartile_10_distribution_per_address AS + calculate_10_quantiles = """ + CREATE TABLE transfer_values_quantile_10_distribution_per_address AS select cumulate.address as address, - CAST(quartile_10(cumulate.relative_value) as TEXT) as quartiles, + CAST(quantile_10(cumulate.relative_value) as TEXT) as quantiles, cumulate.relative_value as relative_value from ( @@ -252,16 +252,16 @@ def quartile_generating(conn: sqlite3.Connection): current_market_values.nft_address ) as max_values on current_market_values.nft_address = max_values.nft_address ) as cumulate - + """ - drop_calculate_25_quartiles = ( - "DROP TABLE IF EXISTS transfer_values_quartile_25_distribution_per_address;" + drop_calculate_25_quantiles = ( + "DROP TABLE IF EXISTS transfer_values_quantile_25_distribution_per_address;" ) - calculate_25_quartiles = """ - CREATE TABLE transfer_values_quartile_25_distribution_per_address AS + calculate_25_quantiles = """ + CREATE TABLE transfer_values_quantile_25_distribution_per_address AS select cumulate.address as address, - CAST(quartile_25(cumulate.relative_value) as TEXT) as quartiles, + CAST(quantile_25(cumulate.relative_value) as TEXT) as quantiles, cumulate.relative_value as relative_value from ( @@ -283,20 +283,20 @@ def quartile_generating(conn: sqlite3.Connection): current_market_values.nft_address ) as max_values on current_market_values.nft_address = max_values.nft_address ) as cumulate - + """ cur = conn.cursor() try: - print("Creating transfer_values_quartile_10_distribution_per_address") - cur.execute(drop_calculate_10_quartiles) - cur.execute(calculate_10_quartiles) - print("Creating transfer_values_quartile_25_distribution_per_address") - cur.execute(drop_calculate_25_quartiles) - cur.execute(calculate_25_quartiles) + print("Creating transfer_values_quantile_10_distribution_per_address") + cur.execute(drop_calculate_10_quantiles) + cur.execute(calculate_10_quantiles) + print("Creating transfer_values_quantile_25_distribution_per_address") + cur.execute(drop_calculate_25_quantiles) + cur.execute(calculate_25_quantiles) conn.commit() except Exception as e: conn.rollback() - logger.error("Could not create derived dataset: quartile_generating") + logger.error("Could not create derived dataset: quantile_generating") logger.error(e) @@ -307,7 +307,7 @@ def transfers_mints_connection_table(conn: sqlite3.Connection): drop_transfers_mints_connection = "DROP TABLE IF EXISTS transfers_mints;" transfers_mints_connection = """ - CREATE TABLE transfers_mints as + CREATE TABLE transfers_mints as select transfers.event_id as transfer_id, mints.mint_id as mint_id