kopia lustrzana https://github.com/bellingcat/auto-archiver
merge logic started
rodzic
53ffa2d4ae
commit
9c056d001c
|
@ -87,9 +87,11 @@ class TelethonArchiver(Archiverv2):
|
|||
pbar.update()
|
||||
|
||||
def download(self, item: Metadata) -> Metadata:
|
||||
"""
|
||||
if this url is archivable will download post info and look for other posts from the same group with media.
|
||||
can handle private/public channels
|
||||
"""
|
||||
url = item.get_url()
|
||||
|
||||
print(f"downloading {url=}")
|
||||
# detect URLs that we definitely cannot handle
|
||||
match = self.link_pattern.search(url)
|
||||
if not match: return False
|
||||
|
@ -126,8 +128,9 @@ class TelethonArchiver(Archiverv2):
|
|||
|
||||
# media can also be in entities
|
||||
if mp.entities:
|
||||
other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image"]]
|
||||
logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
|
||||
other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image", "audio"]]
|
||||
if len(other_media_urls):
|
||||
logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
|
||||
for om_url in other_media_urls:
|
||||
filename = os.path.join(tmp_dir, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}')
|
||||
self.download_from_url(om_url, filename)
|
||||
|
@ -140,7 +143,7 @@ class TelethonArchiver(Archiverv2):
|
|||
continue
|
||||
result.add_media(filename)
|
||||
|
||||
result.set("post", post).set_title(title).set_timestamp(post.date)
|
||||
result.set("post", str(post)).set_title(title).set_timestamp(post.date)
|
||||
return result
|
||||
|
||||
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
|
||||
|
|
|
@ -4,6 +4,7 @@ from ast import List
|
|||
from typing import Any, Union, Dict
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -21,12 +22,25 @@ class Metadata:
|
|||
self.status = status
|
||||
self.metadata = {}
|
||||
|
||||
# @staticmethod
|
||||
def merge(self: Metadata, right: Metadata, overwrite_left=True) -> Metadata:
|
||||
# should return a merged version of the Metadata
|
||||
# will work for archived() and enriched()
|
||||
# what if 2 metadatas contain the same keys? only one can remain! : overwrite_left
|
||||
pass
|
||||
"""
|
||||
merges to Metadata instances, will overwrite according to overwrite_left flag
|
||||
"""
|
||||
res = Metadata()
|
||||
if overwrite_left:
|
||||
res.status = right.status
|
||||
res.metadata = dict(self.metadata) # make a copy
|
||||
for k, v in right.metadata.items():
|
||||
print(type(v), type(self.get(k)))
|
||||
# assert type(v) == type(self.get(k))
|
||||
if type(v) not in [dict, list, set] or k not in res.metadata:
|
||||
res.set(k, v)
|
||||
else: # key conflict
|
||||
if type(v) in [dict, set]: res.set(k, self.get(k) | v)
|
||||
elif type(v) == list: res.set(k, self.get(k) + v)
|
||||
else: # invert and do same logic
|
||||
return right.merge(self)
|
||||
return res
|
||||
|
||||
# TODO: setters?
|
||||
def set(self, key: str, val: Any) -> Metadata:
|
||||
|
@ -34,8 +48,10 @@ class Metadata:
|
|||
self.metadata[key] = val
|
||||
return self
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Union[Metadata, str]:
|
||||
def get(self, key: str, default: Any = None, create_if_missing=False) -> Union[Metadata, str]:
|
||||
# goes through metadata and returns the Metadata available
|
||||
if create_if_missing and key not in self.metadata:
|
||||
self.metadata[key] = default
|
||||
return self.metadata.get(key, default)
|
||||
|
||||
# custom getter/setters
|
||||
|
@ -50,7 +66,11 @@ class Metadata:
|
|||
return url
|
||||
|
||||
def get_media(self) -> List:
|
||||
return self.get("media", [])
|
||||
return self.get("media", [], create_if_missing=True)
|
||||
|
||||
def set_content(self, content: str) -> Metadata:
|
||||
# the main textual content/information from a social media post, webpage, ...
|
||||
return self.set("content", content)
|
||||
|
||||
def set_title(self, title: str) -> Metadata:
|
||||
return self.set("title", title)
|
||||
|
@ -59,8 +79,10 @@ class Metadata:
|
|||
return self.set("title", title)
|
||||
|
||||
def add_media(self, filename: str) -> Metadata:
|
||||
# print(f"adding {filename} to {self.metadata.get('media')}")
|
||||
# return self.set("media", self.get_media() + [filename])
|
||||
return self.get_media().append(filename)
|
||||
|
||||
def as_json(self) -> str:
|
||||
# converts all metadata and data into JSON
|
||||
pass
|
||||
return json.dumps(self.metadata)
|
||||
|
|
|
@ -157,8 +157,9 @@ class ArchivingOrchestrator:
|
|||
for url in self.feeder:
|
||||
print("ARCHIVING", url)
|
||||
with tempfile.TemporaryDirectory(dir="./") as tmp_dir:
|
||||
self.archive(url, tmp_dir)
|
||||
|
||||
result = self.archive(url, tmp_dir)
|
||||
print(result)
|
||||
print(result.as_json())
|
||||
print("holding on")
|
||||
time.sleep(300)
|
||||
# how does this handle the parameters like folder which can be different for each archiver?
|
||||
|
|
|
@ -22,10 +22,10 @@ class Step(ABC):
|
|||
|
||||
def init(name: str, config: dict, child: Type[Step]) -> Step:
|
||||
"""
|
||||
cannot find subclasses of child.subclasses
|
||||
looks into direct subclasses of child for name and returns such ab object
|
||||
TODO: cannot find subclasses of child.subclasses
|
||||
"""
|
||||
for sub in child.__subclasses__():
|
||||
if sub.name == name:
|
||||
print(sub.name, "CALLING NEW")
|
||||
return sub(config)
|
||||
raise ClassFoundException(f"Unable to initialize STEP with {name=}")
|
||||
|
|
Ładowanie…
Reference in New Issue