black formating.

pull/752/head
Andrey 2023-02-10 12:04:34 +02:00
rodzic a3038b1f88
commit 84ec8c80a8
18 zmienionych plików z 1 dodań i 97 usunięć

Wyświetl plik

@ -113,7 +113,6 @@ async def status_handler(
s3_client = boto3.client("s3")
try:
background_tasks.add_task(
dashboard.stats_generate_api_task,
timescales=stats_update.timescales,
@ -133,13 +132,11 @@ async def status_handler(
for dashboard_subscription_filters in dashboard_resource.resource_data[
"subscription_settings"
]:
subscription = subscription_by_id[
dashboard_subscription_filters["subscription_id"]
]
for timescale in stats_update.timescales:
presigned_urls_response[subscription.id] = {}
try:
@ -181,7 +178,6 @@ async def queries_data_update_handler(
request_data: data.QueryDataUpdate,
background_tasks: BackgroundTasks,
) -> Dict[str, Any]:
s3_client = boto3.client("s3")
expected_query_parameters = text(request_data.query)._bindparams.keys()
@ -213,7 +209,6 @@ async def queries_data_update_handler(
raise MoonstreamHTTPException(status_code=500)
try:
background_tasks.add_task(
queries.data_generate,
bucket=MOONSTREAM_S3_QUERIES_BUCKET,

Wyświetl plik

@ -40,7 +40,6 @@ def recive_S3_data_from_query(
time_await: int = 2,
max_retries: int = 30,
) -> Any:
"""
Await the query to be update data on S3 with if_modified_since and return new the data.
"""
@ -97,7 +96,6 @@ def generate_report(
"""
try:
json_data = recive_S3_data_from_query(
client=client,
token=token,
@ -149,7 +147,6 @@ def delete_user_query(client: Moonstream, token: str, query_name: str):
def init_game_bank_queries_handler(args: argparse.Namespace):
"""
Create the game bank queries.
"""
@ -157,7 +154,6 @@ def init_game_bank_queries_handler(args: argparse.Namespace):
client = Moonstream()
for query in cu_bank_queries:
try:
if args.overwrite:
try:
@ -184,7 +180,6 @@ def init_game_bank_queries_handler(args: argparse.Namespace):
def init_tokenomics_queries_handler(args: argparse.Namespace):
"""
Create the tokenomics queries.
"""
@ -192,7 +187,6 @@ def init_tokenomics_queries_handler(args: argparse.Namespace):
client = Moonstream()
for query in tokenomics_queries:
try:
if args.overwrite:
try:
@ -219,7 +213,6 @@ def init_tokenomics_queries_handler(args: argparse.Namespace):
def run_tokenomics_queries_handler(args: argparse.Namespace):
client = Moonstream()
query_name = "erc20_721_volume"
@ -236,7 +229,6 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
for address, type in addresess_erc20_721.items():
for range in ranges:
params: Dict[str, Any] = {
"address": address,
"type": type,
@ -260,7 +252,6 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
for address, type in addresess_erc20_721.items():
for range in ranges:
params = {
"address": address,
"type": type,
@ -285,7 +276,6 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
for address in addresess_erc1155:
for range in ranges:
params = {
"address": address,
"time_format": range["time_format"],
@ -329,11 +319,8 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
query_name = "most_active_buyers"
for address, type in addresess_erc20_721.items():
if type == "NFT":
for range in ranges:
params = {
"address": address,
"time_range": range["time_range"],
@ -354,11 +341,8 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
query_name = "most_active_sellers"
for address, type in addresess_erc20_721.items():
if type == "NFT":
for range in ranges:
params = {
"address": address,
"time_range": range["time_range"],
@ -378,9 +362,7 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
query_name = "lagerst_owners"
for address, type in addresess_erc20_721.items():
if type == "NFT":
params = {
"address": address,
}
@ -400,9 +382,7 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
query_name = "total_supply_erc721"
for address, type in addresess_erc20_721.items():
if type == "NFT":
params = {
"address": address,
}
@ -422,7 +402,6 @@ def run_tokenomics_queries_handler(args: argparse.Namespace):
query_name = "total_supply_terminus"
for address in addresess_erc1155:
params = {
"address": address,
}
@ -471,9 +450,7 @@ def create_user_query_handler(args: argparse.Namespace):
client = Moonstream()
for query in tokenomics_queries:
if query["name"] == args.name:
create_user_query(
client=client,
token=args.moonstream_token,
@ -493,7 +470,6 @@ def generate_game_bank_report(args: argparse.Namespace):
for query in client.list_queries(
token=args.moonstream_token,
).queries:
params = {}
if (
@ -534,7 +510,6 @@ def generate_game_bank_report(args: argparse.Namespace):
def main():
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())

Wyświetl plik

@ -45,7 +45,6 @@ class NowResponse(BaseModel):
class QueryDataUpdate(BaseModel):
file_type: str
query: str
params: Dict[str, Any] = Field(default_factory=dict)

Wyświetl plik

@ -35,6 +35,7 @@ from ..moonworm_crawler.event_crawler import Event, get_block_timestamp
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# TODO: ADD VALUE!!!
@dataclass
class ExtededFunctionCall(ContractFunctionCall):
@ -82,7 +83,6 @@ def add_function_calls_with_gas_price_to_session(
blockchain_type: AvailableBlockchainType,
label_name: str,
) -> None:
label_model = get_label_model(blockchain_type)
transactions_hashes_to_save = [
function_call.transaction_hash for function_call in function_calls
@ -152,7 +152,6 @@ def process_transaction(
transaction: Dict[str, Any],
blocks_cache: Dict[int, int],
):
try:
raw_function_call = contract.decode_function_input(transaction["input"])
function_name = raw_function_call[0].fn_name
@ -270,7 +269,6 @@ def populate_with_events(
to_block: int,
batch_size: int = 100,
):
current_block = from_block
events_abi = [event for event in abi if event["type"] == "event"]

Wyświetl plik

@ -58,7 +58,6 @@ def identities_cmc_add_handler(args: argparse.Namespace) -> None:
break
with yield_db_session_ctx() as db_session:
for coin in response["data"]:
if coin["platform"] is not None:
if (

Wyświetl plik

@ -43,7 +43,6 @@ def leak_of_crawled_uri(
def crawl_uri(metadata_uri: str) -> Any:
"""
Get metadata from URI
"""
@ -51,7 +50,6 @@ def crawl_uri(metadata_uri: str) -> Any:
result = None
while retry < 3:
try:
response = urllib.request.urlopen(metadata_uri, timeout=10)
if response.status == 200:
@ -74,7 +72,6 @@ def crawl_uri(metadata_uri: str) -> Any:
def parse_metadata(
blockchain_type: AvailableBlockchainType, batch_size: int, max_recrawl: int
):
"""
Parse all metadata of tokens.
"""
@ -95,7 +92,6 @@ def parse_metadata(
with yield_db_read_only_session_ctx() as db_session_read_only:
try:
# get all tokens with uri
logger.info("Requesting all tokens with uri from database")
uris_of_tokens = get_uris_of_tokens(db_session_read_only, blockchain_type)
@ -108,7 +104,6 @@ def parse_metadata(
tokens_uri_by_address[token_uri_data.address].append(token_uri_data)
for address in tokens_uri_by_address:
logger.info(f"Starting to crawl metadata for address: {address}")
already_parsed = get_current_metadata_for_address(
@ -153,7 +148,6 @@ def parse_metadata(
with db_session.begin():
for token_uri_data in requests_chunk:
if token_uri_data.token_id not in parsed_with_leak:
metadata = crawl_uri(token_uri_data.token_uri)
@ -181,7 +175,6 @@ def parse_metadata(
def handle_crawl(args: argparse.Namespace) -> None:
"""
Parse all metadata of tokens.
"""

Wyświetl plik

@ -18,7 +18,6 @@ def metadata_to_label(
token_uri_data: TokenURIs,
label_name=METADATA_CRAWLER_LABEL,
):
"""
Creates a label model.
"""
@ -62,7 +61,6 @@ def commit_session(db_session: Session) -> None:
def get_uris_of_tokens(
db_session: Session, blockchain_type: AvailableBlockchainType
) -> List[TokenURIs]:
"""
Get meatadata URIs.
"""
@ -144,7 +142,6 @@ def get_current_metadata_for_address(
def get_tokens_wich_maybe_updated(
db_session: Session, blockchain_type: AvailableBlockchainType, address: str
):
label_model = get_label_model(blockchain_type)
table = label_model.__tablename__

Wyświetl plik

@ -25,7 +25,6 @@ logger = logging.getLogger(__name__)
def handle_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type)
subscription_type = blockchain_type_to_subscription_type(blockchain_type)

Wyświetl plik

@ -258,7 +258,6 @@ def make_function_call_crawl_jobs(
method_signature_by_address[contract_address] = [method_signature]
else:
if method_signature not in method_signature_by_address[contract_address]:
crawl_job_by_address[contract_address].contract_abi.append(abi)
method_signature_by_address[contract_address].append(method_signature)

Wyświetl plik

@ -193,7 +193,6 @@ def add_function_calls_to_session(
blockchain_type: AvailableBlockchainType,
label_name=CRAWLER_LABEL,
) -> None:
label_model = get_label_model(blockchain_type)
transactions_hashes_to_save = [
function_call.transaction_hash for function_call in function_calls

Wyświetl plik

@ -152,13 +152,11 @@ def _autoscale_crawl_events(
batch_size: int = 1000,
db_block_query_batch=10,
) -> Tuple[List[Event], int]:
"""
Crawl events with auto regulated batch_size.
"""
all_events = []
for job in jobs:
raw_events, batch_size = moonworm_autoscale_crawl_events(
web3,
job.event_abi,

Wyświetl plik

@ -27,7 +27,6 @@ def _crawl_functions(
from_block: int,
to_block: int,
) -> List[ContractFunctionCall]:
shared_state = MockState()
crawled_functions = []

Wyświetl plik

@ -62,7 +62,6 @@ def historical_crawler(
while start_block >= end_block:
try:
time.sleep(min_sleep_time)
batch_end_block = max(
@ -85,7 +84,6 @@ def historical_crawler(
)
else:
all_events, max_blocks_batch = _autoscale_crawl_events(
db_session=db_session,
blockchain_type=blockchain_type,

Wyświetl plik

@ -44,7 +44,6 @@ def make_multicall(
block_timestamp: int,
block_number: str = "latest",
) -> Any:
multicall_calls = []
for call in calls:
@ -140,11 +139,9 @@ def crawl_calls_level(
max_batch_size=5000,
min_batch_size=4,
):
calls_of_level = []
for call in calls:
if call["generated_hash"] in responces:
continue
parameters = []
@ -152,7 +149,6 @@ def crawl_calls_level(
logger.info(f"Call: {json.dumps(call, indent=4)}")
for input in call["inputs"]:
if type(input["value"]) in (str, int):
if input["value"] not in responces:
parameters.append([input["value"]])
@ -173,7 +169,6 @@ def crawl_calls_level(
raise
for call_parameters in itertools.product(*parameters):
# hack for tuples product
if len(call_parameters) == 1 and type(call_parameters[0]) == tuple:
call_parameters = call_parameters[0]
@ -191,10 +186,8 @@ def crawl_calls_level(
retry = 0
while len(calls_of_level) > 0:
make_multicall_result = []
try:
call_chunk = calls_of_level[:batch_size]
logger.info(
@ -240,7 +233,6 @@ def crawl_calls_level(
# results parsing and writing to database
add_to_session_count = 0
for result in make_multicall_result:
db_view = view_call_to_label(blockchain_type, result)
db_session.add(db_view)
add_to_session_count += 1
@ -322,7 +314,6 @@ def parse_jobs(
for input in method_abi["inputs"]:
if type(input["value"]) in (str, int, list):
abi["inputs"].append(input)
elif type(input["value"]) == dict:
@ -346,7 +337,6 @@ def parse_jobs(
calls[level] = []
calls[level].append(abi)
else:
level = 0
if not calls.get(level):
@ -374,7 +364,6 @@ def parse_jobs(
interfaces = {}
for contract_address in contracts_ABIs:
# collect abis for each contract
abis = []
@ -422,7 +411,6 @@ def parse_jobs(
)
for level in call_tree_levels:
logger.info(f"Crawl level: {level}")
logger.info(f"Jobs amount: {len(calls[level])}")
@ -445,7 +433,6 @@ def parse_jobs(
def handle_crawl(args: argparse.Namespace) -> None:
"""
Ability to track states of the contracts.
@ -497,7 +484,6 @@ def parse_abi(args: argparse.Namespace) -> None:
def clean_labels_handler(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain)
web3_client = _retry_connect_web3(

Wyświetl plik

@ -16,7 +16,6 @@ def view_call_to_label(
call: Dict[str, Any],
label_name=VIEW_STATE_CRAWLER_LABEL,
):
"""
Creates a label model.

Wyświetl plik

@ -55,7 +55,6 @@ def get_nonce(web3: Web3, address: ChecksumAddress) -> Nonce:
def submit_transaction(
web3: Web3, transaction: Union[TxParams, Any], signer_private_key: str
) -> HexBytes:
"""
Signs and submits json transaction to blockchain from the name of signer
"""

Wyświetl plik

@ -96,7 +96,6 @@ def push_statistics(
bucket: str,
dashboard_id: Union[UUID, str],
) -> None:
result_bytes = json.dumps(statistics_data).encode("utf-8")
result_key = f'{MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX}/{blockchain_by_subscription_id[subscription.resource_data["subscription_type_id"]]}/contracts_data/{subscription.resource_data["address"]}/{dashboard_id}/v1/{timescale}.json'
@ -122,7 +121,6 @@ def generate_data(
metric_type: str,
crawler_label: str,
):
label_model = get_label_model(blockchain_type)
# create empty time series
@ -223,7 +221,6 @@ def generate_data(
response_labels: Dict[Any, Any] = {}
for created_date, label, count in labels_time_series:
if not response_labels.get(label):
response_labels[label] = []
@ -269,7 +266,6 @@ def get_unique_address(
def get_blocks_state(
db_session: Session, blockchain_type: AvailableBlockchainType
) -> Dict[str, int]:
"""
Generate meta information about
"""
@ -325,7 +321,6 @@ def get_blocks_state(
def generate_list_of_names(
type: str, subscription_filters: Dict[str, Any], read_abi: bool, abi_json: Any
):
"""
Generate list of names for select from database by name field
"""
@ -333,7 +328,6 @@ def generate_list_of_names(
if read_abi:
names = [item["name"] for item in abi_json if item["type"] == type]
else:
names = [
item["name"]
for item in subscription_filters[abi_type_to_dashboards_type[type]]
@ -356,7 +350,6 @@ def process_external_merged(
result: Dict[str, Any] = {}
for external_call_hash, external_call in external_calls.items():
try:
func_input_abi = []
input_args = []
@ -540,7 +533,6 @@ def generate_web3_metrics(
# TODO: Remove it if ABI already have correct web3_call signature.
if "HatchStartedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches started.",
@ -557,7 +549,6 @@ def generate_web3_metrics(
)
if "HatchFinishedEvent" in events:
extention_data.append(
{
"display_name": "Number of hatches finished.",
@ -584,7 +575,6 @@ def stats_generate_handler(args: argparse.Namespace):
blockchain_type = AvailableBlockchainType(args.blockchain)
with yield_db_read_only_session_ctx() as db_session:
start_time = time.time()
dashboard_resources: BugoutResources = bc.list_resources(
@ -599,7 +589,6 @@ def stats_generate_handler(args: argparse.Namespace):
available_subscriptions: List[BugoutResource] = []
for subscription_type in subscription_ids_by_blockchain[args.blockchain]:
# Create subscriptions dict for get subscriptions by id.
blockchain_subscriptions: BugoutResources = bc.list_resources(
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
@ -646,7 +635,6 @@ def stats_generate_handler(args: argparse.Namespace):
address_dashboard_id_subscription_id_tree: Dict[str, Any] = {}
for dashboard in dashboard_resources.resources:
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
@ -750,7 +738,6 @@ def stats_generate_handler(args: argparse.Namespace):
]
if len(external_calls) > 0:
for external_call in external_calls:
# create external_call selectors.
# display_name not included in hash
external_call_without_display_name = {
@ -816,7 +803,6 @@ def stats_generate_handler(args: argparse.Namespace):
)
for address in address_dashboard_id_subscription_id_tree.keys():
current_blocks_state = get_blocks_state(
db_session=db_session, blockchain_type=blockchain_type
)
@ -868,15 +854,12 @@ def stats_generate_handler(args: argparse.Namespace):
for dashboard_id in address_dashboard_id_subscription_id_tree[
address
]: # Dashboards loop for address
for (
subscription_id
) in address_dashboard_id_subscription_id_tree[address][
dashboard_id
]:
try:
extention_data = []
s3_subscription_data_object: Dict[str, Any] = {}
@ -892,9 +875,7 @@ def stats_generate_handler(args: argparse.Namespace):
) in merged_external_calls[dashboard_id][
subscription_id
].items():
if external_call_hash in external_calls_results:
extention_data.append(
{
"display_name": display_name,
@ -995,7 +976,6 @@ def stats_generate_api_task(
"""
with yield_db_read_only_session_ctx() as db_session:
logger.info(f"Amount of blockchain subscriptions: {len(subscription_by_id)}")
s3_client = boto3.client("s3")
@ -1003,9 +983,7 @@ def stats_generate_api_task(
for dashboard_subscription_filters in dashboard.resource_data[
"subscription_settings"
]:
try:
subscription_id = dashboard_subscription_filters["subscription_id"]
blockchain_type = AvailableBlockchainType(
@ -1029,13 +1007,11 @@ def stats_generate_api_task(
# Read required events, functions and web3_call form ABI
if not subscription_by_id[subscription_id].resource_data["abi"]:
methods = []
events = []
abi_json = {}
else:
bucket = subscription_by_id[subscription_id].resource_data["bucket"]
key = subscription_by_id[subscription_id].resource_data["s3_path"]
abi = s3_client.get_object(
@ -1075,7 +1051,6 @@ def stats_generate_api_task(
)
for timescale in timescales:
start_date = (
datetime.utcnow() - timescales_delta[timescale]["timedelta"]
)

Wyświetl plik

@ -32,7 +32,6 @@ class QueryNotValid(Exception):
def push_statistics(s3: Any, data: Any, key: str, bucket: str) -> None:
s3.put_object(
Body=data,
Bucket=bucket,
@ -55,7 +54,6 @@ def query_validation(query: str) -> str:
def to_json_types(value):
if isinstance(value, (str, int, tuple, list, dict)):
return value
elif isinstance(value, set):
@ -65,7 +63,6 @@ def to_json_types(value):
def from_json_types(value):
if isinstance(value, (str, int, tuple, dict)):
return value
elif isinstance(value, list): # psycopg2 issue with list support