2021-09-26 18:14:40 +00:00
import argparse
import contextlib
import logging
import os
import sqlite3
from shutil import copyfile
from typing import Optional

from moonstreamdb.db import yield_db_session_ctx

from .data import BlockBounds, EventType, event_types, nft_event
from .datastore import filter_data, import_data, setup_database
from .derive import (
    current_market_values,
    current_owners,
    current_values_distribution,
    transfer_statistics_by_address,
)
from .enrich import EthereumBatchloader, enrich
from .materialize import create_dataset
2021-09-26 18:14:40 +00:00
2021-09-26 20:06:09 +00:00
# Module-wide logging: configure the root logger at INFO once, then use a
# module-scoped logger for all output from this CLI.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
2021-10-04 13:49:35 +00:00
# Registry of derived-table builders, keyed by the names users may pass to
# `nfts derive -f ...`. Each value is a callable taking an open
# sqlite3.Connection to the datastore and materializing its derived table.
# NOTE: every name referenced here must be imported from .derive at the top of
# the file; a missing import makes this dict raise NameError at import time.
derive_functions = {
    "current_owners": current_owners,
    "current_market_values": current_market_values,
    "current_values_distribution": current_values_distribution,
    "transfer_statistics_by_address": transfer_statistics_by_address,
}
2021-09-26 18:14:40 +00:00
def handle_initdb(args: argparse.Namespace) -> None:
    """Handler for `nfts initdb`: create the dataset tables in the SQLite file at args.datastore."""
    connection = sqlite3.connect(args.datastore)
    try:
        setup_database(connection)
    finally:
        # Equivalent to contextlib.closing: always release the connection.
        connection.close()
2021-10-02 18:16:16 +00:00
def handle_import_data(args: argparse.Namespace) -> None:
    """Handler for `nfts import-data`: copy one event table from a source datastore into a target datastore."""
    event_type = nft_event(args.type)
    # ExitStack manages both connections; they are closed in reverse order,
    # exactly as the original two-manager `with` statement did.
    with contextlib.ExitStack() as stack:
        target_conn = stack.enter_context(
            contextlib.closing(sqlite3.connect(args.target))
        )
        source_conn = stack.enter_context(
            contextlib.closing(sqlite3.connect(args.source))
        )
        import_data(target_conn, source_conn, event_type, args.batch_size)
2021-10-05 16:33:53 +00:00
def handel_generate_filtered(args: argparse.Namespace) -> None:
    """
    Handler for `nfts copy`.

    Copies the source SQLite datastore to a new file, applies the requested
    filters (event type, start/end timestamps) to the copy, and then rebuilds
    every derived table on the filtered copy. The source datastore itself is
    never modified.

    NOTE(review): the function name carries a historical typo ("handel"); it is
    kept unchanged because argparse wires it up via set_defaults.
    """
    # Directory components of the source path; the copy is created alongside
    # the source database. Previously these were only computed in the
    # no-target branch, so passing --target raised NameError at path_list.
    path_list = args.source.split("/")
    source_file = path_list.pop()
    if not args.target:
        # Generate a name from the source name plus the active filters.
        # rsplit(".", 1) tolerates filenames containing more than one dot
        # (plain split(".") produced too many values to unpack).
        old_prefix, ext = source_file.rsplit(".", 1)
        new_prefix = old_prefix
        if args.start_time:
            new_prefix += f"-{args.start_time}"
        if args.end_time:
            new_prefix += f"-{args.end_time}"
        if args.type:
            new_prefix += f"-{args.type}"
        name = f"{new_prefix}.{ext}"
    else:
        name = f"{args.target}"
        if name == args.source:
            # Never clobber the source database.
            name = f"{name}.dump"
    path_list.append(name)
    print(f"Creating new database: {name}")
    new_db_path = "/".join(path_list)
    copyfile(args.source, new_db_path)
    # Filter the copy in place, then rebuild all derived tables so they
    # reflect only the filtered events. (The original also held an unused
    # connection to the source database open; that has been removed.)
    with contextlib.closing(sqlite3.connect(new_db_path)) as filtered_conn:
        print("Start filtering")
        filter_data(filtered_conn, args)
        print("Filtering end.")
        for index, function_name in enumerate(derive_functions.keys()):
            print(
                f"Derive process {function_name} {index + 1}/{len(derive_functions.keys())}"
            )
            derive_functions[function_name](filtered_conn)
