Merge branch 'main' into xdai-subscription

pull/616/head
kompotkot 2022-05-26 13:00:47 +00:00
commit ea62342b07
24 zmienionych plików z 2291 dodań i 2010 usunięć

Wyświetl plik

@ -1,35 +1,34 @@
"""
The Mooncrawl HTTP API
"""
import logging
import time
from cgi import test
from datetime import datetime, timedelta
import logging
from os import times
import time
from typing import Dict, Any, List
from typing import Any, Dict, List
from uuid import UUID
import boto3 # type: ignore
from fastapi import FastAPI, BackgroundTasks
from bugout.data import BugoutResource, BugoutResources
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import text
from bugout.data import BugoutResource, BugoutResources
from . import data
from .middleware import MoonstreamHTTPException
from .settings import (
DOCS_TARGET_PATH,
ORIGINS,
bugout_client as bc,
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
NB_CONTROLLER_ACCESS_ID,
DOCS_TARGET_PATH,
MOONSTREAM_S3_QUERIES_BUCKET,
MOONSTREAM_S3_QUERIES_BUCKET_PREFIX,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
ORIGINS,
)
from .version import MOONCRAWL_VERSION
from .settings import bugout_client as bc
from .stats_worker import dashboard, queries
from .version import MOONCRAWL_VERSION
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Wyświetl plik

@ -26,12 +26,12 @@ from web3.types import BlockData
from .data import AvailableBlockchainType, DateRange
from .settings import (
NB_ACCESS_ID_HEADER,
NB_DATA_SOURCE_HEADER,
MOONSTREAM_CRAWL_WORKERS,
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI,
MOONSTREAM_POLYGON_WEB3_PROVIDER_URI,
MOONSTREAM_XDAI_WEB3_PROVIDER_URI,
NB_ACCESS_ID_HEADER,
NB_DATA_SOURCE_HEADER,
)
logger = logging.getLogger(__name__)

Wyświetl plik

@ -11,8 +11,8 @@ from web3 import Web3
from ..blockchain import connect
from ..data import AvailableBlockchainType
from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore
from ..settings import NB_CONTROLLER_ACCESS_ID
from .deployment_crawler import ContractDeploymentCrawler, MoonstreamDataStore
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

Wyświetl plik

@ -23,7 +23,7 @@ from .blockchain import (
)
from .data import AvailableBlockchainType
from .publish import publish_json
from .settings import NB_CONTROLLER_ACCESS_ID, MOONSTREAM_CRAWL_WORKERS
from .settings import MOONSTREAM_CRAWL_WORKERS, NB_CONTROLLER_ACCESS_ID
from .version import MOONCRAWL_VERSION
logging.basicConfig(level=logging.INFO)

Wyświetl plik

@ -1,7 +1,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import List, Any, Dict
from typing import Any, Dict, List
from pydantic import BaseModel, Field

Wyświetl plik

@ -0,0 +1,344 @@
[
{
"inputs": [
{
"internalType": "string",
"name": "name_",
"type": "string"
},
{
"internalType": "string",
"name": "symbol_",
"type": "string"
},
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256"
}
],
"name": "Approval",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
},
{
"inputs": [
{
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"internalType": "address",
"name": "spender",
"type": "address"
}
],
"name": "allowance",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "approve",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "account",
"type": "address"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "decimals",
"outputs": [
{
"internalType": "uint8",
"name": "",
"type": "uint8"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "subtractedValue",
"type": "uint256"
}
],
"name": "decreaseAllowance",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "addedValue",
"type": "uint256"
}
],
"name": "increaseAllowance",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "account",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "mint",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "name",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "owner",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "renounceOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "symbol",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "totalSupply",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "recipient",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "transfer",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "sender",
"type": "address"
},
{
"internalType": "address",
"name": "recipient",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "transferFrom",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
}
]

Wyświetl plik

@ -0,0 +1,404 @@
[
{
"inputs": [
{
"internalType": "string",
"name": "name_",
"type": "string"
},
{
"internalType": "string",
"name": "symbol_",
"type": "string"
},
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "approved",
"type": "address"
},
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "Approval",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"indexed": false,
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "ApprovalForAll",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
},
{
"inputs": [
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "approve",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "owner",
"type": "address"
}
],
"name": "balanceOf",
"outputs": [
{
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "getApproved",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"internalType": "address",
"name": "operator",
"type": "address"
}
],
"name": "isApprovedForAll",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "mint",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [],
"name": "name",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "owner",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "ownerOf",
"outputs": [
{
"internalType": "address",
"name": "",
"type": "address"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "renounceOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "safeTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
},
{
"internalType": "bytes",
"name": "_data",
"type": "bytes"
}
],
"name": "safeTransferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "operator",
"type": "address"
},
{
"internalType": "bool",
"name": "approved",
"type": "bool"
}
],
"name": "setApprovalForAll",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "bytes4",
"name": "interfaceId",
"type": "bytes4"
}
],
"name": "supportsInterface",
"outputs": [
{
"internalType": "bool",
"name": "",
"type": "bool"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [],
"name": "symbol",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "tokenURI",
"outputs": [
{
"internalType": "string",
"name": "",
"type": "string"
}
],
"stateMutability": "view",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "from",
"type": "address"
},
{
"internalType": "address",
"name": "to",
"type": "address"
},
{
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "transferFrom",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
},
{
"inputs": [
{
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"stateMutability": "nonpayable",
"type": "function"
}
]

Wyświetl plik

@ -0,0 +1,452 @@
import json
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Union
import web3
from eth_typing import ChecksumAddress
from hexbytes.main import HexBytes
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import (
Base,
EthereumLabel,
EthereumTransaction,
PolygonLabel,
PolygonTransaction,
)
from moonworm.crawler.function_call_crawler import ( # type: ignore
ContractFunctionCall,
utfy_dict,
)
from moonworm.crawler.log_scanner import _fetch_events_chunk # type: ignore
from sqlalchemy.orm.session import Session
from tqdm import tqdm
from web3 import Web3
from web3._utils.events import get_event_data
from mooncrawl.data import AvailableBlockchainType # type: ignore
from ..blockchain import (
connect,
get_block_model,
get_label_model,
get_transaction_model,
)
from ..moonworm_crawler.db import (
_event_to_label,
add_events_to_session,
commit_session,
get_last_labeled_block_number,
)
from ..moonworm_crawler.event_crawler import Event, get_block_timestamp
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# NOTE(review): "value" has been added below, so the original
# "TODO: ADD VALUE!!!" appears resolved — confirm and drop the TODO.
# NOTE(review): class name typo ("Exteded") is kept as-is for
# compatibility with existing callers in this module.
@dataclass
class ExtededFunctionCall(ContractFunctionCall):
    # Effective gas price of the transaction (wei).
    gas_price: int
    # EIP-1559 fee fields; None for legacy (pre-London) transactions.
    max_fee_per_gas: Optional[int] = None
    max_priority_fee_per_gas: Optional[int] = None
    # Native currency (wei) sent with the call.
    value: int = 0
def _function_call_with_gas_price_to_label(
    blockchain_type: AvailableBlockchainType,
    function_call: ExtededFunctionCall,
    label_name: str,
) -> Base:
    """
    Build a label model row for a decoded function call, including the
    gas-price / EIP-1559 fee fields carried by ExtededFunctionCall.
    """
    model = get_label_model(blockchain_type)
    call_data = {
        "type": "tx_call",
        "name": function_call.function_name,
        "caller": function_call.caller_address,
        "args": function_call.function_args,
        "status": function_call.status,
        "gasUsed": function_call.gas_used,
        "gasPrice": function_call.gas_price,
        "maxFeePerGas": function_call.max_fee_per_gas,
        "maxPriorityFeePerGas": function_call.max_priority_fee_per_gas,
        "value": function_call.value,
    }
    return model(
        label=label_name,
        label_data=call_data,
        address=function_call.contract_address,
        block_number=function_call.block_number,
        transaction_hash=function_call.transaction_hash,
        block_timestamp=function_call.block_timestamp,
    )
def add_function_calls_with_gas_price_to_session(
    db_session: Session,
    function_calls: List[ExtededFunctionCall],
    blockchain_type: AvailableBlockchainType,
    label_name: str,
) -> None:
    """
    Queue labels for the given function calls on the session, skipping
    transactions that already carry a ``label_name`` label (with no
    log_index, i.e. tx-call labels rather than event labels).
    """
    label_model = get_label_model(blockchain_type)

    candidate_hashes = [call.transaction_hash for call in function_calls]
    already_labeled = {
        row[0]
        for row in db_session.query(label_model.transaction_hash)
        .filter(
            label_model.label == label_name,
            label_model.log_index == None,
            label_model.transaction_hash.in_(candidate_hashes),
        )
        .all()
    }

    labels_to_save = [
        _function_call_with_gas_price_to_label(blockchain_type, call, label_name)
        for call in function_calls
        if call.transaction_hash not in already_labeled
    ]

    logger.info(f"Saving {len(labels_to_save)} labels to session")
    db_session.add_all(labels_to_save)
def _transform_to_w3_tx(
    tx_raw: Union[EthereumTransaction, PolygonTransaction],
) -> Dict[str, Any]:
    """
    Transform db transaction model to web3 transaction
    """
    tx: Dict[str, Any] = {
        "blockNumber": tx_raw.block_number,
        "from": tx_raw.from_address,
        "gas": tx_raw.gas,
        "gasPrice": tx_raw.gas_price,
        "hash": HexBytes(tx_raw.hash),
        "input": tx_raw.input,
        "maxFeePerGas": tx_raw.max_fee_per_gas,
        "maxPriorityFeePerGas": tx_raw.max_priority_fee_per_gas,
        "nonce": tx_raw.nonce,
        "to": tx_raw.to_address,
        "transactionIndex": tx_raw.transaction_index,
        "value": tx_raw.value,
    }
    # Coerce numeric columns to plain ints (DB values may not be ints —
    # presumably Decimal; confirm against the model column types).
    for numeric_key in ("maxFeePerGas", "maxPriorityFeePerGas", "gasPrice", "value"):
        if tx[numeric_key] is not None:
            tx[numeric_key] = int(tx[numeric_key])
    return tx
def process_transaction(
    db_session: Session,
    web3: Web3,
    blockchain_type: AvailableBlockchainType,
    contract: Any,
    secondary_abi: List[Dict[str, Any]],
    transaction: Dict[str, Any],
    blocks_cache: Dict[int, int],
):
    """
    Decode one transaction's function call against ``contract`` and extract
    any receipt logs that match entries of ``secondary_abi``.

    Returns a tuple ``(function_call, secondary_logs)`` where
    ``function_call`` is an ExtededFunctionCall and ``secondary_logs`` is a
    list of decoded Event objects.
    """
    try:
        raw_function_call = contract.decode_function_input(transaction["input"])
        function_name = raw_function_call[0].fn_name
        function_args = utfy_dict(raw_function_call[1])
    except Exception:
        # Fix: was a broad handler binding an unused ``e``. Input does not
        # match the contract ABI — fall back to the 4-byte selector so the
        # call is still recorded.
        selector = transaction["input"][:10]
        function_name = selector
        function_args = "unknown"

    transaction_reciept = web3.eth.getTransactionReceipt(transaction["hash"])
    block_timestamp = get_block_timestamp(
        db_session,
        web3,
        blockchain_type,
        transaction["blockNumber"],
        blocks_cache,
        100,
    )

    function_call = ExtededFunctionCall(
        block_number=transaction["blockNumber"],
        block_timestamp=block_timestamp,
        transaction_hash=transaction["hash"].hex(),
        contract_address=transaction["to"],
        caller_address=transaction["from"],
        function_name=function_name,
        function_args=function_args,
        status=transaction_reciept["status"],
        gas_used=transaction_reciept["gasUsed"],
        gas_price=transaction["gasPrice"],
        # .get(): legacy transactions have no EIP-1559 fields.
        max_fee_per_gas=transaction.get(
            "maxFeePerGas",
        ),
        max_priority_fee_per_gas=transaction.get("maxPriorityFeePerGas"),
        value=transaction["value"],
    )

    secondary_logs = []
    for log in transaction_reciept["logs"]:
        for abi in secondary_abi:
            try:
                raw_event = get_event_data(web3.codec, abi, log)
                event = {
                    "event": raw_event["event"],
                    "args": json.loads(Web3.toJSON(utfy_dict(dict(raw_event["args"])))),
                    "address": raw_event["address"],
                    "blockNumber": raw_event["blockNumber"],
                    "transactionHash": raw_event["transactionHash"].hex(),
                    "logIndex": raw_event["logIndex"],
                    "blockTimestamp": block_timestamp,
                }
                processed_event = _processEvent(event)
                secondary_logs.append(processed_event)
                break
            except Exception:
                # Fix: was a bare ``except:`` which also swallows
                # SystemExit/KeyboardInterrupt. Log does not match this
                # ABI entry — try the next one.
                continue

    return function_call, secondary_logs
def _get_transactions(
    db_session: Session,
    web3: Web3,
    blockchain_type: AvailableBlockchainType,
    transaction_hashes: Set[str],
):
    """
    Load transactions by hash, preferring the database and falling back to
    the node (``web3.eth.getTransaction``) for hashes not found in the DB.

    Returns a list of web3-style transaction dicts.
    """
    transaction_model = get_transaction_model(blockchain_type)
    transactions = (
        db_session.query(transaction_model)
        .filter(transaction_model.hash.in_(transaction_hashes))
        .all()
    )
    web3_transactions = [
        _transform_to_w3_tx(transaction) for transaction in transactions
    ]

    # Fix: the found-hash list used to be rebuilt inside the comprehension
    # condition for every candidate hash (O(n*m)); build a set once.
    found_hashes = {transaction.hash for transaction in transactions}
    not_found_transaction_hashes = [
        transaction_hash
        for transaction_hash in transaction_hashes
        if transaction_hash not in found_hashes
    ]

    for nf_transaction in not_found_transaction_hashes:
        tx = web3.eth.getTransaction(nf_transaction)
        web3_transactions.append(tx)

    return web3_transactions
def _processEvent(raw_event: Dict[str, Any]):
    """
    Build an Event object from a decoded raw-event dictionary.
    """
    return Event(
        event_name=raw_event["event"],
        args=raw_event["args"],
        address=raw_event["address"],
        block_number=raw_event["blockNumber"],
        block_timestamp=raw_event["blockTimestamp"],
        transaction_hash=raw_event["transactionHash"],
        log_index=raw_event["logIndex"],
    )
def populate_with_events(
    db_session: Session,
    web3: Web3,
    blockchain_type: AvailableBlockchainType,
    label_name: str,
    populate_from_label: str,
    abi: List[Dict[str, Any]],
    from_block: int,
    to_block: int,
    batch_size: int = 100,
):
    """
    Backfill event labels (written under ``label_name``) for transactions
    that already carry ``populate_from_label`` labels in the block range
    [from_block, to_block], fetching matching logs from the node in batches.
    """
    current_block = from_block

    # Only event entries of the ABI are relevant for log scanning.
    events_abi = [event for event in abi if event["type"] == "event"]
    label_model = get_label_model(blockchain_type)

    pbar = tqdm(total=(to_block - from_block + 1))
    pbar.set_description(f"Populating events for blocks {from_block}-{to_block}")

    while current_block <= to_block:
        batch_end = min(current_block + batch_size, to_block)
        events = []
        logger.info("Fetching events")
        # Transactions already labeled by the source crawl in this batch,
        # together with their block numbers and timestamps.
        txs = (
            db_session.query(
                label_model.transaction_hash,
                label_model.block_number,
                label_model.block_timestamp,
            )
            .filter(
                label_model.label == populate_from_label,
                label_model.block_number >= current_block,
                label_model.block_number <= batch_end,
            )
            .distinct()
            .all()
        )

        txs_to_populate = {tx[0] for tx in txs}
        # block_number -> block_timestamp, reused for events in those txs.
        block_timestamps = {tx[1]: tx[2] for tx in txs}

        logger.info(f"Theoretically {len(txs_to_populate)} transactions to populate")
        if len(txs_to_populate) == 0:
            # Nothing labeled in this batch; advance to the next one.
            pbar.update(batch_end - current_block + 1)
            current_block = batch_end + 1
            continue

        for event_abi in events_abi:
            raw_events = _fetch_events_chunk(
                web3,
                event_abi,
                current_block,
                batch_end,
            )
            logger.info(f"Fetched {len(raw_events)} events")
            for raw_event in raw_events:
                # Keep only events from transactions the source label knows.
                if raw_event["transactionHash"] not in txs_to_populate:
                    continue
                raw_event["blockTimestamp"] = block_timestamps[raw_event["blockNumber"]]
                event = _processEvent(raw_event)
                events.append(event)

        logger.info(f"Found {len(events)} events for populate")
        add_events_to_session(db_session, events, blockchain_type, label_name)
        commit_session(db_session)
        pbar.update(batch_end - current_block + 1)
        current_block = batch_end + 1
def crawl(
    db_session: Session,
    web3: Web3,
    blockchain_type: AvailableBlockchainType,
    label_name: str,
    # Fix: was annotated Dict[str, Any] but is iterated as a list of ABI
    # entries (and cli.py passes a JSON array here).
    abi: List[Dict[str, Any]],
    secondary_abi: List[Dict[str, Any]],
    from_block: int,
    to_block: int,
    crawl_transactions: bool = True,
    addresses: Optional[List[ChecksumAddress]] = None,
    batch_size: int = 100,
) -> None:
    """
    Crawl events (and, when ``crawl_transactions`` is set, decoded function
    calls) for the contract described by ``abi`` over blocks
    [from_block, to_block], writing labels under ``label_name``.

    Events matching ``secondary_abi`` entries found in the receipts of
    decoded transactions are stored as well.
    """
    current_block = from_block

    # block_number -> block_timestamp cache shared across the whole crawl.
    db_blocks_cache: Dict[int, int] = {}

    contract = web3.eth.contract(abi=abi)
    # TODO(yhtiyar): load checkpoint
    events_abi = [item for item in abi if item["type"] == "event"]

    pbar = tqdm(total=(to_block - from_block + 1))
    pbar.set_description(f"Crawling blocks {from_block}-{to_block}")

    while current_block <= to_block:
        # Don't run past the chain head; wait for new blocks instead.
        blockchain_block = web3.eth.block_number
        if current_block > blockchain_block:
            logger.info("Current block is greater than blockchain block, sleeping")
            time.sleep(1)
            continue

        batch_end = min(current_block + batch_size, to_block)
        # Fix: previously logged ``current_block + batch_size``, which
        # overstates the range when the batch is clamped at ``to_block``.
        logger.info(f"Crawling blocks {current_block}-{batch_end}")

        events = []
        logger.info("Fetching events")
        for event_abi in events_abi:
            raw_events = _fetch_events_chunk(
                web3,
                event_abi,
                current_block,
                batch_end,
                addresses,
            )
            for raw_event in raw_events:
                raw_event["blockTimestamp"] = get_block_timestamp(
                    db_session,
                    web3,
                    blockchain_type,
                    raw_event["blockNumber"],
                    blocks_cache=db_blocks_cache,
                    max_blocks_batch=1000,
                )
                event = _processEvent(raw_event)
                events.append(event)

        if crawl_transactions:
            transaction_hashes = {event.transaction_hash for event in events}
            logger.info(f"Fetched {len(events)} events")
            logger.info(f"Fetching {len(transaction_hashes)} transactions")

            transactions = _get_transactions(
                db_session, web3, blockchain_type, transaction_hashes
            )
            logger.info(f"Fetched {len(transactions)} transactions")

            function_calls = []
            for tx in transactions:
                processed_tx, secondary_logs = process_transaction(
                    db_session,
                    web3,
                    blockchain_type,
                    contract,
                    secondary_abi,
                    tx,
                    db_blocks_cache,
                )
                function_calls.append(processed_tx)
                # Events decoded from secondary_abi are saved alongside the
                # primary events.
                events.extend(secondary_logs)

            add_function_calls_with_gas_price_to_session(
                db_session,
                function_calls,
                blockchain_type,
                label_name,
            )

        add_events_to_session(
            db_session,
            events,
            blockchain_type,
            label_name,
        )
        commit_session(db_session)
        pbar.update(batch_end - current_block + 1)
        current_block = batch_end + 1
def get_checkpoint(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    from_block: int,
    to_block: int,
    label_name: str,
) -> int:
    """
    Return the block to resume crawling from: one past the highest block in
    [from_block, to_block] already labeled with ``label_name``, or
    ``from_block`` when nothing in the range is labeled yet.
    """
    label_model = get_label_model(blockchain_type)
    latest = (
        db_session.query(label_model.block_number)
        .filter(
            label_model.label == label_name,
            label_model.block_number <= to_block,
            label_model.block_number >= from_block,
        )
        .order_by(label_model.block_number.desc())
        .first()
    )
    if latest is None:
        return from_block
    return latest[0] + 1

Wyświetl plik

@ -0,0 +1,361 @@
import argparse
import json
import logging
from typing import Optional
from moonstreamdb.db import yield_db_session_ctx
from web3 import Web3
from web3.middleware import geth_poa_middleware
from mooncrawl.data import AvailableBlockchainType # type: ignore
from ..blockchain import connect
from .base import crawl, get_checkpoint, populate_with_events
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def handle_nft_crawler(args: argparse.Namespace) -> None:
    """
    Crawl ERC721 activity using the bundled erc721 ABI file.
    """
    # Fix: dropped pointless f-prefix on a constant string (F541).
    logger.info("Starting NFT crawler")

    # NOTE(review): path is relative to the working directory — confirm the
    # crawler is always launched from the repository root.
    with open("mooncrawl/generic_crawler/abis/erc721.json") as f:
        abi = json.load(f)

    label = args.label_name
    from_block = args.start_block
    to_block = args.end_block

    blockchain_type = AvailableBlockchainType(args.blockchain_type)
    logger.info(f"Blockchain type: {blockchain_type.value}")
    with yield_db_session_ctx() as db_session:
        web3: Optional[Web3] = None
        if args.web3 is None:
            # Fix: log message said "blockchan.py".
            logger.info(
                "No web3 provider URL provided, using default (blockchain.py: connect())"
            )
            web3 = connect(blockchain_type)
        else:
            logger.info(f"Using web3 provider URL: {args.web3}")
            web3 = Web3(
                Web3.HTTPProvider(args.web3),
            )
        if args.poa:
            logger.info("Using PoA middleware")
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)

        # Resume from the last block already labeled in the range.
        last_crawled_block = get_checkpoint(
            db_session, blockchain_type, from_block, to_block, label
        )
        logger.info(f"Starting from block: {last_crawled_block}")

        crawl(
            db_session,
            web3,
            blockchain_type,
            label,
            abi,
            [],
            from_block=last_crawled_block,
            to_block=to_block,
            batch_size=args.max_blocks_batch,
        )
def populate_with_erc20_transfers(args: argparse.Namespace) -> None:
    """
    Backfill ERC20 Transfer event labels for transactions already labeled
    by a previous crawl (``args.label_to_populate``).
    """
    # Fix: dropped pointless f-prefix on a constant string (F541).
    logger.info("Starting erc20 transfer crawler")

    label = args.label_name
    from_block = args.start_block
    to_block = args.end_block

    with open(args.abi) as f:
        erc20_abi = json.load(f)

    # Taking only transfer event from erc20_abi
    erc20_abi = [
        event
        for event in erc20_abi
        if event["type"] == "event" and event["name"] == "Transfer"
    ]

    blockchain_type = AvailableBlockchainType(args.blockchain_type)
    logger.info(f"Blockchain type: {blockchain_type.value}")
    with yield_db_session_ctx() as db_session:
        web3: Optional[Web3] = None
        if args.web3 is None:
            # Fix: log message said "blockchan.py".
            logger.info(
                "No web3 provider URL provided, using default (blockchain.py: connect())"
            )
            web3 = connect(blockchain_type)
        else:
            logger.info(f"Using web3 provider URL: {args.web3}")
            web3 = Web3(
                Web3.HTTPProvider(args.web3),
            )
        if args.poa:
            logger.info("Using PoA middleware")
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)

        last_crawled_block = get_checkpoint(
            db_session, blockchain_type, from_block, to_block, label
        )
        logger.info(f"Starting from block: {last_crawled_block}")

        populate_with_events(
            db_session,
            web3,
            blockchain_type,
            label,
            args.label_to_populate,
            erc20_abi,
            last_crawled_block,
            to_block,
            batch_size=args.max_blocks_batch,
        )
