From 27aa0ce3a537d279b553e1f7ca063040c853538d Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:19:04 +0300 Subject: [PATCH 1/9] Add init version. --- backend/moonstream/providers/__init__.py | 2 + backend/moonstream/providers/bugout.py | 11 ++- .../providers/ethereum_blockchain.py | 76 ++++++++++++++----- backend/moonstream/routes/streams.py | 12 ++- 4 files changed, 76 insertions(+), 25 deletions(-) diff --git a/backend/moonstream/providers/__init__.py b/backend/moonstream/providers/__init__.py index f884fe1b..37a692ee 100644 --- a/backend/moonstream/providers/__init__.py +++ b/backend/moonstream/providers/__init__.py @@ -242,6 +242,7 @@ def previous_event( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: for provider_name, provider in event_providers.items(): + print(provider_name, provider) futures[provider_name] = executor.submit( provider.previous_event, db_session, @@ -252,6 +253,7 @@ def previous_event( query, user_subscriptions, ) + print(user_subscriptions) results: Dict[str, data.Event] = {} for provider_name, future in futures.items(): diff --git a/backend/moonstream/providers/bugout.py b/backend/moonstream/providers/bugout.py index 020733be..dad47488 100644 --- a/backend/moonstream/providers/bugout.py +++ b/backend/moonstream/providers/bugout.py @@ -20,6 +20,8 @@ from ..stream_queries import StreamQuery logger = logging.getLogger(__name__) logger.setLevel(logging.WARN) +allowed_tags = ["tag:erc721"] + class BugoutEventProviderError(Exception): """ @@ -297,9 +299,12 @@ class EthereumTXPoolProvider(BugoutEventProvider): ] subscriptions_filters = [] for address in addresses: - subscriptions_filters.extend( - [f"?#from_address:{address}", f"?#to_address:{address}"] - ) + if address in allowed_tags: + subscriptions_filters.append(address) + else: + subscriptions_filters.extend( + [f"?#from_address:{address}", f"?#to_address:{address}"] + ) return subscriptions_filters diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 5a478a84..6ca8304b 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -8,14 +8,18 @@ from bugout.data import BugoutResource from moonstreamdb.models import ( EthereumBlock, EthereumTransaction, + EthereumAddress, + EthereumLabel, ) from sqlalchemy import or_, and_, text +from sqlalchemy.dialects import postgresql # For Debug remove from sqlalchemy.orm import Session, Query from sqlalchemy.sql.functions import user + from .. import data from ..settings import DEFAULT_STREAM_TIMEINTERVAL -from ..stream_boundaries import validate_stream_boundary +from ..stream_boundaries import validate_stream_boundary, InvalidStreamBoundary from ..stream_queries import StreamQuery @@ -24,6 +28,7 @@ logger.setLevel(logging.WARN) event_type = "ethereum_blockchain" +allowed_tags = ["tag:erc721"] def validate_subscription( @@ -71,6 +76,7 @@ class Filters: from_addresses: List[str] = field(default_factory=list) to_addresses: List[str] = field(default_factory=list) + labels: List[str] = field(default_factory=list) def default_filters(subscriptions: List[BugoutResource]) -> Filters: @@ -83,8 +89,11 @@ def default_filters(subscriptions: List[BugoutResource]) -> Filters: Optional[str], subscription.resource_data.get("address") ) if subscription_address is not None: - filters.from_addresses.append(subscription_address) - filters.to_addresses.append(subscription_address) + if subscription_address in allowed_tags: + filters.labels.append(subscription_address.split(":")[1]) + else: + filters.from_addresses.append(subscription_address) + filters.to_addresses.append(subscription_address) else: logger.warn( f"Could not find subscription address for subscription with resource id: {subscription.id}" @@ -149,14 +158,18 @@ def parse_filters( parsed_filters.from_addresses.append(address) parsed_filters.to_addresses.append(address) - if not (parsed_filters.from_addresses or parsed_filters.to_addresses): + if not ( + parsed_filters.from_addresses + or parsed_filters.to_addresses + or parsed_filters.labels + ): return None return parsed_filters def query_ethereum_transactions( - db_session: Session, stream_boundary: data.StreamBoundary, parsed_filters: Filters + db_session: Session, stream_boundary: data.StreamBoundary, parsed_filters: Filters, ) -> Query: """ Builds a database query for Ethereum transactions that occurred within the window of time that @@ -174,8 +187,7 @@ def query_ethereum_transactions( EthereumTransaction.value, EthereumBlock.timestamp.label("timestamp"), ).join( - EthereumBlock, - EthereumTransaction.block_number == EthereumBlock.block_number, + EthereumBlock, EthereumTransaction.block_number == EthereumBlock.block_number, ) if stream_boundary.include_start: @@ -190,15 +202,41 @@ def query_ethereum_transactions( query = query.filter(EthereumBlock.timestamp <= stream_boundary.end_time) # We want to take a big disjunction (OR) over ALL the filters, be they on "from" address or "to" address - address_clauses = [ - EthereumTransaction.from_address == address - for address in parsed_filters.from_addresses - ] + [ - EthereumTransaction.to_address == address - for address in parsed_filters.to_addresses - ] - if address_clauses: - query = query.filter(or_(*address_clauses)) + address_clauses = [] + + address_clauses.extend( + [ + EthereumTransaction.from_address == address + for address in parsed_filters.from_addresses + ] + + [ + EthereumTransaction.to_address == address + for address in parsed_filters.to_addresses + ] + ) + + labels_clause = [] + + if parsed_filters.labels: + label_clause = ( + db_session.query(EthereumAddress) + .join(EthereumLabel, EthereumAddress.id == EthereumLabel.address_id) + .filter( + or_( + *[ + EthereumLabel.label.contains(label) + for label in list(set(parsed_filters.labels)) + ] + ) + ) + .exists() + ) + labels_clause.append(label_clause) + + subscriptions_clause = address_clauses + labels_clause + + if subscriptions_clause: + query = query.filter(or_(*subscriptions_clause)) return query @@ -345,9 +383,10 @@ def next_event( query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters) .order_by(text("timestamp asc")) .limit(1) - .one_or_none() ) + maybe_ethereum_transaction = maybe_ethereum_transaction.one_or_none() + if maybe_ethereum_transaction is None: return None return ethereum_transaction_event(maybe_ethereum_transaction) @@ -386,9 +425,8 @@ def previous_event( ) .order_by(text("timestamp desc")) .limit(1) - .one_or_none() ) - + maybe_ethereum_transaction = maybe_ethereum_transaction.one_or_none() if maybe_ethereum_transaction is None: return None return ethereum_transaction_event(maybe_ethereum_transaction) diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 10d0426c..6aec9630 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -69,9 +69,7 @@ def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]: """ response = bc.list_resources( token=token, - params={ - "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, - }, + params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,}, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) @@ -135,6 +133,7 @@ async def stream_handler( raise_on_error=True, ) except ReceivingEventsException as e: + logger.error(e) logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: @@ -178,6 +177,7 @@ async def latest_events_handler( sort_events=True, ) except ReceivingEventsException as e: + logger.error(e) logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: @@ -206,6 +206,7 @@ async def next_event_handler( All times must be given as seconds since the Unix epoch. """ + print("next") stream_boundary = data.StreamBoundary( start_time=start_time, end_time=end_time, @@ -233,9 +234,11 @@ async def next_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: + print(e) logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: + logger.error(e) logger.error("Unable to get next events") raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -261,6 +264,7 @@ async def previous_event_handler( All times must be given as seconds since the Unix epoch. """ + print("previous") stream_boundary = data.StreamBoundary( start_time=start_time, end_time=end_time, @@ -288,9 +292,11 @@ async def previous_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: + print(e) logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: + logger.error(e) logger.error("Unable to get previous events") raise MoonstreamHTTPException(status_code=500, internal_error=e) From f598596f07469c48b63bed86579a4b4c2a9465cf Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:20:27 +0300 Subject: [PATCH 2/9] Remove print. --- backend/moonstream/providers/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/moonstream/providers/__init__.py b/backend/moonstream/providers/__init__.py index 37a692ee..f884fe1b 100644 --- a/backend/moonstream/providers/__init__.py +++ b/backend/moonstream/providers/__init__.py @@ -242,7 +242,6 @@ def previous_event( max_workers=max_threads, thread_name_prefix="event_providers_" ) as executor: for provider_name, provider in event_providers.items(): - print(provider_name, provider) futures[provider_name] = executor.submit( provider.previous_event, db_session, @@ -253,7 +252,6 @@ def previous_event( query, user_subscriptions, ) - print(user_subscriptions) results: Dict[str, data.Event] = {} for provider_name, future in futures.items(): From eb2d21485926a39478dbb22ca9cc1a2c18ef7e6a Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:21:51 +0300 Subject: [PATCH 3/9] Remove import. --- backend/moonstream/providers/ethereum_blockchain.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 6ca8304b..48641fe0 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -19,7 +19,7 @@ from sqlalchemy.sql.functions import user from .. import data from ..settings import DEFAULT_STREAM_TIMEINTERVAL -from ..stream_boundaries import validate_stream_boundary, InvalidStreamBoundary +from ..stream_boundaries import validate_stream_boundary from ..stream_queries import StreamQuery From 2f10e470ed070c6a6013e367b06e58fd7f47bc8f Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:25:33 +0300 Subject: [PATCH 4/9] Add logger and remove prints. --- backend/moonstream/routes/streams.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 6aec9630..a14ad4c0 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -133,8 +133,7 @@ async def stream_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(e) - logger.error("Error receiving events from provider") + logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get events") @@ -177,8 +176,7 @@ async def latest_events_handler( sort_events=True, ) except ReceivingEventsException as e: - logger.error(e) - logger.error("Error receiving events from provider") + logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get latest events") @@ -206,7 +204,6 @@ async def next_event_handler( All times must be given as seconds since the Unix epoch. """ - print("next") stream_boundary = data.StreamBoundary( start_time=start_time, end_time=end_time, @@ -234,8 +231,7 @@ async def next_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - print(e) - logger.error("Error receiving events from provider") + logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error(e) @@ -264,7 +260,6 @@ async def previous_event_handler( All times must be given as seconds since the Unix epoch. """ - print("previous") stream_boundary = data.StreamBoundary( start_time=start_time, end_time=end_time, @@ -292,8 +287,7 @@ async def previous_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - print(e) - logger.error("Error receiving events from provider") + logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error(e) From 8626eac4426d303bb307d9bb64bec46238d0110a Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:28:05 +0300 Subject: [PATCH 5/9] Remove logger. --- backend/moonstream/routes/streams.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index a14ad4c0..7f2965ba 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -234,7 +234,6 @@ async def next_event_handler( logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: - logger.error(e) logger.error("Unable to get next events") raise MoonstreamHTTPException(status_code=500, internal_error=e) @@ -290,7 +289,6 @@ async def previous_event_handler( logger.error(f"Error receiving events from provider err: {e}") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: - logger.error(e) logger.error("Unable to get previous events") raise MoonstreamHTTPException(status_code=500, internal_error=e) From 3f7ff0c151286c2a6dd54e34c296422e0ae23f3c Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Tue, 14 Sep 2021 20:31:59 +0300 Subject: [PATCH 6/9] Remove dialect. --- backend/moonstream/providers/ethereum_blockchain.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 48641fe0..48f282d0 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -12,7 +12,6 @@ from moonstreamdb.models import ( EthereumLabel, ) from sqlalchemy import or_, and_, text -from sqlalchemy.dialects import postgresql # For Debug remove from sqlalchemy.orm import Session, Query from sqlalchemy.sql.functions import user @@ -383,9 +382,7 @@ def next_event( query_ethereum_transactions(db_session, next_stream_boundary, parsed_filters) .order_by(text("timestamp asc")) .limit(1) - ) - - maybe_ethereum_transaction = maybe_ethereum_transaction.one_or_none() + ).one_or_none() if maybe_ethereum_transaction is None: return None @@ -425,8 +422,7 @@ def previous_event( ) .order_by(text("timestamp desc")) .limit(1) - ) - maybe_ethereum_transaction = maybe_ethereum_transaction.one_or_none() + ).one_or_none() if maybe_ethereum_transaction is None: return None return ethereum_transaction_event(maybe_ethereum_transaction) From d35eded11d404fdd0d9c08a38044f88631364665 Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Wed, 15 Sep 2021 14:54:26 +0300 Subject: [PATCH 7/9] Black formating. --- backend/moonstream/providers/ethereum_blockchain.py | 7 +++++-- backend/moonstream/routes/streams.py | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/backend/moonstream/providers/ethereum_blockchain.py b/backend/moonstream/providers/ethereum_blockchain.py index 48f282d0..22c4fc64 100644 --- a/backend/moonstream/providers/ethereum_blockchain.py +++ b/backend/moonstream/providers/ethereum_blockchain.py @@ -168,7 +168,9 @@ def parse_filters( def query_ethereum_transactions( - db_session: Session, stream_boundary: data.StreamBoundary, parsed_filters: Filters, + db_session: Session, + stream_boundary: data.StreamBoundary, + parsed_filters: Filters, ) -> Query: """ Builds a database query for Ethereum transactions that occurred within the window of time that @@ -186,7 +188,8 @@ def query_ethereum_transactions( EthereumTransaction.value, EthereumBlock.timestamp.label("timestamp"), ).join( - EthereumBlock, EthereumTransaction.block_number == EthereumBlock.block_number, + EthereumBlock, + EthereumTransaction.block_number == EthereumBlock.block_number, ) if stream_boundary.include_start: diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 7f2965ba..1739242b 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -69,7 +69,9 @@ def get_user_subscriptions(token: str) -> Dict[str, List[BugoutResource]]: """ response = bc.list_resources( token=token, - params={"type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,}, + params={ + "type": BUGOUT_RESOURCE_TYPE_SUBSCRIPTION, + }, timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, ) From 7cc96ca3fae44532eb62abe943396dbaca113453 Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Thu, 30 Sep 2021 14:47:52 +0300 Subject: [PATCH 8/9] Remove log error. --- backend/moonstream/routes/streams.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 1739242b..5a2785e7 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -135,7 +135,7 @@ async def stream_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider err: {e}") + logger.error(f"Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get events") @@ -178,7 +178,7 @@ async def latest_events_handler( sort_events=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider err: {e}") + logger.error(f"Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get latest events") @@ -233,7 +233,7 @@ async def next_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider err: {e}") + logger.error(f"Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get next events") @@ -288,7 +288,7 @@ async def previous_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider err: {e}") + logger.error(f"Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get previous events") From b6e8f870e51ffdf4093cd8a06e75609261d23eac Mon Sep 17 00:00:00 2001 From: Andrey Dolgolev Date: Thu, 30 Sep 2021 15:04:56 +0300 Subject: [PATCH 9/9] Remove f strings. --- backend/moonstream/routes/streams.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/moonstream/routes/streams.py b/backend/moonstream/routes/streams.py index 5a2785e7..10d0426c 100644 --- a/backend/moonstream/routes/streams.py +++ b/backend/moonstream/routes/streams.py @@ -135,7 +135,7 @@ async def stream_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider") + logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get events") @@ -178,7 +178,7 @@ async def latest_events_handler( sort_events=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider") + logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get latest events") @@ -233,7 +233,7 @@ async def next_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider") + logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get next events") @@ -288,7 +288,7 @@ async def previous_event_handler( raise_on_error=True, ) except ReceivingEventsException as e: - logger.error(f"Error receiving events from provider") + logger.error("Error receiving events from provider") raise MoonstreamHTTPException(status_code=500, internal_error=e) except Exception as e: logger.error("Unable to get previous events")