読者です 読者をやめる 読者になる 読者になる

Python 2.6 以降だと PySerial で '\r' が readline の delimiter に使えない

Python

PySerial では Python 2.6 以降の場合には、'\r' (CR) が readline のときの delimiter に使えません。つまり、装置側の出力に '\r' が delimiter として使われている場合、次の code だと '\n' (LF) を待ってしまうために time out してしまいます。

import serial
s = serial.Serial('/dev/ttyUSB0', ... )
s.write('SOME_COMMAND?')
s.readline()

これは、Python 2.6 以降の場合に PySerial が io.RawIOBase.readline を使うようになっているからです。io.RawIOBase は delimiter が '\n' に決め打ちなため、制御する装置側で delimiter の変更ができない場合は詰んじゃいます。

Python 2.5 以前の場合だと、PySerial は serialutil.FileLike という独自の file IO を使うため、delimiter を自分で設定することができました。

import serial
s = serial.Serial('/dev/ttyUSB0', ... )
s.write('SOME_COMMAND?')
s.readline(eol = '\r')

どうしても '\r' を delimiter に使いたい場合は、単純には serial.Serial.readline を override してしまいます。

class MySerial(serial.Serial):
    """
    Wrapper for Serial
    """
    try:
        import io
    except ImportError:
        # serial.Serial inherits serial.FileLike
        pass
    else:
        def readline(self):
            """
            Overrides io.RawIOBase.readline which cannot handle with '\r' delimiters
            """
            ret = ''
            while True:
                c = self.read(1)
                if c == '':
                    return ret
                elif c == '\r':
                    return ret + c
                else:
                    ret += c

単純に '\r' を読むまで 1 文字ずつ進めているだけですが、実用上は問題ないでしょう。

もう少し io.RawIOBase.readline に近づけたい場合は、それをそのまま改造してしまうのも手でしょう。例えば OS XPython 2.6 の場合、/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/io.py の中で次のように io.RawIOBase.readline が定義されているので、'\n' を '\r' に置き換えてしまえば OK のはずです。

    def readline(self, limit = -1):
        r"""Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.

        The line terminator is always b'\n' for binary files; for text
        files, the newlines argument to open can be used to select the line
        terminator(s) recognized.
        """
        self._checkClosed()
        if hasattr(self, "peek"):
            def nreadahead():
                readahead = self.peek(1)
                if not readahead:
                    return 1
                n = (readahead.find(b"\n") + 1) or len(readahead)
                if limit >= 0:
                    n = min(n, limit)
                return n
        else:
            def nreadahead():
                return 1
        if limit is None:
            limit = -1
        if not isinstance(limit, (int, long)):
            raise TypeError("limit must be an integer")
        res = bytearray()
        while limit < 0 or len(res) < limit:
            b = self.read(nreadahead())
            if not b:
                break
            res += b
            if res.endswith(b"\n"):
                break
        return bytes(res)