def handle_crawl(args: argparse.Namespace) -> None:
    """
    Crawl an arbitrary contract using the ABI file passed on the CLI.
    """
    # Fix: dropped pointless f-prefix on a constant string (F541).
    logger.info("Starting generic crawler")

    label = args.label_name
    from_block = args.start_block
    to_block = args.end_block

    with open(args.abi) as f:
        abi = json.load(f)

    blockchain_type = AvailableBlockchainType(args.blockchain_type)
    logger.info(f"Blockchain type: {blockchain_type.value}")
    with yield_db_session_ctx() as db_session:
        web3: Optional[Web3] = None
        if args.web3 is None:
            # Fix: log message said "blockchan.py".
            logger.info(
                "No web3 provider URL provided, using default (blockchain.py: connect())"
            )
            web3 = connect(blockchain_type)
        else:
            logger.info(f"Using web3 provider URL: {args.web3}")
            web3 = Web3(
                Web3.HTTPProvider(args.web3),
            )
        if args.poa:
            logger.info("Using PoA middleware")
            web3.middleware_onion.inject(geth_poa_middleware, layer=0)

        last_crawled_block = get_checkpoint(
            db_session, blockchain_type, from_block, to_block, label
        )
        logger.info(f"Starting from block: {last_crawled_block}")

        crawl_transaction = not args.disable_transactions
        crawl(
            db_session,
            web3,
            blockchain_type,
            label,
            abi,
            secondary_abi=[],
            from_block=last_crawled_block,
            to_block=to_block,
            crawl_transactions=crawl_transaction,
            batch_size=args.max_blocks_batch,
        )
