Add changes for labels table.

pull/313/head
Andrey Dolgolev 2021-10-14 18:01:00 +03:00
rodzic ac60de5979
commit 6890d38097
7 zmienionych plików z 186 dodań i 161 usunięć

Wyświetl plik

@ -46,8 +46,12 @@ def get_contract_source_info(
if id is None:
return None
labels = (
db_session.query(EthereumLabel).filter(EthereumLabel.address_id == id[0]).all()
db_session.query(EthereumLabel)
.filter(EthereumLabel.address == contract_address)
.all()
)
if not labels:
return None
for label in labels:
if label.label == ETHERSCAN_SMARTCONTRACT_LABEL_NAME:
@ -80,12 +84,6 @@ class LabelNames(Enum):
def get_ethereum_address_info(
db_session: Session, address: str
) -> Optional[data.EthereumAddressInfo]:
query = db_session.query(EthereumAddress.id).filter(
EthereumAddress.address == address
)
id = query.one_or_none()
if id is None:
return None
address_info = data.EthereumAddressInfo(address=address)
etherscan_address_url = f"https://etherscan.io/address/{address}"
@ -94,7 +92,7 @@ def get_ethereum_address_info(
# Checking for token:
coinmarketcap_label: Optional[EthereumLabel] = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id == id[0])
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label == LabelNames.COINMARKETCAP_TOKEN.value)
.order_by(text("created_at desc"))
.limit(1)
@ -114,7 +112,7 @@ def get_ethereum_address_info(
# Checking for smart contract
etherscan_label: Optional[EthereumLabel] = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id == id[0])
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label == LabelNames.ETHERSCAN_SMARTCONTRACT.value)
.order_by(text("created_at desc"))
.limit(1)
@ -130,7 +128,7 @@ def get_ethereum_address_info(
# Checking for smart contract
erc721_label: Optional[EthereumLabel] = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id == id[0])
.filter(EthereumLabel.address == address)
.filter(EthereumLabel.label == LabelNames.ERC721.value)
.order_by(text("created_at desc"))
.limit(1)
@ -152,24 +150,23 @@ def get_address_labels(
"""
Attach labels to addresses.
"""
query = db_session.query(EthereumAddress)
if addresses is not None:
addresses_list = addresses.split(",")
query = query.filter(EthereumAddress.address.in_(addresses_list))
addresses_obj = query.order_by(EthereumAddress.id).slice(start, start + limit).all()
addresses_obj = addresses_list[start : start + limit]
else:
addresses_obj = []
addresses_response = data.AddressListLabelsResponse(addresses=[])
for address in addresses_obj:
labels_obj = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id == address.id)
.filter(EthereumLabel.address == address)
.all()
)
addresses_response.addresses.append(
data.AddressLabelsResponse(
address=address.address,
address=address,
labels=[
data.AddressLabelResponse(
label=label.label, label_data=label.label_data

Wyświetl plik

@ -229,8 +229,7 @@ def query_ethereum_transactions(
if parsed_filters.labels:
label_clause = (
db_session.query(EthereumAddress)
.join(EthereumLabel, EthereumAddress.id == EthereumLabel.address_id)
db_session.query(EthereumLabel)
.filter(
or_(
*[

Wyświetl plik

@ -55,31 +55,6 @@ def push_to_bucket(contract_data: Dict[str, Any], contract_file: str):
)
def get_address_id(db_session: Session, contract_address: str) -> int:
"""
Searches for given address in EthereumAddress table,
If doesn't find one, creates new.
Returns id of address
"""
query = db_session.query(EthereumAddress.id).filter(
EthereumAddress.address == contract_address
)
id = query.one_or_none()
if id is not None:
return id[0]
smart_contract = EthereumAddress(
address=contract_address,
)
try:
db_session.add(smart_contract)
db_session.commit()
return smart_contract.id
except Exception as e:
db_session.rollback()
raise e
def crawl_step(db_session: Session, contract: VerifiedSmartContract, crawl_url: str):
attempt = 0
current_interval = 2
@ -110,10 +85,9 @@ def crawl_step(db_session: Session, contract: VerifiedSmartContract, crawl_url:
push_to_bucket(contract_info, object_key)
try:
eth_address_id = get_address_id(db_session, contract.address)
eth_label = EthereumLabel(
label=ETHERSCAN_SMARTCONTRACTS_LABEL_NAME,
address_id=eth_address_id,
address_id=contract.address,
label_data={
"object_uri": f"s3://{bucket}/{object_key}",
"name": contract.name,

Wyświetl plik

@ -58,15 +58,6 @@ def identities_cmc_add_handler(args: argparse.Namespace) -> None:
break
with yield_db_session_ctx() as db_session:
latest_address = 1
latest_address_obj = (
db_session.query(EthereumAddress.id)
.order_by(text("id desc"))
.limit(1)
.one_or_none()
)
if latest_address_obj is not None:
latest_address = latest_address_obj[0]
for coin in response["data"]:
if coin["platform"] is not None:
@ -75,34 +66,15 @@ def identities_cmc_add_handler(args: argparse.Namespace) -> None:
and coin["platform"]["token_address"] is not None
):
token_address = coin["platform"]["token_address"]
# Check if address already exists
address = (
db_session.query(EthereumAddress)
.filter(EthereumAddress.address == token_address)
.one_or_none()
)
# Add new address
if address is None:
latest_address += 1
eth_token_id = latest_address
eth_token = EthereumAddress(
id=eth_token_id,
address=token_address,
)
db_session.add(eth_token)
logger.info(f"Added {coin['name']} token")
else:
eth_token_id = address.id
label = (
db_session.query(EthereumLabel)
.filter(EthereumLabel.address_id == eth_token_id)
.filter(EthereumLabel.address == token_address)
.one_or_none()
)
if label is None:
eth_token_label = EthereumLabel(
label="coinmarketcap_token",
address_id=eth_token_id,
address=token_address,
label_data={
"name": coin["name"],
"symbol": coin["symbol"],

Wyświetl plik

@ -12,7 +12,7 @@ from typing import Any, cast, Dict, Optional
from bugout.app import Bugout
from bugout.journal import SearchOrder
from moonstreamdb.db import yield_db_session_ctx
from moonstreamdb.models import EthereumBlock, EthereumTransaction, EthereumLabel
from moonstreamdb.models import EthereumBlock
from sqlalchemy.orm.session import Session
from web3 import Web3

Wyświetl plik

@ -19,87 +19,178 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"ethereum_labels", sa.Column("address", sa.VARCHAR(length=256), nullable=True)
)
op.add_column(
"ethereum_labels", sa.Column("block_number", sa.BigInteger(), nullable=True)
)
op.add_column(
"ethereum_labels",
sa.Column("log_index", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
)
op.add_column(
"ethereum_labels",
sa.Column("transaction_timestamp", sa.BigInteger(), nullable=True),
)
op.create_index(
op.f("ix_ethereum_labels_address"), "ethereum_labels", ["address"], unique=False
)
op.create_index(
op.f("ix_ethereum_labels_block_number"),
"ethereum_labels",
["block_number"],
unique=False,
)
op.create_index(
op.f("ix_ethereum_labels_transaction_timestamp"),
"ethereum_labels",
["transaction_timestamp"],
unique=False,
)
# via
###### insert into table
op.execute(
"""CREATE TABLE ethereum_labels_new_V2 AS
SELECT
ethereum_labels.id as id,
ethereum_labels.label as label,
ethereum_labels.label_data as label_data,
ethereum_labels.created_at as created_at,
ethereum_labels.transaction_hash as transaction_hash,
ethereum_addresses.address as address
FROM
ethereum_labels
left join ethereum_addresses ON ethereum_labels.address_id = ethereum_addresses.id;
/* Set correct columns metadata columns TYPE | Nullable | Default */
ALTER TABLE
ethereum_labels_new_V2
ALTER COLUMN
id
SET
NOT NULL,
ALTER COLUMN
label
SET
NOT NULL,
ALTER COLUMN
created_at TYPE timestamp with time zone,
ALTER COLUMN
created_at
SET
DEFAULT timezone('utc' :: text, statement_timestamp()),
ALTER COLUMN
created_at
SET
NOT NULL,
ALTER COLUMN
transaction_hash TYPE character varying(256),
ALTER COLUMN
address TYPE character varying(256);
/* ADD new columns */
ALTER TABLE
ethereum_labels_new_V2
ADD
log_index JSONB NULL,
ADD
block_number bigint NULl,
ADD
transaction_timestamp bigint NULL;
/* Create constraine must be unique cross database */
ALTER TABLE
ONLY public.ethereum_labels_new_V2
ADD
CONSTRAINT pk_ethereum_labels_V2 PRIMARY KEY (id);
ALTER TABLE
ONLY public.ethereum_labels_new_V2
ADD
CONSTRAINT uq_ethereum_labels_id_V2 UNIQUE (id);
/* Create indexes must be unique cross database */
CREATE INDEX idx_ethereum_labels_opensea_nft_name_V2 ON public.ethereum_labels_new_V2 USING btree (((label_data ->> 'name' :: text)))
WHERE
((label) :: text = 'opensea_nft' :: text);
CREATE INDEX ix_ethereum_labels_address_V2 ON public.ethereum_labels_new_V2 USING btree (address);
CREATE INDEX ix_ethereum_labels_block_number_V2 ON public.ethereum_labels_new_V2 USING btree (block_number);
CREATE INDEX ix_ethereum_labels_label_V2 ON public.ethereum_labels_new_V2 USING btree (label);
CREATE INDEX ix_ethereum_labels_transaction_hash_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_hash);
CREATE INDEX ix_ethereum_labels_transaction_timestamp_V2 ON public.ethereum_labels_new_V2 USING btree (transaction_timestamp);
"""
)
op.execute(
""" UPDATE ethereum_labels
SET address = ethereum_addresses.address
FROM ethereum_labels AS labels
inner join ethereum_addresses
ON labels.address_id = ethereum_addresses.id
where ethereum_labels.address_id is not null;
"""
BEGIN TRANSACTION;
LOCK TABLE ethereum_labels IN ACCESS EXCLUSIVE MODE;
INSERT INTO
ethereum_labels_new_V2 (
id,
label,
label_data,
created_at,
transaction_hash,
address
)
SELECT
ethereum_labels.id as id,
ethereum_labels.label as label,
ethereum_labels.label_data as label_data,
ethereum_labels.created_at as created_at,
ethereum_labels.transaction_hash as transaction_hash,
ethereum_addresses.address as address
FROM
(
SELECT
Row_Number() over (
order by
id
) AS RowIndex,
*
from
ethereum_labels_new_V2
) AS ethereum_labels_new_V2
INNER JOIN (
SELECT
Row_Number() over (
order by
id
) AS RowIndex,
*
from
ethereum_labels
) AS ethereum_labels ON ethereum_labels_old.RowIndex > ethereum_labels.RowIndex
INNER JOIN ethereum_addresses ON ethereum_labels.address_id = ethereum_addresses.id;
/* Rename tabel */
ALTER TABLE
ethereum_labels RENAME TO ethereum_labels_old;
ALTER TABLE
ethereum_labels_new_V2 RENAME TO ethereum_labels;
COMMIT;
"""
)
op.drop_index("ix_ethereum_labels_address_id", table_name="ethereum_labels")
op.drop_constraint(
"fk_ethereum_labels_address_id_ethereum_addresses",
"ethereum_labels",
type_="foreignkey",
)
op.drop_column("ethereum_labels", "address_id")
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"ethereum_labels",
sa.Column("address_id", sa.INTEGER(), autoincrement=False, nullable=True),
)
op.create_foreign_key(
"fk_ethereum_labels_address_id_ethereum_addresses",
"ethereum_labels",
"ethereum_addresses",
["address_id"],
["id"],
ondelete="CASCADE",
)
op.create_index(
"ix_ethereum_labels_address_id", "ethereum_labels", ["address_id"], unique=False
)
op.drop_index(
op.f("ix_ethereum_labels_transaction_timestamp"), table_name="ethereum_labels"
)
op.drop_index(op.f("ix_ethereum_labels_block_number"), table_name="ethereum_labels")
op.drop_index(op.f("ix_ethereum_labels_address"), table_name="ethereum_labels")
op.drop_column("ethereum_labels", "transaction_timestamp")
op.drop_column("ethereum_labels", "log_index")
op.drop_column("ethereum_labels", "block_number")
op.execute(
""" UPDATE ethereum_labels
SET
address_id = address.id
FROM ethereum_labels labels
left join ethereum_addresses address
ON labels.address = address.address
"""
)
op.drop_column("ethereum_labels", "address")
# op.add_column(
# "ethereum_labels",
# sa.Column("address_id", sa.INTEGER(), autoincrement=False, nullable=True),
# )
# op.create_foreign_key(
# "fk_ethereum_labels_address_id_ethereum_addresses",
# "ethereum_labels",
# "ethereum_addresses",
# ["address_id"],
# ["id"],
# ondelete="CASCADE",
# )
# op.create_index(
# "ix_ethereum_labels_address_id", "ethereum_labels", ["address_id"], unique=False
# )
# op.drop_index(
# op.f("ix_ethereum_labels_transaction_timestamp"), table_name="ethereum_labels"
# )
# op.drop_index(op.f("ix_ethereum_labels_block_number"), table_name="ethereum_labels")
# op.drop_index(op.f("ix_ethereum_labels_address"), table_name="ethereum_labels")
# op.drop_column("ethereum_labels", "transaction_timestamp")
# op.drop_column("ethereum_labels", "log_index")
# op.drop_column("ethereum_labels", "block_number")
# op.execute(
# """ UPDATE ethereum_labels
# SET
# address_id = address.id
# FROM ethereum_labels AS labels
# inner join ethereum_addresses address
# ON labels.address = address.address
# where ethereum_labels.address IS NOT NULL
# """
# )
# op.drop_column("ethereum_labels", "address")
# ### end Alembic commands ###

Wyświetl plik

@ -16,17 +16,9 @@ def labels_add_handler(args: argparse.Namespace) -> None:
raise ValueError("Unable to parse data as dictionary")
with yield_db_session_ctx() as db_session:
address = (
db_session.query(EthereumAddress)
.filter(EthereumAddress.address == str(args.address))
.one_or_none()
)
if address is None:
print(f"There is no {args.address} address")
return
label = EthereumLabel(
label=args.label, address_id=address.id, label_data=label_data
label=args.label, address=str(args.address), label_data=label_data
)
db_session.add(label)
db_session.commit()
@ -36,7 +28,7 @@ def labels_add_handler(args: argparse.Namespace) -> None:
{
"id": str(label.id),
"label": str(label.label),
"address_id": str(label.address_id),
"address_id": str(label.address),
"label_data": str(label.label_data),
"created_at": str(label.created_at),
}