From 2f4c9956873e8c30f9fa8a49f6a1577c6ffdacf2 Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 19 Oct 2021 23:02:10 +0300 Subject: [PATCH] Add fixes. --- crawlers/mooncrawl/mooncrawl/nft/ethereum.py | 80 ++---- ...drop_etherium_addresses_and_address_id_.py | 264 ++++++++++++++++++ ...f_add_log_index_block_number_timestamp_.py | 134 +-------- 3 files changed, 294 insertions(+), 184 deletions(-) create mode 100644 db/alembic/versions/240476c67b9f_drop_etherium_addresses_and_address_id_.py diff --git a/crawlers/mooncrawl/mooncrawl/nft/ethereum.py b/crawlers/mooncrawl/mooncrawl/nft/ethereum.py index 794cfe91..260eb25b 100644 --- a/crawlers/mooncrawl/mooncrawl/nft/ethereum.py +++ b/crawlers/mooncrawl/mooncrawl/nft/ethereum.py @@ -260,46 +260,9 @@ def get_block_bounds( return start, end -def ensure_addresses(db_session: Session, addresses: Set[str]) -> Dict[str, int]: - """ - Ensures that the given addresses are registered in the ethereum_addresses table of the given - moonstreamdb database connection. Returns a mapping from the addresses to the ids of their - corresponding row in the ethereum_addresses table. - - Returns address_ids for *every* address, not just the new ones. - """ - if len(addresses) == 0: - return {} - - # SQLAlchemy reference: - # https://docs.sqlalchemy.org/en/14/orm/persistence_techniques.html#using-postgresql-on-conflict-with-returning-to-return-upserted-orm-objects - stmt = ( - insert(EthereumAddress) - .values([{"address": address} for address in addresses]) - .on_conflict_do_nothing(index_elements=[EthereumAddress.address]) - ) - - try: - db_session.execute(stmt) - db_session.commit() - except Exception: - db_session.rollback() - raise - - rows = ( - db_session.query(EthereumAddress) - .filter(EthereumAddress.address.in_(addresses)) - .all() - ) - address_ids = {address.address: address.id for address in rows} - return address_ids - - -def label_erc721_addresses( - w3: Web3, db_session: Session, address_ids: List[Tuple[str, int]] -) -> None: +def label_erc721_addresses(w3: Web3, db_session: Session, addresses: List[str]) -> None: labels: List[EthereumLabel] = [] - for address, id in address_ids: + for address in addresses: try: contract_info = get_erc721_contract_info(w3, address) @@ -319,7 +282,7 @@ def label_erc721_addresses( labels.append( EthereumLabel( - address_id=id, + address=address, label=NFT_LABEL, label_data={ "name": contract_name, @@ -342,7 +305,7 @@ def label_erc721_addresses( def label_key(label: EthereumLabel) -> Tuple[str, int, int, str, str]: return ( label.transaction_hash, - label.address_id, + label.address, label.label_data["tokenId"], label.label_data["from"], label.label_data["to"], @@ -350,7 +313,7 @@ def label_key(label: EthereumLabel) -> Tuple[str, int, int, str, str]: def label_transfers( - db_session: Session, transfers: List[NFTTransfer], address_ids: Dict[str, int] + db_session: Session, transfers: List[NFTTransfer], addresses: Set[str] ) -> None: """ Adds "nft_mint" or "nft_transfer" to the (transaction, address) pair represented by each of the @@ -361,10 +324,9 @@ def label_transfers( for transfer in transfers: transaction_hash = transfer.transfer_tx transaction_hashes.append(transaction_hash) - address_id = address_ids.get(transfer.contract_address) label = MINT_LABEL if transfer.is_mint else TRANSFER_LABEL row = EthereumLabel( - address_id=address_id, + address=transfer.contract_address, transaction_hash=transaction_hash, label=label, label_data={ @@ -377,7 +339,7 @@ def label_transfers( existing_labels = ( db_session.query(EthereumLabel) - .filter(EthereumLabel.address_id.in_(address_ids.values())) + .filter(EthereumLabel.address.in_(addresses)) .filter(EthereumLabel.transaction_hash.in_(transaction_hashes)) ).all() existing_label_keys = {label_key(label) for label in existing_labels} @@ -471,29 +433,23 @@ def add_labels( contract_address=contract_address, ) contract_addresses = {transfer.contract_address for transfer in job} - updated_address_ids = ensure_addresses(db_session, contract_addresses) - address_ids: Dict[str, int] = {} - for address, address_id in updated_address_ids.items(): - address_ids[address] = address_id - labelled_address_ids = [ - label.address_id + labelled_address = [ + label.address for label in ( db_session.query(EthereumLabel) .filter(EthereumLabel.label == NFT_LABEL) - .filter(EthereumLabel.address_id.in_(address_ids.values())) + .filter(EthereumLabel.address.in_(contract_addresses)) .all() ) ] - unlabelled_address_ids = [ - (address, address_id) - for address, address_id in address_ids.items() - if address_id not in labelled_address_ids + unlabelled_address = [ + address for address in contract_addresses if address not in labelled_address ] # Add 'erc721' labels try: - label_erc721_addresses(w3, db_session, unlabelled_address_ids) + label_erc721_addresses(w3, db_session, unlabelled_address) except Exception as e: reporter.error_report( e, @@ -507,7 +463,7 @@ def add_labels( # Add mint/transfer labels to (transaction, contract_address) pairs try: - label_transfers(db_session, job, updated_address_ids) + label_transfers(db_session, job, contract_addresses) except Exception as e: reporter.error_report( e, @@ -555,7 +511,7 @@ def block_bounded_summary( db_session.query( EthereumLabel.label, EthereumLabel.label_data, - EthereumLabel.address_id, + EthereumLabel.address, EthereumTransaction.hash, EthereumTransaction.value, EthereumBlock.block_number, @@ -580,7 +536,7 @@ def block_bounded_summary( def holder_query(label: str) -> Query: query = ( db_session.query( - EthereumLabel.address_id.label("address_id"), + EthereumLabel.address.label("address"), EthereumLabel.label_data["to"].astext.label("owner_address"), EthereumLabel.label_data["tokenId"].astext.label("token_id"), EthereumTransaction.block_number.label("block_number"), @@ -602,10 +558,10 @@ def block_bounded_summary( # does not seem to be deterministic. # Maybe relevant Stackoverflow post: https://stackoverflow.com/a/59410440 text( - "address_id, token_id, block_number desc, transaction_index desc, transfer_value, owner_address" + "address, token_id, block_number desc, transaction_index desc, transfer_value, owner_address" ) ) - .distinct("address_id", "token_id") + .distinct("address", "token_id") ) return query diff --git a/db/alembic/versions/240476c67b9f_drop_etherium_addresses_and_address_id_.py b/db/alembic/versions/240476c67b9f_drop_etherium_addresses_and_address_id_.py new file mode 100644 index 00000000..0c5fd0b8 --- /dev/null +++ b/db/alembic/versions/240476c67b9f_drop_etherium_addresses_and_address_id_.py @@ -0,0 +1,264 @@ +"""Drop etherium_addresses and address_id column + +Revision ID: 240476c67b9f +Revises: f1e8cf50a3ff +Create Date: 2021-10-19 14:49:07.905565 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "240476c67b9f" +down_revision = "f1e8cf50a3ff" +branch_labels = None +depends_on = None + + +def upgrade(): + + op.execute( + """ + LOCK TABLE ethereum_labels IN ACCESS EXCLUSIVE MODE; + + INSERT INTO + ethereum_labels_v2 ( + id, + label, + label_data, + created_at, + transaction_hash, + address + ) + SELECT + ethereum_labels.id as id, + ethereum_labels.label as label, + ethereum_labels.label_data as label_data, + ethereum_labels.created_at as created_at, + ethereum_labels.transaction_hash as transaction_hash, + ethereum_addresses.address as address + FROM + ( + SELECT + Row_Number() over ( + order by + id + ) AS RowIndex, + * + from + ethereum_labels + ) AS ethereum_labels + INNER JOIN ethereum_addresses ON RowIndex > (select count(*) from ethereum_labels_v2 ) and ethereum_labels.address_id = ethereum_addresses.id; + + """ + ) + + op.execute( + "ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS fk_ethereum_labels_address_id_ethereum_addresses;" + ) + + op.execute( + "ALTER TABLE IF EXISTS ethereum_addresses DROP CONSTRAINT IF EXISTS fk_ethereum_smart_contracts_transaction_hash_ethereum_t_f928;" + ) + + op.execute( + """ + /* Rename tabels */ + ALTER TABLE + ethereum_labels RENAME TO ethereum_labels_v1; + + ALTER TABLE + ethereum_labels_v2 RENAME TO ethereum_labels; + """ + ) + + op.execute( + """ + ALTER INDEX pk_ethereum_labels RENAME TO pk_ethereum_labels_v1; + ALTER INDEX idx_ethereum_labels_opensea_nft_name RENAME TO idx_ethereum_labels_opensea_nft_name_v1; + ALTER INDEX ix_ethereum_labels_label RENAME TO ix_ethereum_labels_label_v1; + ALTER INDEX ix_ethereum_labels_transaction_hash RENAME TO ix_ethereum_labels_transaction_hash_v1; + ALTER INDEX uq_ethereum_labels_id RENAME TO uq_ethereum_labels_id_v1; + """ + ) + + op.execute( + """ + + ALTER TABLE + ONLY public.ethereum_labels + ADD + CONSTRAINT pk_ethereum_labels PRIMARY KEY (id); + + ALTER TABLE + ONLY public.ethereum_labels + ADD + CONSTRAINT uq_ethereum_labels_id UNIQUE (id); + + /* Create indexes must be unique cross database */ + CREATE INDEX idx_ethereum_labels_opensea_nft_name ON public.ethereum_labels USING btree (((label_data ->> 'name' :: text))) + WHERE + ((label) :: text = 'opensea_nft' :: text); + + CREATE INDEX ix_ethereum_labels_address ON public.ethereum_labels USING btree (address); + + CREATE INDEX ix_ethereum_labels_block_number ON public.ethereum_labels USING btree (block_number); + + CREATE INDEX ix_ethereum_labels_label ON public.ethereum_labels USING btree (label); + + CREATE INDEX ix_ethereum_labels_transaction_hash ON public.ethereum_labels USING btree (transaction_hash); + + CREATE INDEX ix_ethereum_labels_block_timestamp ON public.ethereum_labels USING btree (block_timestamp); + + """ + ) + + op.execute( + """ + DROP TABLE ethereum_addresses; + """ + ) + + +def downgrade(): + + op.execute( + """ + CREATE TABLE public.ethereum_addresses ( + id integer NOT NULL, + transaction_hash character varying(256), + address character varying(256) NOT NULL, + created_at timestamp with time zone DEFAULT timezone('utc'::text, statement_timestamp()) NOT NULL + ); + + + ALTER TABLE public.ethereum_addresses OWNER TO postgres; + + CREATE UNIQUE INDEX ix_ethereum_addresses_address ON public.ethereum_addresses USING btree (address); + + CREATE INDEX ix_ethereum_addresses_transaction_hash ON public.ethereum_addresses USING btree (transaction_hash); + + """ + ) + + # sequence creation + + op.execute( + """ + + INSERT INTO + ethereum_addresses ( + id, + address + ) + SELECT + distinct(ethereum_labels.address_id) as id, + ethereum_labels.address as address + FROM + ethereum_labels + where address_id IS NOT NULL + order by id; + + """ + ) + + conn = op.get_bind() + latest_id = conn.execute( + "select MAX(address_id) + 1 from ethereum_labels" + ).fetchall() + + if latest_id: + max_id = latest_id[0][0] + else: + max_id = 1 + + op.execute( + f"CREATE SEQUENCE public.ethereum_smart_contracts_id_seq INCREMENT BY 1 START WITH {max_id} NO MINVALUE NO MAXVALUE CACHE 1" + ) + + # id column settings + op.execute( + """ + + ALTER TABLE public.ethereum_smart_contracts_id_seq OWNER TO postgres; + + ALTER SEQUENCE public.ethereum_smart_contracts_id_seq OWNED BY public.ethereum_addresses.id; + + ALTER TABLE ONLY public.ethereum_addresses ALTER COLUMN id SET DEFAULT nextval('public.ethereum_smart_contracts_id_seq'::regclass); + + ALTER TABLE ONLY public.ethereum_addresses ADD CONSTRAINT pk_ethereum_smart_contracts PRIMARY KEY (id); + + """ + ) + + op.execute( + """ + + INSERT INTO + ethereum_addresses ( + address + ) + select result.address from ( + SELECT + ethereum_labels.address as address + FROM + ethereum_labels + where address_id IS NULL + EXCEPT + SELECT + ethereum_labels.address as address + FROM + ethereum_labels + where address_id IS NOT NULL + ) AS result + """ + ) + + op.execute( + "ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS pk_ethereum_labels;" + ) + + op.execute( + "ALTER TABLE IF EXISTS ethereum_labels DROP CONSTRAINT IF EXISTS uq_ethereum_labels_id;" + ) + + op.execute( + """ + DROP INDEX IF EXISTS pk_ethereum_labels; + DROP INDEX IF EXISTS idx_ethereum_labels_opensea_nft_name; + DROP INDEX IF EXISTS ix_ethereum_labels_address; + DROP INDEX IF EXISTS ix_ethereum_labels_label; + DROP INDEX IF EXISTS ix_ethereum_labels_transaction_hash; + DROP INDEX IF EXISTS ix_ethereum_labels_block_number; + DROP INDEX IF EXISTS ix_ethereum_labels_block_timestamp; + DROP INDEX IF EXISTS uq_ethereum_labels_id; + """ + ) + + op.execute( + """ + ALTER TABLE ethereum_labels RENAME TO ethereum_labels_v2; + + ALTER TABLE ethereum_labels_v1 RENAME TO ethereum_labels; + """ + ) + + op.execute( + """ + ALTER INDEX pk_ethereum_labels_v1 RENAME TO pk_ethereum_labels; + ALTER INDEX idx_ethereum_labels_opensea_nft_name_v1 RENAME TO idx_ethereum_labels_opensea_nft_name; + ALTER INDEX ix_ethereum_labels_label_v1 RENAME TO ix_ethereum_labels_label; + ALTER INDEX ix_ethereum_labels_transaction_hash_v1 RENAME TO ix_ethereum_labels_transaction_hash; + ALTER INDEX uq_ethereum_labels_id_v1 RENAME TO uq_ethereum_labels_id; + """ + ) + + op.create_foreign_key( + "fk_ethereum_labels_address_id_ethereum_addresses", + "ethereum_labels", + "ethereum_addresses", + ["address_id"], + ["id"], + ondelete="CASCADE", + ) diff --git a/db/alembic/versions/f1e8cf50a3ff_add_log_index_block_number_timestamp_.py b/db/alembic/versions/f1e8cf50a3ff_add_log_index_block_number_timestamp_.py index a1d1f985..d699075e 100644 --- a/db/alembic/versions/f1e8cf50a3ff_add_log_index_block_number_timestamp_.py +++ b/db/alembic/versions/f1e8cf50a3ff_add_log_index_block_number_timestamp_.py @@ -19,17 +19,17 @@ depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - # via ###### insert into table op.execute( - """CREATE TABLE ethereum_labels_new_V2 AS + """CREATE TABLE ethereum_labels_v2 AS SELECT ethereum_labels.id as id, ethereum_labels.label as label, ethereum_labels.label_data as label_data, ethereum_labels.created_at as created_at, ethereum_labels.transaction_hash as transaction_hash, + ethereum_addresses.id as address_id, ethereum_addresses.address as address FROM ethereum_labels @@ -37,7 +37,7 @@ def upgrade(): /* Set correct columns metadata columns TYPE | Nullable | Default */ ALTER TABLE - ethereum_labels_new_V2 + ethereum_labels_v2 ALTER COLUMN id SET @@ -63,134 +63,24 @@ def upgrade(): /* ADD new columns */ ALTER TABLE - ethereum_labels_new_V2 + ethereum_labels_v2 ADD - log_index JSONB NULL, + log_index INTEGER, ADD block_number bigint NULl, ADD - transaction_timestamp bigint NULL; - - /* Create constraine must be unique cross database */ - ALTER TABLE - ONLY public.ethereum_labels_new_V2 - ADD - CONSTRAINT pk_ethereum_labels_V2 PRIMARY KEY (id); - - ALTER TABLE - ONLY public.ethereum_labels_new_V2 - ADD - CONSTRAINT uq_ethereum_labels_id_V2 UNIQUE (id); - - /* Create indexes must be unique cross database */ - CREATE INDEX idx_ethereum_labels_opensea_nft_name_V2 ON public.ethereum_labels_new_V2 USING btree (((label_data ->> 'name' :: text))) - WHERE - ((label) :: text = 'opensea_nft' :: text); - - CREATE INDEX ix_ethereum_labels_address_V2 ON public.ethereum_labels_new_V2 USING btree (address); - - CREATE INDEX ix_ethereum_labels_block_number_V2 ON public.ethereum_labels_new_V2 USING btree (block_number); - - CREATE INDEX ix_ethereum_labels_label_V2 ON public.ethereum_labels_new_V2 USING btree (label); - - CREATE INDEX ix_ethereum_labels_transaction_hash_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_hash); - - CREATE INDEX ix_ethereum_labels_transaction_timestamp_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_timestamp); + block_timestamp bigint NULL; + """ ) - op.execute( - """ - BEGIN TRANSACTION; - - LOCK TABLE ethereum_labels IN ACCESS EXCLUSIVE MODE; - - INSERT INTO - ethereum_labels_new_V2 ( - id, - label, - label_data, - created_at, - transaction_hash, - address - ) - SELECT - ethereum_labels.id as id, - ethereum_labels.label as label, - ethereum_labels.label_data as label_data, - ethereum_labels.created_at as created_at, - ethereum_labels.transaction_hash as transaction_hash, - ethereum_addresses.address as address - FROM - ( - SELECT - Row_Number() over ( - order by - id - ) AS RowIndex, - * - from - ethereum_labels_new_V2 - ) AS ethereum_labels_new_V2 - INNER JOIN ( - SELECT - Row_Number() over ( - order by - id - ) AS RowIndex, - * - from - ethereum_labels - ) AS ethereum_labels ON ethereum_labels_old.RowIndex > ethereum_labels.RowIndex - INNER JOIN ethereum_addresses ON ethereum_labels.address_id = ethereum_addresses.id; - - /* Rename tabel */ - ALTER TABLE - ethereum_labels RENAME TO ethereum_labels_old; - - ALTER TABLE - ethereum_labels_new_V2 RENAME TO ethereum_labels; - - COMMIT; - """ - ) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### - # op.add_column( - # "ethereum_labels", - # sa.Column("address_id", sa.INTEGER(), autoincrement=False, nullable=True), - # ) - # op.create_foreign_key( - # "fk_ethereum_labels_address_id_ethereum_addresses", - # "ethereum_labels", - # "ethereum_addresses", - # ["address_id"], - # ["id"], - # ondelete="CASCADE", - # ) - # op.create_index( - # "ix_ethereum_labels_address_id", "ethereum_labels", ["address_id"], unique=False - # ) - # op.drop_index( - # op.f("ix_ethereum_labels_transaction_timestamp"), table_name="ethereum_labels" - # ) - # op.drop_index(op.f("ix_ethereum_labels_block_number"), table_name="ethereum_labels") - # op.drop_index(op.f("ix_ethereum_labels_address"), table_name="ethereum_labels") - # op.drop_column("ethereum_labels", "transaction_timestamp") - # op.drop_column("ethereum_labels", "log_index") - # op.drop_column("ethereum_labels", "block_number") - # op.execute( - # """ UPDATE ethereum_labels - # SET - # address_id = address.id - # FROM ethereum_labels AS labels - # inner join ethereum_addresses address - # ON labels.address = address.address - # where ethereum_labels.address IS NOT NULL - # """ - # ) - # op.drop_column("ethereum_labels", "address") - # ### end Alembic commands ### + op.execute( + """ + DROP TABLE ethereum_labels_v2; + """ + )