def main():
    """
    CLI entry point: ``crawl``, ``nft`` and ``erc20_populate`` subcommands.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    crawl_parser = subparsers.add_parser("crawl", help="Crawl with abi")
    crawl_parser.add_argument(
        "--blockchain_type",
        type=str,
        required=True,
        help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
    )
    crawl_parser.add_argument(
        "--abi",
        type=str,
        # Fix: was default=None, but handle_crawl opens this file
        # unconditionally — omitting it crashed with TypeError instead of
        # a clear argparse error.
        required=True,
        help="Abi of the contract",
    )
    crawl_parser.add_argument(
        "--disable_transactions",
        action="store_true",
        help="Disable transactions crawling",
    )
    crawl_parser.add_argument(
        "--web3",
        type=str,
        default=None,
        help="Web3 provider URL",
    )
    crawl_parser.add_argument(
        "--poa",
        action="store_true",
        default=False,
        help="Use PoA middleware",
    )
    # NOTE(review): start_block/end_block default to None, which the
    # handlers pass straight into get_checkpoint — confirm whether they
    # should be required as well.
    crawl_parser.add_argument(
        "--start_block",
        type=int,
        default=None,
    )
    crawl_parser.add_argument(
        "--end_block",
        type=int,
        default=None,
    )
    crawl_parser.add_argument(
        "--max_blocks_batch",
        type=int,
        default=500,
        help="Maximum number of blocks to crawl in a single crawl step",
    )
    crawl_parser.add_argument(
        "--label_name",
        type=str,
        default="erc721",
        help="Label name",
    )
    crawl_parser.set_defaults(func=handle_crawl)

    nft_crawler_parser = subparsers.add_parser(
        "nft",
        help="Run the NFT crawler",
    )
    nft_crawler_parser.add_argument(
        "--blockchain_type",
        type=str,
        required=True,
        choices=[
            "ethereum",
            "polygon",
        ],
    )
    nft_crawler_parser.add_argument(
        "--web3",
        type=str,
        default=None,
        help="Web3 provider URL",
    )
    nft_crawler_parser.add_argument(
        "--poa",
        action="store_true",
        default=False,
        help="Use PoA middleware",
    )
    nft_crawler_parser.add_argument(
        "--start_block",
        type=int,
        default=None,
    )
    nft_crawler_parser.add_argument(
        "--end_block",
        type=int,
        default=None,
    )
    nft_crawler_parser.add_argument(
        "--max_blocks_batch",
        type=int,
        default=500,
        help="Maximum number of blocks to crawl in a single crawl step",
    )
    nft_crawler_parser.add_argument(
        "--label_name",
        type=str,
        default="erc721",
        help="Label name",
    )
    nft_crawler_parser.set_defaults(func=handle_nft_crawler)

    erc20_populate_parser = subparsers.add_parser(
        "erc20_populate",
        help="Populate erc20 labels",
    )
    erc20_populate_parser.add_argument(
        "--blockchain_type",
        type=str,
        required=True,
        choices=[
            "ethereum",
            "polygon",
        ],
    )
    erc20_populate_parser.add_argument(
        "--web3",
        type=str,
        default=None,
        help="Web3 provider URL",
    )
    erc20_populate_parser.add_argument(
        "--poa",
        action="store_true",
        default=False,
        help="Use PoA middleware",
    )
    erc20_populate_parser.add_argument(
        "--start_block",
        type=int,
        default=None,
    )
    erc20_populate_parser.add_argument(
        "--end_block",
        type=int,
        default=None,
    )
    erc20_populate_parser.add_argument(
        "--max_blocks_batch",
        type=int,
        default=500,
        help="Maximum number of blocks to crawl in a single crawl step",
    )
    erc20_populate_parser.add_argument(
        "--label_name",
        type=str,
        required=True,
        help="Label name",
    )
    erc20_populate_parser.add_argument(
        "--label_to_populate",
        type=str,
        required=True,
        help="Label name to populate",
    )
    erc20_populate_parser.add_argument(
        "--abi",
        type=str,
        # Fix: was default=None, but populate_with_erc20_transfers opens
        # this file unconditionally.
        required=True,
        help="Abi of the erc20 contract",
    )
    erc20_populate_parser.set_defaults(func=populate_with_erc20_transfers)

    args = parser.parse_args()
    args.func(args)
# Script entry point: run the CLI when executed directly.
if __name__ == "__main__":
    main()

Wyświetl plik

@ -44,8 +44,6 @@ def handle_crawl(args: argparse.Namespace) -> None:
f"Initial function call crawl jobs count: {len(initial_function_call_jobs)}"
)
# Couldn't figure out how to convert from string to AvailableBlockchainType
# AvailableBlockchainType(args.blockchain_type) is not working
blockchain_type = AvailableBlockchainType(args.blockchain_type)
logger.info(f"Blockchain type: {blockchain_type.value}")
@ -139,11 +137,7 @@ def main() -> None:
"--blockchain-type",
"-b",
type=str,
choices=[
AvailableBlockchainType.ETHEREUM.value,
AvailableBlockchainType.POLYGON.value,
],
required=True,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
crawl_parser.add_argument(
"--web3",

Wyświetl plik

@ -25,13 +25,15 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> Base:
def _event_to_label(
blockchain_type: AvailableBlockchainType, event: Event, label_name=CRAWLER_LABEL
) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label = label_model(
label=CRAWLER_LABEL,
label=label_name,
label_data={
"type": "event",
"name": event.event_name,
@ -47,14 +49,16 @@ def _event_to_label(blockchain_type: AvailableBlockchainType, event: Event) -> B
def _function_call_to_label(
blockchain_type: AvailableBlockchainType, function_call: ContractFunctionCall
blockchain_type: AvailableBlockchainType,
function_call: ContractFunctionCall,
label_name=CRAWLER_LABEL,
) -> Base:
"""
Creates a label model.
"""
label_model = get_label_model(blockchain_type)
label = label_model(
label=CRAWLER_LABEL,
label=label_name,
label_data={
"type": "tx_call",
"name": function_call.function_name,
@ -73,12 +77,14 @@ def _function_call_to_label(
def get_last_labeled_block_number(
db_session: Session, blockchain_type: AvailableBlockchainType
db_session: Session,
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
) -> Optional[int]:
label_model = get_label_model(blockchain_type)
block_number = (
db_session.query(label_model.block_number)
.filter(label_model.label == CRAWLER_LABEL)
.filter(label_model.label == label_name)
.order_by(label_model.block_number.desc())
.limit(1)
.one_or_none()
@ -101,7 +107,10 @@ def commit_session(db_session: Session) -> None:
def add_events_to_session(
db_session: Session, events: List[Event], blockchain_type: AvailableBlockchainType
db_session: Session,
events: List[Event],
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
) -> None:
label_model = get_label_model(blockchain_type)
@ -110,7 +119,7 @@ def add_events_to_session(
existing_labels = (
db_session.query(label_model.transaction_hash, label_model.log_index)
.filter(
label_model.label == CRAWLER_LABEL,
label_model.label == label_name,
label_model.log_index != None,
label_model.transaction_hash.in_(events_hashes_to_save),
)
@ -128,13 +137,13 @@ def add_events_to_session(
labels_to_save = []
for event in events:
if event.transaction_hash not in existing_labels_transactions:
labels_to_save.append(_event_to_label(blockchain_type, event))
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
elif (
event.log_index not in existing_log_index_by_tx_hash[event.transaction_hash]
):
labels_to_save.append(_event_to_label(blockchain_type, event))
labels_to_save.append(_event_to_label(blockchain_type, event, label_name))
logger.info(f"Saving {len(labels_to_save)} labels to session")
logger.info(f"Saving {len(labels_to_save)} event labels to session")
db_session.add_all(labels_to_save)
@ -142,6 +151,7 @@ def add_function_calls_to_session(
db_session: Session,
function_calls: List[ContractFunctionCall],
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
) -> None:
label_model = get_label_model(blockchain_type)
@ -152,7 +162,7 @@ def add_function_calls_to_session(
existing_labels = (
db_session.query(label_model.transaction_hash)
.filter(
label_model.label == CRAWLER_LABEL,
label_model.label == label_name,
label_model.log_index == None,
label_model.transaction_hash.in_(transactions_hashes_to_save),
)

Wyświetl plik

@ -1,369 +0,0 @@
"""
A command line tool to crawl information about NFTs from various sources.
"""
import argparse
import json
import logging
import sys
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional, cast
from uuid import UUID
from bugout.app import Bugout
from bugout.journal import SearchOrder
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumBlock
from sqlalchemy.orm.session import Session
from web3 import Web3
from ..blockchain import connect
from ..data import AvailableBlockchainType
from ..publish import publish_json
from ..settings import (
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_DATA_JOURNAL_ID,
MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI,
NFT_HUMBUG_TOKEN,
)
from ..version import MOONCRAWL_VERSION
from .ethereum import (
SUMMARY_KEY_ARGS,
SUMMARY_KEY_END_BLOCK,
SUMMARY_KEY_ID,
SUMMARY_KEY_NUM_BLOCKS,
SUMMARY_KEY_START_BLOCK,
add_labels,
)
from .ethereum import summary as ethereum_summary
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
BLOCKS_PER_SUMMARY = 40
def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
    """
    Returns a web3 client either by parsing "--web3" argument on the given arguments or by looking up
    the MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable.
    """
    # The CLI flag, when present, wins over the environment-derived default.
    connection_string: Optional[str] = MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI
    cli_value = vars(args).get("web3")
    if cli_value is not None:
        connection_string = cast(str, cli_value)

    if connection_string is None:
        raise ValueError(
            "Could not find Web3 connection information in arguments or in MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable"
        )

    return connect(
        AvailableBlockchainType.ETHEREUM,
        connection_string,
        access_id=args.access_id,
    )
def get_latest_block_from_node(web3_client: Web3):
    """Return the latest block number reported by the connected Ethereum node."""
    return web3_client.eth.block_number
def get_latest_summary_block() -> Optional[int]:
    """
    Return the end block of the most recent "nft_ethereum" summary stored in the
    Moonstream journal, or None if there is no summary or the lookup fails.
    """
    try:
        bugout_client = Bugout()
        query = "#crawl_type:nft_ethereum"

        # Only the single newest matching journal entry is needed.
        events = bugout_client.search(
            MOONSTREAM_ADMIN_ACCESS_TOKEN,
            MOONSTREAM_DATA_JOURNAL_ID,
            query,
            limit=1,
            timeout=30.0,
            order=SearchOrder.DESCENDING,
        )
        if not events.results:
            logger.warning("There is no summaries in Bugout")
            return None

        last_event = events.results[0]
        content = cast(str, last_event.content)
        # Summary entries are JSON documents; "end_block" is the checkpoint.
        return json.loads(content)["end_block"]
    except Exception as e:
        # Best-effort lookup: callers treat None as "no checkpoint available".
        logger.error(f"Failed to get summary from Bugout : {e}")
        return None
# TODO(yhtiyar):
# Labeling status should be stored in a new table, and the
# latest label should be read from there
def get_latest_nft_labeled_block(db_session: Session) -> Optional[int]:
    """Return the newest block number that already carries NFT labels.

    Not implemented yet; sync_labels only reaches this path when no explicit
    start block was supplied.
    """
    raise NotImplementedError()
def sync_labels(db_session: Session, web3_client: Web3, start: Optional[int]) -> int:
    """
    Label NFT transfers from ``start`` up to the node's latest block.

    When ``start`` is None, tries the latest labeled block in the database and
    falls back to the first block on/after 1 Jan 2021 UTC.
    (NOTE(review): get_latest_nft_labeled_block currently raises
    NotImplementedError, so that fallback path will crash — confirm intent.)

    Returns the last block number that was labeled, or ``start - 1`` when the
    node is not ahead of ``start``.
    """
    if start is None:
        logger.info(
            "Syncing label start block is not given, getting it from latest nft label in db"
        )
        start = get_latest_nft_labeled_block(db_session)
        if start is None:
            logger.warning(
                "Didn't find any nft labels in db, starting sync from 1st Jan 2021 before now"
            )
            start_date = datetime(2021, 1, 1, tzinfo=timezone.utc)
            start = (
                db_session.query(EthereumBlock)
                .filter(EthereumBlock.timestamp >= start_date.timestamp())
                .order_by(EthereumBlock.timestamp.asc())
                .limit(1)
                .one()
            ).block_number

    logger.info(f"Syncing labels, start block: {start}")
    end = get_latest_block_from_node(web3_client)
    if start > end:
        # Fix: logger.warn is a deprecated alias of logger.warning.
        logger.warning(f"Start block {start} is greater than latest_block {end} in db")
        logger.warning("Maybe ethcrawler is not syncing or nft sync is up to date")
        return start - 1

    logger.info(f"Labeling blocks {start}-{end}")
    add_labels(web3_client, db_session, start, end)
    return end
def sync_summaries(
    db_session: Session,
    start: Optional[int],
    end: int,
) -> int:
    """
    Generate and push NFT activity summaries in BLOCKS_PER_SUMMARY-sized batches
    covering blocks ``start`` through ``end``.

    When ``start`` is None, resumes right after the latest summary found in
    Bugout, falling back to the first block on/after 1 Jan 2021 UTC.

    Returns the last block number covered by a pushed summary
    (``start - 1`` when no full batch fit in the range).
    """
    if start is None:
        logger.info(
            "Syncing summary start block is not given, getting it from latest nft summary from Bugout"
        )
        start = get_latest_summary_block()

        if start is not None:
            # Resume one block past the last summarized block.
            start += 1
        else:
            logger.info(
                "There is no entry in Bugout, starting to create summaries from 1st Jan 2021"
            )
            start_date = datetime(2021, 1, 1, tzinfo=timezone.utc)
            start = (
                db_session.query(EthereumBlock)
                .filter(EthereumBlock.timestamp >= start_date.timestamp())
                .order_by(EthereumBlock.timestamp.asc())
                .limit(1)
                .one()
            ).block_number

    logger.info(f"Syncing summaries start_block: {start}")
    batch_end = start + BLOCKS_PER_SUMMARY - 1

    if batch_end > end:
        # Fix: logger.warn is a deprecated alias of logger.warning.
        logger.warning("Syncing summaries is not required")

    # Only push summaries for complete batches; a trailing partial batch waits
    # for the next pass.
    while batch_end <= end:
        summary_result = ethereum_summary(db_session, start, batch_end)
        push_summary(summary_result)
        logger.info(f"Pushed summary of blocks : {start}-{batch_end}")
        start = batch_end + 1
        batch_end += BLOCKS_PER_SUMMARY

    if start == end:
        return end
    else:
        return start - 1
def ethereum_sync_handler(args: argparse.Namespace) -> None:
    """
    Handler for "nft ethereum synchronize": repeatedly label new NFT transfers
    and push block-range summaries. Runs forever, sleeping 10 minutes between
    passes; terminate with a signal.
    """
    web3_client = web3_client_from_cli_or_env(args)

    with yield_db_session_ctx() as db_session:
        logger.info("Initial labeling:")
        last_labeled = sync_labels(db_session, web3_client, args.start)
        logger.info("Initial summary creation:")
        last_summary_created = sync_summaries(
            db_session,
            args.start,
            last_labeled,
        )

        while True:
            logger.info("Syncing")
            # Each pass resumes one block past the previous checkpoints.
            last_labeled = sync_labels(db_session, web3_client, last_labeled + 1)
            last_summary_created = sync_summaries(
                db_session,
                last_summary_created + 1,
                last_labeled,
            )
            sleep_time = 10 * 60
            logger.info(f"Going to sleep for {sleep_time}s")
            time.sleep(sleep_time)
def ethereum_label_handler(args: argparse.Namespace) -> None:
    """Handler for "nft ethereum label": label one block range and exit."""
    web3_client = web3_client_from_cli_or_env(args)
    with yield_db_session_ctx() as db_session:
        add_labels(web3_client, db_session, args.start, args.end, args.address)
def push_summary(result: Dict[str, Any], humbug_token: Optional[str] = None):
    """
    Publish an NFT activity summary to the Moonstream journal via Humbug.

    Falls back to the NFT_HUMBUG_TOKEN setting when no token is given. Tags the
    report with crawler version, summary id, and block bounds, and adds an
    "error:missing_blocks" tag when the summary covers fewer blocks than its
    arguments requested.
    """
    if humbug_token is None:
        humbug_token = NFT_HUMBUG_TOKEN
    # Fix: the title previously ended with a stray ")" that had no matching "(".
    title = f"NFT activity on the Ethereum blockchain: crawled at: {result['crawled_at']}"
    tags = [
        f"crawler_version:{MOONCRAWL_VERSION}",
        f"summary_id:{result.get(SUMMARY_KEY_ID, '')}",
        f"start_block:{result.get(SUMMARY_KEY_START_BLOCK)}",
        f"end_block:{result.get(SUMMARY_KEY_END_BLOCK)}",
    ]

    # Add an "error:missing_blocks" tag for all summaries in which the number of blocks processed
    # is not equal to the expected number of blocks.
    args = result.get(SUMMARY_KEY_ARGS, {})
    args_start = args.get("start")
    args_end = args.get("end")
    expected_num_blocks = None
    if args_start is not None and args_end is not None:
        expected_num_blocks = cast(int, args_end) - cast(int, args_start) + 1
    num_blocks = result.get(SUMMARY_KEY_NUM_BLOCKS)
    if (
        expected_num_blocks is None
        or num_blocks is None
        or num_blocks != expected_num_blocks
    ):
        tags.append("error:missing_blocks")

    # TODO(yhtyyar, zomglings): Also add checkpoints in database for nft labelling. This way, we can
    # add an "error:stale" tag to summaries generated before nft labels were processed for the
    # block range in the summary.

    # Backdate the journal entry to the end of the summarized window.
    created_at = result.get("date_range", {}).get("end_time")

    publish_json(
        "nft_ethereum",
        humbug_token,
        title,
        result,
        tags=tags,
        wait=True,
        created_at=created_at,
    )
def ethereum_summary_handler(args: argparse.Namespace) -> None:
    """
    Handler for "nft ethereum summary": compute a block-bounded summary, push
    it to the journal, and dump it to the chosen output file.
    """
    with yield_db_session_ctx() as db_session:
        result = ethereum_summary(db_session, args.start, args.end)
    push_summary(result, args.humbug)
    with args.outfile as ofp:
        json.dump(result, ofp)
def main() -> None:
    """Command line entry point for the Moonstream NFT crawlers."""
    parser = argparse.ArgumentParser(description="Moonstream NFT crawlers")
    parser.set_defaults(func=lambda _: parser.print_help())
    parser.add_argument(
        "--access-id",
        default=NB_CONTROLLER_ACCESS_ID,
        type=UUID,
        help="User access ID",
    )
    subcommands = parser.add_subparsers(description="Subcommands")

    parser_ethereum = subcommands.add_parser(
        "ethereum",
        description="Collect information about NFTs from Ethereum blockchains",
    )
    parser_ethereum.set_defaults(func=lambda _: parser_ethereum.print_help())
    subparsers_ethereum = parser_ethereum.add_subparsers()

    parser_ethereum_label = subparsers_ethereum.add_parser(
        "label",
        # Fix: "databse" -> "database" in user-facing help text.
        description="Label addresses and transactions in database using crawled NFT transfer information",
    )
    parser_ethereum_label.add_argument(
        "-s",
        "--start",
        type=int,
        default=None,
        help="Starting block number (inclusive if block available)",
    )
    parser_ethereum_label.add_argument(
        "-e",
        "--end",
        type=int,
        default=None,
        help="Ending block number (inclusive if block available)",
    )
    parser_ethereum_label.add_argument(
        "-a",
        "--address",
        type=str,
        default=None,
        help="(Optional) NFT contract address that you want to limit the crawl to, e.g. 0x06012c8cf97BEaD5deAe237070F9587f8E7A266d for CryptoKitties.",
    )
    parser_ethereum_label.add_argument(
        "--web3",
        type=str,
        default=None,
        help="(Optional) Web3 connection string. If not provided, uses the value specified by MOONSTREAM_ETHEREUM_WEB3_PROVIDER_URI environment variable.",
    )
    parser_ethereum_label.set_defaults(func=ethereum_label_handler)

    parser_ethereum_summary = subparsers_ethereum.add_parser(
        "summary", description="Generate Ethereum NFT summary"
    )
    parser_ethereum_summary.add_argument(
        "-s",
        "--start",
        type=int,
        required=True,
        # Fix: dropped needless f-prefix on help strings with no placeholders.
        help="Start block for window to calculate NFT statistics",
    )
    parser_ethereum_summary.add_argument(
        "-e",
        "--end",
        type=int,
        required=True,
        help="End block for window to calculate NFT statistics",
    )
    parser_ethereum_summary.add_argument(
        "-o",
        "--outfile",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="Optional file to write output to. By default, prints to stdout.",
    )
    parser_ethereum_summary.add_argument(
        "--humbug",
        default=None,
        help=(
            "If you would like to write this data to a Moonstream journal, please provide a Humbug "
            "token for that here. (This argument overrides any value set in the "
            "MOONSTREAM_HUMBUG_TOKEN environment variable)"
        ),
    )
    parser_ethereum_summary.set_defaults(func=ethereum_summary_handler)

    parser_ethereum_sync = subparsers_ethereum.add_parser(
        "synchronize",
        # Fix: "databse" -> "database" in user-facing help text.
        description="Label addresses and transactions in database using crawled NFT transfer information, sync mode",
    )
    parser_ethereum_sync.add_argument(
        "-s",
        "--start",
        type=int,
        required=False,
        help="Starting block number (inclusive if block available)",
    )
    parser_ethereum_sync.set_defaults(func=ethereum_sync_handler)

    args = parser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()

Wyświetl plik

@ -1,646 +0,0 @@
import json
import logging
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple, cast
from eth_typing.encoding import HexStr
from hexbytes.main import HexBytes
from moonstreamdb.models import EthereumBlock, EthereumLabel, EthereumTransaction
from sqlalchemy import and_, func, text
from sqlalchemy.orm import Query, Session
from tqdm import tqdm
from web3 import Web3
from web3._utils.events import get_event_data
from web3.types import FilterParams, LogReceipt
from ..reporter import reporter
# Default length (in blocks) of an Ethereum NFT crawl.
DEFAULT_CRAWL_LENGTH = 100
NFT_LABEL = "erc721"
MINT_LABEL = "nft_mint"
TRANSFER_LABEL = "nft_transfer"
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Summary keys
SUMMARY_KEY_ID = "summary_id"
SUMMARY_KEY_ARGS = "args"
SUMMARY_KEY_START_BLOCK = "start_block"
SUMMARY_KEY_END_BLOCK = "end_block"
SUMMARY_KEY_NUM_BLOCKS = "num_blocks"
SUMMARY_KEY_NUM_TRANSACTIONS = "num_transactions"
SUMMARY_KEY_TOTAL_VALUE = "total_value"
SUMMARY_KEY_NFT_TRANSFERS = "nft_transfers"
SUMMARY_KEY_NFT_TRANSFER_VALUE = "nft_transfer_value"
SUMMARY_KEY_NFT_MINTS = "nft_mints"
SUMMARY_KEY_NFT_PURCHASERS = "nft_owners"
SUMMARY_KEY_NFT_MINTERS = "nft_minters"
SUMMARY_KEYS = [
SUMMARY_KEY_ID,
SUMMARY_KEY_ARGS,
SUMMARY_KEY_START_BLOCK,
SUMMARY_KEY_END_BLOCK,
SUMMARY_KEY_NUM_BLOCKS,
SUMMARY_KEY_NUM_TRANSACTIONS,
SUMMARY_KEY_TOTAL_VALUE,
SUMMARY_KEY_NFT_TRANSFERS,
SUMMARY_KEY_NFT_TRANSFER_VALUE,
SUMMARY_KEY_NFT_MINTS,
SUMMARY_KEY_NFT_PURCHASERS,
SUMMARY_KEY_NFT_MINTERS,
]
# The ERC721 standard requires that the Transfer event is indexed for all arguments.
# That is how we distinguish it from ERC20 Transfer events (same signature, fewer
# indexed arguments).
# The second ABI is for old NFTs, like CryptoKitties, which do not index the arguments.
erc721_transfer_event_abis = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": True, "name": "tokenId", "type": "uint256"},
        ],
        "name": "Transfer",
        "type": "event",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": False, "name": "from", "type": "address"},
            {"indexed": False, "name": "to", "type": "address"},
            {"indexed": False, "name": "tokenId", "type": "uint256"},
        ],
        "name": "Transfer",
        "type": "event",
    },
]
# Minimal read-only ERC721 (plus metadata extension) ABI used to fetch
# collection info: balanceOf, name, ownerOf, symbol, totalSupply.
erc721_functions_abi = [
    {
        "inputs": [{"internalType": "address", "name": "owner", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function",
        "constant": True,
    },
    {
        "inputs": [],
        "name": "name",
        "outputs": [{"internalType": "string", "name": "", "type": "string"}],
        "stateMutability": "view",
        "type": "function",
        "constant": True,
    },
    {
        "inputs": [{"internalType": "uint256", "name": "tokenId", "type": "uint256"}],
        "name": "ownerOf",
        "outputs": [{"internalType": "address", "name": "", "type": "address"}],
        "payable": False,
        "stateMutability": "view",
        "type": "function",
        "constant": True,
    },
    {
        "inputs": [],
        "name": "symbol",
        "outputs": [{"internalType": "string", "name": "", "type": "string"}],
        "stateMutability": "view",
        "type": "function",
        "constant": True,
    },
    {
        "inputs": [],
        "name": "totalSupply",
        "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}],
        "stateMutability": "view",
        "type": "function",
        "constant": True,
    },
]
@dataclass
class NFTContract:
    """Minimal metadata describing an ERC721 collection contract."""

    address: str
    # None when the contract's name() call failed (see get_erc721_contract_info).
    name: Optional[str] = None
    # None when the contract's symbol() call failed.
    symbol: Optional[str] = None
def get_erc721_contract_info(w3: Web3, address: str) -> NFTContract:
    """
    Read name() and symbol() from a potential ERC721 contract.

    Contracts that implement neither call are tolerated: the corresponding
    fields are left as None and the failure is logged.
    """
    contract = w3.eth.contract(
        address=w3.toChecksumAddress(address), abi=erc721_functions_abi
    )
    name: Optional[str] = None
    try:
        name = contract.functions.name().call()
    # Fix: a bare "except:" also swallows SystemExit/KeyboardInterrupt;
    # narrow it to Exception.
    except Exception:
        logger.error(f"Could not get name for potential NFT contract: {address}")

    symbol: Optional[str] = None
    try:
        symbol = contract.functions.symbol().call()
    except Exception:
        logger.error(f"Could not get symbol for potential NFT contract: {address}")

    return NFTContract(
        address=address,
        name=name,
        symbol=symbol,
    )
# SHA3 hash of the string "Transfer(address,address,uint256)" — the first log
# topic shared by ERC20 and ERC721 Transfer events.
TRANSFER_EVENT_SIGNATURE = HexBytes(
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
)
@dataclass
class NFTTransferRaw:
    """A decoded Transfer event with the transaction hash still as HexBytes."""

    contract_address: str
    transfer_from: str
    transfer_to: str
    tokenId: int
    transfer_tx: HexBytes
@dataclass
class NFTTransfer:
    """A fully parsed NFT transfer; transfer_tx is the hex string of the tx hash."""

    contract_address: str
    transfer_from: str
    transfer_to: str
    tokenId: int
    transfer_tx: str
    # Transaction value in wei, when known.
    value: Optional[int] = None
    # True when transfer_from is the zero address (see get_nft_transfers).
    is_mint: bool = False
def decode_nft_transfer_data(w3: Web3, log: LogReceipt) -> Optional[NFTTransferRaw]:
    """
    Try each known Transfer ABI (indexed and non-indexed variants) against the
    given log; return the decoded transfer, or None if the log matches neither.
    """
    for abi in erc721_transfer_event_abis:
        try:
            transfer_data = get_event_data(w3.codec, abi, log)
            nft_transfer = NFTTransferRaw(
                contract_address=transfer_data["address"],
                transfer_from=transfer_data["args"]["from"],
                transfer_to=transfer_data["args"]["to"],
                tokenId=transfer_data["args"]["tokenId"],
                transfer_tx=transfer_data["transactionHash"],
            )
            return nft_transfer
        # Fix: narrowed the bare "except:" — an ABI mismatch raises an ordinary
        # exception, while bare except would also swallow KeyboardInterrupt.
        except Exception:
            continue
    return None
def get_nft_transfers(
    w3: Web3,
    from_block: Optional[int] = None,
    to_block: Optional[int] = None,
    contract_address: Optional[str] = None,
) -> List[NFTTransfer]:
    """
    Fetch Transfer logs from the node and decode them into NFTTransfer objects.

    Block bounds and contract address are optional filters; omitted bounds fall
    back to the node's eth_getLogs defaults.
    """
    # Match any log whose first topic is the Transfer(address,address,uint256) signature.
    filter_params = FilterParams(topics=[cast(HexStr, TRANSFER_EVENT_SIGNATURE.hex())])

    if from_block is not None:
        filter_params["fromBlock"] = from_block

    if to_block is not None:
        filter_params["toBlock"] = to_block

    if contract_address is not None:
        filter_params["address"] = w3.toChecksumAddress(contract_address)

    logs = w3.eth.get_logs(filter_params)
    nft_transfers: List[NFTTransfer] = []
    for log in tqdm(logs, desc=f"Processing logs for blocks {from_block}-{to_block}"):
        nft_transfer = decode_nft_transfer_data(w3, log)
        if nft_transfer is not None:
            kwargs = {
                **asdict(nft_transfer),
                "transfer_tx": nft_transfer.transfer_tx.hex(),
                # A transfer from the zero address is a mint.
                "is_mint": nft_transfer.transfer_from
                == "0x0000000000000000000000000000000000000000",
            }

            parsed_transfer = NFTTransfer(**kwargs)  # type: ignore
            nft_transfers.append(parsed_transfer)
    return nft_transfers
def get_block_bounds(
    w3: Web3, from_block: Optional[int] = None, to_block: Optional[int] = None
) -> Tuple[int, int]:
    """
    Returns starting and ending blocks for an "nft ethereum" crawl subject to the following rules:
    1. Neither start nor end can be None.
    2. If both from_block and to_block are None, then start = end - DEFAULT_CRAWL_LENGTH + 1

    A missing end bound is taken from the node's latest block number.
    """
    resolved_end = w3.eth.get_block_number() if to_block is None else to_block
    resolved_start = (
        resolved_end - DEFAULT_CRAWL_LENGTH + 1 if from_block is None else from_block
    )
    return resolved_start, resolved_end
def label_erc721_addresses(w3: Web3, db_session: Session, addresses: List[str]) -> None:
    """
    Attach an "erc721" label (with name/symbol metadata) to each given contract
    address, committing all labels in one database transaction.

    Metadata failures for individual contracts are logged and skipped; a failed
    commit rolls back and re-raises.
    """
    labels: List[EthereumLabel] = []
    for address in addresses:
        try:
            contract_info = get_erc721_contract_info(w3, address)

            # Postgres cannot store the following unicode code point in a string: \u0000
            # Therefore, we replace that code point with the empty string to avoid errors:
            # https://stackoverflow.com/a/31672314
            contract_name: Optional[str] = None
            if contract_info.name is not None:
                contract_name = contract_info.name.replace("\\u0000", "").replace(
                    "\x00", ""
                )
            contract_symbol: Optional[str] = None
            if contract_info.symbol is not None:
                contract_symbol = contract_info.symbol.replace("\\u0000", "").replace(
                    "\x00", ""
                )

            labels.append(
                EthereumLabel(
                    address=address,
                    label=NFT_LABEL,
                    label_data={
                        "name": contract_name,
                        "symbol": contract_symbol,
                    },
                )
            )
        except Exception as e:
            logger.error(f"Failed to get metadata of contract {address}")
            logger.error(e)
    try:
        db_session.bulk_save_objects(labels)
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error(f"Failed to save erc721 labels to db:\n{e}")
        raise e
def label_key(label: EthereumLabel) -> Tuple[str, str, int, str, str]:
    """
    Deduplication key for a mint/transfer label:
    (transaction hash, contract address, tokenId, from, to).

    Fix: the return annotation previously claimed the second element was an int,
    but label.address is a string (an Ethereum address).
    """
    return (
        label.transaction_hash,
        label.address,
        label.label_data["tokenId"],
        label.label_data["from"],
        label.label_data["to"],
    )
def label_transfers(
    db_session: Session, transfers: List[NFTTransfer], addresses: Set[str]
) -> None:
    """
    Adds "nft_mint" or "nft_transfer" to the (transaction, address) pair represented by each of the
    given NFTTransfer objects.

    Labels already present for a (tx hash, address, tokenId, from, to) key are
    skipped; all new labels are committed in one transaction.
    """
    transaction_hashes: List[str] = []
    labels: List[EthereumLabel] = []
    for transfer in transfers:
        transaction_hash = transfer.transfer_tx
        transaction_hashes.append(transaction_hash)
        label = MINT_LABEL if transfer.is_mint else TRANSFER_LABEL
        row = EthereumLabel(
            address=transfer.contract_address,
            transaction_hash=transaction_hash,
            label=label,
            label_data={
                "tokenId": transfer.tokenId,
                "from": transfer.transfer_from,
                "to": transfer.transfer_to,
            },
        )
        labels.append(row)

    # Pull any labels that already cover these (address, transaction) pairs so
    # re-crawling a range does not insert duplicates.
    existing_labels = (
        db_session.query(EthereumLabel)
        .filter(EthereumLabel.address.in_(addresses))
        .filter(EthereumLabel.transaction_hash.in_(transaction_hashes))
    ).all()
    existing_label_keys = {label_key(label) for label in existing_labels}

    new_labels = [
        label for label in labels if label_key(label) not in existing_label_keys
    ]

    try:
        db_session.bulk_save_objects(new_labels)
        db_session.commit()
    except Exception as e:
        db_session.rollback()
        logger.error("Could not write transfer/mint labels to database")
        logger.error(e)
        raise e
def add_labels(
    w3: Web3,
    db_session: Session,
    from_block: Optional[int] = None,
    to_block: Optional[int] = None,
    contract_address: Optional[str] = None,
    batch_size: int = 100,
) -> None:
    """
    Crawls blocks between from_block and to_block checking for NFT mints and transfers.

    For each mint/transfer, if the contract address involved in the operation has not already been
    added to the ethereum_addresses table, this method adds it and labels the address with the NFT
    collection metadata.

    It also adds mint/transfer labels to each (transaction, contract address) pair describing the
    NFT operation they represent.

    ## NFT collection metadata labels

    Label has type "erc721".

    Label data:
    {
        "name": "<name of contract>",
        "symbol": "<symbol of contract>",
        "totalSupply": "<totalSupply of contract>"
    }

    ## Mint and transfer labels

    Adds labels to the database for each transaction that involved an NFT transfer. Adds the contract
    address in the address_id column of ethereum_labels.

    Labels (transaction, contract address) pair as:
    - "nft_mint" if the transaction minted a token on the NFT contract
    - "nft_transfer" if the transaction transferred a token on the NFT contract

    Label data will always be of the form:
    {
        "tokenId": "<ID of token minted/transferred on NFT contract>",
        "from": "<previous owner address>",
        "to": "<new owner address>"
    }

    Arguments:
    - w3: Web3 client
    - db_session: Connection to Postgres database with moonstreamdb schema
    - from_block and to_block: Blocks to crawl
    - address: Optional contract address representing an NFT collection to restrict the crawl to
    - batch_size: Number of mint/transfer transactions to label at a time (per database transaction)
    """
    assert batch_size > 0, f"Batch size must be positive (received {batch_size})"

    start, end = get_block_bounds(w3, from_block, to_block)

    batch_start = start
    batch_end = min(start + batch_size - 1, end)

    # TODO(yhtiyar): Make address_ids as global cache to fast up crawling
    # address_ids: Dict[str, int] = {}
    # For now quitting this idea because some contracts have unicode escapes
    # in their names, and global cache will fuck up not only that batch labeling
    # but later ones as well

    pbar = tqdm(total=(end - start + 1))
    pbar.set_description(f"Labeling blocks {start}-{end}")
    while batch_start <= batch_end:
        job = get_nft_transfers(
            w3,
            from_block=batch_start,
            to_block=batch_end,
            contract_address=contract_address,
        )
        contract_addresses = {transfer.contract_address for transfer in job}
        # Only attach collection metadata to addresses not yet labeled "erc721".
        labelled_address = [
            label.address
            for label in (
                db_session.query(EthereumLabel)
                .filter(EthereumLabel.label == NFT_LABEL)
                .filter(EthereumLabel.address.in_(contract_addresses))
                .all()
            )
        ]
        unlabelled_address = [
            address for address in contract_addresses if address not in labelled_address
        ]

        # Add 'erc721' labels. Errors in one batch are reported, not fatal:
        # the crawl continues with the next batch.
        try:
            label_erc721_addresses(w3, db_session, unlabelled_address)
        except Exception as e:
            reporter.error_report(
                e,
                [
                    "nft_crawler",
                    "erc721_label",
                    f"batch_start:{batch_start}",
                    f"batch_end:{batch_end}",
                ],
            )

        # Add mint/transfer labels to (transaction, contract_address) pairs
        try:
            label_transfers(db_session, job, contract_addresses)
        except Exception as e:
            reporter.error_report(
                e,
                [
                    "nft_crawler",
                    "nft_transfer",
                    f"batch_start:{batch_start}",
                    f"batch_end:{batch_end}",
                ],
            )

        # Update batch at end of iteration
        pbar.update(batch_end - batch_start + 1)
        batch_start = batch_end + 1
        batch_end = min(batch_end + batch_size, end)
    pbar.close()
def block_bounded_summary(
    db_session: Session,
    start_block: int,
    end_block: int,
) -> Dict[str, Any]:
    """
    Produces a summary of Ethereum NFT activity between the given start_block
    and end_block (inclusive).
    """
    summary_id = f"nft-ethereum-start-{start_block}-end-{end_block}"
    block_filter = and_(
        EthereumBlock.block_number >= start_block,
        EthereumBlock.block_number <= end_block,
    )

    # All transactions whose block falls in the range.
    transactions_query = (
        db_session.query(EthereumTransaction)
        .join(
            EthereumBlock,
            EthereumTransaction.block_number == EthereumBlock.block_number,
        )
        .filter(block_filter)
    )

    def nft_query(label: str) -> Query:
        # Labeled NFT events (with their transaction and block) in the range.
        query = (
            db_session.query(
                EthereumLabel.label,
                EthereumLabel.label_data,
                EthereumLabel.address,
                EthereumTransaction.hash,
                EthereumTransaction.value,
                EthereumBlock.block_number,
                EthereumBlock.timestamp,
            )
            .join(
                EthereumTransaction,
                EthereumLabel.transaction_hash == EthereumTransaction.hash,
            )
            .join(
                EthereumBlock,
                EthereumTransaction.block_number == EthereumBlock.block_number,
            )
            .filter(block_filter)
            .filter(EthereumLabel.label == label)
        )
        return query

    transfer_query = nft_query(TRANSFER_LABEL)
    mint_query = nft_query(MINT_LABEL)

    def holder_query(label: str) -> Query:
        # Latest recipient per (contract, tokenId) within the range.
        query = (
            db_session.query(
                EthereumLabel.address.label("address"),
                EthereumLabel.label_data["to"].astext.label("owner_address"),
                EthereumLabel.label_data["tokenId"].astext.label("token_id"),
                EthereumTransaction.block_number.label("block_number"),
                EthereumTransaction.transaction_index.label("transaction_index"),
                EthereumTransaction.value.label("transfer_value"),
            )
            .join(
                EthereumTransaction,
                EthereumLabel.transaction_hash == EthereumTransaction.hash,
            )
            .join(
                EthereumBlock,
                EthereumTransaction.block_number == EthereumBlock.block_number,
            )
            .filter(EthereumLabel.label == label)
            .filter(block_filter)
            .order_by(
                # Without "transfer_value" and "owner_address" as sort keys, the final distinct query
                # does not seem to be deterministic.
                # Maybe relevant Stackoverflow post: https://stackoverflow.com/a/59410440
                text(
                    "address, token_id, block_number desc, transaction_index desc, transfer_value, owner_address"
                )
            )
            .distinct("address", "token_id")
        )
        return query

    purchaser_query = holder_query(TRANSFER_LABEL)
    minter_query = holder_query(MINT_LABEL)

    blocks = (
        db_session.query(EthereumBlock)
        .filter(block_filter)
        .order_by(EthereumBlock.block_number.asc())
    )

    first_block = None
    last_block = None
    num_blocks = 0
    # Fix: min_block/max_block were previously unbound when the query returned
    # no rows, raising NameError in the "is not None" checks below.
    min_block: Optional[EthereumBlock] = None
    max_block: Optional[EthereumBlock] = None
    for block in blocks:
        if num_blocks == 0:
            min_block = block
        max_block = block
        num_blocks += 1

    start_time = None
    end_time = None
    if min_block is not None:
        first_block = min_block.block_number
        start_time = datetime.fromtimestamp(
            min_block.timestamp, timezone.utc
        ).isoformat()
    if max_block is not None:
        last_block = max_block.block_number
        end_time = datetime.fromtimestamp(max_block.timestamp, timezone.utc).isoformat()

    num_transactions = transactions_query.distinct(EthereumTransaction.hash).count()
    num_transfers = transfer_query.distinct(EthereumTransaction.hash).count()
    total_value = db_session.query(
        func.sum(transactions_query.subquery().c.value)
    ).scalar()
    transfer_value = db_session.query(
        func.sum(transfer_query.subquery().c.value)
    ).scalar()
    num_minted = mint_query.count()
    num_purchasers = (
        db_session.query(purchaser_query.subquery())
        .distinct(text("owner_address"))
        .count()
    )
    num_minters = (
        db_session.query(minter_query.subquery())
        .distinct(text("owner_address"))
        .count()
    )

    result = {
        "date_range": {
            "start_time": start_time,
            "include_start": True,
            "end_time": end_time,
            "include_end": True,
        },
        SUMMARY_KEY_ID: summary_id,
        SUMMARY_KEY_ARGS: {"start": start_block, "end": end_block},
        SUMMARY_KEY_START_BLOCK: first_block,
        SUMMARY_KEY_END_BLOCK: last_block,
        SUMMARY_KEY_NUM_BLOCKS: num_blocks,
        SUMMARY_KEY_NUM_TRANSACTIONS: f"{num_transactions}",
        SUMMARY_KEY_TOTAL_VALUE: f"{total_value}",
        SUMMARY_KEY_NFT_TRANSFERS: f"{num_transfers}",
        SUMMARY_KEY_NFT_TRANSFER_VALUE: f"{transfer_value}",
        SUMMARY_KEY_NFT_MINTS: f"{num_minted}",
        SUMMARY_KEY_NFT_PURCHASERS: f"{num_purchasers}",
        SUMMARY_KEY_NFT_MINTERS: f"{num_minters}",
    }
    return result
def summary(db_session: Session, start_block: int, end_block: int) -> Dict[str, Any]:
    """
    Produces a summary of Ethereum NFT activity over the inclusive block range
    [start_block, end_block], stamped with the crawl time.

    (Docstring fix: the previous text claimed a "1 hour before end_time"
    window, but the summary is strictly block-bounded.)
    """
    result = block_bounded_summary(db_session, start_block, end_block)
    # Fix: datetime.utcnow() returns a naive timestamp (and is deprecated);
    # use an aware UTC timestamp, consistent with the other timestamps this
    # module emits via datetime.fromtimestamp(..., timezone.utc).
    result["crawled_at"] = datetime.now(timezone.utc).isoformat()
    return result

Wyświetl plik

@ -1,10 +1,9 @@
import os
from typing import cast, Optional
from typing import Optional, cast
from uuid import UUID
from bugout.app import Bugout
BUGOUT_RESOURCE_TYPE_SUBSCRIPTION = "subscription"
BUGOUT_RESOURCE_TYPE_DASHBOARD = "dashboards"

Wyświetl plik

@ -19,18 +19,14 @@ from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.operators import in_op
from web3 import Web3
from ..blockchain import (
connect,
get_label_model,
get_transaction_model,
)
from ..blockchain import connect, get_label_model, get_transaction_model
from ..data import AvailableBlockchainType
from ..reporter import reporter
from ..settings import (
CRAWLER_LABEL,
NB_CONTROLLER_ACCESS_ID,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX,
NB_CONTROLLER_ACCESS_ID,
)
from ..settings import bugout_client as bc

Wyświetl plik

@ -58,6 +58,7 @@ setup(
"esd=mooncrawl.esd:main",
"etherscan=mooncrawl.etherscan:main",
"identity=mooncrawl.identity:main",
"generic-crawler=mooncrawl.generic_crawler.cli:main",
"moonworm-crawler=mooncrawl.moonworm_crawler.cli:main",
"nft=mooncrawl.nft.cli:main",
"statistics=mooncrawl.stats_worker.dashboard:main",

Wyświetl plik

@ -1,16 +1,18 @@
import argparse
import contextlib
from enum import Enum
import logging
import os
import sqlite3
from shutil import copyfile
from typing import Optional
from typing import Optional, Union
from mooncrawl.data import AvailableBlockchainType
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumLabel, PolygonLabel
from .enrich import EthereumBatchloader, enrich
from .data import EventType, event_types, nft_event, BlockBounds
from .datastore import setup_database, import_data, filter_data
from .data import BlockBounds
from .datastore import setup_database, get_last_saved_block
from .derive import (
current_owners,
current_market_values,
@ -22,7 +24,7 @@ from .derive import (
transfer_holding_times,
transfers_mints_connection_table,
)
from .materialize import create_dataset
from .materialize import crawl_erc721_labels
logging.basicConfig(level=logging.INFO)
@ -42,94 +44,60 @@ derive_functions = {
}
class Blockchain(Enum):
    """Blockchains supported by this CLI; values match the raw strings accepted on the command line."""

    ETHEREUM = "ethereum"
    POLYGON = "polygon"
def handle_initdb(args: argparse.Namespace) -> None:
    """CLI handler: create the dataset schema in the SQLite file at ``args.datastore``."""
    connection = sqlite3.connect(args.datastore)
    with contextlib.closing(connection):
        setup_database(connection)
def handle_import_data(args: argparse.Namespace) -> None:
    """CLI handler: copy one event type's table from the source datastore into the target."""
    parsed_event_type = nft_event(args.type)
    target_conn = sqlite3.connect(args.target)
    source_conn = sqlite3.connect(args.source)
    with contextlib.closing(target_conn), contextlib.closing(source_conn):
        import_data(target_conn, source_conn, parsed_event_type, args.batch_size)
