Switch from requests to urllib.request.urlopen.

pull/660/head
Andrey 2022-09-01 16:04:07 +03:00
rodzic aaa6c95953
commit 42ebb66e4d
2 zmienionych plików z 54 dodań i 55 usunięć

Wyświetl plik

@ -1,14 +1,9 @@
import argparse import argparse
import json import json
import hashlib from urllib.error import HTTPError
import itertools import urllib.request
from pickle import TRUE
from pprint import pprint
import logging import logging
from random import random from typing import Dict, Any
import requests
from typing import Dict, List, Any
from uuid import UUID
from moonstreamdb.blockchain import AvailableBlockchainType from moonstreamdb.blockchain import AvailableBlockchainType
from moonstreamdb.db import ( from moonstreamdb.db import (
@ -31,6 +26,9 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
batch_size = 50
def crawl_uri(metadata_uri: str) -> Dict[str, Any]: def crawl_uri(metadata_uri: str) -> Dict[str, Any]:
""" """
@ -40,19 +38,26 @@ def crawl_uri(metadata_uri: str) -> Dict[str, Any]:
result = None result = None
while retry < 3: while retry < 3:
try: try:
metadata = requests.get(metadata_uri)
if metadata.status_code == 200: response = urllib.request.urlopen(metadata_uri, timeout=5)
result = metadata.data
if response.status == 200:
result = json.loads(response.read())
break break
retry += 1 retry += 1
except HTTPError as error:
logger.error(f"request end with error statuscode: {error.code}")
retry += 1
continue
except Exception as err: except Exception as err:
print(err) logger.error(err)
retry += 1 retry += 1
continue continue
return result return result
def parse_metadata(jobs, blockchain_type, block_number): def parse_metadata(blockchain_type: AvailableBlockchainType, batch_size: int):
engine = create_moonstream_engine( engine = create_moonstream_engine(
MOONSTREAM_DB_URI_READ_ONLY, MOONSTREAM_DB_URI_READ_ONLY,
@ -81,19 +86,24 @@ def parse_metadata(jobs, blockchain_type, block_number):
db_session=db_session, blockchain_type=blockchain_type, address=address db_session=db_session, blockchain_type=blockchain_type, address=address
) )
for token_uri_data in tokens_uri_by_address[address]: for requests_chunk in [
tokens_uri_by_address[address][i : i + batch_size]
for i in range(0, len(tokens_uri_by_address[address]), batch_size)
]:
if token_uri_data.token_id not in already_parsed: for token_uri_data in requests_chunk:
metadata = crawl_uri(token_uri_data)
db_session.add( if token_uri_data.token_id not in already_parsed:
metadata_to_label( metadata = crawl_uri(token_uri_data.token_uri)
blockchain_type=blockchain_type,
metadata=metadata, db_session.add(
token_uri_data=token_uri_data, metadata_to_label(
blockchain_type=blockchain_type,
metadata=metadata,
token_uri_data=token_uri_data,
)
) )
) commit_session(db_session)
commit_session(db_session)
finally: finally:
db_session.close() db_session.close()
@ -107,26 +117,7 @@ def handle_crawl(args: argparse.Namespace) -> None:
blockchain_type = AvailableBlockchainType(args.blockchain_type) blockchain_type = AvailableBlockchainType(args.blockchain_type)
parse_metadata(blockchain_type) parse_metadata(blockchain_type, args.batch_size)
def parse_abi(args: argparse.Namespace) -> None:
    """
    Filter a contract ABI down to its read-only ("view") entries.

    Reads the JSON ABI file at ``args.abi_file``, keeps only the entries
    whose ``stateMutability`` equals ``"view"``, and writes them as JSON to
    a file named ``view+<original file name>`` placed next to the input
    file. Nothing is written to a database.

    Args:
        args: Parsed CLI arguments; only ``args.abi_file`` (path to the ABI
            JSON file) is read.

    Raises:
        OSError: If the input file cannot be read or the output file cannot
            be written.
        json.JSONDecodeError: If the input file is not valid JSON.
    """
    # Function-scope import: the module's import block does not provide pathlib.
    from pathlib import Path

    abi_path = Path(args.abi_file)
    with abi_path.open("r") as abi_file:
        abi = json.load(abi_file)

    # Entries without "stateMutability" (e.g. events) yield None and are dropped.
    view_methods = [
        method for method in abi if method.get("stateMutability") == "view"
    ]

    # Prefix only the file name. Prefixing the whole path (as in
    # f"view+{args.abi_file}") targets a "view+<dir>" directory that does not
    # exist whenever the input path has a directory component.
    output_path = abi_path.with_name(f"view+{abi_path.name}")
    with output_path.open("w") as output_file:
        json.dump(view_methods, output_file)
def main() -> None: def main() -> None:
@ -137,15 +128,22 @@ def main() -> None:
metadata_crawler_parser = subparsers.add_parser( metadata_crawler_parser = subparsers.add_parser(
"crawl", "crawl",
help="continuous crawling the event/function call jobs from bugout journal", help="Crawler of tokens metadata.",
) )
metadata_crawler_parser.add_argument( metadata_crawler_parser.add_argument(
"--blockchain-type", "--blockchain-type",
"-b", "-b",
type=str, type=str,
help="Type of blovkchain wich writng in database", help="Type of blockchain wich writng in database",
required=True, required=True,
) )
metadata_crawler_parser.add_argument(
"--commit-batch-size",
"-c",
type=int,
default=50,
help="Amount of requests before commiting to database",
)
metadata_crawler_parser.set_defaults(func=handle_crawl) metadata_crawler_parser.set_defaults(func=handle_crawl)
args = parser.parse_args() args = parser.parse_args()

Wyświetl plik

@ -32,10 +32,10 @@ def metadata_to_label(
"token_id": token_uri_data.token_id, "token_id": token_uri_data.token_id,
"metadata": metadata, "metadata": metadata,
}, },
address=token_uri_data["address"], address=token_uri_data.address,
block_number=token_uri_data["block_number"], block_number=token_uri_data.block_number,
transaction_hash=None, transaction_hash=None,
block_timestamp=token_uri_data["block_timestamp"], block_timestamp=token_uri_data.block_timestamp,
) )
return label return label
@ -61,24 +61,24 @@ def get_uris_of_tokens(
""" """
Get metadata URIs. Get metadata URIs.
""" """
metadata_for_parsing = db_session.query( metadata_for_parsing = db_session.execute(
""" SELECT """ SELECT
DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id, DISTINCT ON(label_data -> 'inputs'-> 0 ) label_data -> 'inputs'-> 0 as token_id,
label_data -> 'result' as token_uri, label_data -> 'result' as token_uri,
block_number as block_number, block_number as block_number,
block_timestamp as block_timestamp, block_timestamp as block_timestamp,
address as address, address as address
FROM FROM
polygon_labels polygon_labels
WHERE WHERE
AND label = 'view-state-alpha' label = 'view-state-alpha'
AND label_data ->> 'name' = 'tokenURI' AND label_data ->> 'name' = 'tokenURI'
ORDER BY ORDER BY
label_data -> 'inputs'-> 0 ASC, label_data -> 'inputs'-> 0 ASC,
block_number :: INT DESC; block_number :: INT DESC;
""" """
).execute() )
results = [ results = [
TokenURIs( TokenURIs(
@ -101,7 +101,7 @@ def get_current_metadata_for_address(
""" """
Get existing metadata. Get existing metadata.
""" """
current_metadata = db_session.query( current_metadata = db_session.execute(
""" SELECT """ SELECT
DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id DISTINCT ON(label_data ->> 'token_id') label_data ->> 'token_id' as token_id
FROM FROM
@ -112,8 +112,9 @@ def get_current_metadata_for_address(
ORDER BY ORDER BY
label_data ->> 'token_id' ASC, label_data ->> 'token_id' ASC,
block_number :: INT DESC; block_number :: INT DESC;
""" """,
).execute() {"address": address},
)
result = [data[0] for data in current_metadata] result = [data[0] for data in current_metadata]