kopia lustrzana https://github.com/Yakifo/amqtt
Move CONNECT/CONNACK handling to broker class
rodzic
9317ceb8fa
commit
6c8313f1b7
117
hbmqtt/broker.py
117
hbmqtt/broker.py
|
@ -7,6 +7,10 @@ import asyncio
|
|||
from transitions import Machine, MachineError
|
||||
from hbmqtt.session import Session
|
||||
from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
|
||||
from hbmqtt.mqtt.connect import ConnectPacket
|
||||
from hbmqtt.mqtt.connack import ConnackPacket, ReturnCode
|
||||
from hbmqtt.errors import HBMQTTException
|
||||
from hbmqtt.utils import format_client_message, gen_client_id
|
||||
|
||||
|
||||
_defaults = {
|
||||
|
@ -36,6 +40,7 @@ class Broker:
|
|||
self._server = None
|
||||
self._handlers = []
|
||||
self._init_states()
|
||||
self._sessions = dict()
|
||||
|
||||
def _init_states(self):
|
||||
self.machine = Machine(states=Broker.states, initial='new')
|
||||
|
@ -86,15 +91,123 @@ class Broker:
|
|||
remote_address = extra_info[0]
|
||||
remote_port = extra_info[1]
|
||||
self.logger.debug("Connection from %s:%d" % (remote_address, remote_port))
|
||||
new_session = Session()
|
||||
|
||||
# Wait for first packet and expect a CONNECT
|
||||
connect = None
|
||||
try:
|
||||
connect = yield from ConnectPacket.from_stream(reader)
|
||||
self.check_connect(connect)
|
||||
except HBMQTTException as exc:
|
||||
self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" % (format_client_message(address=remote_address, port=remote_port), exc))
|
||||
writer.close()
|
||||
return
|
||||
except BrokerException as be:
|
||||
self.logger.error('Invalid connection from %s : %s' % (format_client_message(address=remote_address, port=remote_port), be))
|
||||
writer.close()
|
||||
return
|
||||
|
||||
if connect.variable_header.proto_level != 4:
|
||||
# only MQTT 3.1.1 supported
|
||||
self.logger.error('Invalid protocol from %s: %d' % (format_client_message(address=remote_address, port=remote_port), connect.variable_header.protocol_level))
|
||||
connack = ConnackPacket.build(0, ReturnCode.UNACCEPTABLE_PROTOCOL_VERSION) #[MQTT-3.2.2-4] session_parent=0
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
writer.close()
|
||||
return
|
||||
|
||||
if connect.variable_header.username_flag and connect.payload.username is None:
|
||||
self.logger.error('Invalid username from %s' % (format_client_message(address=remote_address, port=remote_port)))
|
||||
connack = ConnackPacket.build(0, ReturnCode.BAD_USERNAME_PASSWORD) #[MQTT-3.2.2-4] session_parent=0
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
writer.close()
|
||||
return
|
||||
|
||||
if connect.variable_header.password_flag and connect.payload.password is None:
|
||||
self.logger.error('Invalid password %s' % (format_client_message(address=remote_address, port=remote_port)))
|
||||
connack = ConnackPacket.build(0, ReturnCode.BAD_USERNAME_PASSWORD) #[MQTT-3.2.2-4] session_parent=0
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
writer.close()
|
||||
return
|
||||
|
||||
new_session = None
|
||||
if connect.variable_header.clean_session_flag:
|
||||
client_id = connect.payload.client_id
|
||||
if client_id is not None and client_id in self._sessions:
|
||||
del self._sessions[client_id]
|
||||
new_session = Session()
|
||||
new_session.parent = 0
|
||||
else:
|
||||
# Get session from cache
|
||||
client_id = connect.payload.client_id
|
||||
if client_id is None:
|
||||
self.logger.error('[MQTT-3.1.3-8] [MQTT-3.1.3-9] %s: No client Id provided (cleansession=0)' % format_client_message(address=remote_address, port=remote_port))
|
||||
connack = ConnackPacket.build(0, ReturnCode.IDENTIFIER_REJECTED)
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
writer.close()
|
||||
return
|
||||
if client_id in self._sessions:
|
||||
new_session = self._sessions[client_id]
|
||||
new_session.parent = 1
|
||||
else:
|
||||
new_session = Session()
|
||||
new_session.parent = 0
|
||||
|
||||
if new_session.client_id is None:
|
||||
# Generate client ID
|
||||
new_session.client_id = gen_client_id()
|
||||
new_session.remote_address = remote_address
|
||||
new_session.remote_port = remote_port
|
||||
new_session.clean_session = connect.variable_header.clean_session_flag
|
||||
new_session.will_flag = connect.variable_header.will_flag
|
||||
new_session.will_retain = connect.variable_header.will_retain_flag
|
||||
new_session.will_qos = connect.variable_header.will_qos
|
||||
new_session.will_topic = connect.payload.will_topic
|
||||
new_session.will_message = connect.payload.will_message
|
||||
new_session.username = connect.payload.username
|
||||
new_session.password = connect.payload.password
|
||||
new_session.client_id = connect.payload.client_id
|
||||
new_session.keep_alive = connect.variable_header.keep_alive
|
||||
|
||||
new_session.reader = reader
|
||||
new_session.writer = writer
|
||||
|
||||
if self.authenticate(new_session):
|
||||
connack = ConnackPacket.build(new_session.parent, ReturnCode.CONNECTION_ACCEPTED)
|
||||
self.logger.info('%s : connection accepted' % format_client_message(session=new_session))
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
else:
|
||||
connack = ConnackPacket.build(new_session.parent, ReturnCode.NOT_AUTHORIZED)
|
||||
self.logger.info('%s : connection refused' % format_client_message(session=new_session))
|
||||
self.logger.debug(" -out-> " + repr(connack))
|
||||
yield from connack.to_stream(writer)
|
||||
writer.close()
|
||||
return
|
||||
|
||||
handler = BrokerProtocolHandler(new_session, self._loop)
|
||||
self._handlers.append(handler)
|
||||
self.logger.debug("Start messages handling")
|
||||
yield from handler.start()
|
||||
self.logger.debug("Wait for disconnect")
|
||||
yield from handler.wait_disconnect()
|
||||
yield from handler.stop()
|
||||
self.logger.debug("Client disconnected")
|
||||
yield from handler.stop()
|
||||
|
||||
@asyncio.coroutine
|
||||
def check_connect(self, connect: ConnectPacket):
|
||||
if connect.payload.client_id is None:
|
||||
raise BrokerException('[[MQTT-3.1.3-3]] : Client identifier must be present' )
|
||||
|
||||
if connect.variable_header.will_flag:
|
||||
if connect.payload.will_topic is None or connect.payload.will_message is None:
|
||||
raise BrokerException('will flag set, but will topic/message not present in payload')
|
||||
|
||||
if connect.variable_header.reserved_flag:
|
||||
raise BrokerException('[MQTT-3.1.2-3] CONNECT reserved flag must be set to 0')
|
||||
|
||||
def authenticate(self, session: Session):
|
||||
# TODO : Handle client authentication here
|
||||
return True
|
|
@ -99,15 +99,13 @@ class ConnectVariableHeader(MQTTVariableHeader):
|
|||
if protocol_name != "MQTT":
|
||||
raise MQTTException('[MQTT-3.1.2-1] Incorrect protocol name: "%s"' % protocol_name)
|
||||
|
||||
# protocol level (only MQTT 3.1.1 supported)
|
||||
# protocol level
|
||||
protocol_level_byte = yield from read_or_raise(reader, 1)
|
||||
protocol_level = bytes_to_int(protocol_level_byte)
|
||||
|
||||
# flags
|
||||
flags_byte = yield from read_or_raise(reader, 1)
|
||||
flags = bytes_to_int(flags_byte)
|
||||
if flags & 0x01:
|
||||
raise MQTTException('[MQTT-3.1.2-3] CONNECT reserved flag must be set to 0')
|
||||
|
||||
# keep-alive
|
||||
keep_alive_byte = yield from read_or_raise(reader, 2)
|
||||
|
@ -152,7 +150,7 @@ class ConnectPayload(MQTTPayload):
|
|||
try:
|
||||
payload.client_id = yield from decode_string(reader)
|
||||
except NoDataException:
|
||||
raise MQTTException('[[MQTT-3.1.3-3]] Client identifier must be present')
|
||||
payload.client_id = None
|
||||
|
||||
# Read will topic, username and password
|
||||
if variable_header.will_flag:
|
||||
|
@ -160,19 +158,20 @@ class ConnectPayload(MQTTPayload):
|
|||
payload.will_topic = yield from decode_string(reader)
|
||||
payload.will_message = yield from decode_string(reader)
|
||||
except NoDataException:
|
||||
raise MQTTException('will flag set, but will topic/message not present in payload')
|
||||
payload.will_topic = None
|
||||
payload.will_message = None
|
||||
|
||||
if variable_header.username_flag:
|
||||
try:
|
||||
payload.username = yield from decode_string(reader)
|
||||
except NoDataException:
|
||||
raise MQTTException('username flag set, but username not present in payload')
|
||||
payload.username = None
|
||||
|
||||
if variable_header.password_flag:
|
||||
try:
|
||||
payload.password = yield from decode_string(reader)
|
||||
except NoDataException:
|
||||
raise MQTTException('password flag set, but password not present in payload')
|
||||
payload.password = None
|
||||
|
||||
return payload
|
||||
|
||||
|
|
|
@ -5,18 +5,12 @@ import logging
|
|||
import asyncio
|
||||
from asyncio import futures
|
||||
from hbmqtt.mqtt.protocol.handler import ProtocolHandler
|
||||
from hbmqtt.mqtt.packet import MQTTFixedHeader
|
||||
from hbmqtt.mqtt.packet import PacketType
|
||||
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload
|
||||
from hbmqtt.mqtt.connack import ConnackPacket, ReturnCode
|
||||
from hbmqtt.mqtt.disconnect import DisconnectPacket
|
||||
from hbmqtt.mqtt.pingreq import PingReqPacket
|
||||
from hbmqtt.mqtt.pingresp import PingRespPacket
|
||||
from hbmqtt.mqtt.subscribe import SubscribePacket
|
||||
from hbmqtt.mqtt.suback import SubackPacket
|
||||
from hbmqtt.mqtt.unsubscribe import UnsubscribePacket
|
||||
from hbmqtt.mqtt.unsuback import UnsubackPacket
|
||||
from hbmqtt.session import Session
|
||||
from hbmqtt.utils import format_client_message
|
||||
|
||||
class BrokerProtocolHandler(ProtocolHandler):
|
||||
def __init__(self, session: Session, loop=None):
|
||||
|
@ -44,9 +38,10 @@ class BrokerProtocolHandler(ProtocolHandler):
|
|||
|
||||
@asyncio.coroutine
|
||||
def handle_connect(self, connect: ConnectPacket):
|
||||
# TODO : implements this correcly (manage authentication, cleansession, ...)
|
||||
self.logger.debug("Connect received")
|
||||
yield from self.outgoing_queue.put(ConnackPacket.build(0, ReturnCode.CONNECTION_ACCEPTED))
|
||||
# Broker handler shouldn't received CONNECT message during messages handling
|
||||
# as CONNECT messages are managed by the broker on client connection
|
||||
self.logger.error('[MQTT-3.1.0-2] %s : CONNECT message received during messages handling' % (format_client_message(self.session)))
|
||||
self._disconnect_waiter.set_result(None)
|
||||
|
||||
@asyncio.coroutine
|
||||
def handle_pingreq(self, pingreq: PingReqPacket):
|
||||
|
|
|
@ -26,6 +26,7 @@ class Session:
|
|||
self.password = None
|
||||
self.scheme = None
|
||||
self._packet_id = 0
|
||||
self.parent = 0
|
||||
|
||||
self.inflight_out = dict()
|
||||
self.inflight_in = dict()
|
||||
|
|
Ładowanie…
Reference in New Issue