pull/313/head
Andrey Dolgolev 2021-10-19 23:02:10 +03:00
rodzic cfaa72e043
commit 2f4c995687
3 zmienionych plików z 294 dodań i 184 usunięć

Wyświetl plik

@ -260,46 +260,9 @@ def get_block_bounds(
return start, end
def ensure_addresses(db_session: Session, addresses: Set[str]) -> Dict[str, int]:
"""
Ensures that the given addresses are registered in the ethereum_addresses table of the given
moonstreamdb database connection. Returns a mapping from the addresses to the ids of their
corresponding row in the ethereum_addresses table.
Returns address_ids for *every* address, not just the new ones.
"""
if len(addresses) == 0:
return {}
# SQLAlchemy reference:
# https://docs.sqlalchemy.org/en/14/orm/persistence_techniques.html#using-postgresql-on-conflict-with-returning-to-return-upserted-orm-objects
stmt = (
insert(EthereumAddress)
.values([{"address": address} for address in addresses])
.on_conflict_do_nothing(index_elements=[EthereumAddress.address])
)
try:
db_session.execute(stmt)
db_session.commit()
except Exception:
db_session.rollback()
raise
rows = (
db_session.query(EthereumAddress)
.filter(EthereumAddress.address.in_(addresses))
.all()
)
address_ids = {address.address: address.id for address in rows}
return address_ids
def label_erc721_addresses(
w3: Web3, db_session: Session, address_ids: List[Tuple[str, int]]
) -> None:
def label_erc721_addresses(w3: Web3, db_session: Session, addresses: List[str]) -> None:
labels: List[EthereumLabel] = []
for address, id in address_ids:
for address in addresses:
try:
contract_info = get_erc721_contract_info(w3, address)
@ -319,7 +282,7 @@ def label_erc721_addresses(
labels.append(
EthereumLabel(
address_id=id,
address=address,
label=NFT_LABEL,
label_data={
"name": contract_name,
@ -342,7 +305,7 @@ def label_erc721_addresses(
def label_key(label: EthereumLabel) -> Tuple[str, int, int, str, str]:
return (
label.transaction_hash,
label.address_id,
label.address,
label.label_data["tokenId"],
label.label_data["from"],
label.label_data["to"],
@ -350,7 +313,7 @@ def label_key(label: EthereumLabel) -> Tuple[str, int, int, str, str]:
def label_transfers(
db_session: Session, transfers: List[NFTTransfer], address_ids: Dict[str, int]
db_session: Session, transfers: List[NFTTransfer], addresses: Set[str]
) -> None:
"""
Adds "nft_mint" or "nft_transfer" to the (transaction, address) pair represented by each of the
@ -361,10 +324,9 @@ def label_transfers(
for transfer in transfers:
transaction_hash = transfer.transfer_tx
transaction_hashes.append(transaction_hash)
address_id = address_ids.get(transfer.contract_address)
label = MINT_LABEL if transfer.is_mint else TRANSFER_LABEL
row = EthereumLabel(
address_id=address_id,
address=transfer.contract_address,
transaction_hash=transaction_hash,
label=label,
label_data={
@ -377,7 +339,7 @@ def label_transfers(
existing_labels = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id.in_(address_ids.values()))
.filter(EthereumLabel.address.in_(addresses))
.filter(EthereumLabel.transaction_hash.in_(transaction_hashes))
).all()
existing_label_keys = {label_key(label) for label in existing_labels}
@ -471,29 +433,23 @@ def add_labels(
contract_address=contract_address,
)
contract_addresses = {transfer.contract_address for transfer in job}
updated_address_ids = ensure_addresses(db_session, contract_addresses)
address_ids: Dict[str, int] = {}
for address, address_id in updated_address_ids.items():
address_ids[address] = address_id
labelled_address_ids = [
label.address_id
labelled_address = [
label.address
for label in (
db_session.query(EthereumLabel)
.filter(EthereumLabel.label == NFT_LABEL)
.filter(EthereumLabel.address_id.in_(address_ids.values()))
.filter(EthereumLabel.address.in_(contract_addresses))
.all()
)
]
unlabelled_address_ids = [
(address, address_id)
for address, address_id in address_ids.items()
if address_id not in labelled_address_ids
unlabelled_address = [
address for address in contract_addresses if address not in labelled_address
]
# Add 'erc721' labels
try:
label_erc721_addresses(w3, db_session, unlabelled_address_ids)
label_erc721_addresses(w3, db_session, unlabelled_address)
except Exception as e:
reporter.error_report(
e,
@ -507,7 +463,7 @@ def add_labels(
# Add mint/transfer labels to (transaction, contract_address) pairs
try:
label_transfers(db_session, job, updated_address_ids)
label_transfers(db_session, job, contract_addresses)
except Exception as e:
reporter.error_report(
e,
@ -555,7 +511,7 @@ def block_bounded_summary(
db_session.query(
EthereumLabel.label,
EthereumLabel.label_data,
EthereumLabel.address_id,
EthereumLabel.address,
EthereumTransaction.hash,
EthereumTransaction.value,
EthereumBlock.block_number,
@ -580,7 +536,7 @@ def block_bounded_summary(
def holder_query(label: str) -> Query:
query = (
db_session.query(
EthereumLabel.address_id.label("address_id"),
EthereumLabel.address.label("address"),
EthereumLabel.label_data["to"].astext.label("owner_address"),
EthereumLabel.label_data["tokenId"].astext.label("token_id"),
EthereumTransaction.block_number.label("block_number"),
@ -602,10 +558,10 @@ def block_bounded_summary(
# does not seem to be deterministic.
# Maybe relevant Stackoverflow post: https://stackoverflow.com/a/59410440
text(
"address_id, token_id, block_number desc, transaction_index desc, transfer_value, owner_address"
"address, token_id, block_number desc, transaction_index desc, transfer_value, owner_address"
)
)
.distinct("address_id", "token_id")
.distinct("address", "token_id")
)
return query

Wyświetl plik

@ -0,0 +1,264 @@
"""Drop etherium_addresses and address_id column
Revision ID: 240476c67b9f
Revises: f1e8cf50a3ff
Create Date: 2021-10-19 14:49:07.905565
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "240476c67b9f"
down_revision = "f1e8cf50a3ff"
branch_labels = None
depends_on = None
def upgrade():
op.execute(
"""
LOCK TABLE ethereum_labels IN ACCESS EXCLUSIVE MODE;
INSERT INTO
ethereum_labels_v2 (
id,
label,
label_data,
created_at,
transaction_hash,
address
)
SELECT
ethereum_labels.id as id,
ethereum_labels.label as label,
ethereum_labels.label_data as label_data,
ethereum_labels.created_at as created_at,
ethereum_labels.transaction_hash as transaction_hash,
ethereum_addresses.address as address
FROM
(
SELECT
Row_Number() over (
order by
id
) AS RowIndex,
*
from
ethereum_labels
) AS ethereum_labels
INNER JOIN ethereum_addresses ON RowIndex > (select count(*) from ethereum_labels_v2 ) and ethereum_labels.address_id = ethereum_addresses.id;
"""
)
op.execute(
"ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS fk_ethereum_labels_address_id_ethereum_addresses;"
)
op.execute(
"ALTER TABLE IF EXISTS ethereum_addresses DROP CONSTRAINT IF EXISTS fk_ethereum_smart_contracts_transaction_hash_ethereum_t_f928;"
)
op.execute(
"""
/* Rename tabels */
ALTER TABLE
ethereum_labels RENAME TO ethereum_labels_v1;
ALTER TABLE
ethereum_labels_v2 RENAME TO ethereum_labels;
"""
)
op.execute(
"""
ALTER INDEX pk_ethereum_labels RENAME TO pk_ethereum_labels_v1;
ALTER INDEX idx_ethereum_labels_opensea_nft_name RENAME TO idx_ethereum_labels_opensea_nft_name_v1;
ALTER INDEX ix_ethereum_labels_label RENAME TO ix_ethereum_labels_label_v1;
ALTER INDEX ix_ethereum_labels_transaction_hash RENAME TO ix_ethereum_labels_transaction_hash_v1;
ALTER INDEX uq_ethereum_labels_id RENAME TO uq_ethereum_labels_id_v1;
"""
)
op.execute(
"""
ALTER TABLE
ONLY public.ethereum_labels
ADD
CONSTRAINT pk_ethereum_labels PRIMARY KEY (id);
ALTER TABLE
ONLY public.ethereum_labels
ADD
CONSTRAINT uq_ethereum_labels_id UNIQUE (id);
/* Create indexes must be unique cross database */
CREATE INDEX idx_ethereum_labels_opensea_nft_name ON public.ethereum_labels USING btree (((label_data ->> 'name' :: text)))
WHERE
((label) :: text = 'opensea_nft' :: text);
CREATE INDEX ix_ethereum_labels_address ON public.ethereum_labels USING btree (address);
CREATE INDEX ix_ethereum_labels_block_number ON public.ethereum_labels USING btree (block_number);
CREATE INDEX ix_ethereum_labels_label ON public.ethereum_labels USING btree (label);
CREATE INDEX ix_ethereum_labels_transaction_hash ON public.ethereum_labels USING btree (transaction_hash);
CREATE INDEX ix_ethereum_labels_block_timestamp ON public.ethereum_labels USING btree (block_timestamp);
"""
)
op.execute(
"""
DROP TABLE ethereum_addresses;
"""
)
def downgrade():
op.execute(
"""
CREATE TABLE public.ethereum_addresses (
id integer NOT NULL,
transaction_hash character varying(256),
address character varying(256) NOT NULL,
created_at timestamp with time zone DEFAULT timezone('utc'::text, statement_timestamp()) NOT NULL
);
ALTER TABLE public.ethereum_addresses OWNER TO postgres;
CREATE UNIQUE INDEX ix_ethereum_addresses_address ON public.ethereum_addresses USING btree (address);
CREATE INDEX ix_ethereum_addresses_transaction_hash ON public.ethereum_addresses USING btree (transaction_hash);
"""
)
# sequence creation
op.execute(
"""
INSERT INTO
ethereum_addresses (
id,
address
)
SELECT
distinct(ethereum_labels.address_id) as id,
ethereum_labels.address as address
FROM
ethereum_labels
where address_id IS NOT NULL
order by id;
"""
)
conn = op.get_bind()
latest_id = conn.execute(
"select MAX(address_id) + 1 from ethereum_labels"
).fetchall()
if latest_id:
max_id = latest_id[0][0]
else:
max_id = 1
op.execute(
f"CREATE SEQUENCE public.ethereum_smart_contracts_id_seq INCREMENT BY 1 START WITH {max_id} NO MINVALUE NO MAXVALUE CACHE 1"
)
# id column settings
op.execute(
"""
ALTER TABLE public.ethereum_smart_contracts_id_seq OWNER TO postgres;
ALTER SEQUENCE public.ethereum_smart_contracts_id_seq OWNED BY public.ethereum_addresses.id;
ALTER TABLE ONLY public.ethereum_addresses ALTER COLUMN id SET DEFAULT nextval('public.ethereum_smart_contracts_id_seq'::regclass);
ALTER TABLE ONLY public.ethereum_addresses ADD CONSTRAINT pk_ethereum_smart_contracts PRIMARY KEY (id);
"""
)
op.execute(
"""
INSERT INTO
ethereum_addresses (
address
)
select result.address from (
SELECT
ethereum_labels.address as address
FROM
ethereum_labels
where address_id IS NULL
EXCEPT
SELECT
ethereum_labels.address as address
FROM
ethereum_labels
where address_id IS NOT NULL
) AS result
"""
)
op.execute(
"ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS pk_ethereum_labels;"
)
op.execute(
"ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS uq_ethereum_labels_id;"
)
op.execute(
"""
DROP INDEX IF EXISTS pk_ethereum_labels;
DROP INDEX IF EXISTS idx_ethereum_labels_opensea_nft_name;
DROP INDEX IF EXISTS ix_ethereum_labels_address;
DROP INDEX IF EXISTS ix_ethereum_labels_label;
DROP INDEX IF EXISTS ix_ethereum_labels_transaction_hash;
DROP INDEX IF EXISTS ix_ethereum_labels_block_number;
DROP INDEX IF EXISTS ix_ethereum_labels_block_timestamp;
DROP INDEX IF EXISTS uq_ethereum_labels_id;
"""
)
op.execute(
"""
ALTER TABLE ethereum_labels RENAME TO ethereum_labels_v2;
ALTER TABLE ethereum_labels_v1 RENAME TO ethereum_labels;
"""
)
op.execute(
"""
ALTER INDEX pk_ethereum_labels_v1 RENAME TO pk_ethereum_labels;
ALTER INDEX idx_ethereum_labels_opensea_nft_name_v1 RENAME TO idx_ethereum_labels_opensea_nft_name;
ALTER INDEX ix_ethereum_labels_label_v1 RENAME TO ix_ethereum_labels_label;
ALTER INDEX ix_ethereum_labels_transaction_hash_v1 RENAME TO ix_ethereum_labels_transaction_hash;
ALTER INDEX uq_ethereum_labels_id_v1 RENAME TO uq_ethereum_labels_id;
"""
)
op.create_foreign_key(
"fk_ethereum_labels_address_id_ethereum_addresses",
"ethereum_labels",
"ethereum_addresses",
["address_id"],
["id"],
ondelete="CASCADE",
)

Wyświetl plik

@ -19,17 +19,17 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
# via
###### insert into table
op.execute(
"""CREATE TABLE ethereum_labels_new_V2 AS
"""CREATE TABLE ethereum_labels_v2 AS
SELECT
ethereum_labels.id as id,
ethereum_labels.label as label,
ethereum_labels.label_data as label_data,
ethereum_labels.created_at as created_at,
ethereum_labels.transaction_hash as transaction_hash,
ethereum_addresses.id as address_id,
ethereum_addresses.address as address
FROM
ethereum_labels
@ -37,7 +37,7 @@ def upgrade():
/* Set correct columns metadata columns TYPE | Nullable | Default */
ALTER TABLE
ethereum_labels_new_V2
ethereum_labels_v2
ALTER COLUMN
id
SET
@ -63,134 +63,24 @@ def upgrade():
/* ADD new columns */
ALTER TABLE
ethereum_labels_new_V2
ethereum_labels_v2
ADD
log_index JSONB NULL,
log_index INTEGER,
ADD
block_number bigint NULl,
ADD
transaction_timestamp bigint NULL;
/* Create constraine must be unique cross database */
ALTER TABLE
ONLY public.ethereum_labels_new_V2
ADD
CONSTRAINT pk_ethereum_labels_V2 PRIMARY KEY (id);
ALTER TABLE
ONLY public.ethereum_labels_new_V2
ADD
CONSTRAINT uq_ethereum_labels_id_V2 UNIQUE (id);
/* Create indexes must be unique cross database */
CREATE INDEX idx_ethereum_labels_opensea_nft_name_V2 ON public.ethereum_labels_new_V2 USING btree (((label_data ->> 'name' :: text)))
WHERE
((label) :: text = 'opensea_nft' :: text);
CREATE INDEX ix_ethereum_labels_address_V2 ON public.ethereum_labels_new_V2 USING btree (address);
CREATE INDEX ix_ethereum_labels_block_number_V2 ON public.ethereum_labels_new_V2 USING btree (block_number);
CREATE INDEX ix_ethereum_labels_label_V2 ON public.ethereum_labels_new_V2 USING btree (label);
CREATE INDEX ix_ethereum_labels_transaction_hash_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_hash);
CREATE INDEX ix_ethereum_labels_transaction_timestamp_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_timestamp);
block_timestamp bigint NULL;
"""
)
op.execute(
"""
BEGIN TRANSACTION;
LOCK TABLE ethereum_labels IN ACCESS EXCLUSIVE MODE;
INSERT INTO
ethereum_labels_new_V2 (
id,
label,
label_data,
created_at,
transaction_hash,
address
)
SELECT
ethereum_labels.id as id,
ethereum_labels.label as label,
ethereum_labels.label_data as label_data,
ethereum_labels.created_at as created_at,
ethereum_labels.transaction_hash as transaction_hash,
ethereum_addresses.address as address
FROM
(
SELECT
Row_Number() over (
order by
id
) AS RowIndex,
*
from
ethereum_labels_new_V2
) AS ethereum_labels_new_V2
INNER JOIN (
SELECT
Row_Number() over (
order by
id
) AS RowIndex,
*
from
ethereum_labels
) AS ethereum_labels ON ethereum_labels_old.RowIndex > ethereum_labels.RowIndex
INNER JOIN ethereum_addresses ON ethereum_labels.address_id = ethereum_addresses.id;
/* Rename tabel */
ALTER TABLE
ethereum_labels RENAME TO ethereum_labels_old;
ALTER TABLE
ethereum_labels_new_V2 RENAME TO ethereum_labels;
COMMIT;
"""
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
# op.add_column(
# "ethereum_labels",
# sa.Column("address_id", sa.INTEGER(), autoincrement=False, nullable=True),
# )
# op.create_foreign_key(
# "fk_ethereum_labels_address_id_ethereum_addresses",
# "ethereum_labels",
# "ethereum_addresses",
# ["address_id"],
# ["id"],
# ondelete="CASCADE",
# )
# op.create_index(
# "ix_ethereum_labels_address_id", "ethereum_labels", ["address_id"], unique=False
# )
# op.drop_index(
# op.f("ix_ethereum_labels_transaction_timestamp"), table_name="ethereum_labels"
# )
# op.drop_index(op.f("ix_ethereum_labels_block_number"), table_name="ethereum_labels")
# op.drop_index(op.f("ix_ethereum_labels_address"), table_name="ethereum_labels")
# op.drop_column("ethereum_labels", "transaction_timestamp")
# op.drop_column("ethereum_labels", "log_index")
# op.drop_column("ethereum_labels", "block_number")
# op.execute(
# """ UPDATE ethereum_labels
# SET
# address_id = address.id
# FROM ethereum_labels AS labels
# inner join ethereum_addresses address
# ON labels.address = address.address
# where ethereum_labels.address IS NOT NULL
# """
# )
# op.drop_column("ethereum_labels", "address")
# ### end Alembic commands ###
op.execute(
"""
DROP TABLE ethereum_labels_v2;
"""
)