diff --git a/orchestration.example.yaml b/orchestration.example.yaml
index 7163829..caf7737 100644
--- a/orchestration.example.yaml
+++ b/orchestration.example.yaml
@@ -1,7 +1,7 @@
 steps:
   # only 1 feeder allowed
   # a feeder could be in an "infinite loop", for example a gsheets_infinite feeder which holds -> this could be an easy logic addition by changing the for-each to while not feeder.done() if it becomes necessary
-  feeder: gsheets_feeder # default -> only expects URL from CLI
+  feeder: gsheet_feeder # default -> only expects URL from CLI
   archivers: # order matters
     - telethon
     # - tiktok
@@ -28,7 +28,7 @@ steps:
 configurations:
   global:
-    save_logs: False
-  gsheets_feeder:
+  gsheet_feeder:
     sheet: my-auto-archiver
     header: 2 # defaults to 1 in GsheetsFeeder
     service_account: "secrets/service_account.json"
diff --git a/src/archivers/archiver.py b/src/archivers/archiver.py
index 37f5d4d..f16464a 100644
--- a/src/archivers/archiver.py
+++ b/src/archivers/archiver.py
@@ -23,6 +23,10 @@ class Archiverv2(Step):
         # used when archivers need to login or do other one-time setup
         pass
 
+    def clean_url(self, url: str) -> str:
+        # used to clean unnecessary URL parameters
+        return url
+
     def _guess_file_type(self, path: str) -> str:
         """
         Receives a URL or filename and returns global mimetype like 'image' or 'video'
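The new clean_url hook is a no-op by default; concrete archivers can override it to strip tracking parameters before the URL is stored and deduplicated. A minimal sketch of such an override (ExampleArchiver and the utm_ filter are illustrative assumptions, not part of this diff):

# Editor's sketch (not part of the diff): an illustrative clean_url override.
# ExampleArchiver and the utm_ filter are assumptions for the example.
from urllib.parse import urlparse, parse_qsl, urlencode

from archivers import Archiverv2

class ExampleArchiver(Archiverv2):
    name = "example_archiver"
    # NOTE: other Archiverv2 members (e.g. download) omitted for brevity

    def clean_url(self, url: str) -> str:
        # drop common tracking parameters, keep everything else intact
        parsed = urlparse(url)
        query = [(k, v) for k, v in parse_qsl(parsed.query) if not k.startswith("utm_")]
        return parsed._replace(query=urlencode(query)).geturl()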
diff --git a/src/configs/v2config.py b/src/configs/v2config.py
index 75a125e..b028b5e 100644
--- a/src/configs/v2config.py
+++ b/src/configs/v2config.py
@@ -5,6 +5,8 @@ from dataclasses import dataclass, field
 from typing import List
 from archivers import Archiverv2
 from feeders import Feeder
+from databases import Database
+from storages import StorageV2
 from steps.step import Step
 from enrichers import Enricher
 from collections import defaultdict
@@ -13,10 +15,13 @@ from collections import defaultdict
 @dataclass
 class ConfigV2:
     # TODO: should Config inherit from Step so it can have its own configurations?
+    # these are only detected if they are exported in the respective __init__.py
     configurable_parents = [
         Feeder,
         Enricher,
         Archiverv2,
+        Database,
+        StorageV2
         # Util
     ]
     feeder: Step  # TODO:= BaseFeeder
@@ -24,14 +29,14 @@ class ConfigV2:
     enrichers: List[Enricher] = field(default_factory=[])
     formatters: List[Step] = field(default_factory=[])  # TODO: fix type
     storages: List[Step] = field(default_factory=[])  # TODO: fix type
-    databases: List[Step] = field(default_factory=[])  # TODO: fix type
+    databases: List[Database] = field(default_factory=list)
 
     def __init__(self) -> None:
        self.defaults = {}
        self.cli_ops = {}
        self.config = {}
 
-    # TODO: make this work for nested props like gsheets_feeder.columns.url = "URL"
+    # TODO: make this work for nested props like gsheet_feeder.columns.url = "URL"
     def parse(self):
         # 1. parse CLI values
         parser = argparse.ArgumentParser(
@@ -84,10 +89,12 @@ class ConfigV2:
         self.feeder = Feeder.init(steps.get("feeder", "cli_feeder"), self.config)
         self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])]
         self.archivers = [Archiverv2.init(e, self.config) for e in steps.get("archivers", [])]
+        self.databases = [Database.init(e, self.config) for e in steps.get("databases", [])]
 
         print("feeder", self.feeder)
         print("enrichers", [e for e in self.enrichers])
         print("archivers", [e for e in self.archivers])
+        print("databases", [e for e in self.databases])
 
     def validate(self):
         pass
diff --git a/src/databases/__init__.py b/src/databases/__init__.py
new file mode 100644
index 0000000..17b9c6d
--- /dev/null
+++ b/src/databases/__init__.py
@@ -0,0 +1,2 @@
+from .database import Database
+from .gsheet_db import GsheetsDb
\ No newline at end of file
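As the new comment notes, a step class is only picked up if its package __init__.py exports it; Database.init then resolves the configured name to that class. A rough sketch of how a databases entry travels from the orchestration YAML to instantiated objects, mirroring what parse() does above (the resolution internals of Step.init are not shown in this diff and are assumed to match the existing feeder/archiver behaviour):

# Editor's sketch (not part of the diff): how a "databases" entry becomes
# objects, per the parse() code above. Step.init internals are assumed to
# resolve names against subclasses of the given parent, as for feeders.
from databases import Database

steps = {"feeder": "gsheet_feeder", "databases": ["gsheet_db"]}
config = {"gsheet_db": {}}  # per-step configuration blocks, keyed by step name

databases = [Database.init(name, config) for name in steps.get("databases", [])]
# -> [GsheetsDb(...)], provided GsheetsDb is exported in src/databases/__init__.py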
diff --git a/src/databases/database.py b/src/databases/database.py
index 15f8d0d..94b2178 100644
--- a/src/databases/database.py
+++ b/src/databases/database.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
 from dataclasses import dataclass
 from abc import abstractmethod, ABC
+from typing import Union
 from metadata import Metadata
 from steps.step import Step
 
+
 @dataclass
 class Database(Step, ABC):
     name = "database"
@@ -11,11 +13,30 @@ class Database(Step, ABC):
     def __init__(self, config: dict) -> None:
         # without this STEP.__init__ is not called
         super().__init__(config)
-
-    # only for typing...
     def init(name: str, config: dict) -> Database:
+        # only for typing...
         return Step.init(name, config, Database)
 
     @abstractmethod
-    def enrich(self, item: Metadata) -> Metadata: pass
+    def started(self, item: Metadata) -> None:
+        """signals the DB that archival of the given item has started"""
+        pass
+
+    def failed(self, item: Metadata) -> None:
+        """update DB accordingly for failure"""
+        pass
+
+    def aborted(self, item: Metadata) -> None:
+        """abort notification if user cancelled after start"""
+        pass
+
+    # @abstractmethod
+    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
+        """check if the given item has been archived already"""
+        return False
+
+    @abstractmethod
+    def done(self, item: Metadata) -> None:
+        """archival result ready - should be saved to DB"""
+        pass
diff --git a/src/databases/gsheet_db.py b/src/databases/gsheet_db.py
new file mode 100644
index 0000000..939e851
--- /dev/null
+++ b/src/databases/gsheet_db.py
@@ -0,0 +1,64 @@
+from typing import Union, Tuple
+import gspread
+
+# from metadata import Metadata
+from loguru import logger
+
+# from . import Enricher
+from databases import Database
+from metadata import Metadata
+from steps.gsheet import Gsheets
+from utils import GWorksheet
+
+
+class GsheetsDb(Database):
+    """
+    NB: only works if GsheetFeeder is used.
+    could be updated in the future to support non-GsheetFeeder metadata
+    """
+    name = "gsheet_db"
+
+    def __init__(self, config: dict) -> None:
+        # without this STEP.__init__ is not called
+        super().__init__(config)
+
+    @staticmethod
+    def configs() -> dict:
+        return {}
+
+    def started(self, item: Metadata) -> None:
+        logger.warning(f"STARTED {item}")
+        gw, row = self._retrieve_gsheet(item)
+        gw.set_cell(row, 'status', 'Archive in progress')
+
+    def failed(self, item: Metadata) -> None:
+        logger.error(f"FAILED {item}")
+        self._safe_status_update(item, 'Archive failed')
+
+    def aborted(self, item: Metadata) -> None:
+        logger.warning(f"ABORTED {item}")
+        self._safe_status_update(item, '')
+
+    def fetch(self, item: Metadata) -> Union[Metadata, bool]:
+        """check if the given item has been archived already"""
+        # TODO: this should not be done at the feeder stage then!
+        return False
+
+    def done(self, item: Metadata) -> None:
+        """archival result ready - should be saved to DB"""
+        logger.success(f"DONE {item}")
+        gw, row = self._retrieve_gsheet(item)
+        self._safe_status_update(item, 'done')
+        pass
+
+    def _safe_status_update(self, item: Metadata, new_status: str) -> None:
+        try:
+            gw, row = self._retrieve_gsheet(item)
+            gw.set_cell(row, 'status', new_status)
+        except Exception as e:
+            logger.debug(f"Unable to update sheet: {e}")
+
+    def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
+        gw: GWorksheet = item.get("gsheet").get("worksheet")
+        row: int = item.get("gsheet").get("row")
+        return gw, row
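Database subclasses thus get a full lifecycle: started before any archiver runs, fetch as a cache probe, then exactly one of done, failed, or aborted. A condensed sketch of that calling contract as the orchestrator changes further down in this diff implement it (drive and do_archive are placeholders, not names from this codebase; the real code also merges multiple cached results):

# Editor's sketch (not part of the diff): the calling contract for Database
# hooks, condensed from the orchestrator changes below. drive/do_archive are
# placeholders; the real code merges cached results from several databases.
def drive(databases, item):
    cached = None
    for db in databases:
        db.started(item)                    # e.g. set status "Archive in progress"
        if (found := db.fetch(item)):       # cache probe, False if never archived
            cached = found
    if cached and not cached.rearchivable:  # short-circuit on a previous archive
        for db in databases: db.done(cached)
        return cached
    try:
        result = do_archive(item)           # placeholder for archive/enrich/store
        for db in databases: db.done(result)
        return result
    except KeyboardInterrupt:
        for db in databases: db.aborted(item)
        raise
    except Exception:
        for db in databases: db.failed(item)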
diff --git a/src/enrichers/__init__.py b/src/enrichers/__init__.py
index 3c266f8..503ea2c 100644
--- a/src/enrichers/__init__.py
+++ b/src/enrichers/__init__.py
@@ -1,2 +1,2 @@
 from .enricher import Enricher
-from .enricher_screenshot import ScreenshotEnricher
\ No newline at end of file
+from .screenshot_enricher import ScreenshotEnricher
\ No newline at end of file
diff --git a/src/enrichers/enricher_screenshot.py b/src/enrichers/screenshot_enricher.py
similarity index 100%
rename from src/enrichers/enricher_screenshot.py
rename to src/enrichers/screenshot_enricher.py
diff --git a/src/feeders/__init__.py b/src/feeders/__init__.py
index 9fb5942..b11cd50 100644
--- a/src/feeders/__init__.py
+++ b/src/feeders/__init__.py
@@ -1,2 +1,2 @@
 from .feeder import Feeder
-from .feeder_gsheet import GsheetsFeeder
\ No newline at end of file
+from .gsheet_feeder import GsheetsFeeder
\ No newline at end of file
diff --git a/src/feeders/feeder.py b/src/feeders/feeder.py
index d930ba0..bccfab8 100644
--- a/src/feeders/feeder.py
+++ b/src/feeders/feeder.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 from dataclasses import dataclass
 from abc import abstractmethod
-# from metadata import Metadata
+from metadata import Metadata
 from steps.step import Step
 
 
@@ -17,7 +17,5 @@ class Feeder(Step):
         # only for code typing
         return Step.init(name, config, Feeder)
 
-    # def feed(self, item: Metadata) -> Metadata: pass
-
     @abstractmethod
-    def __iter__(self) -> Feeder: return None
\ No newline at end of file
+    def __iter__(self) -> Metadata: return None
\ No newline at end of file
diff --git a/src/feeders/feeder_gsheet.py b/src/feeders/gsheet_feeder.py
similarity index 85%
rename from src/feeders/feeder_gsheet.py
rename to src/feeders/gsheet_feeder.py
index ad28af1..b9389a2 100644
--- a/src/feeders/feeder_gsheet.py
+++ b/src/feeders/gsheet_feeder.py
@@ -1,16 +1,17 @@
-import json, gspread
+import gspread
 
 # from metadata import Metadata
 from loguru import logger
 
 # from . import Enricher
 from feeders import Feeder
+from metadata import Metadata
 from steps.gsheet import Gsheets
 from utils import GWorksheet
 
 
 class GsheetsFeeder(Gsheets, Feeder):
-    name = "gsheets_feeder"
+    name = "gsheet_feeder"
 
     def __init__(self, config: dict) -> None:
         # without this STEP.__init__ is not called
@@ -35,7 +36,7 @@ class GsheetsFeeder(Gsheets, Feeder):
             }
         })
 
-    def __iter__(self) -> str:
+    def __iter__(self) -> Metadata:
         sh = self.gsheets_client.open(self.sheet)
         for ii, wks in enumerate(sh.worksheets()):
             if not self.should_process_sheet(wks.title):
@@ -52,17 +53,16 @@ class GsheetsFeeder(Gsheets, Feeder):
         for row in range(1 + self.header, gw.count_rows() + 1):
             url = gw.get_cell(row, 'url').strip()
             if not len(url): continue
-            # TODO: gsheet_db should check later if this is supposed to be archived
-            # static_status = gw.get_cell(row, 'status')
-            # status = gw.get_cell(row, 'status', fresh=static_status in ['', None] and url != '')
-            # All checks done - archival process starts here
-            yield url
-        logger.success(f'Finished worksheet {wks.title}')
-        # GWorksheet(self.sheet)
-        print(self.sheet)
-        for u in ["url1", "url2"]:
-            yield u
+            original_status = gw.get_cell(row, 'status')
+            status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
+            # TODO: custom status parser(?) aka should_retry_from_status
+            if status not in ['', None]: continue
+
+            # All checks done - archival process starts here
+            yield Metadata().set_url(url).set("gsheet", {"row": row, "worksheet": gw}, True)
+
+        logger.success(f'Finished worksheet {wks.title}')
 
     def should_process_sheet(self, sheet_name: str) -> bool:
         if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
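With __iter__ now yielding fully-formed Metadata items (URL plus a temporary gsheet pointer) instead of bare URL strings, consumers no longer need any sheet knowledge of their own. A minimal consumption sketch, with the configuration block taken from orchestration.example.yaml above and assuming the remaining Gsheets defaults suffice:

# Editor's sketch (not part of the diff): consuming the feeder after this
# change; each yielded item carries its sheet coordinates as a temporary key.
from feeders import GsheetsFeeder

config = {"gsheet_feeder": {"sheet": "my-auto-archiver", "header": 2,
                            "service_account": "secrets/service_account.json"}}
for item in GsheetsFeeder(config):
    url = item.get_url()
    ctx = item.get("gsheet")   # {"row": <int>, "worksheet": <GWorksheet>}
    print(f"queued {url} from spreadsheet row {ctx['row']}")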
diff --git a/src/metadata.py b/src/metadata.py
index 8945e1a..90ca743 100644
--- a/src/metadata.py
+++ b/src/metadata.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from ast import List
+from typing import List, Set
 from typing import Any, Union, Dict
 from dataclasses import dataclass, field
 from datetime import datetime
@@ -12,8 +12,14 @@ from media import Media
 @dataclass
 class Metadata:
     status: str = ""
-    metadata: Dict[str, Any] = field(default_factory=dict)
+    metadata: Dict[str, Any] = field(default_factory=dict)
+    tmp_keys: Set[str] = field(default_factory=set)  # keys that are not to be saved in DBs
     media: List[Media] = field(default_factory=list)
+    rearchivable: bool = False
+
+    # def __init__(self, url, metadata = {}) -> None:
+    #     self.set_url(url)
+    #     self.metadata = metadata
 
     def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
         """
@@ -21,6 +27,7 @@ class Metadata:
         """
         if overwrite_left:
             self.status = right.status
+            self.rearchivable |= right.rearchivable
             for k, v in right.metadata.items():
                 assert k not in self.metadata or type(v) == type(self.get(k))
                 if type(v) not in [dict, list, set] or k not in self.metadata:
@@ -33,8 +40,10 @@ class Metadata:
             return right.merge(self)
         return self
 
-    def set(self, key: str, val: Any) -> Metadata:
+    def set(self, key: str, val: Any, is_tmp=False) -> Metadata:
+        # if not self.metadata: self.metadata = {}
         self.metadata[key] = val
+        if is_tmp: self.tmp_keys.add(key)
         return self
 
     def get(self, key: str, default: Any = None, create_if_missing=False) -> Union[Metadata, str]:
@@ -75,3 +84,12 @@ class Metadata:
     #     # converts all metadata and data into JSON
     #     return json.dumps(self.metadata)
     #     # TODO: datetime is not serializable
+
+    def cleanup(self) -> Metadata:
+        # TODO: refactor so it returns a JSON with all intended properties, except tmp_keys
+        # the code below leads to errors if a database needs tmp_keys after they are removed
+        # """removes temporary metadata fields, ideally called after all ops except writing"""
+        # for tmp_key in self.tmp_keys:
+        #     self.metadata.pop(tmp_key, None)
+        # self.tmp_keys = set()
+        pass
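The tmp_keys set lets pipeline steps attach working state (tmp_dir, the gsheet pointer) that must never be persisted by a database, and merge now propagates rearchivable with |=. A small sketch of the intended behaviour (set_url is not shown in this diff but is used by the feeder and orchestrator, so it is assumed to exist):

# Editor's sketch (not part of the diff): intended tmp_keys / merge behaviour.
from metadata import Metadata

m = Metadata().set_url("https://example.com/post")
m.set("tmp_dir", "./tmp-abc", is_tmp=True)   # working state, excluded from DBs
m.set("title", "a post")                     # regular metadata, persisted

right = Metadata()
right.rearchivable = True
m.merge(right)
assert m.rearchivable        # |= means the flag only ever turns on

m.cleanup()   # currently a no-op; see the TODO about late tmp_keys readers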
diff --git a/src/orchestrator.py b/src/orchestrator.py
index 2f33370..26baed1 100644
--- a/src/orchestrator.py
+++ b/src/orchestrator.py
@@ -5,8 +5,11 @@ from dataclasses import dataclass
 
 from archivers.archiver import Archiverv2
 from enrichers.enricher import Enricher
+from databases.database import Database
 from metadata import Metadata
 
-import tempfile, time
+import tempfile, time, traceback
+from loguru import logger
+
 
 """
 how not to couple the different pieces of logic
@@ -119,7 +122,7 @@ class ArchivingOrchestrator:
         # identify each formatter, storage, database, etc
         # self.feeder = Feeder.init(config.feeder, config.get(config.feeder))
 
-        # Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheets_feeder.sheet via CLI
+        # Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheet_feeder.sheet via CLI
         # where does that update/processing happen? in config.py
         # reflection for Archiver to know which child classes it has? use Archiver.__subclasses__
         # self.archivers = [
@@ -129,12 +132,12 @@ class ArchivingOrchestrator:
         self.feeder = config.feeder
         self.enrichers = config.enrichers
         self.archivers: List[Archiverv2] = config.archivers
+        self.databases: List[Database] = config.databases
         for a in self.archivers: a.setup()
 
         self.formatters = []
         self.storages = []
-        self.databases = []
         # self.formatters = [
         #     Formatter.init(f, config)
         #     for f in config.formatters
@@ -154,51 +157,61 @@
         # assert len(archivers) > 1, "there needs to be at least one Archiver"
 
     def feed(self) -> List[Metadata]:
-        for url in self.feeder:
-            print("ARCHIVING", url)
-            with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
-                result = self.archive(url, tmp_dir)
-                print(type(result))
-                print(result)
-                # print(result.as_json())
-                print("holding on")
-                time.sleep(300)
+        for item in self.feeder:
+            print("ARCHIVING", item)
+            try:
+                with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
+                    item.set("tmp_dir", tmp_dir, True)
+                    result = self.archive(item)
+                    print(result)
+            except KeyboardInterrupt:
+                # catches keyboard interruptions to do a clean exit
+                logger.warning(f"caught interrupt on {item=}")
+                for d in self.databases: d.aborted(item)
+                exit()
+            except Exception as e:
+                logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
+                for d in self.databases: d.failed(item)
+
+            print("holding on 5min")
+            time.sleep(300)
+
 
     # how does this handle the parameters like folder which can be different for each archiver?
     # the storage needs to know where to archive!!
     # solution: feeders have context: extra metadata that they can read or ignore,
     # all of it should have sensible defaults (eg: folder)
     # default feeder is a list with 1 element
-    def archive(self, url: str, tmp_dir: str) -> Union[Metadata, None]:
-        # TODO:
-        # url = clear_url(url) # should we save if they differ?
-        # result = Metadata(url=url)
-        result = Metadata()
+    def archive(self, result: Metadata) -> Union[Metadata, None]:
+        url = result.get_url()
+        # TODO: clean urls
+        for a in self.archivers:
+            url = a.clean_url(url)
         result.set_url(url)
-        result.set("tmp_dir", tmp_dir)
-
-        should_archive = True
-        for d in self.databases: should_archive &= d.should_process(url)
+        # should_archive = False
+        # for d in self.databases: should_archive |= d.should_process(url)
         # should storages also be able to check?
-        for s in self.storages: should_archive &= s.should_process(url)
+        # for s in self.storages: should_archive |= s.should_process(url)
 
-        if not should_archive:
-            print("skipping")
-            return "skipping"
+        # if not should_archive:
+        #     print("skipping")
+        #     return "skipping"
 
         # signal to DB that archiving has started
+        # and propagate already archived if it exists
+        cached_result = None
         for d in self.databases:
             # are the databases to decide whether to archive?
             # they can simply return True by default, otherwise they can avoid duplicates. should this logic be more granular, for example on the archiver level: a tweet will not need to be scraped twice, whereas an instagram profile might. the archiver could not decide from the link which parts to archive,
             # instagram profile example: it would always re-archive everything
            # maybe the database/storage could use a hash/key to decide if there's a need to re-archive
-            if d.should_process(url):
-                d.started(url)
-            elif d.exists(url):
-                return d.fetch(url)
-            else:
-                print("Skipping url")
-                return
+            d.started(result)
+            if (local_result := d.fetch(result)):
+                cached_result = (cached_result or Metadata()).merge(local_result)
+        if cached_result and not cached_result.rearchivable:
+            for d in self.databases:
+                d.done(cached_result)
+            return cached_result
 
         # vk, telethon, ...
         for a in self.archivers:
@@ -209,6 +222,7 @@
             # this is where the Hashes come from, the place with access to all content
             # the archiver does not have access to storage
             result.merge(a.download(result))
+            # TODO: fix logic
             if True or result.is_success(): break
 
         # what if an archiver returns multiple entries and one is to be part of HTMLgenerator?
@@ -224,13 +238,14 @@
         for f in self.formatters:
             result.merge(f.format(result))
 
-        # storages
+        # storage
         for s in self.storages:
             for m in result.media:
-                m.merge(s.store(m))
+                result.merge(s.store(m))
 
         # signal completion to databases (DBs, Google Sheets, CSV, ...)
         # a hash registration service could be one database: forensic archiving
+        result.cleanup()
         for d in self.databases:
             d.done(result)
         return result
diff --git a/src/steps/step.py b/src/steps/step.py
index 7a2135c..b512af7 100644
--- a/src/steps/step.py
+++ b/src/steps/step.py
@@ -14,7 +14,7 @@ class Step(ABC):
     def __init__(self, config: dict) -> None:
         # reads the configs into object properties
         # self.config = config[self.name]
-        for k, v in config[self.name].items():
+        for k, v in config.get(self.name, {}).items():
             self.__setattr__(k, v)
 
     @staticmethod
diff --git a/src/storages/__init__.py b/src/storages/__init__.py
index 96baaba..91ce148 100644
--- a/src/storages/__init__.py
+++ b/src/storages/__init__.py
@@ -4,4 +4,5 @@ from .local_storage import LocalStorage, LocalConfig
 from .s3_storage import S3Config, S3Storage
 from .gd_storage import GDConfig, GDStorage
 
-from .storage import StorageV2
\ No newline at end of file
+from .storage import StorageV2
+from .s3 import S3StorageV2
\ No newline at end of file
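Two small but load-bearing fixes close the diff: Step.__init__ now tolerates steps that have no block in the configuration file, and the storages package exports the new S3StorageV2 so ConfigV2 can detect it. A tiny sketch of why the config.get change matters (NoOpDb is a hypothetical minimal subclass for illustration only):

# Editor's sketch (not part of the diff): why config.get(self.name, {}) matters.
# NoOpDb is hypothetical; configs()/started()/done() mirror GsheetsDb above.
from databases import Database

class NoOpDb(Database):
    name = "noop_db"

    @staticmethod
    def configs() -> dict: return {}
    def started(self, item): pass
    def done(self, item): pass

config = {"gsheet_feeder": {"sheet": "my-auto-archiver"}}  # no "noop_db" block
db = NoOpDb(config)  # before: KeyError on config[self.name]; now constructs fine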