using API instead of scraping

pull/40/head
msramalho 2022-06-15 21:25:15 +02:00
rodzic 2f02336403
commit c08b5268f7
2 zmienionych plików z 42 dodań i 47 usunięć

Wyświetl plik

@ -95,7 +95,7 @@ class Archiver(ABC):
# eg images in a tweet save to cloud storage
def generate_media_page(self, urls, url, object, requester=requests):
def generate_media_page(self, urls, url, object):
"""
For a list of media urls, fetch them, upload them
and call self.generate_media_page_html with them
@ -111,7 +111,7 @@ class Archiver(ABC):
filename = os.path.join(Storage.TMP_FOLDER, key)
d = requester.get(media_url, headers=headers)
d = requests.get(media_url, headers=headers)
with open(filename, 'wb') as f:
f.write(d.content)

Wyświetl plik

@ -1,4 +1,4 @@
import re, json
import re, json, requests
import vk_api, dateparser
from bs4 import BeautifulSoup
@ -16,6 +16,7 @@ class VkArchiver(Archiver):
"""
name = "vk"
wall_pattern = re.compile(r"(wall.{0,1}\d+_\d+)")
photo_pattern = re.compile(r"(photo.{0,1}\d+_\d+)")
onclick_pattern = re.compile(r"({.*})")
def __init__(self, storage: Storage, driver, config: VkConfig):
@ -27,55 +28,49 @@ class VkArchiver(Archiver):
def download(self, url, check_if_exists=False):
# detect URLs that this archiver can handle
has_wall = self.wall_pattern.search(url)
has_photo = self.photo_pattern.search(url)
_id, method = None, None
if has_wall:
wall_id = has_wall[0]
wall_url = f'https://vk.com/{wall_id}'
logger.info(f"found valid wall id from {url=} : {wall_id=}")
key = self.get_html_key(wall_url)
_id = has_wall[0]
method = self.archive_wall
elif has_photo:
_id = has_photo[0]
method = self.archive_photo
else: return False
# if check if exists will not download again
if check_if_exists and self.storage.exists(key):
screenshot = self.get_screenshot(wall_url)
cdn_url = self.storage.get_cdn_url(key)
return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot)
logger.info(f"found valid {_id=} from {url=}")
proper_url = f'https://vk.com/{_id}'
return self.archive_wall(wall_url)
return False
# if check if exists will not download again
key = self.get_html_key(proper_url)
if check_if_exists and self.storage.exists(key):
screenshot = self.get_screenshot(proper_url)
cdn_url = self.storage.get_cdn_url(key)
return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot)
def archive_wall(self, wall_url):
res = self.vk_session.http.get(wall_url).text
soup = BeautifulSoup(res, "html.parser")
image_urls = []
time = None
try:
rel_date = soup.find("a", class_="post_link").find("span", class_="rel_date")
t = rel_date.get_text()
if "time" in rel_date.attrs:
t = rel_date["time"]
elif "abs_time" in rel_date.attrs:
t = rel_date["abs_time"]
time = dateparser.parse(t, settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"})
except Exception as e:
logger.warning(f"could not fetch time from post: {e}")
return method(proper_url, _id)
post = soup.find("div", class_="wall_text")
post_text = post.find(class_="wall_post_text").get_text()
for anchor in post.find_all("a", attrs={"aria-label": "photo"}):
if img_url := self.get_image_from_anchor(anchor):
image_urls.append(img_url)
def archive_photo(self, photo_url, photo_id):
headers = {"access_token": self.vk_session.token["access_token"], "photos": photo_id.replace("photo", ""), "extended": "1", "v": self.vk_session.api_version}
req = requests.get("https://api.vk.com/method/photos.getById", headers)
res = req.json()["response"][0]
img_url = res["orig_photo"]["url"]
time = dateparser.parse(str(res["date"]), settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"})
page_cdn, page_hash, thumbnail = self.generate_media_page(image_urls, wall_url, post_text, requester=self.vk_session.http)
screenshot = self.get_screenshot(wall_url)
page_cdn, page_hash, thumbnail = self.generate_media_page([img_url], photo_url, res)
screenshot = self.get_screenshot(photo_url)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=time)
def get_image_from_anchor(self, anchor):
try:
# get anchor.onlick text, retrieve the JSON value there
# retrieve "temp"."z" which contains the image with more quality
temp_json = json.loads(self.onclick_pattern.search(anchor["onclick"])[0])["temp"]
for quality in ["z", "y", "x"]: # decreasing quality
if quality in temp_json:
return temp_json[quality]
except Exception as e:
logger.warning(f"failed to get image from vk wall anchor: {e}")
return False
def archive_wall(self, wall_url, wall_id):
headers = {"access_token": self.vk_session.token["access_token"], "posts": wall_id.replace("wall", ""), "extended": "1", "copy_history_depth": "2", "v": self.vk_session.api_version}
req = requests.get("https://api.vk.com/method/wall.getById", headers)
res = req.json()["response"]
wall = res["items"][0]
img_urls = [p[p["type"]]["sizes"][-1]["url"] for p in wall["attachments"]] if "attachments" in wall else []
title = wall["text"][0:200] # more on the page
time = dateparser.parse(str(wall["date"]), settings={"RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"})
page_cdn, page_hash, thumbnail = self.generate_media_page(img_urls, wall_url, res)
screenshot = self.get_screenshot(wall_url)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=time, title=title)