def handle_filter_data(args: argparse.Namespace) -> None:
    """
    CLI handler: copy the source datastore, delete events outside
    [--start-time, --end-time] from the copy, then recompute every derived table.

    Fix: the original opened an extra connection to ``args.source`` that was never
    used (the inner ``with`` shadowed it); that connection has been removed.
    """
    # When target and source are the same file, write the filtered copy next to it
    # instead of clobbering the source in place.
    if args.target == args.source and args.source is not None:
        sqlite_path = f"{args.target}.dump"
    else:
        sqlite_path = args.target

    print(f"Creating new database:{sqlite_path}")
    copyfile(args.source, sqlite_path)

    with contextlib.closing(sqlite3.connect(sqlite_path)) as filtered_conn:
        print("Start filtering")
        filter_data(
            filtered_conn,
            start_time=args.start_time,
            end_time=args.end_time,
        )
        print("Filtering end.")

        # Apply derive functions to the freshly filtered data.
        for index, function_name in enumerate(derive_functions.keys()):
            print(
                f"Derive process {function_name} {index+1}/{len(derive_functions.keys())}"
            )
            derive_functions[function_name](filtered_conn)
# Apply derive to new data
def _get_label_model(
    blockchain: AvailableBlockchainType,
) -> Union[EthereumLabel, PolygonLabel]:
    """Map a blockchain type to its moonstreamdb label model."""
    label_models = {
        AvailableBlockchainType.ETHEREUM: EthereumLabel,
        AvailableBlockchainType.POLYGON: PolygonLabel,
    }
    model = label_models.get(blockchain)
    if model is None:
        raise ValueError(f"Unknown blockchain: {blockchain}")
    return model
