Added support for created_at in Humbug reports

The `nft ethereum summary` crawler now reports `date_range.end_time` as
the `created_at` of each Humbug report. This allows us to query the Humbug
journal for all summaries whose end blocks were mined within a given
stream boundary.
pull/263/head
Neeraj Kashyap 2021-09-16 02:14:22 -07:00
rodzic 33fa888117
commit c134d4f6ac
3 zmienionych plików z 60 dodań i 8 usunięć

Wyświetl plik

@ -2,15 +2,14 @@
A command line tool to crawl information about NFTs from various sources. A command line tool to crawl information about NFTs from various sources.
""" """
import argparse import argparse
from datetime import datetime, timedelta, timezone from datetime import datetime, timezone
import dateutil.parser
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
import json import json
import logging import logging
import os import os
import sys import sys
import time import time
from typing import Any, Dict, cast, Optional from typing import Any, cast, Dict, Optional
from moonstreamdb.db import yield_db_session_ctx from moonstreamdb.db import yield_db_session_ctx
@ -19,7 +18,13 @@ from sqlalchemy.orm.session import Session
from web3 import Web3 from web3 import Web3
from ..ethereum import connect from ..ethereum import connect
from .ethereum import summary as ethereum_summary, add_labels from .ethereum import (
summary as ethereum_summary,
add_labels,
SUMMARY_KEY_ARGS,
SUMMARY_KEY_ID,
SUMMARY_KEY_NUM_BLOCKS,
)
from ..publish import publish_json from ..publish import publish_json
from ..settings import MOONSTREAM_IPC_PATH from ..settings import MOONSTREAM_IPC_PATH
from ..version import MOONCRAWL_VERSION from ..version import MOONCRAWL_VERSION
@ -156,17 +161,45 @@ def ethereum_label_handler(args: argparse.Namespace) -> None:
def push_summary(result: Dict[str, Any], humbug_token: str): def push_summary(result: Dict[str, Any], humbug_token: str):
title = ( title = (
f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] })" f"NFT activity on the Ethereum blockchain: end time: {result['crawled_at'] })"
) )
tags = [
f"crawler_version:{MOONCRAWL_VERSION}",
f"summary_id:{result.get(SUMMARY_KEY_ID, '')}",
]
# Add an "error:missing_blocks" tag for all summaries in which the number of blocks processed
# is not equal to the expected number of blocks.
args = result.get(SUMMARY_KEY_ARGS, {})
args_start = args.get("start")
args_end = args.get("end")
expected_num_blocks = None
if args_start is not None and args_end is not None:
expected_num_blocks = cast(int, args_end) - cast(int, args_start) + 1
num_blocks = result.get(SUMMARY_KEY_NUM_BLOCKS)
if (
expected_num_blocks is None
or num_blocks is None
or num_blocks != expected_num_blocks
):
tags.append("error:missing_blocks")
# TODO(yhtyyar, zomglings): Also add checkpoints in database for nft labelling. This way, we can
# add an "error:stale" tag to summaries generated before nft labels were processed for the
# block range in the summary.
created_at = result.get("date_range", {}).get("end_time")
publish_json( publish_json(
"nft_ethereum", "nft_ethereum",
humbug_token, humbug_token,
title, title,
result, result,
tags=[f"crawler_version:{MOONCRAWL_VERSION}"], tags=tags,
wait=False, wait=True,
created_at=created_at,
) )

Wyświetl plik

@ -31,6 +31,8 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
# Summary keys # Summary keys
SUMMARY_KEY_ID = "summary_id"
SUMMARY_KEY_ARGS = "args"
SUMMARY_KEY_START_BLOCK = "start_block" SUMMARY_KEY_START_BLOCK = "start_block"
SUMMARY_KEY_END_BLOCK = "end_block" SUMMARY_KEY_END_BLOCK = "end_block"
SUMMARY_KEY_NUM_BLOCKS = "num_blocks" SUMMARY_KEY_NUM_BLOCKS = "num_blocks"
@ -43,6 +45,8 @@ SUMMARY_KEY_NFT_PURCHASERS = "nft_owners"
SUMMARY_KEY_NFT_MINTERS = "nft_minters" SUMMARY_KEY_NFT_MINTERS = "nft_minters"
SUMMARY_KEYS = [ SUMMARY_KEYS = [
SUMMARY_KEY_ID,
SUMMARY_KEY_ARGS,
SUMMARY_KEY_START_BLOCK, SUMMARY_KEY_START_BLOCK,
SUMMARY_KEY_END_BLOCK, SUMMARY_KEY_END_BLOCK,
SUMMARY_KEY_NUM_BLOCKS, SUMMARY_KEY_NUM_BLOCKS,
@ -497,6 +501,8 @@ def block_bounded_summary(
""" """
Produces a summary of Ethereum NFT activity between the given start_time and end_time (inclusive). Produces a summary of Ethereum NFT activity between the given start_time and end_time (inclusive).
""" """
summary_id = f"nft-ethereum-start-{start_block}-end-{end_block}"
block_filter = and_( block_filter = and_(
EthereumBlock.block_number >= start_block, EthereumBlock.block_number >= start_block,
EthereumBlock.block_number <= end_block, EthereumBlock.block_number <= end_block,
@ -629,6 +635,8 @@ def block_bounded_summary(
"end_time": end_time, "end_time": end_time,
"include_end": True, "include_end": True,
}, },
SUMMARY_KEY_ID: summary_id,
SUMMARY_KEY_ARGS: {"start": start_block, "end": end_block},
SUMMARY_KEY_START_BLOCK: first_block, SUMMARY_KEY_START_BLOCK: first_block,
SUMMARY_KEY_END_BLOCK: last_block, SUMMARY_KEY_END_BLOCK: last_block,
SUMMARY_KEY_NUM_BLOCKS: num_blocks, SUMMARY_KEY_NUM_BLOCKS: num_blocks,

Wyświetl plik

@ -1,3 +1,4 @@
from datetime import datetime
import json import json
import os import os
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@ -12,6 +13,7 @@ def publish_json(
content: Dict[str, Any], content: Dict[str, Any],
tags: Optional[List[str]] = None, tags: Optional[List[str]] = None,
wait: bool = True, wait: bool = True,
created_at: Optional[str] = None,
) -> None: ) -> None:
spire_api_url = os.environ.get( spire_api_url = os.environ.get(
"MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev" "MOONSTREAM_SPIRE_API_URL", "https://spire.bugout.dev"
@ -26,9 +28,18 @@ def publish_json(
headers = { headers = {
"Authorization": f"Bearer {humbug_token}", "Authorization": f"Bearer {humbug_token}",
} }
request_body = {"title": title, "content": json.dumps(content), "tags": tags} request_body = {
"title": title,
"content": json.dumps(content),
"tags": tags,
}
if created_at is not None:
request_body["created_at"] = created_at
query_parameters = {"sync": wait} query_parameters = {"sync": wait}
response = requests.post( response = requests.post(
report_url, headers=headers, json=request_body, params=query_parameters report_url, headers=headers, json=request_body, params=query_parameters
) )
response.raise_for_status() response.raise_for_status()