# amqtt/samples/http_server_integration.py
#
# 185 lines
# 5.9 KiB
# Python

import asyncio
import io
import logging
import ssl
import aiohttp
from aiohttp import web
from amqtt.adapters import ReaderAdapter, WriterAdapter
from amqtt.broker import Broker
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType
# module-level logger, named after this module
logger = logging.getLogger(__name__)
# name of the externalized broker listener; the same value is used in the broker
# config and when a websocket connection is handed off to the broker
MQTT_LISTENER_NAME = "myMqttListener"
async def hello(request):
    """Answer a plain HTTP GET with a fixed text greeting."""
    greeting = "Hello, world"
    return web.Response(text=greeting)
class WebSocketResponseReader(ReaderAdapter):
    """Interface to allow mqtt broker to read from an aiohttp websocket connection.

    Binary websocket frames are accumulated in an internal buffer and handed
    out in whatever chunk size the caller asks for.
    """

    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        # bytes received from the websocket that `read` has not yet returned
        self.buffer = bytearray()

    async def read(self, n: int = -1) -> bytes:
        """Read `n` bytes from the datastream.

        Args:
            n: number of bytes to return. Any negative value returns every byte
                currently buffered (waiting for at least one frame if the buffer
                is empty). Zero returns an empty result without waiting.

        Returns:
            The requested bytes, removed from the internal buffer.

        Raises:
            BrokenPipeError: if the websocket is closed, a close frame arrives,
                or no frame is received within the 0.5 second timeout.
        """
        # nothing requested: don't block waiting for a frame that isn't needed
        if n == 0:
            return b""

        # continue until buffer contains at least the amount of data being requested
        # (for a negative `n` this only waits until the buffer is non-empty)
        while not self.buffer or len(self.buffer) < n:
            # if the websocket is closed
            if self.ws.closed:
                raise BrokenPipeError

            try:
                # read from stream
                msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
            except asyncio.TimeoutError as exc:
                # keep the timeout as the cause so the traceback shows why the pipe "broke"
                raise BrokenPipeError from exc

            # mqtt streams should always be binary...
            if msg.type == aiohttp.WSMsgType.BINARY:
                self.buffer.extend(msg.data)
            elif msg.type == aiohttp.WSMsgType.CLOSE:
                raise BrokenPipeError

        # return all bytes currently in the buffer
        if n < 0:
            result = bytes(self.buffer)
            self.buffer.clear()
        # return the requested number of bytes from the buffer
        else:
            # copy into immutable `bytes` so the return type matches the annotation
            result = bytes(self.buffer[:n])
            del self.buffer[:n]

        return result

    def feed_eof(self) -> None:
        """No-op: this adapter has no end-of-stream state to record."""
class WebSocketResponseWriter(WriterAdapter):
    """Interface to allow mqtt broker to write to an aiohttp websocket connection.

    Bytes passed to `write` are collected in memory and sent as a single
    binary websocket message when `drain` is awaited.
    """

    def __init__(self, ws: web.WebSocketResponse, request: web.Request):
        super().__init__()
        self.ws = ws

        # needed for `get_peer_info`
        # https://docs.python.org/3/library/socket.html#socket.socket.getpeername
        transport = request.transport
        # the transport can be None if the connection has already gone away
        peer_name = transport.get_extra_info("peername") if transport is not None else None
        if peer_name is not None:
            self.client_ip, self.port = peer_name[0:2]
        else:
            self.client_ip, self.port = request.remote, 0

        # interpret AF_INET6
        self.client_ip = "localhost" if self.client_ip == "::1" else self.client_ip

        # pending outgoing bytes, flushed by `drain`
        self._stream = io.BytesIO(b"")

    def write(self, data: bytes) -> None:
        """Add bytes to stream buffer."""
        self._stream.write(data)

    async def drain(self) -> None:
        """Send the collected bytes in the buffer to the websocket connection."""
        data = self._stream.getvalue()
        # swap in a fresh buffer *before* awaiting, so bytes written while the
        # send is in flight land in the new buffer instead of being discarded
        self._stream = io.BytesIO(b"")
        if data:
            await self.ws.send_bytes(data)

    def get_peer_info(self) -> tuple[str, int] | None:
        """Return the client address and port captured when the writer was created."""
        return self.client_ip, self.port

    async def close(self) -> None:
        """No-op: the in-memory stream is garbage collected along with the instance."""

    def get_ssl_info(self) -> ssl.SSLObject | None:
        """Return TLS details for the connection; this sample does not provide any."""
        return None
