Value distributions.
Transfer holding-time distributions (in days).
Distributions of days between mint and first transfer.
Add fixes.
Remove the type filter, which is no longer required.
pull/304/head
Andrey Dolgolev 2021-10-06 22:36:44 +03:00
parent 9605f4ef52
commit 5ffa9c4f27
4 changed files with 222 additions and 67 deletions

View file

@ -11,7 +11,13 @@ from moonstreamdb.db import yield_db_session_ctx
from .enrich import EthereumBatchloader, enrich
from .data import EventType, event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data, filter_data
from .derive import current_owners, current_market_values, current_values_distribution
from .derive import (
current_owners,
current_market_values,
current_values_distribution,
transfer_statistics_by_address,
qurtile_generating,
)
from .materialize import create_dataset
@ -24,6 +30,7 @@ derive_functions = {
"current_market_values": current_market_values,
"current_values_distribution": current_values_distribution,
"transfer_statistics_by_address": transfer_statistics_by_address,
"qurtile_generating": qurtile_generating,
}
@ -40,39 +47,28 @@ def handle_import_data(args: argparse.Namespace) -> None:
import_data(target_conn, source_conn, event_type, args.batch_size)
def handel_generate_filtered(args: argparse.Namespace) -> None:
def handle_filter_data(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.source)) as source_conn:
if not args.target:
# generate name if not set
path_list = args.source.split("/")
file = path_list.pop()
old_prefix, ext = file.split(".")
new_prefix = old_prefix
if args.start_time:
new_prefix += f"-{args.start_time}"
if args.end_time:
new_prefix += f"-{args.end_time}"
if args.type:
new_prefix += f"-{args.type}"
name = f"{new_prefix}.{ext}"
if args.target == args.source:
sqlite_path = f"{args.target}.dump"
else:
name = f"{args.target}"
sqlite_path = args.target
if name == args.source:
name = f"{name}.dump"
path_list.append(name)
print(f"Creating new database:{name}")
new_db_path = "/".join(path_list)
copyfile(args.source, new_db_path)
print(f"Creating new database:{sqlite_path}")
copyfile(args.source, sqlite_path)
# Connect to the copied database
with contextlib.closing(sqlite3.connect(new_db_path)) as source_conn:
with contextlib.closing(sqlite3.connect(sqlite_path)) as source_conn:
print("Start filtering")
filter_data(source_conn, args)
filter_data(
source_conn,
start_time=args.start_time,
end_time=args.end_time,
)
print("Filtering end.")
for index, function_name in enumerate(derive_functions.keys()):
print(
@ -243,22 +239,17 @@ def main() -> None:
)
parser_import_data.set_defaults(func=handle_import_data)
# crete dump for apply filters
# Create dump of filtered data
parser_filtered_copy = subcommands.add_parser(
"copy", description="Create copy of database with applied filters.",
"filter-data", description="Create copy of database with applied filters.",
)
parser_filtered_copy.add_argument(
"--target", help="Datastore into which you want to import data",
"--target", required=True, help="Datastore into which you want to import data",
)
parser_filtered_copy.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_filtered_copy.add_argument(
"--type",
choices=event_types,
help="Type of data you would like to import from source to target",
)
parser_filtered_copy.add_argument(
"--start-time", required=False, type=int, help="Start timestamp.",
)
@ -266,7 +257,7 @@ def main() -> None:
"--end-time", required=False, type=int, help="End timestamp.",
)
parser_filtered_copy.set_defaults(func=handel_generate_filtered)
parser_filtered_copy.set_defaults(func=handle_filter_data)
parser_enrich = subcommands.add_parser(
"enrich", description="enrich dataset from geth node"

View file

@ -435,35 +435,30 @@ def import_data(
insert_checkpoint(target_conn, event_type, source_offset)
def filter_data(sqlite_db: sqlite3.Connection, cli_args):
def filter_data(
sqlite_db: sqlite3.Connection,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
):
"""
Run DELETE queries depending on the provided filters
"""
cur = sqlite_db.cursor()
print(f"Remove by timestamp < {cli_args.start_time}")
if cli_args.start_time:
cur.execute(f"DELETE from transfers where timestamp < {cli_args.start_time}")
print(f"filtered out: {cur.rowcount}")
print(f"Remove by timestamp <= {start_time}")
if start_time:
cur.execute(f"DELETE from transfers where timestamp <= {start_time}")
print(f"Transfers filtered out: {cur.rowcount}")
sqlite_db.commit()
cur.execute(f"DELETE from mints where timestamp < {cli_args.start_time}")
print(f"filtered out: {cur.rowcount}")
cur.execute(f"DELETE from mints where timestamp <= {start_time}")
print(f"Mints filtered out: {cur.rowcount}")
sqlite_db.commit()
print(f"Remove by timestamp > {cli_args.end_time}")
if cli_args.end_time:
cur.execute(f"DELETE from transfers where timestamp > {cli_args.end_time}")
print(f"filtered out: {cur.rowcount}")
print(f"Remove by timestamp >= {end_time}")
if end_time:
cur.execute(f"DELETE from transfers where timestamp >= {end_time}")
print(f"Transfers filtered out: {cur.rowcount}")
sqlite_db.commit()
cur.execute(f"DELETE from mints where timestamp > {cli_args.end_time}")
print(f"filtered out: {cur.rowcount}")
cur.execute(f"DELETE from mints where timestamp >= {end_time}")
print(f"Mints filtered out: {cur.rowcount}")
sqlite_db.commit()
# print(f"Remove by type != '{cli_args.type}")
# if cli_args.type:
# cur.execute(f"DELETE from transfers where type != '{cli_args.type}'")
# print(f"filtered out: {cur.rowcount}")
# sqlite_db.commit()
# cur.execute(f"DELETE from mints where type != '{cli_args.type}'")
# print(f"filtered out: {cur.rowcount}")
# sqlite_db.commit()
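A self-contained sketch (not part of this diff) of the new keyword-style call and its inclusive bounds; the import path for filter_data is an assumption:

    import sqlite3

    from nfts.datastore import filter_data  # hypothetical import path

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE transfers (timestamp INTEGER)")
    conn.execute("CREATE TABLE mints (timestamp INTEGER)")
    conn.executemany("INSERT INTO transfers VALUES (?)", [(5,), (10,), (15,), (20,)])
    conn.executemany("INSERT INTO mints VALUES (?)", [(5,), (10,), (15,), (20,)])
    conn.commit()

    filter_data(conn, start_time=5, end_time=20)
    print([row[0] for row in conn.execute("SELECT timestamp FROM transfers")])
    # [10, 15] -- 5 and 20 sit on the inclusive bounds and are deleted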

View file

@ -48,12 +48,38 @@ class LastNonzeroValue:
return self.value
class QuartileFunction:
""" Split vlues to quartiles """
def __init__(self, num_qurtiles) -> None:
self.divider = 1 / num_qurtiles
def __call__(self, value):
if value is None or value == "None":
value = 0
quartile = self.divider
try:
while value > quartile:
quartile += self.divider
if quartile > 1:
quartile = 1
return quartile
except Exception as err:
print(err)
raise
def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
"""
Loads custom aggregate and scalar functions into an active SQLite3 connection.
"""
conn.create_aggregate("last_value", 1, LastValue)
conn.create_aggregate("last_nonzero_value", 1, LastNonzeroValue)
conn.create_function("quartile_10", 1, QuartileFunction(10))
conn.create_function("quartile_25", 1, QuartileFunction(25))
def current_owners(conn: sqlite3.Connection) -> None:
@ -111,7 +137,7 @@ def current_market_values(conn: sqlite3.Connection) -> None:
logger.error("Could not create derived dataset: current_market_values")
def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]:
def current_values_distribution(conn: sqlite3.Connection) -> None:
"""
Requires a connection to a dataset in which current_market_values has already been loaded.
"""
@ -190,3 +216,153 @@ def transfer_statistics_by_address(conn: sqlite3.Connection) -> None:
conn.rollback()
logger.error("Could not create derived dataset: current_values_distribution")
logger.error(e)
def qurtile_generating(conn: sqlite3.Connection):
"""
Generate the quartile distribution table; the number of buckets depends on the QuartileFunction registered above.
"""
ensure_custom_aggregate_functions(conn)
drop_calculate_qurtiles = (
"DROP TABLE IF EXISTS transfer_values_quartile_10_distribution_per_address;"
)
calculate_qurtiles = """
CREATE TABLE transfer_values_quartile_10_distribution_per_address AS
select qurtiled_sum.address as address,
SUM(qurtiled_sum.sum_of_qurtile) over (PARTITION BY qurtiled_sum.address order by qurtiled_sum.qurtiles ) as cululative_total,
qurtiled_sum.qurtiles as qurtiles
from (
select
qurtiled.address,
CAST(count(qurtiled.relative_value) AS REAL) / count_value.count_value as sum_of_qurtile,
qurtiled.qurtiles as qurtiles
from
(
select
cumulate.address as address,
quartile_10(cumulate.relative_value) as qurtiles,
cumulate.relative_value as relative_value
from
(
select
current_market_values.nft_address as address,
COALESCE(
CAST(current_market_values.market_value as REAL) / max_values.max_value,
0
) as relative_value
from
current_market_values
inner join (
select
current_market_values.nft_address,
max(market_value) as max_value
from
current_market_values
group by
current_market_values.nft_address
) as max_values on current_market_values.nft_address = max_values.nft_address
) as cumulate
) as qurtiled
inner join (
select
current_market_values.nft_address,
count(market_value) as count_value
from
current_market_values
group by
current_market_values.nft_address
) as count_value on qurtiled.address = count_value.nft_address
group by qurtiled.address, qurtiled.qurtiles
) as qurtiled_sum;
"""
cur = conn.cursor()
try:
cur.execute(drop_calculate_qurtiles)
cur.execute(calculate_qurtiles)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: current_values_distribution")
logger.error(e)
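Once qurtile_generating has run, the resulting table can be spot-checked against the same connection; the column names below come from the SELECT above, and the output depends on the dataset:

    rows = conn.execute(
        "SELECT address, qurtiles, cululative_total "
        "FROM transfer_values_quartile_10_distribution_per_address "
        "ORDER BY address, qurtiles LIMIT 10"
    ).fetchall()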
def mint_holding_times(conn: sqlite3.Connection):
"""
Create distribution of days between mint and first transfer.
"""
drop_mints_holding_table = "DROP TABLE IF EXISTS mint_holding_times;"
mints_holding_table = """
CREATE TABLE mint_holding_times AS
SELECT days_after_minted.days as days, count(*) as num_holds from (
SELECT
mints.nft_address,
mints.token_id,
(
firsts_transfers.firts_transfer - mints.timestamp
) / 86400 as days
from
mints
inner join (
select
nft_address,
token_id,
min(timestamp) as firts_transfer
from
transfers
group by
nft_address,
token_id
) as firsts_transfers on firsts_transfers.nft_address = mints.nft_address
and firsts_transfers.token_id = mints.token_id ) as days_after_minted
group by days;
"""
cur = conn.cursor()
try:
cur.execute(drop_mints_holding_table)
cur.execute(mints_holding_table)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: current_values_distribution")
logger.error(e)
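For instance, a token minted at timestamp 1,000,000 and first transferred at 1,432,000 lands in the 5-day bucket, since (1432000 - 1000000) / 86400 = 5 under SQLite's integer division.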
def transfer_holding_times(conn: sqlite3.Connection):
"""
Create distributions of holding times between transfers
"""
drop_transfer_holding_times = "DROP TABLE IF EXISTS transfer_holding_times;"
transfer_holding_times = """
CREATE TABLE transfer_holding_times AS
select days_beetween.days as days, count(*) as num_holds
from (SELECT
middle.address,
middle.token_id,
(middle.LEAD - middle.timestamp) / 86400 as days
from
(
SELECT
nft_address AS address,
token_id as token_id,
timestamp as timestamp,
LEAD(timestamp, 1, Null) OVER (
PARTITION BY nft_address,
token_id
ORDER BY
timestamp
) as LEAD
FROM
transfers
) as middle
where
LEAD is not Null
) as days_beetween
group by days;
"""
cur = conn.cursor()
try:
cur.execute(drop_transfer_holding_times)
cur.execute(transfer_holding_times)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: current_values_distribution")
logger.error(e)
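Once the derive step has run, the two holding-time tables can be inspected directly; a small sketch (the database path is an assumption):

    import sqlite3

    conn = sqlite3.connect("data/nfts-filtered.sqlite")  # hypothetical path
    for table in ("mint_holding_times", "transfer_holding_times"):
        rows = conn.execute(
            f"SELECT days, num_holds FROM {table} ORDER BY days LIMIT 5"
        ).fetchall()
        print(table, rows)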

View file

@ -66,9 +66,7 @@ def add_events(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.order_by(
EthereumLabel.created_at.asc(),
)
.order_by(EthereumLabel.created_at.asc(),)
)
if bounds is not None:
time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
@ -147,12 +145,7 @@ def create_dataset(
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
else:
add_events(
datastore_conn,
db_session,
event_type,
offset,
bounds,
batch_size,
datastore_conn, db_session, event_type, offset, bounds, batch_size,
)