pull/33/head
msramalho 2022-06-07 18:41:58 +02:00
parent d46b8e1157
commit f87acb6d1d
13 changed files with 187 additions and 157 deletions

.gitignore

@@ -12,4 +12,5 @@ anu.html
anon*
config.json
config-*.json
logs/*
logs/*
local_archive/


@@ -35,6 +35,9 @@ class Archiver(ABC):
def __str__(self):
return self.__class__.__name__
def __repr__(self):
return self.__str__()
@abstractmethod
def download(self, url, check_if_exists=False): pass
@@ -134,6 +137,7 @@ class Archiver(ABC):
return hash.hexdigest()
def get_screenshot(self, url):
logger.debug(f"getting screenshot for {url=}")
key = self.get_key(urlparse(url).path.replace(
"/", "_") + datetime.datetime.utcnow().isoformat().replace(" ", "_") + ".png")
filename = Storage.TMP_FOLDER + key
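For reference, a minimal illustration of the screenshot key built above (the URL and timestamp below are hypothetical): the URL path with '/' replaced by '_', followed by a UTC ISO timestamp and a '.png' extension.
from urllib.parse import urlparse
import datetime

url = "https://example.com/some/post"
key = urlparse(url).path.replace("/", "_") + datetime.datetime.utcnow().isoformat().replace(" ", "_") + ".png"
# e.g. "_some_post2022-06-07T16:41:58.123456.png"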


@@ -18,8 +18,8 @@ class TiktokArchiver(Archiver):
try:
info = tiktok_downloader.info_post(url)
key = self.get_key(f'{info.id}.mp4')
cdn_url = self.storage.get_cdn_url(key)
filename = Storage.TMP_FOLDER + key
logger.info(f'found video {key=}')
if check_if_exists and self.storage.exists(key):
status = 'already archived'
@@ -28,13 +28,15 @@ class TiktokArchiver(Archiver):
if len(media) <= 0:
if status == 'already archived':
return ArchiveResult(status='Could not download media, but already archived', cdn_url=cdn_url)
return ArchiveResult(status='Could not download media, but already archived', cdn_url=self.storage.get_cdn_url(key))
else:
return ArchiveResult(status='Could not download media')
logger.info(f'downloading video {key=}')
media[0].download(filename)
if status != 'already archived':
logger.info(f'uploading video {key=}')
self.storage.upload(filename, key)
try:
@@ -50,6 +52,7 @@ class TiktokArchiver(Archiver):
try: os.remove(filename)
except FileNotFoundError:
logger.info(f'tmp file not found thus not deleted {filename}')
cdn_url = self.storage.get_cdn_url(key)
return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb,
thumbnail_index=thumb_index, duration=info.duration, title=info.caption, timestamp=info.create.isoformat(),


@@ -8,26 +8,31 @@ from .base_archiver import Archiver, ArchiveResult
from configs import WaybackConfig
class WaybackArchiver(Archiver):
name = "wayback"
def __init__(self, storage: Storage, driver, config: WaybackConfig):
super(WaybackArchiver, self).__init__(storage, driver)
self.config = config
# TODO: this logic should live at the auto-archiver level
self.seen_urls = {}
def download(self, url, check_if_exists=False):
if check_if_exists and url in self.seen_urls:
return self.seen_urls[url]
if check_if_exists:
if url in self.seen_urls: return self.seen_urls[url]
logger.debug(f"checking if {url=} already on archive.org")
archive_url = f"https://web.archive.org/web/{url}"
req = requests.get(archive_url)
if req.status_code == 200:
return self.if_archived_return_with_screenshot(url, archive_url, req=req, status='already archived')
logger.debug(f"POSTing {url=} to web.archive.org")
ia_headers = {
"Accept": "application/json",
"Authorization": f"LOW {self.config.key}:{self.config.secret}"
}
r = requests.post(
'https://web.archive.org/save/', headers=ia_headers, data={'url': url})
r = requests.post('https://web.archive.org/save/', headers=ia_headers, data={'url': url})
if r.status_code != 200:
logger.warning(f"Internet archive failed with status of {r.status_code}")
@@ -38,47 +43,41 @@ class WaybackArchiver(Archiver):
return ArchiveResult(status=f"Internet archive failed: {r.json()['message']}")
job_id = r.json()['job_id']
status_r = requests.get('https://web.archive.org/save/status/' + job_id, headers=ia_headers)
logger.debug(f"GETting status for {job_id=} on {url=}")
status_r = requests.get(f'https://web.archive.org/save/status/{job_id}', headers=ia_headers)
retries = 0
# TODO: make the job queue parallel -> consider propagation of results back to sheet though
# wait 90-120 seconds for the archive job to finish
while (status_r.status_code != 200 or status_r.json()['status'] == 'pending') and retries < 30:
time.sleep(3)
try:
status_r = requests.get(
'https://web.archive.org/save/status/' + job_id, headers=ia_headers)
logger.debug(f"GETting status for {job_id=} on {url=} [{retries=}]")
status_r = requests.get(f'https://web.archive.org/save/status/{job_id}', headers=ia_headers)
except:
time.sleep(1)
retries += 1
if status_r.status_code != 200:
return ArchiveResult(status="Internet archive failed")
status_json = status_r.json()
if status_json['status'] != 'success':
return ArchiveResult(status='Internet Archive failed: ' + str(status_json))
archive_url = 'https://web.archive.org/web/' + \
status_json['timestamp'] + '/' + status_json['original_url']
archive_url = f"https://web.archive.org/web/{status_json['timestamp']}/{status_json['original_url']}"
return self.if_archived_return_with_screenshot(archive_url)
def if_archived_return_with_screenshot(self, url, archive_url, req=None, status='success'):
try:
r = requests.get(archive_url)
parsed = BeautifulSoup(r.content, 'html.parser')
if req is None:
req = requests.get(archive_url)
parsed = BeautifulSoup(req.content, 'html.parser')
title = parsed.find_all('title')[0].text
if title == 'Wayback Machine':
title = 'Could not get title'
except:
title = "Could not get title"
screenshot = self.get_screenshot(url)
result = ArchiveResult(status='success', cdn_url=archive_url, title=title, screenshot=screenshot)
self.seen_urls[url] = result
return result
self.seen_urls[url] = ArchiveResult(status=status, cdn_url=archive_url, title=title, screenshot=screenshot)
return self.seen_urls[url]
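Read together, the submit-then-poll flow above amounts to the following standalone sketch (it assumes valid web.archive.org SPN2 credentials; the function and variable names are illustrative, not the project's):
import time
import requests

def wayback_save(url, access_key, secret, retries=30, poll_seconds=3):
    headers = {"Accept": "application/json", "Authorization": f"LOW {access_key}:{secret}"}
    # submit the URL to the Save Page Now endpoint
    r = requests.post("https://web.archive.org/save/", headers=headers, data={"url": url})
    if r.status_code != 200:
        return None  # submission rejected
    job_id = r.json()["job_id"]
    # the save job is asynchronous: poll its status until success or until we give up
    for _ in range(retries):
        s = requests.get(f"https://web.archive.org/save/status/{job_id}", headers=headers)
        if s.status_code == 200 and s.json().get("status") == "success":
            data = s.json()
            return f"https://web.archive.org/web/{data['timestamp']}/{data['original_url']}"
        time.sleep(poll_seconds)
    return None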


@@ -1,11 +1,12 @@
import datetime
import shutil
import traceback
import os, datetime, shutil, traceback
from loguru import logger
from slugify import slugify
from archivers import TelethonArchiver, TelegramArchiver, TiktokArchiver, YoutubeDLArchiver, TwitterArchiver, WaybackArchiver, ArchiveResult
from utils import GWorksheet, mkdir_if_not_exists, expand_url
from configs import Config
from storages import Storage
def update_sheet(gw, row, result: ArchiveResult):
@@ -42,12 +43,12 @@ def update_sheet(gw, row, result: ArchiveResult):
def missing_required_columns(gw: GWorksheet):
required_found = True
missing = False
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
logger.warning(f'Required column for {required_col}: "{gw.columns[required_col]}" not found, skipping worksheet {gw.worksheet.title}')
required_found = False
return required_found
logger.warning(f'Required column for {required_col}: "{gw.columns[required_col]}" not found, skipping worksheet {gw.wks.title}')
missing = True
return missing
def process_sheet(c: Config):
@@ -60,9 +61,9 @@ def process_sheet(c: Config):
if missing_required_columns(gw): continue
# archives will be in a folder 'doc_name/worksheet_name'
# TODO: use slugify lib
c.set_folder(f'{c.sheet.replace(" ", "_")}/{wks.title.replace(" ", "_")}/')
# archives will default to being in a folder 'doc_name/worksheet_name'
default_folder = os.path.join(slugify(c.sheet), slugify(wks.title))
c.set_folder(default_folder)
storage = c.get_storage()
# loop through rows in worksheet
@@ -76,7 +77,7 @@ def process_sheet(c: Config):
# All checks done - archival process starts here
gw.set_cell(row, 'status', 'Archive in progress')
url = expand_url(url)
storage.update_properties(subfolder=gw.get_cell_or_default(row, 'subfolder'))
c.set_folder(gw.get_cell_or_default(row, 'folder', default_folder, when_empty_use_default=True))
# make a new driver so each spreadsheet row is idempotent
c.recreate_webdriver()
@@ -92,26 +93,27 @@ def process_sheet(c: Config):
]
for archiver in active_archivers:
logger.debug(f'Trying {archiver=} on {row=}')
logger.debug(f'Trying {archiver} on {row=}')
try:
result = archiver.download(url, check_if_exists=True)
except KeyboardInterrupt:
# catches keyboard interruptions to do a clean exit
logger.warning(f"caught interrupt for {archiver=} on {row=}")
logger.warning(f"caught interrupt for {archiver} on {row=}")
gw.set_cell(row, 'status', '')
c.destroy_webdriver()
exit()
except Exception as e:
result = False
logger.error(f'Got unexpected error in row {row} with {archiver=} for {url=}: {e}\n{traceback.format_exc()}')
logger.error(f'Got unexpected error in row {row} with {archiver.name} for {url=}: {e}\n{traceback.format_exc()}')
if result:
success = result.status in ['success', 'already archived']
result.status = f"{archiver.name}: {result.status}"
if result.status in ['success', 'already archived']:
logger.success(f'{archiver=} succeeded on {row=}, {url=}')
if success:
logger.success(f'{archiver.name} succeeded on {row=}, {url=}')
break
logger.warning(f'{archiver} did not succeed on {row=}, final status: {result.status}')
logger.warning(f'{archiver.name} did not succeed on {row=}, final status: {result.status}')
if result:
update_sheet(gw, row, result)
@@ -125,10 +127,10 @@ def main():
c = Config()
c.parse()
logger.info(f'Opening document {c.sheet} for header {c.header}')
mkdir_if_not_exists(c.tmp_folder)
mkdir_if_not_exists(Storage.TMP_FOLDER)
process_sheet(c)
c.destroy_webdriver()
shutil.rmtree(c.tmp_folder)
shutil.rmtree(Storage.TMP_FOLDER)
if __name__ == '__main__':

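The folder handling above reduces to a simple rule: the slugified spreadsheet and worksheet names give a default, and a non-empty 'folder' cell on the row overrides it. A minimal sketch of that rule (names hypothetical):
import os
from slugify import slugify

def resolve_folder(sheet_name: str, worksheet_title: str, folder_cell: str) -> str:
    default_folder = os.path.join(slugify(sheet_name), slugify(worksheet_title))
    cell = (folder_cell or "").strip()
    return cell if cell else default_folder

# resolve_folder("My Doc", "Sheet 1", "")          -> "my-doc/sheet-1"
# resolve_folder("My Doc", "Sheet 1", "cases/xyz") -> "cases/xyz"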

@@ -3,12 +3,12 @@ import argparse, json
import gspread
from loguru import logger
from selenium import webdriver
from dataclasses import dataclass
from dataclasses import dataclass, asdict
from utils.gworksheet import GWorksheet
from utils import GWorksheet, getattr_or
from .wayback_config import WaybackConfig
from .telethon_config import TelethonConfig
from storages import Storage, S3Config, S3Storage, GDStorage, GDConfig, LocalStorage
from storages import Storage, S3Config, S3Storage, GDStorage, GDConfig, LocalStorage, LocalConfig
@dataclass
@@ -39,6 +39,7 @@ class Config:
self.set_log_files()
def set_log_files(self):
# TODO: isolate to config
logger.add("logs/1trace.log", level="TRACE")
logger.add("logs/2info.log", level="INFO")
logger.add("logs/3success.log", level="SUCCESS")
@@ -59,21 +60,18 @@ class Config:
# ----------------------EXECUTION - execution configurations
execution = self.config.get("execution", {})
self.sheet = getattr(self.args, "sheet", execution.get("sheet"))
self.sheet = getattr_or(self.args, "sheet", execution.get("sheet"))
assert self.sheet is not None, "'sheet' must be provided either through command line or configuration file"
self.header = int(getattr(self.args, "header", execution.get("header", 1)))
self.header = int(getattr_or(self.args, "header", execution.get("header", 1)))
Storage.TMP_FOLDER = execution.get("tmp_folder", Storage.TMP_FOLDER)
self.storage = getattr(self.args, "storage", execution.get("storage", "s3"))
for key, name in [("s3", "s3"), ("gd", "google_drive")]:
assert self.storage != key or name in secrets, f"selected storage '{key}' requires secrets.'{name}' in {self.config_file}"
self.storage = getattr_or(self.args, "storage", execution.get("storage", "s3"))
# Column names come from config and can be overwritten by CMD
# in the end all are considered as lower case
config_column_names = execution.get("column_names", {})
self.column_names = {}
for k in GWorksheet.COLUMN_NAMES.keys():
self.column_names[k] = getattr(self.args, k, config_column_names.get(k, GWorksheet.COLUMN_NAMES[k])).lower()
self.column_names[k] = getattr_or(self.args, k, config_column_names.get(k, GWorksheet.COLUMN_NAMES[k])).lower()
# selenium driver
selenium_configs = execution.get("selenium", {})
@@ -87,6 +85,10 @@ class Config:
# ---------------------- SECRETS - APIs and service configurations
secrets = self.config.get("secrets", {})
# assert selected storage credentials exist
for key, name in [("s3", "s3"), ("gd", "google_drive"), ("local", "local")]:
assert self.storage != key or name in secrets, f"selected storage '{key}' requires secrets.'{name}' in {self.config_file}"
# google sheets config
self.gsheets_client = gspread.service_account(
filename=secrets.get("google_sheets", {}).get("service_account", 'service_account.json')
@@ -106,8 +108,7 @@ class Config:
endpoint_url=s3.get("endpoint_url", S3Config.endpoint_url),
cdn_url=s3.get("cdn_url", S3Config.cdn_url),
key_path=s3.get("key_path", S3Config.key_path),
private=getattr(self.args, "s3-private", s3.get("private", S3Config.private)),
no_folder=s3.get("no_folder", S3Config.no_folder),
private=getattr_or(self.args, "s3-private", s3.get("private", S3Config.private))
)
# GDrive config
@@ -115,8 +116,12 @@ class Config:
gd = secrets["google_drive"]
self.gd_config = GDConfig(
root_folder_id=gd.get("root_folder_id"),
default_folder=gd.get("default_folder", GDConfig.default_folder),
service_account=gd.get("service_account", GDConfig.service_account),
service_account=gd.get("service_account", GDConfig.service_account)
)
if "local" in secrets:
self.local_config = LocalConfig(
save_to=secrets["local"].get("save_to", LocalConfig.save_to),
)
# wayback machine config
@@ -153,30 +158,40 @@ class Config:
for k, v in GWorksheet.COLUMN_NAMES.items():
help = f"the name of the column to FILL WITH {k} (default='{v}')"
if k in ["url", "subfolder"]:
if k in ["url", "folder"]:
help = f"the name of the column to READ {k} FROM (default='{v}')"
parser.add_argument(f'--col-{k}', action='store', dest=k, help=help)
return parser
def set_folder(self, folder):
# update the folder in each of the storages
"""
update the folder in each of the storages
"""
self.folder = folder
if self.s3_config:
self.s3_config.folder = folder
if self.gd_config:
self.gd_config.default_folder = folder
# s3
if hasattr(self, "s3_config"): self.s3_config.folder = folder
if hasattr(self, "s3_storage"): self.s3_storage.folder = folder
# gdrive
if hasattr(self, "gd_config"): self.gd_config.folder = folder
if hasattr(self, "gd_storage"): self.gd_storage.folder = folder
# local
if hasattr(self, "local_config"): self.local_config.folder = folder
if hasattr(self, "local_storage"): self.local_storage.folder = folder
def get_storage(self):
"""
creates and returns the configured type of storage
returns the configured type of storage, creating if needed
"""
if self.storage == "s3":
return S3Storage(self.s3_config)
self.s3_storage = getattr_or(self, "s3_storage", S3Storage(self.s3_config))
return self.s3_storage
elif self.storage == "gd":
return GDStorage(self.gd_config)
self.gd_storage = getattr_or(self, "gd_storage", GDStorage(self.gd_config))
return self.gd_storage
elif self.storage == "local":
return LocalStorage(self.folder)
self.local_storage = getattr_or(self, "local_storage", LocalStorage(self.local_config))
return self.local_storage
raise f"storage {self.storage} not implemented, available: {Config.AVAILABLE_STORAGES}"
def destroy_webdriver(self):
@@ -197,12 +212,13 @@ class Config:
return json.dumps({
"config_file": self.config_file,
"sheet": self.sheet,
"storage": self.storage,
"header": self.header,
"tmp_folder": Storage.TMP_FOLDER,
"selenium_config": self.selenium_config,
"selenium_config": asdict(self.selenium_config),
"selenium_webdriver": self.webdriver != None,
"s3_config": self.s3_config != None,
"s3_private": getattr(self.s3_config, "private", None),
"s3_private": getattr_or(self.s3_config, "private", None),
"wayback_config": self.wayback_config != None,
"telegram_config": self.telegram_config != None,
"gsheets_client": self.gsheets_client != None,

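The set_folder change above follows one pattern: push the new folder onto every storage config and onto any storage instance that has already been created, so a per-row folder takes effect without rebuilding the storage. A reduced sketch of the idea (attribute names taken from the diff, class name hypothetical):
class FolderPropagation:
    def set_folder(self, folder: str):
        self.folder = folder
        # update configs and any already-instantiated storages alike
        for attr in ("s3_config", "s3_storage", "gd_config", "gd_storage",
                     "local_config", "local_storage"):
            if hasattr(self, attr):
                getattr(self, attr).folder = folder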

@@ -1,5 +1,5 @@
# we need to explicitly expose the available imports here
from .base_storage import Storage
from .local_storage import LocalStorage
from .local_storage import LocalStorage, LocalConfig
from .s3_storage import S3Config, S3Storage
from .gd_storage import GDConfig, GDStorage


@@ -23,23 +23,7 @@ class Storage(ABC):
with open(filename, 'rb') as f:
self.uploadf(f, key, **kwargs)
def update_properties(self, **kwargs):
"""
method used to update general properties that some children may use
and others not, but that all can call
"""
for k, v in kwargs.items():
if k in self._get_allowed_properties():
setattr(self, k, v)
else:
logger.warning(f'[{self.__class__.__name__}] does not accept dynamic property "{k}"')
def _get_allowed_properties(self):
"""
child classes should specify which properties they allow to be set
"""
return set(["subfolder"])
#TODO: is this really necessary if only use os.path operations
def _clean_path(self, folder, default="", add_forward_slash=True):
if folder is None or type(folder) != str or len(folder.strip()) == 0:
return default


@@ -1,24 +1,23 @@
import os, time
from loguru import logger
from .base_storage import Storage
from dataclasses import dataclass
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2 import service_account
import time
@dataclass
class GDConfig:
root_folder_id: str
default_folder: str = "default"
folder: str = "default"
service_account: str = "service_account.json"
class GDStorage(Storage):
def __init__(self, config: GDConfig):
self.default_folder = config.default_folder
self.folder = config.folder
self.root_folder_id = config.root_folder_id
creds = service_account.Credentials.from_service_account_file(
config.service_account, scopes=['https://www.googleapis.com/auth/drive'])
@@ -29,77 +28,73 @@ class GDStorage(Storage):
only support files saved in a folder for GD
S3 supports folder and all stored in the root
"""
self.subfolder = self._clean_path(self.subfolder, self.default_folder, False)
filename = key
logger.debug(f'Looking for {self.subfolder} and filename: {filename} on GD')
folder_id = self._get_id_from_parent_and_name(self.root_folder_id, self.subfolder, 5, 10)
# check for sub folder in file youtube_dl_abcde/index.html, needed for thumbnails
# a='youtube_dl_abcde', b='index.html'
a, _, b = filename.partition('/')
if b != '':
logger.debug(f'get_cdn_url: Found a subfolder so need to split on: {a=} and {b=}')
folder_id = self._get_id_from_parent_and_name(folder_id, a, use_mime_type=True)
filename = b
full_name = os.path.join(self.folder, key)
parent_id, folder_id = self.root_folder_id, None
path_parts = full_name.split(os.path.sep)
filename = path_parts[-1]
logger.info(f"looking for folders for {path_parts=} before uploading {filename=}")
for folder in path_parts[0:-1]:
folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
parent_id = folder_id
# get id of file inside folder (or sub folder)
file_id = self._get_id_from_parent_and_name(folder_id, filename)
return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
def exists(self, _key):
# TODO: How to check for google drive, as it accepts different names?
return False
def exists(self, key):
try:
self.get_cdn_url(key)
return True
except: return False
def uploadf(self, file, key, **_kwargs):
def uploadf(self, file: str, key: str, **_kwargs):
"""
1. check if subfolder exists or create it
2. check if key contains sub-subfolder, check if exists or create it
3. upload file to root_id/subfolder[/sub-subfolder]/filename
1. for each sub-folder in the path check if exists or create
2. upload file to root_id/other_paths.../filename
"""
self.subfolder = self._clean_path(self.subfolder, GDStorage.DEFAULT_UPLOAD_FOLDER_NAME, False)
filename = key
# get id of subfolder or create if it does not exist
folder_id_to_upload_to = self._get_id_from_parent_and_name(self.root_folder_id, self.subfolder, use_mime_type=True, raise_on_missing=False)
if folder_id_to_upload_to is None:
folder_id_to_upload_to = self._mkdir(self.subfolder, self.root_folder_id)
# check for sub folder in file youtube_dl_abcde/index.html, needed for thumbnails
# a='youtube_dl_abcde', b='index.html'
a, _, b = filename.partition('/')
if b != '':
logger.debug(f'uploadf: Found a subfolder so need to split on: {a=} and {b=}')
# get id of subfolder or create if it does not exist
sub_folder_id_to_upload_to = self._get_id_from_parent_and_name(folder_id_to_upload_to, a, use_mime_type=True, raise_on_missing=False)
if sub_folder_id_to_upload_to is None:
sub_folder_id_to_upload_to = self._mkdir(a, folder_id_to_upload_to)
filename = b
folder_id_to_upload_to = sub_folder_id_to_upload_to
full_name = os.path.join(self.folder, key)
parent_id, upload_to = self.root_folder_id, None
path_parts = full_name.split(os.path.sep)
filename = path_parts[-1]
logger.info(f"checking folders {path_parts[0:-1]} exist (or creating) before uploading {filename=}")
for folder in path_parts[0:-1]:
upload_to = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=False)
if upload_to is None:
upload_to = self._mkdir(folder, parent_id)
parent_id = upload_to
# upload file to gd
logger.debug(f'uploading {filename=} to folder id {upload_to}')
file_metadata = {
'name': [filename],
'parents': [folder_id_to_upload_to]
'parents': [upload_to]
}
media = MediaFileUpload(file, resumable=True)
gd_file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
logger.debug(f'uploadf: uploaded file {gd_file["id"]} successfully in folder={folder_id_to_upload_to}')
logger.debug(f'uploadf: uploaded file {gd_file["id"]} successfully in folder={upload_to}')
def upload(self, filename: str, key: str, **kwargs):
# GD only requires the filename not a file reader
logger.debug(f'[{self.__class__.__name__}] uploading file {filename} with key {key}')
self.uploadf(filename, key, **kwargs)
def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True):
def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=True):
"""
Retrieves the id of a folder or file from its @name and the @parent_id folder
Optionally does multiple @retries and sleeps @sleep_seconds between them
If @use_mime_type will restrict search to "mimeType='application/vnd.google-apps.folder'"
If @raise_on_missing will throw error when not found, or returns None
Will remember previous calls to avoid duplication if @use_cache
Returns the id of the file or folder from its name as a string
"""
# cache logic
if use_cache:
self.api_cache = getattr(self, "api_cache", {})
cache_key = f"{parent_id}_{name}_{use_mime_type}"
if cache_key in self.api_cache:
logger.debug(f"cache hit for {cache_key=}")
return self.api_cache[cache_key]
# API logic
debug_header: str = f"[searching {name=} in {parent_id=}]"
query_string = f"'{parent_id}' in parents and name = '{name}' "
if use_mime_type:
@@ -115,10 +110,14 @@ class GDStorage(Storage):
if len(items) > 0:
logger.debug(f"{debug_header} found {len(items)} matches, returning last of {','.join([i['id'] for i in items])}")
return items[-1]['id']
_id = items[-1]['id']
if use_cache: self.api_cache[cache_key] = _id
return _id
else:
logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}. sleeping for {sleep_seconds} second(s)')
if attempt < retries - 1: time.sleep(sleep_seconds)
logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}.')
if attempt < retries - 1:
logger.debug(f'sleeping for {sleep_seconds} second(s)')
time.sleep(sleep_seconds)
if raise_on_missing:
raise ValueError(f'{debug_header} not found after {retries} attempt(s)')
@@ -129,7 +128,7 @@ class GDStorage(Storage):
Creates a new GDrive folder @name inside folder @parent_id
Returns id of the created folder
"""
logger.debug(f'[_mkdir] Creating new folder with {name=} inside {parent_id=}')
logger.debug(f'Creating new folder with {name=} inside {parent_id=}')
file_metadata = {
'name': [name],
'mimeType': 'application/vnd.google-apps.folder',

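Both get_cdn_url and uploadf above now share one idea: split the full key into path parts, resolve (or create) each folder under the previous one via the Drive API, and only then handle the final filename. A condensed sketch, assuming an authenticated Drive v3 service object like the one built in __init__:
import os

def walk_drive_path(service, root_folder_id: str, full_name: str, create_missing: bool = True):
    *folders, filename = full_name.split(os.path.sep)
    parent_id = root_folder_id
    for name in folders:
        # look for a folder with this name under the current parent
        query = (f"'{parent_id}' in parents and name = '{name}' "
                 "and mimeType='application/vnd.google-apps.folder'")
        found = service.files().list(q=query, fields="files(id)").execute().get("files", [])
        if found:
            parent_id = found[-1]["id"]
        elif create_missing:
            meta = {"name": name, "mimeType": "application/vnd.google-apps.folder", "parents": [parent_id]}
            parent_id = service.files().create(body=meta, fields="id").execute()["id"]
        else:
            raise ValueError(f"folder {name!r} not found under {parent_id=}")
    return parent_id, filename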

@@ -1,13 +1,26 @@
import os
from .base_storage import Storage
from dataclasses import dataclass
from .base_storage import Storage
from utils import mkdir_if_not_exists
@dataclass
class LocalConfig:
folder: str = ""
save_to: str = "./"
class LocalStorage(Storage):
def __init__(self, folder):
self.folder = self._clean_path(folder)
def __init__(self, config:LocalConfig):
self.folder = self._clean_path(config.folder)
self.save_to = self._clean_path(config.save_to)
mkdir_if_not_exists(self.save_to)
def get_cdn_url(self, key):
return self.folder + self._clean_path(self.subfolder) + key
full_path = os.path.join(self.save_to, self.folder, key)
mkdir_if_not_exists(os.path.join(*full_path.split(os.path.sep)[0:-1]))
return os.path.abspath(full_path)
def exists(self, key):
return os.path.isfile(self.get_cdn_url(key))
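A short usage sketch of the new LocalStorage (the paths below are illustrative): save_to comes from the local secrets config, folder is set per row via Config.set_folder, and get_cdn_url returns an absolute path, creating intermediate directories along the way.
config = LocalConfig(save_to="./local_archive", folder="my-doc/sheet-1")
storage = LocalStorage(config)
storage.get_cdn_url("video123.mp4")
# -> absolute path to ./local_archive/my-doc/sheet-1/video123.mp4, with directories created as needed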


@@ -20,8 +20,6 @@ class S3Config:
cdn_url: str = "https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}"
private: bool = False
key_path: str = "default" # 'default' uses full naming, 'random' uses generated uuid
no_folder: bool = False # when true folders are not used for url path
class S3Storage(Storage):
@@ -54,7 +52,7 @@ class S3Storage(Storage):
ext = os.path.splitext(key)[1]
self.key_dict[key] = f"{str(uuid.uuid4())}{ext}"
final_key = self.key_dict[key]
return self.folder + self._clean_path(self.subfolder) + final_key
return os.path.join(self.folder, final_key)
def get_cdn_url(self, key):
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=self._get_path(key))


@@ -10,10 +10,10 @@ class GWorksheet:
"""
COLUMN_NAMES = {
'url': 'link',
'subfolder': 'sub folder',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'status': 'archive status',
'thumbnail': 'thumbnail',
'thumbnail_index': 'thumbnail index',
'timestamp': 'upload timestamp',
@@ -72,12 +72,15 @@ class GWorksheet:
return ''
return row[col_index]
def get_cell_or_default(self, row, col: str, default: str = None, fresh=False):
def get_cell_or_default(self, row, col: str, default: str = None, fresh=False, when_empty_use_default=True):
"""
return self.get_cell or default value on error (eg: column is missing)
"""
try:
return self.get_cell(row, col, fresh)
val = self.get_cell(row, col, fresh)
if when_empty_use_default and val.strip() == "":
return default
return val
except:
return default
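In practice the new flag means a missing column and an empty cell now behave the same way (the worksheet object and values below are hypothetical):
folder = gw.get_cell_or_default(row, 'folder', default='my-doc/sheet-1', when_empty_use_default=True)
# -> 'my-doc/sheet-1' when the 'folder' column is missing or the cell is blank,
#    otherwise the cell's own value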


@@ -1,11 +1,11 @@
import os, requests
import os, sys, requests
from loguru import logger
def mkdir_if_not_exists(folder):
if not os.path.exists(folder):
os.mkdir(folder)
os.makedirs(folder)
def expand_url(url):
@@ -18,3 +18,11 @@ def expand_url(url):
except:
logger.error(f'Failed to expand url {url}')
return url
def getattr_or(o: object, prop: str, default: None = None):
try:
res = getattr(o, prop)
if res is None: raise
return res
except:
return default
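getattr_or differs from getattr(obj, prop, default) in that an attribute which exists but is None also falls back to the default, which is what makes it work for argparse namespaces where unset options are present as None. A small illustration (the object is hypothetical):
class Args: pass
args = Args()
args.sheet = None
args.header = 3

getattr(args, "sheet", "from-config")     # -> None: the attribute exists, so the default is ignored
getattr_or(args, "sheet", "from-config")  # -> "from-config"
getattr_or(args, "header", 1)             # -> 3
getattr_or(args, "missing", 1)            # -> 1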