Working version of "nfts initdb" and "nfts materialize"

Started a job to collect NFT mints from production to a local SQLite
datastore.

It looks like it will take a few hours to complete.
pull/304/head
Neeraj Kashyap 2021-09-26 13:06:09 -07:00
parent 6439324aba
commit 897bfda684
3 changed files with 101 additions and 42 deletions
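
For context, the new materialize path boils down to the call below. It mirrors handle_materialize from the diff that follows; the nfts package path and the "mint" event-type string are assumptions (cli.py uses relative imports, and the event_types choices are defined elsewhere), and the SQLite file is assumed to have been initialized first with "nfts initdb".

import contextlib
import sqlite3

from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3, HTTPProvider

from nfts.data import BlockBounds, nft_event   # assumed package path
from nfts.materialize import create_dataset    # assumed package path

web3_client = Web3(HTTPProvider("http://localhost:8545"))  # any Ethereum JSON-RPC endpoint
bounds = BlockBounds(starting_block=13_000_000, ending_block=None)  # what --start/--end produce

with yield_db_session_ctx() as db_session, contextlib.closing(
    sqlite3.connect("nfts.sqlite")
) as moonstream_datastore:
    create_dataset(
        moonstream_datastore,
        db_session,
        web3_client,
        nft_event("mint"),  # hypothetical event-type string; the CLI restricts choices to event_types
        bounds,
        1000,               # batch size; matches the new --batch-size default
    )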

View file

@@ -1,5 +1,6 @@
import argparse
import contextlib
import logging
import os
import sqlite3
from typing import Optional, Union
@@ -7,11 +8,15 @@ from typing import Optional, Union
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database
from .materialize import create_dataset
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def web3_connection(web3_uri: Optional[str] = None) -> Web3:
"""
Connect to the given web3 provider. You may specify a web3 provider either as a path to an IPC
@@ -37,11 +42,26 @@ def handle_initdb(args: argparse.Namespace) -> None:
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
print(args)
bounds: Optional[BlockBounds] = None
if args.start is not None:
bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
elif args.end is not None:
raise ValueError("You cannot set --end unless you also set --start")
logger.info(f"Materializing NFT events to datastore: {args.datastore}")
logger.info(f"Block bounds: {bounds}")
with yield_db_session_ctx() as db_session, contextlib.closing(
sqlite3.connect(args.datastore)
) as moonstream_datastore:
create_dataset(moonstream_datastore, db_session, args.web3, event_type)
create_dataset(
moonstream_datastore,
db_session,
args.web3,
event_type,
bounds,
args.batch_size,
)
def main() -> None:
@@ -89,6 +109,19 @@ def main() -> None:
choices=event_types,
help="Type of event to materialize intermediate data for",
)
parser_materialize.add_argument(
"--start", type=int, default=None, help="Starting block number"
)
parser_materialize.add_argument(
"--end", type=int, default=None, help="Ending block number"
)
parser_materialize.add_argument(
"-n",
"--batch-size",
type=int,
default=1000,
help="Number of events to process per batch",
)
parser_materialize.set_defaults(func=handle_materialize)
args = parser.parse_args()
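
The new --start/--end flags are packed into a BlockBounds value imported from .data, which this diff does not touch. A minimal sketch consistent with how it is constructed and read here (the dataclass form is an assumption):

from dataclasses import dataclass
from typing import Optional

@dataclass
class BlockBounds:
    # Inclusive block range; ending_block=None leaves the range open at the top.
    starting_block: int
    ending_block: Optional[int] = None

handle_materialize only builds this when --start is given, which is why passing --end on its own raises a ValueError.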

View file

@@ -29,8 +29,8 @@ CREATE TABLE {event_tables[event_type]}
token_id TEXT,
from_address TEXT,
to_address TEXT,
value INT,
timestamp INT
value INTEGER,
timestamp INTEGER
);
"""
return creation_query
@@ -72,14 +72,14 @@ def nft_event_to_tuple(event: NFTEvent) -> Tuple[Any]:
dropping e.g. the event_type field.
"""
return (
event.transaction_hash,
event.block_number,
event.nft_address,
event.token_id,
event.from_address,
event.to_address,
int(event.value),
event.timestamp,
str(event.transaction_hash),
str(event.block_number),
str(event.nft_address),
str(event.token_id),
str(event.from_address),
str(event.to_address),
str(event.value),
str(event.timestamp),
)
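
nft_event_to_tuple now stringifies every field before it reaches SQLite, presumably because token IDs and transfer values are uint256 quantities that can overflow SQLite's 64-bit integers. insert_events itself is not part of this diff; a hypothetical sketch of how rows shaped like these tuples are typically written in one round trip (the column names for the first three fields are inferred from the NFTEvent attributes and are an assumption):

import sqlite3
from typing import Sequence, Tuple

def write_event_rows(conn: sqlite3.Connection, table: str, rows: Sequence[Tuple]) -> None:
    # Hypothetical helper; the real insert_events lives in datastore.py and is not shown here.
    conn.executemany(
        f"INSERT INTO {table} ("
        "transaction_hash, block_number, nft_address, token_id, "
        "from_address, to_address, value, timestamp"
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()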

View file

@@ -1,3 +1,4 @@
import logging
import sqlite3
from typing import Iterator, List, Optional
@@ -7,6 +8,7 @@ from moonstreamdb.models import (
EthereumTransaction,
EthereumBlock,
)
from sqlalchemy import or_
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
@@ -14,6 +16,9 @@ from web3 import Web3
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import insert_events
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def enrich_from_web3(web3_client: Web3, nft_event: NFTEvent) -> NFTEvent:
"""
@@ -24,6 +29,7 @@ def enrich_from_web3(web3_client: Web3, nft_event: NFTEvent) -> NFTEvent:
or nft_event.value is None
or nft_event.timestamp is None
):
logger.info("Enriching from web3")
transaction = web3_client.eth.get_transaction(nft_event.transaction_hash)
nft_event.value = transaction["value"]
nft_event.block_number = transaction["blockNumber"]
@@ -32,8 +38,12 @@
return nft_event
def get_events_from_db(
db_session: Session, event_type: EventType, bounds: Optional[BlockBounds] = None
def get_events(
db_session: Session,
web3_client: Web3,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
) -> Iterator[NFTEvent]:
query = (
db_session.query(
@@ -55,29 +65,46 @@ def get_events_from_db(
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == event_type.value)
.limit(5)
)
if bounds is not None:
bounds_filters = [
EthereumTransaction.hash == None,
EthereumTransaction.block_number >= bounds.starting_block,
]
if bounds.ending_block is not None:
bounds_filters.append(
EthereumTransaction.block_number <= bounds.ending_block
)
query = query.filter(or_(*bounds_filters))
for (
label,
address,
label_data,
transaction_hash,
value,
block_number,
timestamp,
) in query:
yield NFTEvent(
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
from_address=label_data["from"],
to_address=label_data["to"],
transaction_hash=transaction_hash,
value=value,
block_number=block_number,
timestamp=timestamp,
)
offset = 0
while True:
events = query.offset(offset).limit(batch_size).all()
if not events:
break
offset += batch_size
for (
label,
address,
label_data,
transaction_hash,
value,
block_number,
timestamp,
) in events:
raw_event = NFTEvent(
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
from_address=label_data["from"],
to_address=label_data["to"],
transaction_hash=transaction_hash,
value=value,
block_number=block_number,
timestamp=timestamp,
)
event = enrich_from_web3(web3_client, raw_event)
yield event
def create_dataset(
@@ -85,20 +112,19 @@ def create_dataset(
db_session: Session,
web3_client: Web3,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
"""
events = map(
lambda e: enrich_from_web3(web3_client, e),
get_events_from_db(db_session, event_type),
)
events = get_events(db_session, web3_client, event_type, bounds, batch_size)
events_batch: List[NFTEvent] = []
for event in tqdm(events):
print(event)
for event in tqdm(events, desc="Events processed", colour="#DD6E0F"):
events_batch.append(event)
if len(events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(datastore_conn, events_batch)
events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(datastore_conn, events_batch)
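
The substance of the materialize.py change is that get_events now pages through the joined query with offset/limit instead of the previous hard limit(5), enriches each row from web3 as it goes, and create_dataset flushes to SQLite every batch_size events. Stripped of the Moonstream models, the pagination pattern is just this (a generic sketch, not code from this repository; the function name is mine):

from typing import Any, Iterator
from sqlalchemy.orm import Query

def paginate_query(query: Query, batch_size: int = 1000) -> Iterator[Any]:
    # Walk a SQLAlchemy query in fixed-size pages, yielding rows one at a time.
    offset = 0
    while True:
        page = query.offset(offset).limit(batch_size).all()
        if not page:
            break
        offset += batch_size
        for row in page:
            yield row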