summary function for Ethereum NFTs

pull/226/head
Neeraj Kashyap 2021-09-02 22:37:40 -07:00
rodzic f06f2e3fad
commit a564737dbc
2 zmienionych plików z 44 dodań i 17 usunięć

Wyświetl plik

@ -2,16 +2,15 @@
A command line tool to crawl information about NFTs from various sources.
"""
import argparse
from dataclasses import asdict
import json
import os
import sys
from typing import Any, cast, Dict, List
from typing import cast
from web3 import Web3
from ..ethereum import connect
from .ethereum import get_nft_transfers
from .ethereum import summary as ethereum_summary
from ..publish import publish_json
from ..settings import MOONSTREAM_IPC_PATH
from ..version import MOONCRAWL_VERSION
@ -35,19 +34,7 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
def ethereum_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
transfers = get_nft_transfers(web3_client, args.start, args.end, args.address)
# TODO(zomglings): Create a function which calculates statistics about ethereum NFTs in the
# ethereum module and call it here. Don't do this calculation here.
num_mints = len([transfer for transfer in transfers if transfer.is_mint])
# TODO(zomglings): Add dates as well as block numbers.
result = {
"num_transfers": len(transfers),
"num_mints": num_mints,
"initial_block": args.start,
"terminal_block": args.end,
}
result = ethereum_summary(web3_client, args.start, args.end, args.address)
humbug_token = args.humbug
if humbug_token is None:

Wyświetl plik

@ -1,5 +1,6 @@
from dataclasses import dataclass, asdict
from typing import cast, List, Optional
from datetime import datetime
from typing import Any, cast, Dict, List, Optional
from hexbytes.main import HexBytes
from eth_typing.encoding import HexStr
@ -183,3 +184,42 @@ def get_nft_transfers(
parsed_transfer = NFTTransfer(**kwargs) # type: ignore
nft_transfers.append(parsed_transfer)
return nft_transfers
def summary(
w3: Web3,
from_block: Optional[int] = None,
to_block: Optional[int] = None,
address: Optional[str] = None,
) -> Dict[str, Any]:
if to_block is None:
to_block = w3.eth.get_block_number()
# By default, let us summarize 100 blocks worth of NFT transfers
if from_block is None:
from_block = to_block - 100
start_block = w3.eth.get_block(from_block)
start_time = datetime.utcfromtimestamp(start_block.timestamp).isoformat()
end_block = w3.eth.get_block(to_block)
end_time = datetime.utcfromtimestamp(end_block.timestamp).isoformat()
transfers = get_nft_transfers(w3, from_block, to_block, address)
num_mints = sum(transfer.is_mint for transfer in transfers)
result = {
"date_range": {
"start_time": start_time,
"include_start": True,
"end_time": end_time,
"include_end": True,
},
"blocks": {
"start": from_block,
"end": to_block,
},
"num_transfers": len(transfers),
"num_mints": num_mints,
}
return result