amqtt/samples/http_server_integration.py

185 wiersze
5.9 KiB
Python

2025-08-04 20:27:51 +00:00
import asyncio
import io
2025-08-04 20:27:51 +00:00
import logging
import ssl
2025-08-04 20:27:51 +00:00
import aiohttp
from aiohttp import web
from amqtt.adapters import ReaderAdapter, WriterAdapter
from amqtt.broker import Broker
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType
logger = logging.getLogger(__name__)
2025-08-04 20:27:51 +00:00
MQTT_LISTENER_NAME = "myMqttListener"
2025-08-04 20:27:51 +00:00
async def hello(request):
"""Get request handler"""
2025-08-04 20:27:51 +00:00
return web.Response(text="Hello, world")
class WebSocketResponseReader(ReaderAdapter):
"""Interface to allow mqtt broker to read from an aiohttp websocket connection."""
2025-08-04 20:27:51 +00:00
def __init__(self, ws: web.WebSocketResponse):
self.ws = ws
self.buffer = bytearray()
async def read(self, n: int = -1) -> bytes:
"""Read 'n' bytes from the datastream, if < 0 read all available bytes
Raises:
BrokerPipeError : if reading on a closed websocket connection
"""
# continue until buffer contains at least the amount of data being requested
while not self.buffer or len(self.buffer) < n:
# if the websocket is closed
if self.ws.closed:
raise BrokenPipeError
try:
# read from stream
msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
# mqtt streams should always be binary...
if msg.type == aiohttp.WSMsgType.BINARY:
self.buffer.extend(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSE:
raise BrokenPipeError
except asyncio.TimeoutError:
raise BrokenPipeError
2025-08-04 21:47:29 +00:00
# return all bytes currently in the buffer
2025-08-04 20:27:51 +00:00
if n == -1:
result = bytes(self.buffer)
self.buffer.clear()
# return the requested number of bytes from the buffer
2025-08-04 20:27:51 +00:00
else:
result = self.buffer[:n]
del self.buffer[:n]
2025-08-04 21:47:29 +00:00
2025-08-04 20:27:51 +00:00
return result
def feed_eof(self) -> None:
pass
class WebSocketResponseWriter(WriterAdapter):
"""Interface to allow mqtt broker to write to an aiohttp websocket connection."""
2025-08-04 20:27:51 +00:00
def __init__(self, ws: web.WebSocketResponse, request: web.Request):
2025-08-04 20:27:51 +00:00
super().__init__()
self.ws = ws
# needed for `get_peer_info`
# https://docs.python.org/3/library/socket.html#socket.socket.getpeername
peer_name = request.transport.get_extra_info("peername")
if peer_name is not None:
self.client_ip, self.port = peer_name[0:2]
else:
self.client_ip, self.port = request.remote, 0
# interpret AF_INET6
self.client_ip = "localhost" if self.client_ip == "::1" else self.client_ip
self._stream = io.BytesIO(b"")
2025-08-04 20:27:51 +00:00
def write(self, data: bytes) -> None:
"""Add bytes to stream buffer."""
self._stream.write(data)
2025-08-04 20:27:51 +00:00
async def drain(self) -> None:
"""Send the collected bytes in the buffer to the websocket connection."""
data = self._stream.getvalue()
if data and len(data):
await self.ws.send_bytes(data)
self._stream = io.BytesIO(b"")
2025-08-04 20:27:51 +00:00
def get_peer_info(self) -> tuple[str, int] | None:
return self.client_ip, self.port
2025-08-04 20:27:51 +00:00
async def close(self) -> None:
# no clean up needed, stream will be gc along with instance
2025-08-04 20:27:51 +00:00
pass
def get_ssl_info(self) -> ssl.SSLObject | None:
pass
async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:
2025-08-04 20:27:51 +00:00
# establish connection by responding to the websocket request with the 'mqtt' protocol
ws = web.WebSocketResponse(protocols=["mqtt"])
2025-08-04 20:27:51 +00:00
await ws.prepare(request)
2025-08-04 21:47:29 +00:00
# access the broker created when the server started
b: Broker = request.app["broker"]
2025-08-04 20:27:51 +00:00
# hand-off the websocket data stream to the broker for handling
# `listener_name` is the same name of the externalized listener in the broker config
await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), MQTT_LISTENER_NAME)
logger.debug("websocket connection closed")
2025-08-04 20:27:51 +00:00
return ws
async def websocket_handler(request: web.Request) -> web.StreamResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
2025-08-04 20:27:51 +00:00
async for msg in ws:
logging.info(msg)
logging.info("websocket connection closed")
return ws
def main():
# create an `aiohttp` server
lp = asyncio.get_event_loop()
2025-08-04 20:27:51 +00:00
app = web.Application()
app.add_routes(
[
web.get("/", hello), # http get request/response route
web.get("/ws", websocket_handler), # standard websocket handler
web.get("/mqtt", mqtt_websocket_handler), # websocket handler for mqtt connections
2025-08-04 20:27:51 +00:00
])
# create background task for running the `amqtt` broker
2025-08-04 20:27:51 +00:00
app.cleanup_ctx.append(run_broker)
# make sure that both `aiohttp` server and `amqtt` broker run in the same loop
# so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`)
web.run_app(app, loop=lp)
2025-08-04 20:27:51 +00:00
async def run_broker(_app):
"""App init function to start (and then shutdown) the `amqtt` broker.
https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks
"""
# standard TCP connection as well as an externalized-listener
2025-08-04 20:27:51 +00:00
cfg = BrokerConfig(
listeners={
"default":ListenerConfig(type=ListenerType.TCP, bind="127.0.0.1:1883"),
MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
2025-08-04 20:27:51 +00:00
}
)
# make sure the `Broker` runs in the same loop as the aiohttp server
loop = asyncio.get_event_loop()
2025-08-04 20:27:51 +00:00
broker = Broker(config=cfg, loop=loop)
# store broker instance so that incoming requests can hand off processing of a datastream
_app["broker"] = broker
# start the broker
2025-08-04 20:27:51 +00:00
await broker.start()
# pass control back to web app
2025-08-04 20:27:51 +00:00
yield
# closing activities
2025-08-04 20:27:51 +00:00
await broker.shutdown()
if __name__ == "__main__":
2025-08-04 20:27:51 +00:00
logging.basicConfig(level=logging.INFO)
main()