Add ability to copy the sqlite3 database with filters applied.

pull/304/head
Andrey Dolgolev 2021-10-05 19:33:53 +03:00
parent 6ed3bc3122
commit 45b84785e3
2 changed files with 113 additions and 17 deletions

View file

@@ -3,13 +3,14 @@ import contextlib
import logging
import os
import sqlite3
from shutil import copyfile
from typing import Optional

from moonstreamdb.db import yield_db_session_ctx

from .enrich import EthereumBatchloader, enrich
from .data import EventType, event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data, filter_data
from .derive import current_owners, current_market_values, current_values_distribution
from .materialize import create_dataset
@@ -38,6 +39,54 @@ def handle_import_data(args: argparse.Namespace) -> None:
        import_data(target_conn, source_conn, event_type, args.batch_size)

def handle_generate_filtered(args: argparse.Namespace) -> None:
    path_list = args.source.split("/")
    file_name = path_list.pop()
    if not args.target:
        # Generate a name for the new database if one was not set.
        old_prefix, ext = file_name.rsplit(".", 1)
        new_prefix = old_prefix
        if args.start_time:
            new_prefix += f"-{args.start_time}"
        if args.end_time:
            new_prefix += f"-{args.end_time}"
        if args.type:
            new_prefix += f"-{args.type}"
        name = f"{new_prefix}.{ext}"
    else:
        name = f"{args.target}"
    if name == args.source:
        name = f"{name}.dump"
    path_list.append(name)
    new_db_path = "/".join(path_list)
    print(f"Creating new database: {new_db_path}")
    copyfile(args.source, new_db_path)
    with contextlib.closing(sqlite3.connect(new_db_path)) as filtered_conn:
        print("Start filtering")
        filter_data(filtered_conn, args)
        print("Filtering end.")
        # Apply the derive functions to the filtered data.
        for index, function_name in enumerate(derive_functions.keys()):
            print(
                f"Derive process {function_name} {index + 1}/{len(derive_functions)}"
            )
            derive_functions[function_name](filtered_conn)
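The copy is made with shutil.copyfile, which is only safe while nothing is writing to the source file. A minimal sketch of an alternative using SQLite's online backup API (sqlite3.Connection.backup, available since Python 3.7; the helper name is ours, not part of this commit):

```python
import contextlib
import sqlite3


def copy_database(source_path: str, target_path: str) -> None:
    # The online backup API yields a consistent snapshot even if another
    # connection writes to the source while the copy is in progress.
    with contextlib.closing(sqlite3.connect(source_path)) as source_conn:
        with contextlib.closing(sqlite3.connect(target_path)) as target_conn:
            source_conn.backup(target_conn)
```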
def handle_materialize(args: argparse.Namespace) -> None:
    event_type = nft_event(args.type)
    bounds: Optional[BlockBounds] = None
@@ -55,11 +104,7 @@ def handle_materialize(args: argparse.Namespace) -> None:
        sqlite3.connect(args.datastore)
    ) as moonstream_datastore:
        create_dataset(
            moonstream_datastore, db_session, event_type, bounds, args.batch_size,
        )
@@ -71,17 +116,11 @@ def handle_enrich(args: argparse.Namespace) -> None:
    with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
        enrich(
            moonstream_datastore, EventType.TRANSFER, batch_loader, args.batch_size,
        )
        enrich(
            moonstream_datastore, EventType.MINT, batch_loader, args.batch_size,
        )
@@ -188,9 +227,7 @@ def main() -> None:
        description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
    )
    parser_import_data.add_argument(
        "--target", required=True, help="Datastore into which you want to import data",
    )
    parser_import_data.add_argument(
        "--source", required=True, help="Datastore from which you want to import data"
    )
@@ -210,6 +247,31 @@ def main() -> None:
    )
    parser_import_data.set_defaults(func=handle_import_data)
    # Create a filtered copy of the database.
    parser_filtered_copy = subcommands.add_parser(
        "copy", description="Create a copy of the database with filters applied.",
    )
    parser_filtered_copy.add_argument(
        "--target", help="Datastore into which you want to copy the filtered data",
    )
    parser_filtered_copy.add_argument(
        "--source", required=True, help="Datastore from which you want to copy data"
    )
    parser_filtered_copy.add_argument(
        "--type",
        choices=event_types,
        help="Type of event you would like to keep in the filtered copy",
    )
    parser_filtered_copy.add_argument(
        "--start-time", required=False, type=int, help="Start timestamp.",
    )
    parser_filtered_copy.add_argument(
        "--end-time", required=False, type=int, help="End timestamp.",
    )
    parser_filtered_copy.set_defaults(func=handle_generate_filtered)
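For reference, the new subcommand can be exercised end to end like this (a sketch; the file name and timestamps are made up, and handle_generate_filtered must be importable from this module):

```python
import argparse

# Equivalent to the CLI call:
#   <cli> copy --source data/nfts.sqlite --start-time 1633046400 --end-time 1633651200
args = argparse.Namespace(
    source="data/nfts.sqlite",
    target=None,
    type=None,
    start_time=1633046400,  # Unix epoch seconds
    end_time=1633651200,
)
handle_generate_filtered(args)
```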
    parser_enrich = subcommands.add_parser(
        "enrich", description="enrich dataset from geth node"
    )

View file

@@ -433,3 +433,37 @@ def import_data(
    if source_offset is not None:
        delete_checkpoints(target_conn, event_type, commit=False)
        insert_checkpoint(target_conn, event_type, source_offset)

def filter_data(sqlite_db: sqlite3.Connection, cli_args):
    """
    Run DELETE queries against the copied database, depending on the given filters.
    """
    cur = sqlite_db.cursor()
    if cli_args.start_time:
        print(f"Remove by timestamp < {cli_args.start_time}")
        cur.execute("DELETE FROM transfers WHERE timestamp < ?", (cli_args.start_time,))
        print(f"filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp < ?", (cli_args.start_time,))
        print(f"filtered out: {cur.rowcount}")
        sqlite_db.commit()
    if cli_args.end_time:
        print(f"Remove by timestamp > {cli_args.end_time}")
        cur.execute("DELETE FROM transfers WHERE timestamp > ?", (cli_args.end_time,))
        print(f"filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp > ?", (cli_args.end_time,))
        print(f"filtered out: {cur.rowcount}")
        sqlite_db.commit()
    # Filtering by event type is not applied yet:
    # if cli_args.type:
    #     print(f"Remove by type != '{cli_args.type}'")
    #     cur.execute("DELETE FROM transfers WHERE type != ?", (cli_args.type,))
    #     print(f"filtered out: {cur.rowcount}")
    #     sqlite_db.commit()
    #     cur.execute("DELETE FROM mints WHERE type != ?", (cli_args.type,))
    #     print(f"filtered out: {cur.rowcount}")
    #     sqlite_db.commit()
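Two follow-ups worth noting: DELETE does not shrink a SQLite file, so the filtered copy stays as large as the original until it is vacuumed, and it is cheap to sanity-check what survived. A small sketch (the helper name is ours; table and column names come from this diff):

```python
import contextlib
import sqlite3


def finalize_copy(db_path: str) -> None:
    with contextlib.closing(sqlite3.connect(db_path)) as conn:
        # Report what survived the filters.
        for table in ("transfers", "mints"):
            count, min_ts, max_ts = conn.execute(
                f"SELECT COUNT(*), MIN(timestamp), MAX(timestamp) FROM {table}"
            ).fetchone()
            print(f"{table}: {count} rows, timestamps {min_ts}..{max_ts}")
        # Reclaim the space freed by the DELETEs; VACUUM rewrites the file.
        conn.execute("VACUUM")
```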