DTR and RTS control added for CDC ACM driver.
pull/6/head v0.2.0
Quan Lin 2019-06-11 14:24:56 +10:00
rodzic 14172dafaf
commit a176f5deef
3 zmienionych plików z 194 dodań i 33 usunięć

Wyświetl plik

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="usbserial4a",
version="0.1.10",
version="0.2.0",
description="Python package for Kivy Android USB serial port.",
long_description="https://github.com/jacklinquan/usbserial4a",
long_description_content_type="text/markdown",

Wyświetl plik

@ -9,4 +9,4 @@ Requires: kivy, pyjnius, pyserial, usb4a
'''
# Project version
__version__ = '0.1.10'
__version__ = '0.2.0'

Wyświetl plik

@ -4,6 +4,8 @@ Classes:
CdcAcmSerial(serial.serialutil.SerialBase)
'''
from struct import pack, unpack
import time
from serial.serialutil import SerialBase, SerialException, to_bytes, \
portNotOpenError, writeTimeoutError, Timeout
from usb4a import usb
@ -15,14 +17,17 @@ class CdcAcmSerial(SerialBase):
It can be used in a similar way to serial.Serial from pyserial.
'''
USB_RECIP_INTERFACE = 0x01
USB_RT_ACM = usb.UsbConstants.USB_TYPE_CLASS | USB_RECIP_INTERFACE
REQTYPE_HOST2DEVICE = 0x21
REQTYPE_DEVICE2HOST = 0xA1
SET_LINE_CODING = 0x20 # USB CDC 1.1 section 6.2
SET_LINE_CODING = 0x20
GET_LINE_CODING = 0x21
SET_CONTROL_LINE_STATE = 0x22
SEND_BREAK = 0x23
SET_CONTROL_LINE_STATE_DTR = 0x1
SET_CONTROL_LINE_STATE_RTS = 0x2
STOPBIT_MAP = {
1:0,
1.5:1,
@ -87,6 +92,7 @@ class CdcAcmSerial(SerialBase):
raise SerialException("Could not establish all endpoints!")
self.is_open = True
self._set_dtr_rts(self._dtr_state, self._rts_state)
self._reconfigure_port()
def _open_single_interface(self):
@ -146,6 +152,19 @@ class CdcAcmSerial(SerialBase):
self._read_endpoint = self._data_interface.getEndpoint(1)
self._write_endpoint = self._data_interface.getEndpoint(0)
def _reconfigure_port(self):
'''Reconfigure serial port parameters.'''
msg = bytearray([
self.baudrate & 0xff,
self.baudrate >> 8 & 0xff,
self.baudrate >> 16 & 0xff,
self.baudrate >> 24 & 0xff,
self.STOPBIT_MAP[self.stopbits],
self.PARITY_MAP[self.parity],
self.bytesize])
# Set line coding.
self._ctrl_transfer_out(self.SET_LINE_CODING, 0, msg)
def close(self):
'''Close the serial port.'''
if self._connection:
@ -153,6 +172,8 @@ class CdcAcmSerial(SerialBase):
self._connection = None
self.is_open = False
# - - - - - - - - - - - - - - - - - - - - - - - -
@property
def in_waiting(self):
'''Return the number of bytes currently in the input buffer.
@ -164,6 +185,14 @@ class CdcAcmSerial(SerialBase):
self._read_buffer.extend(self._read())
return len(self._read_buffer)
@property
def out_waiting(self):
'''Return the number of bytes currently in the output buffer.
Always return 0.
'''
return 0
def read(self, size=1):
'''Read data from the serial port.
@ -225,6 +254,158 @@ class CdcAcmSerial(SerialBase):
'''Simply wait some time to allow all data to be written.'''
pass
def reset_input_buffer(self):
'''Clear input buffer, discarding all that is in the buffer.'''
if not self.is_open:
raise portNotOpenError
self._purgeHwBuffers(True, False)
def reset_output_buffer(self):
'''\
Clear output buffer, aborting the current output and discarding all
that is in the buffer.
'''
if not self.is_open:
raise portNotOpenError
self._purgeHwBuffers(False, True)
def send_break(self, duration=0.25):
'''Send break condition.
Parameters:
duration (float): break time in seconds.
'''
if not self.is_open:
raise portNotOpenError
self._ctrl_transfer_out(
self.SEND_BREAK,
int(duration * 1000)
)
def _update_break_state(self):
'''Send break condition.'''
self._set_break(self._break_state)
def _update_rts_state(self):
'''Set terminal status line: Request To Send.'''
self._set_rts(self._rts_state)
def _update_dtr_state(self):
'''Set terminal status line: Data Terminal Ready.'''
self._set_dtr(self._dtr_state)
@property
def cts(self):
'''Read terminal status line: Clear To Send.'''
status = self._poll_modem_status()
return bool(status)
@property
def dsr(self):
'''Read terminal status line: Data Set Ready.'''
status = self._poll_modem_status()
return bool(status)
@property
def ri(self):
'''Read terminal status line: Ring Indicator.'''
status = self._poll_modem_status()
return bool(status)
@property
def cd(self):
'''Read terminal status line: Carrier Detect.'''
status = self._poll_modem_status()
return bool(status)
# - - - - - - - - - - - - - - - - - - - - - - - -
def _ctrl_transfer_out(self, request, value, buf=None):
'''USB control transfer out.
This function does the USB configuration job.
'''
result = self._connection.controlTransfer(
self.REQTYPE_HOST2DEVICE,
request,
value,
0,
buf,
(0 if buf is None else len(buf)),
self.USB_WRITE_TIMEOUT_MILLIS
)
return result
def _ctrl_transfer_in(self, request, value, buf):
'''USB control transfer in.
Request for a control message from the device.
'''
result = self._connection.controlTransfer(
self.REQTYPE_DEVICE2HOST,
request,
value,
0,
buf,
(0 if buf is None else len(buf)),
self.USB_READ_TIMEOUT_MILLIS
)
return result
def _set_break(self, break_):
'''Start or stop a break exception event on the serial line.
Parameters:
break_ (bool): either start or stop break event.
'''
pass
def _set_dtr(self, state):
'''Set dtr line.
Parameters:
state (bool): new DTR logical level.
'''
self._set_dtr_rts(state, self._rts_state)
def _set_rts(self, state):
'''Set rts line.
Parameters:
state (bool): new RTS logical level.
'''
self._set_dtr_rts(self._dtr_state, state)
def _set_dtr_rts(self, dtr, rts):
'''Set dtr and rts lines at once.
Parameters:
dtr (bool): new DTR logical level.
rts (bool): new RTS logical level.
'''
value = 0
if dtr:
value |= self.SET_CONTROL_LINE_STATE_DTR
if rts:
value |= self.SET_CONTROL_LINE_STATE_RTS
self._ctrl_transfer_out(
self.SET_CONTROL_LINE_STATE,
value
)
def _purgeHwBuffers(self, purgeReadBuffers, purgeWriteBuffers):
'''Set serial port parameters.
Parameters:
purgeReadBuffers (bool): need to purge read buffer or not.
purgeWriteBuffers (bool): need to purge write buffer or not.
Returns:
result (bool): successful or not.
'''
return True
def _read(self):
'''Hardware dependent read function.
@ -250,35 +431,15 @@ class CdcAcmSerial(SerialBase):
read = buf[:totalBytesRead]
return bytes(read)
def _send_acm_control_message(self, request, value, buf=None):
'''USB control transfer.
def _poll_modem_status(self):
'''Poll modem status information.
This function does the USB configuration job.
This function allows the retrieve the one status byte of the
device, useful in UART mode.
Returns:
status (int): modem status, as a proprietary bitfield
'''
return self._connection.controlTransfer(
self.USB_RT_ACM,
request,
value,
0,
buf,
0 if buf is None else len(buf),
self.USB_WRITE_TIMEOUT_MILLIS)
def _reconfigure_port(self):
'''Reconfigure serial port parameters.'''
msg = bytearray([
self.baudrate & 0xff,
self.baudrate >> 8 & 0xff,
self.baudrate >> 16 & 0xff,
self.baudrate >> 24 & 0xff,
self.STOPBIT_MAP[self.stopbits],
self.PARITY_MAP[self.parity],
self.bytesize])
# Set line coding.
self._send_acm_control_message(self.SET_LINE_CODING, 0, msg)
# Set line state.
value = (0x2 if self._rts_state else 0) \
or (0x1 if self._dtr_state else 0)
self._send_acm_control_message(self.SET_CONTROL_LINE_STATE, value)
return 0