def handle_materialize(args: argparse.Namespace) -> None:
event_type = nft_event(args.type)
if args.start is None or args.end is None:
raise ValueError("Set --end --start")
bounds: Optional[BlockBounds] = None
if args.start is not None:
bounds = BlockBounds(starting_block=args.start, ending_block=args.end)
elif args.end is not None:
raise ValueError("You cannot set --end unless you also set --start")
batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
logger.info(f"Materializing NFT events to datastore: {args.datastore}")
logger.info(f"Block bounds: {bounds}")
blockchain_type = AvailableBlockchainType(args.blockchain)
label_model = _get_label_model(blockchain_type)
with yield_db_session_ctx() as db_session, contextlib.closing(
sqlite3.connect(args.datastore)
) as moonstream_datastore:
create_dataset(
moonstream_datastore,
last_saved_block = get_last_saved_block(moonstream_datastore, args.blockchain)
logger.info(f"Last saved block: {last_saved_block}")
if last_saved_block and last_saved_block >= bounds.starting_block:
logger.info(
f"Skipping blocks {bounds.starting_block}-{last_saved_block}, starting from {last_saved_block + 1}"
)
bounds.starting_block = last_saved_block + 1
crawl_erc721_labels(
db_session,
event_type,
bounds,
args.batch_size,
)
def handle_enrich(args: argparse.Namespace) -> None:
batch_loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)
logger.info(f"Enriching NFT events in datastore: {args.datastore}")
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
enrich(
moonstream_datastore,
EventType.TRANSFER,
batch_loader,
args.batch_size,
)
enrich(
moonstream_datastore,
EventType.MINT,
batch_loader,
args.batch_size,
label_model,
start_block=bounds.starting_block,
end_block=bounds.ending_block,
batch_size=args.batch_size,
)
@ -186,18 +154,7 @@ def main() -> None:
required=True,
help="Path to SQLite database representing the dataset",
)
parser_materialize.add_argument(
"--jsonrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_materialize.add_argument(
"-t",
"--type",
choices=event_types,
help="Type of event to materialize intermediate data for",
)
parser_materialize.add_argument(
"--start", type=int, default=None, help="Starting block number"
)
@ -211,6 +168,13 @@ def main() -> None:
default=1000,
help="Number of events to process per batch",
)
parser_materialize.add_argument(
"--blockchain",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
parser_materialize.set_defaults(func=handle_materialize)
parser_derive = subcommands.add_parser(
@ -231,86 +195,6 @@ def main() -> None:
)
parser_derive.set_defaults(func=handle_derive)
parser_import_data = subcommands.add_parser(
"import-data",
description="Import data from another source NFTs dataset datastore. This operation is performed per table, and replaces the existing table in the target datastore.",
)
parser_import_data.add_argument(
"--target",
required=True,
help="Datastore into which you want to import data",
)
parser_import_data.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_import_data.add_argument(
"--type",
required=True,
choices=event_types,
help="Type of data you would like to import from source to target",
)
parser_import_data.add_argument(
"-N",
"--batch-size",
type=int,
default=10000,
help="Batch size for database commits into target datastore.",
)
parser_import_data.set_defaults(func=handle_import_data)
# Create dump of filtered data
parser_filtered_copy = subcommands.add_parser(
"filter-data",
description="Create copy of database with applied filters.",
)
parser_filtered_copy.add_argument(
"--target",
required=True,
help="Datastore into which you want to import data",
)
parser_filtered_copy.add_argument(
"--source", required=True, help="Datastore from which you want to import data"
)
parser_filtered_copy.add_argument(
"--start-time",
required=False,
type=int,
help="Start timestamp.",
)
parser_filtered_copy.add_argument(
"--end-time",
required=False,
type=int,
help="End timestamp.",
)
parser_filtered_copy.set_defaults(func=handle_filter_data)
parser_enrich = subcommands.add_parser(
"enrich", description="enrich dataset from geth node"
)
parser_enrich.add_argument(
"-d",
"--datastore",
required=True,
help="Path to SQLite database representing the dataset",
)
parser_enrich.add_argument(
"--jsonrpc",
default=default_web3_provider,
type=str,
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
)
parser_enrich.add_argument(
"-n",
"--batch-size",
type=int,
default=1000,
help="Number of events to process per batch",
)
parser_enrich.set_defaults(func=handle_enrich)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -4,7 +4,7 @@ Data structures used in (and as part of the maintenance of) the Moonstream NFTs
from dataclasses import dataclass
from enum import Enum
from os import name
from typing import Optional
from typing import Any, Dict, Optional, Union
@dataclass
@ -13,38 +13,63 @@ class BlockBounds:
ending_block: Optional[int] = None
class EventType(Enum):
    """Kinds of NFT-related events/entities tracked in the dataset."""

    TRANSFER = "nft_transfer"
    MINT = "nft_mint"
    ERC721 = "erc721"


# Reverse lookup: raw string value -> EventType member.
event_types = {member.value: member for member in EventType}


def nft_event(raw_event: str) -> EventType:
    """Parse *raw_event* into an EventType; raise ValueError for unknown values."""
    member = event_types.get(raw_event)
    if member is None:
        raise ValueError(f"Unknown nft event type: {raw_event}")
    return member
@dataclass
class NftTransaction:
    """A mined transaction touching an NFT/ERC20 contract; one row of the `transactions` table."""

    blockchain_type: str  # e.g. "ethereum" / "polygon"
    block_number: int
    block_timestamp: int
    transaction_hash: str
    contract_address: str
    caller_address: str  # transaction sender ("from")
    function_name: str
    function_args: Union[Dict[str, Any], str]  # decoded args dict, or a raw string — TODO confirm when str is used
    gas_used: int
    gas_price: int
    value: int  # amount sent with the transaction (presumably wei — confirm)
    status: int  # receipt status — presumably 1 = success, 0 = failure; verify
    max_fee_per_gas: Optional[int] = None  # EIP-1559 fields; None for legacy transactions
    max_priority_fee_per_gas: Optional[int] = None
@dataclass
class NFTEvent:
event_id: str
event_type: EventType
nft_address: str
class NftApprovalEvent:
blockchain_type: str
token_address: str
owner: str
approved: str
token_id: str
transaction_hash: str
log_index: int
@dataclass
class NftApprovalForAllEvent:
    """An ERC-721 ApprovalForAll log entry; one row of the `approvals_for_all` table."""

    blockchain_type: str
    token_address: str
    owner: str
    approved: str  # NOTE(review): the table DDL declares this column BOOL but it is typed str here — confirm intended type
    operator: str
    transaction_hash: str
    log_index: int
@dataclass
class NftTransferEvent:
blockchain_type: str
token_address: str
from_address: str
to_address: str
token_id: str
transaction_hash: str
value: Optional[int] = None
block_number: Optional[int] = None
timestamp: Optional[int] = None
log_index: int
@dataclass
class NFTMetadata:
address: str
name: str
symbol: str
class Erc20TransferEvent:
blockchain_type: str
token_address: str
from_address: str
to_address: str
value: int
transaction_hash: str
log_index: int

Wyświetl plik

@ -11,40 +11,28 @@ from tqdm import tqdm
from .datastore import event_tables, EventType
# TODO(zomglings): Make it so that table names are parametrized by importable variables. The way
# things are now, we have to be very careful if we ever rename a table in our dataset. We should
# also propagate the name change here.
NFTS = "nfts"
MINTS = event_tables[EventType.MINT]
TRANSFERS = event_tables[EventType.TRANSFER]
CURRENT_OWNERS = "current_owners"
CURRENT_MARKET_VALUES = "current_market_values"
TRANSFER_STATISTICS_BY_ADDRESS = "transfer_statistics_by_address"
MINT_HOLDING_TIMES = "mint_holding_times"
TRANSFER_HOLDING_TIMES = "transfer_holding_times"
OWNERSHIP_TRANSITIONS = "ownership_transitions"
# TODO(yhtiyar): Add this table to the dataset
CONTRACTS = "contracts"
TRANSFERS = "transfers"
# CURRENT_OWNERS = "current_owners"
# CURRENT_MARKET_VALUES = "current_market_values"
# TRANSFER_STATISTICS_BY_ADDRESS = "transfer_statistics_by_address"
# MINT_HOLDING_TIMES = "mint_holding_times"
# TRANSFER_HOLDING_TIMES = "transfer_holding_times"
# OWNERSHIP_TRANSITIONS = "ownership_transitions"
AVAILABLE_DATAFRAMES = {
NFTS: """Describes the NFT contracts represented in this dataset, with a name and symbol if they were available at time of crawl.
CONTRACTS: """Describes the NFT and ERC20 contracts represented in this dataset, with a type, name, symbol, decimals (for erc20) if they were available at time of crawl.
Columns:
1. address: The Ethereum address of the NFT contract.
2. name: The name of the collection of NFTs that the contract represents.
3. symbol: The symbol of the collection of NFTs that the contract represents.
""",
MINTS: """All token mint events crawled in this dataset.
Columns:
1. event_id: A unique event ID associated with the event.
2. transaction_hash: The hash of the transaction which triggered the event.
3. block_number: The transaction block in which the transaction was mined.
4. nft_address: The address of the NFT collection containing the minted token.
5. token_id: The ID of the token that was minted.
6. from_address: The "from" address for the transfer event. For a mint, this should be the 0 address: 0x0000000000000000000000000000000000000000.
7. to_address: The "to" address for the transfer event. This represents the owner of the freshly minted token.
8. transaction_value: The amount of WEI that were sent with the transaction in which the token was minted.
9. timestamp: The time at which the mint operation was mined into the blockchain (this is the timestamp for the mined block).
""",
# TODO (yhtiyar): update description for the contracts
TRANSFERS: """All token transfer events crawled in this dataset.
Columns:
@ -58,14 +46,14 @@ Columns:
8. transaction_value: The amount of WEI that were sent with the transaction in which the token was transferred.
9. timestamp: The time at which the transfer operation was mined into the blockchain (this is the timestamp for the mined block).
""",
CURRENT_OWNERS: f"""This table is derived from the {NFTS}, {MINTS}, and {TRANSFERS} tables. It represents the current owner of each token in the dataset.
CURRENT_OWNERS: f"""This table is derived from the {CONTRACTS}, {MINTS}, and {TRANSFERS} tables. It represents the current owner of each token in the dataset.
Columns:
1. nft_address: The address of the NFT collection containing the token whose ownership we are denoting.
2. token_id: The ID of the token (inside the collection) whose ownership we are denoting.
3. owner: The address that owned the token at the time of construction of this dataset.
""",
CURRENT_MARKET_VALUES: f"""This table is derived from the {NFTS}, {MINTS}, and {TRANSFERS} tables. It represents the current market value (in WEI) of each token in the dataset.
CURRENT_MARKET_VALUES: f"""This table is derived from the {CONTRACTS}, {MINTS}, and {TRANSFERS} tables. It represents the current market value (in WEI) of each token in the dataset.
Columns:
1. nft_address: The address of the NFT collection containing the token whose market value we are denoting.
@ -76,7 +64,7 @@ For this dataset, we estimate the market value as the last non-zero transaction
This estimate may be inaccurate for some transfers (e.g. multiple token transfers made by an escrow contract in a single transaction)
but ought to be reasonably accurate for a large majority of tokens.
""",
TRANSFER_STATISTICS_BY_ADDRESS: f"""This table is derived from the {NFTS}, {MINTS}, and {TRANSFERS} tables. For each address that participated in
TRANSFER_STATISTICS_BY_ADDRESS: f"""This table is derived from the {CONTRACTS}, {MINTS}, and {TRANSFERS} tables. For each address that participated in
at least one NFT transfer between April 1, 2021 and September 25, 2021, this table shows exactly how many NFTs that address transferred to
other addresses and how many NFT transfers that address was the recipient of.

Wyświetl plik

@ -2,195 +2,398 @@
This module provides tools to interact with and maintain a SQLite database which acts/should act as
a datastore for a Moonstream NFTs dataset.
"""
from ctypes import Union
import json
import logging
import sqlite3
from typing import Any, cast, List, Tuple, Optional
from tqdm import tqdm
from .data import EventType, NFTEvent, NFTMetadata
from .data import (
NftTransaction,
NftApprovalEvent,
NftTransferEvent,
NftApprovalForAllEvent,
Erc20TransferEvent,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
# Legacy `nfts` metadata table (pre-ERC721-crawler schema) and its maintenance queries.
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
symbol TEXT,
UNIQUE(address, name, symbol)
);
"""

# Swap-and-rebuild helpers used by import_data: rename the live table aside, then recreate it.
BACKUP_NFTS_TABLE_QUERY = "ALTER TABLE nfts RENAME TO nfts_backup;"
DROP_BACKUP_NFTS_TABLE_QUERY = "DROP TABLE IF EXISTS nfts_backup;"
SELECT_NFTS_QUERY = "SELECT address, name, symbol FROM nfts;"

# Crawl-progress checkpoints: one row appended per saved offset; the highest rowid wins.
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
(
event_type STRING,
offset INTEGER
);
"""
def create_events_table_query(event_type: EventType) -> str:
    """Return DDL creating the legacy per-event-type table (transfers or mints)."""
    # NOTE(review): this span was a garbled diff interleave of the old and new
    # functions; both have been reconstructed here.
    creation_query = f"""
    CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
    (
        event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
        transaction_hash TEXT,
        block_number INTEGER,
        nft_address TEXT REFERENCES nfts(address),
        token_id TEXT,
        from_address TEXT,
        to_address TEXT,
        transaction_value INTEGER,
        timestamp INTEGER
    );
    """
    return creation_query


def create_transactions_table_query(tabel_name) -> str:
    """Return DDL creating the transactions table named *tabel_name*."""
    creation_query = f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        blockNumber INTEGER NOT NULL,
        blockTimestamp INTEGER NOT NULL,
        contractAddress TEXT,
        from_address TEXT NOT NULL,
        functionName TEXT NOT NULL,
        functionArgs JSON NOT NULL,
        value INTEGER NOT NULL,
        gasUsed INTEGER NOT NULL,
        gasPrice INTEGER NOT NULL,
        maxFeePerGas INTEGER,
        maxPriorityFeePerGas INTEGER,
        UNIQUE(blockchainType, transactionHash)
    );
    """
    return creation_query
def backup_events_table_query(event_type: EventType) -> str:
    """Return SQL renaming the events table for *event_type* to its `_backup` twin."""
    return f"ALTER TABLE {event_tables[event_type]} RENAME TO {event_tables[event_type]}_backup;"
def drop_backup_events_table_query(event_type: EventType) -> str:
    """Return SQL dropping the `_backup` twin of the events table for *event_type*."""
    return f"DROP TABLE IF EXISTS {event_tables[event_type]}_backup;"
def select_events_table_query(event_type: EventType) -> str:
    """Return SQL selecting every row from the legacy events table for *event_type*."""
    # NOTE(review): this span was a garbled diff interleave of the old select
    # helper and the new approvals DDL helper; both are reconstructed here.
    selection_query = f"""
    SELECT
        event_id,
        transaction_hash,
        nft_address,
        token_id,
        from_address,
        to_address,
        transaction_value,
        block_number,
        timestamp
    FROM {event_tables[event_type]};
    """
    return selection_query


def create_approvals_table_query(tabel_name) -> str:
    """Return DDL creating the ERC-721 Approval-events table named *tabel_name*."""
    creation_query = f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        tokenAddress TEXT NOT NULL,
        owner TEXT NOT NULL,
        approved TEXT NOT NULL,
        tokenId TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
    return creation_query
def get_events_for_enrich(
conn: sqlite3.Connection, event_type: EventType
) -> List[NFTEvent]:
def select_query(event_type: EventType) -> str:
selection_query = f"""
SELECT
event_id,
transaction_hash,
block_number,
nft_address,
token_id,
def create_approval_for_all_table_query(tabel_name) -> str:
    """Return DDL creating the ApprovalForAll-events table named *tabel_name*."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        tokenAddress TEXT NOT NULL,
        owner TEXT NOT NULL,
        approved BOOL NOT NULL,
        operator TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def create_transfers_table_query(tabel_name) -> str:
    """Return DDL creating the NFT Transfer-events table named *tabel_name*."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        tokenAddress TEXT NOT NULL,
        from_address TEXT NOT NULL,
        to_address TEXT NOT NULL,
        tokenId TEXT NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def create_erc20_transfers_table_query(tabel_name) -> str:
    """Return DDL creating the ERC-20 Transfer-events table named *tabel_name*."""
    return f"""
    CREATE TABLE IF NOT EXISTS {tabel_name}
    (
        blockchainType TEXT NOT NULL,
        tokenAddress TEXT NOT NULL,
        from_address TEXT NOT NULL,
        to_address TEXT NOT NULL,
        value INTEGER NOT NULL,
        transactionHash TEXT NOT NULL,
        logIndex INTEGER NOT NULL,
        UNIQUE(blockchainType, transactionHash, logIndex)
    );
    """
def insertTransactionQuery(tabel_name):
    """Return a parameterized INSERT (13 placeholders, column order matches nft_transaction_to_tuple) for table *tabel_name*."""
    return f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        transactionHash,
        blockNumber,
        blockTimestamp,
        contractAddress,
        from_address,
        functionName,
        functionArgs,
        value,
        gasUsed,
        gasPrice,
        maxFeePerGas,
        maxPriorityFeePerGas
    )
    VALUES
    (
        ?,?,?,?,?,?,?,?,?,?,?,?,?
    );
    """
def insert_nft_approval_query(tabel_name):
    """Return a parameterized INSERT (7 placeholders) for the Approval-events table *tabel_name*."""
    return f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        tokenAddress,
        owner,
        approved,
        tokenId,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?,?
    );
    """
