Handle checkpoints correctly when importing data

Also added "nfts" command as a console script in setup.py
pull/304/head
Neeraj Kashyap 2021-10-02 12:22:24 -07:00
rodzic 2f3978d416
commit f8fb803fd4
2 zmienionych plików z 66 dodań i 21 usunięć

Wyświetl plik

@ -4,7 +4,9 @@ a datastore for a Moonstream NFTs dataset.
"""
import logging
import sqlite3
from typing import Any, cast, List, Tuple, Optional, Union
from typing import Any, cast, List, Tuple, Optional
from tqdm import tqdm
from .data import EventType, NFTEvent, NFTMetadata
@ -147,6 +149,19 @@ def get_checkpoint_offset(
return None
def delete_checkpoints(
conn: sqlite3.Connection, event_type: EventType, commit: bool = True
) -> None:
cur = conn.cursor()
cur.execute(f"DELETE FROM checkpoint where event_type='{event_type.value}';")
if commit:
try:
conn.commit()
except:
conn.rollback()
raise
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
query = f"""
INSERT INTO checkpoint (
@ -247,29 +262,47 @@ def import_data(
batch: List[Any] = []
for row in source_cur:
for row in tqdm(source_cur, desc="Rows processed"):
if event_type == EventType.ERC721:
batch.append(NFTMetadata(*cast(Tuple[str, str, str], row)))
else:
batch.append(
NFTEvent(
*cast(
Tuple[
str,
EventType,
str,
str,
str,
str,
str,
Optional[int],
Optional[int],
Optional[int],
],
row,
)
)
(
event_id,
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
) = cast(
Tuple[
str,
str,
str,
str,
str,
str,
Optional[int],
Optional[int],
Optional[int],
],
row,
)
event = NFTEvent(
event_id,
event_type, # Original argument to this function
nft_address,
token_id,
from_address,
to_address,
transaction_hash,
value,
block_number,
timestamp,
)
batch.append(event)
if len(batch) == batch_size:
if event_type == EventType.ERC721:
@ -281,3 +314,11 @@ def import_data(
insert_address_metadata(target_conn, cast(List[NFTMetadata], batch))
else:
insert_events(target_conn, cast(List[NFTEvent], batch))
target_cur.execute(CREATE_CHECKPOINT_TABLE_QUERY)
target_conn.commit()
source_offset = get_checkpoint_offset(source_conn, event_type)
if source_offset is not None:
delete_checkpoints(target_conn, event_type, commit=False)
insert_checkpoint(target_conn, event_type, source_offset)

Wyświetl plik

@ -41,5 +41,9 @@ setup(
"dev": ["black", "mypy", "types-requests"],
"distribute": ["setuptools", "twine", "wheel"],
},
entry_points={"console_scripts": []},
entry_points={
"console_scripts": [
"nfts=nfts.cli:main",
]
},
)