Added --order argument to "blocks synchronize"

This specifies if you want to crawl blocks in ascending or descending
order.
pull/44/head
Neeraj Kashyap 2021-08-02 11:36:57 -07:00
rodzic c8869cbb2d
commit 50cad0f17f
2 zmienionych plików z 177 dodań i 15 usunięć

Wyświetl plik

@ -3,9 +3,11 @@ Moonstream crawlers CLI.
""" """
import argparse import argparse
from distutils.util import strtobool from distutils.util import strtobool
from enum import Enum
import json import json
import sys import sys
import time import time
from typing import Iterator, List
from .ethereum import ( from .ethereum import (
crawl_blocks_executor, crawl_blocks_executor,
@ -17,37 +19,62 @@ from .ethereum import (
from .settings import MOONSTREAM_CRAWL_WORKERS from .settings import MOONSTREAM_CRAWL_WORKERS
def yield_blocks_numbers_lists(blocks_range_str: str) -> None: class ProcessingOrder(Enum):
DESCENDING = 0
ASCENDING = 1
def yield_blocks_numbers_lists(
blocks_range_str: str,
order: ProcessingOrder = ProcessingOrder.DESCENDING,
block_step: int = 1000,
) -> Iterator[List[int]]:
""" """
Generate list of blocks. Generate list of blocks.
Block steps used to prevent long executor tasks and data loss possibility. Block steps used to prevent long executor tasks and data loss possibility.
""" """
block_step = 1000
try: try:
blocks_start_end = blocks_range_str.split("-") blocks_start_end = blocks_range_str.split("-")
bottom_block_number = int(blocks_start_end[0]) input_start_block = int(blocks_start_end[0])
top_block_number = int(blocks_start_end[1]) input_end_block = int(blocks_start_end[1])
required_blocks_len = top_block_number - bottom_block_number + 1
except Exception: except Exception:
print( print(
"Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340" "Wrong format provided, expected {bottom_block}-{top_block}, as ex. 105-340"
) )
return return
print(f"Required {required_blocks_len} blocks to process") starting_block = max(input_start_block, input_end_block)
ending_block = min(input_start_block, input_end_block)
while not top_block_number < bottom_block_number: stepsize = -1
temp_bottom_block_number = top_block_number - block_step if order == ProcessingOrder.ASCENDING:
if temp_bottom_block_number < bottom_block_number: starting_block = min(input_start_block, input_end_block)
temp_bottom_block_number = bottom_block_number - 1 ending_block = max(input_start_block, input_end_block)
blocks_numbers_list = list( stepsize = 1
range(top_block_number, temp_bottom_block_number, -1)
) current_block = starting_block
def keep_going() -> bool:
if order == ProcessingOrder.ASCENDING:
return current_block <= ending_block
return current_block >= ending_block
while keep_going():
temp_ending_block = current_block + stepsize * block_step
if order == ProcessingOrder.ASCENDING:
if temp_ending_block > ending_block:
temp_ending_block = ending_block + 1
else:
if temp_ending_block < ending_block:
temp_ending_block = ending_block - 1
blocks_numbers_list = list(range(current_block, temp_ending_block, stepsize))
yield blocks_numbers_list yield blocks_numbers_list
top_block_number -= block_step if order == ProcessingOrder.ASCENDING:
current_block += block_step
else:
current_block -= block_step
def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None: def ethcrawler_blocks_sync_handler(args: argparse.Namespace) -> None:
@ -169,6 +196,18 @@ def main() -> None:
description="Ethereum blocks commands" description="Ethereum blocks commands"
) )
valid_processing_orders = {
"asc": ProcessingOrder.ASCENDING,
"desc": ProcessingOrder.DESCENDING,
}
def processing_order(raw_order: str) -> ProcessingOrder:
if raw_order in valid_processing_orders:
return valid_processing_orders[raw_order]
raise ValueError(
f"Invalid processing order ({raw_order}). Valid choices: {valid_processing_orders.keys()}"
)
parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser( parser_ethcrawler_blocks_sync = subcommands_ethcrawler_blocks.add_parser(
"synchronize", description="Synchronize to latest ethereum block commands" "synchronize", description="Synchronize to latest ethereum block commands"
) )
@ -186,6 +225,12 @@ def main() -> None:
default=0, default=0,
help="(Optional) Block to start synchronization from. Default: 0", help="(Optional) Block to start synchronization from. Default: 0",
) )
parser_ethcrawler_blocks_sync.add_argument(
"--order",
type=processing_order,
choices=valid_processing_orders,
help="Order in which to process blocks",
)
parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler) parser_ethcrawler_blocks_sync.set_defaults(func=ethcrawler_blocks_sync_handler)
parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser( parser_ethcrawler_blocks_add = subcommands_ethcrawler_blocks.add_parser(

Wyświetl plik

@ -0,0 +1,117 @@
import unittest
from . import cli
class TestYieldBlockNumbersLists(unittest.TestCase):
def test_yield_descending_10_6_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", block_step=4
)
]
self.assertListEqual(partition, [[10, 9, 8, 7], [6]])
def test_yield_descending_10_6_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", block_step=3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_10_6_descending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.DESCENDING, 3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_10_6_descending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.DESCENDING, 10
)
]
self.assertListEqual(partition, [[10, 9, 8, 7, 6]])
def test_yield_descending_6_10_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", block_step=4
)
]
self.assertListEqual(partition, [[10, 9, 8, 7], [6]])
def test_yield_descending_6_10_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", block_step=3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_6_10_descending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.DESCENDING, 3
)
]
self.assertListEqual(partition, [[10, 9, 8], [7, 6]])
def test_yield_descending_6_10_descending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.DESCENDING, 10
)
]
self.assertListEqual(partition, [[10, 9, 8, 7, 6]])
def test_yield_ascending_10_6_ascending_step_3(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.ASCENDING, 3
)
]
self.assertListEqual(partition, [[6, 7, 8], [9, 10]])
def test_yield_ascending_10_6_ascending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"10-6", cli.ProcessingOrder.ASCENDING, 10
)
]
self.assertListEqual(partition, [[6, 7, 8, 9, 10]])
def test_yield_ascending_6_10_ascending_step_4(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.ASCENDING, 4
)
]
self.assertListEqual(partition, [[6, 7, 8, 9], [10]])
def test_yield_ascending_6_10_ascending_step_10(self):
partition = [
block_numbers_list
for block_numbers_list in cli.yield_blocks_numbers_lists(
"6-10", cli.ProcessingOrder.ASCENDING, 10
)
]
self.assertListEqual(partition, [[6, 7, 8, 9, 10]])
if __name__ == "__main__":
unittest.main()