2022-02-16 00:57:39 +00:00
import json
import logging
2022-02-16 16:12:42 +00:00
from typing import Any , Dict , Optional
from io import StringIO
import csv
2022-02-16 00:57:39 +00:00
2022-02-16 16:12:42 +00:00
2022-03-08 15:06:50 +00:00
import boto3
from moonstreamdb . db import yield_db_read_only_session_ctx
2022-03-08 13:18:27 +00:00
from . . settings import MOONSTREAM_QUERIES_BUCKET_PREFIX
2022-02-16 00:57:39 +00:00
# Module-level setup, executed on import: request INFO-level logging via
# basicConfig and create the logger used by the functions in this module.
logging . basicConfig ( level = logging . INFO )
logger = logging . getLogger ( __name__ )
2022-02-16 16:12:42 +00:00
def push_statistics(
    s3: Any,
    data: Any,
    key: str,
    bucket: str,
    content_type: str = "application/json",
) -> None:
    """
    Upload a serialized payload to S3 and log the destination.

    Args:
        s3: Client object exposing a boto3-style ``put_object`` method.
        data: Payload passed through unchanged as the object body.
        key: Object key inside the bucket.
        bucket: Name of the destination bucket.
        content_type: Value sent as the object's ``ContentType``. Defaults to
            ``"application/json"``; pass the matching MIME type when the
            payload is in another format (for example ``"text/csv"``).

    Any exception raised by ``s3.put_object`` propagates to the caller; the
    log line is only emitted after the upload call returns.
    """
    s3.put_object(
        Body=data,
        Bucket=bucket,
        Key=key,
        ContentType=content_type,
        # Fixed metadata entry attached to every object written here.
        Metadata={"drone_query": "data"},
    )

    logger.info(f"Statistics push to bucket: s3://{bucket}/{key}")
2022-02-16 16:12:42 +00:00
def data_generate(
    bucket: str,
    query_id: str,
    file_type: str,
    query: str,
    params: Optional[Dict[str, Any]],
) -> None:
    """
    Generate query and push it to S3

    Executes ``query`` with ``params`` inside a read-only database session,
    serializes the rows and uploads them with ``push_statistics``.

    Args:
        bucket: Destination S3 bucket name.
        query_id: Identifier used to build the object key.
        file_type: ``"csv"`` selects CSV output; any other value produces
            JSON. The value is also used as the object key's extension.
        query: SQL statement to execute.
        params: Bind parameters passed to ``execute`` together with ``query``.

    Output formats:
        * CSV: ``;``-delimited, first row is the column names, uploaded as
          UTF-8 bytes.
        * JSON: an object with ``block_number`` and ``block_timestamp`` (taken
          from the ``polygon_labels`` row with the highest block number
          labelled ``moonworm-alpha``) and ``data`` (a list with one dict per
          result row), uploaded as UTF-8 bytes.

    Errors from the database or from S3 are not caught here. In the JSON
    branch ``.one()`` is used, so the lookup raises unless it returns exactly
    one row.
    """
    s3 = boto3.client("s3")

    with yield_db_read_only_session_ctx() as db_session:
        if file_type == "csv":
            csv_buffer = StringIO()
            csv_writer = csv.writer(csv_buffer, delimiter=";")

            # Keep the result object itself: it provides both the column
            # names (header row) and the data rows. The whole result is
            # buffered in memory; results are not streamed.
            result = db_session.execute(query, params)
            csv_writer.writerow(result.keys())
            csv_writer.writerows(result.fetchall())

            # NOTE(review): unlike the JSON key below, this key does not start
            # with MOONSTREAM_QUERIES_BUCKET_PREFIX - confirm whether that is
            # intentional before changing where consumers read CSV from.
            push_statistics(
                s3=s3,
                data=csv_buffer.getvalue().encode("utf-8"),
                key=f"queries/{query_id}/data.{file_type}",
                bucket=bucket,
            )
        else:
            # Latest labelled block, stored alongside the query output.
            block_number, block_timestamp = db_session.execute(
                "SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;",
            ).one()

            data = json.dumps(
                {
                    "block_number": block_number,
                    "block_timestamp": block_timestamp,
                    "data": [dict(row) for row in db_session.execute(query, params)],
                }
            ).encode("utf-8")

            push_statistics(
                s3=s3,
                data=data,
                key=f"{MOONSTREAM_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}",
                bucket=bucket,
            )