Add more complex typecheck.

Tested new created session with custom timeout..
Add reporter for stats execution.
pull/656/head
Andrey 2022-08-25 15:13:46 +03:00
rodzic d2b9aced9b
commit e6a730d760
2 zmienionych plików z 82 dodań i 41 usunięć

Wyświetl plik

@ -109,6 +109,18 @@ if MOONSTREAM_MOONWORM_TASKS_JOURNAL == "":
"MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
)
#queries
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS = 30000
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get("MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS")
try:
if MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS_RAW is not None:
MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS=int(MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS_RAW)
except:
raise Exception(
f"Could not parse MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS as int: {MOONSTREAM_CRAWL_WORKERS_RAW}"
)
MOONSTREAM_S3_QUERIES_BUCKET = os.environ.get("MOONSTREAM_S3_QUERIES_BUCKET", "")
if MOONSTREAM_S3_QUERIES_BUCKET == "":

Wyświetl plik

@ -6,9 +6,11 @@ from io import StringIO
from typing import Any, Dict, Optional
import boto3 # type: ignore
from moonstreamdb.db import yield_db_read_only_session_ctx
from moonstreamdb.db import create_moonstream_engine, MOONSTREAM_DB_URI_READ_ONLY
from sqlalchemy.orm import sessionmaker
from ..reporter import reporter
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@ -45,6 +47,17 @@ def query_validation(query: str) -> str:
return query
def to_json_types(value):
if isinstance(value, (str, int, tuple, list, dict)):
return value
elif isinstance(value, set):
return list(value)
else:
return str(value)
def data_generate(
bucket: str,
query_id: str,
@ -57,52 +70,68 @@ def data_generate(
"""
s3 = boto3.client("s3")
with yield_db_read_only_session_ctx() as db_session:
# Create session
engine = create_moonstream_engine(MOONSTREAM_DB_URI_READ_ONLY, pool_size=1, statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS)
process_session = sessionmaker(bind=engine)
db_session = process_session()
try:
try:
db_session.execute("SELECT 1")
except Exception as e:
db_session.rollback()
try:
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
block_number, block_timestamp = db_session.execute(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;",
).one()
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
block_number, block_timestamp = db_session.execute(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;",
).one()
data = json.dumps(
{
"block_number": block_number,
"block_timestamp": block_timestamp,
"data": [
{
key: (
value if type(value) is any((int, str)) else str(value)
)
for key, value in dict(row).items()
}
for row in db_session.execute(query, params)
],
}
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
bucket=bucket,
data = json.dumps(
{
"block_number": block_number,
"block_timestamp": block_timestamp,
"data": [
{
key: to_json_types(value)
for key, value in dict(row).items()
}
for row in db_session.execute(query, params)
],
}
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
bucket=bucket,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"queries",
"execution",
f"query_id:{query_id}"
f"file_type:{file_type}",
],
)
finally:
db_session.close()