kopia lustrzana https://github.com/Yakifo/amqtt
Yakifo/amqtt#73 : adding test case for external http server integration. comments and documentation.
rodzic
0141cddeeb
commit
6ff7c5a87a
|
@ -52,6 +52,8 @@ class RetainedApplicationMessage(ApplicationMessage):
|
|||
|
||||
|
||||
class Server:
|
||||
"""Used to encapsulate the server associated with a listener. Allows broker to interact with the connection lifecycle."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
listener_name: str,
|
||||
|
@ -90,8 +92,10 @@ class Server:
|
|||
|
||||
|
||||
class ExternalServer(Server):
|
||||
def __init__(self):
|
||||
super().__init__('aiohttp', None)
|
||||
"""For external listeners, the connection lifecycle is handled by that implementation so these are no-ops."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("aiohttp", None) # type: ignore[arg-type]
|
||||
|
||||
async def acquire_connection(self) -> None:
|
||||
pass
|
||||
|
@ -104,10 +108,7 @@ class ExternalServer(Server):
|
|||
|
||||
|
||||
class BrokerContext(BaseContext):
|
||||
"""BrokerContext is used as the context passed to plugins interacting with the broker.
|
||||
|
||||
It act as an adapter to broker services from plugins developed for HBMQTT broker.
|
||||
"""
|
||||
"""BrokerContext is used as the context passed to plugins interacting with the broker."""
|
||||
|
||||
def __init__(self, broker: "Broker") -> None:
|
||||
super().__init__()
|
||||
|
@ -257,10 +258,14 @@ class Broker:
|
|||
max_connections = listener.get("max_connections", -1)
|
||||
ssl_context = self._create_ssl_context(listener) if listener.get("ssl", False) else None
|
||||
|
||||
# for listeners which are external, don't need to create a server
|
||||
if listener.type == ListenerType.EXTERNAL:
|
||||
|
||||
# broker still needs to associate a new connection to the listener
|
||||
self.logger.info(f"External listener exists for '{listener_name}' ")
|
||||
self._servers[listener_name] = ExternalServer()
|
||||
else:
|
||||
# for tcp and websockets, start servers to listen for inbound connections
|
||||
try:
|
||||
address, port = self._split_bindaddr_port(listener["bind"], DEFAULT_PORTS[listener["type"]])
|
||||
except ValueError as e:
|
||||
|
@ -404,6 +409,7 @@ class Broker:
|
|||
await self._client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
|
||||
|
||||
async def external_connected(self, reader: ReaderAdapter, writer: WriterAdapter, listener_name: str) -> None:
|
||||
"""Engage the broker in handling the data stream to/from an established connection."""
|
||||
await self._client_connected(listener_name, reader, writer)
|
||||
|
||||
async def _client_connected(self, listener_name: str, reader: ReaderAdapter, writer: WriterAdapter) -> None:
|
||||
|
|
|
@ -128,10 +128,6 @@ class ListenerConfig(Dictable):
|
|||
if isinstance(getattr(self, fn), str):
|
||||
setattr(self, fn, Path(getattr(self, fn)))
|
||||
|
||||
# if self.type == ListenerType.EXTERNAL and not all([self.reader, self.writer]):
|
||||
# msg = "external type requires specifying reader, writer and server classes"
|
||||
# raise ValueError(msg)
|
||||
|
||||
def apply(self, other: "ListenerConfig") -> None:
|
||||
"""Apply the field from 'other', if 'self' field is default."""
|
||||
for f in fields(self):
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import asyncio
|
||||
import traceback
|
||||
|
||||
try:
|
||||
from asyncio import InvalidStateError, QueueFull, QueueShutDown
|
||||
|
@ -536,10 +535,6 @@ class ProtocolHandler(Generic[C]):
|
|||
self.handle_read_timeout()
|
||||
except NoDataError:
|
||||
self.logger.debug(f"{self.session.client_id} No data available")
|
||||
except RuntimeError:
|
||||
self.logger.debug(f"{self.session.client_id} websocket closed")
|
||||
traceback.print_exc()
|
||||
break
|
||||
except Exception as e: # noqa: BLE001
|
||||
self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
|
||||
break
|
||||
|
|
|
@ -8,12 +8,14 @@ from aiohttp import web
|
|||
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
||||
from amqtt.broker import Broker
|
||||
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType
|
||||
|
||||
from amqtt.errors import ConnectError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MQTT_LISTENER_NAME = "myMqttListener"
|
||||
|
||||
async def hello(request):
|
||||
"""get request handler"""
|
||||
return web.Response(text="Hello, world")
|
||||
|
||||
class WebSocketResponseReader(ReaderAdapter):
|
||||
|
@ -25,26 +27,34 @@ class WebSocketResponseReader(ReaderAdapter):
|
|||
|
||||
async def read(self, n: int = -1) -> bytes:
|
||||
"""
|
||||
read 'n' bytes from the datastream, if < 0 read all available bytes
|
||||
|
||||
Raises:
|
||||
BrokerPipeError : if reading on a closed websocket connection
|
||||
"""
|
||||
# continue until buffer contains at least the amount of data being requested
|
||||
while not self.buffer or len(self.buffer) < n:
|
||||
# if the websocket is closed
|
||||
if self.ws.closed:
|
||||
raise BrokenPipeError()
|
||||
|
||||
try:
|
||||
async with asyncio.timeout(0.5):
|
||||
msg = await self.ws.receive()
|
||||
if msg.type == aiohttp.WSMsgType.BINARY:
|
||||
self.buffer.extend(msg.data)
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||||
raise BrokenPipeError()
|
||||
# read from stream
|
||||
msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
|
||||
# mqtt streams should always be binary...
|
||||
if msg.type == aiohttp.WSMsgType.BINARY:
|
||||
self.buffer.extend(msg.data)
|
||||
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
||||
raise BrokenPipeError()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise BrokenPipeError()
|
||||
|
||||
# return all bytes currently in the buffer
|
||||
if n == -1:
|
||||
result = bytes(self.buffer)
|
||||
self.buffer.clear()
|
||||
# return the requested number of bytes from the buffer
|
||||
else:
|
||||
result = self.buffer[:n]
|
||||
del self.buffer[:n]
|
||||
|
@ -75,9 +85,11 @@ class WebSocketResponseWriter(WriterAdapter):
|
|||
self._stream = io.BytesIO(b"")
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
"""Add bytes to stream buffer."""
|
||||
self._stream.write(data)
|
||||
|
||||
async def drain(self) -> None:
|
||||
"""Send the collected bytes in the buffer to the websocket connection."""
|
||||
data = self._stream.getvalue()
|
||||
if data and len(data):
|
||||
await self.ws.send_bytes(data)
|
||||
|
@ -87,54 +99,79 @@ class WebSocketResponseWriter(WriterAdapter):
|
|||
return self.client_ip, self.port
|
||||
|
||||
async def close(self) -> None:
|
||||
# no clean up needed, stream will be gc along with instance
|
||||
pass
|
||||
|
||||
async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:
|
||||
|
||||
async def websocket_handler(request: web.Request) -> web.StreamResponse:
|
||||
|
||||
# respond to the websocket request with the 'mqtt' protocol
|
||||
# establish connection by responding to the websocket request with the 'mqtt' protocol
|
||||
ws = web.WebSocketResponse(protocols=['mqtt',])
|
||||
await ws.prepare(request)
|
||||
|
||||
# access the broker created when the server started and notify the broker of this new connection
|
||||
# access the broker created when the server started
|
||||
b: Broker = request.app['broker']
|
||||
|
||||
# send/receive data to the websocket. must pass the name of the externalized listener in the broker config
|
||||
await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), 'myAIOHttp')
|
||||
# hand-off the websocket data stream to the broker for handling
|
||||
# `listener_name` is the same name of the externalized listener in the broker config
|
||||
await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), MQTT_LISTENER_NAME)
|
||||
|
||||
logger.debug('websocket connection closed')
|
||||
return ws
|
||||
|
||||
|
||||
def main():
|
||||
async def websocket_handler(request: web.Request) -> web.StreamResponse:
|
||||
ws = web.WebSocketResponse()
|
||||
await ws.prepare(request)
|
||||
|
||||
async for msg in ws:
|
||||
logging.info(msg)
|
||||
|
||||
logging.info("websocket connection closed")
|
||||
return ws
|
||||
|
||||
def main():
|
||||
# create an `aiohttp` server
|
||||
lp = asyncio.get_event_loop()
|
||||
app = web.Application()
|
||||
app.add_routes(
|
||||
[
|
||||
web.get('/', hello),
|
||||
web.get('/ws', websocket_handler)
|
||||
web.get('/', hello), # http get request/response route
|
||||
web.get('/ws', websocket_handler), # standard websocket handler
|
||||
web.get('/mqtt', mqtt_websocket_handler), # websocket handler for mqtt connections
|
||||
])
|
||||
# create background task for running the `amqtt` broker
|
||||
app.cleanup_ctx.append(run_broker)
|
||||
web.run_app(app)
|
||||
|
||||
# make sure that both `aiohttp` server and `amqtt` broker run in the same loop
|
||||
# so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`)
|
||||
web.run_app(app, loop=lp)
|
||||
|
||||
|
||||
async def run_broker(_app):
|
||||
"""https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks"""
|
||||
loop = asyncio.get_event_loop()
|
||||
"""App init function to start (and then shutdown) the `amqtt` broker.
|
||||
https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks"""
|
||||
|
||||
# standard TCP connection as well as an externalized-listener
|
||||
cfg = BrokerConfig(
|
||||
listeners={
|
||||
'default':ListenerConfig(type=ListenerType.TCP, bind='127.0.0.1:1883'),
|
||||
'myAIOHttp': ListenerConfig(type=ListenerType.EXTERNAL),
|
||||
MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
|
||||
}
|
||||
)
|
||||
|
||||
# make sure the `Broker` runs in the same loop as the aiohttp server
|
||||
loop = asyncio.get_event_loop()
|
||||
broker = Broker(config=cfg, loop=loop)
|
||||
|
||||
# store broker instance so that incoming requests can hand off processing of a datastream
|
||||
_app['broker'] = broker
|
||||
# start the broker
|
||||
await broker.start()
|
||||
|
||||
# pass control back to web app
|
||||
yield
|
||||
|
||||
# closing activities
|
||||
await broker.shutdown()
|
||||
|
||||
|
|
@ -2,14 +2,15 @@ import asyncio
|
|||
import logging
|
||||
import signal
|
||||
import subprocess
|
||||
|
||||
from multiprocessing import Process
|
||||
from pathlib import Path
|
||||
from samples.http_server_integration import main as http_server_main
|
||||
|
||||
import pytest
|
||||
|
||||
from amqtt.broker import Broker
|
||||
from samples.client_publish import __main__ as client_publish_main
|
||||
from samples.client_subscribe import __main__ as client_subscribe_main
|
||||
from samples.client_keepalive import __main__ as client_keepalive_main
|
||||
from amqtt.client import MQTTClient
|
||||
from samples.broker_acl import config as broker_acl_config
|
||||
from samples.broker_taboo import config as broker_taboo_config
|
||||
|
||||
|
@ -275,4 +276,25 @@ async def test_client_subscribe_plugin_taboo():
|
|||
assert "ERROR" not in stderr.decode("utf-8")
|
||||
assert "Exception" not in stderr.decode("utf-8")
|
||||
|
||||
await broker.shutdown()
|
||||
await broker.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def external_http_server():
|
||||
p = Process(target=http_server_main)
|
||||
p.start()
|
||||
yield p
|
||||
p.terminate()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_external_http_server(external_http_server):
|
||||
|
||||
await asyncio.sleep(1)
|
||||
client = MQTTClient(config={'auto_reconnect': False})
|
||||
await client.connect("ws://127.0.0.1:8080/mqtt")
|
||||
assert client.session is not None
|
||||
await client.publish("my/topic", b'test message')
|
||||
await client.disconnect()
|
||||
# Send the interrupt signal
|
||||
await asyncio.sleep(1)
|
||||
|
|
Ładowanie…
Reference in New Issue