dataset builder update

pull/617/head
Yhtyyar Sahatov 2022-03-07 18:08:44 +03:00
rodzic 301f3bb745
commit 0b8ab06865
5 zmienionych plików z 608 dodań i 922 usunięć

Wyświetl plik

@ -1,5 +1,6 @@
import argparse
import contextlib
from enum import Enum
import logging
import os
import sqlite3
@ -7,10 +8,10 @@ from shutil import copyfile
from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel, PolygonLabel
from .enrich import EthereumBatchloader, enrich
from .data import EventType, event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data, filter_data
from .data import BlockBounds
from .datastore import setup_database
from .derive import (
current_owners,
current_market_values,
@ -22,7 +23,7 @@ from .derive import (
transfer_holding_times,
transfers_mints_connection_table,
)
from .materialize import create_dataset
from .materialize import crawl_erc721_labels
logging.basicConfig(level=logging.INFO)
@ -42,94 +43,43 @@ derive_functions = {
}
class Blockchain(Enum):
    """Blockchains the dataset builder can crawl; values match the CLI --blockchain choices."""

    ETHEREUM = "ethereum"
    POLYGON = "polygon"
def handle_initdb(args: argparse.Namespace) -> None:
    """Create the dataset tables in the SQLite datastore at ``args.datastore``."""
    connection = sqlite3.connect(args.datastore)
    with contextlib.closing(connection) as conn:
        setup_database(conn)
def handle_import_data(args: argparse.Namespace) -> None:
    """Copy one event table from a source datastore into a target datastore."""
    event_type = nft_event(args.type)
    target_ctx = contextlib.closing(sqlite3.connect(args.target))
    source_ctx = contextlib.closing(sqlite3.connect(args.source))
    with target_ctx as target_conn, source_ctx as source_conn:
        import_data(target_conn, source_conn, event_type, args.batch_size)
def handle_filter_data(args: argparse.Namespace) -> None:
    """Copy the source datastore, filter it by timestamp, and re-run derive steps.

    The source database is never modified: it is copied to ``args.target``
    (or ``<target>.dump`` when target == source) and all deletions and derived
    tables are produced on that copy.
    """
    # BUG FIX: the original opened a connection to args.source here and never
    # used it (it was immediately shadowed by the connection below), leaking a
    # database handle for the whole run.
    if args.target == args.source and args.source is not None:
        # Avoid clobbering the source database when target and source coincide.
        sqlite_path = f"{args.target}.dump"
    else:
        sqlite_path = args.target

    print(f"Creating new database:{sqlite_path}")
    copyfile(args.source, sqlite_path)

    with contextlib.closing(sqlite3.connect(sqlite_path)) as conn:
        print("Start filtering")
        filter_data(conn, start_time=args.start_time, end_time=args.end_time)
        print("Filtering end.")

        # Re-run every derive function so derived tables reflect the filtered data.
        total = len(derive_functions)
        for index, (function_name, derive_fn) in enumerate(derive_functions.items()):
            print(f"Derive process {function_name} {index+1}/{total}")
            derive_fn(conn)
# Apply derive to new data
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
bounds: Optional[BlockBounds] = None
if args.start is not None:
bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
elif args.end is not None:
raise ValueError("You cannot set --end unless you also set --start")
batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
logger.info(f"Materializing NFT events to datastore: {args.datastore}")
logger.info(f"Block bounds: {bounds}")
label_model = (
EthereumLabel if args.blockchain == Blockchain.ETHEREUM else PolygonLabel
)
print(label_model)
with yield_db_session_ctx() as db_session, contextlib.closing(
sqlite3.connect(args.datastore)
) as moonstream_datastore:
create_dataset(
moonstream_datastore,
crawl_erc721_labels(
db_session,
event_type,
bounds,
args.batch_size,
)
def handle_enrich(args: argparse.Namespace) -> None:
batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
logger.info(f"Enriching NFT events in datastore: {args.datastore}")
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
enrich(
moonstream_datastore,
EventType.TRANSFER,
batch_loader,
args.batch_size,
)
enrich(
moonstream_datastore,
EventType.MINT,
batch_loader,
args.batch_size,
label_model,
start_block=bounds.starting_block,
end_block=bounds.starting_block + 500000,
batch_size=10000,
)
@ -186,18 +136,7 @@ def main() -> None:
required=True,
help="Path to SQLite database representing the dataset",
)
parser_materialize.add_argument(
"--jsonrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"-t",
"--type",
choices=event_types,
help="Type of event to materialize intermediate data for",
)
parser_materialize.add_argument(
"--start", type=int, default=None, help="Starting block number"
)
@ -211,6 +150,14 @@ def main() -> None:
default=1000,
help="Number of events to process per batch",
)
parser_materialize.add_argument(
"--blockchain",
type=Blockchain,
choices=[Blockchain.ETHEREUM, Blockchain.POLYGON],
help="Blockchain to use",
)
parser_materialize.set_defaults(func=handle_materialize)
parser_derive = subcommands.add_parser(
@ -231,86 +178,6 @@ def main() -> None:
)
parser_derive.set_defaults(func=handle_derive)
parser_import_data = subcommands.add_parser(
"import-data",
description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
)
parser_import_data.add_argument(
"--target",
required=True,
help="Datastore into which you want to import data",
)
parser_import_data.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_import_data.add_argument(
"--type",
required=True,
choices=event_types,
help="Type of data you would like to import from source to target",
)
parser_import_data.add_argument(
"-N",
"--batch-size",
type=int,
default=10000,
help="Batch size for database commits into target datastore.",
)
parser_import_data.set_defaults(func=handle_import_data)
# Create dump of filtered data
parser_filtered_copy = subcommands.add_parser(
"filter-data",
description="Create copy of database with applied filters.",
)
parser_filtered_copy.add_argument(
"--target",
required=True,
help="Datastore into which you want to import data",
)
parser_filtered_copy.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_filtered_copy.add_argument(
"--start-time",
required=False,
type=int,
help="Start timestamp.",
)
parser_filtered_copy.add_argument(
"--end-time",
required=False,
type=int,
help="End timestamp.",
)
parser_filtered_copy.set_defaults(func=handle_filter_data)
parser_enrich = subcommands.add_parser(
"enrich", description="enrich dataset from geth node"
)
parser_enrich.add_argument(
"-d",
"--datastore",
required=True,
help="Path to SQLite database representing the dataset",
)
parser_enrich.add_argument(
"--jsonrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_enrich.add_argument(
"-n",
"--batch-size",
type=int,
default=1000,
help="Number of events to process per batch",
)
parser_enrich.set_defaults(func=handle_enrich)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -4,7 +4,7 @@ Data structures used in (and as part of the maintenance of) the Moonstream NFTs
from dataclasses import dataclass
from enum import Enum
from os import name
from typing import Optional
from typing import Any, Dict, Optional, Union
@dataclass
@ -13,38 +13,59 @@ class BlockBounds:
ending_block: Optional[int] = None
class EventType(Enum):
    """Moonstream label values that tag rows in the upstream database."""

    TRANSFER = "nft_transfer"
    MINT = "nft_mint"
    ERC721 = "erc721"


