add documentation, expand functionality to handle case including test cases

pull/262/head
Andrew Mirsky 2025-07-10 14:25:58 -04:00
rodzic 4a622f7e8c
commit 893aec2d4a
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: A98E67635CDF2C39
3 zmienionych plików z 113 dodań i 24 usunięć

Wyświetl plik

@ -44,8 +44,11 @@ class ACLError(Exception):
HTTP_2xx_MIN = 200
HTTP_2xx_MAX = 300
HTTP_4xx_MIN = 400
HTTP_4xx_MAX = 500
class HttpAuthACL(BaseAuthPlugin, BaseTopicPlugin):
class HttpAuthACLPlugin(BaseAuthPlugin, BaseTopicPlugin):
def __init__(self, context: BrokerContext) -> None:
super().__init__(context)
@ -66,8 +69,11 @@ class HttpAuthACL(BaseAuthPlugin, BaseTopicPlugin):
def _is_2xx(r: ClientResponse) -> bool:
return HTTP_2xx_MIN <= r.status < HTTP_2xx_MAX
async def _send_request(self, url: str, payload: dict[str, Any]) -> bool:
@staticmethod
def _is_4xx(r: ClientResponse) -> bool:
return HTTP_4xx_MIN <= r.status < HTTP_4xx_MAX
def _get_params(self, payload: dict[str, Any]) -> dict[str, Any]:
match self.config.params_mode:
case ParamsMode.FORM:
match self.config.request_method:
@ -78,20 +84,32 @@ class HttpAuthACL(BaseAuthPlugin, BaseTopicPlugin):
kwargs = {"data": d}
case _: # JSON
kwargs = { "json": payload}
return kwargs
async with self.method(url, **kwargs) as r: # type: ignore[arg-type]
async def _send_request(self, url: str, payload: dict[str, Any]) -> bool|None: # pylint: disable=R0911
kwargs = self._get_params(payload)
async with self.method(url, **kwargs) as r:
logger.debug(f"http request returned {r.status}")
if not self._is_2xx(r):
return False
match self.config.response_mode:
case ResponseMode.TEXT:
return (await r.text()).lower() == "ok"
return self._is_2xx(r) and (await r.text()).lower() == "ok"
case ResponseMode.STATUS:
return self._is_2xx(r)
if self._is_2xx(r):
return True
if self._is_4xx(r):
return False
# any other code
return None
case _:
if not self._is_2xx(r):
return False
data = await r.json()
for ok in ("OK", "Ok", "ok"):
if ok in data and data[ok] is None:
return None
if ok in data and isinstance(data[ok], bool):
return bool(data[ok])
return False

Wyświetl plik

@ -3,3 +3,48 @@
Plugins that are not part of the core functionality of the aMQTT broker or client, often requiring additional dependencies.
## Authentication & Topic Access via external HTTP server
`amqtt.contrib.http.HttpAuthACLPlugin`
If clients accessing the broker are managed by another application, implement API endpoints
that allows the broker to check if a client is authenticated and what topics that client
is authorized to access.
**Configuration**
- `host` *(str) hostname of the server for the auth & acl check
- `port` *(int) port of the server for the auth & acl check
- `user_uri` *(str) uri of the topic check (e.g. '/user')
- `acl_uri` *(str) uri of the topic check (e.g. '/acl')
- `request_method` *(RequestMethod) send the request as a GET, POST or PUT
- `params_mode` *(ParamsMode) send the request with json or form data
- `response_mode` *(ResponseMode) expected response from the auth/acl server. STATUS (code), JSON, or TEXT.
- `user_agent` *(str) the 'User-Agent' header sent along with the request
Each endpoint (uri) will receive the information needed to determine authentication and authorization (in either
json or form data format, based on the `params_mode`)
For user authentication (`user_uri`), the http server will receive in json or form format the following:
- username *(str)*
- password *(str)*
- client_id *(str)*
For superuser validation (`superuser_uri`), the http server will receive in json or form format the following:
- username *(str)*
For acl check (`acl_uri`), the http server will receive in json or form format the following:
- username *(str)*
- client_id *(str)*
- topic *(str)*
- acc *(int)* client can receive (1), can publish(2), can receive & publish (3) and can subscribe (4)
The HTTP endpoints can respond in three different ways, depending on `response_mode`:
1. STATUS - allowing access should respond with a 2xx status code. rejection is 4xx.
if a 5xx is received, the plugin will not participate in the filtering operation and will defer to another topic filtering plugin to determine access
2. JSON - response should be `{'ok':true|false|null, 'error':'optional reason for false or null response'}`.
`true` allows access, `false` denies access and `null` the plugin will not participate in the filtering operation
3. TEXT - `ok` allows access, any other message denies access. non-participation not supported with this mode.

Wyświetl plik

