added sync functionality

pull/263/head
yhtiyar 2021-09-16 18:52:07 +03:00
rodzic c134d4f6ac
commit 42f6d893cc
1 zmienionych plików z 99 dodań i 58 usunięć

Wyświetl plik

@ -13,7 +13,7 @@ from typing import Any, cast, Dict, Optional
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumBlock
from moonstreamdb.models import EthereumBlock, EthereumTransaction, EthereumLabel
from sqlalchemy.orm.session import Session
from web3 import Web3
@ -21,6 +21,8 @@ from ..ethereum import connect
from .ethereum import (
summary as ethereum_summary,
add_labels,
MINT_LABEL,
TRANSFER_LABEL,
SUMMARY_KEY_ARGS,
SUMMARY_KEY_ID,
SUMMARY_KEY_NUM_BLOCKS,
@ -61,53 +63,45 @@ def get_latest_block_from_db(db_session: Session):
def get_latest_summary_block() -> Optional[int]:
pass
return None
def get_latest_nft_event_date() -> Optional[datetime]:
pass
def get_latest_nft_labeled_block(db_session: Session) -> Optional[int]:
query = (
db_session.query(
EthereumLabel.label,
EthereumTransaction.hash,
EthereumBlock.block_number,
)
.join(
EthereumTransaction,
EthereumLabel.transaction_hash == EthereumTransaction.hash,
)
.join(
EthereumBlock,
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label.in_([MINT_LABEL, TRANSFER_LABEL]))
.order_by(EthereumBlock.block_number.desc())
.limit(1)
)
return query.one_or_none().block_number
def sync_labels(
db_session: Session, web3_client: Web3, start: Optional[int]
) -> EthereumBlock:
def sync_labels(db_session: Session, web3_client: Web3, start: Optional[int]) -> int:
if start is None:
logger.info(
"Syncing start block is not given, getting it from latest nft label in db"
)
start_date = get_latest_nft_event_date()
if start_date is None:
start = get_latest_nft_labeled_block(db_session)
if start is None:
logger.warning(
"Didn't find any nft labels in db, starting sync from 3 month before now"
)
time_now = datetime.now(timezone.utc)
start_date = time_now - relativedelta(months=3)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= start_date.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
logger.info(f"Start block: {start}, date: {start_date}")
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
assert start <= end, f"Start block {start} is greater than latest_block {end} in db"
logger.info(f"Labeling blocks {start}-{end}")
add_labels(web3_client, db_session, start, end)
return latest_block
def sync_summaries(db_session: Session, end: int, start: Optional[int]):
if start is None:
logger.info(
"Syncing start time is not given, getting it from latest nft label in db"
)
start = get_latest_summary_block()
if start is None:
time_now = datetime.now(timezone.utc)
start_date = time_now - relativedelta(months=3)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= start_date.timestamp())
@ -115,20 +109,60 @@ def sync_summaries(db_session: Session, end: int, start: Optional[int]):
.limit(1)
.one()
).block_number
start += 1
logger.info(f"Syncing labels, start block: {start}")
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
if start > end:
logger.warn(f"Start block {start} is greater than latest_block {end} in db")
logger.warn("Maybe ethcrawler is not syncing or nft sync is up to date")
return start - 1
logger.info(f"Labeling blocks {start}-{end}")
add_labels(web3_client, db_session, start, end)
return latest_block.block_number
while start < end:
current_end = start + BLOCKS_PER_SUMMARY - 1
current_end_time = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.block_number <= current_end)
.order_by(EthereumBlock.block_number.desc())
.limit(1)
.one()
).timestamp
summary_result = ethereum_summary(
db_session, datetime.fromtimestamp(current_end_time, timezone.utc)
def sync_summaries(
db_session: Session,
start: Optional[int],
end: int,
humbug_token: str,
) -> int:
if start is None:
logger.info(
"Syncing start time is not given, getting it from latest nft label in db"
)
start = get_latest_summary_block()
if start is not None:
start += 1
else:
logger.info(
"There is no entry in humbug, starting to create summaries from 3 month ago"
)
time_now = datetime.now(timezone.utc)
start_date = time_now - relativedelta(months=3)
start = (
db_session.query(EthereumBlock)
.filter(EthereumBlock.timestamp >= start_date.timestamp())
.order_by(EthereumBlock.timestamp.asc())
.limit(1)
.one()
).block_number
logger.info(f"Syncing summaries start_block: {start}")
batch_end = start + BLOCKS_PER_SUMMARY - 1
if batch_end > end:
logger.warn("Syncing summaries is not required")
while batch_end <= end:
summary_result = ethereum_summary(db_session, start, batch_end)
push_summary(summary_result, humbug_token)
logger.info(f"Pushed summary of blocks : {start}-{batch_end}")
start = batch_end + 1
batch_end += BLOCKS_PER_SUMMARY
if start == end:
return end
else:
return start - 1
def ethereum_sync_handler(args: argparse.Namespace) -> None:
@ -139,20 +173,22 @@ def ethereum_sync_handler(args: argparse.Namespace) -> None:
raise ValueError("MOONSTREAM_HUMBUG_TOKEN env variable is not set")
with yield_db_session_ctx() as db_session:
logger.info("Initial labeling:")
last_labeled = sync_labels(db_session, web3_client, args.start)
logger.info("Initial summary creation:")
last_summary_created = sync_summaries(
db_session, args.start, last_labeled, humbug_token
)
while True:
end_time = datetime.fromtimestamp(latest_block.timestamp, timezone.utc)
print(f"Creating summary with endtime={end_time}")
result = ethereum_summary(db_session, end_time)
push_summary(result, end, humbug_token)
sleep_time = 60 * 60
print(f"Going to sleep for:{sleep_time}s")
logger.info("Syncing")
last_labeled = sync_labels(db_session, web3_client, last_labeled + 1)
last_summary_created = sync_summaries(
db_session, last_summary_created + 1, last_labeled, humbug_token
)
sleep_time = 20 * 60
logger.info(f"Going to sleep for {sleep_time}s")
time.sleep(sleep_time)
start = end + 1
latest_block = get_latest_block_from_db(db_session)
end = latest_block.block_number
def ethereum_label_handler(args: argparse.Namespace) -> None:
web3_client = web3_client_from_cli_or_env(args)
@ -310,6 +346,11 @@ def main() -> None:
required=False,
help="Starting block number (inclusive if block available)",
)
parser_ethereum_sync.add_argument(
"--humbug",
default=None,
help=("Humbug token to publish summary reports"),
)
parser_ethereum_sync.set_defaults(func=ethereum_sync_handler)
args = parser.parse_args()