2019-01-09 16:52:14 +00:00
|
|
|
import pytest
|
|
|
|
|
2019-03-05 14:15:37 +00:00
|
|
|
from funkwhale_api.federation import authentication, exceptions, keys, jsonld
|
2018-03-31 13:47:21 +00:00
|
|
|
|
|
|
|
|
2018-04-03 21:25:44 +00:00
|
|
|
def test_authenticate(factories, mocker, api_request):
    """A request signed with a remote actor's key authenticates that actor.

    The actor document is served by a patched ``get_actor_data`` so no
    network access is needed. After authentication the returned user must be
    anonymous, the actor attached to the request must carry the served public
    key and id, and a nodeinfo update must have been requested for the
    actor's domain.
    """
    private, public = keys.get_key_pair()
    # NOTE(review): nodeinfo_fetch_date=None presumably marks the domain as
    # "never fetched", which is what makes the nodeinfo task fire — confirm.
    factories["federation.Domain"](name="test.federation", nodeinfo_fetch_date=None)
    actor_url = "https://test.federation/actor"
    key_id = actor_url + "#main-key"
    public_pem = public.decode("utf-8")

    # Remote actor document handed back instead of performing an HTTP fetch.
    actor_payload = {
        "@context": jsonld.get_default_context(),
        "id": actor_url,
        "type": "Person",
        "outbox": "https://test.com",
        "inbox": "https://test.com",
        "followers": "https://test.com",
        "preferredUsername": "test",
        "publicKey": {
            "publicKeyPem": public_pem,
            "owner": actor_url,
            "id": key_id,
        },
    }
    mocker.patch(
        "funkwhale_api.federation.actors.get_actor_data",
        return_value=actor_payload,
    )
    update_domain_nodeinfo = mocker.patch(
        "funkwhale_api.federation.tasks.update_domain_nodeinfo"
    )

    signed_request = factories["federation.SignedRequest"](
        auth__key=private,
        auth__key_id=key_id,
        auth__headers=["date"],
    )
    prepared = signed_request.prepare()
    # Forward only the headers that take part in the signature check.
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)

    authenticator = authentication.SignatureAuthentication()
    user, _ = authenticator.authenticate(django_request)
    actor = django_request.actor

    assert user.is_anonymous is True
    assert actor.public_key == public_pem
    assert actor.fid == actor_url
    update_domain_nodeinfo.assert_called_once_with(domain_name="test.federation")
|
2019-01-09 16:52:14 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_authenticate_skips_blocked_domain(factories, api_request):
    """A signed request from a domain under a block-all policy is rejected.

    The key id points at an actor hosted on the policy's target domain, and
    authentication must raise ``BlockedActorOrDomain``.
    """
    policy = factories["moderation.InstancePolicy"](block_all=True, for_domain=True)
    # Only the private half is needed: the request is signed but the public
    # key is never served in this test.
    private, _ = keys.get_key_pair()
    blocked_domain = policy.target_domain.name
    actor_url = "https://{}/actor".format(blocked_domain)

    signed_request = factories["federation.SignedRequest"](
        auth__key=private,
        auth__key_id=actor_url + "#main-key",
        auth__headers=["date"],
    )
    prepared = signed_request.prepare()
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)
    authenticator = authentication.SignatureAuthentication()

    # NOTE(review): get_actor_data is not patched here, so the rejection
    # presumably happens before any actor fetch — confirm.
    with pytest.raises(exceptions.BlockedActorOrDomain):
        authenticator.authenticate(django_request)
|
|
|
|
|
|
|
|
|
|
|
|
def test_authenticate_skips_blocked_actor(factories, api_request):
    """A signed request from an actor under a block-all policy is rejected.

    The key id is derived from the policy's target actor id, and
    authentication must raise ``BlockedActorOrDomain``.
    """
    policy = factories["moderation.InstancePolicy"](block_all=True, for_actor=True)
    # Only the private half is needed: the request is signed but the public
    # key is never served in this test.
    private, _ = keys.get_key_pair()
    actor_url = policy.target_actor.fid

    signed_request = factories["federation.SignedRequest"](
        auth__key=private,
        auth__key_id=actor_url + "#main-key",
        auth__headers=["date"],
    )
    prepared = signed_request.prepare()
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)
    authenticator = authentication.SignatureAuthentication()

    # NOTE(review): get_actor_data is not patched here, so the rejection
    # presumably happens before any actor fetch — confirm.
    with pytest.raises(exceptions.BlockedActorOrDomain):
        authenticator.authenticate(django_request)