def insert_nft_approval_for_all_query(tabel_name):
    """Return a parameterized INSERT (7 placeholders) for the ApprovalForAll-events table *tabel_name*."""
    return f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        tokenAddress,
        owner,
        approved,
        operator,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?, ?
    );
    """
def insert_nft_transfers_query(tabel_name):
    """
    Return a parameterized INSERT (7 placeholders) for the NFT Transfer-events
    table *tabel_name*.

    NOTE(review): this span was a garbled diff interleave with the deleted
    get_events_for_enrich function; only the surviving query builder is kept.
    """
    return f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        tokenAddress,
        from_address,
        to_address,
        tokenId,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?,?
    );
    """
def insert_erc20_transfer_query(tabel_name):
    """
    Return a parameterized INSERT (7 placeholders) for the ERC-20 Transfer-events
    table *tabel_name*.

    NOTE(review): this span was a garbled diff interleave with the deleted
    update_events_batch function; only the surviving query builder is kept.
    """
    return f"""
    INSERT INTO {tabel_name}
    (
        blockchainType,
        tokenAddress,
        from_address,
        to_address,
        value,
        transactionHash,
        logIndex
    )
    VALUES
    (
        ?,?,?,?,?,?,?
    );
    """
logger.info("Updating events in sqlite")
def create_blockchain_type_index_query(tabel_name) -> str:
    """Return DDL creating an index on *tabel_name*(blockchainType) for per-chain lookups."""
    return f"""
    CREATE INDEX IF NOT EXISTS {tabel_name}_blockchainType ON {tabel_name} (blockchainType);
    """