# Lookup from raw label string (e.g. "nft_transfer") to its EventType member.
event_types = {event_type.value: event_type for event_type in EventType}
def nft_event(raw_event: str) -> EventType:
    """Resolve a raw event-type string into its EventType member.

    Raises ValueError for unrecognized event strings.
    """
    if raw_event not in event_types:
        raise ValueError(f"Unknown nft event type: {raw_event}")
    return event_types[raw_event]
@dataclass
class NftTransaction:
    """A blockchain transaction that touched an NFT contract.

    The EIP-1559 fee fields are None for transactions that do not carry them
    (legacy transactions) — presumably; confirm against the crawler.
    """

    blockchain_type: str  # assumed to match Blockchain enum values — TODO confirm
    block_number: int
    block_timestamp: int
    transaction_hash: str
    contract_address: str
    caller_address: str
    function_name: str
    function_args: Union[Dict[str, Any], str]  # decoded args dict, or a raw string
    gas_used: int
    gas_price: int
    value: int
    status: int
    max_fee_per_gas: Optional[int] = None
    max_priority_fee_per_gas: Optional[int] = None
@dataclass
class NFTEvent:
event_id: str
event_type: EventType
nft_address: str
class NftApprovalEvent:
blockchain_type: str
owner: str
approved: str
token_id: str
transaction_hash: str
log_index: int
@dataclass
class NftApprovalForAllEvent:
    """An ERC-721 ApprovalForAll log entry."""

    blockchain_type: str
    owner: str
    approved: str  # NOTE(review): datastore schema declares this column BOOL — confirm type
    operator: str
    transaction_hash: str
    log_index: int
@dataclass
class NftTransferEvent:
    """An ERC-721 Transfer log entry.

    ``value``, ``block_number`` and ``timestamp`` are enrichment fields and
    default to None until filled in.
    """

    # BUG FIX: the original declared the required field log_index after the
    # defaulted Optional fields, which raises TypeError when the dataclass is
    # created ("non-default argument follows default argument").
    blockchain_type: str
    from_address: str
    to_address: str
    token_id: str
    transaction_hash: str
    log_index: int
    value: Optional[int] = None
    block_number: Optional[int] = None
    timestamp: Optional[int] = None
@dataclass
class NFTMetadata:
    """Collection-level metadata for an NFT contract."""

    address: str
    name: str
    symbol: str


# BUG FIX: the diff fused these two classes, leaving Erc20TransferEvent without
# its @dataclass decorator even though the insert helpers construct and read
# instances of it.
@dataclass
class Erc20TransferEvent:
    """An ERC-20 Transfer log entry observed alongside NFT activity."""

    blockchain_type: str
    from_address: str
    to_address: str
    value: int
    transaction_hash: str
    log_index: int

Wyświetl plik

@ -2,195 +2,370 @@
This module provides tools to interact with and maintain a SQLite database which acts/should act as
a datastore for a Moonstream NFTs dataset.
"""
import json
import logging
import sqlite3
from ctypes import Union  # NOTE(review): wrong module — typing.Union (imported below) intentionally shadows this
from typing import Any, List, Optional, Tuple, Union, cast

from tqdm import tqdm

from .data import EventType, NFTEvent, NFTMetadata
from .data import (
    NftTransaction,
    NftApprovalEvent,
    NftTransferEvent,
    NftApprovalForAllEvent,
    Erc20TransferEvent,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
symbol TEXT,
UNIQUE(address, name, symbol)
);
"""
BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;"
DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;"
SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;"
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
(
event_type STRING,
offset INTEGER
);
"""
def create_events_table_query(event_type: EventType) -> str:
def create_transactions_table_query(tabel_name) -> str:
    """Build the CREATE TABLE statement for a transactions table named *tabel_name*.

    Note: parameter name keeps the existing (misspelled) ``tabel_name`` for
    caller compatibility.
    """
    # BUG FIX: the diff spliced the removed events-table column list into the
    # middle of this f-string, producing invalid SQL.
    creation_query = f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        blockNumber INTEGER NOT NULL,
        blockTimestamp INTEGER NOT NULL,
        contractAddress TEXT,
        from_address TEXT NOT NULL,
        functionName TEXT NOT NULL,
        functionArgs JSON NOT NULL,
        value INTEGER NOT NULL,
        gasUsed INTEGER NOT NULL,
        gasPrice INTEGER NOT NULL,
        maxFeePerGas INTEGER,
        maxPriorityFeePerGas INTEGER,
        UNIQUE(blockchainType, transactionHash)
    );
    """
    return creation_query
