kopia lustrzana https://github.com/bugout-dev/moonstream
				
				
				
			
						commit
						8efa9858ac
					
				|  | @ -47,6 +47,8 @@ POLYGON_TXPOOL_SERVICE_FILE="polygon-txpool.service" | ||||||
| POLYGON_MOONWORM_CRAWLER_SERVICE_FILE="polygon-moonworm-crawler.service" | POLYGON_MOONWORM_CRAWLER_SERVICE_FILE="polygon-moonworm-crawler.service" | ||||||
| POLYGON_STATE_SERVICE_FILE="polygon-state.service" | POLYGON_STATE_SERVICE_FILE="polygon-state.service" | ||||||
| POLYGON_STATE_TIMER_FILE="polygon-state.timer" | POLYGON_STATE_TIMER_FILE="polygon-state.timer" | ||||||
|  | POLYGON_METADATA_SERVICE_FILE="polygon-metadata.service" | ||||||
|  | POLYGON_METADATA_TIMER_FILE="polygon-metadata.timer" | ||||||
| 
 | 
 | ||||||
| # XDai service file | # XDai service file | ||||||
| XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service" | XDAI_SYNCHRONIZE_SERVICE="xdai-synchronize.service" | ||||||
|  | @ -223,3 +225,12 @@ cp "${SCRIPT_DIR}/${POLYGON_STATE_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_ | ||||||
| cp "${SCRIPT_DIR}/${POLYGON_STATE_TIMER_FILE}" "/etc/systemd/system/${POLYGON_STATE_TIMER_FILE}" | cp "${SCRIPT_DIR}/${POLYGON_STATE_TIMER_FILE}" "/etc/systemd/system/${POLYGON_STATE_TIMER_FILE}" | ||||||
| systemctl daemon-reload | systemctl daemon-reload | ||||||
| systemctl restart --no-block "${POLYGON_STATE_TIMER_FILE}" | systemctl restart --no-block "${POLYGON_STATE_TIMER_FILE}" | ||||||
|  | 
 | ||||||
|  | echo | ||||||
|  | echo | ||||||
|  | echo -e "${PREFIX_INFO} Replacing existing Polygon metadata service and timer with: ${POLYGON_METADATA_SERVICE_FILE}, ${POLYGON_METADATA_TIMER_FILE}" | ||||||
|  | chmod 644 "${SCRIPT_DIR}/${POLYGON_METADATA_SERVICE_FILE}" "${SCRIPT_DIR}/${POLYGON_METADATA_TIMER_FILE}" | ||||||
|  | cp "${SCRIPT_DIR}/${POLYGON_METADATA_SERVICE_FILE}" "/etc/systemd/system/${POLYGON_METADATA_SERVICE_FILE}" | ||||||
|  | cp "${SCRIPT_DIR}/${POLYGON_METADATA_TIMER_FILE}" "/etc/systemd/system/${POLYGON_METADATA_TIMER_FILE}" | ||||||
|  | systemctl daemon-reload | ||||||
|  | systemctl restart --no-block "${POLYGON_METADATA_TIMER_FILE}" | ||||||
|  |  | ||||||
|  | @ -0,0 +1,13 @@ | ||||||
|  | [Unit] | ||||||
|  | Description=Execute metadata crawler | ||||||
|  | After=network.target | ||||||
|  | 
 | ||||||
|  | [Service] | ||||||
|  | Type=oneshot | ||||||
|  | User=ubuntu | ||||||
|  | Group=www-data | ||||||
|  | WorkingDirectory=/home/ubuntu/moonstream/crawlers/mooncrawl | ||||||
|  | EnvironmentFile=/home/ubuntu/moonstream-secrets/app.env | ||||||
|  | ExecStart=/home/ubuntu/moonstream-env/bin/python -m mooncrawl.metadata_crawler.cli crawl --blockchain polygon | ||||||
|  | CPUWeight=60 | ||||||
|  | SyslogIdentifier=polygon-metadata | ||||||
|  | @ -0,0 +1,9 @@ | ||||||
|  | [Unit] | ||||||
|  | Description=Execute Polygon metadata crawler each 10m | ||||||
|  | 
 | ||||||
|  | [Timer] | ||||||
|  | OnBootSec=10s | ||||||
|  | OnUnitActiveSec=60m | ||||||
|  | 
 | ||||||
