Add update logic.

state-crawler-improvments
Andrey 2022-09-14 20:46:28 +03:00
rodzic c11ddd51ce
commit 8afabf9f0f
3 zmienionych plików z 53 dodań i 10 usunięć

Wyświetl plik

@ -1,7 +1,8 @@
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import Any, Dict, List from typing import Any, Dict, List, Optional
from uuid import UUID
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@ -57,3 +58,4 @@ class TokenURIs(BaseModel):
block_number: int block_number: int
block_timestamp: str block_timestamp: str
address: str address: str
metadata_id: Optional[UUID] = None

Wyświetl plik

@ -20,6 +20,7 @@ from .db import (
get_uri_addresses, get_uri_addresses,
get_not_updated_metadata_for_address, get_not_updated_metadata_for_address,
metadata_to_label, metadata_to_label,
update_metadata,
) )
from ..settings import ( from ..settings import (
MOONSTREAM_METADATA_CRAWLER_THREADS, MOONSTREAM_METADATA_CRAWLER_THREADS,
@ -121,12 +122,26 @@ def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
for result in executor.map( for result in executor.map(
crawl_uri, [request for request in requests_chunk] crawl_uri, [request for request in requests_chunk]
): ):
db_session.add(
metadata_to_label( metadata = result[0]
metadata=result[0], token_uri_data = result[1]
blockchain_type=blockchain_type, label = metadata_to_label(
token_uri_data=result[1], metadata=metadata,
) blockchain_type=blockchain_type,
token_uri_data=token_uri_data,
)
if token_uri_data.metadata_id is None:
db_session.add(label)
writed_labels += 1
continue
update_metadata(
db_session,
blockchain_type,
token_uri_data.metadata_id,
label,
) )
writed_labels += 1 writed_labels += 1

Wyświetl plik

@ -117,7 +117,8 @@ def get_not_updated_metadata_for_address(
SELECT SELECT
DISTINCT ON((label_data ->> 'token_id') :: int) (label_data ->> 'token_id') :: text as token_id, DISTINCT ON((label_data ->> 'token_id') :: int) (label_data ->> 'token_id') :: text as token_id,
label_data ->>'token_uri' as token_uri, label_data ->>'token_uri' as token_uri,
block_number block_number,
id
from from
{} {}
where where
@ -135,7 +136,8 @@ def get_not_updated_metadata_for_address(
current_tokens_uri.block_timestamp as block_timestamp, current_tokens_uri.block_timestamp as block_timestamp,
current_tokens_uri.address as address, current_tokens_uri.address as address,
tokens_metadata.block_number as metadata_block_number, tokens_metadata.block_number as metadata_block_number,
tokens_metadata.token_uri as metadata_token_uri tokens_metadata.token_uri as metadata_token_uri,
tokens_metadata.id as metadata_id
from from
current_tokens_uri current_tokens_uri
left JOIN tokens_metadata ON current_tokens_uri.token_id = tokens_metadata.token_id left JOIN tokens_metadata ON current_tokens_uri.token_id = tokens_metadata.token_id
@ -145,7 +147,8 @@ def get_not_updated_metadata_for_address(
state_token_uri, state_token_uri,
view_state_block_number, view_state_block_number,
block_timestamp, block_timestamp,
address address,
metadata_id
from from
tokens_state tokens_state
where where
@ -167,8 +170,31 @@ def get_not_updated_metadata_for_address(
block_number=data[2], block_number=data[2],
block_timestamp=data[3], block_timestamp=data[3],
address=data[4], address=data[4],
metadata_id=data[5],
) )
for data in current_metadata for data in current_metadata
] ]
return results return results
def update_metadata(
db_session: Session,
blockchain_type: AvailableBlockchainType,
id: Dict[str, Any],
label: Any,
) -> None:
"""
Update metadata.
"""
label_model = get_label_model(blockchain_type)
db_session.query(label_model).filter(label_model.id == id).update(
{
"label_data": label.label_data,
"block_number": label.block_number,
"block_timestamp": label.block_timestamp,
},
synchronize_session=False,
)