Merge remote-tracking branch 'upstream/main' into entry-cards

pull/47/head
Tim Pechersky 2021-08-03 19:48:13 +08:00
commit 7bfc0b34eb
5 zmienionych plików z 223 dodań i 56 usunięć

Wyświetl plik

@ -11,7 +11,7 @@ fastapi==0.66.0
h11==0.12.0 h11==0.12.0
idna==3.2 idna==3.2
jmespath==0.10.0 jmespath==0.10.0
-e git+https://git@github.com/bugout-dev/moonstream.git@5946abb75bb91f27dbcf93dc15be205700dc7a51#egg=moonstreamdb&subdirectory=db -e git+https://git@github.com/bugout-dev/moonstream.git@ec3278e192119d1e8a273cfaab6cb53890d2e8e9#egg=moonstreamdb&subdirectory=db
mypy==0.910 mypy==0.910
mypy-extensions==0.4.3 mypy-extensions==0.4.3
pathspec==0.9.0 pathspec==0.9.0

Wyświetl plik

@ -3,9 +3,11 @@ Moonstream crawlers CLI.
""" """
import argparse import argparse
from distutils.util import strtobool from distutils.util import strtobool
from enum import Enum
import json import json
import sys import sys
import time import time
from typing import Iterator, List
from .ethereum import ( from .ethereum import (
crawl_blocks_executor, crawl_blocks_executor,
@ -17,37 +19,62 @@ from .ethereum import (
from .settings import MOONSTREAM_CRAWL_WORKERS from .settings import MOONSTREAM_CRAWL_WORKERS
def yield_blocks_numbers_lists(blocks_range_str: str) -> None: class ProcessingOrder(Enum):
DESCENDING = 0
ASCENDING = 1
def yield_blocks_numbers_lists(
blocks_range_str: str,
order: ProcessingOrder = ProcessingOrder.DESCENDING,
block_step: int = 1000,
) -> Iterator[List[int]]:
""" """
Generate list of blocks. Generate list of blocks.
Block steps used to prevent long executor tasks and data loss possibility. Block steps used to prevent long executor tasks and data loss possibility.
""" """
block_step = 1000
try: try:
blocks_start_end = blocks_range_str.split("-") blocks_start_end = blocks_range_str.split("-")
bottom_block_number = int(blocks_start_end[0]) input_start_block = int(blocks_start_end[0])
top_block_number = int(blocks_start_end[1]) input_end_block = int(blocks_start_end[1])
required_blocks_len = top_block_number - bottom_block_number + 1
except Exception: except Exception:
print( print(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
) )
return return
print(f"Required {required_blocks_len} blocks to process") starting_block = max(input_start_block, input_end_block)
ending_block = min(input_start_block, input_end_block)
while not top_block_number < bottom_block_number: stepsize = -1
temp_bottom_block_number = top_block_number - block_step if order == ProcessingOrder.ASCENDING:
if temp_bottom_block_number < bottom_block_number: starting_block = min(input_start_block, input_end_block)
temp_bottom_block_number = bottom_block_number - 1 ending_block = max(input_start_block, input_end_block)
blocks_numbers_list = list( stepsize = 1
range(top_block_number, temp_bottom_block_number, -1)
) current_block = starting_block
def keep_going() -> bool:
if order == ProcessingOrder.ASCENDING:
return current_block <= ending_block
return current_block >= ending_block
while keep_going():
temp_ending_block = current_block + stepsize * block_step
if order == ProcessingOrder.ASCENDING:
if temp_ending_block > ending_block:
temp_ending_block = ending_block + 1
else:
if temp_ending_block < ending_block:
temp_ending_block = ending_block - 1
blocks_numbers_list = list(range(current_block, temp_ending_block, stepsize))
yield blocks_numbers_list yield blocks_numbers_list
top_block_number -= block_step if order == ProcessingOrder.ASCENDING:
current_block += block_step
else:
current_block -= block_step
def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None: def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
@ -64,28 +91,21 @@ def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
print( print(
f"Synchronization is unnecessary for blocks {bottom_block_number}-{top_block_number - 1}" f"Synchronization is unnecessary for blocks {bottom_block_number}-{top_block_number - 1}"
) )
time.sleep(20) time.sleep(5)
continue continue
if top_block_number - bottom_block_number >= 10:
for blocks_numbers_list in yield_blocks_numbers_lists( for blocks_numbers_list in yield_blocks_numbers_lists(
f"{bottom_block_number}-{top_block_number}" f"{bottom_block_number}-{top_block_number}",
order=args.order,
): ):
print( print(f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}")
f"Adding blocks {blocks_numbers_list[-1]}-{blocks_numbers_list[0]}" # TODO(kompotkot): Set num_processes argument based on number of blocks to synchronize.
)
crawl_blocks_executor( crawl_blocks_executor(
block_numbers_list=blocks_numbers_list, block_numbers_list=blocks_numbers_list,
with_transactions=bool(strtobool(args.transactions)), with_transactions=bool(strtobool(args.transactions)),
) num_processes=args.jobs,
else:
blocks_numbers_list = range(bottom_block_number, top_block_number + 1)
print(f"Adding blocks {bottom_block_number}-{top_block_number - 1}")
crawl_blocks(
blocks_numbers=blocks_numbers_list,
with_transactions=bool(strtobool(args.transactions)),
) )
print(f"Synchronized blocks from {bottom_block_number} to {top_block_number}") print(f"Synchronized blocks from {bottom_block_number} to {top_block_number}")
time.sleep(10)
def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None: def ethcrawler_blocks_add_handler(args: argparse.Namespace) -> None:
@ -169,6 +189,18 @@ def main() -> None:
description="Ethereum blocks commands" description="Ethereum blocks commands"
) )
valid_processing_orders = {
"asc": ProcessingOrder.ASCENDING,
"desc": ProcessingOrder.DESCENDING,
}
def processing_order(raw_order: str) -> ProcessingOrder:
if raw_order in valid_processing_orders:
return valid_processing_orders[raw_order]
raise ValueError(
f"Invalid processing order ({raw_order}). Valid choices: {valid_processing_orders.keys()}"
)
parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser( parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser(
"synchronize", description="Synchronize to latest ethereum block commands" "synchronize", description="Synchronize to latest ethereum block commands"
) )
@ -186,6 +218,22 @@ def main() -> None:
default=0, default=0,
help="(Optional) Block to start synchronization from. Default: 0", help="(Optional) Block to start synchronization from. Default: 0",
) )
parser_ethcrawler_blocks_sync.add_argument(
"--order",
type=processing_order,
default=ProcessingOrder.DESCENDING,
help="Order in which to process blocks (choices: desc, asc; default: desc)",
)
parser_ethcrawler_blocks_sync.add_argument(
"-j",
"--jobs",
type=int,
default=MOONSTREAM_CRAWL_WORKERS,
help=(
f"Number of processes to use when synchronizing (default: {MOONSTREAM_CRAWL_WORKERS})."
" If you set to 1, the main process handles synchronization without spawning subprocesses."
),
)
parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler) parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler)
parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser( parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser(

Wyświetl plik

@ -136,6 +136,7 @@ def crawl_blocks_executor(
block_numbers_list: List[int], block_numbers_list: List[int],
with_transactions: bool = False, with_transactions: bool = False,
verbose: bool = False, verbose: bool = False,
num_processes: int = MOONSTREAM_CRAWL_WORKERS,
) -> None: ) -> None:
""" """
Execute crawler in processes. Execute crawler in processes.
@ -153,7 +154,9 @@ def crawl_blocks_executor(
worker_job_lists[i % MOONSTREAM_CRAWL_WORKERS].append(block_number) worker_job_lists[i % MOONSTREAM_CRAWL_WORKERS].append(block_number)
results: List[Future] = [] results: List[Future] = []
if num_processes == 1:
return crawl_blocks(block_numbers_list, with_transactions, verbose)
else:
with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor: with ProcessPoolExecutor(max_workers=MOONSTREAM_CRAWL_WORKERS) as executor:
for worker in worker_indices: for worker in worker_indices:
if verbose: if verbose:
@ -174,7 +177,6 @@ def crawl_blocks_executor(
for error in errors: for error in errors:
print(f"- {error}") print(f"- {error}")
def process_contract_deployments() -> List[Tuple[str, str]]: def process_contract_deployments() -> List[Tuple[str, str]]:
""" """
Checks for new smart contracts that have been deployed to the blockchain but not registered in Checks for new smart contracts that have been deployed to the blockchain but not registered in

Wyświetl plik

@ -0,0 +1,117 @@
import unittest
from . import cli
class TestYieldBlockNumbersLists(unittest.TestCase):
def test_yield_descending_10_6_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", block_step=4
)
]
self.assertListEqual(partition, [[10, 9, 8, 7], [6]])
def test_yield_descending_10_6_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", block_step=3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_10_6_descending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.DESCENDING, 3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_10_6_descending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.DESCENDING, 10
)
]
self.assertListEqual(partition, [[10, 9, 8, 7, 6]])
def test_yield_descending_6_10_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", block_step=4
)
]
self.assertListEqual(partition, [[10, 9, 8, 7], [6]])
def test_yield_descending_6_10_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", block_step=3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_6_10_descending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.DESCENDING, 3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_6_10_descending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.DESCENDING, 10
)
]
self.assertListEqual(partition, [[10, 9, 8, 7, 6]])
def test_yield_ascending_10_6_ascending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.ASCENDING, 3
)
]
self.assertListEqual(partition, [[6, 7, 8], [9, 10]])
def test_yield_ascending_10_6_ascending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.ASCENDING, 10
)
]
self.assertListEqual(partition, [[6, 7, 8, 9, 10]])
def test_yield_ascending_6_10_ascending_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.ASCENDING, 4
)
]
self.assertListEqual(partition, [[6, 7, 8, 9], [10]])
def test_yield_ascending_6_10_ascending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.ASCENDING, 10
)
]
self.assertListEqual(partition, [[6, 7, 8, 9, 10]])
if __name__ == "__main__":
unittest.main()

Wyświetl plik

@ -36,8 +36,8 @@ const LandingNavbar = () => {
<Link> <Link>
<Image <Image
w="200px" w="200px"
src="/icons/bugout-dev-white.svg" src="https://s3.amazonaws.com/static.simiotics.com/moonstream/assets/White+logo.svg"
alt="bugout.dev" alt="Moonstream logo"
/> />
</Link> </Link>
</RouterLink> </RouterLink>