kopia lustrzana https://github.com/bugout-dev/moonstream
added crawling of erc721
rodzic
4be1fe4d7a
commit
cb2c65b0ed
|
@ -3,6 +3,7 @@ Data structures used in (and as part of the maintenance of) the Moonstream NFTs
|
|||
"""
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from os import name
|
||||
from typing import Optional
|
||||
|
||||
|
||||
|
@ -15,6 +16,7 @@ class BlockBounds:
|
|||
class EventType(Enum):
|
||||
TRANSFER = "nft_transfer"
|
||||
MINT = "nft_mint"
|
||||
ERC721 = "erc721"
|
||||
|
||||
|
||||
event_types = {event_type.value: event_type for event_type in EventType}
|
||||
|
@ -38,3 +40,10 @@ class NFTEvent:
|
|||
value: Optional[int] = None
|
||||
block_number: Optional[int] = None
|
||||
timestamp: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class NFTMetadata:
|
||||
address: str
|
||||
name: str
|
||||
symbol: str
|
||||
|
|
|
@ -6,7 +6,7 @@ import logging
|
|||
import sqlite3
|
||||
from typing import Any, List, Tuple, Optional
|
||||
|
||||
from .data import EventType, NFTEvent, nft_event
|
||||
from .data import EventType, NFTEvent, NFTMetadata
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -18,7 +18,8 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
|
|||
(
|
||||
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
|
||||
name TEXT,
|
||||
symbol TEXT
|
||||
symbol TEXT,
|
||||
UNIQUE(address, name, symbol)
|
||||
);
|
||||
"""
|
||||
|
||||
|
@ -127,6 +128,30 @@ def insert_checkpoint(
|
|||
conn.commit()
|
||||
|
||||
|
||||
def insert_address_metadata(
|
||||
conn: sqlite3.Connection, metadata_list: List[NFTMetadata]
|
||||
) -> None:
|
||||
cur = conn.cursor()
|
||||
query = f"""
|
||||
INSERT INTO nfts (
|
||||
address,
|
||||
name,
|
||||
symbol
|
||||
) VALUES (?, ?, ?)
|
||||
"""
|
||||
try:
|
||||
nfts = [
|
||||
(metadata.address, metadata.name, metadata.symbol)
|
||||
for metadata in metadata_list
|
||||
]
|
||||
cur.executemany(query, nfts)
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save :\n {metadata_list}")
|
||||
conn.rollback()
|
||||
raise e
|
||||
|
||||
|
||||
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
|
||||
"""
|
||||
Inserts the given events into the appropriate events table in the given SQLite database.
|
||||
|
|
|
@ -16,8 +16,13 @@ from tqdm import tqdm
|
|||
from web3 import Web3
|
||||
import requests
|
||||
|
||||
from .data import BlockBounds, EventType, NFTEvent, event_types
|
||||
from .datastore import get_checkpoint_offset, insert_checkpoint, insert_events
|
||||
from .data import BlockBounds, EventType, NFTEvent, NFTMetadata, event_types
|
||||
from .datastore import (
|
||||
get_checkpoint_offset,
|
||||
insert_address_metadata,
|
||||
insert_checkpoint,
|
||||
insert_events,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -164,6 +169,7 @@ def get_events(
|
|||
EthereumTransaction.block_number,
|
||||
EthereumBlock.timestamp,
|
||||
)
|
||||
.filter(EthereumLabel.label == event_type.value)
|
||||
.join(EthereumAddress, EthereumLabel.address_id == EthereumAddress.id)
|
||||
.outerjoin(
|
||||
EthereumTransaction,
|
||||
|
@ -173,7 +179,6 @@ def get_events(
|
|||
EthereumBlock,
|
||||
EthereumTransaction.block_number == EthereumBlock.block_number,
|
||||
)
|
||||
.filter(EthereumLabel.label == event_type.value)
|
||||
.order_by(
|
||||
EthereumLabel.created_at.asc(),
|
||||
EthereumLabel.transaction_hash.asc(),
|
||||
|
@ -234,6 +239,9 @@ def create_dataset(
|
|||
offset = 0
|
||||
logger.info(f"Did not found any checkpoint for {event_type.value}")
|
||||
|
||||
if event_type == EventType.ERC721:
|
||||
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
|
||||
return
|
||||
raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
|
||||
raw_events_batch: List[NFTEvent] = []
|
||||
|
||||
|
@ -266,3 +274,40 @@ def create_dataset(
|
|||
offset,
|
||||
raw_events_batch[-1].transaction_hash,
|
||||
)
|
||||
|
||||
|
||||
def add_contracts_metadata(
|
||||
datastore_conn: sqlite3.Connection,
|
||||
db_session: Session,
|
||||
initial_offset: int = 0,
|
||||
batch_size: int = 1000,
|
||||
) -> None:
|
||||
logger.info("Adding erc721 contract metadata")
|
||||
query = (
|
||||
db_session.query(EthereumLabel.label_data, EthereumAddress.address)
|
||||
.filter(EthereumLabel.label == EventType.ERC721.value)
|
||||
.join(EthereumAddress, EthereumLabel.address_id == EthereumAddress.id)
|
||||
.order_by(EthereumLabel.created_at, EthereumLabel.address_id)
|
||||
)
|
||||
|
||||
offset = initial_offset
|
||||
while True:
|
||||
events = query.offset(offset).limit(batch_size).all()
|
||||
if not events:
|
||||
break
|
||||
offset += len(events)
|
||||
|
||||
events_batch: List[NFTMetadata] = []
|
||||
for label_data, address in events:
|
||||
events_batch.append(
|
||||
NFTMetadata(
|
||||
address=address,
|
||||
name=label_data.get("name", None),
|
||||
symbol=label_data.get("symbol", None),
|
||||
)
|
||||
)
|
||||
insert_address_metadata(datastore_conn, events_batch)
|
||||
insert_checkpoint(datastore_conn, EventType.ERC721, offset, "ERC721 checkpoint")
|
||||
logger.info(f"Already added {offset}")
|
||||
|
||||
logger.info(f"Added total of {offset-initial_offset} nfts metadata")
|
||||
|
|
Ładowanie…
Reference in New Issue