Mirror of https://github.com/bellingcat/auto-archiver
Implementing ruff suggestions.
parent ca44a40b88
commit e7fa88f1c7
@@ -108,7 +108,7 @@ ignore = []

 [tool.ruff.lint.per-file-ignores]
 # Ignore import violations in __init__.py files
-"__init__.py" = ["F401"]
+"__init__.py" = ["F401", "F403"]

 [tool.ruff.format]
 docstring-code-format = false
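
Aside for context (not part of the commit): F401 flags imports a module never uses and F403 flags `from x import *`. Package `__init__.py` files commonly do both on purpose when re-exporting a public API, which is presumably why the per-file ignore above gains F403. A minimal, self-contained illustration of what the two rules trigger on:

# F401: `json` is imported but never referenced in this file.
import json

# F403: a wildcard import, whose individual names ruff cannot track.
from os.path import *

# `join` only exists here because of the star import above.
print(join("some", "path"))
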
@@ -80,7 +80,7 @@ class AuthenticationJsonParseAction(argparse.Action):
             auth_dict = auth_dict["authentication"]
             auth_dict["load_from_file"] = path
             return auth_dict
-        except:
+        except Exception:
             return None

         if isinstance(auth_dict, dict) and auth_dict.get("from_file"):
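
Aside on the recurring `except:` to `except Exception:` change (this and several later hunks): a bare `except` also catches `KeyboardInterrupt` and `SystemExit`, which derive from `BaseException` rather than `Exception`, so swallowing everything can make a process ignore Ctrl-C or refuse to shut down. A small sketch of the difference (both functions are contrived):

def swallow_everything():
    try:
        raise KeyboardInterrupt
    except:  # bare except also traps BaseException subclasses
        return "swallowed"

def swallow_errors_only():
    try:
        raise KeyboardInterrupt
    except Exception:  # KeyboardInterrupt is not an Exception, so it escapes
        return "swallowed"

print(swallow_everything())  # -> swallowed
try:
    swallow_errors_only()
except KeyboardInterrupt:
    print("propagated")      # -> propagated
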
@@ -123,6 +123,6 @@ class Media:
         try:
             fsize = os.path.getsize(self.filename)
             return fsize > 20_000
-        except:
+        except Exception as e:
             pass
         return True
@@ -48,15 +48,16 @@ class Metadata:
             self.status = right.status
             self._context.update(right._context)
             for k, v in right.metadata.items():
-                assert k not in self.metadata or type(v) == type(self.get(k))
-                if type(v) not in [dict, list, set] or k not in self.metadata:
+                assert k not in self.metadata or type(v) is type(self.get(k))
+                if not isinstance(v, (dict, list, set)) or k not in self.metadata:
                     self.set(k, v)
                 else:  # key conflict
-                    if type(v) in [dict, set]:
+                    if isinstance(v, (dict, set)):
                         self.set(k, self.get(k) | v)
-                    elif type(v) == list:
+                    elif type(v) is list:
                         self.set(k, self.get(k) + v)
             self.media.extend(right.media)

         else:  # invert and do same logic
             return right.merge(self)
         return self
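
Aside on the `type(x) == T` to `isinstance(...)` / `type(x) is T` rewrites in this hunk (and several below): this is ruff/pycodestyle E721. `isinstance` accepts subclasses and can take a tuple of types, which is what the merge logic above needs; where an exact-type match really is intended, identity (`is`) is the idiomatic spelling. For example:

from collections import OrderedDict

d = OrderedDict(a=1)

print(type(d) == dict)             # False: equality on the class object misses subclasses (E721)
print(isinstance(d, (dict, set)))  # True: covers subclasses and several types at once
print(type(d) is OrderedDict)      # True: explicit exact-type check, when that is the intent
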
@@ -126,28 +127,26 @@ class Metadata:
         return self.get("title")

     def set_timestamp(self, timestamp: datetime.datetime) -> Metadata:
-        if type(timestamp) == str:
+        if isinstance(timestamp, str):
             timestamp = parse_dt(timestamp)
-        assert type(timestamp) == datetime.datetime, "set_timestamp expects a datetime instance"
+        assert isinstance(timestamp, datetime.datetime), "set_timestamp expects a datetime instance"
         return self.set("timestamp", timestamp)

-    def get_timestamp(self, utc=True, iso=True) -> datetime.datetime:
+    def get_timestamp(self, utc=True, iso=True) -> datetime.datetime | str | None:
         ts = self.get("timestamp")
         if not ts:
-            return
+            return None
         try:
-            if type(ts) == str:
+            if isinstance(ts, str):
                 ts = datetime.datetime.fromisoformat(ts)
-            if type(ts) == float:
+            elif isinstance(ts, float):
                 ts = datetime.datetime.fromtimestamp(ts)
             if utc:
                 ts = ts.replace(tzinfo=datetime.timezone.utc)
-            if iso:
-                return ts.isoformat()
-            return ts
+            return ts.isoformat() if iso else ts
         except Exception as e:
             logger.error(f"Unable to parse timestamp {ts}: {e}")
-            return
+            return None

     def add_media(self, media: Media, id: str = None) -> Metadata:
         # adds a new media, optionally including an id
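
The `get_timestamp` hunk above bundles several small, behaviour-preserving cleanups: the return annotation admits the method can hand back an ISO string or `None`, the bare `return` statements become explicit `return None`, the second `if` becomes `elif` (a value cannot be both `str` and `float`), and the trailing `if iso:` block collapses into one conditional expression. A simplified, self-contained sketch of the same shape (the name `to_timestamp` is invented; this is not the project's actual method):

from __future__ import annotations

import datetime


def to_timestamp(ts, utc=True, iso=True) -> datetime.datetime | str | None:
    """Normalise a str/float/datetime timestamp, mirroring the rewritten method above."""
    if not ts:
        return None
    try:
        if isinstance(ts, str):
            ts = datetime.datetime.fromisoformat(ts)
        elif isinstance(ts, float):
            ts = datetime.datetime.fromtimestamp(ts)
        if utc:
            ts = ts.replace(tzinfo=datetime.timezone.utc)
        return ts.isoformat() if iso else ts
    except Exception as e:
        print(f"Unable to parse timestamp {ts}: {e}")
        return None


print(to_timestamp("2024-01-02T03:04:05"))  # '2024-01-02T03:04:05+00:00'
print(to_timestamp(None))                   # None
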
@@ -47,7 +47,7 @@ class ModuleFactory:

         # see odoo/module/module.py -> initialize_sys_path
         if path not in auto_archiver.modules.__path__:
-            if HAS_SETUP_PATHS == True:
+            if HAS_SETUP_PATHS:
                 logger.warning(
                     f"You are attempting to re-initialise the module paths with: '{path}' for a 2nd time. \
                     This could lead to unexpected behaviour. It is recommended to only use a single modules path. \
@@ -228,7 +228,7 @@ class LazyBaseModule:
                 # we must now load this module and set it up with the config
                 m.load(config)
                 return True
-            except:
+            except Exception:
                 logger.error(f"Unable to setup module '{dep}' for use in module '{self.name}'")
                 return False
         except IndexError:
@@ -531,7 +531,7 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
         except Exception as e:
             logger.error(f"Got unexpected error on item {item}: {e}\n{traceback.format_exc()}")
             for d in self.databases:
-                if type(e) == AssertionError:
+                if isinstance(e, AssertionError):
                     d.failed(item, str(e))
                 else:
                     d.failed(item, reason="unexpected error")
@@ -3,7 +3,6 @@
     "type": ["feeder"],
     "entry_point": "cli_feeder::CLIFeeder",
-    "requires_setup": False,
     "description": "Feeds URLs to orchestrator from the command line",
     "configs": {
         "urls": {
             "default": None,
@@ -1,7 +1,6 @@
 {
     "name": "CSV Feeder",
     "type": ["feeder"],
-    "requires_setup": False,
     "dependencies": {"python": ["loguru"], "bin": [""]},
     "requires_setup": True,
     "entry_point": "csv_feeder::CSVFeeder",
@@ -12,7 +12,9 @@
             "default": None,
             "help": "the id of the sheet to archive (alternative to 'sheet' config)",
         },
-        "header": {"default": 1, "type": "int", "help": "index of the header row (starts at 1)", "type": "int"},
+        "header": {"default": 1,
+                   "help": "index of the header row (starts at 1)",
+                   "type": "int"},
         "service_account": {
             "default": "secrets/service_account.json",
             "help": "service account JSON file path. Learn how to create one: https://gspread.readthedocs.io/en/latest/oauth2.html",
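
The `header` config above is the clearest instance of the other recurring ruff finding in this commit: a key repeated inside a single dict literal (here `"type": "int"` appears twice; the `timeout` config further down and the CSV feeder manifest above repeat the pattern). Python does not error on this, it simply keeps the last value, so the fix is to drop the duplicate and, for the long one-liners, reflow the literal. For example:

config = {"default": 1, "type": "int", "help": "index of the header row", "type": "int"}
print(len(config))   # 3, not 4: the duplicate "type" key is silently collapsed

flags = {"requires_setup": False, "requires_setup": True}
print(flags)         # {'requires_setup': True}: only the last value survives
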
@@ -51,19 +53,6 @@
             "help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
             "type": "bool",
         },
-        "allow_worksheets": {
-            "default": set(),
-            "help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
-        },
-        "block_worksheets": {
-            "default": set(),
-            "help": "(CSV) explicitly block some worksheets from being processed",
-        },
-        "use_sheet_names_in_stored_paths": {
-            "default": True,
-            "type": "bool",
-            "help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
-        },
     },
     "description": """
     GsheetsFeederDatabase
@@ -68,7 +68,7 @@ class GWorksheet:

         if fresh:
             return self.wks.cell(row, col_index + 1).value
-        if type(row) == int:
+        if isinstance(row, int):
             row = self.get_row(row)

         if col_index >= len(row):
@@ -84,7 +84,7 @@ class GWorksheet:
             if when_empty_use_default and val.strip() == "":
                 return default
             return val
-        except:
+        except Exception:
             return default

     def set_cell(self, row: int, col: str, val):
@@ -74,9 +74,9 @@ class InstagramAPIExtractor(Extractor):
         # repeats 3 times to remove nested empty values
         if not self.minimize_json_output:
             return d
-        if type(d) == list:
+        if isinstance(d, list):
             return [self.cleanup_dict(v) for v in d]
-        if type(d) != dict:
+        if not isinstance(d, dict):
             return d
         return {
             k: clean_v
@@ -220,7 +220,7 @@ class InstagramAPIExtractor(Extractor):
         post_count = 0
         while end_cursor != "":
             posts = self.call_api("v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
-            if not len(posts) or not type(posts) == list or len(posts) != 2:
+            if not posts or not isinstance(posts, list) or len(posts) != 2:
                 break
             posts, end_cursor = posts[0], posts[1]
             logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")
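
A small aside on the `not len(posts)` to `not posts` part of the guard above: PEP 8 recommends relying on empty sequences being falsy, and the rewritten check also copes with a `None` response, where `len(None)` would raise. Sketch (the helper name is invented):

def should_stop(posts):
    return not posts or not isinstance(posts, list) or len(posts) != 2

print(should_stop([]))                    # True: empty response
print(should_stop(None))                  # True: len(None) would have raised TypeError
print(should_stop([["post"], "cursor"]))  # False: well-formed (items, cursor) pair
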
@@ -243,7 +243,7 @@ class InstagramAPIExtractor(Extractor):
         pbar = tqdm(desc="downloading tagged posts")

         tagged_count = 0
-        while next_page_id != None:
+        while next_page_id is not None:
             resp = self.call_api("v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
             posts = resp.get("response", {}).get("items", [])
             if not len(posts):
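
Aside on `while next_page_id != None:` becoming `while next_page_id is not None:` (ruff/pycodestyle E711): `None` is a singleton, and `==`/`!=` can be answered by an object's overloaded `__eq__`, whereas an identity test cannot be fooled and states the intent directly. A contrived class makes the point:

class AlwaysEqual:
    def __eq__(self, other):
        return True  # a permissive __eq__ makes equality against None meaningless

obj = AlwaysEqual()
print(obj == None)               # True, even though obj clearly is not None
print(obj is None)               # False: identity cannot be overridden

next_page_id = None
print(next_page_id is not None)  # False: pagination would stop here
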
@@ -61,7 +61,7 @@ class TelegramExtractor(Extractor):
                     else:
                         duration = float(duration)
                     m_video.set("duration", duration)
-                except:
+                except Exception:
                     pass
                 result.add_media(m_video)

@@ -46,7 +46,7 @@ class TwitterApiExtractor(Extractor):
             r = requests.get(url, timeout=30)
             logger.debug(f"Expanded url {url} to {r.url}")
             url = r.url
-        except:
+        except Exception:
             logger.error(f"Failed to expand url {url}")
         return url

@@ -14,7 +14,9 @@
             "help": "browsertrix-profile (for profile generation see https://github.com/webrecorder/browsertrix-crawler#creating-and-using-browser-profiles).",
         },
         "docker_commands": {"default": None, "help": "if a custom docker invocation is needed"},
-        "timeout": {"default": 120, "type": "int", "help": "timeout for WACZ generation in seconds", "type": "int"},
+        "timeout": {"default": 120,
+                    "help": "timeout for WACZ generation in seconds",
+                    "type": "int"},
         "extract_media": {
             "default": False,
             "type": "bool",
@@ -88,7 +88,7 @@ class WhisperEnricher(Enricher):
         while not all_completed and (time.time() - start_time) <= self.timeout:
             all_completed = True
             for job_id in job_results:
-                if job_results[job_id] != False:
+                if job_results[job_id] is not False:
                     continue
                 all_completed = False  # at least one not ready
                 try:
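
The Whisper hunk is a spot where identity rather than truthiness genuinely matters: judging from the surrounding lines, `job_results[job_id]` holds `False` while a job is still pending and something else once it has finished, so a plain `if job_results[job_id]:` could misread a falsy-but-finished result, and `!= False` is the equality form flagged by E712. `is not False` keeps the sentinel semantics. A sketch with invented contents:

# False marks a pending job; anything else (even an empty result) means finished.
job_results = {"job-1": False, "job-2": {}, "job-3": {"text": "hello"}}

for job_id, result in job_results.items():
    if result is not False:
        print(f"{job_id}: finished -> {result!r}")
    else:
        print(f"{job_id}: still pending")
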
@@ -21,7 +21,7 @@ def expand_url(url):
         r = requests.get(url)
         logger.debug(f"Expanded url {url} to {r.url}")
         return r.url
-    except:
+    except Exception:
         logger.error(f"Failed to expand url {url}")
     return url

@@ -32,7 +32,7 @@ def getattr_or(o: object, prop: str, default=None):
         if res is None:
             raise
         return res
-    except:
+    except Exception:
         return default
