Merge branch 'dataset-nfts' of github.com:bugout-dev/moonstream into dataset-nfts

pull/304/head
Andrey Dolgolev 2021-10-04 16:52:51 +03:00
commit b51922c13f
5 changed files with 333 additions and 66 deletions

View file

@@ -3,14 +3,14 @@ import contextlib
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3, IPCProvider, HTTPProvider
from .data import event_types, nft_event, BlockBounds
from .datastore import setup_database
from .datastore import setup_database, import_data
from .derive import current_owners, current_values_distribution
from .materialize import create_dataset, EthereumBatchloader
@@ -29,6 +29,14 @@ def handle_initdb(args: argparse.Namespace) -> None:
setup_database(conn)
def handle_import_data(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
with contextlib.closing(
sqlite3.connect(args.target)
) as target_conn, contextlib.closing(sqlite3.connect(args.source)) as source_conn:
import_data(target_conn, source_conn, event_type, args.batch_size)
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
bounds: Optional[BlockBounds] = None
@@ -153,6 +161,31 @@ def main() -> None:
)
parser_derive.set_defaults(func=handle_derive)
parser_import_data = subcommands.add_parser(
"import-data",
description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
)
parser_import_data.add_argument(
"--target", required=True, help="Datastore into which you want to import data",
)
parser_import_data.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_import_data.add_argument(
"--type",
required=True,
choices=event_types,
help="Type of data you would like to import from source to target",
)
parser_import_data.add_argument(
"-N",
"--batch-size",
type=int,
default=10000,
help="Batch size for database commits into target datastore.",
)
parser_import_data.set_defaults(func=handle_import_data)
args = parser.parse_args()
args.func(args)
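
For reference, the new `import-data` subcommand is a thin wrapper around `datastore.import_data`. Below is a minimal sketch of the equivalent programmatic call, assuming the package is importable as `nfts` (per the console-script entry point added to setup.py at the end of this commit); the database paths are hypothetical:

```python
# Hedged sketch mirroring handle_import_data; paths and event type are illustrative.
import contextlib
import sqlite3

from nfts.data import nft_event
from nfts.datastore import import_data

event_type = nft_event("nft_transfer")  # resolve a raw string to an EventType
with contextlib.closing(
    sqlite3.connect("target.sqlite")
) as target_conn, contextlib.closing(sqlite3.connect("source.sqlite")) as source_conn:
    # Replaces the matching table in the target datastore with the source's data,
    # committing every 10000 rows.
    import_data(target_conn, source_conn, event_type, batch_size=10000)
```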

View file

@@ -3,6 +3,7 @@ Data structures used in (and as part of the maintenance of) the Moonstream NFTs
"""
from dataclasses import dataclass
from enum import Enum
from os import name
from typing import Optional
@@ -15,6 +16,7 @@ class BlockBounds:
class EventType(Enum):
TRANSFER = "nft_transfer"
MINT = "nft_mint"
ERC721 = "erc721"
event_types = {event_type.value: event_type for event_type in EventType}
@@ -29,6 +31,7 @@ def nft_event(raw_event: str) -> EventType:
@dataclass
class NFTEvent:
event_id: str
event_type: EventType
nft_address: str
token_id: str
@@ -38,3 +41,10 @@ class NFTEvent:
value: Optional[int] = None
block_number: Optional[int] = None
timestamp: Optional[int] = None
@dataclass
class NFTMetadata:
address: str
name: str
symbol: str
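
To illustrate the extended data model: `NFTEvent` now carries a unique `event_id`, and `NFTMetadata` is a new record for contract-level data. A hedged construction with made-up values (the fields elided from the diff are inferred from the constructor calls later in this commit):

```python
# Illustrative only: every value below is invented.
from nfts.data import EventType, NFTEvent, NFTMetadata

event = NFTEvent(
    event_id="hypothetical-label-id",
    event_type=EventType.TRANSFER,
    nft_address="0x0000000000000000000000000000000000000001",
    token_id="42",
    from_address="0x0000000000000000000000000000000000000002",
    to_address="0x0000000000000000000000000000000000000003",
    transaction_hash="0x0000000000000000000000000000000000000000000000000000000000000004",
    value=1,  # Optional[int], defaults to None, as do block_number and timestamp
    block_number=13350000,
    timestamp=1633359171,
)

metadata = NFTMetadata(
    address="0x0000000000000000000000000000000000000001",
    name="Example Collection",
    symbol="EXC",
)
```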

View file

@@ -4,9 +4,11 @@ a datastore for a Moonstream NFTs dataset.
"""
import logging
import sqlite3
from typing import Any, List, Tuple, Optional
from typing import Any, cast, List, Tuple, Optional
from .data import EventType, NFTEvent, nft_event
from tqdm import tqdm
from .data import EventType, NFTEvent, NFTMetadata
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -18,15 +20,19 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
symbol TEXT
symbol TEXT,
UNIQUE(address, name, symbol)
);
"""
BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;"
DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;"
SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;"
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
(
event_type STRING,
offset INTEGER,
transaction_hash STRING
offset INTEGER
);
"""
@@ -35,6 +41,7 @@ def create_events_table_query(event_type: EventType) -> str:
creation_query = f"""
CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
(
event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
transaction_hash TEXT,
block_number INTEGER,
nft_address TEXT REFERENCES nfts(address),
@@ -48,6 +55,34 @@ CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
return creation_query
def backup_events_table_query(event_type: EventType) -> str:
backup_query = f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;"
return backup_query
def drop_backup_events_table_query(event_type: EventType) -> str:
drop_query = f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;"
return drop_query
def select_events_table_query(event_type: EventType) -> str:
selection_query = f"""
SELECT
event_id,
transaction_hash,
block_number,
nft_address,
token_id,
from_address,
to_address,
transaction_value,
timestamp
FROM {event_tables[event_type]};
"""
return selection_query
def setup_database(conn: sqlite3.Connection) -> None:
"""
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
@@ -67,7 +102,8 @@ def insert_events_query(event_type: EventType) -> str:
Generates a query which inserts NFT events into the appropriate events table.
"""
query = f"""
INSERT INTO {event_tables[event_type]}(
INSERT OR IGNORE INTO {event_tables[event_type]}(
event_id,
transaction_hash,
block_number,
nft_address,
@@ -76,19 +112,20 @@ INSERT INTO {event_tables[event_type]}(
to_address,
transaction_value,
timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
return query
def nft_event_to_tuple(
event: NFTEvent,
) -> Tuple[str, str, str, str, str, str, str, str]:
) -> Tuple[str, str, str, str, str, str, str, str, str]:
"""
Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes
dropping e.g. the event_type field.
"""
return (
str(event.event_id),
str(event.transaction_hash),
str(event.block_number),
str(event.nft_address),
@@ -112,21 +149,55 @@ def get_checkpoint_offset(
return None
def insert_checkpoint(
conn: sqlite3.Connection, event_type: EventType, offset: int, transaction_hash: str
):
def delete_checkpoints(
conn: sqlite3.Connection, event_type: EventType, commit: bool = True
) -> None:
cur = conn.cursor()
cur.execute(f"DELETE FROM checkpoint where event_type='{event_type.value}';")
if commit:
try:
conn.commit()
except:
conn.rollback()
raise
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
query = f"""
INSERT INTO checkpoint (
event_type,
offset,
transaction_hash
) VALUES (?, ?, ?)
INSERT INTO checkpoint (
event_type,
offset
) VALUES (?, ?)
"""
cur = conn.cursor()
cur.execute(query, [event_type.value, offset, transaction_hash])
cur.execute(query, [event_type.value, offset])
conn.commit()
def insert_address_metadata(
conn: sqlite3.Connection, metadata_list: List[NFTMetadata]
) -> None:
cur = conn.cursor()
query = f"""
INSERT INTO nfts (
address,
name,
symbol
) VALUES (?, ?, ?)
"""
try:
nfts = [
(metadata.address, metadata.name, metadata.symbol)
for metadata in metadata_list
]
cur.executemany(query, nfts)
conn.commit()
except Exception as e:
logger.error(f"Failed to save :\n {metadata_list}")
conn.rollback()
raise e
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
"""
Inserts the given events into the appropriate events table in the given SQLite database.
@@ -155,3 +226,99 @@ def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
logger.error(f"FAILED TO SAVE :{events}")
conn.rollback()
raise e
def import_data(
target_conn: sqlite3.Connection,
source_conn: sqlite3.Connection,
event_type: EventType,
batch_size: int = 1000,
) -> None:
"""
Imports the data corresponding to the given event type from the source database into the target
database.
Any existing data of that type in the target database is first deleted. It is a good idea to
create a backup copy of your target database before performing this operation.
"""
target_cur = target_conn.cursor()
drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY
backup_table_query = BACKUP_NFTS_TABLE_QUERY
create_table_query = CREATE_NFTS_TABLE_QUERY
source_selection_query = SELECT_NFTS_QUERY
if event_type != EventType.ERC721:
drop_backup_query = drop_backup_events_table_query(event_type)
backup_table_query = backup_events_table_query(event_type)
create_table_query = create_events_table_query(event_type)
source_selection_query = select_events_table_query(event_type)
target_cur.execute(drop_backup_query)
target_cur.execute(backup_table_query)
target_cur.execute(create_table_query)
target_conn.commit()
source_cur = source_conn.cursor()
source_cur.execute(source_selection_query)
batch: List[Any] = []
for row in tqdm(source_cur, desc="Rows processed"):
if event_type == EventType.ERC721:
batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
else:
(
event_id,
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
) = cast(
Tuple[
str,
str,
str,
str,
str,
str,
Optional[int],
Optional[int],
Optional[int],
],
row,
)
event = NFTEvent(
event_id,
event_type, # Original argument to this function
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
)
batch.append(event)
if len(batch) == batch_size:
if event_type == EventType.ERC721:
insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
else:
insert_events(target_conn, cast(List[NFTEvent], batch))
if event_type == EventType.ERC721:
insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
else:
insert_events(target_conn, cast(List[NFTEvent], batch))
target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
target_conn.commit()
source_offset = get_checkpoint_offset(source_conn, event_type)
if source_offset is not None:
delete_checkpoints(target_conn, event_type, commit=False)
insert_checkpoint(target_conn, event_type, source_offset)
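
The core of `import_data` is an accumulate-and-flush loop: rows stream out of the source cursor into a list, which is written to the target whenever it reaches `batch_size`, with one final flush for the remainder. A generic, hedged sketch of that shape for the `nfts` table (the function name `copy_nfts` is illustrative, not from the commit):

```python
import sqlite3
from typing import List, Tuple

def copy_nfts(
    source: sqlite3.Connection, target: sqlite3.Connection, batch_size: int = 1000
) -> None:
    insert = "INSERT INTO nfts (address, name, symbol) VALUES (?, ?, ?);"
    cur = source.cursor()
    cur.execute("SELECT address, name, symbol FROM nfts;")
    batch: List[Tuple[str, str, str]] = []
    for row in cur:
        batch.append(row)
        if len(batch) == batch_size:
            target.executemany(insert, batch)
            target.commit()
            batch = []  # reset the accumulator after each flush
    if batch:  # flush the final partial batch
        target.executemany(insert, batch)
        target.commit()
```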

View file

@@ -16,8 +16,13 @@ from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import get_checkpoint_offset, insert_checkpoint, insert_events
from .data import BlockBounds, EventType, NFTEvent, NFTMetadata, event_types
from .datastore import (
get_checkpoint_offset,
insert_address_metadata,
insert_checkpoint,
insert_events,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -147,15 +152,28 @@ def enrich_from_web3(
return admissible_events
def get_events(
def add_events(
datastore_conn: sqlite3.Connection,
db_session: Session,
event_type: EventType,
batch_loader: EthereumBatchloader,
initial_offset=0,
bounds: Optional[BlockBounds] = None,
initial_offset: int = 0,
batch_size: int = 1000,
) -> Iterator[NFTEvent]:
batch_size: int = 10,
) -> None:
raw_created_at_list = (
db_session.query(EthereumLabel.created_at)
.filter(EthereumLabel.label == event_type.value)
.order_by(EthereumLabel.created_at.asc())
.distinct(EthereumLabel.created_at)
).all()
created_at_list = [
created_at[0] for created_at in raw_created_at_list[initial_offset:]
]
query = (
db_session.query(
EthereumLabel.id,
EthereumLabel.label,
EthereumAddress.address,
EthereumLabel.label_data,
@@ -164,6 +182,7 @@ def get_events(
EthereumTransaction.block_number,
EthereumBlock.timestamp,
)
.filter(EthereumLabel.label == event_type.value)
.join(EthereumAddress, EthereumLabel.address_id == EthereumAddress.id)
.outerjoin(
EthereumTransaction,
@@ -173,11 +192,8 @@ def get_events(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == event_type.value)
.order_by(
EthereumLabel.created_at.asc(),
EthereumLabel.transaction_hash.asc(),
EthereumLabel.address_id.asc(),
)
)
if bounds is not None:
@@ -187,13 +203,23 @@ def get_events(
bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
query = query.filter(or_(*bounds_filters))
offset = initial_offset
while True:
events = query.offset(offset).limit(batch_size).all()
pbar = tqdm(total=(len(raw_created_at_list)))
pbar.set_description(f"Processing created ats")
pbar.update(initial_offset)
batch_start = 0
batch_end = batch_start + batch_size
while batch_start <= len(created_at_list):
events = query.filter(
EthereumLabel.created_at.in_(created_at_list[batch_start : batch_end + 1])
).all()
if not events:
break
offset += batch_size
continue
raw_events_batch = []
for (
event_id,
label,
address,
label_data,
@@ -203,6 +229,7 @@ def get_events(
timestamp,
) in events:
raw_event = NFTEvent(
event_id=event_id,
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
@@ -213,7 +240,16 @@ def get_events(
block_number=block_number,
timestamp=timestamp,
)
yield raw_event
raw_events_batch.append(raw_event)
logger.info(f"Adding {len(raw_events_batch)} to database")
insert_events(
datastore_conn, raw_events_batch
) # TODO REMOVED WEB3 enrich, since node is down
insert_checkpoint(datastore_conn, event_type, batch_end + initial_offset)
pbar.update(batch_end - batch_start + 1)
batch_start = batch_end + 1
batch_end = min(batch_end + batch_size, len(created_at_list))
def create_dataset(
@@ -222,7 +258,7 @@ def create_dataset(
event_type: EventType,
batch_loader: EthereumBatchloader,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_size: int = 10,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
@@ -234,35 +270,52 @@ def create_dataset(
offset = 0
logger.info(f"Did not found any checkpoint for {event_type.value}")
raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
raw_events_batch: List[NFTEvent] = []
if event_type == EventType.ERC721:
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
else:
add_events(
datastore_conn,
db_session,
event_type,
batch_loader,
offset,
bounds,
batch_size,
)
for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
raw_events_batch.append(event)
if len(raw_events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(
datastore_conn,
enrich_from_web3(raw_events_batch, batch_loader, bounds),
)
offset += batch_size
insert_checkpoint(
datastore_conn,
event_type,
offset,
event.transaction_hash,
)
raw_events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(
datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds)
)
offset += len(raw_events_batch)
insert_checkpoint(
datastore_conn,
event_type,
offset,
raw_events_batch[-1].transaction_hash,
)
def add_contracts_metadata(
datastore_conn: sqlite3.Connection,
db_session: Session,
initial_offset: int = 0,
batch_size: int = 1000,
) -> None:
logger.info("Adding erc721 contract metadata")
query = (
db_session.query(EthereumLabel.label_data, EthereumAddress.address)
.filter(EthereumLabel.label == EventType.ERC721.value)
.join(EthereumAddress, EthereumLabel.address_id == EthereumAddress.id)
.order_by(EthereumLabel.created_at, EthereumLabel.address_id)
)
offset = initial_offset
while True:
events = query.offset(offset).limit(batch_size).all()
if not events:
break
offset += len(events)
events_batch: List[NFTMetadata] = []
for label_data, address in events:
events_batch.append(
NFTMetadata(
address=address,
name=label_data.get("name", None),
symbol=label_data.get("symbol", None),
)
)
insert_address_metadata(datastore_conn, events_batch)
insert_checkpoint(datastore_conn, EventType.ERC721, offset)
logger.info(f"Already added {offset}")
logger.info(f"Added total of {offset-initial_offset} nfts metadata")

View file

@@ -41,5 +41,9 @@ setup(
"dev": ["black", "mypy", "types-requests"],
"distribute": ["setuptools", "twine", "wheel"],
},
entry_points={"console_scripts": []},
entry_points={
"console_scripts": [
"nfts=nfts.cli:main",
]
},
)
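
With this entry point, installing the package (for example with `pip install -e .`, an assumed workflow) puts an `nfts` command on the PATH that dispatches to `nfts.cli:main`, so the new subcommand can be invoked as, e.g., `nfts import-data --target target.sqlite --source source.sqlite --type nft_transfer` (paths hypothetical).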