def nft_transaction_to_tuple(nft_transaction: NftTransaction) -> Tuple[Any, ...]:
    """
    Converts a NftTransaction object to a tuple which can be inserted into the
    database (column order matches insertTransactionQuery).

    Fixes: optional EIP-1559 fields are now kept as NULL when absent — previously
    str(None) stored the literal string "None". Return annotation corrected from
    Tuple[Any] (a 1-tuple) to Tuple[Any, ...].
    """

    def _opt_str(value: Optional[int]) -> Optional[str]:
        # Preserve NULL rather than the string "None".
        return None if value is None else str(value)

    return (
        nft_transaction.blockchain_type,
        nft_transaction.transaction_hash,
        nft_transaction.block_number,
        nft_transaction.block_timestamp,
        nft_transaction.contract_address,
        nft_transaction.caller_address,
        nft_transaction.function_name,
        json.dumps(nft_transaction.function_args),
        # Stringified — presumably because values can exceed SQLite's signed
        # 64-bit integer range; confirm.
        str(nft_transaction.value),
        str(nft_transaction.gas_used),
        str(nft_transaction.gas_price),
        _opt_str(nft_transaction.max_fee_per_gas),
        _opt_str(nft_transaction.max_priority_fee_per_gas),
    )
def nft_approval_to_tuple(nft_approval: NftApprovalEvent) -> Tuple[Any]:
    """Flatten a NftApprovalEvent into the column order used by insert_nft_approval_query."""
    ev = nft_approval
    return (
        ev.blockchain_type,
        ev.token_address,
        ev.owner,
        ev.approved,
        str(ev.token_id),
        ev.transaction_hash,
        ev.log_index,
    )
def nft_approval_for_all_to_tuple(
    nft_approval_for_all: NftApprovalForAllEvent,
) -> Tuple[Any]:
    """Flatten a NftApprovalForAllEvent into the column order used by insert_nft_approval_for_all_query."""
    ev = nft_approval_for_all
    return (
        ev.blockchain_type,
        ev.token_address,
        ev.owner,
        ev.approved,
        ev.operator,
        ev.transaction_hash,
        ev.log_index,
    )
def nft_transfer_to_tuple(nft_transfer: NftTransferEvent) -> Tuple[Any]:
    """Flatten a NftTransferEvent into the column order used by insert_nft_transfers_query."""
    ev = nft_transfer
    return (
        ev.blockchain_type,
        ev.token_address,
        ev.from_address,
        ev.to_address,
        str(ev.token_id),
        ev.transaction_hash,
        ev.log_index,
    )
def erc20_nft_transfer_to_tuple(
    erc20_nft_transfer: Erc20TransferEvent,
) -> Tuple[Any]:
    """Flatten an Erc20TransferEvent into the column order used by insert_erc20_transfer_query."""
    ev = erc20_nft_transfer
    return (
        ev.blockchain_type,
        ev.token_address,
        ev.from_address,
        ev.to_address,
        str(ev.value),
        ev.transaction_hash,
        ev.log_index,
    )
def insert_transactions(
    conn: sqlite3.Connection, transactions: List[NftTransaction]
) -> None:
    """
    Inserts the given NftTransaction objects into the `transactions` table as a
    single committed batch, rolling back and re-raising on any failure.

    NOTE(review): this span was a garbled diff interleave with deleted
    transfers/mints batching code; the error message also referenced an
    undefined `events` variable and has been corrected.
    """
    cur = conn.cursor()
    try:
        query = insertTransactionQuery("transactions")
        cur.executemany(
            query,
            [nft_transaction_to_tuple(nft_transaction) for nft_transaction in transactions],
        )
        conn.commit()
    except Exception as e:
        logger.error(f"Failed to insert transaction batch of size {len(transactions)}")
        conn.rollback()
        raise e
def insert_events(
    conn: sqlite3.Connection,
    events: list,
) -> None:
    """
    Inserts the given NftApprovalEvent, NftApprovalForAllEvent, NftTransferEvent,
    and Erc20TransferEvent objects into their per-type tables as one committed
    batch.

    Raises ValueError for an unrecognized event type (before anything is written).
    Fix: writes are now wrapped in try/rollback for parity with
    insert_transactions, so a failure cannot leave a partial batch committed.
    """
    cur = conn.cursor()

    # Bucket events by concrete type so each table gets one executemany call.
    nft_transfers = []
    erc20_transfers = []
    approvals = []
    approvals_for_all = []
    for event in events:
        if isinstance(event, NftApprovalEvent):
            approvals.append(nft_approval_to_tuple(event))
        elif isinstance(event, NftApprovalForAllEvent):
            approvals_for_all.append(nft_approval_for_all_to_tuple(event))
        elif isinstance(event, NftTransferEvent):
            nft_transfers.append(nft_transfer_to_tuple(event))
        elif isinstance(event, Erc20TransferEvent):
            erc20_transfers.append(erc20_nft_transfer_to_tuple(event))
        else:
            raise ValueError(f"Unknown event type: {type(event)}")

    try:
        if nft_transfers:
            cur.executemany(insert_nft_transfers_query("transfers"), nft_transfers)
        if approvals:
            cur.executemany(insert_nft_approval_query("approvals"), approvals)
        if approvals_for_all:
            cur.executemany(
                insert_nft_approval_for_all_query("approvals_for_all"),
                approvals_for_all,
            )
        if erc20_transfers:
            cur.executemany(
                insert_erc20_transfer_query("erc20_transfers"), erc20_transfers
            )
        conn.commit()
    except Exception:
        conn.rollback()
        raise
