s3 storage + WIP gsheets DB

pull/72/head
msramalho 2023-01-04 18:02:44 +00:00
parent bb512b36c9
commit 1cdc006b27
9 changed files with 110 additions and 37 deletions

View file

@@ -90,11 +90,13 @@ class ConfigV2:
self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])]
self.archivers = [Archiverv2.init(e, self.config) for e in steps.get("archivers", [])]
self.databases = [Database.init(e, self.config) for e in steps.get("databases", [])]
self.storages = [StorageV2.init(e, self.config) for e in steps.get("storages", [])]
print("feeder", self.feeder)
print("enrichers", [e for e in self.enrichers])
print("archivers", [e for e in self.archivers])
print("databases", [e for e in self.databases])
print("storages", [e for e in self.storages])
def validate(self):
pass

View file

@@ -1,5 +1,5 @@
from typing import Union, Tuple
import gspread
import gspread, datetime
# from metadata import Metadata
from loguru import logger
@@ -7,6 +7,7 @@ from loguru import logger
# from . import Enricher
from databases import Database
from metadata import Metadata
from media import Media
from steps.gsheet import Gsheets
from utils import GWorksheet
@@ -48,8 +49,37 @@ class GsheetsDb(Database):
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item}")
gw, row = self._retrieve_gsheet(item)
self._safe_status_update(item, 'done')
pass
# self._safe_status_update(item, 'done')
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == '':
cell_updates.append((row, col, final_value))
cell_updates.append((row, 'status', item.status))
media: Media = item.get_single_media()
batch_if_valid('archive', media.cdn_url)
batch_if_valid('date', True, datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat())
batch_if_valid('title', item.get_title())
batch_if_valid('text', item.get("content", "")[:500])
batch_if_valid('timestamp', item.get_timestamp())
# TODO: AFTER ENRICHMENTS
# batch_if_valid('hash', media.hash)
# batch_if_valid('thumbnail', result.thumbnail, f'=IMAGE("{result.thumbnail}")')
# batch_if_valid('thumbnail_index', result.thumbnail_index)
# batch_if_valid('duration', result.duration, str(result.duration))
# batch_if_valid('screenshot', result.screenshot)
# if result.wacz is not None:
# batch_if_valid('wacz', result.wacz)
# batch_if_valid('replaywebpage', f'https://replayweb.page/?source={quote(result.wacz)}#view=pages&url={quote(url)}')
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:

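(As an aside, the hunk above collects (row, col, value) tuples first and then writes them with a single batch call; the following is a stripped-down, self-contained sketch of that pattern, where COLUMNS and fake_row stand in for the project's GWorksheet helpers and are not the real API.)
# Sketch of the "collect, then batch-write" pattern used in done() above.
COLUMNS = {"archive": 0, "title": 1, "status": 2}
fake_row = ["", "", ""]  # current contents of (say) row 7, still empty
cell_updates = []
def batch_if_valid(col, val, final_value=None):
    # queue an update only when there is a value, the column exists, and the cell is still empty
    final_value = final_value or val
    if val and col in COLUMNS and fake_row[COLUMNS[col]] == "":
        cell_updates.append((7, col, final_value))
batch_if_valid("archive", "https://cdn.example.org/abc.mp4")
batch_if_valid("title", "example title")
cell_updates.append((7, "status", "done"))
print(cell_updates)  # the real code writes these in one call, e.g. gw.batch_set_cell(cell_updates)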
View file

@@ -1,4 +1,4 @@
import gspread
import gspread, os
# from metadata import Metadata
from loguru import logger
@@ -8,7 +8,7 @@ from feeders import Feeder
from metadata import Metadata
from steps.gsheet import Gsheets
from utils import GWorksheet
from slugify import slugify
class GsheetsFeeder(Gsheets, Feeder):
name = "gsheet_feeder"
@@ -60,7 +60,7 @@ class GsheetsFeeder(Gsheets, Feeder):
if status not in ['', None]: continue
# All checks done - archival process starts here
yield Metadata().set_url(url).set("gsheet", {"row": row, "worksheet": gw}, True)
yield Metadata().set_url(url).set("gsheet", {"row": row, "worksheet": gw}, True).set("folder", os.path.join(slugify(self.sheet), slugify(wks.title)), True)
logger.success(f'Finished worksheet {wks.title}')

View file

@@ -10,8 +10,7 @@ import json
@dataclass
class Media:
filename: str
id: str = None
hash: str = None
key: str = None
cdn_url: str = None
hash: str = None
# id: str = None
# hash: str = None # TODO: added by enrichers

View file

@@ -3,7 +3,7 @@ from __future__ import annotations
from ast import List, Set
from typing import Any, Union, Dict
from dataclasses import dataclass, field
from datetime import datetime
import datetime
# import json
from media import Media
@@ -70,26 +70,40 @@ class Metadata:
def set_title(self, title: str) -> Metadata:
return self.set("title", title)
def set_timestamp(self, timestamp: datetime) -> Metadata:
assert type(timestamp) == datetime, "set_timestamp expects a datetime instance"
def get_title(self) -> str:
return self.get("title")
def set_timestamp(self, timestamp: datetime.datetime) -> Metadata:
assert type(timestamp) == datetime.datetime, "set_timestamp expects a datetime instance"
return self.set("timestamp", timestamp)
def get_timestamp(self, utc=True, iso=True) -> datetime.datetime:
ts = self.get("timestamp")
if not ts: return ts
if utc: ts = ts.replace(tzinfo=datetime.timezone.utc)
if iso: return ts.isoformat()
return ts
def add_media(self, media: Media) -> Metadata:
# print(f"adding {filename} to {self.metadata.get('media')}")
# return self.set("media", self.get_media() + [filename])
# return self.get_media().append(media)
return self.media.append(media)
def get_single_media(self) -> Media:
# TODO: check if formatters were applied and choose with priority
return self.media[0]
# def as_json(self) -> str:
# # converts all metadata and data into JSON
# return json.dumps(self.metadata)
# #TODO: datetime is not serializable
def cleanup(self) -> Metadata:
#TODO: refactor so it returns a JSON with all intended properties, except tmp_keys
# TODO: refactor so it returns a JSON with all intended properties, except tmp_keys
# the code below leads to errors if database needs tmp_keys after they are removed
# """removes temporary metadata fields, ideally called after all ops except writing"""
# for tmp_key in self.tmp_keys:
# self.metadata.pop(tmp_key, None)
# self.metadata.pop(tmp_key, None)
# self.tmp_keys = set()
pass

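(A tiny illustrative sketch, not from the repo, of what the new set_timestamp/get_timestamp pair is expected to do: store a datetime and read it back as a UTC ISO string by default.)
import datetime
ts = datetime.datetime(2023, 1, 4, 18, 2, 44)
# mirrors get_timestamp(utc=True, iso=True): attach UTC tzinfo, then serialise
print(ts.replace(tzinfo=datetime.timezone.utc).isoformat())  # 2023-01-04T18:02:44+00:00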
View file

@@ -2,15 +2,18 @@ from __future__ import annotations
from ast import List
from typing import Union, Dict
from dataclasses import dataclass
from archivers.archiver import Archiverv2
from enrichers.enricher import Enricher
from databases.database import Database
from archivers import Archiverv2
from storages import StorageV2
from enrichers import Enricher
from databases import Database
from metadata import Metadata
import tempfile, time, traceback
from loguru import logger
"""
how not to couple the different pieces of logic
due to the use of constants for the metadata keys?
@@ -133,11 +136,11 @@ class ArchivingOrchestrator:
self.enrichers = config.enrichers
self.archivers: List[Archiverv2] = config.archivers
self.databases: List[Database] = config.databases
self.storages: List[StorageV2] = config.storages
for a in self.archivers: a.setup()
self.formatters = []
self.storages = []
# self.formatters = [
# Formatter.init(f, config)
# for f in config.formatters
@@ -184,7 +187,7 @@ class ArchivingOrchestrator:
def archive(self, result: Metadata) -> Union[Metadata, None]:
url = result.get_url()
# TODO: clean urls
# TODO: clean urls
for a in self.archivers:
url = a.clean_url(url)
result.set_url(url)
@@ -240,8 +243,8 @@ class ArchivingOrchestrator:
# storage
for s in self.storages:
for m in result.media:
result.merge(s.store(m))
for i, m in enumerate(result.media):
result.media[i] = s.store(m, result)
# signal completion to databases (DBs, Google Sheets, CSV, ...)
# a hash registration service could be one database: forensic archiving

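(Illustrative stub of the changed storage loop: each storage now returns the stored Media, which replaces result.media[i], instead of merging a Metadata object back in. Everything below except the loop shape is made up.)
class FakeStorage:
    # returns the same media enriched with a cdn_url, as StorageV2.store() now does
    def store(self, media, item):
        media["cdn_url"] = "https://cdn.example.org/" + media["filename"]
        return media
media_list = [{"filename": "a.mp4"}, {"filename": "b.jpg"}]  # stand-ins for Media objects
for s in [FakeStorage()]:
    for i, m in enumerate(media_list):
        media_list[i] = s.store(m, item=None)
print(media_list)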
View file

@@ -30,6 +30,7 @@ class Gsheets(Step):
'thumbnail_index': 'thumbnail index',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'duration': 'duration',
'screenshot': 'screenshot',
'hash': 'hash',

View file

@@ -1,8 +1,10 @@
from typing import IO
from typing import IO, Any
import boto3, uuid, os, mimetypes
from botocore.errorfactory import ClientError
from src.storages import StorageV2
from metadata import Metadata
from media import Media
from storages import StorageV2
from loguru import logger
from slugify import slugify
@@ -14,10 +16,10 @@ class S3StorageV2(StorageV2):
super().__init__(config)
self.s3 = boto3.client(
's3',
region_name=config.region,
endpoint_url=config.endpoint_url.format(region=config.region),
aws_access_key_id=config.key,
aws_secret_access_key=config.secret
region_name=self.region,
endpoint_url=self.endpoint_url.format(region=self.region),
aws_access_key_id=self.key,
aws_secret_access_key=self.secret
)
@staticmethod
@@ -37,31 +39,31 @@ class S3StorageV2(StorageV2):
"help": "S3 CDN url, {bucket}, {region} and {key} are inserted at runtime"
},
"private": {"default": False, "help": "if true S3 files will not be readable online"},
"key_path": {"default": "random", "help": "S3 file names are non-predictable strings, one of ['random', 'default']"},
# "key_path": {"default": "random", "help": "S3 file names are non-predictable strings, one of ['random', 'default']"},
}
def get_cdn_url(self, key: str) -> str:
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=self._get_path(key))
def get_cdn_url(self, media: Media) -> str:
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=media.key)
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> None:
def uploadf(self, file: IO[bytes], media: Media, **kwargs: dict) -> Any:
extra_args = kwargs.get("extra_args", {})
if not self.private and 'ACL' not in extra_args:
extra_args['ACL'] = 'public-read'
if 'ContentType' not in extra_args:
try:
extra_args['ContentType'] = mimetypes.guess_type(key)[0]
extra_args['ContentType'] = mimetypes.guess_type(media.key)[0]
except Exception as e:
logger.error(f"Unable to get mimetype for {key=}, error: {e}")
logger.warning(f"Unable to get mimetype for {media.key=}, error: {e}")
self.s3.upload_fileobj(file, Bucket=self.bucket, Key=self._get_path(key), ExtraArgs=extra_args)
self.s3.upload_fileobj(file, Bucket=self.bucket, Key=media.key, ExtraArgs=extra_args)
def exists(self, key: str) -> bool:
"""
Tests if a given file with key=key exists in the bucket
"""
try:
self.s3.head_object(Bucket=self.bucket, Key=self._get_path(key))
self.s3.head_object(Bucket=self.bucket, Key=key)
return True
except ClientError as e:
logger.warning(f"got a ClientError when checking if {key=} exists in bucket={self.bucket}: {e}")

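(For reference, cdn_url is a plain template string; with a made-up template and values, the rendered URL looks like this. Only the .format() call mirrors get_cdn_url above.)
# Hypothetical bucket/region/key values; the placeholders match the help text above.
cdn_url = "https://{bucket}.s3.{region}.cdn.example.org/{key}"
print(cdn_url.format(bucket="my-archive", region="eu-west-1", key="folder/123e4567.mp4"))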
View file

@@ -1,8 +1,12 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import IO, Any
from media import Media
from metadata import Metadata
from steps.step import Step
from loguru import logger
import os, uuid
@dataclass
@@ -17,5 +21,23 @@ class StorageV2(Step):
def init(name: str, config: dict) -> StorageV2:
return Step.init(name, config, StorageV2)
def store(self, media: Media, item: Metadata) -> Media:
media = self.set_key(media, item)
self.upload(media)
media.cdn_url = self.get_cdn_url(media)
return media
@abstractmethod
def store(self, item: Metadata) -> Metadata: pass
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> Any: pass
def upload(self, media: Media, **kwargs) -> Any:
logger.debug(f'[{self.__class__.name}] uploading file {media.filename} with key {media.key}')
with open(media.filename, 'rb') as f:
return self.uploadf(f, media, **kwargs)
def set_key(self, media: Media, item: Metadata) -> Media:
"""takes the media and optionally item info and generates a key"""
folder = item.get("folder", "")
ext = os.path.splitext(media.filename)[1]
media.key = os.path.join(folder, f"{str(uuid.uuid4())}{ext}")
return media