async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:
    """Upgrade the request to an 'mqtt' websocket and let the broker service it."""
    # answer the websocket request, agreeing on the 'mqtt' sub-protocol
    socket = web.WebSocketResponse(protocols=["mqtt"])
    await socket.prepare(request)

    # the broker instance was stored on the app when the server started
    broker: Broker = request.app["broker"]

    # wrap the websocket in the reader/writer adapters the broker expects
    reader = WebSocketResponseReader(socket)
    writer = WebSocketResponseWriter(socket, request)

    # give the data stream to the broker; the listener name must match the
    # externalized listener declared in the broker config
    await broker.external_connected(reader, writer, MQTT_LISTENER_NAME)

    logger.debug("websocket connection closed")
    return socket
async def websocket_handler(request: web.Request) -> web.StreamResponse:
    """Accept a plain websocket connection and log every message received on it.

    Args:
        request: the incoming aiohttp request to upgrade to a websocket.

    Returns:
        The websocket response once the message loop has ended.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # log through the module-level logger (not the root logger) so records
    # carry this module's name, consistent with the mqtt handler
    async for msg in ws:
        logger.info("%s", msg)

    logger.info("websocket connection closed")
    return ws
def main() -> None:
    """Build the `aiohttp` application and run it together with the `amqtt` broker.

    Registers the HTTP and websocket routes, hooks the broker's start/shutdown
    into the application's cleanup context, then blocks inside `web.run_app`.
    """
    # create the loop explicitly: `asyncio.get_event_loop()` is deprecated when
    # no loop is running and fails outright on newer Python versions
    lp = asyncio.new_event_loop()

    # create an `aiohttp` server
    app = web.Application()
    app.add_routes(
        [
            web.get("/", hello),  # http get request/response route
            web.get("/ws", websocket_handler),  # standard websocket handler
            web.get("/mqtt", mqtt_websocket_handler),  # websocket handler for mqtt connections
        ]
    )

    # create background task for running the `amqtt` broker
    app.cleanup_ctx.append(run_broker)

    # make sure that both `aiohttp` server and `amqtt` broker run in the same loop
    # so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`)
    web.run_app(app, loop=lp)
async def run_broker(_app: web.Application):
    """App init function to start (and then shutdown) the `amqtt` broker.

    Used as an `aiohttp` cleanup context: everything before the `yield` runs
    at application startup, everything after it runs at application cleanup.

    https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks

    Args:
        _app: the application; the broker is stored on it under the "broker" key.
    """
    # standard TCP connection as well as an externalized-listener
    cfg = BrokerConfig(
        listeners={
            "default": ListenerConfig(type=ListenerType.TCP, bind="127.0.0.1:1883"),
            MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
        }
    )

    # make sure the `Broker` runs in the same loop as the aiohttp server;
    # inside a coroutine `get_running_loop()` is the preferred way to get it
    loop = asyncio.get_running_loop()
    broker = Broker(config=cfg, loop=loop)

    # store broker instance so that incoming requests can hand off processing of a datastream
    _app["broker"] = broker

    # start the broker
    await broker.start()

    # pass control back to web app
    yield

    # closing activities
    await broker.shutdown()
if __name__ == "__main__":
    # script entry point: emit INFO-level log records, then start the server
    logging.basicConfig(level=logging.INFO)
    main()