|
|
|
|
|
|
|
|
|
|
|
|
def test_authenticate_ignore_inactive_policy(factories, api_request, mocker):
    """A block-all domain policy with ``is_active=False`` does not block.

    Authentication of a request signed by an actor on the policy's target
    domain must complete, and the actor attached to the request must carry
    the public key and id served by the patched ``get_actor_data``.
    """
    policy = factories["moderation.InstancePolicy"](
        block_all=True,
        for_domain=True,
        is_active=False,
    )
    private, public = keys.get_key_pair()
    domain_name = policy.target_domain.name
    actor_url = "https://{}/actor".format(domain_name)
    key_id = actor_url + "#main-key"
    public_pem = public.decode("utf-8")

    signed_request = factories["federation.SignedRequest"](
        auth__key=private,
        auth__key_id=key_id,
        auth__headers=["date"],
    )

    # Remote actor document handed back instead of performing an HTTP fetch.
    actor_payload = {
        "@context": jsonld.get_default_context(),
        "id": actor_url,
        "type": "Person",
        "outbox": "https://test.com",
        "inbox": "https://test.com",
        "followers": "https://test.com",
        "preferredUsername": "test",
        "publicKey": {
            "publicKeyPem": public_pem,
            "owner": actor_url,
            "id": key_id,
        },
    }
    mocker.patch(
        "funkwhale_api.federation.actors.get_actor_data",
        return_value=actor_payload,
    )

    prepared = signed_request.prepare()
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)

    authenticator = authentication.SignatureAuthentication()
    authenticator.authenticate(django_request)
    actor = django_request.actor

    assert actor.public_key == public_pem
    assert actor.fid == actor_url
|
2019-01-11 10:04:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_autenthicate_supports_blind_key_rotation(factories, mocker, api_request):
    """Authentication succeeds when a known actor signs with a new key pair.

    An actor is created through the factory, then a request is signed with a
    freshly generated key pair whose public half is only available through
    the patched ``get_actor_data``. After authentication the request's actor
    must expose the new public key under the same actor id.
    """
    existing_actor = factories["federation.Actor"]()
    actor_url = existing_actor.fid
    key_id = actor_url + "#main-key"

    # The request is signed with a pair of new keys.
    new_private, new_public = keys.get_key_pair()
    new_public_pem = new_public.decode("utf-8")

    # Actor document advertising the rotated key.
    actor_payload = {
        "@context": jsonld.get_default_context(),
        "id": actor_url,
        "type": "Person",
        "outbox": "https://test.com",
        "inbox": "https://test.com",
        "followers": "https://test.com",
        "preferredUsername": "test",
        "publicKey": {
            "publicKeyPem": new_public_pem,
            "owner": actor_url,
            "id": key_id,
        },
    }
    mocker.patch(
        "funkwhale_api.federation.actors.get_actor_data",
        return_value=actor_payload,
    )

    signed_request = factories["federation.SignedRequest"](
        auth__key=new_private,
        auth__key_id=key_id,
        auth__headers=["date"],
    )
    prepared = signed_request.prepare()
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)

    authenticator = authentication.SignatureAuthentication()
    user, _ = authenticator.authenticate(django_request)
    authenticated_actor = django_request.actor

    assert user.is_anonymous is True
    assert authenticated_actor.public_key == new_public_pem
    assert authenticated_actor.fid == actor_url
|
2019-06-26 08:22:29 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_authenticate_checks_signature_with_allow_list(
    preferences, factories, api_request
):
    """With the allow-list enabled, a non-allowed domain is rejected.

    The ``moderation__allow_list_enabled`` preference is switched on and the
    signing actor lives on a domain created with ``allowed=False``;
    authentication must raise ``BlockedActorOrDomain``.
    """
    preferences["moderation__allow_list_enabled"] = True
    domain = factories["federation.Domain"](allowed=False)
    # Only the private half is needed: the request is signed but the public
    # key is never served in this test.
    private, _ = keys.get_key_pair()
    actor_url = "https://{}/actor".format(domain.name)

    signed_request = factories["federation.SignedRequest"](
        auth__key=private,
        auth__key_id=actor_url + "#main-key",
        auth__headers=["date"],
    )
    prepared = signed_request.prepare()
    signature_headers = {
        "HTTP_DATE": prepared.headers["date"],
        "HTTP_SIGNATURE": prepared.headers["signature"],
    }
    django_request = api_request.get("/", **signature_headers)
    authenticator = authentication.SignatureAuthentication()

    with pytest.raises(exceptions.BlockedActorOrDomain):
        authenticator.authenticate(django_request)
|