def backup_events_table_query(event_type: EventType) -> str:
    """Build an ALTER TABLE statement renaming the event table to *_backup."""
    table = event_tables[event_type]
    return f"ALTER TABLE {table} RENAME TO {table}_backup;"
def drop_backup_events_table_query(event_type: EventType) -> str:
    """Build a DROP TABLE IF EXISTS statement for the event table's *_backup copy."""
    return f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;"
def select_events_table_query(event_type: EventType) -> str:
selection_query = f"""
SELECT
event_id,
transaction_hash,
nft_address,
token_id,
from_address,
to_address,
transaction_value,
block_number,
timestamp
FROM {event_tables[event_type]};
def create_approvals_table_query(tabel_name) -> str:
    """Build the CREATE TABLE statement for an ERC-721 Approval events table."""
    # BUG FIX: the diff left a stray `return selection_query` (an undefined
    # name from a removed function) before the real return.
    creation_query = f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        owner TEXT NOT NULL,
        approved TEXT NOT NULL,
        tokenId TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
    return creation_query
def get_events_for_enrich(
conn: sqlite3.Connection, event_type: EventType
) -> List[NFTEvent]:
def select_query(event_type: EventType) -> str:
selection_query = f"""
SELECT
event_id,
transaction_hash,
block_number,
nft_address,
token_id,
def create_approval_for_all_table_query(tabel_name) -> str:
    """Build the CREATE TABLE statement for an ERC-721 ApprovalForAll events table."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        owner TEXT NOT NULL,
        approved BOOL NOT NULL,
        operator TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def create_transfers_table_query(tabel_name) -> str:
    """Build the CREATE TABLE statement for an ERC-721 Transfer events table."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        from_address TEXT NOT NULL,
        to_address TEXT NOT NULL,
        tokenId TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def create_erc20_transfers_table_query(tabel_name) -> str:
    """Build the CREATE TABLE statement for an ERC-20 Transfer events table."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        from_address TEXT NOT NULL,
        to_address TEXT NOT NULL,
        value INTEGER NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def insertTransactionQuery(tabel_name):
    """Build the parameterized INSERT statement (13 placeholders) for a transactions table.

    Name kept camelCase for caller compatibility despite the file's snake_case
    convention.
    """
    insert_statement = f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        transactionHash,
        blockNumber,
        blockTimestamp,
        contractAddress,
        from_address,
        functionName,
        functionArgs,
        value,
        gasUsed,
        gasPrice,
        maxFeePerGas,
        maxPriorityFeePerGas
    )
    VALUES
    (
        ?,?,?,?,?,?,?,?,?,?,?,?,?
    );
    """
    return insert_statement
def insert_nft_approval_query(tabel_name):
    """Build the parameterized INSERT statement for an Approval events table."""
    insert_statement = f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        owner,
        approved,
        tokenId,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?
    );
    """
    return insert_statement
def insert_nft_approval_for_all_query(tabel_name):
    """Build the parameterized INSERT statement for an ApprovalForAll events table."""
    insert_statement = f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        owner,
        approved,
        operator,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?
    );
    """
    return insert_statement
def insert_nft_transfers_query(tabel_name):
    """Build the parameterized INSERT statement for an NFT Transfer events table.

    Column order matches nft_transfer_to_tuple and the transfers table schema.
    """
    # BUG FIX: the diff spliced the removed get_events_for_enrich function into
    # the middle of this function; reconstructed from the surviving head/tail
    # lines and the transfers table schema.
    query = f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        from_address,
        to_address,
        tokenId,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?
    );
    """
    return query
def update_events_batch(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
def replace_query(event_type: EventType) -> str:
query = f"""
REPLACE INTO {event_tables[event_type]}(
event_id,
transaction_hash,
block_number,
nft_address,
token_id,
def insert_erc20_transfer_query(tabel_name):
    """Build the parameterized INSERT statement for an ERC-20 Transfer events table.

    Column order matches erc20_nft_transfer_to_tuple and the erc20_transfers
    table schema.
    """
    # BUG FIX: the diff left old replace_query column names
    # (transaction_value, timestamp) and a 9-placeholder VALUES clause inside
    # this function; reconstructed from the schema and tuple converter.
    query = f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        from_address,
        to_address,
        value,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?
    );
    """
    return query
logger.info("Updating events in sqlite")
def create_blockchain_type_index_query(tabel_name) -> str:
    """Build a CREATE INDEX statement over *tabel_name*'s blockchainType column."""
    return f"""
    CREATE INDEX IF NOT EXISTS {tabel_name}_blockchainType ON {tabel_name} (blockchainType);
    """
