kopia lustrzana https://github.com/bugout-dev/moonstream
added enrich command
rodzic
f8fb803fd4
commit
c543ab92f5
|
@ -7,10 +7,11 @@ from typing import Optional
|
||||||
|
|
||||||
from moonstreamdb.db import yield_db_session_ctx
|
from moonstreamdb.db import yield_db_session_ctx
|
||||||
|
|
||||||
from .data import event_types, nft_event, BlockBounds
|
from .data import EventType, event_types, nft_event, BlockBounds
|
||||||
from .datastore import setup_database, import_data
|
from .datastore import setup_database, import_data
|
||||||
from .derive import current_owners
|
from .derive import current_owners
|
||||||
from .materialize import create_dataset, EthereumBatchloader
|
from .enrich import EthereumBatchloader, enrich
|
||||||
|
from .materialize import create_dataset
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
@ -50,12 +51,33 @@ def handle_materialize(args: argparse.Namespace) -> None:
|
||||||
moonstream_datastore,
|
moonstream_datastore,
|
||||||
db_session,
|
db_session,
|
||||||
event_type,
|
event_type,
|
||||||
batch_loader,
|
|
||||||
bounds,
|
bounds,
|
||||||
args.batch_size,
|
args.batch_size,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def handle_enrich(args: argparse.Namespace) -> None:
    """
    CLI handler for the "enrich" subcommand.

    Builds an EthereumBatchloader for the JSON-RPC endpoint in args.jsonrpc, opens the
    SQLite datastore at args.datastore and calls enrich once per event type
    (transfers first, then mints), passing args.batch_size each time.
    """
    loader = EthereumBatchloader(jsonrpc_url=args.jsonrpc)

    logger.info(f"Enriching NFT events in datastore: {args.datastore}")

    # contextlib.closing guarantees the connection is closed when the block exits.
    with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
        # Order matters only in that it mirrors the original call sequence.
        for event_type in (EventType.TRANSFER, EventType.MINT):
            enrich(
                moonstream_datastore,
                event_type,
                loader,
                args.batch_size,
            )
|
||||||
|
|
||||||
|
|
||||||
def handle_derive(args: argparse.Namespace) -> None:
|
def handle_derive(args: argparse.Namespace) -> None:
|
||||||
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
|
with contextlib.closing(sqlite3.connect(args.datastore)) as moonstream_datastore:
|
||||||
results = current_owners(moonstream_datastore)
|
results = current_owners(moonstream_datastore)
|
||||||
|
@ -166,6 +188,30 @@ def main() -> None:
|
||||||
)
|
)
|
||||||
parser_import_data.set_defaults(func=handle_import_data)
|
parser_import_data.set_defaults(func=handle_import_data)
|
||||||
|
|
||||||
|
parser_enrich = subcommands.add_parser(
|
||||||
|
"enrich", description="enrich dataset from geth node"
|
||||||
|
)
|
||||||
|
parser_enrich.add_argument(
|
||||||
|
"-d",
|
||||||
|
"--datastore",
|
||||||
|
required=True,
|
||||||
|
help="Path to SQLite database representing the dataset",
|
||||||
|
)
|
||||||
|
parser_enrich.add_argument(
|
||||||
|
"--jsonrpc",
|
||||||
|
default=default_web3_provider,
|
||||||
|
type=str,
|
||||||
|
help=f"Http uri provider to use when collecting data directly from the Ethereum blockchain (default: {default_web3_provider})",
|
||||||
|
)
|
||||||
|
parser_enrich.add_argument(
|
||||||
|
"-n",
|
||||||
|
"--batch-size",
|
||||||
|
type=int,
|
||||||
|
default=1000,
|
||||||
|
help="Number of events to process per batch",
|
||||||
|
)
|
||||||
|
parser_enrich.set_defaults(func=handle_enrich)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
args.func(args)
|
args.func(args)
|
||||||
|
|
||||||
|
|
|
@ -83,6 +83,116 @@ FROM {event_tables[event_type]};
|
||||||
return selection_query
|
return selection_query
|
||||||
|
|
||||||
|
|
||||||
|
def get_events_for_enrich(
    conn: sqlite3.Connection, event_type: EventType
) -> List[NFTEvent]:
    """
    Loads the events of the given type that still need enrichment.

    Reads from the table event_tables[event_type] every row whose block_number column
    equals the string 'None' and returns those rows as NFTEvent objects. Each returned
    event carries the event_type that was passed to this function.
    """
    table_name = event_tables[event_type]

    # NOTE(review): the filter compares block_number with the *string* 'None', not SQL NULL.
    # Presumably missing values are written to the datastore as str(None) - confirm against
    # the insert path before changing this.
    selection_query = f"""
SELECT
    event_id,
    transaction_hash,
    block_number,
    nft_address,
    token_id,
    from_address,
    to_address,
    transaction_value,
    timestamp
FROM {table_name} WHERE block_number = 'None';
    """

    def row_to_event(row: Any) -> NFTEvent:
        # Column order here must match the SELECT list above.
        (
            event_id,
            transaction_hash,
            block_number,
            nft_address,
            token_id,
            from_address,
            to_address,
            value,
            timestamp,
        ) = cast(
            Tuple[
                str,
                str,
                Optional[int],
                str,
                str,
                str,
                str,
                Optional[int],
                Optional[int],
            ],
            row,
        )
        return NFTEvent(
            event_id=event_id,
            event_type=event_type,  # Original argument to the enclosing function
            nft_address=nft_address,
            token_id=token_id,
            from_address=from_address,
            to_address=to_address,
            transaction_hash=transaction_hash,
            value=value,
            block_number=block_number,
            timestamp=timestamp,
        )

    logger.info(f"Loading {table_name} table events for enrich")
    cursor = conn.cursor()
    cursor.execute(selection_query)

    events: List[NFTEvent] = [row_to_event(row) for row in cursor]
    logger.info(f"Found {len(events)} events to enrich")
    return events
