kopia lustrzana https://github.com/micropython/micropython-lib
Porównaj commity
4 Commity
a5f583e3cf
...
dea822ec88
Autor | SHA1 | Data |
---|---|---|
Jonah Bron | dea822ec88 | |
Jonah Bron | 1d642ad8eb | |
Jonah Bron | 4e43bc0c78 | |
Jonah Bron | 17b7e0c665 |
|
@ -0,0 +1,12 @@
|
||||||
|
metadata(
|
||||||
|
version="0.1.0",
|
||||||
|
pypi="py-vapid",
|
||||||
|
author="Jonah Bron <hi@jonah.id>",
|
||||||
|
description="""
|
||||||
|
VAPID
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
require("pyjwt")
|
||||||
|
|
||||||
|
package("py_vapid")
|
|
@ -0,0 +1,50 @@
|
||||||
|
"""
|
||||||
|
Based on https://github.com/web-push-libs/vapid
|
||||||
|
"""
|
||||||
|
|
||||||
|
import binascii
|
||||||
|
import time
|
||||||
|
import jwt
|
||||||
|
|
||||||
|
from cryptography import serialization
|
||||||
|
|
||||||
|
|
||||||
|
def _to_b64url(data):
|
||||||
|
return (
|
||||||
|
binascii.b2a_base64(data)
|
||||||
|
.rstrip(b"\n")
|
||||||
|
.rstrip(b"=")
|
||||||
|
.replace(b"+", b"-")
|
||||||
|
.replace(b"/", b"_")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Vapid:
|
||||||
|
def __init__(self, private_key):
|
||||||
|
self._private_key = private_key
|
||||||
|
|
||||||
|
def sign(self, claims):
|
||||||
|
claim = claims
|
||||||
|
if "exp" not in claim:
|
||||||
|
# Default to expiring 24 hours into the future (the max).
|
||||||
|
# https://datatracker.ietf.org/doc/html/rfc8292#section-2
|
||||||
|
exp = int(time.time()) + 86400
|
||||||
|
# Correct the epoch offset if not the Unix standard.
|
||||||
|
if time.gmtime(0)[0] == 2000:
|
||||||
|
exp += 946684800 # Unix timestamp of 2000-01-01
|
||||||
|
|
||||||
|
claim["exp"] = exp
|
||||||
|
|
||||||
|
token = jwt.encode(claim, self._private_key, "ES256")
|
||||||
|
public_key = _to_b64url(
|
||||||
|
self._private_key.public_key().public_bytes(
|
||||||
|
encoding=serialization.Encoding.X962,
|
||||||
|
format=serialization.PublicFormat.UncompressedPoint,
|
||||||
|
)
|
||||||
|
).decode()
|
||||||
|
|
||||||
|
return {"Authorization": f"vapid t={token},k={public_key}"}
|
||||||
|
|
||||||
|
|
||||||
|
# Re-export for interface compatibility with PyPi py-vapid
|
||||||
|
Vapid02 = Vapid
|
|
@ -0,0 +1,111 @@
|
||||||
|
import jwt
|
||||||
|
import py_vapid
|
||||||
|
from time import time
|
||||||
|
from cryptography import ec
|
||||||
|
from machine import RTC
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
Run tests by executing:
|
||||||
|
|
||||||
|
```
|
||||||
|
mpremote fs cp py_vapid/__init__.py :lib/py_vapid.py + run test_vapid.py
|
||||||
|
```
|
||||||
|
|
||||||
|
The [ucryptography](https://github.com/dmazzella/ucryptography) library must
|
||||||
|
be present in the firmware for this library and tests to work.
|
||||||
|
"""
|
||||||
|
|
||||||
|
rtc = RTC()
|
||||||
|
|
||||||
|
GOLDEN_0 = (
|
||||||
|
0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75,
|
||||||
|
(2010, 1, 1, 0, 0, 0, 0, 0),
|
||||||
|
{
|
||||||
|
"aud": "https://updates.push.services.mozilla.com",
|
||||||
|
"sub": "mailto:admin@example.com",
|
||||||
|
"exp": 9876543,
|
||||||
|
},
|
||||||
|
"vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20iLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJleHAiOiA5ODc2NTQzfQ.DLB6PF2RApzk0n0oH-Kv_Onuwg9C7VXakM-GlEMCwj50rQ7G0hF_vLIYzCPeXT8Hu8Uup900YBapZ9y45vc8QA,k=BKoKs6nJ3466nCEQ5TvFkBIGBKSGplPTUBzJlLXM13I8S0SF-o_NSB-Q4At3BeLSrZVptEd5xBuGRXCKMe_YRg8",
|
||||||
|
)
|
||||||
|
|
||||||
|
GOLDEN_1 = (
|
||||||
|
0x4370082632776C74FDC5517AC12881413A60B25D10E863296AD67E4260A3BF56,
|
||||||
|
(2015, 1, 1, 0, 0, 0, 0, 0),
|
||||||
|
{
|
||||||
|
"aud": "https://updates.push.services.mozilla.com",
|
||||||
|
"sub": "mailto:admin@example.com",
|
||||||
|
},
|
||||||
|
"vapid t=eyJ0eXAiOiAiSldUIiwgImFsZyI6ICJFUzI1NiJ9.eyJleHAiOiAxNDIwMTU2ODAwLCAic3ViIjogIm1haWx0bzphZG1pbkBleGFtcGxlLmNvbSIsICJhdWQiOiAiaHR0cHM6Ly91cGRhdGVzLnB1c2guc2VydmljZXMubW96aWxsYS5jb20ifQ.NlVtqjGWy-hvNtoScrwAv-4cpNYrgUJ4EVgtxTnIn-haPtBSpak7aQN518tVYelQB1TZqc0bxAjWfK9QvZUbOA,k=BGEwf7m9F3vCvOuPeN4pEZ91t-dpSmg_y8ZXMfOyl-f22zw10ho_4EeBqZj2-NtW_Kb98b6tGjOKO_-TJiWvyfo",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set of opaquely known-good scenarios to check against
|
||||||
|
golden_test_cases = [GOLDEN_0, GOLDEN_1]
|
||||||
|
|
||||||
|
|
||||||
|
# Test basic validation of claim
|
||||||
|
private_key_0 = ec.derive_private_key(
|
||||||
|
0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
|
||||||
|
)
|
||||||
|
vapid = py_vapid.Vapid(private_key=private_key_0)
|
||||||
|
rtc.datetime((2018, 1, 1, 0, 0, 0, 0, 0))
|
||||||
|
headers = vapid.sign(
|
||||||
|
{
|
||||||
|
"aud": "https://fcm.googleapis.com",
|
||||||
|
"sub": "mailto:foo@bar.com",
|
||||||
|
"exp": 1493315200,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
|
||||||
|
actual_decoded_claim = jwt.decode(actual_token, private_key_0.public_key(), "ES256")
|
||||||
|
assert (
|
||||||
|
actual_decoded_claim["aud"] == "https://fcm.googleapis.com"
|
||||||
|
), f"Claim audience '{actual_decoded_claim['aud']}' does not match input"
|
||||||
|
assert (
|
||||||
|
actual_decoded_claim["sub"] == "mailto:foo@bar.com"
|
||||||
|
), f"Claim subscriber '{actual_decoded_claim['sub']}' does not match input"
|
||||||
|
assert (
|
||||||
|
actual_decoded_claim["exp"] == 1493315200
|
||||||
|
), f"Claim exp '{actual_decoded_claim['exp']}' does not match input"
|
||||||
|
print(f"Test claim validation: Passed")
|
||||||
|
|
||||||
|
|
||||||
|
# Test auto expiration date population
|
||||||
|
private_key_1 = ec.derive_private_key(
|
||||||
|
0x5C76C15BBC541E7BF6987557124A6E6EB745723B1CF20E2ED2A3ED5B7C16DD46, ec.SECP256R1()
|
||||||
|
)
|
||||||
|
vapid = py_vapid.Vapid(private_key=private_key_1)
|
||||||
|
rtc.datetime((2017, 1, 1, 0, 0, 0, 0, 0))
|
||||||
|
headers = vapid.sign(
|
||||||
|
{
|
||||||
|
"aud": "https://updates.push.services.mozilla.com",
|
||||||
|
"sub": "mailto:admin@example.com",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
actual_token = headers["Authorization"].split(" ")[1].split(",")[0].split("=")[1]
|
||||||
|
actual_decoded_claim = jwt.decode(actual_token, private_key_1.public_key(), "ES256")
|
||||||
|
assert (
|
||||||
|
actual_decoded_claim["exp"] == 1483315200
|
||||||
|
), f"Claim exp '{actual_decoded_claim['exp']}' does not match expected 2017-01-02 value"
|
||||||
|
print(f"Test auto expiry: Passed")
|
||||||
|
|
||||||
|
|
||||||
|
# Because they provide the least information about what could have gone wrong,
|
||||||
|
# Run golden test cases after all more specific tests pass first.
|
||||||
|
for case_no, case in enumerate(golden_test_cases):
|
||||||
|
private_key_number, curr_time, claim, expected_id = case
|
||||||
|
try:
|
||||||
|
private_key = ec.derive_private_key(private_key_number, ec.SECP256R1())
|
||||||
|
vapid = py_vapid.Vapid(private_key=private_key)
|
||||||
|
rtc.datetime(curr_time)
|
||||||
|
headers = vapid.sign(claim)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
headers["Authorization"] == expected_id
|
||||||
|
), f"Authorization header '{headers['Authorization']}' does not match golden test case {case_no}"
|
||||||
|
print(f"Golden test case {case_no}: Passed")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Golden test case {case_no}: Failed")
|
||||||
|
raise e
|
|
@ -4,6 +4,17 @@ import hmac
|
||||||
import json
|
import json
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
|
# Optionally depend on https://github.com/dmazzella/ucryptography
|
||||||
|
try:
|
||||||
|
# Try importing from ucryptography port.
|
||||||
|
import cryptography
|
||||||
|
from cryptography import hashes, ec, serialization, utils
|
||||||
|
|
||||||
|
_ec_supported = True
|
||||||
|
except ImportError:
|
||||||
|
# No cryptography library available, no EC256 support.
|
||||||
|
_ec_supported = False
|
||||||
|
|
||||||
|
|
||||||
def _to_b64url(data):
|
def _to_b64url(data):
|
||||||
return (
|
return (
|
||||||
|
@ -19,6 +30,28 @@ def _from_b64url(data):
|
||||||
return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")
|
return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")
|
||||||
|
|
||||||
|
|
||||||
|
def _sig_der_to_jws(signed):
|
||||||
|
"""Accept a DER signature and convert to JSON Web Signature bytes.
|
||||||
|
|
||||||
|
`cryptography` produces signatures encoded in DER ASN.1 binary format.
|
||||||
|
JSON Web Algorithm instead encodes the signature as the point coordinates
|
||||||
|
as bigendian byte strings concatenated.
|
||||||
|
|
||||||
|
See https://datatracker.ietf.org/doc/html/rfc7518#section-3.4
|
||||||
|
"""
|
||||||
|
r, s = utils.decode_dss_signature(signed)
|
||||||
|
return r.to_bytes(32, "big") + s.to_bytes(32, "big")
|
||||||
|
|
||||||
|
|
||||||
|
def _sig_jws_to_der(signed):
|
||||||
|
"""Accept a JSON Web Signature and convert to a DER signature.
|
||||||
|
|
||||||
|
See `_sig_der_to_jws()`
|
||||||
|
"""
|
||||||
|
r, s = int.from_bytes(signed[0:32], "big"), int.from_bytes(signed[32:], "big")
|
||||||
|
return utils.encode_dss_signature(r, s)
|
||||||
|
|
||||||
|
|
||||||
class exceptions:
|
class exceptions:
|
||||||
class PyJWTError(Exception):
|
class PyJWTError(Exception):
|
||||||
pass
|
pass
|
||||||
|
@ -37,19 +70,32 @@ class exceptions:
|
||||||
|
|
||||||
|
|
||||||
def encode(payload, key, algorithm="HS256"):
|
def encode(payload, key, algorithm="HS256"):
|
||||||
if algorithm != "HS256":
|
if algorithm != "HS256" and algorithm != "ES256":
|
||||||
raise exceptions.InvalidAlgorithmError
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
if isinstance(key, str):
|
|
||||||
key = key.encode()
|
|
||||||
header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
|
header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
|
||||||
payload = _to_b64url(json.dumps(payload).encode())
|
payload = _to_b64url(json.dumps(payload).encode())
|
||||||
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
|
|
||||||
|
if algorithm == "HS256":
|
||||||
|
if isinstance(key, str):
|
||||||
|
key = key.encode()
|
||||||
|
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
|
||||||
|
elif algorithm == "ES256":
|
||||||
|
if not _ec_supported:
|
||||||
|
raise exceptions.InvalidAlgorithmError(
|
||||||
|
"Required dependencies for ES256 are not available"
|
||||||
|
)
|
||||||
|
if isinstance(key, int):
|
||||||
|
key = ec.derive_private_key(key, ec.SECP256R1())
|
||||||
|
signature = _to_b64url(
|
||||||
|
_sig_der_to_jws(key.sign(header + b"." + payload, ec.ECDSA(hashes.SHA256())))
|
||||||
|
)
|
||||||
|
|
||||||
return (header + b"." + payload + b"." + signature).decode()
|
return (header + b"." + payload + b"." + signature).decode()
|
||||||
|
|
||||||
|
|
||||||
def decode(token, key, algorithms=["HS256"]):
|
def decode(token, key, algorithms=["HS256", "ES256"]):
|
||||||
if "HS256" not in algorithms:
|
if "HS256" not in algorithms and "ES256" not in algorithms:
|
||||||
raise exceptions.InvalidAlgorithmError
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
parts = token.encode().split(b".")
|
parts = token.encode().split(b".")
|
||||||
|
@ -63,14 +109,31 @@ def decode(token, key, algorithms=["HS256"]):
|
||||||
except Exception:
|
except Exception:
|
||||||
raise exceptions.InvalidTokenError
|
raise exceptions.InvalidTokenError
|
||||||
|
|
||||||
if header["alg"] not in algorithms or header["alg"] != "HS256":
|
if header["alg"] not in algorithms or (header["alg"] != "HS256" and header["alg"] != "ES256"):
|
||||||
raise exceptions.InvalidAlgorithmError
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
if isinstance(key, str):
|
if header["alg"] == "HS256":
|
||||||
key = key.encode()
|
if isinstance(key, str):
|
||||||
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
|
key = key.encode()
|
||||||
if signature != calculated_signature:
|
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
|
||||||
raise exceptions.InvalidSignatureError
|
if signature != calculated_signature:
|
||||||
|
raise exceptions.InvalidSignatureError
|
||||||
|
elif header["alg"] == "ES256":
|
||||||
|
if not _ec_supported:
|
||||||
|
raise exceptions.InvalidAlgorithmError(
|
||||||
|
"Required dependencies for ES256 are not available"
|
||||||
|
)
|
||||||
|
|
||||||
|
if isinstance(key, bytes):
|
||||||
|
key = ec.EllipticCurvePublicKey.from_encoded_point(key, ec.SECP256R1())
|
||||||
|
try:
|
||||||
|
key.verify(
|
||||||
|
_sig_jws_to_der(signature),
|
||||||
|
parts[0] + b"." + parts[1],
|
||||||
|
ec.ECDSA(hashes.SHA256()),
|
||||||
|
)
|
||||||
|
except cryptography.exceptions.InvalidSignature:
|
||||||
|
raise exceptions.InvalidSignatureError
|
||||||
|
|
||||||
if "exp" in payload:
|
if "exp" in payload:
|
||||||
if time() > payload["exp"]:
|
if time() > payload["exp"]:
|
||||||
|
|
|
@ -1,4 +1,13 @@
|
||||||
metadata(version="0.1.0", pypi="pyjwt")
|
metadata(
|
||||||
|
version="0.2.0",
|
||||||
|
pypi="pyjwt",
|
||||||
|
description="""
|
||||||
|
JWT library for MicroPython. Supports HMAC (HS256) encoding essentially.
|
||||||
|
Optionally supports ECDSA (ES256) asymmetric-key signing/verification when the
|
||||||
|
[dmazella/ucryptography](https://github.com/dmazzella/ucryptography/) library
|
||||||
|
is available in the MicroPython firmware.
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
require("hmac")
|
require("hmac")
|
||||||
|
|
||||||
|
|
|
@ -1,28 +1,71 @@
|
||||||
import jwt
|
import jwt
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
|
"""
|
||||||
|
Run tests by executing:
|
||||||
|
|
||||||
|
```
|
||||||
|
mpremote fs cp jwt.py :lib/jwt.py + run test_jwt.py
|
||||||
|
```
|
||||||
|
|
||||||
|
Only the full test suite can be run if
|
||||||
|
[ucryptography](https://github.com/dmazzella/ucryptography) is present in the
|
||||||
|
firmware.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Indentation
|
||||||
|
I = " "
|
||||||
|
|
||||||
|
print("Testing HS256")
|
||||||
secret_key = "top-secret!"
|
secret_key = "top-secret!"
|
||||||
|
|
||||||
token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
|
token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
|
||||||
print(token)
|
|
||||||
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
||||||
if decoded != {"user": "joe"}:
|
if decoded != {"user": "joe"}:
|
||||||
raise Exception("Invalid decoded JWT")
|
raise Exception("Invalid decoded JWT")
|
||||||
else:
|
else:
|
||||||
print("Encode/decode test: OK")
|
print(I, "Encode/decode test: OK")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
|
decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
|
||||||
except jwt.exceptions.InvalidSignatureError:
|
except jwt.exceptions.InvalidSignatureError:
|
||||||
print("Invalid signature test: OK")
|
print(I, "Invalid signature test: OK")
|
||||||
else:
|
else:
|
||||||
raise Exception("Invalid JWT should have failed decoding")
|
raise Exception("Invalid JWT should have failed decoding")
|
||||||
|
|
||||||
token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
|
token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
|
||||||
print(token)
|
|
||||||
try:
|
try:
|
||||||
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
||||||
except jwt.exceptions.ExpiredSignatureError:
|
except jwt.exceptions.ExpiredSignatureError:
|
||||||
print("Expired token test: OK")
|
print(I, "Expired token test: OK")
|
||||||
else:
|
else:
|
||||||
raise Exception("Expired JWT should have failed decoding")
|
raise Exception("Expired JWT should have failed decoding")
|
||||||
|
|
||||||
|
|
||||||
|
print("Testing ES256")
|
||||||
|
try:
|
||||||
|
from cryptography import ec
|
||||||
|
except ImportError:
|
||||||
|
raise Exception("No cryptography lib present, can't test ES256")
|
||||||
|
|
||||||
|
private_key = ec.derive_private_key(
|
||||||
|
0xEB6DFB26C7A3C23D33C60F7C7BA61B6893451F2643E0737B20759E457825EE75, ec.SECP256R1()
|
||||||
|
)
|
||||||
|
wrong_private_key = ec.derive_private_key(
|
||||||
|
0x25D91A0DA38F69283A0CE32B87D82817CA4E134A1693BE6083C2292BF562A451, ec.SECP256R1()
|
||||||
|
)
|
||||||
|
|
||||||
|
token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
|
||||||
|
decoded = jwt.decode(token, private_key.public_key(), algorithms=["ES256"])
|
||||||
|
if decoded != {"user": "joe"}:
|
||||||
|
raise Exception("Invalid decoded JWT")
|
||||||
|
else:
|
||||||
|
print(I, "Encode/decode test: OK")
|
||||||
|
|
||||||
|
token = jwt.encode({"user": "joe"}, private_key, algorithm="ES256")
|
||||||
|
try:
|
||||||
|
decoded = jwt.decode(token + "a", wrong_private_key.public_key(), algorithms=["ES256"])
|
||||||
|
except jwt.exceptions.InvalidSignatureError:
|
||||||
|
print(I, "Invalid signature test: OK")
|
||||||
|
else:
|
||||||
|
raise Exception("Invalid JWT should have fialed decoding")
|
||||||
|
|
Ładowanie…
Reference in New Issue