def nft_transaction_to_tuple(nft_transaction: "NftTransaction") -> Tuple[Any, ...]:
    """
    Converts a NftTransaction object to a tuple which can be inserted into the database.

    Large integers (value, gas fields) are stored as strings; None values are
    preserved as None so they land as SQL NULL.
    """

    def _stringify(value: Optional[int]) -> Optional[str]:
        # BUG FIX: the original did str(x) unconditionally, turning None (e.g.
        # max_fee_per_gas on legacy transactions) into the literal string
        # "None" in the database instead of NULL.
        return None if value is None else str(value)

    return (
        nft_transaction.blockchain_type,
        nft_transaction.transaction_hash,
        nft_transaction.block_number,
        nft_transaction.block_timestamp,
        nft_transaction.contract_address,
        nft_transaction.caller_address,
        nft_transaction.function_name,
        json.dumps(nft_transaction.function_args),
        _stringify(nft_transaction.value),
        _stringify(nft_transaction.gas_used),
        _stringify(nft_transaction.gas_price),
        _stringify(nft_transaction.max_fee_per_gas),
        _stringify(nft_transaction.max_priority_fee_per_gas),
    )
def nft_approval_to_tuple(nft_approval: NftApprovalEvent) -> Tuple[Any, ...]:
    """
    Converts a NftApprovalEvent object to a tuple which can be inserted into the database.
    """
    # Field order matches insert_nft_approval_query's column list.
    return (
        nft_approval.blockchain_type,
        nft_approval.owner,
        nft_approval.approved,
        str(nft_approval.token_id),  # column is TEXT; str() guards non-string ids
        nft_approval.transaction_hash,
        nft_approval.log_index,
    )
def nft_approval_for_all_to_tuple(
    nft_approval_for_all: NftApprovalForAllEvent,
) -> Tuple[Any, ...]:
    """
    Converts a NftApprovalForAllEvent object to a tuple which can be inserted into the database.
    """
    # Field order matches insert_nft_approval_for_all_query's column list.
    return (
        nft_approval_for_all.blockchain_type,
        nft_approval_for_all.owner,
        nft_approval_for_all.approved,
        nft_approval_for_all.operator,
        nft_approval_for_all.transaction_hash,
        nft_approval_for_all.log_index,
    )
def nft_transfer_to_tuple(nft_transfer: NftTransferEvent) -> Tuple[Any, ...]:
    """
    Converts a NftTransferEvent object to a tuple which can be inserted into the database.
    """
    # Field order matches insert_nft_transfers_query's column list.
    return (
        nft_transfer.blockchain_type,
        nft_transfer.from_address,
        nft_transfer.to_address,
        str(nft_transfer.token_id),  # column is TEXT; str() guards non-string ids
        nft_transfer.transaction_hash,
        nft_transfer.log_index,
    )
def erc20_nft_transfer_to_tuple(
    erc20_nft_transfer: Erc20TransferEvent,
) -> Tuple[Any, ...]:
    """
    Converts a Erc20NftTransferEvent object to a tuple which can be inserted into the database.
    """
    # Field order matches insert_erc20_transfer_query's column list; value is
    # stringified to sidestep SQLite's 64-bit integer limit.
    return (
        erc20_nft_transfer.blockchain_type,
        erc20_nft_transfer.from_address,
        erc20_nft_transfer.to_address,
        str(erc20_nft_transfer.value),
        erc20_nft_transfer.transaction_hash,
        erc20_nft_transfer.log_index,
    )
def insert_transactions(
    conn: sqlite3.Connection, transactions: List["NftTransaction"]
) -> None:
    """
    Inserts the given NftTransaction objects into the database.

    The batch is committed atomically: on failure the transaction is rolled
    back and the original exception re-raised.
    """
    # BUG FIX: the diff left the removed update_events_batch body (undefined
    # names: events, replace_query, nft_event_to_tuple) inside this function.
    cur = conn.cursor()
    query = insertTransactionQuery("transactions")
    try:
        cur.executemany(
            query,
            [nft_transaction_to_tuple(nft_transaction) for nft_transaction in transactions],
        )
        conn.commit()
    except Exception as e:
        logger.error(f"FAILED TO insert transactions: {transactions}")
        conn.rollback()
        raise e
def insert_events(
    conn: sqlite3.Connection,
    events: list,
) -> None:
    """Bulk-insert a mixed batch of NFT event objects into their tables.

    Supported: NftTransferEvent -> transfers, NftApprovalEvent -> approvals,
    NftApprovalForAllEvent -> approvals_for_all, Erc20TransferEvent ->
    erc20_transfers. Raises ValueError for any other object type.
    """
    cur = conn.cursor()

    transfer_rows: list = []
    erc20_rows: list = []
    approval_rows: list = []
    approval_for_all_rows: list = []

    for item in events:
        if isinstance(item, NftApprovalEvent):
            approval_rows.append(nft_approval_to_tuple(item))
        elif isinstance(item, NftApprovalForAllEvent):
            approval_for_all_rows.append(nft_approval_for_all_to_tuple(item))
        elif isinstance(item, NftTransferEvent):
            transfer_rows.append(nft_transfer_to_tuple(item))
        elif isinstance(item, Erc20TransferEvent):
            erc20_rows.append(erc20_nft_transfer_to_tuple(item))
        else:
            raise ValueError(f"Unknown event type: {type(item)}")

    if transfer_rows:
        cur.executemany(insert_nft_transfers_query("transfers"), transfer_rows)
    if approval_rows:
        cur.executemany(insert_nft_approval_query("approvals"), approval_rows)
    if approval_for_all_rows:
        cur.executemany(
            insert_nft_approval_for_all_query("approvals_for_all"),
            approval_for_all_rows,
        )
    if erc20_rows:
        cur.executemany(insert_erc20_transfer_query("erc20_transfers"), erc20_rows)

    conn.commit()
