Added "nfts derive" command

Removed old web3 provider code in favor of @Andrei-Dolgolev's batching
interface.
pull/304/head
Neeraj Kashyap 2021-09-29 12:56:39 -07:00
parent c0d3f23500
commit 4be1fe4d7a
4 changed files with 121 additions and 47 deletions

View file

@ -10,6 +10,7 @@ from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database
from .derive import current_owners
from .materialize import create_dataset, EthereumBatchloader
@ -17,24 +18,6 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def web3_connection(web3_uri: Optional[str] = None) -> Web3:
    """
    Connect to the given web3 provider and return a Web3 client.

    The provider may be specified either as a path to an IPC socket on the local
    filesystem or as an HTTP(S) URI to a JSON RPC provider.

    If web3_uri is not provided or is set to None, this function attempts to use the
    default behavior of the web3.py IPCProvider (one of the steps is looking for
    .ethereum/geth.ipc, but there may be others).

    Fix: the original constructed a default IPCProvider unconditionally and then
    discarded it whenever web3_uri was given; the provider is now built exactly once.
    """
    web3_provider: Union[IPCProvider, HTTPProvider]
    if web3_uri is None:
        web3_provider = Web3.IPCProvider()
    elif web3_uri.startswith(("http://", "https://")):
        web3_provider = Web3.HTTPProvider(web3_uri)
    else:
        web3_provider = Web3.IPCProvider(web3_uri)
    return Web3(web3_provider)
def handle_initdb(args: argparse.Namespace) -> None:
    """Handler for "nfts initdb": create the dataset schema in the SQLite datastore at args.datastore."""
    connection = sqlite3.connect(args.datastore)
    try:
        setup_database(connection)
    finally:
        # Mirror contextlib.closing: the connection is closed even if setup fails.
        connection.close()
@ -48,7 +31,7 @@ def handle_materialize(args: argparse.Namespace) -> None:
elif args.end is not None:
raise ValueError("You cannot set --end unless you also set --start")
batch_loader = EthereumBatchloader(jrpc_url=args.jrpc)
batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
logger.info(f"Materializing NFT events to datastore: {args.datastore}")
logger.info(f"Block bounds: {bounds}")
@ -59,7 +42,6 @@ def handle_materialize(args: argparse.Namespace) -> None:
create_dataset(
moonstream_datastore,
db_session,
args.web3,
event_type,
batch_loader,
bounds,
@ -67,6 +49,12 @@ def handle_materialize(args: argparse.Namespace) -> None:
)
def handle_derive(args: argparse.Namespace) -> None:
    """Handler for "nfts derive": build derived relations (e.g. current_owners) in the datastore at args.datastore."""
    datastore_conn = sqlite3.connect(args.datastore)
    try:
        current_owners(datastore_conn)
        logger.info("Done!")
    finally:
        # Mirror contextlib.closing: always release the connection.
        datastore_conn.close()
def main() -> None:
"""
"nfts" command handler.
@ -76,6 +64,12 @@ def main() -> None:
# Command: nfts <subcommand>
"""
default_web3_provider = os.environ.get("MOONSTREAM_WEB3_PROVIDER")
if default_web3_provider is not None and not default_web3_provider.startswith(
"http"
):
raise ValueError(
f"Please either unset MOONSTREAM_WEB3_PROVIDER environment variable or set it to an HTTP/HTTPS URL. Current value: {default_web3_provider}"
)
parser = argparse.ArgumentParser(
description="Tools to work with the Moonstream NFTs dataset"
@ -101,13 +95,7 @@ def main() -> None:
help="Path to SQLite database representing the dataset",
)
parser_materialize.add_argument(
"--web3",
default=default_web3_provider,
type=web3_connection,
help=f"Web3 provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"--jrpc",
"--jsonrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
@ -133,6 +121,17 @@ def main() -> None:
)
parser_materialize.set_defaults(func=handle_materialize)
parser_derive = subcommands.add_parser(
"derive", description="Create/updated derived data in the dataset"
)
parser_derive.add_argument(
"-d",
"--datastore",
required=True,
help="Path to SQLite database representing the dataset",
)
parser_derive.set_defaults(func=handle_derive)
args = parser.parse_args()
args.func(args)

View file