|  | [Install] | ||||||
|  | WantedBy=timers.target | ||||||
|  | @ -49,3 +49,11 @@ class QueryDataUpdate(BaseModel): | ||||||
|     file_type: str |     file_type: str | ||||||
|     query: str |     query: str | ||||||
|     params: Dict[str, Any] = Field(default_factory=dict) |     params: Dict[str, Any] = Field(default_factory=dict) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TokenURIs(BaseModel): | ||||||
|  |     token_id: str | ||||||
|  |     token_uri: str | ||||||
|  |     block_number: str | ||||||
|  |     block_timestamp: str | ||||||
|  |     address: str | ||||||
|  |  | ||||||
|  | @ -0,0 +1,163 @@ | ||||||
|  | import argparse | ||||||
|  | import json | ||||||
|  | from urllib.error import HTTPError | ||||||
|  | import urllib.request | ||||||
|  | import logging | ||||||
|  | from typing import Dict, Any | ||||||
|  | 
 | ||||||
|  | from moonstreamdb.blockchain import AvailableBlockchainType | ||||||
|  | from moonstreamdb.db import ( | ||||||
|  |     MOONSTREAM_DB_URI, | ||||||
|  |     MOONSTREAM_POOL_SIZE, | ||||||
|  |     create_moonstream_engine, | ||||||
|  | ) | ||||||
|  | from sqlalchemy.orm import sessionmaker | ||||||
|  | from .db import ( | ||||||
|  |     commit_session, | ||||||
|  |     get_uris_of_tokens, | ||||||
|  |     get_current_metadata_for_address, | ||||||
|  |     metadata_to_label, | ||||||
|  | ) | ||||||
|  | from ..settings import ( | ||||||
|  |     MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | logging.basicConfig(level=logging.INFO) | ||||||
|  | logger = logging.getLogger(__name__) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | batch_size = 50 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def crawl_uri(metadata_uri: str) -> Any: | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     Get metadata from URI | ||||||
|  |     """ | ||||||
|  |     retry = 0 | ||||||
|  |     result = None | ||||||
|  |     while retry < 3: | ||||||
|  |         try: | ||||||
|  | 
 | ||||||
|  |             response = urllib.request.urlopen(metadata_uri, timeout=5) | ||||||
|  | 
 | ||||||
|  |             if response.status == 200: | ||||||
|  |                 result = json.loads(response.read()) | ||||||
|  |                 break | ||||||
|  |             retry += 1 | ||||||
|  | 
 | ||||||
|  |         except HTTPError as error: | ||||||
|  |             logger.error(f"request end with error statuscode: {error.code}") | ||||||
|  |             retry += 1 | ||||||
|  |             continue | ||||||
|  |         except Exception as err: | ||||||
|  |             logger.error(err) | ||||||
|  |             retry += 1 | ||||||
|  |             continue | ||||||
|  |     return result | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int): | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     Parse all metadata of tokens. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     logger.info("Starting metadata crawler") | ||||||
|  |     logger.info(f"Connecting to blockchain {blockchain_type.value}") | ||||||
|  | 
 | ||||||
|  |     engine = create_moonstream_engine( | ||||||
|  |         MOONSTREAM_DB_URI, | ||||||
|  |         pool_pre_ping=True, | ||||||
|  |         pool_size=MOONSTREAM_POOL_SIZE, | ||||||
|  |         statement_timeout=MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS, | ||||||
|  |     ) | ||||||
|  |     process_session = sessionmaker(bind=engine) | ||||||
|  |     db_session = process_session() | ||||||
|  | 
 | ||||||
|  |     # run crawling of levels | ||||||
|  |     try: | ||||||
|  | 
 | ||||||
|  |         uris_of_tokens = get_uris_of_tokens(db_session, blockchain_type) | ||||||
|  | 
 | ||||||
|  |         tokens_uri_by_address: Dict[str, Any] = {} | ||||||
|  | 
 | ||||||
|  |         for token_uri_data in uris_of_tokens: | ||||||
|  |             if token_uri_data.address not in tokens_uri_by_address: | ||||||
|  |                 tokens_uri_by_address[token_uri_data.address] = [] | ||||||
|  |             tokens_uri_by_address[token_uri_data.address].append(token_uri_data) | ||||||
|  | 
 | ||||||
|  |         for address in tokens_uri_by_address: | ||||||
|  | 
 | ||||||
|  |             already_parsed = get_current_metadata_for_address( | ||||||
|  |                 db_session=db_session, blockchain_type=blockchain_type, address=address | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             for requests_chunk in [ | ||||||
|  |                 tokens_uri_by_address[address][i : i + batch_size] | ||||||
|  |                 for i in range(0, len(tokens_uri_by_address[address]), batch_size) | ||||||
|  |             ]: | ||||||
|  |                 writed_labels = 0 | ||||||
|  |                 for token_uri_data in requests_chunk: | ||||||
|  | 
 | ||||||
|  |                     if token_uri_data.token_id not in already_parsed: | ||||||
|  |                         metadata = crawl_uri(token_uri_data.token_uri) | ||||||
|  | 
 | ||||||
|  |                         db_session.add( | ||||||
|  |                             metadata_to_label( | ||||||
|  |                                 blockchain_type=blockchain_type, | ||||||
|  |                                 metadata=metadata, | ||||||
|  |                                 token_uri_data=token_uri_data, | ||||||
|  |                             ) | ||||||
|  |                         ) | ||||||
|  |                         writed_labels += 1 | ||||||
|  |                 commit_session(db_session) | ||||||
|  |                 logger.info(f"Write {writed_labels} labels for {address}") | ||||||
|  | 
 | ||||||
|  |     finally: | ||||||
|  |         db_session.close() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def handle_crawl(args: argparse.Namespace) -> None: | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     Parse all metadata of tokens. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     blockchain_type = AvailableBlockchainType(args.blockchain) | ||||||
|  | 
 | ||||||
|  |     parse_metadata(blockchain_type, args.commit_batch_size) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def main() -> None: | ||||||
|  |     parser = argparse.ArgumentParser() | ||||||
|  |     parser.set_defaults(func=lambda _: parser.print_help()) | ||||||
|  | 
 | ||||||
|  |     subparsers = parser.add_subparsers() | ||||||
|  | 
 | ||||||
|  |     metadata_crawler_parser = subparsers.add_parser( | ||||||
|  |         "crawl", | ||||||
|  |         help="Crawler of tokens metadata.", | ||||||
|  |     ) | ||||||
|  |     metadata_crawler_parser.add_argument( | ||||||
|  |         "--blockchain", | ||||||
|  |         "-b", | ||||||
|  |         type=str, | ||||||
|  |         help="Type of blockchain wich writng in database", | ||||||
|  |         required=True, | ||||||
|  |     ) | ||||||
|  |     metadata_crawler_parser.add_argument( | ||||||
|  |         "--commit-batch-size", | ||||||
|  |         "-c", | ||||||
|  |         type=int, | ||||||
|  |         default=50, | ||||||
|  |         help="Amount of requests before commiting to database", | ||||||
|  |     ) | ||||||
|  |     metadata_crawler_parser.set_defaults(func=handle_crawl) | ||||||
|  | 
 | ||||||
|  |     args = parser.parse_args() | ||||||
|  |     args.func(args) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     main() | ||||||
|  | @ -0,0 +1,140 @@ | ||||||
|  | import logging | ||||||
|  | import json | ||||||
|  | from typing import Dict, Any, Optional, List | ||||||
|  | 
 | ||||||
|  | from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model | ||||||
|  | from sqlalchemy.orm import Session | ||||||
|  | 
 | ||||||
|  | from ..data import TokenURIs | ||||||
|  | from ..settings import VIEW_STATE_CRAWLER_LABEL, METADATA_CRAWLER_LABEL | ||||||
|  | 
 | ||||||
|  | logging.basicConfig(level=logging.INFO) | ||||||
|  | logger = logging.getLogger(__name__) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def metadata_to_label( | ||||||
|  |     blockchain_type: AvailableBlockchainType, | ||||||
|  |     metadata: Optional[Dict[str, Any]], | ||||||
|  |     token_uri_data: TokenURIs, | ||||||
|  |     label_name=METADATA_CRAWLER_LABEL, | ||||||
|  | ): | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     Creates a label model. | ||||||
|  |     """ | ||||||
|  |     label_model = get_label_model(blockchain_type) | ||||||
|  | 
 | ||||||
|  |     sanityzed_label_data = json.loads( | ||||||
|  |         json.dumps( | ||||||
|  |             { | ||||||
|  |                 "type": "metadata", | ||||||
|  |                 "token_id": token_uri_data.token_id, | ||||||
|  |                 "metadata": metadata, | ||||||
|  |             } | ||||||
|  |         ).replace(r"\u0000", "") | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     label = label_model( | ||||||
|  |         label=label_name, | ||||||
|  |         label_data=sanityzed_label_data, | ||||||
|  |         address=token_uri_data.address, | ||||||
|  |         block_number=token_uri_data.block_number, | ||||||
|  |         transaction_hash=None, | ||||||
|  |         block_timestamp=token_uri_data.block_timestamp, | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     return label | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def commit_session(db_session: Session) -> None: | ||||||
|  |     """ | ||||||
|  |     Save labels in the database. | ||||||
|  |     """ | ||||||
|  |     try: | ||||||
|  |         logger.info("Committing session to database") | ||||||
|  |         db_session.commit() | ||||||
|  |     except Exception as e: | ||||||
|  |         logger.error(f"Failed to save labels: {e}") | ||||||
|  |         db_session.rollback() | ||||||
|  |         raise e | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_uris_of_tokens( | ||||||
|  |     db_session: Session, blockchain_type: AvailableBlockchainType | ||||||
|  | ) -> List[TokenURIs]: | ||||||
|  | 
 | ||||||
|  |     """ | ||||||
|  |     Get meatadata URIs. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     label_model = get_label_model(blockchain_type) | ||||||
|  | 
 | ||||||
|  |     table = label_model.__tablename__ | ||||||
|  | 
 | ||||||
|  |     metadata_for_parsing = db_session.execute( | ||||||
|  |         """ SELECT | ||||||
|  |             DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id, | ||||||
|  |             label_data -> 'result' as token_uri, | ||||||
|  |             block_number as block_number, | ||||||
|  |             block_timestamp as block_timestamp, | ||||||
|  |             address as address | ||||||
|  | 
 | ||||||
|  |         FROM | ||||||
|  |             {} | ||||||
|  |         WHERE | ||||||
|  |             label = :label | ||||||
|  |             AND label_data ->> 'name' = :name | ||||||
|  |         ORDER BY | ||||||
|  |             label_data -> 'inputs'-> 0 ASC, | ||||||
|  |             block_number :: INT DESC; | ||||||
|  |     """.format( | ||||||
|  |             table | ||||||
|  |         ), | ||||||
|  |         {"table": table, "label": VIEW_STATE_CRAWLER_LABEL, "name": "tokenURI"}, | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     results = [ | ||||||
|  |         TokenURIs( | ||||||
|  |             token_id=data[0], | ||||||
|  |             token_uri=data[1], | ||||||
|  |             block_number=data[2], | ||||||
|  |             block_timestamp=data[3], | ||||||
|  |             address=data[4], | ||||||
|  |         ) | ||||||
|  |         for data in metadata_for_parsing | ||||||
|  |     ] | ||||||
|  | 
 | ||||||
|  |     return results | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_current_metadata_for_address( | ||||||
|  |     db_session: Session, blockchain_type: AvailableBlockchainType, address: str | ||||||
|  | ): | ||||||
|  |     """ | ||||||
|  |     Get existing metadata. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     label_model = get_label_model(blockchain_type) | ||||||
|  | 
 | ||||||
|  |     table = label_model.__tablename__ | ||||||
|  | 
 | ||||||
|  |     current_metadata = db_session.execute( | ||||||
|  |         """ SELECT | ||||||
|  |             DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id | ||||||
|  |         FROM | ||||||
|  |             {} | ||||||
|  |         WHERE | ||||||
|  |             address = :address | ||||||
|  |             AND label = :label | ||||||
|  |         ORDER BY | ||||||
|  |             label_data ->> 'token_id' ASC, | ||||||
|  |             block_number :: INT DESC; | ||||||
|  |     """.format( | ||||||
|  |             table | ||||||
|  |         ), | ||||||
|  |         {"address": address, "label": METADATA_CRAWLER_LABEL}, | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     result = [data[0] for data in current_metadata] | ||||||
|  | 
 | ||||||
|  |     return result | ||||||
|  | @ -43,7 +43,7 @@ DOCS_TARGET_PATH = "docs" | ||||||
| # Crawler label | # Crawler label | ||||||
| CRAWLER_LABEL = "moonworm-alpha" | CRAWLER_LABEL = "moonworm-alpha" | ||||||
| VIEW_STATE_CRAWLER_LABEL = "view-state-alpha" | VIEW_STATE_CRAWLER_LABEL = "view-state-alpha" | ||||||
| 
 | METADATA_CRAWLER_LABEL = "metadata-crawler" | ||||||
| 
 | 
 | ||||||
| MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS = 30000 | MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS = 30000 | ||||||
| MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get( | MOONSTREAM_STATE_CRAWLER_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get( | ||||||
|  |  | ||||||
|  | @ -62,7 +62,8 @@ setup( | ||||||
|             "moonworm-crawler=mooncrawl.moonworm_crawler.cli:main", |             "moonworm-crawler=mooncrawl.moonworm_crawler.cli:main", | ||||||
|             "nft=mooncrawl.nft.cli:main", |             "nft=mooncrawl.nft.cli:main", | ||||||
|             "statistics=mooncrawl.stats_worker.dashboard:main", |             "statistics=mooncrawl.stats_worker.dashboard:main", | ||||||
|             "state-crawler=mooncrawl.state_crawler.cli:main" |             "state-crawler=mooncrawl.state_crawler.cli:main", | ||||||
|  |             "metadata-crawler=mooncrawl.metadata_crawler.cli:main", | ||||||
|         ] |         ] | ||||||
|     }, |     }, | ||||||
| ) | ) | ||||||
|  |  | ||||||
		Ładowanie…
	
		Reference in New Issue
	
	 Andrey Dolgolev
						Andrey Dolgolev