def setup_database(conn: sqlite3.Connection) -> None:
    """
    Create all dataset tables and their blockchainType indices, then commit.
    """
    # BUG FIX: the diff left removed statements (CREATE_NFTS_TABLE_QUERY,
    # create_events_table_query, checkpoint table) and a hunk marker inside
    # this body.
    cur = conn.cursor()

    cur.execute(create_transactions_table_query("transactions"))
    cur.execute(create_approvals_table_query("approvals"))
    cur.execute(create_approval_for_all_table_query("approvals_for_all"))
    cur.execute(create_transfers_table_query("transfers"))
    cur.execute(create_erc20_transfers_table_query("erc20_transfers"))

    # Every table gets an index on blockchainType for per-chain queries.
    for tabel_name in (
        "transactions",
        "approvals",
        "approvals_for_all",
        "transfers",
        "erc20_transfers",
    ):
        cur.execute(create_blockchain_type_index_query(tabel_name))

    conn.commit()
def insert_events_query(event_type: EventType) -> str:
    """Build an INSERT OR IGNORE statement for the given event type's table."""
    target_table = event_tables[event_type]
    insert_statement = f"""
    INSERT OR IGNORE INTO {target_table}(
        event_id,
        transaction_hash,
        block_number,
        nft_address,
        token_id,
        from_address,
        to_address,
        transaction_value,
        timestamp
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    return insert_statement
def nft_event_to_tuple(
    event: NFTEvent,
) -> Tuple[str, str, str, str, str, str, str, str, str]:
    """
    Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes
    dropping e.g. the event_type field.
    """
    # NOTE: str() turns missing values into the literal string "None"; the
    # enrichment pass relies on that marker (enrich_from_web3 compares fields
    # against "None").
    return (
        str(event.event_id),
        str(event.transaction_hash),
        str(event.block_number),
        str(event.nft_address),
        str(event.token_id),
        str(event.from_address),
        str(event.to_address),
        str(event.value),
        str(event.timestamp),
    )
def get_checkpoint_offset(
    conn: sqlite3.Connection, event_type: "EventType"
) -> Optional[int]:
    """Return the most recent checkpoint offset for *event_type*, or None.

    Uses a parameterized query instead of the original string interpolation.
    """
    cur = conn.cursor()
    response = cur.execute(
        "SELECT * from checkpoint where event_type=? order by rowid desc limit 1",
        (event_type.value,),
    )
    # The LIMIT 1 query yields at most one row; its second column is the offset.
    for row in response:
        return row[1]
    return None
def delete_checkpoints(
    conn: sqlite3.Connection, event_type: "EventType", commit: bool = True
) -> None:
    """Delete all checkpoint rows for *event_type*; optionally commit.

    On commit failure the transaction is rolled back and the error re-raised.
    Uses a parameterized query instead of string interpolation, and a
    non-bare except so KeyboardInterrupt/SystemExit are not swallowed.
    """
    cur = conn.cursor()
    cur.execute("DELETE FROM checkpoint where event_type=?;", (event_type.value,))
    if commit:
        try:
            conn.commit()
        except Exception:
            conn.rollback()
            raise
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
    """Append a new checkpoint row recording *offset* for *event_type*."""
    insert_statement = """
    INSERT INTO checkpoint (
        event_type,
        offset
    ) VALUES (?, ?)
    """
    cur = conn.cursor()
    cur.execute(insert_statement, [event_type.value, offset])
    conn.commit()
def insert_address_metadata(
    conn: sqlite3.Connection, metadata_list: List[NFTMetadata]
) -> None:
    """Bulk-insert contract metadata rows; roll back and re-raise on failure."""
    cur = conn.cursor()
    insert_statement = """
    INSERT INTO nfts (
        address,
        name,
        symbol
    ) VALUES (?, ?, ?)
    """
    try:
        rows = [
            (metadata.address, metadata.name, metadata.symbol)
            for metadata in metadata_list
        ]
        cur.executemany(insert_statement, rows)
        conn.commit()
    except Exception as err:
        logger.error(f"Failed to save :\n {metadata_list}")
        conn.rollback()
        raise err
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
"""
Inserts the given events into the appropriate events table in the given SQLite database.
This method works with batches of events.
"""
cur = conn.cursor()
try:
transfers = [
nft_event_to_tuple(event)
for event in events
if event.event_type == EventType.TRANSFER
]
mints = [
nft_event_to_tuple(event)
for event in events
if event.event_type == EventType.MINT
]
cur.executemany(insert_events_query(EventType.TRANSFER), transfers)
cur.executemany(insert_events_query(EventType.MINT), mints)
conn.commit()
except Exception as e:
logger.error(f"FAILED TO SAVE :{events}")
conn.rollback()
raise e
def import_data(
target_conn: sqlite3.Connection,
source_conn: sqlite3.Connection,
event_type: EventType,
batch_size: int = 1000,
) -> None:
"""
Imports the data correspondong to the given event type from the source database into the target
database.
Any existing data of that type in the target database is first deleted. It is a good idea to
create a backup copy of your target database before performing this operation.
"""
target_cur = target_conn.cursor()
drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY
backup_table_query = BACKUP_NFTS_TABLE_QUERY
create_table_query = CREATE_NFTS_TABLE_QUERY
source_selection_query = SELECT_NFTS_QUERY
if event_type != EventType.ERC721:
drop_backup_query = drop_backup_events_table_query(event_type)
backup_table_query = backup_events_table_query(event_type)
create_table_query = create_events_table_query(event_type)
source_selection_query = select_events_table_query(event_type)
target_cur.execute(drop_backup_query)
target_cur.execute(backup_table_query)
target_cur.execute(create_table_query)
target_conn.commit()
source_cur = source_conn.cursor()
source_cur.execute(source_selection_query)
batch: List[Any] = []
for row in tqdm(source_cur, desc="Rows processed"):
if event_type == EventType.ERC721:
batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
else:
# Order matches select query returned by select_events_table_query
(
event_id,
transaction_hash,
nft_address,
token_id,
from_address,
to_address,
value,
block_number,
timestamp,
) = cast(
Tuple[
str,
str,
str,
str,
str,
str,
Optional[int],
Optional[int],
Optional[int],
],
row,
)
event = NFTEvent(
event_id=event_id,
event_type=event_type, # Original argument to this function
nft_address=nft_address,
token_id=token_id,
from_address=from_address,
to_address=to_address,
transaction_hash=transaction_hash,
value=value,
block_number=block_number,
timestamp=timestamp,
)
batch.append(event)
if len(batch) == batch_size:
if event_type == EventType.ERC721:
insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
else:
insert_events(target_conn, cast(List[NFTEvent], batch))
if event_type == EventType.ERC721:
insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
else:
insert_events(target_conn, cast(List[NFTEvent], batch))
target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
target_conn.commit()
source_offset = get_checkpoint_offset(source_conn, event_type)
if source_offset is not None:
delete_checkpoints(target_conn, event_type, commit=False)
insert_checkpoint(target_conn, event_type, source_offset)
def filter_data(
    sqlite_db: sqlite3.Connection,
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
):
    """
    Delete transfers/mints rows with timestamps outside [start_time, end_time].

    Either bound may be None, in which case that side is left unfiltered.
    Uses parameterized queries (the original interpolated the values into the
    SQL string) and explicit None checks (the original's `if start_time:`
    skipped the valid timestamp 0). Commits after each delete.
    """
    cur = sqlite_db.cursor()

    print(f"Remove by timestamp < {start_time}")
    if start_time is not None:
        cur.execute("DELETE from transfers where timestamp < ?", (start_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE from mints where timestamp < ?", (start_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()

    print(f"Remove by timestamp > {end_time}")
    if end_time is not None:
        cur.execute("DELETE from transfers where timestamp > ?", (end_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE from mints where timestamp > ?", (end_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()

Wyświetl plik

@ -6,156 +6,4 @@ import json
from tqdm import tqdm
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import (
get_checkpoint_offset,
get_events_for_enrich,
insert_address_metadata,
insert_checkpoint,
insert_events,
update_events_batch,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EthereumBatchloader:
    """Issues batched JSON-RPC requests (blocks, transactions) against an Ethereum node."""

    def __init__(self, jsonrpc_url) -> None:
        # Endpoint of the JSON-RPC provider.
        self.jsonrpc_url = jsonrpc_url
        self.message_number = 0
        self.commands: List[Any] = []
        self.requests_banch: List[Any] = []

    def load_blocks(self, block_list: List[int], with_transactions: bool):
        """
        Request list of blocks
        """
        calls = [
            {
                "jsonrpc": "2.0",
                "id": request_id,
                "method": "eth_getBlockByNumber",
                "params": [hex(block_number), with_transactions],
            }
            for request_id, block_number in enumerate(block_list)
        ]
        return self.send_json_message(calls)

    def load_transactions(self, transaction_hashes: List[str]):
        """
        Request list of transactions
        """
        calls = [
            {
                "jsonrpc": "2.0",
                "method": "eth_getTransactionByHash",
                "id": request_id,
                "params": [tx_hash],
            }
            for request_id, tx_hash in enumerate(transaction_hashes)
        ]
        return self.send_json_message(calls)

    def send_message(self, payload):
        """POST the raw payload to the JSON-RPC endpoint (300s timeout)."""
        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(
                self.jsonrpc_url, headers=headers, data=payload, timeout=300
            )
        except Exception as err:
            print(err)
            raise err
        return response

    def send_json_message(self, message):
        """Serialize *message* to JSON, send it, and return the decoded response."""
        encoded_json = json.dumps(message)
        return self.send_message(encoded_json.encode("utf8")).json()
def enrich_from_web3(
    nft_events: List["NFTEvent"],
    batch_loader: "EthereumBatchloader",
) -> List["NFTEvent"]:
    """
    Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)

    Events are mutated in place and the same list object is returned.
    """
    transactions_to_query = set()
    indices_to_update: List[int] = []
    for index, nft_event in enumerate(nft_events):
        # Missing values were stored as the literal string "None" (see
        # nft_event_to_tuple), so compare against that marker.
        if (
            nft_event.block_number == "None"
            or nft_event.value == "None"
            or nft_event.timestamp == "None"
        ):
            transactions_to_query.add(nft_event.transaction_hash)
            indices_to_update.append(index)

    if len(transactions_to_query) == 0:
        return nft_events

    logger.info("Calling JSON RPC API")
    jsonrpc_transactions_response = batch_loader.load_transactions(
        list(transactions_to_query)
    )
    transactions_map = {
        result["result"]["hash"]: (
            int(result["result"]["value"], 16),
            int(result["result"]["blockNumber"], 16),
        )
        for result in jsonrpc_transactions_response
    }

    blocks_to_query: Set[int] = set()
    for index in indices_to_update:
        nft_events[index].value, nft_events[index].block_number = transactions_map[
            nft_events[index].transaction_hash
        ]
        blocks_to_query.add(cast(int, nft_events[index].block_number))

    if len(blocks_to_query) == 0:
        return nft_events

    jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False)
    blocks_map = {
        int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16)
        for result in jsonrpc_blocks_response
    }
    for index in indices_to_update:
        # BUG FIX: the original indexed blocks_map with the stale loop variable
        # `nft_event` (the last element of the first loop) instead of the event
        # being updated, assigning the wrong timestamp or raising KeyError.
        nft_events[index].timestamp = blocks_map[
            cast(int, nft_events[index].block_number)
        ]
    return nft_events
def enrich(
    datastore_conn: sqlite3.Connection,
    event_type: EventType,
    batch_loader: EthereumBatchloader,
    batch_size: int = 1000,
) -> None:
    """Backfill missing block/value/timestamp data for all events of one type.

    Events are processed in batches of *batch_size*; each enriched batch is
    written back to the datastore before the next is sent to JSON-RPC.
    """
    pending = get_events_for_enrich(datastore_conn, event_type)
    batch: list = []
    for event in tqdm(pending, f"Processing events for {event_type.value} event type"):
        batch.append(event)
        if len(batch) == batch_size:
            logger.info("Getting data from JSONrpc")
            update_events_batch(
                datastore_conn, enrich_from_web3(batch, batch_loader)
            )
            batch = []
    # Flush the final partial batch.
    logger.info("Getting data from JSONrpc")
    update_events_batch(datastore_conn, enrich_from_web3(batch, batch_loader))
from .data import BlockBounds

Wyświetl plik

@ -1,190 +1,215 @@
import logging
import sqlite3
from typing import Any, cast, Iterator, List, Optional, Set
from typing import Any, Dict, Union, cast, Iterator, List, Optional, Set
import json
from attr import dataclass
from moonstreamdb.models import (
EthereumLabel,
EthereumTransaction,
EthereumBlock,
PolygonLabel,
)
from sqlalchemy import or_, and_
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, NFTMetadata, event_types
from .datastore import (
get_checkpoint_offset,
insert_address_metadata,
insert_checkpoint,
insert_events,
from .data import (
NftApprovalEvent,
NftApprovalForAllEvent,
NftTransaction,
NftTransferEvent,
Erc20TransferEvent,
)
from .datastore import insert_events, insert_transactions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
ERC721_LABEL = "erc721"
def _get_last_labeled_erc721_block(
    session: Session, label_model: Union[EthereumLabel, PolygonLabel]
) -> int:
    """Return the highest block number that carries an ERC721 label.

    :param session: Moonstream database session.
    :param label_model: Label model to query (EthereumLabel or PolygonLabel).
    :return: Largest ``block_number`` among rows labeled ``erc721``.
    :raises ValueError: If the table contains no ERC721 labels at all.
    """
    last = (
        session.query(label_model.block_number)
        .filter(label_model.label == ERC721_LABEL)
        .order_by(label_model.block_number.desc())
        .first()
    )
    if last is None:
        raise ValueError(f"No ERC721 labels found in {label_model.__tablename__} table")
    # .first() returns a one-element row; unwrap the block number.
    return last[0]
def parse_transaction_label(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> NftTransaction:
    """Convert a ``tx_call`` label row into an NftTransaction record.

    The blockchain is inferred from the label model's concrete type:
    EthereumLabel -> "ethereum", otherwise "polygon".

    :param label_model: Label row whose ``label_data["type"]`` is "tx_call".
    :return: NftTransaction populated from the label's ``label_data``.
    """
    assert (
        label_model.label_data["type"] == "tx_call"
    ), "Expected label to be of type 'tx_call'"

    if isinstance(label_model, EthereumLabel):
        blockchain_type = "ethereum"
    else:
        blockchain_type = "polygon"

    # TODO: this is done because I forgot to add value in polygon labels
    value = 0
    if label_model.label_data.get("value") is not None:
        value = label_model.label_data["value"]

    return NftTransaction(
        blockchain_type=blockchain_type,
        block_number=label_model.block_number,
        block_timestamp=label_model.block_timestamp,
        transaction_hash=label_model.transaction_hash,
        contract_address=label_model.address,
        caller_address=label_model.label_data["caller"],
        function_name=label_model.label_data["name"],
        function_args=label_model.label_data["args"],
        gas_used=label_model.label_data["gasUsed"],
        gas_price=label_model.label_data["gasPrice"],
        value=value,
        status=label_model.label_data["status"],
        max_fee_per_gas=label_model.label_data["maxFeePerGas"],
        max_priority_fee_per_gas=label_model.label_data["maxPriorityFeePerGas"],
    )
def _parse_transfer_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> Union[NftTransferEvent, Erc20TransferEvent]:
    """Parse a "Transfer" event label into a transfer event record.

    Labels whose args carry a ``tokenId`` are ERC721 transfers; labels
    without it are treated as ERC20 transfers (args carry ``value`` instead).

    :param label_model: Label row whose ``label_data["name"]`` is "Transfer".
    :return: NftTransferEvent for ERC721 labels, Erc20TransferEvent otherwise.
    """
    assert (
        label_model.label_data["type"] == "event"
    ), "Expected label to be of type 'event'"
    assert (
        label_model.label_data["name"] == "Transfer"
    ), "Expected label to be of type 'Transfer'"

    if isinstance(label_model, EthereumLabel):
        blockchain_type = "ethereum"
    else:
        blockchain_type = "polygon"

    if label_model.label_data["args"].get("tokenId") is not None:
        return NftTransferEvent(
            blockchain_type=blockchain_type,
            from_address=label_model.label_data["args"]["from"],
            to_address=label_model.label_data["args"]["to"],
            token_id=label_model.label_data["args"]["tokenId"],
            log_index=label_model.log_index,
            transaction_hash=label_model.transaction_hash,
        )
    else:
        return Erc20TransferEvent(
            blockchain_type=blockchain_type,
            from_address=label_model.label_data["args"]["from"],
            to_address=label_model.label_data["args"]["to"],
            value=label_model.label_data["args"]["value"],
            log_index=label_model.log_index,
            transaction_hash=label_model.transaction_hash,
        )
def _parse_approval_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> NftApprovalEvent:
    """Parse an "Approval" event label into an NftApprovalEvent record."""
    data = label_model.label_data
    assert data["type"] == "event", "Expected label to be of type 'event'"
    assert data["name"] == "Approval", "Expected label to be of type 'Approval'"

    # The concrete label model determines which chain the event came from.
    if isinstance(label_model, EthereumLabel):
        chain = "ethereum"
    else:
        chain = "polygon"

    args = data["args"]
    return NftApprovalEvent(
        blockchain_type=chain,
        owner=args["owner"],
        approved=args["approved"],
        token_id=args["tokenId"],
        log_index=label_model.log_index,
        transaction_hash=label_model.transaction_hash,
    )
def _parse_approval_for_all_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> NftApprovalForAllEvent:
    """Parse an "ApprovalForAll" event label into an NftApprovalForAllEvent record."""
    data = label_model.label_data
    assert data["type"] == "event", "Expected label to be of type 'event'"
    assert (
        data["name"] == "ApprovalForAll"
    ), "Expected label to be of type 'ApprovalForAll'"

    # The concrete label model determines which chain the event came from.
    if isinstance(label_model, EthereumLabel):
        chain = "ethereum"
    else:
        chain = "polygon"

    args = data["args"]
    return NftApprovalForAllEvent(
        blockchain_type=chain,
        owner=args["owner"],
        operator=args["operator"],
        approved=args["approved"],
        log_index=label_model.log_index,
        transaction_hash=label_model.transaction_hash,
    )
def parse_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> Union[
    NftTransferEvent, Erc20TransferEvent, NftApprovalEvent, NftApprovalForAllEvent
]:
    """Dispatch an event label row to the parser matching its event name.

    :param label_model: Label row whose ``label_data["name"]`` identifies the event.
    :return: Parsed event record. "Transfer" labels may yield an
        Erc20TransferEvent when no ``tokenId`` is present in the args.
    :raises ValueError: If the event name is not one of Transfer, Approval,
        ApprovalForAll.
    """
    if label_model.label_data["name"] == "Transfer":
        return _parse_transfer_event(label_model)
    elif label_model.label_data["name"] == "Approval":
        return _parse_approval_event(label_model)
    elif label_model.label_data["name"] == "ApprovalForAll":
        return _parse_approval_for_all_event(label_model)
    else:
        raise ValueError(f"Unknown label type: {label_model.label_data['name']}")
def crawl_erc721_labels(
    db_session: Session,
    conn: sqlite3.Connection,
    label_model: Union[EthereumLabel, PolygonLabel],
    start_block: int,
    end_block: int,
    batch_size: int = 10000,
):
    """Crawl ERC721 labels in ``[start_block, end_block]`` into the datastore.

    Iterates over the block range in chunks of ``batch_size`` blocks. Labels
    of type "tx_call" are parsed into transactions; all other labels are
    parsed as events. Each chunk is written to the SQLite datastore before
    moving on to the next one.

    :param db_session: Moonstream database session to read labels from.
    :param conn: SQLite connection to the NFT datastore being built.
    :param label_model: Label model to crawl (EthereumLabel or PolygonLabel).
    :param start_block: First block number to crawl (inclusive).
    :param end_block: Last block number to crawl (inclusive).
    :param batch_size: Number of blocks to process per chunk.
    """
    logger.info(
        f"Crawling {label_model.__tablename__} from {start_block} to {end_block}"
    )
    pbar = tqdm(total=(end_block - start_block + 1))
    pbar.set_description(
        f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}"
    )
    current_block = start_block
    while current_block <= end_block:
        batch_end = min(current_block + batch_size, end_block)
        logger.info(f"Crawling {current_block}-{batch_end}")
        labels = db_session.query(label_model).filter(
            and_(
                label_model.block_number >= current_block,
                label_model.block_number <= batch_end,
                label_model.label == ERC721_LABEL,
            )
        )

        transactions = []
        events = []
        for label in labels:
            if label.label_data["type"] == "tx_call":
                transactions.append(parse_transaction_label(label))
            else:
                events.append(parse_event(label))

        insert_transactions(conn, transactions)
        insert_events(conn, events)
        logger.info(f"Found {len(events)} events and {len(transactions)} transactions")
        pbar.update(batch_end - current_block + 1)
        current_block = batch_end + 1