kopia lustrzana https://github.com/snarfed/bridgy-fed
rodzic
cdb81ed3e1
commit
e303f55fdf
45
models.py
45
models.py
|
@ -4,13 +4,14 @@ from datetime import timedelta, timezone
|
|||
import difflib
|
||||
import itertools
|
||||
import logging
|
||||
import random
|
||||
import urllib.parse
|
||||
|
||||
import requests
|
||||
from werkzeug.exceptions import BadRequest, NotFound
|
||||
|
||||
from Crypto import Random
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.PublicKey import ECC, RSA
|
||||
from Crypto.Util import number
|
||||
from flask import g, request
|
||||
from google.cloud import ndb
|
||||
|
@ -67,9 +68,17 @@ def long_to_base64(x):
|
|||
class User(StringIdModel):
|
||||
"""Stores a Bridgy Fed user.
|
||||
|
||||
The key name is the domain. The key pair is used for ActivityPub HTTP Signatures.
|
||||
The key name is the domain.
|
||||
|
||||
https://tools.ietf.org/html/draft-cavage-http-signatures-07
|
||||
Stores multiple keypairs needed for the supported protocols. Currently:
|
||||
|
||||
* RSA keypair for ActivityPub HTTP Signatures
|
||||
properties: mod, public_exponent, private_exponent
|
||||
https://tools.ietf.org/html/draft-cavage-http-signatures-12
|
||||
|
||||
* P-256 keypair for AT Protocol's signing key
|
||||
property: p256_key, PEM encoded
|
||||
https://atproto.com/guides/overview#account-portability
|
||||
|
||||
The key pair's modulus and exponent properties are all encoded as base64url
|
||||
(ie URL-safe base64) strings as described in RFC 4648 and section 5.1 of the
|
||||
|
@ -78,6 +87,7 @@ class User(StringIdModel):
|
|||
mod = ndb.StringProperty(required=True)
|
||||
public_exponent = ndb.StringProperty(required=True)
|
||||
private_exponent = ndb.StringProperty(required=True)
|
||||
p256_key = ndb.StringProperty()
|
||||
has_redirects = ndb.BooleanProperty()
|
||||
redirects_error = ndb.TextProperty()
|
||||
has_hcard = ndb.BooleanProperty()
|
||||
|
@ -112,20 +122,23 @@ class User(StringIdModel):
|
|||
def get_or_create(domain, **kwargs):
|
||||
"""Loads and returns a User. Creates it if necessary."""
|
||||
user = User.get_by_id(domain)
|
||||
if user:
|
||||
return user
|
||||
|
||||
if not user:
|
||||
# originally from django_salmon.magicsigs
|
||||
# this uses urandom(), and does nontrivial math, so it can take a
|
||||
# while depending on the amount of randomness available.
|
||||
rng = Random.new().read
|
||||
key = RSA.generate(KEY_BITS, rng)
|
||||
user = User(id=domain,
|
||||
mod=long_to_base64(key.n),
|
||||
public_exponent=long_to_base64(key.e),
|
||||
private_exponent=long_to_base64(key.d),
|
||||
**kwargs)
|
||||
user.put()
|
||||
|
||||
# originally from django_salmon.magicsigs
|
||||
# this uses urandom(), and does nontrivial math, so it can take a
|
||||
# while depending on the amount of randomness available.
|
||||
rng = Random.new().read
|
||||
rsa_key = RSA.generate(KEY_BITS, rng)
|
||||
p256_key = ECC.generate(curve='P-256',
|
||||
randfunc=random.randbytes if DEBUG else None)
|
||||
user = User(id=domain,
|
||||
mod=long_to_base64(rsa_key.n),
|
||||
public_exponent=long_to_base64(rsa_key.e),
|
||||
private_exponent=long_to_base64(rsa_key.d),
|
||||
p256_key=p256_key.export_key(format='PEM'),
|
||||
**kwargs)
|
||||
user.put()
|
||||
return user
|
||||
|
||||
def href(self):
|
||||
|
|
|
@ -8,6 +8,7 @@ from oauth_dropins.webutil.testutil import NOW, requests_response
|
|||
|
||||
from app import app
|
||||
import common
|
||||
from Crypto.PublicKey import ECC
|
||||
from models import Follower, Object, OBJECT_EXPIRE_AGE, User
|
||||
import protocol
|
||||
from protocol import Protocol
|
||||
|
@ -32,12 +33,23 @@ class UserTest(testutil.TestCase):
|
|||
super().tearDown()
|
||||
|
||||
def test_get_or_create(self):
|
||||
assert g.user.mod
|
||||
assert g.user.public_exponent
|
||||
assert g.user.private_exponent
|
||||
user = User.get_or_create('a.b')
|
||||
|
||||
same = User.get_or_create('y.z')
|
||||
self.assertEqual(same, g.user)
|
||||
assert user.mod
|
||||
assert user.public_exponent
|
||||
assert user.private_exponent
|
||||
assert user.p256_key
|
||||
|
||||
# check that we can load the keys
|
||||
assert user.public_pem()
|
||||
assert user.private_pem()
|
||||
|
||||
p256_key = ECC.import_key(user.p256_key)
|
||||
assert isinstance(p256_key, ECC.EccKey)
|
||||
self.assertEqual('NIST P-256', p256_key.curve)
|
||||
|
||||
same = User.get_or_create('a.b')
|
||||
self.assertEqual(same, user)
|
||||
|
||||
def test_get_or_create_use_instead(self):
|
||||
user = User.get_or_create('a.b')
|
||||
|
|
Ładowanie…
Reference in New Issue