def get_last_saved_block(
    conn: sqlite3.Connection, blockchain_type: str
) -> Optional[int]:
    """
    Returns the highest blockNumber stored in `transactions` for
    *blockchain_type*, or None when no rows exist for that chain.

    Fix: uses a parameterized query instead of f-string SQL interpolation.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT MAX(blockNumber) FROM transactions WHERE blockchainType = ?",
        (blockchain_type,),
    )
    result = cur.fetchone()
    # MAX() over an empty set yields a single NULL, i.e. (None,).
    return result[0]
def setup_database(conn: sqlite3.Connection) -> None:
    """
    Create all dataset tables and their blockchainType indexes if they do not
    already exist, then commit.

    NOTE(review): this span contained stale interleaved calls creating the
    removed legacy nfts/transfers/mints/checkpoint tables; only the current
    schema is created here.
    """
    cur = conn.cursor()
    cur.execute(create_transactions_table_query("transactions"))
    cur.execute(create_approvals_table_query("approvals"))
    cur.execute(create_approval_for_all_table_query("approvals_for_all"))
    cur.execute(create_transfers_table_query("transfers"))
    cur.execute(create_erc20_transfers_table_query("erc20_transfers"))
    for table in (
        "transactions",
        "approvals",
        "approvals_for_all",
        "transfers",
        "erc20_transfers",
    ):
        cur.execute(create_blockchain_type_index_query(table))
    conn.commit()
def insert_events_query(event_type: EventType) -> str:
    """
    Generates a query which inserts NFT events into the appropriate events table.

    Uses INSERT OR IGNORE, so rows with an already-seen event_id are skipped.
    """
    return f"""
    INSERT OR IGNORE INTO {event_tables[event_type]}(
        event_id,
        transaction_hash,
        block_number,
        nft_address,
        token_id,
        from_address,
        to_address,
        transaction_value,
        timestamp
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
def nft_event_to_tuple(
    event: NFTEvent,
) -> Tuple[str, str, str, str, str, str, str, str, str]:
    """
    Converts an NFT event into a tuple for use with sqlite cursor executemany.
    This includes dropping e.g. the event_type field. Every field is stringified.
    """
    field_order = (
        "event_id",
        "transaction_hash",
        "block_number",
        "nft_address",
        "token_id",
        "from_address",
        "to_address",
        "value",
        "timestamp",
    )
    return tuple(str(getattr(event, name)) for name in field_order)
def get_checkpoint_offset(
    conn: sqlite3.Connection, event_type: EventType
) -> Optional[int]:
    """
    Return the most recently recorded checkpoint offset for *event_type*, or
    None when no checkpoint row exists.

    Fix: uses a parameterized query instead of f-string SQL interpolation, and
    fetchone() instead of a loop that returned on the first row.
    """
    cur = conn.cursor()
    cur.execute(
        "SELECT * FROM checkpoint WHERE event_type=? ORDER BY rowid DESC LIMIT 1",
        (event_type.value,),
    )
    row = cur.fetchone()
    # Row layout follows the checkpoint table: (event_type, offset).
    return row[1] if row is not None else None
def delete_checkpoints(
    conn: sqlite3.Connection, event_type: EventType, commit: bool = True
) -> None:
    """
    Delete all checkpoint rows for *event_type*. Commits (rolling back and
    re-raising on failure) unless commit=False — callers batching several
    writes into one transaction pass False.

    Fixes: parameterized query instead of f-string interpolation; the bare
    `except:` is narrowed to Exception.
    """
    cur = conn.cursor()
    cur.execute("DELETE FROM checkpoint where event_type=?;", (event_type.value,))
    if commit:
        try:
            conn.commit()
        except Exception:
            conn.rollback()
            raise
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
    """Append a new checkpoint row recording *offset* for *event_type*, then commit."""
    checkpoint_sql = f"""
    INSERT INTO checkpoint (
        event_type,
        offset
    ) VALUES (?, ?)
    """
    conn.cursor().execute(checkpoint_sql, [event_type.value, offset])
    conn.commit()
def insert_address_metadata(
    conn: sqlite3.Connection, metadata_list: List[NFTMetadata]
) -> None:
    """Insert NFT contract metadata rows into `nfts` as one batch; roll back and re-raise on failure."""
    cur = conn.cursor()
    insert_sql = f"""
    INSERT INTO nfts (
        address,
        name,
        symbol
    ) VALUES (?, ?, ?)
    """
    try:
        rows = [(m.address, m.name, m.symbol) for m in metadata_list]
        cur.executemany(insert_sql, rows)
        conn.commit()
    except Exception as e:
        logger.error(f"Failed to save :\n {metadata_list}")
        conn.rollback()
        raise e
def insert_events(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
    """
    Inserts the given events into the appropriate events table in the given SQLite database.
    This method works with batches of events.
    """
    cur = conn.cursor()
    try:
        # Split the batch by event type: transfers and mints live in separate tables.
        transfers = [
            nft_event_to_tuple(event)
            for event in events
            if event.event_type == EventType.TRANSFER
        ]

        mints = [
            nft_event_to_tuple(event)
            for event in events
            if event.event_type == EventType.MINT
        ]

        # insert_events_query uses INSERT OR IGNORE, so duplicate event_ids are skipped.
        cur.executemany(insert_events_query(EventType.TRANSFER), transfers)
        cur.executemany(insert_events_query(EventType.MINT), mints)

        conn.commit()
    except Exception as e:
        logger.error(f"FAILED TO SAVE :{events}")
        conn.rollback()
        raise e
def import_data(
    target_conn: sqlite3.Connection,
    source_conn: sqlite3.Connection,
    event_type: EventType,
    batch_size: int = 1000,
) -> None:
    """
    Imports the data corresponding to the given event type from the source
    database into the target database.

    Any existing data of that type in the target database is first rotated into a
    `_backup` table. It is a good idea to create a backup copy of your target
    database before performing this operation.

    Fix: the accumulation batch is now cleared after each flush — previously it
    was never emptied, so once full it re-inserted all previously seen rows on
    the final flush and grew without bound.
    """
    target_cur = target_conn.cursor()

    # ERC721 metadata lives in the `nfts` table; each event type has its own table.
    drop_backup_query = DROP_BACKUP_NFTS_TABLE_QUERY
    backup_table_query = BACKUP_NFTS_TABLE_QUERY
    create_table_query = CREATE_NFTS_TABLE_QUERY
    source_selection_query = SELECT_NFTS_QUERY
    if event_type != EventType.ERC721:
        drop_backup_query = drop_backup_events_table_query(event_type)
        backup_table_query = backup_events_table_query(event_type)
        create_table_query = create_events_table_query(event_type)
        source_selection_query = select_events_table_query(event_type)

    # Rotate the live table aside and start from an empty one.
    target_cur.execute(drop_backup_query)
    target_cur.execute(backup_table_query)
    target_cur.execute(create_table_query)
    target_conn.commit()

    source_cur = source_conn.cursor()
    source_cur.execute(source_selection_query)

    def _flush(pending: List[Any]) -> None:
        # Write the accumulated batch into the target datastore.
        if event_type == EventType.ERC721:
            insert_address_metadata(target_conn, cast(List[NFTMetadata], pending))
        else:
            insert_events(target_conn, cast(List[NFTEvent], pending))

    batch: List[Any] = []
    for row in tqdm(source_cur, desc="Rows processed"):
        if event_type == EventType.ERC721:
            batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
        else:
            # Order matches select query returned by select_events_table_query
            (
                event_id,
                transaction_hash,
                nft_address,
                token_id,
                from_address,
                to_address,
                value,
                block_number,
                timestamp,
            ) = cast(
                Tuple[
                    str,
                    str,
                    str,
                    str,
                    str,
                    str,
                    Optional[int],
                    Optional[int],
                    Optional[int],
                ],
                row,
            )
            batch.append(
                NFTEvent(
                    event_id=event_id,
                    event_type=event_type,  # Original argument to this function
                    nft_address=nft_address,
                    token_id=token_id,
                    from_address=from_address,
                    to_address=to_address,
                    transaction_hash=transaction_hash,
                    value=value,
                    block_number=block_number,
                    timestamp=timestamp,
                )
            )
        if len(batch) == batch_size:
            _flush(batch)
            batch = []  # BUGFIX: start a fresh batch instead of re-inserting the old one.

    # Flush the final, partial batch.
    if batch:
        _flush(batch)

    target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
    target_conn.commit()

    # Carry the source's latest checkpoint over to the target.
    source_offset = get_checkpoint_offset(source_conn, event_type)
    if source_offset is not None:
        delete_checkpoints(target_conn, event_type, commit=False)
        insert_checkpoint(target_conn, event_type, source_offset)
def filter_data(
    sqlite_db: sqlite3.Connection,
    start_time: Optional[int] = None,
    end_time: Optional[int] = None,
) -> None:
    """
    Delete transfers and mints that fall outside the given time window.

    Args:
        sqlite_db: Open connection to the NFTs SQLite datastore.
        start_time: If given, rows with timestamp < start_time are deleted.
        end_time: If given, rows with timestamp > end_time are deleted.
    """
    cur = sqlite_db.cursor()
    # Compare against None explicitly so a legitimate bound of 0 is honored,
    # and only announce a filter when it is actually applied.
    if start_time is not None:
        print(f"Remove by timestamp < {start_time}")
        # Parameterized queries instead of f-string interpolation.
        cur.execute("DELETE FROM transfers WHERE timestamp < ?", (start_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp < ?", (start_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()

    if end_time is not None:
        print(f"Remove by timestamp > {end_time}")
        cur.execute("DELETE FROM transfers WHERE timestamp > ?", (end_time,))
        print(f"Transfers filtered out: {cur.rowcount}")
        sqlite_db.commit()
        cur.execute("DELETE FROM mints WHERE timestamp > ?", (end_time,))
        print(f"Mints filtered out: {cur.rowcount}")
        sqlite_db.commit()

Wyświetl plik

@ -6,156 +6,4 @@ import json
from tqdm import tqdm
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import (
get_checkpoint_offset,
get_events_for_enrich,
insert_address_metadata,
insert_checkpoint,
insert_events,
update_events_batch,
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EthereumBatchloader:
    """Sends batched JSON RPC requests to an Ethereum node."""

    def __init__(self, jsonrpc_url) -> None:
        # Endpoint of the Ethereum JSON RPC API.
        self.jsonrpc_url = jsonrpc_url
        self.message_number = 0
        self.commands: List[Any] = []
        self.requests_banch: List[Any] = []

    def load_blocks(self, block_list: List[int], with_transactions: bool):
        """
        Request list of blocks
        """
        calls = []
        for request_id, block_number in enumerate(block_list):
            calls.append(
                {
                    "jsonrpc": "2.0",
                    "id": request_id,
                    "method": "eth_getBlockByNumber",
                    "params": [hex(block_number), with_transactions],
                }
            )
        return self.send_json_message(calls)

    def load_transactions(self, transaction_hashes: List[str]):
        """
        Request list of transactions
        """
        calls = [
            {
                "jsonrpc": "2.0",
                "method": "eth_getTransactionByHash",
                "id": request_id,
                "params": [tx_hash],
            }
            for request_id, tx_hash in enumerate(transaction_hashes)
        ]
        return self.send_json_message(calls)

    def send_message(self, payload):
        """POST raw payload bytes to the JSON RPC endpoint and return the HTTP response."""
        try:
            response = requests.post(
                self.jsonrpc_url,
                headers={"Content-Type": "application/json"},
                data=payload,
                timeout=300,
            )
        except Exception as err:
            print(err)
            raise err
        return response

    def send_json_message(self, message):
        """Serialize the message as JSON, send it, and return the decoded reply."""
        raw_response = self.send_message(json.dumps(message).encode("utf8"))
        return raw_response.json()
def enrich_from_web3(
    nft_events: List[NFTEvent],
    batch_loader: EthereumBatchloader,
) -> List[NFTEvent]:
    """
    Adds block number, value, timestamp from web3 if they are None (because that
    transaction is missing in db).

    Mutates the events in place and returns the same list.
    """
    transactions_to_query = set()
    indices_to_update: List[int] = []
    for index, nft_event in enumerate(nft_events):
        # Missing fields are stored as the literal string "None" in the datastore.
        if (
            nft_event.block_number == "None"
            or nft_event.value == "None"
            or nft_event.timestamp == "None"
        ):
            transactions_to_query.add(nft_event.transaction_hash)
            indices_to_update.append(index)

    if len(transactions_to_query) == 0:
        return nft_events

    logger.info("Calling JSON RPC API")
    jsonrpc_transactions_response = batch_loader.load_transactions(
        list(transactions_to_query)
    )
    # Map transaction hash -> (value, block number), both hex-decoded.
    transactions_map = {
        result["result"]["hash"]: (
            int(result["result"]["value"], 16),
            int(result["result"]["blockNumber"], 16),
        )
        for result in jsonrpc_transactions_response
    }

    blocks_to_query: Set[int] = set()
    for index in indices_to_update:
        nft_events[index].value, nft_events[index].block_number = transactions_map[
            nft_events[index].transaction_hash
        ]
        blocks_to_query.add(cast(int, nft_events[index].block_number))

    if len(blocks_to_query) == 0:
        return nft_events

    jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False)
    # Map block number -> block timestamp, both hex-decoded.
    blocks_map = {
        int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16)
        for result in jsonrpc_blocks_response
    }
    for index in indices_to_update:
        # Bug fix: the original looked up `nft_event.block_number` -- the stale
        # loop variable left over from the first enumerate loop -- so every
        # event received the timestamp of the last scanned event's block.
        nft_events[index].timestamp = blocks_map[
            cast(int, nft_events[index].block_number)
        ]
    return nft_events
def enrich(
    datastore_conn: sqlite3.Connection,
    event_type: EventType,
    batch_loader: EthereumBatchloader,
    batch_size: int = 1000,
) -> None:
    """
    Enrich stored events of the given type with data fetched over JSON RPC.

    Events are read from the datastore, enriched in batches of *batch_size*,
    and written back; a final (possibly partial) batch is flushed at the end.
    """

    def _flush(pending):
        # Fetch the missing fields over JSON RPC and persist the batch.
        logger.info("Getting data from JSONrpc")
        update_events_batch(
            datastore_conn,
            enrich_from_web3(pending, batch_loader),
        )

    pending = []
    for event in tqdm(
        get_events_for_enrich(datastore_conn, event_type),
        f"Processing events for {event_type.value} event type",
    ):
        pending.append(event)
        if len(pending) == batch_size:
            _flush(pending)
            pending = []
    _flush(pending)
from .data import BlockBounds

Wyświetl plik

@ -1,190 +1,227 @@
import logging
import sqlite3
from typing import Any, cast, Iterator, List, Optional, Set
from typing import Any, Dict, Union, cast, Iterator, List, Optional, Set
import json
from attr import dataclass
from moonstreamdb.models import (
EthereumLabel,
EthereumTransaction,
EthereumBlock,
PolygonLabel,
)
from sqlalchemy import or_, and_
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, NFTMetadata, event_types
from .datastore import (
get_checkpoint_offset,
insert_address_metadata,
insert_checkpoint,
insert_events,
from .data import (
NftApprovalEvent,
NftApprovalForAllEvent,
NftTransaction,
NftTransferEvent,
Erc20TransferEvent,
)
from .datastore import insert_events, insert_transactions
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
ERC721_LABEL = "erc721"
ERC20_LABEL = "test-erc20"
def add_events(
datastore_conn: sqlite3.Connection,
db_session: Session,
event_type: EventType,
initial_offset=0,
bounds: Optional[BlockBounds] = None,
batch_size: int = 10,
) -> None:
raw_created_at_list = (
db_session.query(EthereumLabel.created_at)
.filter(EthereumLabel.label == event_type.value)
.order_by(EthereumLabel.created_at.asc())
.distinct(EthereumLabel.created_at)
).all()
created_at_list = [
created_at[0] for created_at in raw_created_at_list[initial_offset:]
]
query = (
db_session.query(
EthereumLabel.id,
EthereumLabel.label,
EthereumLabel.address,
EthereumLabel.label_data,
EthereumLabel.transaction_hash,
EthereumTransaction.value,
EthereumTransaction.block_number,
EthereumBlock.timestamp,
)
.filter(EthereumLabel.label == event_type.value)
.outerjoin(
EthereumTransaction,
EthereumLabel.transaction_hash == EthereumTransaction.hash,
)
.outerjoin(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.order_by(
EthereumLabel.created_at.asc(),
)
def _get_last_labeled_erc721_block(
session: Session, label_model: Union[EthereumLabel, PolygonLabel]
) -> int:
last = (
session.query(label_model.block_number)
.filter(label_model.label == ERC721_LABEL)
.order_by(label_model.block_number.desc())
.first()
)
if bounds is not None:
time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
if bounds.ending_block is not None:
time_filters.append(EthereumTransaction.block_number <= bounds.ending_block)
bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
query = query.filter(or_(*bounds_filters))
pbar = tqdm(total=(len(raw_created_at_list)))
pbar.set_description(f"Processing created ats")
pbar.update(initial_offset)
batch_start = 0
batch_end = batch_start + batch_size
while batch_start <= len(created_at_list):
events = query.filter(
EthereumLabel.created_at.in_(created_at_list[batch_start : batch_end + 1])
).all()
if not events:
continue
raw_events_batch = []
for (
event_id,
label,
address,
label_data,
transaction_hash,
value,
block_number,
timestamp,
) in events:
raw_event = NFTEvent(
event_id=event_id,
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
from_address=label_data["from"],
to_address=label_data["to"],
transaction_hash=transaction_hash,
value=value,
block_number=block_number,
timestamp=timestamp,
)
raw_events_batch.append(raw_event)
logger.info(f"Adding {len(raw_events_batch)} to database")
insert_events(
datastore_conn, raw_events_batch
) # TODO REMOVED WEB3 enrich, since node is down
insert_checkpoint(datastore_conn, event_type, batch_end + initial_offset)
pbar.update(batch_end - batch_start + 1)
batch_start = batch_end + 1
batch_end = min(batch_end + batch_size, len(created_at_list))
if last is None:
raise ValueError(f"No ERC721 labels found in {label_model.__tablename__} table")
return last[0]
def create_dataset(
datastore_conn: sqlite3.Connection,
db_session: Session,
event_type: EventType,
bounds: Optional[BlockBounds] = None,
batch_size: int = 10,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
"""
offset = get_checkpoint_offset(datastore_conn, event_type)
if offset is not None:
logger.info(f"Found checkpoint for {event_type.value}: offset = {offset}")
def parse_transaction_label(
label_model: Union[EthereumLabel, PolygonLabel]
) -> NftTransaction:
assert (
label_model.label_data["type"] == "tx_call"
), "Expected label to be of type 'tx_call'"
if isinstance(label_model, EthereumLabel):
blockchain_type = "ethereum"
else:
offset = 0
logger.info(f"Did not found any checkpoint for {event_type.value}")
blockchain_type = "polygon"
if event_type == EventType.ERC721:
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
else:
add_events(
datastore_conn,
db_session,
event_type,
offset,
bounds,
batch_size,
)
# TODO: this is done because I forgot to add value in polygon labels
value = 0
if label_model.label_data.get("value") is not None:
value = label_model.label_data["value"]
def add_contracts_metadata(
datastore_conn: sqlite3.Connection,
db_session: Session,
initial_offset: int = 0,
batch_size: int = 1000,
) -> None:
logger.info("Adding erc721 contract metadata")
query = (
db_session.query(EthereumLabel.label_data, EthereumLabel.address)
.filter(EthereumLabel.label == EventType.ERC721.value)
.order_by(EthereumLabel.created_at)
return NftTransaction(
blockchain_type=blockchain_type,
block_number=label_model.block_number,
block_timestamp=label_model.block_timestamp,
transaction_hash=label_model.transaction_hash,
contract_address=label_model.address,
caller_address=label_model.label_data["caller"],
function_name=label_model.label_data["name"],
function_args=label_model.label_data["args"],
gas_used=label_model.label_data["gasUsed"],
gas_price=label_model.label_data["gasPrice"],
value=value,
status=label_model.label_data["status"],
max_fee_per_gas=label_model.label_data["maxFeePerGas"],
max_priority_fee_per_gas=label_model.label_data["maxPriorityFeePerGas"],
)
offset = initial_offset
while True:
events = query.offset(offset).limit(batch_size).all()
if not events:
break
offset += len(events)
events_batch: List[NFTMetadata] = []
for label_data, address in events:
events_batch.append(
NFTMetadata(
address=address,
name=label_data.get("name", None),
symbol=label_data.get("symbol", None),
)
def _parse_transfer_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> Union[NftTransferEvent, Erc20TransferEvent]:
    """
    Parse a "Transfer" event label into a transfer event object.

    Returns an NftTransferEvent when the event args carry a tokenId, and an
    Erc20TransferEvent otherwise. (The original return annotation claimed
    only NftTransferEvent, which the else branch contradicted.)

    Raises:
        AssertionError: If the label is not a "Transfer" event label.
    """
    assert (
        label_model.label_data["type"] == "event"
    ), "Expected label to be of type 'event'"
    assert (
        label_model.label_data["name"] == "Transfer"
    ), "Expected label to be of type 'Transfer'"
    if isinstance(label_model, EthereumLabel):
        blockchain_type = "ethereum"
    else:
        blockchain_type = "polygon"
    # A tokenId argument distinguishes an NFT transfer from an ERC20 transfer.
    if label_model.label_data["args"].get("tokenId") is not None:
        return NftTransferEvent(
            blockchain_type=blockchain_type,
            token_address=label_model.address,
            from_address=label_model.label_data["args"]["from"],
            to_address=label_model.label_data["args"]["to"],
            token_id=label_model.label_data["args"]["tokenId"],
            log_index=label_model.log_index,
            transaction_hash=label_model.transaction_hash,
        )
    else:
        return Erc20TransferEvent(
            blockchain_type=blockchain_type,
            token_address=label_model.address,
            from_address=label_model.label_data["args"]["from"],
            to_address=label_model.label_data["args"]["to"],
            value=label_model.label_data["args"]["value"],
            log_index=label_model.log_index,
            transaction_hash=label_model.transaction_hash,
        )
def _parse_approval_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> NftApprovalEvent:
    """Parse an "Approval" event label into an NftApprovalEvent."""
    label_data = label_model.label_data
    assert label_data["type"] == "event", "Expected label to be of type 'event'"
    assert (
        label_data["name"] == "Approval"
    ), "Expected label to be of type 'Approval'"
    blockchain_type = (
        "ethereum" if isinstance(label_model, EthereumLabel) else "polygon"
    )
    args = label_data["args"]
    return NftApprovalEvent(
        blockchain_type=blockchain_type,
        token_address=label_model.address,
        owner=args["owner"],
        approved=args["approved"],
        token_id=args["tokenId"],
        log_index=label_model.log_index,
        transaction_hash=label_model.transaction_hash,
    )
def _parse_approval_for_all_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> NftApprovalForAllEvent:
    """Parse an "ApprovalForAll" event label into an NftApprovalForAllEvent."""
    label_data = label_model.label_data
    assert label_data["type"] == "event", "Expected label to be of type 'event'"
    assert (
        label_data["name"] == "ApprovalForAll"
    ), "Expected label to be of type 'ApprovalForAll'"
    blockchain_type = (
        "ethereum" if isinstance(label_model, EthereumLabel) else "polygon"
    )
    args = label_data["args"]
    return NftApprovalForAllEvent(
        blockchain_type=blockchain_type,
        token_address=label_model.address,
        owner=args["owner"],
        operator=args["operator"],
        approved=args["approved"],
        log_index=label_model.log_index,
        transaction_hash=label_model.transaction_hash,
    )
def parse_event(
    label_model: Union[EthereumLabel, PolygonLabel]
) -> Union[
    NftTransferEvent, Erc20TransferEvent, NftApprovalEvent, NftApprovalForAllEvent
]:
    """
    Dispatch an event label to the parser matching its event name.

    The return annotation includes Erc20TransferEvent because
    _parse_transfer_event returns one for Transfer events without a tokenId.

    Raises:
        ValueError: If the label's event name is not Transfer, Approval,
            or ApprovalForAll.
    """
    # Hoist the repeated nested dict lookup.
    event_name = label_model.label_data["name"]
    if event_name == "Transfer":
        return _parse_transfer_event(label_model)
    elif event_name == "Approval":
        return _parse_approval_event(label_model)
    elif event_name == "ApprovalForAll":
        return _parse_approval_for_all_event(label_model)
    else:
        raise ValueError(f"Unknown label type: {label_model.label_data['name']}")
def crawl_erc721_labels(
db_session: Session,
conn: sqlite3.Connection,
label_model: Union[EthereumLabel, PolygonLabel],
start_block: int,
end_block: int,
batch_size: int = 10000,
):
logger.info(
f"Crawling {label_model.__tablename__} from {start_block} to {end_block}"
)
pbar = tqdm(total=(end_block - start_block + 1))
pbar.set_description(
f"Crawling {label_model.__tablename__} blocks {start_block}-{end_block}"
)
current_block = start_block
while current_block <= end_block:
batch_end = min(current_block + batch_size, end_block)
logger.info(f"Crawling {current_block}-{batch_end}")
labels = db_session.query(label_model).filter(
and_(
label_model.block_number >= current_block,
label_model.block_number <= batch_end,
or_(
label_model.label == ERC721_LABEL, label_model.label == ERC20_LABEL
),
)
insert_address_metadata(datastore_conn, events_batch)
insert_checkpoint(datastore_conn, EventType.ERC721, offset)
logger.info(f"Already added {offset}")
)
logger.info(f"Added total of {offset-initial_offset} nfts metadata")
logger.info(f"Found {labels.count()} labels")
transactions = []
events = []
for label in labels:
if label.label_data["type"] == "tx_call":
transactions.append(parse_transaction_label(label))
else:
events.append(parse_event(label))
logger.info(f"Parsed {len(events)} events and {len(transactions)} transactions")
insert_transactions(conn, transactions)
insert_events(conn, events)
logger.info(f"Saved {len(events)} events and {len(transactions)} transactions")
pbar.update(batch_end - current_block + 1)
current_block = batch_end + 1

Wyświetl plik

@ -32,6 +32,7 @@ setup(
zip_safe=False,
install_requires=[
"moonstreamdb",
"mooncrawl",
"humbug",
"numpy",
"pandas",