added qurying over created adds, and other fixes

pull/304/head
yhtiyar 2021-10-02 03:03:08 +03:00
rodzic cb2c65b0ed
commit da3db29e0a
3 zmienionych plików z 69 dodań i 61 usunięć

Wyświetl plik

@ -31,6 +31,7 @@ def nft_event(raw_event: str) -> EventType:
@dataclass
class NFTEvent:
event_id: str
event_type: EventType
nft_address: str
token_id: str

Wyświetl plik

@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
event_tables = {EventType.TRANSFER: "transfers", EventType.MINT: "mints"}
CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
(
(
address TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
name TEXT,
symbol TEXT,
@ -26,8 +26,7 @@ CREATE_NFTS_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS nfts
CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
(
event_type STRING,
offset INTEGER,
transaction_hash STRING
offset INTEGER
);
"""
@ -35,7 +34,8 @@ CREATE_CHECKPOINT_TABLE_QUERY = """CREATE TABLE IF NOT EXISTS checkpoint
def create_events_table_query(event_type: EventType) -> str:
creation_query = f"""
CREATE TABLE IF NOT EXISTS {event_tables[event_type]}
(
(
event_id TEXT NOT NULL UNIQUE ON CONFLICT FAIL,
transaction_hash TEXT,
block_number INTEGER,
nft_address TEXT REFERENCES nfts(address),
@ -68,7 +68,8 @@ def insert_events_query(event_type: EventType) -> str:
Generates a query which inserts NFT events into the appropriate events table.
"""
query = f"""
INSERT INTO {event_tables[event_type]}(
INSERT OR IGNORE INTO {event_tables[event_type]}(
event_id,
transaction_hash,
block_number,
nft_address,
@ -77,19 +78,20 @@ INSERT INTO {event_tables[event_type]}(
to_address,
transaction_value,
timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
return query
def nft_event_to_tuple(
event: NFTEvent,
) -> Tuple[str, str, str, str, str, str, str, str]:
) -> Tuple[str, str, str, str, str, str, str, str, str]:
"""
Converts an NFT event into a tuple for use with sqlite cursor executemany. This includes
dropping e.g. the event_type field.
"""
return (
str(event.event_id),
str(event.transaction_hash),
str(event.block_number),
str(event.nft_address),
@ -113,18 +115,15 @@ def get_checkpoint_offset(
return None
def insert_checkpoint(
conn: sqlite3.Connection, event_type: EventType, offset: int, transaction_hash: str
):
def insert_checkpoint(conn: sqlite3.Connection, event_type: EventType, offset: int):
query = f"""
INSERT INTO checkpoint (
event_type,
offset,
transaction_hash
) VALUES (?, ?, ?)
offset
) VALUES (?, ?)
"""
cur = conn.cursor()
cur.execute(query, [event_type.value, offset, transaction_hash])
cur.execute(query, [event_type.value, offset])
conn.commit()

Wyświetl plik

@ -152,15 +152,28 @@ def enrich_from_web3(
return admissible_events
def get_events(
def add_events(
datastore_conn: sqlite3.Connection,
db_session: Session,
event_type: EventType,
batch_loader: EthereumBatchloader,
initial_offset=0,
bounds: Optional[BlockBounds] = None,
initial_offset: int = 0,
batch_size: int = 1000,
) -> Iterator[NFTEvent]:
batch_size: int = 10,
) -> None:
raw_created_at_list = (
db_session.query(EthereumLabel.created_at)
.filter(EthereumLabel.label == event_type.value)
.order_by(EthereumLabel.created_at.asc())
.distinct(EthereumLabel.created_at)
).all()
created_at_list = [
created_at[0] for created_at in raw_created_at_list[initial_offset:]
]
query = (
db_session.query(
EthereumLabel.id,
EthereumLabel.label,
EthereumAddress.address,
EthereumLabel.label_data,
@ -181,8 +194,6 @@ def get_events(
)
.order_by(
EthereumLabel.created_at.asc(),
EthereumLabel.transaction_hash.asc(),
EthereumLabel.address_id.asc(),
)
)
if bounds is not None:
@ -192,13 +203,23 @@ def get_events(
bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
query = query.filter(or_(*bounds_filters))
offset = initial_offset
while True:
events = query.offset(offset).limit(batch_size).all()
pbar = tqdm(total=(len(raw_created_at_list)))
pbar.set_description(f"Processing created ats")
pbar.update(initial_offset)
batch_start = 0
batch_end = batch_start + batch_size
while batch_start <= len(created_at_list):
events = query.filter(
EthereumLabel.created_at.in_(created_at_list[batch_start : batch_end + 1])
).all()
if not events:
break
offset += batch_size
continue
raw_events_batch = []
for (
event_id,
label,
address,
label_data,
@ -208,6 +229,7 @@ def get_events(
timestamp,
) in events:
raw_event = NFTEvent(
event_id=event_id,
event_type=event_types[label],
nft_address=address,
token_id=label_data["tokenId"],
@ -218,7 +240,16 @@ def get_events(
block_number=block_number,
timestamp=timestamp,
)
yield raw_event
raw_events_batch.append(raw_event)
logger.info(f"Adding {len(raw_events_batch)} to database")
insert_events(
datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds)
)
insert_checkpoint(datastore_conn, event_type, batch_end + initial_offset)
pbar.update(batch_end - batch_start + 1)
batch_start = batch_end + 1
batch_end = min(batch_end + batch_size, len(created_at_list))
def create_dataset(
@ -227,7 +258,7 @@ def create_dataset(
event_type: EventType,
batch_loader: EthereumBatchloader,
bounds: Optional[BlockBounds] = None,
batch_size: int = 1000,
batch_size: int = 10,
) -> None:
"""
Creates Moonstream NFTs dataset in the given SQLite datastore.
@ -241,39 +272,16 @@ def create_dataset(
if event_type == EventType.ERC721:
add_contracts_metadata(datastore_conn, db_session, offset, batch_size)
return
raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
raw_events_batch: List[NFTEvent] = []
for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
raw_events_batch.append(event)
if len(raw_events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(
datastore_conn,
enrich_from_web3(raw_events_batch, batch_loader, bounds),
)
offset += batch_size
insert_checkpoint(
datastore_conn,
event_type,
offset,
event.transaction_hash,
)
raw_events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(
datastore_conn, enrich_from_web3(raw_events_batch, batch_loader, bounds)
)
offset += len(raw_events_batch)
insert_checkpoint(
datastore_conn,
event_type,
offset,
raw_events_batch[-1].transaction_hash,
)
else:
add_events(
datastore_conn,
db_session,
event_type,
batch_loader,
offset,
bounds,
batch_size,
)
def add_contracts_metadata(
@ -307,7 +315,7 @@ def add_contracts_metadata(
)
)
insert_address_metadata(datastore_conn, events_batch)
insert_checkpoint(datastore_conn, EventType.ERC721, offset, "ERC721 checkpoint")
insert_checkpoint(datastore_conn, EventType.ERC721, offset)
logger.info(f"Already added {offset}")
logger.info(f"Added total of {offset-initial_offset} nfts metadata")