kopia lustrzana https://gitlab.com/jaywink/federation
let socialhome process the reply fid extracted from reply collections
rodzic
6678c4feea
commit
aebc5276a9
|
@ -5,7 +5,7 @@ from federation.entities.activitypub.constants import NAMESPACE_PUBLIC
|
|||
from federation.entities.activitypub.entities import (
|
||||
ActivitypubFollow, ActivitypubProfile, ActivitypubAccept, ActivitypubPost, ActivitypubComment,
|
||||
ActivitypubRetraction, ActivitypubShare, ActivitypubImage)
|
||||
from federation.entities.activitypub.models import element_to_base_entities
|
||||
from federation.entities.activitypub.models import element_to_objects
|
||||
from federation.entities.base import Follow, Profile, Accept, Post, Comment, Retraction, Share, Image
|
||||
from federation.entities.mixins import BaseEntity
|
||||
from federation.types import UserType, ReceiverVariant
|
||||
|
@ -47,7 +47,7 @@ UNDO_MAPPINGS = {
|
|||
}
|
||||
|
||||
|
||||
def element_to_objects(payload: Dict) -> List:
|
||||
def element_to_objects_orig(payload: Dict) -> List:
|
||||
"""
|
||||
Transform an Element to a list of entities.
|
||||
"""
|
||||
|
@ -171,7 +171,7 @@ def message_to_objects(
|
|||
Takes in a message extracted by a protocol and maps it to entities.
|
||||
"""
|
||||
# We only really expect one element here for ActivityPub.
|
||||
return element_to_base_entities(message)
|
||||
return element_to_objects(message)
|
||||
|
||||
|
||||
def transform_attribute(
|
||||
|
|
|
@ -867,33 +867,31 @@ def extract_and_validate(entity):
|
|||
if hasattr(entity, "extract_mentions"):
|
||||
entity.extract_mentions()
|
||||
|
||||
# Extract reply ids
|
||||
if getattr(entity, 'replies', None):
|
||||
entity._replies = extract_reply_ids(getattr(entity.replies, 'first', []))
|
||||
|
||||
|
||||
visited = [] # to prevent infinite loops
|
||||
def process_reply_collection(replies):
|
||||
global visited
|
||||
|
||||
def extract_reply_ids(replies, visited=[]):
|
||||
objs = []
|
||||
items = getattr(replies, 'items', [])
|
||||
if items and not isinstance(items, list): items = [items]
|
||||
print('items = ', items)
|
||||
for item in items:
|
||||
if isinstance(item, Object):
|
||||
objs += element_to_base_entities(item, True)
|
||||
objs.append(item.id)
|
||||
else:
|
||||
objs += retrieve_and_parse_document(item, True)
|
||||
print('added items = ', objs)
|
||||
objs.append(item)
|
||||
if hasattr(replies, 'next_'):
|
||||
print('next = ', replies.next_)
|
||||
if replies.next_ and (replies.id != replies.next_) and (replies.next_ not in visited):
|
||||
resp = retrieve_and_parse_document(replies.next_, True)
|
||||
resp = retrieve_and_parse_document(replies.next_)
|
||||
if resp:
|
||||
visited.append(replies.next_)
|
||||
objs += process_reply_collection(resp[0])
|
||||
print('len objs = ', len(objs))
|
||||
objs += extract_reply_ids(resp, visited)
|
||||
return objs
|
||||
|
||||
|
||||
def element_to_base_entities(element: Union[Dict, Object], found_parent: bool = False) -> List:
|
||||
def element_to_objects(element: Union[Dict, Object]) -> List:
|
||||
"""
|
||||
Transform an Element to a list of entities.
|
||||
"""
|
||||
|
@ -903,7 +901,6 @@ def element_to_base_entities(element: Union[Dict, Object], found_parent: bool =
|
|||
# Skips unimplemented payloads
|
||||
# TODO: remove unused code
|
||||
entity = model_to_objects(element) if not isinstance(element, Object) else element
|
||||
print('target_id = ', getattr(entity, 'target_id', None), 'entity = ', entity)
|
||||
if entity: entity = entity.to_base()
|
||||
if entity and isinstance(entity, BaseEntity):
|
||||
logger.info('Entity type "%s" was handled through the json-ld processor', entity.__class__.__name__)
|
||||
|
@ -913,11 +910,10 @@ def element_to_base_entities(element: Union[Dict, Object], found_parent: bool =
|
|||
logger.error("Failed to validate entity %s: %s", entity, ex)
|
||||
return None
|
||||
entities.append(entity)
|
||||
if not found_parent and getattr(entity, 'target_id', None):
|
||||
entities = retrieve_and_parse_document(entity.target_id) + entities
|
||||
if getattr(entity, 'replies', None):
|
||||
print('enter process_reply_collection for ', entity.id)
|
||||
entities += process_reply_collection(getattr(entity.replies,'first', None))
|
||||
#if not found_parent and getattr(entity, 'target_id', None):
|
||||
# entities = retrieve_and_parse_document(entity.target_id) + entities
|
||||
#if getattr(entity, 'replies', None):
|
||||
# entities += process_reply_collection(getattr(entity.replies,'first', None))
|
||||
return entities
|
||||
elif entity:
|
||||
logger.info('Entity type "%s" was handled through the json-ld processor but is not a base entity', entity.__class__.__name__)
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
import logging
|
||||
|
||||
from requests_cache import install_cache, RedisCache, SQLiteCache
|
||||
|
||||
|
||||
logger = logging.getLogger("federation")
|
||||
|
||||
# try to obtain redis config from django
|
||||
try:
|
||||
from federation.utils.django import get_configuration
|
||||
cfg = get_configuration()
|
||||
if cfg.get('redis'):
|
||||
backend = RedisCache(namespace='fed_cache', **cfg['redis'])
|
||||
else:
|
||||
backend = SQLiteCache(db_path='fed_cache')
|
||||
except ImportError:
|
||||
backend = SQLiteCache(db_path='fed_cache')
|
||||
|
||||
install_cache(backend=backend)
|
||||
logger.info(f'requests_cache backend set to {type(backend).__name__}')
|
|
@ -38,21 +38,20 @@ def retrieve_and_parse_content(**kwargs) -> Optional[Any]:
|
|||
return retrieve_and_parse_document(kwargs.get("id"))
|
||||
|
||||
|
||||
def retrieve_and_parse_document(fid: str, found_parent: bool = False) -> Optional[Any]:
|
||||
def retrieve_and_parse_document(fid: str) -> Optional[Any]:
|
||||
"""
|
||||
Retrieve remote document by ID and return the entity.
|
||||
"""
|
||||
from federation.entities.activitypub.models import element_to_base_entities # Circulars
|
||||
from federation.entities.activitypub.models import element_to_objects # Circulars
|
||||
document, status_code, ex = fetch_document(fid,
|
||||
extra_headers={'accept': 'application/activity+json'},
|
||||
auth=get_http_authentication(admin_user.rsa_private_key,f'{admin_user.id}#main-key') if admin_user else None)
|
||||
if document:
|
||||
document = json.loads(decode_if_bytes(document))
|
||||
entity = element_to_base_entities(document, found_parent)
|
||||
if entity:
|
||||
logger.info("retrieve_and_parse_document - found %d entity", len(entity))
|
||||
return entity
|
||||
return []
|
||||
entities = element_to_objects(document)
|
||||
if entities:
|
||||
logger.info("retrieve_and_parse_document - using first entity: %s", entities[0])
|
||||
return entities[0]
|
||||
|
||||
|
||||
def retrieve_and_parse_profile(fid: str) -> Optional[ActivitypubProfile]:
|
||||
|
@ -68,7 +67,6 @@ def retrieve_and_parse_profile(fid: str) -> Optional[ActivitypubProfile]:
|
|||
profile = retrieve_and_parse_document(profile_id)
|
||||
if not profile:
|
||||
return
|
||||
profile = profile[0]
|
||||
try:
|
||||
profile.validate()
|
||||
except ValueError as ex:
|
||||
|
|
|
@ -183,7 +183,7 @@ def retrieve_and_parse_content(
|
|||
logger.warning("retrieve_and_parse_content - more than one entity parsed from remote even though we"
|
||||
"expected only one! ID %s", guid)
|
||||
if entities:
|
||||
return entities[0:1]
|
||||
return entities[0]
|
||||
return
|
||||
elif status_code == 404:
|
||||
logger.warning("retrieve_and_parse_content - remote content %s not found", guid)
|
||||
|
|
|
@ -11,11 +11,25 @@ import requests
|
|||
from requests.exceptions import RequestException, HTTPError, SSLError
|
||||
from requests.exceptions import ConnectionError
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
from requests_cache import CachedSession, RedisCache, SQLiteCache
|
||||
|
||||
from federation import __version__
|
||||
|
||||
logger = logging.getLogger("federation")
|
||||
|
||||
# try to obtain redis config from django
|
||||
try:
|
||||
from federation.utils.django import get_configuration
|
||||
cfg = get_configuration()
|
||||
if cfg.get('redis'):
|
||||
backend = RedisCache(namespace='fed_cache', **cfg['redis'])
|
||||
else:
|
||||
backend = SQLiteCache(db_path='fed_cache')
|
||||
except ImportError:
|
||||
backend = SQLiteCache(db_path='fed_cache')
|
||||
|
||||
session = CachedSession(backend=backend, expire_after=1800)
|
||||
|
||||
USER_AGENT = "python/federation/%s" % __version__
|
||||
|
||||
|
||||
|
@ -24,7 +38,7 @@ def fetch_content_type(url: str) -> Optional[str]:
|
|||
Fetch the HEAD of the remote url to determine the content type.
|
||||
"""
|
||||
try:
|
||||
response = requests.head(url, headers={'user-agent': USER_AGENT}, timeout=10)
|
||||
response = session.head(url, headers={'user-agent': USER_AGENT}, timeout=10)
|
||||
except RequestException as ex:
|
||||
logger.warning("fetch_content_type - %s when fetching url %s", ex, url)
|
||||
else:
|
||||
|
@ -60,7 +74,7 @@ def fetch_document(url=None, host=None, path="/", timeout=10, raise_ssl_errors=T
|
|||
# Use url since it was given
|
||||
logger.debug("fetch_document: trying %s", url)
|
||||
try:
|
||||
response = requests.get(url, timeout=timeout, headers=headers, **kwargs)
|
||||
response = session.get(url, timeout=timeout, headers=headers, **kwargs)
|
||||
logger.debug("fetch_document: found document, code %s", response.status_code)
|
||||
response.raise_for_status()
|
||||
return response.text, response.status_code, None
|
||||
|
@ -73,7 +87,7 @@ def fetch_document(url=None, host=None, path="/", timeout=10, raise_ssl_errors=T
|
|||
url = "https://%s%s" % (host_string, path_string)
|
||||
logger.debug("fetch_document: trying %s", url)
|
||||
try:
|
||||
response = requests.get(url, timeout=timeout, headers=headers)
|
||||
response = session.get(url, timeout=timeout, headers=headers)
|
||||
logger.debug("fetch_document: found document, code %s", response.status_code)
|
||||
response.raise_for_status()
|
||||
return response.text, response.status_code, None
|
||||
|
@ -85,7 +99,7 @@ def fetch_document(url=None, host=None, path="/", timeout=10, raise_ssl_errors=T
|
|||
url = url.replace("https://", "http://")
|
||||
logger.debug("fetch_document: trying %s", url)
|
||||
try:
|
||||
response = requests.get(url, timeout=timeout, headers=headers)
|
||||
response = session.get(url, timeout=timeout, headers=headers)
|
||||
logger.debug("fetch_document: found document, code %s", response.status_code)
|
||||
response.raise_for_status()
|
||||
return response.text, response.status_code, None
|
||||
|
@ -116,7 +130,7 @@ def fetch_file(url: str, timeout: int = 30, extra_headers: Dict = None) -> str:
|
|||
headers = {'user-agent': USER_AGENT}
|
||||
if extra_headers:
|
||||
headers.update(extra_headers)
|
||||
response = requests.get(url, timeout=timeout, headers=headers, stream=True)
|
||||
response = session.get(url, timeout=timeout, headers=headers, stream=True)
|
||||
response.raise_for_status()
|
||||
name = f"/tmp/{str(uuid4())}"
|
||||
with open(name, "wb") as f:
|
||||
|
@ -198,7 +212,7 @@ def send_document(url, data, timeout=10, method="post", *args, **kwargs):
|
|||
kwargs.update({
|
||||
"data": data, "timeout": timeout, "headers": headers
|
||||
})
|
||||
request_func = getattr(requests, method)
|
||||
request_func = getattr(session, method)
|
||||
try:
|
||||
response = request_func(url, *args, **kwargs)
|
||||
logger.debug("send_document: response status code %s", response.status_code)
|
||||
|
|
Ładowanie…
Reference in New Issue