Add address split.

pull/919/head
Andrey 2023-09-19 14:56:20 +03:00
rodzic 1e848f72d4
commit 04c3c7aad5
1 zmienionych plików z 41 dodań i 30 usunięć

Wyświetl plik

@ -125,49 +125,60 @@ def deduplicate_moonworm_task_by_selector():
## get selector
selector = [tag for tag in tags if tag.startswith("abi_selector:")]
address = [tag for tag in tags if tag.startswith("address:")]
if len(selector) == 0:
logger.warn(
f"Unable to find selector in task: {task.entry_url.split()[-1]}"
)
continue
selector = selector[0]
selector = selector[0].split(":")[1]
if selector not in selectors:
selectors[selector] = {"entries": {}}
selectors[selector]["entries"][task.entry_url.split("/")[-1]] = task.created_at
logger.info(f"Found {len(selectors)} selectors")
for selector, tasks_dict in selectors.items():
if len(tasks_dict["entries"]) == 1:
if len(address) == 0:
logger.warn(f"Unable to find address in task: {task.entry_url.split()[-1]}")
continue
## find latest task
address = address[0].split(":")[1]
latest_task_id = max(
tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key]
)
if address not in selectors:
selectors[address] = {selector: {"entries": {}}}
## remove all tasks except latest
selectors[address][selector]["entries"][
task.entry_url.split("/")[-1]
] = task.created_at
logger.info(
f"Found {len(tasks_dict['entries'])} tasks with selector {selector}"
)
logger.info(f"Found {len(selectors)} addresses")
for task_id in tasks_dict["entries"]:
if task_id == latest_task_id:
for address, selectors_dict in selectors.items():
for selector, tasks_dict in selectors_dict.items():
if len(tasks_dict["entries"]) == 1:
continue
try:
bc.delete_entry(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=task_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
except BugoutResponseException as e:
logger.error(f"Unable to delete entry: {e}")
continue
## find earliest task
logger.info(f"Deleted entry: {task_id}")
earliest_task_id = min(
tasks_dict["entries"], key=lambda key: tasks_dict["entries"][key]
)
## remove all tasks except latest
logger.info(
f"Found {len(tasks_dict['entries'])} tasks with selector {selector}"
)
for task_id in tasks_dict["entries"]:
if task_id == earliest_task_id:
continue
try:
bc.delete_entry(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
entry_id=task_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
)
except BugoutResponseException as e:
logger.error(f"Unable to delete entry: {e}")
continue
logger.info(f"Deleted entry: {task_id}")