gsheet feeder + db WIP

pull/72/head
msramalho 2023-01-04 16:37:36 +00:00
parent 96845305a3
commit bb512b36c9
15 changed files with 195 additions and 65 deletions

View file

@ -1,7 +1,7 @@
steps:
# only 1 feeder allowed
# a feeder could be in an "infinite loop", for example: a gsheets_infinite feeder which holds -> this could be an easy logic addition: modify the for-each into while not feeder.done() if it becomes necessary
feeder: gsheets_feeder # default -> only expects URL from CLI
feeder: gsheet_feeder # default -> only expects URL from CLI
archivers: # order matters
- telethon
# - tiktok
@ -28,7 +28,7 @@ steps:
configurations:
global:
- save_logs: False
gsheets_feeder:
gsheet_feeder:
sheet: my-auto-archiver
header: 2 # defaults to 1 in GsheetsFeeder
service_account: "secrets/service_account.json"
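
For orientation, this is how a steps/configurations file in the shape above could be read back; the file name and loader below are illustrative assumptions, not code from this commit:

import yaml  # assumption: PyYAML, any YAML loader works

with open("orchestration.yaml") as f:  # hypothetical file name
    conf = yaml.safe_load(f)

feeder_name = conf["steps"]["feeder"]                      # "gsheet_feeder"
feeder_opts = conf["configurations"].get(feeder_name, {})  # {"sheet": "my-auto-archiver", ...}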

View file

@ -23,6 +23,10 @@ class Archiverv2(Step):
# used when archivers need to login or do other one-time setup
pass
def clean_url(self, url:str) -> str:
# used to clean unnecessary URL parameters
return url
def _guess_file_type(self, path: str) -> str:
"""
Receives a URL or filename and returns global mimetype like 'image' or 'video'
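
clean_url is a new per-archiver hook; by default it returns the URL unchanged. A minimal sketch (not part of this commit) of a hypothetical subclass that strips tracking parameters before archiving:

from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode

class ExampleArchiver(Archiverv2):  # hypothetical archiver, for illustration only
    name = "example_archiver"

    def clean_url(self, url: str) -> str:
        # drop utm_* query parameters, keep everything else
        parts = urlparse(url)
        query = [(k, v) for k, v in parse_qsl(parts.query) if not k.startswith("utm_")]
        return urlunparse(parts._replace(query=urlencode(query)))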

View file

@ -5,6 +5,8 @@ from dataclasses import dataclass, field
from typing import List
from archivers import Archiverv2
from feeders import Feeder
from databases import Database
from storages import StorageV2
from steps.step import Step
from enrichers import Enricher
from collections import defaultdict
@ -13,10 +15,13 @@ from collections import defaultdict
@dataclass
class ConfigV2:
# TODO: should Config inherit from Step so it can have its own configurations?
# these are only detected if they are put into the respective __init__.py
configurable_parents = [
Feeder,
Enricher,
Archiverv2,
Database,
StorageV2
# Util
]
feeder: Step # TODO:= BaseFeeder
@ -24,14 +29,14 @@ class ConfigV2:
enrichers: List[Enricher] = field(default_factory=[])
formatters: List[Step] = field(default_factory=[]) # TODO: fix type
storages: List[Step] = field(default_factory=[]) # TODO: fix type
databases: List[Step] = field(default_factory=[]) # TODO: fix type
databases: List[Database] = field(default_factory=[])
def __init__(self) -> None:
self.defaults = {}
self.cli_ops = {}
self.config = {}
# TODO: make this work for nested props like gsheets_feeder.columns.url = "URL"
# TODO: make this work for nested props like gsheet_feeder.columns.url = "URL"
def parse(self):
# 1. parse CLI values
parser = argparse.ArgumentParser(
@ -84,10 +89,12 @@ class ConfigV2:
self.feeder = Feeder.init(steps.get("feeder", "cli_feeder"), self.config)
self.enrichers = [Enricher.init(e, self.config) for e in steps.get("enrichers", [])]
self.archivers = [Archiverv2.init(e, self.config) for e in steps.get("archivers", [])]
self.databases = [Database.init(e, self.config) for e in steps.get("databases", [])]
print("feeder", self.feeder)
print("enrichers", [e for e in self.enrichers])
print("archivers", [e for e in self.archivers])
print("databases", [e for e in self.databases])
def validate(self):
pass
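
The new databases list follows the same pattern as feeder/enrichers/archivers: each name listed under steps is resolved to a Step subclass and instantiated with the full config dict. An illustrative call, with values mirroring the example yaml and the gsheet_db step defined later in this commit:

steps = {"feeder": "gsheet_feeder", "databases": ["gsheet_db"]}
config = {"gsheet_feeder": {"sheet": "my-auto-archiver"}, "gsheet_db": {}}

databases = [Database.init(name, config) for name in steps.get("databases", [])]
# -> [GsheetsDb(...)], each step reading only its own block of the config dict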

View file

@ -0,0 +1,2 @@
from .database import Database
from .gsheet_db import GsheetsDb

View file

@ -1,9 +1,11 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod, ABC
from typing import Union
from metadata import Metadata
from steps.step import Step
@dataclass
class Database(Step, ABC):
name = "database"
@ -11,11 +13,30 @@ class Database(Step, ABC):
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
# only for typing...
def init(name: str, config: dict) -> Database:
# only for typing...
return Step.init(name, config, Database)
@abstractmethod
def enrich(self, item: Metadata) -> Metadata: pass
def started(self, item: Metadata) -> None:
"""signals the DB that the given item archival has started"""
pass
def failed(self, item: Metadata) -> None:
"""update DB accordingly for failure"""
pass
def aborted(self, item: Metadata) -> None:
"""abort notification if user cancelled after start"""
pass
# @abstractmethod
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
@abstractmethod
def done(self, item: Metadata) -> None:
"""archival result ready - should be saved to DB"""
pass
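
Only done() is abstract; started/failed/aborted default to no-ops and fetch defaults to "not archived yet". A minimal sketch of a hypothetical database that relies on those defaults (illustration only, not part of this commit):

from loguru import logger

class ConsoleDb(Database):
    name = "console_db"  # hypothetical step name

    @staticmethod
    def configs() -> dict:
        return {}

    def done(self, item: Metadata) -> None:
        # the only required hook: persist/log the finished archival result
        logger.info(f"archived {item.get_url()}")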

View file

@ -0,0 +1,64 @@
from typing import Union, Tuple
import gspread
# from metadata import Metadata
from loguru import logger
# from . import Enricher
from databases import Database
from metadata import Metadata
from steps.gsheet import Gsheets
from utils import GWorksheet
class GsheetsDb(Database):
"""
NB: only works if GsheetsFeeder is used.
could be updated in the future to support non-GsheetsFeeder metadata
"""
name = "gsheet_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress')
def failed(self, item: Metadata) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, 'Archive failed')
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, '')
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
# TODO: this should not be done at the feeder stage then!
return False
def done(self, item: Metadata) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item}")
gw, row = self._retrieve_gsheet(item)
self._safe_status_update(item, 'done')
pass
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
gw: GWorksheet = item.get("gsheet").get("worksheet")
row: int = item.get("gsheet").get("row")
return gw, row
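
The coupling called out in the class docstring is the "gsheet" context: GsheetsFeeder (further down in this commit) attaches the worksheet and row to each item as a temporary key, and _retrieve_gsheet reads them back. The round trip, condensed:

# in the feeder: attach worksheet context as a temporary key
item = Metadata().set_url(url).set("gsheet", {"row": row, "worksheet": gw}, True)
# in the database: read it back to know which cell to update
gw, row = item.get("gsheet")["worksheet"], item.get("gsheet")["row"]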

View file

@ -1,2 +1,2 @@
from .enricher import Enricher
from .enricher_screenshot import ScreenshotEnricher
from .screenshot_enricher import ScreenshotEnricher

View file

@ -1,2 +1,2 @@
from .feeder import Feeder
from .feeder_gsheet import GsheetsFeeder
from .gsheet_feeder import GsheetsFeeder

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass
from abc import abstractmethod
# from metadata import Metadata
from metadata import Metadata
from steps.step import Step
@ -17,7 +17,5 @@ class Feeder(Step):
# only for code typing
return Step.init(name, config, Feeder)
# def feed(self, item: Metadata) -> Metadata: pass
@abstractmethod
def __iter__(self) -> Feeder: return None
def __iter__(self) -> Metadata: return None
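
With this change a feeder iterates over Metadata items instead of bare URL strings. A minimal sketch of a conforming feeder (hypothetical, not part of this commit):

class ListFeeder(Feeder):  # illustration only
    name = "list_feeder"

    @staticmethod
    def configs() -> dict:
        return {}

    def __iter__(self) -> Metadata:
        for url in ["https://example.com/1", "https://example.com/2"]:
            yield Metadata().set_url(url)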

View file

@ -1,16 +1,17 @@
import json, gspread
import gspread
# from metadata import Metadata
from loguru import logger
# from . import Enricher
from feeders import Feeder
from metadata import Metadata
from steps.gsheet import Gsheets
from utils import GWorksheet
class GsheetsFeeder(Gsheets, Feeder):
name = "gsheets_feeder"
name = "gsheet_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
@ -35,7 +36,7 @@ class GsheetsFeeder(Gsheets, Feeder):
}
})
def __iter__(self) -> str:
def __iter__(self) -> Metadata:
sh = self.gsheets_client.open(self.sheet)
for ii, wks in enumerate(sh.worksheets()):
if not self.should_process_sheet(wks.title):
@ -52,17 +53,16 @@ class GsheetsFeeder(Gsheets, Feeder):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
# TODO: gsheet_db should check later if this is supposed to be archived
# static_status = gw.get_cell(row, 'status')
# status = gw.get_cell(row, 'status', fresh=static_status in ['', None] and url != '')
# All checks done - archival process starts here
yield url
logger.success(f'Finished worksheet {wks.title}')
# GWorksheet(self.sheet)
print(self.sheet)
for u in ["url1", "url2"]:
yield u
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
yield Metadata().set_url(url).set("gsheet", {"row": row, "worksheet": gw}, True)
logger.success(f'Finished worksheet {wks.title}')
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from ast import List
from ast import List, Set
from typing import Any, Union, Dict
from dataclasses import dataclass, field
from datetime import datetime
@ -12,8 +12,14 @@ from media import Media
@dataclass
class Metadata:
status: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
tmp_keys: Set[str] = field(default_factory=set) # keys that are not to be saved in DBs
media: List[Media] = field(default_factory=list)
rearchivable: bool = False
# def __init__(self, url, metadata = {}) -> None:
# self.set_url(url)
# self.metadata = metadata
def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
"""
@ -21,6 +27,7 @@ class Metadata:
"""
if overwrite_left:
self.status = right.status
self.rearchivable |= right.rearchivable
for k, v in right.metadata.items():
assert k not in self.metadata or type(v) == type(self.get(k))
if type(v) not in [dict, list, set] or k not in self.metadata:
@ -33,8 +40,10 @@ class Metadata:
return right.merge(self)
return self
def set(self, key: str, val: Any) -> Metadata:
def set(self, key: str, val: Any, is_tmp=False) -> Metadata:
# if not self.metadata: self.metadata = {}
self.metadata[key] = val
if is_tmp: self.tmp_keys.add(key)
return self
def get(self, key: str, default: Any = None, create_if_missing=False) -> Union[Metadata, str]:
@ -75,3 +84,12 @@ class Metadata:
# # converts all metadata and data into JSON
# return json.dumps(self.metadata)
# #TODO: datetime is not serializable
def cleanup(self) -> Metadata:
#TODO: refactor so it returns a JSON with all intended properties, except tmp_keys
# the code below leads to errors if database needs tmp_keys after they are removed
# """removes temporary metadata fields, ideally called after all ops except writing"""
# for tmp_key in self.tmp_keys:
# self.metadata.pop(tmp_key, None)
# self.tmp_keys = set()
pass
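
A short illustration of the two additions to Metadata, the is_tmp flag on set() and the rearchivable flag OR-ed during merge() (values below are made up):

m = Metadata().set_url("https://example.com")
m.set("tmp_dir", "./tmp-abc", is_tmp=True)  # temporary: not meant to be persisted by databases
m.set("title", "example page")              # regular metadata

other = Metadata()
other.rearchivable = True
m.merge(other)            # with overwrite_left=True, rearchivable is OR-ed into the left side
assert m.rearchivable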

View file

@ -5,8 +5,11 @@ from dataclasses import dataclass
from archivers.archiver import Archiverv2
from enrichers.enricher import Enricher
from databases.database import Database
from metadata import Metadata
import tempfile, time
import tempfile, time, traceback
from loguru import logger
"""
how not to couple the different pieces of logic
@ -119,7 +122,7 @@ class ArchivingOrchestrator:
# identify each formatter, storage, database, etc
# self.feeder = Feeder.init(config.feeder, config.get(config.feeder))
# Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheets_feeder.sheet via CLI
# Is it possible to overwrite config.yaml values? it could be useful: share config file and modify gsheet_feeder.sheet via CLI
# where does that update/processing happen? in config.py
# reflection for Archiver to know which child classes it has? use Archiver.__subclasses__
# self.archivers = [
@ -129,12 +132,12 @@ class ArchivingOrchestrator:
self.feeder = config.feeder
self.enrichers = config.enrichers
self.archivers: List[Archiverv2] = config.archivers
self.databases: List[Database] = config.databases
for a in self.archivers: a.setup()
self.formatters = []
self.storages = []
self.databases = []
# self.formatters = [
# Formatter.init(f, config)
# for f in config.formatters
@ -154,51 +157,61 @@ class ArchivingOrchestrator:
# assert len(archivers) > 1, "there needs to be at least one Archiver"
def feed(self) -> list(Metadata):
for url in self.feeder:
print("ARCHIVING", url)
with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
result = self.archive(url, tmp_dir)
print(type(result))
print(result)
# print(result.as_json())
print("holding on")
time.sleep(300)
for item in self.feeder:
print("ARCHIVING", item)
try:
with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
item.set("tmp_dir", tmp_dir, True)
result = self.archive(item)
print(result)
except KeyboardInterrupt:
# catches keyboard interruptions to do a clean exit
logger.warning(f"caught interrupt on {item=}")
for d in self.databases: d.aborted(item)
exit()
except Exception as e:
logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
for d in self.databases: d.failed(item)
print("holding on 5min")
time.sleep(300)
# how does this handle the parameters like folder which can be different for each archiver?
# the storage needs to know where to archive!!
# solution: feeders have context: extra metadata that they can read or ignore,
# all of it should have sensible defaults (eg: folder)
# default feeder is a list with 1 element
def archive(self, url: str, tmp_dir: str) -> Union[Metadata, None]:
# TODO:
# url = clear_url(url) # should we save if they differ?
# result = Metadata(url=url)
result = Metadata()
def archive(self, result: Metadata) -> Union[Metadata, None]:
url = result.get_url()
# TODO: clean urls
for a in self.archivers:
url = a.clean_url(url)
result.set_url(url)
result.set("tmp_dir", tmp_dir)
should_archive = True
for d in self.databases: should_archive &= d.should_process(url)
# should_archive = False
# for d in self.databases: should_archive |= d.should_process(url)
# should storages also be able to check?
for s in self.storages: should_archive &= s.should_process(url)
# for s in self.storages: should_archive |= s.should_process(url)
if not should_archive:
print("skipping")
return "skipping"
# if not should_archive:
# print("skipping")
# return "skipping"
# signal to DB that archiving has started
# and propagate already archived if it exists
cached_result = None
for d in self.databases:
# are the databases to decide whether to archive?
# they can simply return True by default, otherwise they can avoid duplicates. should this logic be more granular, for example at the archiver level: a tweet will not need to be scraped twice, whereas an instagram profile might. the archiver cannot decide from the link alone which parts to archive,
# instagram profile example: it would always re-archive everything
# maybe the database/storage could use a hash/key to decide if there's a need to re-archive
if d.should_process(url):
d.started(url)
elif d.exists(url):
return d.fetch(url)
else:
print("Skipping url")
return
d.started(result)
if (local_result := d.fetch(result)):
cached_result = (cached_result or Metadata()).merge(local_result)
if cached_result and not cached_result.rearchivable:
for d in self.databases:
d.done(cached_result)
return cached_result
# vk, telethon, ...
for a in self.archivers:
@ -209,6 +222,7 @@ class ArchivingOrchestrator:
# this is where the Hashes come from, the place with access to all content
# the archiver does not have access to storage
result.merge(a.download(result))
# TODO: fix logic
if True or result.is_success(): break
# what if an archiver returns multiple entries and one is to be part of HTMLgenerator?
@ -224,13 +238,14 @@ class ArchivingOrchestrator:
for f in self.formatters:
result.merge(f.format(result))
# storages
# storage
for s in self.storages:
for m in result.media:
m.merge(s.store(m))
result.merge(s.store(m))
# signal completion to databases (DBs, Google Sheets, CSV, ...)
# a hash registration service could be one database: forensic archiving
result.cleanup()
for d in self.databases: d.done(result)
return result
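
Condensed, for illustration, the database lifecycle this commit wires into the orchestrator: every configured database is notified at each stage of an item's archival (the real flow also consults fetch() for cached results and runs archivers, enrichers, formatters and storages in between):

def archive_all(feeder, databases):
    # illustrative sketch of feed()/archive() above, not a drop-in replacement
    for item in feeder:
        try:
            for d in databases: d.started(item)     # e.g. "Archive in progress"
            # ... archivers / enrichers / formatters / storages merge into `item` ...
            item.cleanup()
            for d in databases: d.done(item)        # e.g. status set to "done"
        except KeyboardInterrupt:
            for d in databases: d.aborted(item)     # status reset
            raise
        except Exception:
            for d in databases: d.failed(item)      # e.g. "Archive failed"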

View file

@ -14,7 +14,7 @@ class Step(ABC):
def __init__(self, config: dict) -> None:
# reads the configs into object properties
# self.config = config[self.name]
for k, v in config[self.name].items():
for k, v in config.get(self.name, {}).items():
self.__setattr__(k, v)
@staticmethod
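
Step.init's body is not shown in this diff; a plausible sketch of the name-based lookup it implies, using the __subclasses__ reflection mentioned in the orchestrator comments (an assumption, not the actual implementation):

def init_step(name: str, config: dict, child: type):
    # subclasses are only discoverable once imported, hence the __init__.py requirement
    for sub in child.__subclasses__():
        if getattr(sub, "name", None) == name:
            return sub(config)
    raise ValueError(f"no {child.__name__} step named {name!r}")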

View file

@ -4,4 +4,5 @@ from .local_storage import LocalStorage, LocalConfig
from .s3_storage import S3Config, S3Storage
from .gd_storage import GDConfig, GDStorage
from .storage import StorageV2
from .storage import StorageV2
from .s3 import S3StorageV2