Add autoscale batch size.

pull/684/head
Andrey 2022-10-26 14:48:30 +03:00
rodzic c580a7d3c2
commit 15126c5086
2 zmienionych plików z 132 dodań i 61 usunięć

Wyświetl plik

@ -33,6 +33,7 @@ tags_metadata = [
},
{"name": "dashboards", "description": "Operations with user dashboards."},
{"name": "queries", "description": "Operations with user queries."},
{"name": "public", "description": "Operations with public endpoints."},
{"name": "streams", "description": "Operations with data streams and filters."},
{"name": "subscriptions", "description": "Operations with user subscriptions."},
{"name": "time", "description": "Server timestamp endpoints."},

Wyświetl plik

@ -41,13 +41,31 @@ def make_multicall(
block_number: str = "latest",
) -> Any:
multicall_calls = [
(
call["address"],
call["method"].encode_data(call["inputs"]).hex(),
)
for call in calls
]
# multicall_calls = [
# (
# call["address"],
# call["method"].encode_data(call["inputs"]).hex(),
# )
# for call in calls
# ]
multicall_calls = []
# Remove!
logger.info(f"multicall for methods {set([call['method'].name for call in calls])}")
for call in calls:
try:
multicall_calls.append(
(
call["address"],
call["method"].encode_data(call["inputs"]).hex(),
)
)
except Exception as e:
logger.error(
f'Error encoding data for method {call["method"].name} call: {call}'
)
multicall_result = multicall_method(False, calls=multicall_calls).call(
block_identifier=block_number
@ -57,10 +75,41 @@ def make_multicall(
# Handle the case with not successful calls
for index, encoded_data in enumerate(multicall_result):
if encoded_data[0]:
try:
if encoded_data[0]:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
)
else:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
)
except Exception as e:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1])[0],
"result": str(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
@ -70,23 +119,16 @@ def make_multicall(
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
"error": str(e),
}
)
else:
results.append(
{
"result": calls[index]["method"].decode_data(encoded_data[1]),
"hash": calls[index]["hash"],
"method": calls[index]["method"],
"address": calls[index]["address"],
"name": calls[index]["method"].name,
"inputs": calls[index]["inputs"],
"call_data": multicall_calls[index][1],
"block_number": block_number,
"block_timestamp": block_timestamp,
"status": encoded_data[0],
}
logger.error(
f"Error decoding data for for method {call['method'].name} call {calls[index]}: {e}."
)
# data is not decoded, return the encoded data
logger.error(f"Encoded data: {encoded_data}")
return results
@ -101,9 +143,12 @@ def crawl_calls_level(
block_number,
blockchain_type,
block_timestamp,
max_batch_size=5000,
min_batch_size=4,
):
calls_of_level = []
make_multicall_result = []
for call in calls:
parameters = []
@ -118,8 +163,9 @@ def crawl_calls_level(
contracts_ABIs[call["address"]][input["value"]]["name"]
== "totalSupply"
):
print(responces[input["value"]][0])
parameters.append(
list(range(1, responces[input["value"]][0] + 1))
list(range(1, responces[input["value"]][0][0] + 1))
)
else:
parameters.append(responces[input["value"]])
@ -140,43 +186,54 @@ def crawl_calls_level(
}
)
for call_chunk in [
calls_of_level[i : i + batch_size]
for i in range(0, len(calls_of_level), batch_size)
]:
retry = 0
while True:
try:
# for call_chunk in [
# calls_of_level[i : i + batch_size]
# for i in range(0, len(calls_of_level), batch_size)
# ]:
retry = 0
logger.info(
f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}"
)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
make_multicall,
multicall_method,
call_chunk,
block_timestamp,
block_number,
)
make_multicall_result = future.result(timeout=20)
logger.info(
f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}"
)
retry = 0
break
except ValueError as e:
logger.info(f"ValueError: {e}, retrying")
retry = +1
if retry > 5:
raise (e)
except TimeoutError as e:
logger.info(f"TimeoutError: {e}, retrying")
retry = +1
if retry > 5:
raise (e)
time.sleep(2)
while len(calls_of_level) > 0:
try:
call_chunk = calls_of_level[:batch_size]
logger.info(
f"Calling multicall2 with {len(call_chunk)} calls at block {block_number}"
)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(
make_multicall,
multicall_method,
call_chunk,
block_timestamp,
block_number,
)
make_multicall_result = future.result(timeout=20)
logger.info(
f"Multicall2 returned {len(make_multicall_result)} results at block {block_number}"
)
retry = 0
batch_size = min(batch_size * 2, max_batch_size)
calls_of_level = calls_of_level[batch_size:]
except ValueError as e:
logger.info(f"ValueError: {e}, retrying")
retry += 1
if "missing trie node" in str(e):
time.sleep(20)
if retry > 5:
raise (e)
batch_size = max(batch_size // 3, min_batch_size)
except TimeoutError as e:
logger.info(f"TimeoutError: {e}, retrying")
retry += 1
if retry > 5:
raise (e)
batch_size = max(batch_size // 3, min_batch_size)
except Exception as e:
logger.info(f"Exception: {e}")
raise (e)
time.sleep(2)
print(f"retry: {retry}")
# results parsing and writing to database
add_to_session_count = 0
for result in make_multicall_result:
@ -190,6 +247,9 @@ def crawl_calls_level(
responces[result["hash"]].append(result["result"])
commit_session(db_session)
logger.info(f"{add_to_session_count} labels commit to database.")
make_multicall_result = []
return batch_size
def parse_jobs(
@ -322,7 +382,7 @@ def parse_jobs(
logger.info("Crawl level: 0")
logger.info(f"Jobs amount: {len(calls[0])}")
crawl_calls_level(
batch_size = crawl_calls_level(
db_session,
calls[0],
responces,
@ -340,7 +400,7 @@ def parse_jobs(
logger.info(f"Crawl level: {level}")
logger.info(f"Jobs amount: {len(calls[level])}")
crawl_calls_level(
batch_size = crawl_calls_level(
db_session,
calls[level],
responces,
@ -602,3 +662,13 @@ def main() -> None:
if __name__ == "__main__":
main()
(
False,
b"\x08\xc3y\xa0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00<LibStats: Invalid DNA Lifecycle Stage, must be baby or adult\x00\x00\x00\x00",
)
(
False,
b"\x08\xc3y\xa0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00<LibStats: Invalid DNA Lifecycle Stage, must be baby or adult\x00\x00\x00\x00",
)