Merge pull request #304 from bugout-dev/add-copy-method-for-dataset

Add ability to copy an sqlite3 database with filters applied, and add new derive functions.
Neeraj Kashyap 2021-10-07 19:26:52 -07:00 committed by GitHub
commit 23c91845cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 431 additions and 30 deletions

View file

@@ -3,14 +3,24 @@ import contextlib
import logging
import os
import sqlite3
from shutil import copyfile
from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from .enrich import EthereumBatchloader, enrich
from .data import EventType, event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data
from .derive import current_owners, current_market_values, current_values_distribution
from .datastore import setup_database, import_data, filter_data
from .derive import (
current_owners,
current_market_values,
current_values_distribution,
transfer_statistics_by_address,
quartile_generating,
mint_holding_times,
transfer_holding_times,
transfers_mints_connection_table,
)
from .materialize import create_dataset
@@ -22,6 +32,11 @@ derive_functions = {
"current_owners": current_owners,
"current_market_values": current_market_values,
"current_values_distribution": current_values_distribution,
"transfer_statistics_by_address": transfer_statistics_by_address,
"quartile_generating": quartile_generating,
"transfers_mints_connection_table": transfers_mints_connection_table,
"mint_holding_times": mint_holding_times,
"transfer_holding_times": transfer_holding_times,
}
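As a point of reference, a minimal sketch of applying one of these derive functions directly to an open datastore connection; the database path is hypothetical and derive_functions is the mapping defined above.

import contextlib
import sqlite3

# Hypothetical datastore path; each derive function drops and recreates its derived table.
with contextlib.closing(sqlite3.connect("nfts.sqlite")) as conn:
    derive_functions["current_owners"](conn)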
@@ -38,6 +53,35 @@ def handle_import_data(args: argparse.Namespace) -> None:
import_data(target_conn, source_conn, event_type, args.batch_size)
def handle_filter_data(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.source)) as source_conn:
if args.target == args.source and args.source is not None:
sqlite_path = f"{args.target}.dump"
else:
sqlite_path = args.target
print(f"Creating new database:{sqlite_path}")
copyfile(args.source, sqlite_path)
# Connect to the new copy and filter it in place
with contextlib.closing(sqlite3.connect(sqlite_path)) as source_conn:
print("Start filtering")
filter_data(
source_conn, start_time=args.start_time, end_time=args.end_time,
)
print("Filtering end.")
for index, function_name in enumerate(derive_functions.keys()):
print(
f"Derive process {function_name} {index+1}/{len(derive_functions.keys())}"
)
derive_functions[function_name](source_conn)
# Apply derive to new data
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
bounds: Optional[BlockBounds] = None
@@ -55,11 +99,7 @@ def handle_materialize(args: argparse.Namespace) -> None:
sqlite3.connect(args.datastore)
) as moonstream_datastore:
create_dataset(
moonstream_datastore,
db_session,
event_type,
bounds,
args.batch_size,
moonstream_datastore, db_session, event_type, bounds, args.batch_size,
)
@@ -71,17 +111,11 @@ def handle_enrich(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
enrich(
moonstream_datastore,
EventType.TRANSFER,
batch_loader,
args.batch_size,
moonstream_datastore, EventType.TRANSFER, batch_loader, args.batch_size,
)
enrich(
moonstream_datastore,
EventType.MINT,
batch_loader,
args.batch_size,
moonstream_datastore, EventType.MINT, batch_loader, args.batch_size,
)
@@ -188,9 +222,7 @@ def main() -> None:
description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
)
parser_import_data.add_argument(
"--target",
required=True,
help="Datastore into which you want to import data",
"--target", required=True, help="Datastore into which you want to import data",
)
parser_import_data.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
@@ -210,6 +242,26 @@ def main() -> None:
)
parser_import_data.set_defaults(func=handle_import_data)
# Create dump of filtered data
parser_filtered_copy = subcommands.add_parser(
"filter-data", description="Create copy of database with applied filters.",
)
parser_filtered_copy.add_argument(
"--target", required=True, help="Datastore into which you want to import data",
)
parser_filtered_copy.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_filtered_copy.add_argument(
"--start-time", required=False, type=int, help="Start timestamp.",
)
parser_filtered_copy.add_argument(
"--end-time", required=False, type=int, help="End timestamp.",
)
parser_filtered_copy.set_defaults(func=handle_filter_data)
parser_enrich = subcommands.add_parser(
"enrich", description="enrich dataset from geth node"
)

View file

@@ -433,3 +433,32 @@ def import_data(
if source_offset is not None:
delete_checkpoints(target_conn, event_type, commit=False)
insert_checkpoint(target_conn, event_type, source_offset)
def filter_data(
    sqlite_db: sqlite3.Connection,
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
):
    """
    Delete transfers and mints that fall outside the requested time window.
    """
    cur = sqlite_db.cursor()
    if start_time:
        print(f"Removing events with timestamp <= {start_time}")
        cur.execute("DELETE FROM transfers WHERE timestamp <= ?", (start_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp <= ?", (start_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()
    if end_time:
        print(f"Removing events with timestamp >= {end_time}")
        cur.execute("DELETE FROM transfers WHERE timestamp >= ?", (end_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp >= ?", (end_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()
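A minimal usage sketch for filter_data, mirroring what handle_filter_data in the CLI does above; file names and timestamps are made up.

import contextlib
import sqlite3
from shutil import copyfile

# Work on a copy so the original datastore stays intact (hypothetical paths).
copyfile("nfts.sqlite", "nfts_filtered.sqlite")
with contextlib.closing(sqlite3.connect("nfts_filtered.sqlite")) as conn:
    # Keeps only events strictly between the two Unix timestamps.
    filter_data(conn, start_time=1633046400, end_time=1635724800)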

View file

@@ -48,12 +48,38 @@ class LastNonzeroValue:
return self.value
class QuartileFunction:
""" Split vlues to quartiles """
def __init__(self, num_quartiles) -> None:
self.divider = 1 / num_quartiles
def __call__(self, value):
if value is None or value == "None":
value = 0
quartile = self.divider
try:
while value > quartile:
quartile += self.divider
if quartile > 1:
quartile = 1
return quartile
except Exception as err:
print(err)
raise
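An illustration of the bucketing above (values are invented): QuartileFunction(10) steps through bucket edges 0.1, 0.2, ... and returns the upper edge of the bucket the value falls into.

bucket = QuartileFunction(10)
bucket(0.34)  # steps through 0.1, 0.2, 0.3, 0.4 and returns (approximately) 0.4
bucket(None)  # None and the string "None" are treated as 0, landing in the first bucket, 0.1
bucket(1.0)   # capped at 1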
def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
"""
Loads custom aggregate functions to an active SQLite3 connection.
"""
conn.create_aggregate("last_value", 1, LastValue)
conn.create_aggregate("last_nonzero_value", 1, LastNonzeroValue)
conn.create_function("quartile_10", 1, QuartileFunction(10))
conn.create_function("quartile_25", 1, QuartileFunction(25))
def current_owners(conn: sqlite3.Connection) -> None:
@@ -111,7 +137,7 @@ def current_market_values(conn: sqlite3.Connection) -> None:
logger.error("Could not create derived dataset: current_market_values")
def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]:
def current_values_distribution(conn: sqlite3.Connection) -> None:
"""
Requires a connection to a dataset in which current_market_values has already been loaded.
"""
@@ -120,8 +146,23 @@ def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]:
"DROP TABLE IF EXISTS market_values_distribution;"
)
current_values_distribution_query = """
CREATE TABLE market_values_distribution AS
select nft_address as address, market_value as value, CUME_DIST() over (PARTITION BY nft_address ORDER BY market_value) as cumulate_value from current_market_values;"""
CREATE TABLE market_values_distribution AS
select
current_market_values.nft_address as address,
current_market_values.token_id as token_id,
CAST(current_market_values.market_value as REAL) / max_values.max_value as relative_value
from
current_market_values
inner join (
select
nft_address,
max(market_value) as max_value
from
current_market_values
group by
nft_address
) as max_values on current_market_values.nft_address = max_values.nft_address;
"""
cur = conn.cursor()
try:
cur.execute(drop_existing_values_distribution_query)
@@ -131,3 +172,289 @@ def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]:
conn.rollback()
logger.error("Could not create derived dataset: current_values_distribution")
logger.error(e)
def transfer_statistics_by_address(conn: sqlite3.Connection) -> None:
"""
Create transfer in and transfer out for each address.
"""
drop_existing_transfer_statistics_by_address_query = (
"DROP TABLE IF EXISTS transfer_statistics_by_address;"
)
transfer_statistics_by_address_query = """
CREATE TABLE transfer_statistics_by_address AS
SELECT
address,
sum(transfer_out) as transfers_out,
sum(transfer_in) as transfers_in
from
(
SELECT
from_address as address,
1 as transfer_out,
0 as transfer_in
from
transfers
UNION
ALL
select
to_address as address,
0 as transfer_out,
1 as transfer_in
from
transfers
)
group by
address;
"""
cur = conn.cursor()
try:
cur.execute(drop_existing_transfer_statistics_by_address_query)
cur.execute(transfer_statistics_by_address_query)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: transfer_statistics_by_address")
logger.error(e)
def quartile_generating(conn: sqlite3.Connection):
"""
Create quantile distribution tables; the number of buckets is set by the QuartileFunction definitions registered above.
"""
ensure_custom_aggregate_functions(conn)
drop_calculate_10_quartiles = (
"DROP TABLE IF EXISTS transfer_values_quartile_10_distribution_per_address;"
)
calculate_10_quartiles = """
CREATE TABLE transfer_values_quartile_10_distribution_per_address AS
select
cumulate.address as address,
CAST(quartile_10(cumulate.relative_value) as TEXT) as quartiles,
cumulate.relative_value as relative_value
from
(
select
current_market_values.nft_address as address,
COALESCE(
CAST(current_market_values.market_value as REAL) / max_values.max_value,
0
) as relative_value
from
current_market_values
inner join (
select
current_market_values.nft_address,
max(market_value) as max_value
from
current_market_values
group by
current_market_values.nft_address
) as max_values on current_market_values.nft_address = max_values.nft_address
) as cumulate
"""
drop_calculate_25_quartiles = (
"DROP TABLE IF EXISTS transfer_values_quartile_25_distribution_per_address;"
)
calculate_25_quartiles = """
CREATE TABLE transfer_values_quartile_25_distribution_per_address AS
select
cumulate.address as address,
CAST(quartile_25(cumulate.relative_value) as TEXT) as quartiles,
cumulate.relative_value as relative_value
from
(
select
current_market_values.nft_address as address,
COALESCE(
CAST(current_market_values.market_value as REAL) / max_values.max_value,
0
) as relative_value
from
current_market_values
inner join (
select
current_market_values.nft_address,
max(market_value) as max_value
from
current_market_values
group by
current_market_values.nft_address
) as max_values on current_market_values.nft_address = max_values.nft_address
) as cumulate
"""
cur = conn.cursor()
try:
print("Creating transfer_values_quartile_10_distribution_per_address")
cur.execute(drop_calculate_10_quartiles)
cur.execute(calculate_10_quartiles)
print("Creating transfer_values_quartile_25_distribution_per_address")
cur.execute(drop_calculate_25_quartiles)
cur.execute(calculate_25_quartiles)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: quartile_generating")
logger.error(e)
def transfers_mints_connection_table(conn: sqlite3.Connection):
"""
Create the connection table between transfers and mints: each transfer is matched to the latest mint of the same token at or before the transfer.
"""
drop_transfers_mints_connection = "DROP TABLE IF EXISTS transfers_mints;"
transfers_mints_connection = """
CREATE TABLE transfers_mints as
select
transfers.event_id as transfer_id,
mints.mint_id as mint_id
from
transfers
inner join (
select
Max(posable_mints.mints_time) as mint_time,
posable_mints.transfer_id as transfer_id
from
(
select
mint_id,
mints.timestamp as mints_time,
transfers.token_id,
transfers.timestamp,
transfers.event_id as transfer_id
from
transfers
inner join (
select
mints.event_id as mint_id,
mints.nft_address,
mints.token_id,
mints.timestamp
from
mints
group by
mints.nft_address,
mints.token_id,
mints.timestamp
) as mints on transfers.nft_address = mints.nft_address
and transfers.token_id = mints.token_id
and mints.timestamp <= transfers.timestamp
) as posable_mints
group by
posable_mints.transfer_id
) as mint_time on mint_time.transfer_id = transfers.event_id
inner join (
select
mints.event_id as mint_id,
mints.nft_address,
mints.token_id,
mints.timestamp
from
mints
) as mints on transfers.nft_address = mints.nft_address
and transfers.token_id = mints.token_id
and mints.timestamp = mint_time.mint_time;
"""
cur = conn.cursor()
try:
cur.execute(drop_transfers_mints_connection)
cur.execute(transfers_mints_connection)
conn.commit()
except Exception as e:
conn.rollback()
logger.error(
"Could not create derived dataset: transfers_mints_connection_table"
)
logger.error(e)
def mint_holding_times(conn: sqlite3.Connection):
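    """
    Create the distribution of holding times, in whole days, between a token's mint and its first subsequent transfer.
    """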
drop_mints_holding_table = "DROP TABLE IF EXISTS mint_holding_times;"
mints_holding_table = """
CREATE TABLE mint_holding_times AS
SELECT
days_after_minted.days as days,
count(*) as num_holds
from
(
SELECT
mints.nft_address,
mints.token_id,
(
firsts_transfers.first_transfer - mints.timestamp
) / 86400 as days
from
mints
inner join (
select
transfers_mints.mint_id,
transfers.nft_address,
transfers.token_id,
min(transfers.timestamp) as first_transfer
from
transfers
inner join transfers_mints on transfers_mints.transfer_id = transfers.event_id
group by
transfers.nft_address,
transfers.token_id,
transfers_mints.mint_id
) as firsts_transfers on firsts_transfers.mint_id = mints.event_id
) as days_after_minted
group by days;
"""
cur = conn.cursor()
try:
cur.execute(drop_mints_holding_table)
cur.execute(mints_holding_table)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: mint_holding_times")
logger.error(e)
def transfer_holding_times(conn: sqlite3.Connection):
"""
Create the distribution of holding times between consecutive transfers of the same token.
"""
drop_transfer_holding_times = "DROP TABLE IF EXISTS transfer_holding_times;"
transfer_holding_times = """
CREATE TABLE transfer_holding_times AS
select days_beetween.days as days, count(*) as num_holds
from (SELECT
middle.address,
middle.token_id,
(middle.LEAD - middle.timestamp) / 86400 as days
from
(
SELECT
nft_address AS address,
token_id as token_id,
timestamp as timestamp,
LEAD(timestamp, 1, Null) OVER (
PARTITION BY nft_address,
token_id
ORDER BY
timestamp
) as LEAD
FROM
transfers
) as middle
where
LEAD is not Null
) as days_beetween
group by days;
"""
cur = conn.cursor()
try:
cur.execute(drop_transfer_holding_times)
cur.execute(transfer_holding_times)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: transfer_holding_times")
logger.error(e)
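For reference, a small sketch of reading one of the derived tables once the functions above have run; the database path is an assumption, while the table and column names come from the CREATE TABLE statements above.

import sqlite3

conn = sqlite3.connect("nfts_filtered.sqlite")  # hypothetical path
# transfer_holding_times maps a holding period in days to how many holds lasted that long.
for days, num_holds in conn.execute(
    "SELECT days, num_holds FROM transfer_holding_times ORDER BY days"
):
    print(days, num_holds)
conn.close()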

View file

@@ -66,9 +66,7 @@ def add_events(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.order_by(
EthereumLabel.created_at.asc(),
)
.order_by(EthereumLabel.created_at.asc(),)
)
if bounds is not None:
time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
@@ -147,12 +145,7 @@ def create_dataset(
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
else:
add_events(
datastore_conn,
db_session,
event_type,
offset,
bounds,
batch_size,
datastore_conn, db_session, event_type, offset, bounds, batch_size,
)