@ -14,7 +14,7 @@ logger = logging.getLogger(__name__)
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE nfts
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
@ -22,8 +22,8 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE nfts
);
"""
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE checkpoint
(
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
(
event_type STRING,
offset INTEGER,
transaction_hash STRING
@ -33,7 +33,7 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE checkpoint
def create_events_table_query(event_type: EventType) -> str:
creation_query = f"""
CREATE TABLE {event_tables[event_type]}
CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
(
transaction_hash TEXT,
block_number INTEGER,
@ -53,10 +53,12 @@ def setup_database(conn: sqlite3.Connection) -> None:
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
"""
cur = conn.cursor()
cur.execute(CREATE_NFTS_TABLE_QUERY)
cur.execute(create_events_table_query(EventType.TRANSFER))
cur.execute(create_events_table_query(EventType.MINT))
cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
conn.commit()

View file

@ -0,0 +1,59 @@
"""
Tools to build derived relations from raw data (nfts, transfers, mints relations).
For example:
- Current owner of each token
- Current value of each token
"""
import logging
from typing import List, Tuple
import sqlite3
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
class LastValue:
    """
    SQLite aggregate function that reduces a column to the most recent value seen.

    Intended to be registered with sqlite3's conn.create_aggregate; we use it, for
    example, to determine the current owner of an NFT inside a given window of time.
    """

    def __init__(self):
        # Nothing observed yet; an empty group aggregates to None.
        self._latest = None

    def step(self, value):
        # Each new row simply replaces whatever came before it.
        self._latest = value

    def finalize(self):
        return self._latest
def ensure_custom_aggregate_functions(conn: sqlite3.Connection) -> None:
    """
    Loads custom aggregate functions to an active SQLite3 connection.

    Must be called before running any SQL that references these aggregates (e.g. the
    current_owners derivation). Registration is per-connection and is idempotent.
    """
    # last_value(col): one-argument aggregate returning the value from the last row
    # of each group, as implemented by the LastValue class.
    conn.create_aggregate("last_value", 1, LastValue)
def current_owners(conn: sqlite3.Connection) -> List[Tuple]:
    """
    Create (or recreate) the derived current_owners table and return its rows.

    Requires a connection to a dataset in which the raw data (esp. transfers) has
    already been loaded. Each row of the result is (nft_address, token_id, owner),
    where owner is the to_address of the last transfer seen for that token.

    Fix: the signature declared List[Tuple] but the function never returned anything;
    callers (e.g. handle_derive) bind the result. On failure the transaction is rolled
    back, the error is logged, and an empty list is returned (best-effort, as before).
    """
    ensure_custom_aggregate_functions(conn)
    drop_existing_current_owners_query = "DROP TABLE IF EXISTS current_owners;"
    current_owners_query = """
    CREATE TABLE current_owners AS
        SELECT nft_address, token_id, CAST(last_value(to_address) AS TEXT) AS owner FROM transfers
        GROUP BY nft_address, token_id;"""

    cur = conn.cursor()
    try:
        cur.execute(drop_existing_current_owners_query)
        cur.execute(current_owners_query)
        conn.commit()
        return cur.execute(
            "SELECT nft_address, token_id, owner FROM current_owners;"
        ).fetchall()
    except Exception as e:
        conn.rollback()
        logger.error("Could not create derived dataset: current_owners")
        logger.error(e)
        return []

View file

@ -1,3 +1,4 @@
from dataclasses import is_dataclass
import logging
import sqlite3
from typing import Any, cast, Iterator, List, Optional, Set
@ -23,9 +24,8 @@ logger = logging.getLogger(__name__)
class EthereumBatchloader:
def __init__(self, jrpc_url) -> None:
self.jrpc_url = jrpc_url
def __init__(self, jsonrpc_url) -> None:
self.jsonrpc_url = jsonrpc_url
self.message_number = 0
self.commands: List[Any] = []
self.requests_banch: List[Any] = []
@ -69,7 +69,9 @@ class EthereumBatchloader:
headers = {"Content-Type": "application/json"}
try:
r = requests.post(self.jrpc_url, headers=headers, data=payload, timeout=300)
r = requests.post(
self.jsonrpc_url, headers=headers, data=payload, timeout=300
)
except Exception as e:
print(e)
return r
@ -82,9 +84,9 @@ class EthereumBatchloader:
def enrich_from_web3(
web3_client: Web3,
nft_events: List[NFTEvent],
batch_loader: EthereumBatchloader,
bounds: Optional[BlockBounds] = None,
) -> List[NFTEvent]:
"""
Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)
@ -102,15 +104,16 @@ def enrich_from_web3(
if len(transactions_to_query) == 0:
return nft_events
logger.info("Calling jrpc")
jrpc_response = batch_loader.load_transactions(list(transactions_to_query))
breakpoint()
logger.info("Calling JSON RPC API")
jsonrpc_transactions_response = batch_loader.load_transactions(
list(transactions_to_query)
)
transactions_map = {
result["result"]["hash"]: (
int(result["result"]["value"], 16),
int(result["result"]["blockNumber"], 16),
)
for result in jrpc_response
for result in jsonrpc_transactions_response
}
blocks_to_query: Set[int] = set()
@ -122,14 +125,26 @@ def enrich_from_web3(
if len(blocks_to_query) == 0:
return nft_events
jrpc_response = batch_loader.load_blocks(list(blocks_to_query), False)
jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False)
blocks_map = {
int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16)
for result in jrpc_response
for result in jsonrpc_blocks_response
}
for index in indices_to_update:
nft_events[index].timestamp = blocks_map[cast(int, nft_event.block_number)]
return nft_events
def check_bounds(event: NFTEvent) -> bool:
    """Return True if the event's block number falls inside the closed-over bounds (no bounds = admit all)."""
    if bounds is None:
        return True
    if event.block_number < bounds.starting_block:
        return False
    if bounds.ending_block is not None and event.block_number > bounds.ending_block:
        return False
    return True
admissible_events = [event for event in nft_events if check_bounds(event)]
return admissible_events
def get_events(
@ -204,7 +219,6 @@ def get_events(
def create_dataset(
datastore_conn: sqlite3.Connection,
db_session: Session,
web3_client: Web3,
event_type: EventType,
batch_loader: EthereumBatchloader,
bounds: Optional[BlockBounds] = None,
@ -220,7 +234,7 @@ def create_dataset(
offset = 0
logger.info(f"Did not found any checkpoint for {event_type.value}")
raw_events = get_events(db_session, event_type, None, offset, batch_size)
raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
raw_events_batch: List[NFTEvent] = []
for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
@ -230,7 +244,7 @@ def create_dataset(
insert_events(
datastore_conn,
enrich_from_web3(web3_client, raw_events_batch, batch_loader),
enrich_from_web3(raw_events_batch, batch_loader, bounds),
)
offset += batch_size
@ -243,7 +257,7 @@ def create_dataset(
raw_events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(
datastore_conn, enrich_from_web3(web3_client, raw_events_batch, batch_loader)
datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds)
)
offset += len(raw_events_batch)
insert_checkpoint(