diff --git a/amqtt/client.py b/amqtt/client.py index 03c1b95..0264ee5 100644 --- a/amqtt/client.py +++ b/amqtt/client.py @@ -501,13 +501,6 @@ class MQTTClient: reader = WebSocketsReader(websocket) writer = WebSocketsWriter(websocket) - elif scheme == "unix": - conn_reader, conn_writer = await asyncio.wait_for( - asyncio.open_unix_connection( - path=self.session.broker_uri, - **kwargs), timeout=connection_timeout) - reader = UnixStreamReaderAdapter(conn_reader) - writer = StreamWriterAdapter(conn_writer) elif not self.session.broker_uri: msg = "missing broker uri" raise ClientError(msg) diff --git a/samples/unix_socket_connector.py b/samples/unix_socket_adapters.py similarity index 98% rename from samples/unix_socket_connector.py rename to samples/unix_socket_adapters.py index 9079622..35a5f35 100644 --- a/samples/unix_socket_connector.py +++ b/samples/unix_socket_adapters.py @@ -1,4 +1,3 @@ -import asyncio import contextlib import logging from asyncio import StreamWriter, StreamReader, Event @@ -54,3 +53,5 @@ class UnixStreamWriterAdapter(WriterAdapter): with contextlib.suppress(AttributeError): await self._writer.wait_closed() + + diff --git a/samples/unix_socket_broker.py b/samples/unix_socket_broker.py new file mode 100644 index 0000000..a3a8b72 --- /dev/null +++ b/samples/unix_socket_broker.py @@ -0,0 +1,15 @@ +from amqtt.broker import Broker +from amqtt.contexts import BrokerConfig, ListenerConfig + + +async def main(): + + cfg = BrokerConfig( + listeners={ + 'default': ListenerConfig( + ListenerType.External + ) + } + ) + + b = Broker() \ No newline at end of file diff --git a/samples/unix_socket_client.py b/samples/unix_socket_client.py new file mode 100644 index 0000000..5ac62d4 --- /dev/null +++ b/samples/unix_socket_client.py @@ -0,0 +1,27 @@ +import asyncio + +from amqtt.client import MQTTClient, ClientContext +from amqtt.contexts import ClientConfig +from amqtt.mqtt.protocol.client_handler import ClientProtocolHandler +from amqtt.plugins.manager import PluginManager +from amqtt.session import Session +from samples.unix_socket_adapters import UnixStreamReaderAdapter, UnixStreamWriterAdapter + + +async def client(): + config = ClientConfig() + context = ClientContext() + context.config = config + plugins_manager = PluginManager("amqtt.client.plugins", context) + + cph = ClientProtocolHandler(plugins_manager) + + s = Session() + r = UnixStreamReaderAdapter() + w = UnixStreamWriterAdapter() + + cph.attach(session=s, reader=r, writer=w) + await cph.mqtt_connect() + +if __name__ == '__main__': + asyncio.run(client())