diff --git a/setup.py b/setup.py index 28a8b61..7fb1570 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name="usbserial4a", - version="0.1.10", + version="0.2.0", description="Python package for Kivy Android USB serial port.", long_description="https://github.com/jacklinquan/usbserial4a", long_description_content_type="text/markdown", diff --git a/usbserial4a/__init__.py b/usbserial4a/__init__.py index 02ed8a8..4c842db 100644 --- a/usbserial4a/__init__.py +++ b/usbserial4a/__init__.py @@ -9,4 +9,4 @@ Requires: kivy, pyjnius, pyserial, usb4a ''' # Project version -__version__ = '0.1.10' +__version__ = '0.2.0' diff --git a/usbserial4a/cdcacmserial4a.py b/usbserial4a/cdcacmserial4a.py index e90d0fb..3871a58 100644 --- a/usbserial4a/cdcacmserial4a.py +++ b/usbserial4a/cdcacmserial4a.py @@ -4,6 +4,8 @@ Classes: CdcAcmSerial(serial.serialutil.SerialBase) ''' +from struct import pack, unpack +import time from serial.serialutil import SerialBase, SerialException, to_bytes, \ portNotOpenError, writeTimeoutError, Timeout from usb4a import usb @@ -15,14 +17,17 @@ class CdcAcmSerial(SerialBase): It can be used in a similar way to serial.Serial from pyserial. ''' - USB_RECIP_INTERFACE = 0x01 - USB_RT_ACM = usb.UsbConstants.USB_TYPE_CLASS | USB_RECIP_INTERFACE + REQTYPE_HOST2DEVICE = 0x21 + REQTYPE_DEVICE2HOST = 0xA1 - SET_LINE_CODING = 0x20 # USB CDC 1.1 section 6.2 + SET_LINE_CODING = 0x20 GET_LINE_CODING = 0x21 SET_CONTROL_LINE_STATE = 0x22 SEND_BREAK = 0x23 + SET_CONTROL_LINE_STATE_DTR = 0x1 + SET_CONTROL_LINE_STATE_RTS = 0x2 + STOPBIT_MAP = { 1:0, 1.5:1, @@ -87,6 +92,7 @@ class CdcAcmSerial(SerialBase): raise SerialException("Could not establish all endpoints!") self.is_open = True + self._set_dtr_rts(self._dtr_state, self._rts_state) self._reconfigure_port() def _open_single_interface(self): @@ -146,6 +152,19 @@ class CdcAcmSerial(SerialBase): self._read_endpoint = self._data_interface.getEndpoint(1) self._write_endpoint = self._data_interface.getEndpoint(0) + def _reconfigure_port(self): + '''Reconfigure serial port parameters.''' + msg = bytearray([ + self.baudrate & 0xff, + self.baudrate >> 8 & 0xff, + self.baudrate >> 16 & 0xff, + self.baudrate >> 24 & 0xff, + self.STOPBIT_MAP[self.stopbits], + self.PARITY_MAP[self.parity], + self.bytesize]) + # Set line coding. + self._ctrl_transfer_out(self.SET_LINE_CODING, 0, msg) + def close(self): '''Close the serial port.''' if self._connection: @@ -153,6 +172,8 @@ class CdcAcmSerial(SerialBase): self._connection = None self.is_open = False + # - - - - - - - - - - - - - - - - - - - - - - - - + @property def in_waiting(self): '''Return the number of bytes currently in the input buffer. @@ -164,6 +185,14 @@ class CdcAcmSerial(SerialBase): self._read_buffer.extend(self._read()) return len(self._read_buffer) + @property + def out_waiting(self): + '''Return the number of bytes currently in the output buffer. + + Always return 0. + ''' + return 0 + def read(self, size=1): '''Read data from the serial port. @@ -225,6 +254,158 @@ class CdcAcmSerial(SerialBase): '''Simply wait some time to allow all data to be written.''' pass + def reset_input_buffer(self): + '''Clear input buffer, discarding all that is in the buffer.''' + if not self.is_open: + raise portNotOpenError + self._purgeHwBuffers(True, False) + + def reset_output_buffer(self): + '''\ + Clear output buffer, aborting the current output and discarding all + that is in the buffer. + ''' + if not self.is_open: + raise portNotOpenError + self._purgeHwBuffers(False, True) + + def send_break(self, duration=0.25): + '''Send break condition. + + Parameters: + duration (float): break time in seconds. + ''' + if not self.is_open: + raise portNotOpenError + self._ctrl_transfer_out( + self.SEND_BREAK, + int(duration * 1000) + ) + + def _update_break_state(self): + '''Send break condition.''' + self._set_break(self._break_state) + + def _update_rts_state(self): + '''Set terminal status line: Request To Send.''' + self._set_rts(self._rts_state) + + def _update_dtr_state(self): + '''Set terminal status line: Data Terminal Ready.''' + self._set_dtr(self._dtr_state) + + @property + def cts(self): + '''Read terminal status line: Clear To Send.''' + status = self._poll_modem_status() + return bool(status) + + @property + def dsr(self): + '''Read terminal status line: Data Set Ready.''' + status = self._poll_modem_status() + return bool(status) + + @property + def ri(self): + '''Read terminal status line: Ring Indicator.''' + status = self._poll_modem_status() + return bool(status) + + @property + def cd(self): + '''Read terminal status line: Carrier Detect.''' + status = self._poll_modem_status() + return bool(status) + + # - - - - - - - - - - - - - - - - - - - - - - - - + + def _ctrl_transfer_out(self, request, value, buf=None): + '''USB control transfer out. + + This function does the USB configuration job. + ''' + result = self._connection.controlTransfer( + self.REQTYPE_HOST2DEVICE, + request, + value, + 0, + buf, + (0 if buf is None else len(buf)), + self.USB_WRITE_TIMEOUT_MILLIS + ) + + return result + + def _ctrl_transfer_in(self, request, value, buf): + '''USB control transfer in. + + Request for a control message from the device. + ''' + result = self._connection.controlTransfer( + self.REQTYPE_DEVICE2HOST, + request, + value, + 0, + buf, + (0 if buf is None else len(buf)), + self.USB_READ_TIMEOUT_MILLIS + ) + + return result + + def _set_break(self, break_): + '''Start or stop a break exception event on the serial line. + + Parameters: + break_ (bool): either start or stop break event. + ''' + pass + + def _set_dtr(self, state): + '''Set dtr line. + + Parameters: + state (bool): new DTR logical level. + ''' + self._set_dtr_rts(state, self._rts_state) + + def _set_rts(self, state): + '''Set rts line. + + Parameters: + state (bool): new RTS logical level. + ''' + self._set_dtr_rts(self._dtr_state, state) + + def _set_dtr_rts(self, dtr, rts): + '''Set dtr and rts lines at once. + + Parameters: + dtr (bool): new DTR logical level. + rts (bool): new RTS logical level. + ''' + value = 0 + if dtr: + value |= self.SET_CONTROL_LINE_STATE_DTR + if rts: + value |= self.SET_CONTROL_LINE_STATE_RTS + self._ctrl_transfer_out( + self.SET_CONTROL_LINE_STATE, + value + ) + + def _purgeHwBuffers(self, purgeReadBuffers, purgeWriteBuffers): + '''Set serial port parameters. + + Parameters: + purgeReadBuffers (bool): need to purge read buffer or not. + purgeWriteBuffers (bool): need to purge write buffer or not. + Returns: + result (bool): successful or not. + ''' + return True + def _read(self): '''Hardware dependent read function. @@ -250,35 +431,15 @@ class CdcAcmSerial(SerialBase): read = buf[:totalBytesRead] return bytes(read) - def _send_acm_control_message(self, request, value, buf=None): - '''USB control transfer. + def _poll_modem_status(self): + '''Poll modem status information. - This function does the USB configuration job. + This function allows the retrieve the one status byte of the + device, useful in UART mode. + + Returns: + status (int): modem status, as a proprietary bitfield ''' - return self._connection.controlTransfer( - self.USB_RT_ACM, - request, - value, - 0, - buf, - 0 if buf is None else len(buf), - self.USB_WRITE_TIMEOUT_MILLIS) - - def _reconfigure_port(self): - '''Reconfigure serial port parameters.''' - msg = bytearray([ - self.baudrate & 0xff, - self.baudrate >> 8 & 0xff, - self.baudrate >> 16 & 0xff, - self.baudrate >> 24 & 0xff, - self.STOPBIT_MAP[self.stopbits], - self.PARITY_MAP[self.parity], - self.bytesize]) - # Set line coding. - self._send_acm_control_message(self.SET_LINE_CODING, 0, msg) - # Set line state. - value = (0x2 if self._rts_state else 0) \ - or (0x1 if self._dtr_state else 0) - self._send_acm_control_message(self.SET_CONTROL_LINE_STATE, value) + return 0 \ No newline at end of file