""" Communicate with the Arduino. """ import glob import os.path import serial from termios import tcgetattr, HUPCL, ONLCR, tcsetattr, TCSAFLUSH import time import logging import sys, traceback _logger = logging.getLogger(__name__) class Ctrl(object): RESET1 = b'\x81' RESET2 = b'\x82' ACK1 = b'\x81' ACK2 = b'\x82' OUTOFSYNC = b'\x83' PING = b'\x80' PONG = b'\x80' DATA_ESCAPE = b'\x84' @staticmethod def is_control(b): return (b & 0xC0) == 0x80 DATA_DEBUG_PREFIX = b'\xC0' def configure_serial(path): # Disables parameters which are not controlled by pyserial with open(path) as f: # Known example: [0o2400, 0o5, 0o6275, 0o105073, 0o15, 0o15, [...]] attrs = tcgetattr(f) # output mode: disable CRLF translation (ONLCR) attrs[1] &= ~ONLCR # control mode: Disable reset after hangup (HUPCL) attrs[2] &= ~HUPCL tcsetattr(f, TCSAFLUSH, attrs) ### Serial I/O processing class SerialStateError(RuntimeError): '''Requests a RESET.''' # Last processed byte class SerialState(object): def __init__(self, ser): self.ser = ser # None if there is no expected data self.data_len = None self.data = None # Whether escaped data was previously requested self.data_escape = False # If not None, then an integer marking the upper half of the length. self.data_len2 = None def _read_one(self): while True: try: return self.ser.read() except serial.SerialTimeoutException as e: _logger.warn('Read timeout: %s', e) def handle_one(self, ser): b = self._read_one(ser) bi = ord(b) if Ctrl.is_control(b): # Check states if b == Ctrl.OUTOFSYNC: raise SerialStateError('RESET requested') elif b in (Ctrl.ACK1, Ctrl.ACK2): # Should never happen during normal operation. raise SerialStateError('Unexpected ACKs') elif self.data is not None: raise SerialStateError('Unexpected control during data stage') if b == Ctrl.Ping: ser.write(Ctrl.PONG) elif (bi & 0xF8) == 0x88: # DATA_LEN2 self.data_len2 = (bi & 7) << 5 else: # Now interpret data length if self.data_len2 is not None: if (bi & 0xA0) != 0xA0: raise SerialStateError('Expected DATA_LEN2 cont.') self.set_data_length((self.data_len2 | (bi - 0xA0)) + 1) self.data_len2 = None else: # DATA_LEN1 (strip upper byte and then -7 per spec.) self.set_data_length(bi - 0x87) return True # OK, next byte please else: # not control if self.data is None: raise SerialStateError('Got DATA while not expecting it') if b == Ctrl.DATA_ESCAPE: self.data_escape = True else: if self.data_escape: self.data_escape = False self.data += chr(0x80 | bi) else: self.data += b if len(self.data) == self.data_len: data = self.data self.data = None return data def set_data_length(self, len): if self.data is not None: raise SerialStateError('Got new data while not finished with old') self.data = b'' self.data_len = len state = None def serial_handshake(ser): """ :type ser: serial.Serial """ state = SerialState(ser) # reset attempt = 0 while True: attempt += 1 try: # Keep sending RESET1 until acknowledged. ser.write(Ctrl.RESET1) b = ser.read(1) if b != Ctrl.ACK1: _logger.warn('Got no ACK1 but %r for attempt %d - retrying', attempt, b) continue # Move forward to the RESET2 state. ser.write(Ctrl.RESET2) except serial.SerialTimeoutException as e: # Write error, just reset _logger.warn('Write timeout for attempt %d: %s', attempt, e) time.sleep(.2) # Avoid spinning, just to be sure. continue # Read byte (ignoring repeated ACK1). b = ser.read(1) while b == Ctrl.ACK1: _logger.warn('Got repeated ACK1 for attempt %d - skipping', attempt) b = ser.read(1) # If not ACK2, state is wrong, so start over. if b != Ctrl.ACK2: _logger.warn('Got no ACK2 but %r for attempt %d - retrying', attempt, b); continue break def handle_data_debug(data): ''' :param data: raw bytes as read from serial :type data: bytes ''' try: # Try to decode as ASCII data = data.decode('ascii') except: # if that fails, just take the representation. # b'Foo\xff' -> Foo\xff data = repr(data) if data.endswith("'"): data = data[:-1] if data.startswith("b'"): data = data[2:] elif data.startswith("'"): data = data[1:] _logger.warn('PI DEBUG: %s', data) def handle_serial(path): configure_serial(path) ser = serial.Serial(path, 9600, timeout=2) while True: # Reset to a known good state serial_handshake(ser) while True: try: data = state.handle_one(ser) if data and data.startswith(DATA_DEBUG_PREFIX): handle_data_debug(data[1:]) elif data: _logger.info('PI DATA: %r', data) except SerialStateError as e: _logger.warn('%s', e) def main(): while True: try: _logger.info("Waiting for serial device") while True: try: path = glob.glob('/dev/ttyACM*')[0] break except IndexError: # Wait for a device to appear... time.sleep(.5) _logger.info("Found %s", path) handle_serial(path) except Exception as e: _logger.error('%s', e) _logger.debug('%s', traceback.format_exc()) time.sleep(.5) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s: %(message)s', level=logging.DEBUG) if len(sys.argv) > 1: handle_serial(sys.argv[1]) else: main()