Add distribution values request.

pull/304/head
Andrey Dolgolev 2021-10-04 16:49:35 +03:00
rodzic 4be1fe4d7a
commit 3e4acd4d5c
2 zmienionych plików z 46 dodań i 2 usunięć

Wyświetl plik

@ -10,7 +10,7 @@ from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database
from .derive import current_owners
from .derive import current_owners, current_values_distribution
from .materialize import create_dataset, EthereumBatchloader
@ -18,6 +18,12 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
derive_functions = {
"current_owners": current_owners,
"current_values_distribution": current_values_distribution,
}
def handle_initdb(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.datastore)) as conn:
setup_database(conn)
@ -51,7 +57,15 @@ def handle_materialize(args: argparse.Namespace) -> None:
def handle_derive(args: argparse.Namespace) -> None:
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
results = current_owners(moonstream_datastore)
calling_functions = []
if not args.derive_functions:
calling_functions.extend(derive_functions.keys())
else:
calling_functions.extend(args.derive_functions)
for function_name in calling_functions:
if function_name in calling_functions:
derive_functions[function_name](moonstream_datastore)
logger.info("Done!")
@ -130,6 +144,13 @@ def main() -> None:
required=True,
help="Path to SQLite database representing the dataset",
)
parser_derive.add_argument(
"-f",
"--derive_functions",
required=False,
nargs="+",
help=f"Functions wich will call from derive module availabel {list(derive_functions.keys())}",
)
parser_derive.set_defaults(func=handle_derive)
args = parser.parse_args()

Wyświetl plik

@ -57,3 +57,26 @@ def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
conn.rollback()
logger.error("Could not create derived dataset: current_owners")
logger.error(e)
def current_values_distribution(conn: sqlite3.Connection) -> List[Tuple]:
"""
Requires a connection to a dataset in which the raw data (esp. transfers) has already been
loaded.
"""
ensure_custom_aggregate_functions(conn)
drop_existing_values_distribution_query = (
"DROP TABLE IF EXISTS market_values_distribution;"
)
current_values_distribution_query = """
CREATE TABLE market_values_distribution AS
select nft_address as address, market_value as value, CUME_DIST() over (PARTITION BY nft_address ORDER BY market_value) as cumulate_value from current_market_values;"""
cur = conn.cursor()
try:
cur.execute(drop_existing_values_distribution_query)
cur.execute(current_values_distribution_query)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("Could not create derived dataset: current_owners")
logger.error(e)