|
||||||
|
|
||||||
|
|
||||||
|
def update_events_batch(conn: sqlite3.Connection, events: List[NFTEvent]) -> None:
    """
    Writes the given events back to the datastore with REPLACE INTO.

    Transfer events go to the table for EventType.TRANSFER and mint events to the table
    for EventType.MINT; events of any other type are ignored. Everything is committed in
    one go. On any error the events are logged, the transaction is rolled back and the
    exception is re-raised.
    """

    def replace_query(event_type: EventType) -> str:
        # One placeholder per column; the order must match nft_event_to_tuple.
        return f"""
REPLACE INTO {event_tables[event_type]}(
    event_id,
    transaction_hash,
    block_number,
    nft_address,
    token_id,
    from_address,
    to_address,
    transaction_value,
    timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    logger.info("Updating events in sqlite")
    cursor = conn.cursor()
    try:
        # Convert every event before touching the database, transfers first, then mints.
        rows_by_type = {
            kind: [
                nft_event_to_tuple(event)
                for event in events
                if event.event_type == kind
            ]
            for kind in (EventType.TRANSFER, EventType.MINT)
        }

        for kind, rows in rows_by_type.items():
            cursor.executemany(replace_query(kind), rows)

        conn.commit()
    except Exception:
        logger.error(f"FAILED TO replace!!! :{events}")
        conn.rollback()
        raise
|
||||||
|
|
||||||
|
|
||||||
def setup_database(conn: sqlite3.Connection) -> None:
|
def setup_database(conn: sqlite3.Connection) -> None:
|
||||||
"""
|
"""
|
||||||
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
|
Sets up the schema of the Moonstream NFTs dataset in the given SQLite database.
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
from dataclasses import is_dataclass
|
|
||||||
import logging
|
import logging
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from typing import Any, cast, Iterator, List, Optional, Set
|
from typing import Any, cast, Iterator, List, Optional, Set
|
||||||
|
@ -28,135 +27,10 @@ logging.basicConfig(level=logging.INFO)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EthereumBatchloader:
    """
    Small helper that sends batched JSON-RPC requests to an Ethereum node over HTTP.

    Each load_* method builds one JSON-RPC batch (a list of request objects whose "id" is
    the position of the item in the input list), POSTs it to jsonrpc_url and returns the
    decoded JSON response.
    """

    def __init__(self, jsonrpc_url) -> None:
        # URL of the JSON-RPC HTTP endpoint every request is posted to.
        self.jsonrpc_url = jsonrpc_url
        # The attributes below are initialised here but not used by the methods of this class.
        self.message_number = 0
        self.commands: List[Any] = []
        self.requests_banch: List[Any] = []

    def load_blocks(self, block_list: List[int], with_transactions: bool):
        """
        Request list of blocks

        Sends one eth_getBlockByNumber call per block number (hex encoded); the
        with_transactions flag is forwarded as the second call parameter.
        Returns the decoded JSON response of the batch.
        """
        rpc = [
            {
                "jsonrpc": "2.0",
                "id": index,
                "method": "eth_getBlockByNumber",
                "params": [hex(block_number), with_transactions],
            }
            for index, block_number in enumerate(block_list)
        ]
        return self.send_json_message(rpc)

    def load_transactions(self, transaction_hashes: List[str]):
        """
        Request list of transactions

        Sends one eth_getTransactionByHash call per hash and returns the decoded JSON
        response of the batch.
        """
        rpc = [
            {
                "jsonrpc": "2.0",
                "method": "eth_getTransactionByHash",
                "id": index,
                "params": [tx_hash],
            }
            for index, tx_hash in enumerate(transaction_hashes)
        ]
        return self.send_json_message(rpc)

    def send_message(self, payload):
        """
        POSTs the raw payload to the JSON-RPC endpoint and returns the HTTP response object.

        Raises whatever requests.post raises. Previously a failed request was only printed
        and the method then crashed with UnboundLocalError on the return statement, hiding
        the real cause; now the original exception is logged and propagated.
        """
        headers = {"Content-Type": "application/json"}

        try:
            response = requests.post(
                self.jsonrpc_url, headers=headers, data=payload, timeout=300
            )
        except Exception:
            logger.exception(f"JSON RPC request to {self.jsonrpc_url} failed")
            raise
        return response

    def send_json_message(self, message):
        """
        Serialises message to JSON, sends it as UTF-8 bytes and returns the decoded JSON reply.
        """
        encoded_json = json.dumps(message)
        raw_response = self.send_message(encoded_json.encode("utf8"))
        return raw_response.json()
|
|
||||||
|
|
||||||
|
|
||||||
def enrich_from_web3(
    nft_events: List[NFTEvent],
    batch_loader: EthereumBatchloader,
    bounds: Optional[BlockBounds] = None,
) -> List[NFTEvent]:
    """
    Adds block number, value, timestamp from web3 if they are None (because that transaction is missing in db)

    Events with a missing block_number, value or timestamp are looked up through
    batch_loader: first their transactions (for value and block number), then the
    corresponding blocks (for the timestamp). The events are updated in place.

    Returns the events that fall inside bounds (all of them when bounds is None).
    Note that when no event needs to be looked up, the input list is returned as is,
    without applying bounds.
    """
    transactions_to_query = set()
    indices_to_update: List[int] = []
    for index, nft_event in enumerate(nft_events):
        if (
            nft_event.block_number is None
            or nft_event.value is None
            or nft_event.timestamp is None
        ):
            transactions_to_query.add(nft_event.transaction_hash)
            indices_to_update.append(index)

    if not transactions_to_query:
        return nft_events

    logger.info("Calling JSON RPC API")
    jsonrpc_transactions_response = batch_loader.load_transactions(
        list(transactions_to_query)
    )
    # transaction hash -> (value, block number), both decoded from hex strings
    transactions_map = {
        result["result"]["hash"]: (
            int(result["result"]["value"], 16),
            int(result["result"]["blockNumber"], 16),
        )
        for result in jsonrpc_transactions_response
    }

    blocks_to_query: Set[int] = set()
    for index in indices_to_update:
        event = nft_events[index]
        event.value, event.block_number = transactions_map[event.transaction_hash]
        blocks_to_query.add(cast(int, event.block_number))

    if not blocks_to_query:
        return nft_events

    jsonrpc_blocks_response = batch_loader.load_blocks(list(blocks_to_query), False)
    # block number -> block timestamp, both decoded from hex strings
    blocks_map = {
        int(result["result"]["number"], 16): int(result["result"]["timestamp"], 16)
        for result in jsonrpc_blocks_response
    }
    for index in indices_to_update:
        event = nft_events[index]
        # Look up the timestamp by *this* event's block number. The previous code used the
        # stale loop variable from the first loop (always the last event in nft_events),
        # which stamped every event with the same timestamp or raised KeyError.
        event.timestamp = blocks_map[cast(int, event.block_number)]

    def check_bounds(event: NFTEvent) -> bool:
        # An event is admissible when its block lies in [starting_block, ending_block];
        # a missing ending_block means the range is open-ended.
        if bounds is None:
            return True
        if event.block_number < bounds.starting_block:
            return False
        if bounds.ending_block is not None and event.block_number > bounds.ending_block:
            return False
        return True

    admissible_events = [event for event in nft_events if check_bounds(event)]
    return admissible_events
|
|
||||||
|
|
||||||
|
|
||||||
def add_events(
|
def add_events(
|
||||||
datastore_conn: sqlite3.Connection,
|
datastore_conn: sqlite3.Connection,
|
||||||
db_session: Session,
|
db_session: Session,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
batch_loader: EthereumBatchloader,
|
|
||||||
initial_offset=0,
|
initial_offset=0,
|
||||||
bounds: Optional[BlockBounds] = None,
|
bounds: Optional[BlockBounds] = None,
|
||||||
batch_size: int = 10,
|
batch_size: int = 10,
|
||||||
|
@ -256,7 +130,6 @@ def create_dataset(
|
||||||
datastore_conn: sqlite3.Connection,
|
datastore_conn: sqlite3.Connection,
|
||||||
db_session: Session,
|
db_session: Session,
|
||||||
event_type: EventType,
|
event_type: EventType,
|
||||||
batch_loader: EthereumBatchloader,
|
|
||||||
bounds: Optional[BlockBounds] = None,
|
bounds: Optional[BlockBounds] = None,
|
||||||
batch_size: int = 10,
|
batch_size: int = 10,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -277,7 +150,6 @@ def create_dataset(
|
||||||
datastore_conn,
|
datastore_conn,
|
||||||
db_session,
|
db_session,
|
||||||
event_type,
|
event_type,
|
||||||
batch_loader,
|
|
||||||
offset,
|
offset,
|
||||||
bounds,
|
bounds,
|
||||||
batch_size,
|
batch_size,
|
||||||
|
|
Ładowanie…
Reference in New Issue