quartile -> quantile

pull/286/head
Neeraj Kashyap 2021-10-07 19:30:41 -07:00
parent 23c91845cb
commit 841189319b
2 changed files with 68 additions and 45 deletions

View file

@@ -16,7 +16,7 @@ from .derive import (
     current_market_values,
     current_values_distribution,
     transfer_statistics_by_address,
-    quartile_generating,
+    quantile_generating,
     mint_holding_times,
     transfer_holding_times,
     transfers_mints_connection_table,
@@ -33,7 +33,7 @@ derive_functions = {
     "current_market_values": current_market_values,
     "current_values_distribution": current_values_distribution,
     "transfer_statistics_by_address": transfer_statistics_by_address,
-    "quartile_generating": quartile_generating,
+    "quantile_generating": quantile_generating,
     "transfers_mints_connection_table": transfers_mints_connection_table,
     "mint_holding_times": mint_holding_times,
     "transfer_holding_times": transfer_holding_times,
@@ -70,7 +70,9 @@ def handle_filter_data(args: argparse.Namespace) -> None:
     with contextlib.closing(sqlite3.connect(sqlite_path)) as source_conn:
         print("Start filtering")
         filter_data(
-            source_conn, start_time=args.start_time, end_time=args.end_time,
+            source_conn,
+            start_time=args.start_time,
+            end_time=args.end_time,
         )
         print("Filtering end.")
         for index, function_name in enumerate(derive_functions.keys()):
@@ -99,7 +101,11 @@ def handle_materialize(args: argparse.Namespace) -> None:
         sqlite3.connect(args.datastore)
     ) as moonstream_datastore:
         create_dataset(
-            moonstream_datastore, db_session, event_type, bounds, args.batch_size,
+            moonstream_datastore,
+            db_session,
+            event_type,
+            bounds,
+            args.batch_size,
         )
@@ -111,11 +117,17 @@ def handle_enrich(args: argparse.Namespace) -> None:
     with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
         enrich(
-            moonstream_datastore, EventType.TRANSFER, batch_loader, args.batch_size,
+            moonstream_datastore,
+            EventType.TRANSFER,
+            batch_loader,
+            args.batch_size,
         )
         enrich(
-            moonstream_datastore, EventType.MINT, batch_loader, args.batch_size,
+            moonstream_datastore,
+            EventType.MINT,
+            batch_loader,
+            args.batch_size,
         )
@@ -222,7 +234,9 @@ def main() -> None:
         description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
     )
     parser_import_data.add_argument(
-        "--target", required=True, help="Datastore into which you want to import data",
+        "--target",
+        required=True,
+        help="Datastore into which you want to import data",
     )
     parser_import_data.add_argument(
         "--source", required=True, help="Datastore from which you want to import data"
@@ -245,19 +259,28 @@ def main() -> None:
     # Create dump of filtered data
     parser_filtered_copy = subcommands.add_parser(
-        "filter-data", description="Create copy of database with applied filters.",
+        "filter-data",
+        description="Create copy of database with applied filters.",
     )
     parser_filtered_copy.add_argument(
-        "--target", required=True, help="Datastore into which you want to import data",
+        "--target",
+        required=True,
+        help="Datastore into which you want to import data",
     )
     parser_filtered_copy.add_argument(
         "--source", required=True, help="Datastore from which you want to import data"
     )
     parser_filtered_copy.add_argument(
-        "--start-time", required=False, type=int, help="Start timestamp.",
+        "--start-time",
+        required=False,
+        type=int,
+        help="Start timestamp.",
     )
     parser_filtered_copy.add_argument(
-        "--end-time", required=False, type=int, help="End timestamp.",
+        "--end-time",
+        required=False,
+        type=int,
+        help="End timestamp.",
     )
     parser_filtered_copy.set_defaults(func=handle_filter_data)

View file

@@ -48,24 +48,24 @@ class LastNonzeroValue:
         return self.value


-class QuartileFunction:
-    """ Split vlues to quartiles """
+class QuantileFunction:
+    """Split values into quantiles"""

-    def __init__(self, num_quartiles) -> None:
-        self.divider = 1 / num_quartiles
+    def __init__(self, num_quantiles) -> None:
+        self.divider = 1 / num_quantiles

     def __call__(self, value):
         if value is None or value == "None":
             value = 0
-        quartile = self.divider
+        quantile = self.divider
         try:
-            while value > quartile:
-                quartile += self.divider
+            while value > quantile:
+                quantile += self.divider

-            if quartile > 1:
-                quartile = 1
+            if quantile > 1:
+                quantile = 1

-            return quartile
+            return quantile

         except Exception as err:
             print(err)
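For context on the rename: the helper maps a relative value in [0, 1] to the upper bound of its quantile bucket, so QuantileFunction(10) produces deciles and QuantileFunction(25) produces 4%-wide buckets. A minimal standalone sketch of that bucketing (the sample inputs are illustrative, and min() stands in for the explicit cap used in the module):

class QuantileFunction:
    """Split values into quantiles (sketch of the helper above)."""

    def __init__(self, num_quantiles) -> None:
        # Width of one bucket, e.g. 0.1 for deciles.
        self.divider = 1 / num_quantiles

    def __call__(self, value):
        # Missing values are treated as 0 and land in the first bucket.
        if value is None or value == "None":
            value = 0
        quantile = self.divider
        while value > quantile:
            quantile += self.divider
        return min(quantile, 1.0)


decile = QuantileFunction(10)
print(round(decile(0.23), 2))  # 0.3  -- upper bound of the third decile
print(round(decile(0.95), 2))  # 1.0  -- capped at 1
print(decile(None))            # 0.1  -- None treated as 0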
@@ -78,8 +78,8 @@ def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
     """
     conn.create_aggregate("last_value", 1, LastValue)
     conn.create_aggregate("last_nonzero_value", 1, LastNonzeroValue)
-    conn.create_function("quartile_10", 1, QuartileFunction(10))
-    conn.create_function("quartile_25", 1, QuartileFunction(25))
+    conn.create_function("quantile_10", 1, QuantileFunction(10))
+    conn.create_function("quantile_25", 1, QuantileFunction(25))


 def current_owners(conn: sqlite3.Connection) -> None:
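The create_function registrations above are what make the bucketing callable from SQL in the derived tables. A minimal, self-contained sketch of that mechanism, assuming an in-memory database and a collapsed one-function version of the decile bucketing (everything except the quantile_10 name is illustrative):

import sqlite3


def quantile_10(value):
    """Same decile bucketing as QuantileFunction(10), collapsed into one function."""
    value = 0 if value is None else value
    bucket = 0.1
    while value > bucket:
        bucket += 0.1
    return min(bucket, 1.0)


conn = sqlite3.connect(":memory:")
# Mirrors ensure_custom_aggregate_functions: register a one-argument scalar function.
conn.create_function("quantile_10", 1, quantile_10)

# The registered name is now usable in SQL, e.g. the
# CAST(quantile_10(cumulate.relative_value) as TEXT) call in the derived tables.
(bucket,) = conn.execute("SELECT quantile_10(0.42)").fetchone()
print(bucket)  # ~0.5, the upper bound of the fifth decile
conn.close()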
@@ -218,19 +218,19 @@ def transfer_statistics_by_address(conn: sqlite3.Connection) -> None:
         logger.error(e)


-def quartile_generating(conn: sqlite3.Connection):
+def quantile_generating(conn: sqlite3.Connection):
     """
-    Create quartile wich depends on setted on class defenition
+    Create quantile distribution tables; the number of quantiles is set by the registered QuantileFunction instances
     """
     ensure_custom_aggregate_functions(conn)
-    drop_calculate_10_quartiles = (
-        "DROP TABLE IF EXISTS transfer_values_quartile_10_distribution_per_address;"
+    drop_calculate_10_quantiles = (
+        "DROP TABLE IF EXISTS transfer_values_quantile_10_distribution_per_address;"
     )
-    calculate_10_quartiles = """
-    CREATE TABLE transfer_values_quartile_10_distribution_per_address AS
+    calculate_10_quantiles = """
+    CREATE TABLE transfer_values_quantile_10_distribution_per_address AS
     select
         cumulate.address as address,
-        CAST(quartile_10(cumulate.relative_value) as TEXT) as quartiles,
+        CAST(quantile_10(cumulate.relative_value) as TEXT) as quantiles,
         cumulate.relative_value as relative_value
     from
     (
@@ -254,14 +254,14 @@ def quartile_generating(conn: sqlite3.Connection):
     ) as cumulate
     """

-    drop_calculate_25_quartiles = (
-        "DROP TABLE IF EXISTS transfer_values_quartile_25_distribution_per_address;"
+    drop_calculate_25_quantiles = (
+        "DROP TABLE IF EXISTS transfer_values_quantile_25_distribution_per_address;"
     )
-    calculate_25_quartiles = """
-    CREATE TABLE transfer_values_quartile_25_distribution_per_address AS
+    calculate_25_quantiles = """
+    CREATE TABLE transfer_values_quantile_25_distribution_per_address AS
     select
         cumulate.address as address,
-        CAST(quartile_25(cumulate.relative_value) as TEXT) as quartiles,
+        CAST(quantile_25(cumulate.relative_value) as TEXT) as quantiles,
         cumulate.relative_value as relative_value
     from
     (
@@ -287,16 +287,16 @@ def quartile_generating(conn: sqlite3.Connection):
     """

     cur = conn.cursor()
     try:
-        print("Creating transfer_values_quartile_10_distribution_per_address")
-        cur.execute(drop_calculate_10_quartiles)
-        cur.execute(calculate_10_quartiles)
-        print("Creating transfer_values_quartile_25_distribution_per_address")
-        cur.execute(drop_calculate_25_quartiles)
-        cur.execute(calculate_25_quartiles)
+        print("Creating transfer_values_quantile_10_distribution_per_address")
+        cur.execute(drop_calculate_10_quantiles)
+        cur.execute(calculate_10_quantiles)
+        print("Creating transfer_values_quantile_25_distribution_per_address")
+        cur.execute(drop_calculate_25_quantiles)
+        cur.execute(calculate_25_quantiles)
         conn.commit()
     except Exception as e:
         conn.rollback()
-        logger.error("Could not create derived dataset: quartile_generating")
+        logger.error("Could not create derived dataset: quantile_generating")
         logger.error(e)