Andrey Dolgolev 2024-02-01 13:54:36 +03:00 zatwierdzone przez GitHub
commit ed812717b7
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
2 zmienionych plików z 201 dodań i 2 usunięć

Wyświetl plik

@ -1,12 +1,13 @@
import argparse
import logging
from typing import Optional
from typing import Optional, Literal
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType
from web3 import Web3
from web3.middleware import geth_poa_middleware
from .db import deduplicate_records
from ..db import yield_db_session_ctx
from ..settings import (
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
@ -341,6 +342,21 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)
def handle_deduplicate(args: argparse.Namespace) -> None:
"""
Deduplicate database records
"""
with yield_db_session_ctx() as db_session:
deduplicate_records(
db_session,
args.blockchain_type,
args.table,
args.label,
args.type,
)
def main() -> None:
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())
@ -536,6 +552,49 @@ def main() -> None:
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)
database_cli = subparsers.add_parser("database", help="Database operations")
database_cli.add_argument(
"--blockchain-type",
"-b",
type=str,
help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)
database_cli.set_defaults(func=lambda _: database_cli.print_help())
database_cli_subparsers = database_cli.add_subparsers()
deduplicate_parser = database_cli_subparsers.add_parser(
"deduplicate",
help="Deduplicate database records",
)
deduplicate_parser.add_argument(
"--table",
"-t",
type=str,
choices=["blocks", "labels", "transactions"],
required=True,
help="Table type to deduplicate",
)
deduplicate_parser.add_argument(
"--label",
"-l",
type=str,
required=False,
help="Label to deduplicate",
)
deduplicate_parser.add_argument(
"--type",
"-y",
type=str,
choices=["event", "function"],
required=True,
help="Type to deduplicate",
)
args = parser.parse_args()
args.func(args)

Wyświetl plik

@ -1,11 +1,13 @@
import logging
import json
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Literal
from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.models import Base
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy.orm import Session
from sqlalchemy.sql import text
from ..settings import CRAWLER_LABEL
from .event_crawler import Event
@ -218,3 +220,141 @@ def add_function_calls_to_session(
logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)
def deduplicate_records(
db_session: Session,
blockchain_type: AvailableBlockchainType,
table: Literal["labels", "transactions", "blocks"],
label_name: Optional[str] = None,
label_type: Optional[str] = None,
) -> None:
"""
Deduplicates records in the database.
label name and label type work only for labels table.
"""
if table == "blocks":
raise NotImplementedError("Deduplication for blocks is not implemented yet")
if table == "labels":
label_model = get_label_model(blockchain_type)
if label_name is None or label_type is None:
raise ValueError(
"label_name and label_type are required for deduplication of labels"
)
if label_type == "event":
# get list of all label_type addresses
all_addresses = (
db_session.query(label_model.address.label("address"))
.filter(label_model.label == label_name)
.filter(label_model.label_data["type"] == "event")
.distinct()
.all()
) # can take a while
for address_raw in all_addresses:
address = address_raw[0]
deduplicate_records = db_session.execute(
text(
"""
WITH lates_labels AS (
SELECT
DISTINCT ON (transaction_hash, log_index) transaction_hash, log_index,
block_number as block_number,
created_at as created_at
FROM
{}
WHERE
label=:label
AND address=:address
AND label_data->>'type' = :label_type
ORDER BY
transaction_hash ASC,
log_index ASC,
block_number ASC,
created_at ASC
)
DELETE FROM
{} USING lates_labels
WHERE
label=:label
AND address=:address
AND label_data->>'type' = :label_type
AND {}.id not in (select id from lates_labels ) RETURNING {}.block_number;
""".format(
table, table, table, table
)
),
{"address": address, "label": label_name, "label_type": label_type},
)
db_session.commit()
logger.info(
f"Deleted {deduplicate_records} duplicate labels for address {address}"
)
if label_type == "tx_call":
# get list of all label_type addresses
all_addresses = (
db_session.query(label_model.address.label("address"))
.filter(label_model.label == label_name)
.filter(label_model.label_data["type"] == "tx_call")
.distinct()
.all()
)
for address_raw in all_addresses:
address = address_raw[0]
deduplicate_records = db_session.execute(
text(
"""
WITH lates_labels AS (
SELECT
DISTINCT ON (transaction_hash) transaction_hash,
block_number as block_number,
created_at as created_at
FROM
{}
WHERE
label=:label
AND address=:address
AND label_data->>'type' = :label_type
AND log_index is null
ORDER BY
transaction_hash ASC,
block_number ASC,
created_at ASC
)
DELETE FROM
{} USING lates_labels
WHERE
label=:label
AND address=:address
AND label_data->>'type' = :label_type
AND log_index is null
AND {}.id not in (select id from lates_labels ) RETURNING {}.block_number;
""".format(
table, table, table, table
)
),
{"address": address, "label": label_name, "label_type": label_type},
)
db_session.commit()
logger.info(
f"Deleted {deduplicate_records} duplicate labels for address {address}"
)
if table == "transactions":
raise NotImplementedError(
"Deduplication for transactions is not implemented yet"
)