@ -7,51 +7,59 @@ from aiohttp.web import Response
from amqtt.broker import BrokerContext, Broker
from amqtt.contexts import Action
from amqtt.contrib.http import HttpAuthACL, ParamsMode, ResponseMode, RequestMethod
from amqtt.contrib.http import HttpAuthACLPlugin, ParamsMode, ResponseMode, RequestMethod
from amqtt.session import Session
logger = logging.getLogger(__name__)
def determine_auth_response_mode(d) -> Response:
def determine_auth_response(d) -> Response:
# check that auth response contains the correct params
assert 'username' in d
assert 'password' in d
assert 'client_id' in d
# use the username to determine response kind
if d['username'] == 'json':
# special case, i_am_null respond with None
if d['password'] == 'i_am_null':
return web.json_response({'Ok': None})
# otherwise, respond depending on if username and client_id match
return web.json_response({'Ok': d['username'] == d['password']})
elif d['username'] == 'status':
if d['password'] == 'i_am_null':
return web.Response(status=500)
return web.Response(status=200) if d['username'] == d['password'] else web.Response(status=400)
else: # text
return web.Response(text='ok' if d['username'] == d['password'] else 'error')
# aiohttp doesn't have a common `dispatch` method like django; therefore, need to take the non-DRY approach...
class JsonAuthView(web.View):
async def get(self) -> Response:
d = await self.request.json()
return determine_auth_response_mode(d)
return determine_auth_response(d)
async def post(self) -> Response:
d = dict(await self.request.json())
return determine_auth_response_mode(d)
return determine_auth_response(d)
async def put(self) -> Response:
d = dict(await self.request.json())
return determine_auth_response_mode(d)
return determine_auth_response(d)
class FormAuthView(web.View):
async def get(self) -> Response:
d = self.request.query
return determine_auth_response_mode(d)
return determine_auth_response(d)
async def post(self) -> Response:
d = dict(await self.request.post())
return determine_auth_response_mode(d)
return determine_auth_response(d)
async def put(self) -> Response:
d = dict(await self.request.post())
return determine_auth_response_mode(d)
return determine_auth_response(d)
@pytest.fixture
@ -90,15 +98,24 @@ def generate_use_cases(root_url):
for params in ParamsMode:
for response in ResponseMode:
url = f'/{root_url}/json' if params == ParamsMode.JSON else f'/{root_url}/form'
for is_authenticated in [True, False]:
prefix = '' if is_authenticated else 'not'
case = (url, request, params, response, response.value, f"{prefix}{response.value}", is_authenticated)
for is_authenticated in [True, False, None]:
if is_authenticated is None:
pwd = 'i_am_null'
elif is_authenticated:
pwd = f'{response.value}'
else:
pwd = f'not{response.value}'
if response == ResponseMode.TEXT and is_authenticated is None:
is_authenticated = False
case = (url, request, params, response, response.value, f"{pwd}", is_authenticated)
cases.append(case)
return cases
def test_generated_use_cases():
cases = generate_use_cases('user')
assert len(cases) == 36
assert len(cases) == 54
@pytest.mark.parametrize("url,request_method,params_mode,response_mode,username,password,is_authenticated",
@ -109,7 +126,7 @@ async def test_request_auth_response(empty_broker, http_auth_server, url,
username, password, is_authenticated):
context = BrokerContext(broker=empty_broker)
context.config = HttpAuthACL.Config(
context.config = HttpAuthACLPlugin.Config(
host="127.0.0.1",
port=8080,
user_uri=url,
@ -118,7 +135,7 @@ async def test_request_auth_response(empty_broker, http_auth_server, url,
params_mode=params_mode,
response_mode=response_mode,
)
http_acl = HttpAuthACL(context)
http_acl = HttpAuthACLPlugin(context)
session = Session()
session.client_id = "my_client_id"
@ -130,13 +147,22 @@ async def test_request_auth_response(empty_broker, http_auth_server, url,
def determine_acl_response(d) -> Response:
# make sure the params have the right categories
assert 'username' in d
assert 'client_id' in d
assert 'topic' in d
assert 'acc' in d
assert 1 <= int(d['acc']) <= 4
# use the username to determine response kind
if d['username'] == 'json':
# special case, i_am_null respond with None
if d['client_id'] == 'i_am_null':
return web.json_response({'Ok': None})
# otherwise, respond depending on if username and client_id match
return web.json_response({'Ok': d['username'] == d['client_id']})
elif d['username'] == 'status':
if d['client_id'] == 'i_am_null':
return web.Response(status=500)
return web.Response(status=200) if d['username'] == d['client_id'] else web.Response(status=400)
else: # text
return web.Response(text='ok' if d['username'] == d['client_id'] else 'error')
@ -202,7 +228,7 @@ async def test_request_acl_response(empty_broker, http_acl_server, url,
# response_mode = ResponseMode.JSON
context = BrokerContext(broker=empty_broker)
context.config = HttpAuthACL.Config(
context.config = HttpAuthACLPlugin.Config(
host="127.0.0.1",
port=8080,
user_uri='/user',
@ -211,7 +237,7 @@ async def test_request_acl_response(empty_broker, http_acl_server, url,
params_mode=params_mode,
response_mode=response_mode,
)
http_acl = HttpAuthACL(context)
http_acl = HttpAuthACLPlugin(context)
s = Session()
s.username = username