pull/656/head
Andrey 2022-08-25 20:28:57 +03:00
rodzic af8e114515
commit b3eba087f0
1 zmienionych plików z 43 dodań i 47 usunięć

Wyświetl plik

@ -87,56 +87,52 @@ def data_generate(
db_session = process_session()
try:
try:
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
# engine.execution_options(stream_results=True)
result = db_session.execute(query, params).keys()
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
csv_writer.writerow(result.keys())
csv_writer.writerows(result.fetchAll())
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
block_number, block_timestamp = db_session.execute(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;",
).one()
data = json.dumps(
{
"block_number": block_number,
"block_timestamp": block_timestamp,
"data": [
{
key: to_json_types(value)
for key, value in dict(row).items()
}
for row in db_session.execute(query, params)
],
}
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
bucket=bucket,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"queries",
"execution",
f"query_id:{query_id}" f"file_type:{file_type}",
],
push_statistics(
s3=s3,
data=csv_buffer.getvalue().encode("utf-8"),
key=f"queries/{query_id}/data.{file_type}",
bucket=bucket,
)
else:
block_number, block_timestamp = db_session.execute(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;",
).one()
data = json.dumps(
{
"block_number": block_number,
"block_timestamp": block_timestamp,
"data": [
{key: to_json_types(value) for key, value in dict(row).items()}
for row in db_session.execute(query, params)
],
}
).encode("utf-8")
push_statistics(
s3=s3,
data=data,
key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
bucket=bucket,
)
except Exception as err:
db_session.rollback()
reporter.error_report(
err,
[
"queries",
"execution",
f"query_id:{query_id}" f"file_type:{file_type}",
],
)
finally:
db_session.close()