pull/8/head
Nicolas Jouanin 2015-06-27 17:55:18 +02:00
rodzic f4bab73381
commit 8b4714db37
3 zmienionych plików z 8 dodań i 102 usunięć

Wyświetl plik

@ -50,6 +50,8 @@ class Session:
def open(self, reader: asyncio.StreamReader, writer:asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self.local_address, self.local_port = self.writer.get_extra_info('sockname')
yield from self.handler.start()
@asyncio.coroutine
@ -132,7 +134,7 @@ class ProtocolHandler:
while self._running:
try:
self._reader_ready.set()
fixed_header = yield from asyncio.wait_for(MQTTFixedHeader.from_stream(self.session.reader), 60)
fixed_header = yield from asyncio.wait_for(MQTTFixedHeader.from_stream(self.session.reader), 5)
if fixed_header:
cls = packet_class(fixed_header)
packet = yield from cls.from_stream(self.session.reader, fixed_header=fixed_header)
@ -158,7 +160,7 @@ class ProtocolHandler:
while self._running:
try:
self._writer_ready.set()
packet = yield from asyncio.wait_for(out_queue.get(), 60)
packet = yield from asyncio.wait_for(out_queue.get(), 5)
self.logger.debug(packet)
yield from packet.to_stream(self.session.writer)
yield from self.session.writer.drain()

Wyświetl plik

@ -1,94 +0,0 @@
# Copyright (c) 2015 Nicolas JOUANIN
#
# See the file license.txt for copying permission.
import asyncio
import logging
from enum import Enum
from hbmqtt.mqtt.packet import PacketType
from hbmqtt.mqtt.packet import MQTTFixedHeader
from hbmqtt.mqtt.connect import ConnectVariableHeader, ConnectPacket, ConnectPayload
from hbmqtt.mqtt.protocol import ProtocolHandler
class SessionState(Enum):
NEW = 0
CONNECTED = 1
DISCONNECTED = 2
class Session:
def __init__(self, loop=None):
self.logger = logging.getLogger(__name__)
self.state = SessionState.NEW
self.reader = None
self.writer = None
self.remote_address = None
self.remote_port = None
self.local_address = None
self.local_port = None
self.client_id = None
self.clean_session = None
self.will_flag = False
self.will_message = None
self.will_qos = None
self.will_retain = None
self.will_topic = None
self.keep_alive = None
self.username = None
self.password = None
self.scheme = None
self._packet_id = 0
self.incoming_queues = dict()
for p in PacketType:
self.incoming_queues[p] = asyncio.Queue()
self.outgoing_queue = asyncio.Queue()
self.handler = ProtocolHandler(self, loop)
def open(self, reader: asyncio.StreamReader, writer:asyncio.StreamWriter):
self.reader = reader
self.writer = writer
self.handler.start()
@asyncio.coroutine
def close(self):
yield from self.handler.stop()
self.writer.close()
def build_connect_packet(self):
vh = ConnectVariableHeader()
payload = ConnectPayload()
vh.keep_alive = self.keep_alive
vh.clean_session_flag = self.clean_session
vh.will_retain_flag = self.will_retain
payload.client_id = self.client_id
if self.username:
vh.username_flag = True
payload.username = self.username
else:
vh.username_flag = False
if self.password:
vh.password_flag = True
payload.password = self.password
else:
vh.password_flag = False
if self.will_flag:
vh.will_flag = True
vh.will_qos = self.will_qos
payload.will_message = self.will_message
payload.will_topic = self.will_topic
else:
vh.will_flag = False
header = MQTTFixedHeader(PacketType.CONNECT, 0x00)
packet = ConnectPacket(header, vh, payload)
return packet
@property
def next_packet_id(self):
self._packet_id += 1
return self._packet_id

Wyświetl plik

@ -32,18 +32,16 @@ class ConnectPacketTest(unittest.TestCase):
@asyncio.coroutine
def client():
S = Session(loop)
S.reader, S.writer = yield from asyncio.open_connection('127.0.0.1', 8888,
S = Session()
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
yield from S.start()
yield from S.open(reader, writer)
incoming_packet = yield from S.incoming_queues[PacketType.CONNECT].get()
S.writer.close()
yield from S.stop()
yield from S.close()
return incoming_packet
packet = loop.run_until_complete(client())
server.close()
loop.stop()
self.assertEquals(packet.fixed_header.packet_type, PacketType.CONNECT)
def test_write_loop(self):