Merge branch 'main' into settings_page

pull/217/head
Patrick Robertson 2025-03-07 15:17:42 +00:00 committed by GitHub
commit 333201acec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 342 additions and 127 deletions

View file

@@ -7,13 +7,24 @@ ENV RUNNING_IN_DOCKER=1 \
     PYTHONFAULTHANDLER=1 \
     PATH="/root/.local/bin:$PATH"
 
+ARG TARGETARCH
+
 # Installing system dependencies
 RUN add-apt-repository ppa:mozillateam/ppa && \
     apt-get update && \
     apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool && \
     apt-get install -y --no-install-recommends firefox-esr && \
-    ln -s /usr/bin/firefox-esr /usr/bin/firefox && \
-    wget https://github.com/mozilla/geckodriver/releases/download/v0.35.0/geckodriver-v0.35.0-linux64.tar.gz && \
+    ln -s /usr/bin/firefox-esr /usr/bin/firefox
+
+ARG GECKODRIVER_VERSION=0.36.0
+RUN if [ $(uname -m) = "aarch64" ]; then \
+        GECKODRIVER_ARCH=linux-aarch64; \
+    else \
+        GECKODRIVER_ARCH=linux64; \
+    fi && \
+    wget https://github.com/mozilla/geckodriver/releases/download/v${GECKODRIVER_VERSION}/geckodriver-v${GECKODRIVER_VERSION}-${GECKODRIVER_ARCH}.tar.gz && \
     tar -xvzf geckodriver* -C /usr/local/bin && \
     chmod +x /usr/local/bin/geckodriver && \
     rm geckodriver-v* && \
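For reference, the architecture switch introduced above can be expressed in a few lines of Python; a minimal sketch (the helper name and the `print` usage are illustrative, not part of the PR):

```python
import platform

def geckodriver_url(version: str = "0.36.0") -> str:
    # same branch as the Dockerfile's `uname -m` test: aarch64 gets its own build
    arch = "linux-aarch64" if platform.machine() == "aarch64" else "linux64"
    return ("https://github.com/mozilla/geckodriver/releases/download/"
            f"v{version}/geckodriver-v{version}-{arch}.tar.gz")

print(geckodriver_url())  # e.g. ...geckodriver-v0.36.0-linux64.tar.gz on x86_64
```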

poetry.lock generated
View file

@@ -103,14 +103,14 @@ tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"]
 [[package]]
 name = "authlib"
-version = "1.4.1"
+version = "1.5.0"
 description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "Authlib-1.4.1-py2.py3-none-any.whl", hash = "sha256:edc29c3f6a3e72cd9e9f45fff67fc663a2c364022eb0371c003f22d5405915c1"},
-    {file = "authlib-1.4.1.tar.gz", hash = "sha256:30ead9ea4993cdbab821dc6e01e818362f92da290c04c7f6a1940f86507a790d"},
+    {file = "Authlib-1.5.0-py2.py3-none-any.whl", hash = "sha256:b3cc5ccfc19cf87678046b6e7cb19d402d8a631a33c40e36385232203227953a"},
+    {file = "authlib-1.5.0.tar.gz", hash = "sha256:8fd8bd8f806485a532ac39a17b579982cf54688f956174f995cc938a91725423"},
 ]
 
 [package.dependencies]
@@ -172,18 +172,18 @@ lxml = ["lxml"]
 [[package]]
 name = "boto3"
-version = "1.36.22"
+version = "1.37.0"
 description = "The AWS SDK for Python"
 optional = false
 python-versions = ">=3.8"
 groups = ["main"]
 files = [
-    {file = "boto3-1.36.22-py3-none-any.whl", hash = "sha256:39957eabdce009353d72d131046489fbbfa15891865d5f069f1e8bfa414e6b81"},
-    {file = "boto3-1.36.22.tar.gz", hash = "sha256:768c8a4d4a6227fe2258105efa086f1424cba5ca915a5eb2305b2cd979306ad1"},
+    {file = "boto3-1.37.0-py3-none-any.whl", hash = "sha256:03bd8c93b226f07d944fd6b022e11a307bff94ab6a21d51675d7e3ea81ee8424"},
+    {file = "boto3-1.37.0.tar.gz", hash = "sha256:01015b38017876d79efd7273f35d9a4adfba505237159621365bed21b9b65eca"},
 ]
 
 [package.dependencies]
-botocore = ">=1.36.22,<1.37.0"
+botocore = ">=1.37.0,<1.38.0"
 jmespath = ">=0.7.1,<2.0.0"
 s3transfer = ">=0.11.0,<0.12.0"
@@ -192,14 +192,14 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
 [[package]]
 name = "botocore"
-version = "1.36.22"
+version = "1.37.0"
 description = "Low-level, data-driven core of boto 3."
 optional = false
 python-versions = ">=3.8"
 groups = ["main"]
 files = [
-    {file = "botocore-1.36.22-py3-none-any.whl", hash = "sha256:75d6b34acb0686ee4d54ff6eb285e78ccfe318407428769d1e3e13351714d890"},
-    {file = "botocore-1.36.22.tar.gz", hash = "sha256:59520247d5a479731724f97c995d5a1c2aae3b303b324f39d99efcfad1d3019e"},
+    {file = "botocore-1.37.0-py3-none-any.whl", hash = "sha256:d01661f38c0edac87424344cdf4169f3ab9bc1bf1b677c8b230d025eb66c54a3"},
+    {file = "botocore-1.37.0.tar.gz", hash = "sha256:b129d091a8360b4152ab65327186bf4e250de827c4a9b7ddf40a72b1acf1f3c1"},
 ]
 
 [package.dependencies]
@@ -363,14 +363,14 @@ beautifulsoup4 = "*"
 [[package]]
 name = "cachetools"
-version = "5.5.1"
+version = "5.5.2"
 description = "Extensible memoizing collections and decorators"
 optional = false
 python-versions = ">=3.7"
 groups = ["main"]
 files = [
-    {file = "cachetools-5.5.1-py3-none-any.whl", hash = "sha256:b76651fdc3b24ead3c648bbdeeb940c1b04d365b38b4af66788f9ec4a81d42bb"},
-    {file = "cachetools-5.5.1.tar.gz", hash = "sha256:70f238fbba50383ef62e55c6aff6d9673175fe59f7c6782c7a0b9e38f4a9df95"},
+    {file = "cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a"},
+    {file = "cachetools-5.5.2.tar.gz", hash = "sha256:1a661caa9175d26759571b2e19580f9d6393969e5dfca11fdb1f947a23e640d4"},
 ]
 
 [[package]]
@@ -860,14 +860,14 @@ tool = ["click (>=6.0.0)"]
 [[package]]
 name = "googleapis-common-protos"
-version = "1.67.0"
+version = "1.68.0"
 description = "Common protobufs used in Google APIs"
 optional = false
 python-versions = ">=3.7"
 groups = ["main"]
 files = [
-    {file = "googleapis_common_protos-1.67.0-py2.py3-none-any.whl", hash = "sha256:579de760800d13616f51cf8be00c876f00a9f146d3e6510e19d1f4111758b741"},
-    {file = "googleapis_common_protos-1.67.0.tar.gz", hash = "sha256:21398025365f138be356d5923e9168737d94d46a72aefee4a6110a1f23463c86"},
+    {file = "googleapis_common_protos-1.68.0-py2.py3-none-any.whl", hash = "sha256:aaf179b2f81df26dfadac95def3b16a95064c76a5f45f07e4c68a21bb371c4ac"},
+    {file = "googleapis_common_protos-1.68.0.tar.gz", hash = "sha256:95d38161f4f9af0d9423eed8fb7b64ffd2568c3464eb542ff02c5bfa1953ab3c"},
 ]
 
 [package.dependencies]
@@ -1674,14 +1674,14 @@ files = [
 [[package]]
 name = "pydata-sphinx-theme"
-version = "0.16.1"
+version = "0.15.4"
 description = "Bootstrap-based Sphinx theme from the PyData community"
 optional = false
 python-versions = ">=3.9"
 groups = ["docs"]
 files = [
-    {file = "pydata_sphinx_theme-0.16.1-py3-none-any.whl", hash = "sha256:225331e8ac4b32682c18fcac5a57a6f717c4e632cea5dd0e247b55155faeccde"},
-    {file = "pydata_sphinx_theme-0.16.1.tar.gz", hash = "sha256:a08b7f0b7f70387219dc659bff0893a7554d5eb39b59d3b8ef37b8401b7642d7"},
+    {file = "pydata_sphinx_theme-0.15.4-py3-none-any.whl", hash = "sha256:2136ad0e9500d0949f96167e63f3e298620040aea8f9c74621959eda5d4cf8e6"},
+    {file = "pydata_sphinx_theme-0.15.4.tar.gz", hash = "sha256:7762ec0ac59df3acecf49fd2f889e1b4565dbce8b88b2e29ee06fdd90645a06d"},
 ]
 
 [package.dependencies]
@@ -1689,8 +1689,9 @@ accessible-pygments = "*"
 Babel = "*"
 beautifulsoup4 = "*"
 docutils = "!=0.17.0"
+packaging = "*"
 pygments = ">=2.7"
-sphinx = ">=6.1"
+sphinx = ">=5"
 typing-extensions = "*"
 
 [package.extras]
@@ -2265,14 +2266,14 @@ crt = ["botocore[crt] (>=1.36.0,<2.0a.0)"]
 [[package]]
 name = "selenium"
-version = "4.28.1"
+version = "4.29.0"
 description = "Official Python bindings for Selenium WebDriver"
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "selenium-4.28.1-py3-none-any.whl", hash = "sha256:4238847e45e24e4472cfcf3554427512c7aab9443396435b1623ef406fff1cc1"},
-    {file = "selenium-4.28.1.tar.gz", hash = "sha256:0072d08670d7ec32db901bd0107695a330cecac9f196e3afb3fa8163026e022a"},
+    {file = "selenium-4.29.0-py3-none-any.whl", hash = "sha256:ce5d26f1ddc1111641113653af33694c13947dd36c2df09cdd33f554351d372e"},
+    {file = "selenium-4.29.0.tar.gz", hash = "sha256:3a62f7ec33e669364a6c0562a701deb69745b569c50d55f1a912bf8eb33358ba"},
 ]
 
 [package.dependencies]
@@ -2425,19 +2426,19 @@ test = ["httpx", "pytest (>=6)"]
 [[package]]
 name = "sphinx-book-theme"
-version = "1.1.3"
+version = "1.1.4"
 description = "A clean book theme for scientific explanations and documentation with Sphinx"
 optional = false
 python-versions = ">=3.9"
 groups = ["docs"]
 files = [
-    {file = "sphinx_book_theme-1.1.3-py3-none-any.whl", hash = "sha256:a554a9a7ac3881979a87a2b10f633aa2a5706e72218a10f71be38b3c9e831ae9"},
-    {file = "sphinx_book_theme-1.1.3.tar.gz", hash = "sha256:1f25483b1846cb3d353a6bc61b3b45b031f4acf845665d7da90e01ae0aef5b4d"},
+    {file = "sphinx_book_theme-1.1.4-py3-none-any.whl", hash = "sha256:843b3f5c8684640f4a2d01abd298beb66452d1b2394cd9ef5be5ebd5640ea0e1"},
+    {file = "sphinx_book_theme-1.1.4.tar.gz", hash = "sha256:73efe28af871d0a89bd05856d300e61edce0d5b2fbb7984e84454be0fedfe9ed"},
 ]
 
 [package.dependencies]
-pydata-sphinx-theme = ">=0.15.2"
-sphinx = ">=5"
+pydata-sphinx-theme = "0.15.4"
+sphinx = ">=6.1"
 
 [package.extras]
 code-style = ["pre-commit"]
@@ -2584,14 +2585,14 @@ test = ["pytest"]
 [[package]]
 name = "starlette"
-version = "0.45.3"
+version = "0.46.0"
 description = "The little ASGI library that shines."
 optional = false
 python-versions = ">=3.9"
 groups = ["docs"]
 files = [
-    {file = "starlette-0.45.3-py3-none-any.whl", hash = "sha256:dfb6d332576f136ec740296c7e8bb8c8a7125044e7c6da30744718880cdd059d"},
-    {file = "starlette-0.45.3.tar.gz", hash = "sha256:2cbcba2a75806f8a41c722141486f37c28e30a0921c5f6fe4346cb0dcee1302f"},
+    {file = "starlette-0.46.0-py3-none-any.whl", hash = "sha256:913f0798bd90ba90a9156383bcf1350a17d6259451d0d8ee27fc0cf2db609038"},
+    {file = "starlette-0.46.0.tar.gz", hash = "sha256:b359e4567456b28d473d0193f34c0de0ed49710d75ef183a74a5ce0499324f50"},
 ]
 
 [package.dependencies]
@@ -2602,14 +2603,14 @@ full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart
 [[package]]
 name = "telethon"
-version = "1.38.1"
+version = "1.39.0"
 description = "Full-featured Telegram client library for Python 3"
 optional = false
 python-versions = ">=3.5"
 groups = ["main"]
 files = [
-    {file = "Telethon-1.38.1-py3-none-any.whl", hash = "sha256:30c187017501bfb982b8af5659f864dda4108f77ea49cfce61e8f6fdb8a18d6e"},
-    {file = "Telethon-1.38.1.tar.gz", hash = "sha256:f9866c1e37197a0894e0c02aa56a6359bffb14a585e88e18e3e819df4fda399a"},
+    {file = "Telethon-1.39.0-py3-none-any.whl", hash = "sha256:aa9f394b94be144799a6f6a93ab463867bc7c63503ede9631751940a98f6c703"},
+    {file = "telethon-1.39.0.tar.gz", hash = "sha256:35d4795d8c91deac515fb0bcb3723866b924de1c724e1d5c230460e96f284a63"},
 ]
 
 [package.dependencies]
@@ -2719,14 +2720,14 @@ sortedcontainers = "*"
 [[package]]
 name = "trio-websocket"
-version = "0.12.1"
+version = "0.12.2"
 description = "WebSocket library for Trio"
 optional = false
 python-versions = ">=3.8"
 groups = ["main"]
 files = [
-    {file = "trio_websocket-0.12.1-py3-none-any.whl", hash = "sha256:608ec746bb287e5d5a66baf483e41194193c5cf05ffaad6240e7d1fcd80d1e6f"},
-    {file = "trio_websocket-0.12.1.tar.gz", hash = "sha256:d55ccd4d3eae27c494f3fdae14823317839bdcb8214d1173eacc4d42c69fc91b"},
+    {file = "trio_websocket-0.12.2-py3-none-any.whl", hash = "sha256:df605665f1db533f4a386c94525870851096a223adcb97f72a07e8b4beba45b6"},
+    {file = "trio_websocket-0.12.2.tar.gz", hash = "sha256:22c72c436f3d1e264d0910a3951934798dcc5b00ae56fc4ee079d46c7cf20fae"},
 ]
 
 [package.dependencies]
@@ -3161,14 +3162,14 @@ h11 = ">=0.9.0,<1"
 [[package]]
 name = "yt-dlp"
-version = "2025.1.26"
+version = "2025.2.19"
 description = "A feature-rich command-line audio/video downloader"
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "yt_dlp-2025.1.26-py3-none-any.whl", hash = "sha256:3e76bd896b9f96601021ca192ca0fbdd195e3c3dcc28302a3a34c9bc4979da7b"},
-    {file = "yt_dlp-2025.1.26.tar.gz", hash = "sha256:1c9738266921ad43c568ad01ac3362fb7c7af549276fbec92bd72f140da16240"},
+    {file = "yt_dlp-2025.2.19-py3-none-any.whl", hash = "sha256:3ed218eaeece55e9d715afd41abc450dc406ee63bf79355169dfde312d38fdb8"},
+    {file = "yt_dlp-2025.2.19.tar.gz", hash = "sha256:f33ca76df2e4db31880f2fe408d44f5058d9f135015b13e50610dfbe78245bea"},
 ]
 
 [package.extras]

View file

@@ -50,7 +50,6 @@ class BaseModule(ABC):
 
     def config_setup(self, config: dict):
-        authentication = config.get('authentication', {})
         # this is important. Each instance is given its own deepcopied config, so modules cannot
         # change values to affect other modules
         config = deepcopy(config)
@@ -106,8 +105,8 @@ class BaseModule(ABC):
         for key in self.authentication.keys():
             if key in site or site in key:
                 logger.debug(f"Could not find exact authentication information for site '{site}'. \
                              did find information for '{key}' which is close, is this what you meant? \
                              If so, edit your authentication settings to make sure it exactly matches.")
 
     def get_ytdlp_cookiejar(args):
         import yt_dlp
@@ -117,7 +116,7 @@ class BaseModule(ABC):
         # collections.namedtuple('ParsedOptions', ('parser', 'options', 'urls', 'ydl_opts'))
         ytdlp_opts = getattr(parse_options(args), 'ydl_opts')
         return yt_dlp.YoutubeDL(ytdlp_opts).cookiejar
 
     get_cookiejar_options = None
 
     # order of priority:

View file

@@ -14,7 +14,7 @@ DEFAULT_MANIFEST = {
     'name': '',  # the display name of the module
     'author': 'Bellingcat',  # creator of the module, leave this as Bellingcat or set your own name!
     'type': [],  # the type of the module, can be one or more of MODULE_TYPES
-    'requires_setup': True,  # whether or not this module requires additional setup such as setting API Keys or installing additional softare
+    'requires_setup': True,  # whether or not this module requires additional setup such as setting API Keys or installing additional software
     'description': '',  # a description of the module
     'dependencies': {},  # external dependencies, e.g. python packages or binaries, in dictionary format
     'entry_point': '',  # the entry point for the module, in the format 'module_name::ClassName'. This can be left blank to use the default entry point of module_name::ModuleName

View file

@@ -15,6 +15,7 @@ from copy import copy
 from rich_argparse import RichHelpFormatter
 from loguru import logger
+import requests
 
 from .metadata import Metadata, Media
 from auto_archiver.version import __version__
@@ -335,7 +336,23 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
         yaml_config = self.load_config(basic_config.config_file)
 
         return self.setup_complete_parser(basic_config, yaml_config, unused_args)
 
+    def check_for_updates(self):
+        response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
+        latest_version = response['info']['version']
+        # check version compared to current version
+        if latest_version != __version__:
+            if os.environ.get('RUNNING_IN_DOCKER'):
+                update_cmd = "`docker pull bellingcat/auto-archiver:latest`"
+            else:
+                update_cmd = "`pip install --upgrade auto-archiver`"
+            logger.warning("")
+            logger.warning("********* IMPORTANT: UPDATE AVAILABLE ********")
+            logger.warning(f"A new version of auto-archiver is available (v{latest_version}, you have {__version__})")
+            logger.warning(f"Make sure to update to the latest version using: {update_cmd}")
+            logger.warning("")
+
     def setup(self, args: list):
         """
         Function to configure all setup of the orchestrator: setup configs and load modules.
@@ -343,6 +360,8 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
         This method should only ever be called once
         """
+        self.check_for_updates()
+
         if self.setup_finished:
             logger.warning("The `setup_config()` function should only ever be run once. \
                            If you need to re-run the setup, please re-instantiate a new instance of the orchestrator. \
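The new `check_for_updates()` relies on PyPI's JSON API; a standalone sketch of that lookup (the explicit `timeout` is our addition, not in the committed code):

```python
import requests

resp = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10)
latest = resp.json()["info"]["version"]  # same field the orchestrator reads
print(f"Latest published auto-archiver version: {latest}")
```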

View file

@@ -10,7 +10,7 @@ class ConsoleDb(Database):
     """
 
     def started(self, item: Metadata) -> None:
-        logger.warning(f"STARTED {item}")
+        logger.info(f"STARTED {item}")
 
     def failed(self, item: Metadata, reason:str) -> None:
         logger.error(f"FAILED {item}: {reason}")

View file

@@ -28,6 +28,13 @@ the broader archiving framework.
 metadata objects. Some dropins are included in this generic_archiver by default, but
 custom dropins can be created to handle additional websites and passed to the archiver
 via the command line using the `--dropins` option (TODO!).
+
+### Auto-Updates
+
+The Generic Extractor will also automatically check for updates to `yt-dlp` (every 5 days by default).
+This can be configured using the `ytdlp_update_interval` setting (or disabled by setting it to -1).
+If you are having issues with the extractor, you can review the version of `yt-dlp` being used with `yt-dlp --version`.
+
 """,
     "configs": {
         "subtitles": {"default": True, "help": "download subtitles if available", "type": "bool"},
@@ -64,5 +71,10 @@ via the command line using the `--dropins` option (TODO!).
             "default": "inf",
             "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit.",
         },
+        "ytdlp_update_interval": {
+            "default": 5,
+            "help": "How often to check for yt-dlp updates (days). If positive, will check and update yt-dlp every [num] days. Set it to -1 to disable, or 0 to always update on every run.",
+            "type": "int",
+        },
     },
 }
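The `ytdlp_update_interval` semantics described in the help text reduce to a small predicate; a condensed sketch (the function name is ours — the module itself inlines this logic in its `setup()`):

```python
import datetime

def should_update(interval_days: int, next_check: datetime.datetime | None) -> bool:
    if interval_days < 0:
        return False  # -1 (or any negative value) disables update checks
    # the interval only controls how far ahead the next check is scheduled;
    # 0 reschedules for "now", which means an update check on every run
    return next_check is None or next_check <= datetime.datetime.now()
```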

View file

@@ -1,7 +1,11 @@
-import datetime, os, yt_dlp, pysubs2
+import datetime, os
 import importlib
+import subprocess
 from typing import Generator, Type
 
+import yt_dlp
 from yt_dlp.extractor.common import InfoExtractor
+import pysubs2
 from loguru import logger
@@ -11,6 +15,44 @@ from auto_archiver.core import Metadata, Media
 class GenericExtractor(Extractor):
     _dropins = {}
 
+    def setup(self):
+        # check for file .ytdlp-update in the secrets folder
+        if self.ytdlp_update_interval < 0:
+            return
+
+        use_secrets = os.path.exists('secrets')
+        path = os.path.join('secrets' if use_secrets else '', '.ytdlp-update')
+        next_update_check = None
+        if os.path.exists(path):
+            with open(path, "r") as f:
+                next_update_check = datetime.datetime.fromisoformat(f.read())
+
+        if not next_update_check or next_update_check < datetime.datetime.now():
+            self.update_ytdlp()
+            next_update_check = datetime.datetime.now() + datetime.timedelta(days=self.ytdlp_update_interval)
+            with open(path, "w") as f:
+                f.write(next_update_check.isoformat())
+
+    def update_ytdlp(self):
+        logger.info("Checking and updating yt-dlp...")
+        logger.info(f"Tip: change the 'ytdlp_update_interval' setting to control how often yt-dlp is updated. Set to -1 to disable or 0 to enable on every run. Current setting: {self.ytdlp_update_interval}")
+        from importlib.metadata import version as get_version
+        old_version = get_version("yt-dlp")
+        try:
+            # try and update with pip (this works inside poetry environment and in a normal virtualenv)
+            result = subprocess.run(["pip", "install", "--upgrade", "yt-dlp"], check=True, capture_output=True)
+            if "Successfully installed yt-dlp" in result.stdout.decode():
+                new_version = importlib.metadata.version("yt-dlp")
+                logger.info(f"yt-dlp successfully updated (from {old_version} to {new_version})")
+                importlib.reload(yt_dlp)
+            else:
+                logger.info("yt-dlp already up to date")
+        except Exception as e:
+            logger.error(f"Error updating yt-dlp: {e}")
+
     def suitable_extractors(self, url: str) -> Generator[str, None, None]:
         """
         Returns a list of valid extractors for the given URL"""
@@ -86,7 +128,7 @@ class GenericExtractor(Extractor):
         # keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
         result.set_title(video_data.pop('title', video_data.pop('fulltitle', "")))
         result.set_url(url)
+        if "description" in video_data: result.set_content(video_data["description"])
 
         # extract comments if enabled
         if self.comments:
             result.set("comments", [{

View file

@@ -64,7 +64,7 @@ class GsheetsFeeder(Feeder):
             yield m
 
     def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
+        # TODO: Check folder value not being recognised
         m.set_context("gsheet", {"row": row, "worksheet": gw})
 
         if gw.get_cell_or_default(row, 'folder', "") is None:

View file

@@ -17,6 +17,7 @@ class GWorksheet:
         'thumbnail': 'thumbnail',
         'timestamp': 'upload timestamp',
         'title': 'upload title',
+        'text': 'text content',
         'screenshot': 'screenshot',
         'hash': 'hash',
         'pdq_hash': 'perceptual hashes',

View file

@@ -10,25 +10,30 @@
     "requires_setup": True,
     "configs": {
         "username": {"required": True,
-                     "help": "a valid Instagram username"},
+                     "help": "A valid Instagram username."},
         "password": {
             "required": True,
-            "help": "the corresponding Instagram account password",
+            "help": "The corresponding Instagram account password.",
         },
         "download_folder": {
             "default": "instaloader",
-            "help": "name of a folder to temporarily download content to",
+            "help": "Name of a folder to temporarily download content to.",
         },
         "session_file": {
             "default": "secrets/instaloader.session",
-            "help": "path to the instagram session which saves session credentials",
+            "help": "Path to the instagram session file which saves session credentials. If one doesn't exist this gives the path to store a new one.",
         },
         # TODO: fine-grain
         # "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
     },
     "description": """
-Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram. This class handles both individual posts
-and user profiles, downloading as much information as possible, including images, videos, text, stories,
+Uses the [Instaloader library](https://instaloader.github.io/as-module.html) to download content from Instagram.
+
+> **Warning**
+> This module is not actively maintained due to known issues with blocking.
+> Prioritise usage of the [Instagram Tbot Extractor](./instagram_tbot_extractor.md) and [Instagram API Extractor](./instagram_api_extractor.md)
+
+This class handles both individual posts and user profiles, downloading as much information as possible, including images, videos, text, stories,
 highlights, and tagged posts.
 Authentication is required via username/password or a session file.

View file

@@ -3,7 +3,7 @@
 highlights, and tagged posts. Authentication is required via username/password or a session file.
 """
-import re, os, shutil, traceback
+import re, os, shutil
 import instaloader
 
 from loguru import logger
@@ -15,10 +15,9 @@ class InstagramExtractor(Extractor):
     """
     Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
     """
-
     # NB: post regex should be tested before profile
     valid_url = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/")
     # https://regex101.com/r/MGPquX/1
     post_pattern = re.compile(r"{valid_url}(?:p|reel)\/(\w+)".format(valid_url=valid_url))
     # https://regex101.com/r/6Wbsxa/1
@@ -28,19 +27,22 @@ class InstagramExtractor(Extractor):
 
     def setup(self) -> None:
         self.insta = instaloader.Instaloader(
-            download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
+            download_geotags=True,
+            download_comments=True,
+            compress_json=False,
+            dirname_pattern=self.download_folder,
+            filename_pattern="{date_utc}_UTC_{target}__{typename}"
         )
+
         try:
             self.insta.load_session_from_file(self.username, self.session_file)
         except Exception as e:
-            logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
             try:
-                self.insta.login(self.username, config.instagram_self.password)
-                # TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
+                logger.debug("Session file failed", exc_info=True)
+                logger.info("No valid session file found - Attempting login with username and password.")
+                self.insta.login(self.username, self.password)
                 self.insta.save_session_to_file(self.session_file)
-            except Exception as e2:
-                logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
+            except Exception as e:
+                logger.error(f"Failed to setup Instagram Extractor with Instaloader. {e}")
 
     def download(self, item: Metadata) -> Metadata:

View file

@@ -9,7 +9,7 @@
         "width": {"default": 1280,
                   "type": "int",
                   "help": "width of the screenshots"},
-        "height": {"default": 720,
+        "height": {"default": 1024,
                    "type": "int",
                    "help": "height of the screenshots"},
         "timeout": {"default": 60,

View file

@@ -1,9 +1,11 @@
-import os
+import hashlib
 import json
+import os
 import uuid
 from datetime import datetime, timezone
+from dateutil.parser import parse as parse_dt
 
 import requests
-import hashlib
 from loguru import logger
@@ -68,26 +70,34 @@ def calculate_file_hash(filename: str, hash_algo = hashlib.sha256, chunksize: in
         hash.update(buf)
     return hash.hexdigest()
 
-def get_current_datetime_iso() -> str:
-    return datetime.now(timezone.utc).replace(tzinfo=timezone.utc).isoformat()
 
-def get_datetime_from_str(dt_str: str, fmt: str | None = None) -> datetime | None:
-    # parse a datetime string with option of passing a specific format
+def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) -> datetime | None:
+    """ parse a datetime string with option of passing a specific format
+
+    Args:
+        dt_str: the datetime string to parse
+        fmt: the python date format of the datetime string, if None, dateutil.parser.parse is used
+        dayfirst: Use this to signify between date formats which put the day first, vs the month first:
+                  e.g. DD/MM/YYYY vs MM/DD/YYYY
+    """
     try:
-        return datetime.strptime(dt_str, fmt) if fmt else datetime.fromisoformat(dt_str)
+        return datetime.strptime(dt_str, fmt) if fmt else parse_dt(dt_str, dayfirst=dayfirst)
     except ValueError as e:
         logger.error(f"Unable to parse datestring {dt_str}: {e}")
         return None
 
-def get_timestamp(ts, utc=True, iso=True) -> str | datetime | None:
-    # Consistent parsing of timestamps
-    # If utc=True, the timezone is set to UTC,
-    # if iso=True, the output is an iso string
+def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | None:
+    """ Consistent parsing of timestamps.
+
+    Args:
+        If utc=True, the timezone is set to UTC,
+        if iso=True, the output is an iso string
+        Use dayfirst to signify between date formats which put the date vs month first:
+        e.g. DD/MM/YYYY vs MM/DD/YYYY
+    """
     if not ts: return
     try:
-        if isinstance(ts, str): ts = datetime.fromisoformat(ts)
+        if isinstance(ts, str): ts = parse_dt(ts, dayfirst=dayfirst)
         if isinstance(ts, (int, float)): ts = datetime.fromtimestamp(ts)
         if utc: ts = ts.replace(tzinfo=timezone.utc)
         if iso: return ts.isoformat()
@@ -96,5 +106,6 @@ def get_timestamp(ts, utc=True, iso=True) -> str | datetime | None:
         logger.error(f"Unable to parse timestamp {ts}: {e}")
         return None
 
+
 def get_current_timestamp() -> str:
     return get_timestamp(datetime.now())
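A worked example of the new `dayfirst` flag, which `dateutil` uses to disambiguate all-numeric dates:

```python
from dateutil.parser import parse as parse_dt

# the same string resolves to different dates depending on dayfirst
print(parse_dt("02/03/2023", dayfirst=True))   # 2023-03-02 00:00:00 (2 March)
print(parse_dt("02/03/2023", dayfirst=False))  # 2023-02-03 00:00:00 (3 February)
```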

View file

@@ -1,18 +1,23 @@
 """ This Webdriver class acts as a context manager for the selenium webdriver. """
 
 from __future__ import annotations
-from selenium import webdriver
-from selenium.common.exceptions import TimeoutException
-from selenium.webdriver.common.proxy import Proxy, ProxyType
-from selenium.webdriver.common.print_page_options import PrintOptions
-from loguru import logger
-from selenium.webdriver.common.by import By
+
+import os
 import time
 #import domain_for_url
 from urllib.parse import urlparse, urlunparse
 from http.cookiejar import MozillaCookieJar
+
+from selenium import webdriver
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.common import exceptions as selenium_exceptions
+from selenium.webdriver.common.print_page_options import PrintOptions
+from selenium.webdriver.common.by import By
+from loguru import logger
 
 class CookieSettingDriver(webdriver.Firefox):
     facebook_accept_cookies: bool
@@ -20,6 +25,10 @@ class CookieSettingDriver(webdriver.Firefox):
     cookiejar: MozillaCookieJar
 
     def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
+        if os.environ.get('RUNNING_IN_DOCKER'):
+            # Selenium doesn't support linux-aarch64 driver, we need to set this manually
+            kwargs['service'] = webdriver.FirefoxService(executable_path='/usr/local/bin/geckodriver')
+
         super(CookieSettingDriver, self).__init__(*args, **kwargs)
         self.cookies = cookies
         self.cookiejar = cookiejar
@@ -64,14 +73,29 @@ class CookieSettingDriver(webdriver.Firefox):
                     time.sleep(2)
                 except Exception as e:
                     logger.warning(f'Failed on fb accept cookies.', e)
 
         # now get the actual URL
         super(CookieSettingDriver, self).get(url)
+
         if self.facebook_accept_cookies:
             # try and click the 'close' button on the 'login' window to close it
-            close_button = self.find_element(By.XPATH, "//div[@role='dialog']//div[@aria-label='Close']")
-            if close_button:
-                close_button.click()
+            try:
+                xpath = "//div[@role='dialog']//div[@aria-label='Close']"
+                WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
+            except selenium_exceptions.NoSuchElementException:
+                logger.warning("Unable to find the 'close' button on the facebook login window")
+                pass
+        else:
+            # for all other sites, try and use some common button text to reject/accept cookies
+            for text in ["Refuse non-essential cookies", "Decline optional cookies", "Reject additional cookies", "Accept all cookies"]:
+                try:
+                    xpath = f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]"
+                    WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
+                    break
+                except selenium_exceptions.WebDriverException:
+                    pass
 
 class Webdriver:
@@ -90,7 +114,6 @@ class Webdriver:
             setattr(self.print_options, k, v)
 
     def __enter__(self) -> webdriver:
-
         options = webdriver.FirefoxOptions()
         options.add_argument("--headless")
         options.add_argument(f'--proxy-server={self.http_proxy}')
@@ -105,7 +128,7 @@ class Webdriver:
             self.driver.set_window_size(self.width, self.height)
             self.driver.set_page_load_timeout(self.timeout_seconds)
             self.driver.print_options = self.print_options
-        except TimeoutException as e:
+        except selenium_exceptions.TimeoutException as e:
             logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
 
         return self.driver
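The cookie-banner fallback above matches button text case-insensitively using XPath 1.0's `translate()` (XPath 1.0 has no lower-case function); a sketch of the expression it builds:

```python
text = "Reject additional cookies"
xpath = ("//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', "
         f"'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]")
print(xpath)  # lower-cases each element's text before substring-matching
```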

View file

@@ -32,9 +32,8 @@ def mock_metadata(mocker):
 @pytest.fixture
 def metadata():
     metadata = Metadata()
-    metadata.add_media(Media(filename="screenshot", urls=["http://example.com/screenshot.png"]))
-    metadata.add_media(Media(filename="browsertrix", urls=["http://example.com/browsertrix.wacz"]))
-    metadata.add_media(Media(filename="thumbnail", urls=["http://example.com/thumbnail.png"]))
+    metadata.add_media(Media(filename="screenshot.png", urls=["http://example.com/screenshot.png"]).set("id", "screenshot"))
+    metadata.add_media(Media(filename="browsertrix", urls=["http://example.com/browsertrix.wacz"]).set("id", "browsertrix"))
     metadata.set_url("http://example.com")
     metadata.set_title("Example Title")
     metadata.set_content("Example Content")
@@ -53,7 +52,7 @@ def mock_media(mocker):
     return mock_media
 
 @pytest.fixture
-def gsheets_db(mock_gworksheet, setup_module, mocker):
+def gsheets_db(mock_gworksheet, setup_module, mocker) -> GsheetsDb:
     db = setup_module("gsheet_db", {
         "allow_worksheets": "set()",
         "block_worksheets": "set()",
@@ -80,10 +79,10 @@ def expected_calls(mock_media, fixed_timestamp):
         (1, 'text', 'Example Content'),
         (1, 'timestamp', '2025-01-01T00:00:00+00:00'),
         (1, 'hash', 'not-calculated'),
-        # (1, 'screenshot', 'http://example.com/screenshot.png'),
-        # (1, 'thumbnail', '=IMAGE("http://example.com/thumbnail.png")'),
-        # (1, 'wacz', 'http://example.com/browsertrix.wacz'),
-        # (1, 'replaywebpage', 'https://replayweb.page/?source=http%3A%2F%2Fexample.com%2Fbrowsertrix.wacz#view=pages&url=')
+        (1, 'screenshot', 'http://example.com/screenshot.png'),
+        (1, 'thumbnail', '=IMAGE("http://example.com/screenshot.png")'),
+        (1, 'wacz', 'http://example.com/browsertrix.wacz'),
+        (1, 'replaywebpage', 'https://replayweb.page/?source=http%3A//example.com/browsertrix.wacz#view=pages&url=http%3A//example.com')
     ]
 
 def test_retrieve_gsheet(gsheets_db, metadata, mock_gworksheet):

View file

@@ -20,17 +20,15 @@ def metadata_with_images():
 
 def test_successful_enrich(metadata_with_images, mocker):
-    with (
-        mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100)),
-        mocker.patch("PIL.Image.open"),
-        mocker.patch.object(Media, "is_image", return_value=True) as mock_is_image,
-    ):
-        enricher = PdqHashEnricher()
-        enricher.enrich(metadata_with_images)
+    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
+    mocker.patch("PIL.Image.open")
+    mocker.patch.object(Media, "is_image", return_value=True)
+    enricher = PdqHashEnricher()
+    enricher.enrich(metadata_with_images)
 
-        # Ensure the hash is set for image media
-        for media in metadata_with_images.media:
-            assert media.get("pdq_hash") is not None
+    # Ensure the hash is set for image media
+    for media in metadata_with_images.media:
+        assert media.get("pdq_hash") is not None
 
 def test_enrich_skip_non_image(metadata_with_images, mocker):

View file

@@ -16,7 +16,7 @@ def mock_is_auth_wall(mocker):
 def mock_post_success(mocker):
     """Fixture to mock POST requests with a successful response."""
     def _mock_post(json_data: dict = None, status_code: int = 200):
-        json_data = json_data or {"job_id": "job123"}
+        json_data = {"job_id": "job123"} if json_data is None else json_data
         resp = mocker.Mock(status_code=status_code)
         resp.json.return_value = json_data
         return mocker.patch("requests.post", return_value=resp)

View file

@@ -1,21 +1,36 @@
 import pytest
 
 from auto_archiver.modules.instagram_extractor import InstagramExtractor
-from .test_extractor_base import TestExtractorBase
 
-class TestInstagramExtractor(TestExtractorBase):
+
+@pytest.fixture
+def instagram_extractor(setup_module, mocker):
     extractor_module: str = 'instagram_extractor'
-    config: dict = {}
+    config: dict = {
+        "username": "user_name",
+        "password": "password123",
+        "download_folder": "instaloader",
+        "session_file": "secrets/instaloader.session",
+    }
+    fake_loader = mocker.MagicMock()
+    fake_loader.load_session_from_file.return_value = None
+    fake_loader.login.return_value = None
+    fake_loader.save_session_to_file.return_value = None
+    mocker.patch("instaloader.Instaloader", return_value=fake_loader)
+    return setup_module(extractor_module, config)
 
-    @pytest.mark.parametrize("url", [
-        "https://www.instagram.com/p/",
-        "https://www.instagram.com/p/1234567890/",
-        "https://www.instagram.com/reel/1234567890/",
-        "https://www.instagram.com/username/",
-        "https://www.instagram.com/username/stories/",
-        "https://www.instagram.com/username/highlights/",
-    ])
-    def test_regex_matches(self, url):
-        # post
-        assert InstagramExtractor.valid_url.match(url)
+
+@pytest.mark.parametrize("url", [
+    "https://www.instagram.com/p/",
+    "https://www.instagram.com/p/1234567890/",
+    "https://www.instagram.com/reel/1234567890/",
+    "https://www.instagram.com/username/",
+    "https://www.instagram.com/username/stories/",
+    "https://www.instagram.com/username/highlights/",
+])
+def test_regex_matches(url: str, instagram_extractor: InstagramExtractor) -> None:
+    """
+    Ensure that the valid_url regex matches all provided Instagram URLs.
+    """
+    assert instagram_extractor.valid_url.match(url)

View file

@@ -0,0 +1,76 @@
+import pytest
+
+from auto_archiver.core import Metadata
+from auto_archiver.modules.vk_extractor import VkExtractor
+
+
+@pytest.fixture
+def mock_vk_scraper(mocker):
+    """Fixture to mock VkScraper."""
+    return mocker.patch("auto_archiver.modules.vk_extractor.vk_extractor.VkScraper")
+
+
+@pytest.fixture
+def vk_extractor(setup_module, mock_vk_scraper) -> VkExtractor:
+    """Fixture to initialize VkExtractor with mocked VkScraper."""
+    extractor_module = "vk_extractor"
+    configs = {
+        "username": "name",
+        "password": "password123",
+        "session_file": "secrets/vk_config.v2.json",
+    }
+    vk = setup_module(extractor_module, configs)
+    vk.vks = mock_vk_scraper.return_value
+    return vk
+
+
+def test_netloc(vk_extractor, metadata):
+    # metadata url set as: "https://example.com/"
+    assert vk_extractor.download(metadata) is False
+
+
+def test_vk_url_but_scrape_returns_empty(vk_extractor, metadata):
+    metadata.set_url("https://vk.com/valid-wall")
+    vk_extractor.vks.scrape.return_value = []
+    assert vk_extractor.download(metadata) is False
+    assert metadata.netloc == "vk.com"
+    vk_extractor.vks.scrape.assert_called_once_with(metadata.get_url())
+
+
+def test_successful_scrape_and_download(vk_extractor, metadata, mocker):
+    mock_scrapes = [
+        {"text": "Post Title", "datetime": "2023-01-01T00:00:00", "id": 1},
+        {"text": "Another Post", "datetime": "2023-01-02T00:00:00", "id": 2}
+    ]
+    mock_filenames = ["image1.jpg", "image2.png"]
+    vk_extractor.vks.scrape.return_value = mock_scrapes
+    vk_extractor.vks.download_media.return_value = mock_filenames
+    metadata.set_url("https://vk.com/valid-wall")
+    result = vk_extractor.download(metadata)
+    # Test metadata
+    assert result.is_success()
+    assert result.status == "vk: success"
+    assert result.get_title() == "Post Title"
+    assert result.get_timestamp() == "2023-01-01T00:00:00+00:00"
+    assert "Another Post" in result.metadata["content"]
+    # Test Media objects
+    assert len(result.media) == 2
+    assert result.media[0].filename == "image1.jpg"
+    assert result.media[1].filename == "image2.png"
+    vk_extractor.vks.download_media.assert_called_once_with(
+        mock_scrapes, vk_extractor.tmp_dir
+    )
+
+
+def test_adds_first_title_and_timestamp(vk_extractor):
+    metadata = Metadata().set_url("https://vk.com/no-metadata")
+    mock_scrapes = [{"text": "value", "datetime": "2023-01-01T00:00:00"},
+                    {"text": "value2", "datetime": "2023-01-02T00:00:00"}]
+    vk_extractor.vks.scrape.return_value = mock_scrapes
+    vk_extractor.vks.download_media.return_value = []
+    result = vk_extractor.download(metadata)
+    assert result.get_title() == "value"
+    # formatted timestamp
+    assert result.get_timestamp() == "2023-01-01T00:00:00+00:00"
+    assert result.is_success()

View file

@@ -9,11 +9,12 @@ from auto_archiver.modules.local_storage import LocalStorage
 
 @pytest.fixture
-def local_storage(setup_module) -> LocalStorage:
+def local_storage(setup_module, tmp_path) -> LocalStorage:
+    save_to = tmp_path / "local_archive"
     configs: dict = {
         "path_generator": "flat",
         "filename_generator": "static",
-        "save_to": "./local_archive",
+        "save_to": str(save_to),
         "save_absolute": False,
     }
     return setup_module("local_storage", configs)