# auto-archiver/src/auto_archiver/core/metadata.py
# (161 lines, 5.8 KiB, Python)

from __future__ import annotations
from ast import List, Set
from typing import Any, Union, Dict
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
import datetime, mimetypes
from urllib.parse import urlparse
from loguru import logger
from dateutil.parser import parse as parse_dt
from .media import Media
# decorator order matters: @dataclass must be innermost so that @dataclass_json wraps a finished dataclass
@dataclass_json
@dataclass
class Metadata:
    """Accumulates what is known about one archived item.

    Holds a free-form ``metadata`` dict (url, title, timestamp, ...), the list
    of collected ``media`` and a textual ``status``. Most mutators return
    ``self`` so calls can be chained.
    """

    status: str = "no archiver"
    # default_factory so that every instance records its own creation time; a
    # plain default would be evaluated once at import time and then shared by
    # all instances.
    _processed_at: datetime.datetime = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Builtin generics are used for the set/list fields on purpose: the
    # module-level names `Set` and `List` are imported from `ast`, not from
    # `typing`, and cannot be subscripted when annotations are evaluated.
    tmp_keys: set[str] = field(default_factory=set, repr=False)  # keys that are not to be saved in DBs
    media: list[Media] = field(default_factory=list)
    final_media: Union[Media, None] = None  # can be overwritten by formatters
    rearchivable: bool = False

    def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
        """Merge two Metadata instances, overwriting according to ``overwrite_left``.

        With ``overwrite_left=True`` the values of ``right`` are merged into
        ``self`` and ``self`` is returned: scalar values from ``right`` win,
        while dict/set values are unioned and lists are concatenated.

        With ``overwrite_left=False`` the roles are inverted: ``self`` is
        merged into ``right`` and ``right`` (mutated) is returned.

        A falsy ``right`` (e.g. ``None``) leaves ``self`` untouched.

        Raises:
            AssertionError: if a key exists on both sides with different types.
        """
        if not right:
            return self
        if not overwrite_left:
            # invert and do same logic
            return right.merge(self)

        if right.status:
            self.status = right.status
        self.rearchivable |= right.rearchivable
        self.tmp_keys |= right.tmp_keys
        for k, v in right.metadata.items():
            assert k not in self.metadata or type(v) is type(self.get(k)), \
                f"cannot merge key {k!r}: value types differ"
            if type(v) not in (dict, list, set) or k not in self.metadata:
                self.set(k, v)
            elif type(v) in (dict, set):
                # key conflict on a mapping/set: union into a new object
                self.set(k, self.get(k) | v)
            else:
                # key conflict on a list: concatenate into a new list
                self.set(k, self.get(k) + v)
        self.media.extend(right.media)
        return self

    def set(self, key: str, val: Any, is_tmp=False) -> Metadata:
        """Store ``val`` under ``key``; ``is_tmp`` also registers the key in ``tmp_keys``."""
        self.metadata[key] = val
        if is_tmp:
            self.tmp_keys.add(key)
        return self

    def get(self, key: str, default: Any = None, create_if_missing=False) -> Any:
        """Return the value stored under ``key``, or ``default`` if absent.

        When ``create_if_missing`` is true and the key is absent, ``default``
        is first stored under ``key`` (so later mutations of it are kept).
        """
        if create_if_missing and key not in self.metadata:
            self.metadata[key] = default
        return self.metadata.get(key, default)

    def success(self, context: str = None) -> Metadata:
        """Mark this instance as successful, optionally prefixed by ``context``."""
        self.status = f"{context}: success" if context else "success"
        return self

    def is_success(self) -> bool:
        """Return True if the status text contains ``"success"``."""
        return "success" in self.status

    @property  # getter .netloc
    def netloc(self) -> str:
        """Network location (host[:port]) of the stored URL."""
        return urlparse(self.get_url()).netloc

    # custom getter/setters

    def set_url(self, url: str) -> Metadata:
        """Store the URL; raises AssertionError unless it is a non-empty ``str``."""
        assert type(url) is str and len(url) > 0, "invalid URL"
        return self.set("url", url)

    def get_url(self) -> str:
        """Return the stored URL; raises AssertionError if missing or invalid."""
        url = self.get("url")
        assert type(url) is str and len(url) > 0, "invalid URL"
        return url

    def set_content(self, content: str) -> Metadata:
        """Store a dump with all the relevant content."""
        return self.set("content", content)

    def set_title(self, title: str) -> Metadata:
        """Store the title."""
        return self.set("title", title)

    def get_title(self) -> str:
        """Return the stored title, or None if unset."""
        return self.get("title")

    def set_tmp_dir(self, tmp_dir: str) -> Metadata:
        """Store the temporary directory as a temporary (non-persisted) key."""
        return self.set("tmp_dir", tmp_dir, True)

    def get_tmp_dir(self) -> str:
        """Return the stored temporary directory, or None if unset."""
        return self.get("tmp_dir")

    def set_timestamp(self, timestamp: Union[datetime.datetime, str]) -> Metadata:
        """Store the timestamp; strings are parsed with ``dateutil``.

        Raises:
            AssertionError: if the (parsed) value is not a ``datetime``.
        """
        if isinstance(timestamp, str):
            timestamp = parse_dt(timestamp)
        assert isinstance(timestamp, datetime.datetime), "set_timestamp expects a datetime instance"
        return self.set("timestamp", timestamp)

    def get_timestamp(self, utc=True, iso=True) -> Union[datetime.datetime, str, None]:
        """Return the stored timestamp.

        Args:
            utc: express the value in UTC. Naive values are taken to already
                be UTC and are only labelled; timezone-aware values are
                converted so they keep denoting the same instant.
            iso: return an ISO-8601 string instead of a ``datetime``.

        Returns:
            The falsy stored value (e.g. ``None``) when no timestamp is set,
            otherwise a ``str`` if ``iso`` else a ``datetime``.
        """
        ts = self.get("timestamp")
        if not ts:
            return ts
        if utc:
            if ts.utcoffset() is None:
                ts = ts.replace(tzinfo=datetime.timezone.utc)
            else:
                # replace() here would relabel the zone and shift the instant
                ts = ts.astimezone(datetime.timezone.utc)
        if iso:
            return ts.isoformat()
        return ts

    def add_media(self, media: Media, id: str = None) -> Union[Media, None]:
        """Append ``media``, optionally setting its ``id`` first.

        Returns the media that was added, or ``None`` when ``media`` is None
        (in which case nothing is added).
        """
        if media is None:
            return None
        if id is not None:
            media.set("id", id)
        self.media.append(media)
        return media

    def get_media_by_id(self, id: str) -> Union[Media, None]:
        """Return the first media whose ``id`` equals ``id``, or None."""
        for m in self.media:
            if m.get("id") == id:
                return m
        return None

    def set_final_media(self, final: Media) -> Metadata:
        """Set the final media; a falsy ``final`` is ignored.

        Logs a warning when an existing final media is being replaced.
        """
        if final:
            if self.final_media:
                logger.warning(f"overwriting final media value :{self.final_media} with {final}")
            self.final_media = final
        return self

    def get_single_media(self) -> Media:
        """Return the final media if set, otherwise the first collected media.

        Raises:
            IndexError: if there is no final media and ``media`` is empty.
        """
        # TODO: could be refactored to use a custom media.id
        if self.final_media:
            return self.final_media
        return self.media[0]

    def get_clean_metadata(self) -> Dict[str, Any]:
        """Return a copy of ``metadata`` without temporary keys, plus ``processed_at``."""
        clean = {k: v for k, v in self.metadata.items() if k not in self.tmp_keys}
        clean["processed_at"] = self._processed_at  # TODO: move to enrichment
        return clean

    def cleanup(self) -> None:
        """Currently a no-op.

        TODO: refactor so it returns a JSON with all intended properties,
        except tmp_keys. Removing the temporary keys from ``metadata`` here
        leads to errors if a database needs tmp_keys after they are removed.
        """
        return None