2021-09-26 18:14:40 +00:00
def handle_materialize(args: argparse.Namespace) -> None:
    """
    Handler for `nfts materialize`: pull NFT events of the given type from the
    Moonstream database into the SQLite datastore, optionally bounded by block
    numbers.

    Raises:
        ValueError: if --end is given without --start.
    """
    event_type = nft_event(args.type)

    bounds: Optional[BlockBounds] = None
    if args.start is not None:
        bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
    elif args.end is not None:
        raise ValueError("You cannot set --end unless you also set --start")

    # NOTE(review): an EthereumBatchloader used to be constructed here but was
    # never passed to create_dataset; the unused construction was removed.
    logger.info(f"Materializing NFT events to datastore: {args.datastore}")
    logger.info(f"Block bounds: {bounds}")

    with yield_db_session_ctx() as db_session, contextlib.closing(
        sqlite3.connect(args.datastore)
    ) as moonstream_datastore:
        create_dataset(
            moonstream_datastore,
            db_session,
            event_type,
            bounds,
            args.batch_size,
        )
2021-09-26 18:14:40 +00:00
2021-10-04 16:24:11 +00:00
def handle_enrich(args: argparse.Namespace) -> None:
    """Handler for `nfts enrich`: backfill event details in the datastore from an Ethereum JSON-RPC node."""
    batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
    logger.info(f"Enriching NFT events in datastore: {args.datastore}")
    with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
        # Enrich transfers first, then mints -- same order as before, simply
        # expressed as a loop over the two event types.
        for target_event_type in (EventType.TRANSFER, EventType.MINT):
            enrich(
                moonstream_datastore,
                target_event_type,
                batch_loader,
                args.batch_size,
            )
2021-09-29 19:56:39 +00:00
def handle_derive(args: argparse.Namespace) -> None:
    """
    Handler for `nfts derive`: run derived-table builders against the datastore.

    If -f/--derive_functions is not given, every builder registered in
    derive_functions runs.

    Raises:
        ValueError: if any requested function name is not registered.
    """
    calling_functions = list(args.derive_functions or derive_functions.keys())

    # Validate up front so we fail with a clear message before touching the
    # datastore. The original checked `function_name in calling_functions`
    # while iterating calling_functions -- always true -- so unknown names
    # surfaced as a bare KeyError instead.
    unknown = [name for name in calling_functions if name not in derive_functions]
    if unknown:
        raise ValueError(
            f"Unknown derive function(s): {unknown}. "
            f"Available: {list(derive_functions.keys())}"
        )

    with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
        for function_name in calling_functions:
            derive_functions[function_name](moonstream_datastore)

        logger.info("Done!")
