kopia lustrzana https://github.com/Yakifo/amqtt
185 wiersze
5.9 KiB
Python
185 wiersze
5.9 KiB
Python
import asyncio
|
|
import io
|
|
import logging
|
|
import ssl
|
|
|
|
import aiohttp
|
|
from aiohttp import web
|
|
|
|
from amqtt.adapters import ReaderAdapter, WriterAdapter
|
|
from amqtt.broker import Broker
|
|
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MQTT_LISTENER_NAME = "myMqttListener"
|
|
|
|
async def hello(request):
|
|
"""Get request handler"""
|
|
return web.Response(text="Hello, world")
|
|
|
|
class WebSocketResponseReader(ReaderAdapter):
|
|
"""Interface to allow mqtt broker to read from an aiohttp websocket connection."""
|
|
|
|
def __init__(self, ws: web.WebSocketResponse):
|
|
self.ws = ws
|
|
self.buffer = bytearray()
|
|
|
|
async def read(self, n: int = -1) -> bytes:
|
|
"""Read 'n' bytes from the datastream, if < 0 read all available bytes
|
|
|
|
Raises:
|
|
BrokerPipeError : if reading on a closed websocket connection
|
|
|
|
"""
|
|
# continue until buffer contains at least the amount of data being requested
|
|
while not self.buffer or len(self.buffer) < n:
|
|
# if the websocket is closed
|
|
if self.ws.closed:
|
|
raise BrokenPipeError
|
|
|
|
try:
|
|
# read from stream
|
|
msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
|
|
# mqtt streams should always be binary...
|
|
if msg.type == aiohttp.WSMsgType.BINARY:
|
|
self.buffer.extend(msg.data)
|
|
elif msg.type == aiohttp.WSMsgType.CLOSE:
|
|
raise BrokenPipeError
|
|
|
|
except asyncio.TimeoutError:
|
|
raise BrokenPipeError
|
|
|
|
# return all bytes currently in the buffer
|
|
if n == -1:
|
|
result = bytes(self.buffer)
|
|
self.buffer.clear()
|
|
# return the requested number of bytes from the buffer
|
|
else:
|
|
result = self.buffer[:n]
|
|
del self.buffer[:n]
|
|
|
|
return result
|
|
|
|
def feed_eof(self) -> None:
|
|
pass
|
|
|
|
class WebSocketResponseWriter(WriterAdapter):
|
|
"""Interface to allow mqtt broker to write to an aiohttp websocket connection."""
|
|
|
|
def __init__(self, ws: web.WebSocketResponse, request: web.Request):
|
|
super().__init__()
|
|
self.ws = ws
|
|
|
|
# needed for `get_peer_info`
|
|
# https://docs.python.org/3/library/socket.html#socket.socket.getpeername
|
|
peer_name = request.transport.get_extra_info("peername")
|
|
if peer_name is not None:
|
|
self.client_ip, self.port = peer_name[0:2]
|
|
else:
|
|
self.client_ip, self.port = request.remote, 0
|
|
|
|
# interpret AF_INET6
|
|
self.client_ip = "localhost" if self.client_ip == "::1" else self.client_ip
|
|
|
|
self._stream = io.BytesIO(b"")
|
|
|
|
def write(self, data: bytes) -> None:
|
|
"""Add bytes to stream buffer."""
|
|
self._stream.write(data)
|
|
|
|
async def drain(self) -> None:
|
|
"""Send the collected bytes in the buffer to the websocket connection."""
|
|
data = self._stream.getvalue()
|
|
if data and len(data):
|
|
await self.ws.send_bytes(data)
|
|
self._stream = io.BytesIO(b"")
|
|
|
|
def get_peer_info(self) -> tuple[str, int] | None:
|
|
return self.client_ip, self.port
|
|
|
|
async def close(self) -> None:
|
|
# no clean up needed, stream will be gc along with instance
|
|
pass
|
|
|
|
def get_ssl_info(self) -> ssl.SSLObject | None:
|
|
pass
|
|
|
|
|
|
async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:
|
|
|
|
# establish connection by responding to the websocket request with the 'mqtt' protocol
|
|
ws = web.WebSocketResponse(protocols=["mqtt"])
|
|
await ws.prepare(request)
|
|
|
|
# access the broker created when the server started
|
|
b: Broker = request.app["broker"]
|
|
|
|
# hand-off the websocket data stream to the broker for handling
|
|
# `listener_name` is the same name of the externalized listener in the broker config
|
|
await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), MQTT_LISTENER_NAME)
|
|
|
|
logger.debug("websocket connection closed")
|
|
return ws
|
|
|
|
|
|
async def websocket_handler(request: web.Request) -> web.StreamResponse:
|
|
ws = web.WebSocketResponse()
|
|
await ws.prepare(request)
|
|
|
|
async for msg in ws:
|
|
logging.info(msg)
|
|
|
|
logging.info("websocket connection closed")
|
|
return ws
|
|
|
|
def main():
|
|
# create an `aiohttp` server
|
|
lp = asyncio.get_event_loop()
|
|
app = web.Application()
|
|
app.add_routes(
|
|
[
|
|
web.get("/", hello), # http get request/response route
|
|
web.get("/ws", websocket_handler), # standard websocket handler
|
|
web.get("/mqtt", mqtt_websocket_handler), # websocket handler for mqtt connections
|
|
])
|
|
# create background task for running the `amqtt` broker
|
|
app.cleanup_ctx.append(run_broker)
|
|
|
|
# make sure that both `aiohttp` server and `amqtt` broker run in the same loop
|
|
# so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`)
|
|
web.run_app(app, loop=lp)
|
|
|
|
|
|
async def run_broker(_app):
|
|
"""App init function to start (and then shutdown) the `amqtt` broker.
|
|
https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks
|
|
"""
|
|
# standard TCP connection as well as an externalized-listener
|
|
cfg = BrokerConfig(
|
|
listeners={
|
|
"default":ListenerConfig(type=ListenerType.TCP, bind="127.0.0.1:1883"),
|
|
MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
|
|
}
|
|
)
|
|
|
|
# make sure the `Broker` runs in the same loop as the aiohttp server
|
|
loop = asyncio.get_event_loop()
|
|
broker = Broker(config=cfg, loop=loop)
|
|
|
|
# store broker instance so that incoming requests can hand off processing of a datastream
|
|
_app["broker"] = broker
|
|
# start the broker
|
|
await broker.start()
|
|
|
|
# pass control back to web app
|
|
yield
|
|
|
|
# closing activities
|
|
await broker.shutdown()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO)
|
|
main()
|