removed unique constraint

pull/304/head
yhtiyar 2021-09-29 20:47:36 +03:00
rodzic fe4fa6c6fd
commit c0d3f23500
2 zmienionych plików z 29 dodań i 15 usunięć

Wyświetl plik

@ -42,8 +42,7 @@ CREATE TABLE {event_tables[event_type]}
from_address TEXT,
to_address TEXT,
transaction_value INTEGER,
timestamp INTEGER,
UNIQUE (transaction_hash, nft_address, from_address, to_address, token_id)
timestamp INTEGER
);
"""
return creation_query

Wyświetl plik

@ -9,13 +9,13 @@ from moonstreamdb.models import (
EthereumTransaction,
EthereumBlock,
)
from sqlalchemy import or_
from sqlalchemy import or_, and_
from sqlalchemy.orm import Session
from tqdm import tqdm
from web3 import Web3
import requests
from .data import BlockBounds, EventType, NFTEvent, event_types, nft_event
from .data import BlockBounds, EventType, NFTEvent, event_types
from .datastore import get_checkpoint_offset, insert_checkpoint, insert_events
logging.basicConfig(level=logging.INFO)
@ -102,6 +102,7 @@ def enrich_from_web3(
if len(transactions_to_query) == 0:
return nft_events
logger.info("Calling jrpc")
jrpc_response = batch_loader.load_transactions(list(transactions_to_query))
breakpoint()
transactions_map = {
@ -158,17 +159,18 @@ def get_events(
EthereumTransaction.block_number == EthereumBlock.block_number,
)
.filter(EthereumLabel.label == event_type.value)
.order_by(EthereumLabel.created_at)
.order_by(
EthereumLabel.created_at.asc(),
EthereumLabel.transaction_hash.asc(),
EthereumLabel.address_id.asc(),
)
)
if bounds is not None:
bounds_filters = [
EthereumTransaction.hash == None,
EthereumTransaction.block_number >= bounds.starting_block,
]
time_filters = [EthereumTransaction.block_number >= bounds.starting_block]
if bounds.ending_block is not None:
bounds_filters.append(
EthereumTransaction.block_number <= bounds.ending_block
)
time_filters.append(EthereumTransaction.block_number <= bounds.ending_block)
bounds_filters = [EthereumTransaction.hash == None, and_(*time_filters)]
query = query.filter(or_(*bounds_filters))
offset = initial_offset
while True:
@ -218,7 +220,7 @@ def create_dataset(
offset = 0
logger.info(f"Did not found any checkpoint for {event_type.value}")
raw_events = get_events(db_session, event_type, bounds, offset, batch_size)
raw_events = get_events(db_session, event_type, None, offset, batch_size)
raw_events_batch: List[NFTEvent] = []
for event in tqdm(raw_events, desc="Events processed", colour="#DD6E0F"):
@ -226,14 +228,27 @@ def create_dataset(
if len(raw_events_batch) == batch_size:
logger.info("Writing batch of events to datastore")
insert_events(datastore_conn, raw_events_batch)
insert_events(
datastore_conn,
enrich_from_web3(web3_client, raw_events_batch, batch_loader),
)
offset += batch_size
insert_checkpoint(
datastore_conn, event_type, offset, event.transaction_hash
datastore_conn,
event_type,
offset,
event.transaction_hash,
)
raw_events_batch = []
logger.info("Writing remaining events to datastore")
insert_events(
datastore_conn, enrich_from_web3(web3_client, raw_events_batch, batch_loader)
)
offset += len(raw_events_batch)
insert_checkpoint(
datastore_conn,
event_type,
offset,
raw_events_batch[-1].transaction_hash,
)