kopia lustrzana https://dev.funkwhale.audio/funkwhale/funkwhale
				
				
				
			More test cases for request signing and added helpers to verify signature
							rodzic
							
								
									aa7365b71f
								
							
						
					
					
						commit
						4522f5997e
					
				| 
						 | 
				
			
			@ -1,3 +1,6 @@
 | 
			
		|||
import requests
 | 
			
		||||
import requests_http_signature
 | 
			
		||||
 | 
			
		||||
from cryptography.hazmat.primitives import serialization as crypto_serialization
 | 
			
		||||
from cryptography.hazmat.primitives.asymmetric import rsa
 | 
			
		||||
from cryptography.hazmat.backends import default_backend as crypto_default_backend
 | 
			
		||||
| 
						 | 
				
			
			@ -19,3 +22,35 @@ def get_key_pair(size=2048):
 | 
			
		|||
    )
 | 
			
		||||
 | 
			
		||||
    return private_key, public_key
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def verify(request, public_key):
 | 
			
		||||
    return requests_http_signature.HTTPSignatureAuth.verify(
 | 
			
		||||
        request,
 | 
			
		||||
        key_resolver=lambda **kwargs: public_key
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def verify_django(django_request, public_key):
 | 
			
		||||
    """
 | 
			
		||||
    Given a django WSGI request, create an underlying requests.PreparedRequest
 | 
			
		||||
    instance we can verify
 | 
			
		||||
    """
 | 
			
		||||
    headers = django_request.META.get('headers', {}).copy()
 | 
			
		||||
    for h, v in list(headers.items()):
 | 
			
		||||
        # we include lower-cased version of the headers for compatibility
 | 
			
		||||
        # with requests_http_signature
 | 
			
		||||
        headers[h.lower()] = v
 | 
			
		||||
    try:
 | 
			
		||||
        signature = headers['authorization']
 | 
			
		||||
    except KeyError:
 | 
			
		||||
        raise exceptions.MissingSignature
 | 
			
		||||
 | 
			
		||||
    request = requests.Request(
 | 
			
		||||
        method=django_request.method,
 | 
			
		||||
        url='http://noop',
 | 
			
		||||
        data=django_request.body,
 | 
			
		||||
        headers=headers)
 | 
			
		||||
 | 
			
		||||
    prepared_request = request.prepare()
 | 
			
		||||
    return verify(request, public_key)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,10 +16,23 @@ def test_can_sign_and_verify_request(factories):
 | 
			
		|||
    assert 'date' in prepared_request.headers
 | 
			
		||||
    assert 'authorization' in prepared_request.headers
 | 
			
		||||
    assert prepared_request.headers['authorization'].startswith('Signature')
 | 
			
		||||
    assert requests_http_signature.HTTPSignatureAuth.verify(
 | 
			
		||||
        prepared_request,
 | 
			
		||||
        key_resolver=lambda **kwargs: public
 | 
			
		||||
    ) is None
 | 
			
		||||
    assert signing.verify(prepared_request, public) is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_can_sign_and_verify_request_digest(factories):
 | 
			
		||||
    private, public = factories['federation.KeyPair']()
 | 
			
		||||
    auth = factories['federation.SignatureAuth'](key=private)
 | 
			
		||||
    request = factories['federation.SignedRequest'](
 | 
			
		||||
        auth=auth,
 | 
			
		||||
        method='post',
 | 
			
		||||
        data=b'hello=world'
 | 
			
		||||
    )
 | 
			
		||||
    prepared_request = request.prepare()
 | 
			
		||||
    assert 'date' in prepared_request.headers
 | 
			
		||||
    assert 'digest' in prepared_request.headers
 | 
			
		||||
    assert 'authorization' in prepared_request.headers
 | 
			
		||||
    assert prepared_request.headers['authorization'].startswith('Signature')
 | 
			
		||||
    assert signing.verify(prepared_request, public) is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_verify_fails_with_wrong_key(factories):
 | 
			
		||||
| 
						 | 
				
			
			@ -28,7 +41,78 @@ def test_verify_fails_with_wrong_key(factories):
 | 
			
		|||
    prepared_request = request.prepare()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(cryptography.exceptions.InvalidSignature):
 | 
			
		||||
        requests_http_signature.HTTPSignatureAuth.verify(
 | 
			
		||||
            prepared_request,
 | 
			
		||||
            key_resolver=lambda **kwargs: wrong_public
 | 
			
		||||
        )
 | 
			
		||||
        signing.verify(prepared_request, wrong_public)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_can_verify_django_request(factories, api_request):
 | 
			
		||||
    private_key, public_key = signing.get_key_pair()
 | 
			
		||||
    signed_request = factories['federation.SignedRequest'](
 | 
			
		||||
        auth__key=private_key
 | 
			
		||||
    )
 | 
			
		||||
    prepared = signed_request.prepare()
 | 
			
		||||
    django_request = api_request.get(
 | 
			
		||||
        '/',
 | 
			
		||||
        headers={
 | 
			
		||||
            'Date': prepared.headers['date'],
 | 
			
		||||
            'Authorization': prepared.headers['authorization'],
 | 
			
		||||
        }
 | 
			
		||||
    )
 | 
			
		||||
    assert signing.verify_django(django_request, public_key) is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_can_verify_django_request_digest(factories, api_request):
 | 
			
		||||
    private_key, public_key = signing.get_key_pair()
 | 
			
		||||
    signed_request = factories['federation.SignedRequest'](
 | 
			
		||||
        auth__key=private_key,
 | 
			
		||||
        method='post',
 | 
			
		||||
        data=b'hello=world'
 | 
			
		||||
    )
 | 
			
		||||
    prepared = signed_request.prepare()
 | 
			
		||||
    django_request = api_request.post(
 | 
			
		||||
        '/',
 | 
			
		||||
        headers={
 | 
			
		||||
            'Date': prepared.headers['date'],
 | 
			
		||||
            'Digest': prepared.headers['digest'],
 | 
			
		||||
            'Authorization': prepared.headers['authorization'],
 | 
			
		||||
        }
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert signing.verify_django(django_request, public_key) is None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_can_verify_django_request_digest_failure(factories, api_request):
 | 
			
		||||
    private_key, public_key = signing.get_key_pair()
 | 
			
		||||
    signed_request = factories['federation.SignedRequest'](
 | 
			
		||||
        auth__key=private_key,
 | 
			
		||||
        method='post',
 | 
			
		||||
        data=b'hello=world'
 | 
			
		||||
    )
 | 
			
		||||
    prepared = signed_request.prepare()
 | 
			
		||||
    django_request = api_request.post(
 | 
			
		||||
        '/',
 | 
			
		||||
        headers={
 | 
			
		||||
            'Date': prepared.headers['date'],
 | 
			
		||||
            'Digest': prepared.headers['digest'] + 'noop',
 | 
			
		||||
            'Authorization': prepared.headers['authorization'],
 | 
			
		||||
        }
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(cryptography.exceptions.InvalidSignature):
 | 
			
		||||
        signing.verify_django(django_request, public_key)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_can_verify_django_request_failure(factories, api_request):
 | 
			
		||||
    private_key, public_key = signing.get_key_pair()
 | 
			
		||||
    signed_request = factories['federation.SignedRequest'](
 | 
			
		||||
        auth__key=private_key
 | 
			
		||||
    )
 | 
			
		||||
    prepared = signed_request.prepare()
 | 
			
		||||
    django_request = api_request.get(
 | 
			
		||||
        '/',
 | 
			
		||||
        headers={
 | 
			
		||||
            'Date': 'Wrong',
 | 
			
		||||
            'Authorization': prepared.headers['authorization'],
 | 
			
		||||
        }
 | 
			
		||||
    )
 | 
			
		||||
    with pytest.raises(cryptography.exceptions.InvalidSignature):
 | 
			
		||||
        signing.verify_django(django_request, public_key)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Ładowanie…
	
		Reference in New Issue