Added Humbug publication functionality for "nft ethereum" command

pull/226/head
Neeraj Kashyap 2021-09-02 22:16:42 -07:00
rodzic 0694e7d45e
commit f06f2e3fad
2 zmienionych plików z 53 dodań i 6 usunięć

Wyświetl plik

@ -2,13 +2,19 @@
A command line tool to crawl information about NFTs from various sources.
"""
import argparse
from typing import cast
from dataclasses import asdict
import json
import os
import sys
from typing import Any, cast, Dict, List
from web3 import Web3
from ..ethereum import connect
from .ethereum import get_nft_transfers
from ..publish import publish_json
from ..settings import MOONSTREAM_IPC_PATH
from ..version import MOONCRAWL_VERSION
def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
@ -30,11 +36,34 @@ def web3_client_from_cli_or_env(args: argparse.Namespace) -> Web3:
def ethereum_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
transfers = get_nft_transfers(web3_client, args.start, args.end, args.address)
for transfer in transfers:
print(transfer)
print("Total transfers:", len(transfers))
print("Mints:", len([transfer for transfer in transfers if transfer.is_mint]))
# TODO(zomglings): Create a function which calculates statistics about ethereum NFTs in the
# ethereum module and call it here. Don't do this calculation here.
num_mints = len([transfer for transfer in transfers if transfer.is_mint])
# TODO(zomglings): Add dates as well as block numbers.
result = {
"num_transfers": len(transfers),
"num_mints": num_mints,
"initial_block": args.start,
"terminal_block": args.end,
}
humbug_token = args.humbug
if humbug_token is None:
humbug_token = os.environ.get("MOONSTREAM_HUMBUG_TOKEN")
if humbug_token:
title = f"NFT activity on the Ethereum blockchain: Blocks {args.start} to {args.end}"
publish_json(
"nft_ethereum",
humbug_token,
title,
result,
tags=[f"crawler_version:{MOONCRAWL_VERSION}"],
wait=False,
)
with args.outfile as ofp:
json.dump(result, ofp)
def main() -> None:
@ -73,6 +102,23 @@ def main() -> None:
default=None,
help="(Optional) Web3 connection string. If not provided, uses the value specified by MOONSTREAM_IPC_PATH environment variable.",
)
parser_ethereum.add_argument(
"--humbug",
default=None,
help=(
"If you would like to write this data to a Moonstream journal, please provide a Humbug "
"token for that here. (This argument overrides any value set in the "
"MOONSTREAM_HUMBUG_TOKEN environment variable)"
),
)
parser_ethereum.add_argument(
"-o",
"--outfile",
type=argparse.FileType("w"),
default=sys.stdout,
help="Optional file to write output to. By default, prints to stdout.",
)
parser_ethereum.set_defaults(func=ethereum_handler)
args = parser.parse_args()

Wyświetl plik

@ -11,6 +11,7 @@ def publish_json(
title: str,
content: Dict[str, Any],
tags: Optional[List[str]] = None,
wait: bool = True,
) -> None:
spire_api_url = os.environ.get(
"MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev"
@ -26,7 +27,7 @@ def publish_json(
"Authorization": f"Bearer {humbug_token}",
}
request_body = {"title": title, "content": json.dumps(content), "tags": tags}
query_parameters = {"sync": True}
query_parameters = {"sync": wait}
response = requests.post(
report_url, headers=headers, json=request_body, params=query_parameters
)