fix potential vulnerability when fetching remote json data

pull/700/head
Henri Dickson 2024-02-19 13:06:03 -05:00
rodzic 7c34ac78ed
commit 6f31dc5600
8 zmienionych plików z 22 dodań i 16 usunięć

Wyświetl plik

@ -30,6 +30,7 @@ from activities.models.post_types import (
) )
from core.exceptions import ActivityPubFormatError from core.exceptions import ActivityPubFormatError
from core.html import ContentRenderer, FediverseHtmlParser from core.html import ContentRenderer, FediverseHtmlParser
from core.json import json_from_response
from core.ld import ( from core.ld import (
canonicalise, canonicalise,
format_ld_date, format_ld_date,
@ -1033,8 +1034,9 @@ class Post(StatorModel):
{response.content}, {response.content},
) )
try: try:
json_data = json_from_response(response)
post = cls.by_ap( post = cls.by_ap(
canonicalise(response.json(), include_security=True), canonicalise(json_data, include_security=True),
create=True, create=True,
update=True, update=True,
fetch_author=True, fetch_author=True,

Wyświetl plik

@ -83,11 +83,11 @@ class SearchService:
if response.status_code >= 400: if response.status_code >= 400:
return None return None
json_data = json_from_response(response) try:
if not json_data: json_data = json_from_response(response)
document = canonicalise(json_data, include_security=True)
except ValueError:
return None return None
document = canonicalise(json_data, include_security=True)
type = document.get("type", "unknown").lower() type = document.get("type", "unknown").lower()
# Is it an identity? # Is it an identity?

Wyświetl plik

@ -5,6 +5,7 @@ from django import forms
from django.utils.decorators import method_decorator from django.utils.decorators import method_decorator
from django.views.generic import FormView, TemplateView from django.views.generic import FormView, TemplateView
from core.json import json_from_response
from core.ld import canonicalise from core.ld import canonicalise
from users.decorators import admin_required from users.decorators import admin_required
from users.models import SystemActor from users.models import SystemActor
@ -50,8 +51,9 @@ class JsonViewer(FormView):
result = f"Error response: {response.status_code}\n{response.content}" result = f"Error response: {response.status_code}\n{response.content}"
else: else:
try: try:
document = canonicalise(response.json(), include_security=True) json_data = json_from_response(response)
except json.JSONDecodeError as ex: document = canonicalise(json_data, include_security=True)
except ValueError as ex:
result = str(ex) result = str(ex)
else: else:
context["raw_result"] = json.dumps(response.json(), indent=2) context["raw_result"] = json.dumps(response.json(), indent=2)

Wyświetl plik

@ -3,19 +3,18 @@ import json
from httpx import Response from httpx import Response
JSON_CONTENT_TYPES = [ JSON_CONTENT_TYPES = [
"application/json",
"application/ld+json", "application/ld+json",
"application/activity+json", "application/activity+json",
] ]
def json_from_response(response: Response) -> dict | None: def json_from_response(response: Response) -> dict:
content_type, *parameters = ( content_type, *parameters = (
response.headers.get("Content-Type", "invalid").lower().split(";") response.headers.get("Content-Type", "invalid").lower().split(";")
) )
if content_type not in JSON_CONTENT_TYPES: if content_type not in JSON_CONTENT_TYPES:
return None raise ValueError(f"Invalid content type: {content_type}")
charset = None charset = None

Wyświetl plik

@ -13,6 +13,7 @@ def test_fetch_post(httpx_mock: HTTPXMock, config_system):
""" """
httpx_mock.add_response( httpx_mock.add_response(
url="https://example.com/test-actor", url="https://example.com/test-actor",
headers={"Content-Type": "application/activity+json"},
json={ json={
"@context": [ "@context": [
"https://www.w3.org/ns/activitystreams", "https://www.w3.org/ns/activitystreams",
@ -23,6 +24,7 @@ def test_fetch_post(httpx_mock: HTTPXMock, config_system):
) )
httpx_mock.add_response( httpx_mock.add_response(
url="https://example.com/test-post", url="https://example.com/test-post",
headers={"Content-Type": "application/activity+json"},
json={ json={
"@context": [ "@context": [
"https://www.w3.org/ns/activitystreams", "https://www.w3.org/ns/activitystreams",

Wyświetl plik

@ -86,8 +86,7 @@ def test_search_not_found(httpx_mock: HTTPXMock, api_client):
@pytest.mark.parametrize( @pytest.mark.parametrize(
"content_type", "content_type",
[ [
"application/json", 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"',
"application/ld+json",
"application/activity+json", "application/activity+json",
], ],
) )

Wyświetl plik

@ -109,6 +109,7 @@ def test_fetch_actor(httpx_mock, config_system):
# Trigger actor fetch # Trigger actor fetch
httpx_mock.add_response( httpx_mock.add_response(
url="https://example.com/.well-known/webfinger?resource=acct:test@example.com", url="https://example.com/.well-known/webfinger?resource=acct:test@example.com",
headers={"Content-Type": "application/activity+json"},
json={ json={
"subject": "acct:test@example.com", "subject": "acct:test@example.com",
"aliases": [ "aliases": [
@ -130,6 +131,7 @@ def test_fetch_actor(httpx_mock, config_system):
) )
httpx_mock.add_response( httpx_mock.add_response(
url="https://example.com/test-actor/", url="https://example.com/test-actor/",
headers={"Content-Type": "application/activity+json"},
json={ json={
"@context": [ "@context": [
"https://www.w3.org/ns/activitystreams", "https://www.w3.org/ns/activitystreams",
@ -170,6 +172,7 @@ def test_fetch_actor(httpx_mock, config_system):
) )
httpx_mock.add_response( httpx_mock.add_response(
url="https://example.com/test-actor/collections/featured/", url="https://example.com/test-actor/collections/featured/",
headers={"Content-Type": "application/activity+json"},
json={ json={
"type": "Collection", "type": "Collection",
"totalItems": 1, "totalItems": 1,

Wyświetl plik

@ -857,7 +857,8 @@ class Identity(StatorModel):
return [] return []
try: try:
data = canonicalise(response.json(), include_security=True) json_data = json_from_response(response)
data = canonicalise(json_data, include_security=True)
items: list[dict | str] = [] items: list[dict | str] = []
if "orderedItems" in data: if "orderedItems" in data:
items = list(reversed(data["orderedItems"])) items = list(reversed(data["orderedItems"]))
@ -917,10 +918,8 @@ class Identity(StatorModel):
"Client error fetching actor: %d %s", status_code, self.actor_uri "Client error fetching actor: %d %s", status_code, self.actor_uri
) )
return False return False
json_data = json_from_response(response)
if not json_data:
return False
try: try:
json_data = json_from_response(response)
document = canonicalise(json_data, include_security=True) document = canonicalise(json_data, include_security=True)
except ValueError: except ValueError:
# servers with empty or invalid responses are inevitable # servers with empty or invalid responses are inevitable