kopia lustrzana https://github.com/micropython/micropython-lib
python-ecosys/pyjwt: Add pyjwt-compatible module.
rodzic
70e422dc2e
commit
9fdf046e27
|
@ -0,0 +1,78 @@
|
||||||
|
import binascii
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
def _to_b64url(data):
|
||||||
|
return (
|
||||||
|
binascii.b2a_base64(data)
|
||||||
|
.rstrip(b"\n")
|
||||||
|
.rstrip(b"=")
|
||||||
|
.replace(b"+", b"-")
|
||||||
|
.replace(b"/", b"_")
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _from_b64url(data):
|
||||||
|
return binascii.a2b_base64(data.replace(b"-", b"+").replace(b"_", b"/") + b"===")
|
||||||
|
|
||||||
|
|
||||||
|
class exceptions:
|
||||||
|
class PyJWTError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class InvalidTokenError(PyJWTError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class InvalidAlgorithmError(PyJWTError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class InvalidSignatureError(PyJWTError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ExpiredSignatureError(PyJWTError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def encode(payload, key, algorithm="HS256"):
|
||||||
|
if algorithm != "HS256":
|
||||||
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
|
if isinstance(key, str):
|
||||||
|
key = key.encode()
|
||||||
|
header = _to_b64url(json.dumps({"typ": "JWT", "alg": algorithm}).encode())
|
||||||
|
payload = _to_b64url(json.dumps(payload).encode())
|
||||||
|
signature = _to_b64url(hmac.new(key, header + b"." + payload, hashlib.sha256).digest())
|
||||||
|
return (header + b"." + payload + b"." + signature).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def decode(token, key, algorithms=["HS256"]):
|
||||||
|
if "HS256" not in algorithms:
|
||||||
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
|
parts = token.encode().split(b".")
|
||||||
|
if len(parts) != 3:
|
||||||
|
raise exceptions.InvalidTokenError
|
||||||
|
|
||||||
|
try:
|
||||||
|
header = json.loads(_from_b64url(parts[0]).decode())
|
||||||
|
payload = json.loads(_from_b64url(parts[1]).decode())
|
||||||
|
signature = _from_b64url(parts[2])
|
||||||
|
except Exception:
|
||||||
|
raise exceptions.InvalidTokenError
|
||||||
|
|
||||||
|
if header["alg"] not in algorithms or header["alg"] != "HS256":
|
||||||
|
raise exceptions.InvalidAlgorithmError
|
||||||
|
|
||||||
|
if isinstance(key, str):
|
||||||
|
key = key.encode()
|
||||||
|
calculated_signature = hmac.new(key, parts[0] + b"." + parts[1], hashlib.sha256).digest()
|
||||||
|
if signature != calculated_signature:
|
||||||
|
raise exceptions.InvalidSignatureError
|
||||||
|
|
||||||
|
if "exp" in payload:
|
||||||
|
if time() > payload["exp"]:
|
||||||
|
raise exceptions.ExpiredSignatureError
|
||||||
|
|
||||||
|
return payload
|
|
@ -0,0 +1,28 @@
|
||||||
|
import jwt
|
||||||
|
from time import time
|
||||||
|
|
||||||
|
secret_key = "top-secret!"
|
||||||
|
|
||||||
|
token = jwt.encode({"user": "joe"}, secret_key, algorithm="HS256")
|
||||||
|
print(token)
|
||||||
|
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
||||||
|
if decoded != {"user": "joe"}:
|
||||||
|
raise Exception("Invalid decoded JWT")
|
||||||
|
else:
|
||||||
|
print("Encode/decode test: OK")
|
||||||
|
|
||||||
|
try:
|
||||||
|
decoded = jwt.decode(token, "wrong-secret", algorithms=["HS256"])
|
||||||
|
except jwt.exceptions.InvalidSignatureError:
|
||||||
|
print("Invalid signature test: OK")
|
||||||
|
else:
|
||||||
|
raise Exception("Invalid JWT should have failed decoding")
|
||||||
|
|
||||||
|
token = jwt.encode({"user": "joe", "exp": time() - 1}, secret_key)
|
||||||
|
print(token)
|
||||||
|
try:
|
||||||
|
decoded = jwt.decode(token, secret_key, algorithms=["HS256"])
|
||||||
|
except jwt.exceptions.ExpiredSignatureError:
|
||||||
|
print("Expired token test: OK")
|
||||||
|
else:
|
||||||
|
raise Exception("Expired JWT should have failed decoding")
|
Ładowanie…
Reference in New Issue