2021-09-26 18:14:40 +00:00
def main() -> None:
    """
    "nfts" command handler.

    When reading this code, to find the definition of any of the "nfts"
    subcommands, grep for comments of the form:

        # Command: nfts <subcommand>
    """
    default_web3_provider = os.environ.get("MOONSTREAM_WEB3_PROVIDER")
    if default_web3_provider is not None and not default_web3_provider.startswith(
        "http"
    ):
        raise ValueError(
            f"Please either unset MOONSTREAM_WEB3_PROVIDER environment variable or set it to an HTTP/HTTPS URL. Current value: {default_web3_provider}"
        )

    parser = argparse.ArgumentParser(
        description="Tools to work with the Moonstream NFTs dataset"
    )
    # Running with no subcommand used to crash with AttributeError on
    # args.func; default to printing help instead. Subparsers override this.
    parser.set_defaults(func=lambda _args: parser.print_help())
    subcommands = parser.add_subparsers(title="Subcommands")

    # Command: nfts initdb
    parser_initdb = subcommands.add_parser(
        "initdb",
        description="Initialize an SQLite datastore for the Moonstream NFTs dataset",
    )
    parser_initdb.add_argument("datastore")
    parser_initdb.set_defaults(func=handle_initdb)

    # Command: nfts materialize
    parser_materialize = subcommands.add_parser(
        "materialize", description="Create/update the NFTs dataset"
    )
    parser_materialize.add_argument(
        "-d",
        "--datastore",
        required=True,
        help="Path to SQLite database representing the dataset",
    )
    parser_materialize.add_argument(
        "--jsonrpc",
        default=default_web3_provider,
        type=str,
        help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
    )
    parser_materialize.add_argument(
        "-t",
        "--type",
        choices=event_types,
        help="Type of event to materialize intermediate data for",
    )
    parser_materialize.add_argument(
        "--start", type=int, default=None, help="Starting block number"
    )
    parser_materialize.add_argument(
        "--end", type=int, default=None, help="Ending block number"
    )
    parser_materialize.add_argument(
        "-n",
        "--batch-size",
        type=int,
        default=1000,
        help="Number of events to process per batch",
    )
    parser_materialize.set_defaults(func=handle_materialize)

    # Command: nfts derive
    parser_derive = subcommands.add_parser(
        "derive", description="Create/update derived data in the dataset"
    )
    parser_derive.add_argument(
        "-d",
        "--datastore",
        required=True,
        help="Path to SQLite database representing the dataset",
    )
    parser_derive.add_argument(
        "-f",
        "--derive_functions",
        required=False,
        nargs="+",
        help=f"Functions from the derive module to call. Available: {list(derive_functions.keys())}",
    )
    parser_derive.set_defaults(func=handle_derive)

    # Command: nfts import-data
    parser_import_data = subcommands.add_parser(
        "import-data",
        description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
    )
    parser_import_data.add_argument(
        "--target",
        required=True,
        help="Datastore into which you want to import data",
    )
    parser_import_data.add_argument(
        "--source", required=True, help="Datastore from which you want to import data"
    )
    parser_import_data.add_argument(
        "--type",
        required=True,
        choices=event_types,
        help="Type of data you would like to import from source to target",
    )
    parser_import_data.add_argument(
        "-N",
        "--batch-size",
        type=int,
        default=10000,
        help="Batch size for database commits into target datastore.",
    )
    parser_import_data.set_defaults(func=handle_import_data)

    # Command: nfts copy -- create a filtered copy ("dump") of the datastore
    parser_filtered_copy = subcommands.add_parser(
        "copy",
        description="Create copy of database with applied filters.",
    )
    parser_filtered_copy.add_argument(
        "--target",
        help="Datastore into which you want to import data",
    )
    parser_filtered_copy.add_argument(
        "--source", required=True, help="Datastore from which you want to import data"
    )
    parser_filtered_copy.add_argument(
        "--type",
        choices=event_types,
        help="Type of data you would like to import from source to target",
    )
    parser_filtered_copy.add_argument(
        "--start-time",
        required=False,
        type=int,
        help="Start timestamp.",
    )
    parser_filtered_copy.add_argument(
        "--end-time",
        required=False,
        type=int,
        help="End timestamp.",
    )
    parser_filtered_copy.set_defaults(func=handel_generate_filtered)

    # Command: nfts enrich
    parser_enrich = subcommands.add_parser(
        "enrich", description="enrich dataset from geth node"
    )
    parser_enrich.add_argument(
        "-d",
        "--datastore",
        required=True,
        help="Path to SQLite database representing the dataset",
    )
    parser_enrich.add_argument(
        "--jsonrpc",
        default=default_web3_provider,
        type=str,
        help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
    )
    parser_enrich.add_argument(
        "-n",
        "--batch-size",
        type=int,
        default=1000,
        help="Number of events to process per batch",
    )
    parser_enrich.set_defaults(func=handle_enrich)

    args = parser.parse_args()
    args.func(args)
2021-09-25 17:23:33 +00:00
if __name__ == " __main__ " :
2021-09-26 18:14:40 +00:00
main ( )