kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
91 wiersze
2.5 KiB
Python
91 wiersze
2.5 KiB
Python
import unicodedata
|
|
import re
|
|
from django.conf import settings
|
|
|
|
from funkwhale_api.common import session
|
|
|
|
from . import signing
|
|
|
|
|
|
def full_url(path):
|
|
"""
|
|
Given a relative path, return a full url usable for federation purpose
|
|
"""
|
|
if path.startswith("http://") or path.startswith("https://"):
|
|
return path
|
|
root = settings.FUNKWHALE_URL
|
|
if path.startswith("/") and root.endswith("/"):
|
|
return root + path[1:]
|
|
elif not path.startswith("/") and not root.endswith("/"):
|
|
return root + "/" + path
|
|
else:
|
|
return root + path
|
|
|
|
|
|
def clean_wsgi_headers(raw_headers):
|
|
"""
|
|
Convert WSGI headers from CONTENT_TYPE to Content-Type notation
|
|
"""
|
|
cleaned = {}
|
|
non_prefixed = ["content_type", "content_length"]
|
|
for raw_header, value in raw_headers.items():
|
|
h = raw_header.lower()
|
|
if not h.startswith("http_") and h not in non_prefixed:
|
|
continue
|
|
|
|
words = h.replace("http_", "", 1).split("_")
|
|
cleaned_header = "-".join([w.capitalize() for w in words])
|
|
cleaned[cleaned_header] = value
|
|
|
|
return cleaned
|
|
|
|
|
|
def slugify_username(username):
|
|
"""
|
|
Given a username such as "hello M. world", returns a username
|
|
suitable for federation purpose (hello_M_world).
|
|
|
|
Preserves the original case.
|
|
|
|
Code is borrowed from django's slugify function.
|
|
"""
|
|
|
|
value = str(username)
|
|
value = (
|
|
unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
|
|
)
|
|
value = re.sub(r"[^\w\s-]", "", value).strip()
|
|
return re.sub(r"[-\s]+", "_", value)
|
|
|
|
|
|
def retrieve(fid, actor=None, serializer_class=None, queryset=None):
|
|
if queryset:
|
|
try:
|
|
# queryset can also be a Model class
|
|
existing = queryset.filter(fid=fid).first()
|
|
except AttributeError:
|
|
existing = queryset.objects.filter(fid=fid).first()
|
|
if existing:
|
|
return existing
|
|
|
|
auth = (
|
|
None if not actor else signing.get_auth(actor.private_key, actor.private_key_id)
|
|
)
|
|
response = session.get_session().get(
|
|
fid,
|
|
auth=auth,
|
|
timeout=5,
|
|
verify=settings.EXTERNAL_REQUESTS_VERIFY_SSL,
|
|
headers={
|
|
"Accept": "application/activity+json",
|
|
"Content-Type": "application/activity+json",
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
if not serializer_class:
|
|
return data
|
|
serializer = serializer_class(data=data)
|
|
serializer.is_valid(raise_exception=True)
|
|
return serializer.save()
|