diff --git a/quopri/quopri.py b/quopri/quopri.py
new file mode 100644
index 00000000..3d0f0ac0
--- /dev/null
+++ b/quopri/quopri.py
@@ -0,0 +1,265 @@
+#! /usr/bin/env python3
+
+"""Conversions to/from quoted-printable transport encoding as per RFC 1521."""
+
+# (Dec 1991 version).
+
+__all__ = ["encode", "decode", "encodestring", "decodestring"]
+
+ESCAPE = b'='
+MAXLINESIZE = 76
+HEX = b'0123456789ABCDEF'
+EMPTYSTRING = b''
+
+try:
+    from binascii import a2b_qp, b2a_qp
+except ImportError:
+    a2b_qp = None
+    b2a_qp = None
+
+
+def needsquoting(c, quotetabs, header):
+    """Decide whether the single byte 'c' (a bytes object) needs to be quoted.
+
+    The 'quotetabs' flag indicates whether embedded tabs and spaces should be
+    quoted. Note that line-ending tabs and spaces are always encoded, as per
+    RFC 1521.
+    The 'header' flag indicates whether underscores should be quoted, which
+    is needed when spaces are encoded as _ (see encode()).
+    """
+    assert isinstance(c, bytes)
+    if c in b' \t':
+        return quotetabs
+    # if header, we have to escape _ because _ is used to escape space
+    if c == b'_':
+        return header
+    return c == ESCAPE or not (b' ' <= c <= b'~')
+
+def quote(c):
+    """Return the '=XX' escape (upper-case hex) for the single byte 'c'."""
+    assert isinstance(c, bytes) and len(c)==1
+    c = ord(c)
+    return ESCAPE + bytes((HEX[c//16], HEX[c%16]))
+
+
+
+def encode(input, output, quotetabs, header=False):
+    """Read 'input', apply quoted-printable encoding, and write to 'output'.
+
+    'input' and 'output' are binary files; 'input' needs read() and
+    readline() methods and 'output' needs a write() method.
+    The 'quotetabs' flag indicates whether embedded tabs and spaces should be
+    quoted. Note that line-ending tabs and spaces are always encoded, as per
+    RFC 1521.
+    The 'header' flag indicates whether we are encoding spaces as _ as per
+    RFC 1522.
+    """
+
+    if b2a_qp is not None:
+        data = input.read()
+        odata = b2a_qp(data, quotetabs=quotetabs, header=header)
+        output.write(odata)
+        return
+
+    def write(s, output=output, lineEnd=b'\n'):
+        # RFC 1521 requires that the line ending in a space or tab must have
+        # that trailing character encoded.
+        if s and s[-1:] in b' \t':
+            output.write(s[:-1] + quote(s[-1:]) + lineEnd)
+        elif s == b'.':
+            output.write(quote(s) + lineEnd)
+        else:
+            output.write(s + lineEnd)
+
+    prevline = None
+    while 1:
+        line = input.readline()
+        if not line:
+            break
+        outline = []
+        # Strip off any readline induced trailing newline
+        stripped = b''
+        if line[-1:] == b'\n':
+            line = line[:-1]
+            stripped = b'\n'
+        # Calculate the un-length-limited encoded line
+        for c in line:
+            c = bytes((c,))
+            if needsquoting(c, quotetabs, header):
+                c = quote(c)
+            if header and c == b' ':
+                outline.append(b'_')
+            else:
+                outline.append(c)
+        # First, write out the previous line
+        if prevline is not None:
+            write(prevline)
+        # Now see if we need any soft line breaks because of RFC-imposed
+        # length limitations. Then do the thisline->prevline dance.
+        thisline = EMPTYSTRING.join(outline)
+        while len(thisline) > MAXLINESIZE:
+            # Don't forget to include the soft line break `=' sign in the
+            # length calculation!
+            write(thisline[:MAXLINESIZE-1], lineEnd=b'=\n')
+            thisline = thisline[MAXLINESIZE-1:]
+        # Write out the current line
+        prevline = thisline
+    # Write out the last line, without a trailing newline
+    if prevline is not None:
+        write(prevline, lineEnd=stripped)
+
+def encodestring(s, quotetabs=False, header=False):
+    """Return the quoted-printable encoding of the bytes object 's'.
+
+    'quotetabs' and 'header' have the same meaning as for encode().
+    """
+    if b2a_qp is not None:
+        return b2a_qp(s, quotetabs=quotetabs, header=header)
+    from io import BytesIO
+    infp = BytesIO(s)
+    outfp = BytesIO()
+    encode(infp, outfp, quotetabs, header)
+    return outfp.getvalue()
+
+
+
+def decode(input, output, header=False):
+    """Read 'input', apply quoted-printable decoding, and write to 'output'.
+    'input' and 'output' are binary files; 'input' needs read() and
+    readline() methods and 'output' needs a write() method.
+    If 'header' is true, decode underscore as space (per RFC 1522)."""
+
+    if a2b_qp is not None:
+        data = input.read()
+        odata = a2b_qp(data, header=header)
+        output.write(odata)
+        return
+
+    new = b''
+    while 1:
+        line = input.readline()
+        if not line: break
+        i, n = 0, len(line)
+        if n > 0 and line[n-1:n] == b'\n':
+            partial = 0; n = n-1
+            # Strip trailing whitespace
+            while n > 0 and line[n-1:n] in b" \t\r":
+                n = n-1
+        else:
+            partial = 1
+        while i < n:
+            c = line[i:i+1]
+            if c == b'_' and header:
+                new = new + b' '; i = i+1
+            elif c != ESCAPE:
+                new = new + c; i = i+1
+            elif i+1 == n and not partial:
+                partial = 1; break
+            # Slice, don't index: indexing bytes yields an int, which never
+            # equals the bytes object ESCAPE, so '==' was never recognized.
+            elif i+1 < n and line[i+1:i+2] == ESCAPE:
+                new = new + ESCAPE; i = i+2
+            elif i+2 < n and ishex(line[i+1:i+2]) and ishex(line[i+2:i+3]):
+                new = new + bytes((unhex(line[i+1:i+3]),)); i = i+3
+            else: # Bad escape sequence -- leave it in
+                new = new + c; i = i+1
+        if not partial:
+            output.write(new + b'\n')
+            new = b''
+    if new:
+        output.write(new)
+
+def decodestring(s, header=False):
+    """Return the quoted-printable decoding of the bytes object 's'.
+
+    If 'header' is true, decode underscore as space, as decode() does.
+    """
+    if a2b_qp is not None:
+        return a2b_qp(s, header=header)
+    from io import BytesIO
+    infp = BytesIO(s)
+    outfp = BytesIO()
+    decode(infp, outfp, header=header)
+    return outfp.getvalue()
+
+
+
+# Other helper functions
+def ishex(c):
+    """Return true if the byte 'c' (a bytes object) is an ASCII hex digit."""
+    assert isinstance(c, bytes)
+    return b'0' <= c <= b'9' or b'a' <= c <= b'f' or b'A' <= c <= b'F'
+
+def unhex(s):
+    """Get the integer value of the hexadecimal number in the bytes 's'."""
+    bits = 0
+    for c in s:
+        c = bytes((c,))
+        if b'0' <= c <= b'9':
+            i = ord('0')
+        elif b'a' <= c <= b'f':
+            i = ord('a')-10
+        elif b'A' <= c <= b'F':
+            i = ord(b'A')-10
+        else:
+            assert False, "non-hex digit "+repr(c)
+        bits = bits*16 + (ord(c) - i)
+    return bits
+
+
+
+def main():
+    """Command-line interface: encode (default) or decode (-d) the named files.
+
+    Input is read from each file argument ('-', or no argument, means
+    standard input) and the result is written to standard output. The -t
+    option quotes embedded tabs and spaces when encoding. Exits with
+    status 2 on a usage error and status 1 if a file could not be opened.
+    """
+    import sys
+    import getopt
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], 'td')
+    except getopt.error as msg:
+        sys.stdout = sys.stderr
+        print(msg)
+        print("usage: quopri [-t | -d] [file] ...")
+        print("-t: quote tabs")
+        print("-d: decode; default encode")
+        sys.exit(2)
+    deco = 0
+    tabs = 0
+    for o, a in opts:
+        if o == '-t': tabs = 1
+        if o == '-d': deco = 1
+    if tabs and deco:
+        sys.stdout = sys.stderr
+        print("-t and -d are mutually exclusive")
+        sys.exit(2)
+    if not args: args = ['-']
+    sts = 0
+    for file in args:
+        if file == '-':
+            fp = sys.stdin.buffer
+        else:
+            try:
+                fp = open(file, "rb")
+            except IOError as msg:
+                sys.stderr.write("%s: can't open (%s)\n" % (file, msg))
+                sts = 1
+                continue
+        try:
+            if deco:
+                decode(fp, sys.stdout.buffer)
+            else:
+                encode(fp, sys.stdout.buffer, tabs)
+        finally:
+            if file != '-':
+                fp.close()
+    if sts:
+        sys.exit(sts)
+
+
+
+if __name__ == '__main__':
+    main()
diff --git a/quopri/test_quopri.py b/quopri/test_quopri.py
new file mode 100644
index 00000000..583fd456
--- /dev/null
+++ b/quopri/test_quopri.py
@@ -0,0 +1,216 @@
+from test import support
+import unittest
+
+import sys, os, io, subprocess
+import quopri
+
+
+
+ENCSAMPLE = b"""\
+Here's a bunch of special=20
+
+=A1=A2=A3=A4=A5=A6=A7=A8=A9
+=AA=AB=AC=AD=AE=AF=B0=B1=B2=B3
+=B4=B5=B6=B7=B8=B9=BA=BB=BC=BD=BE
+=BF=C0=C1=C2=C3=C4=C5=C6
+=C7=C8=C9=CA=CB=CC=CD=CE=CF
+=D0=D1=D2=D3=D4=D5=D6=D7
+=D8=D9=DA=DB=DC=DD=DE=DF
+=E0=E1=E2=E3=E4=E5=E6=E7
+=E8=E9=EA=EB=EC=ED=EE=EF
+=F0=F1=F2=F3=F4=F5=F6=F7
+=F8=F9=FA=FB=FC=FD=FE=FF
+
+characters... have fun!
+"""
+
+# First line ends with a space
+DECSAMPLE = b"Here's a bunch of special \n" + \
+b"""\
+
+\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9
+\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3
+\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe
+\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6
+\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf
+\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7
+\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf
+\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7
+\xe8\xe9\xea\xeb\xec\xed\xee\xef
+\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7
+\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff
+
+characters... have fun!
+"""
+
+
+def withpythonimplementation(testfunc):
+    """Make 'testfunc' also exercise quopri's pure-Python code paths.
+
+    The test runs once unchanged; then, if quopri has either binascii
+    function, it runs again with quopri.b2a_qp and quopri.a2b_qp set to
+    None, restoring both afterwards.
+    """
+    def newtest(self):
+        # Test default implementation
+        testfunc(self)
+        # Test Python implementation
+        if quopri.b2a_qp is not None or quopri.a2b_qp is not None:
+            oldencode = quopri.b2a_qp
+            olddecode = quopri.a2b_qp
+            try:
+                quopri.b2a_qp = None
+                quopri.a2b_qp = None
+                testfunc(self)
+            finally:
+                quopri.b2a_qp = oldencode
+                quopri.a2b_qp = olddecode
+    newtest.__name__ = testfunc.__name__
+    return newtest
+
+class QuopriTestCase(unittest.TestCase):
+    """Tests for quopri's encoding, decoding and command-line interface."""
+
+    # Each entry is a tuple of (plaintext, encoded string). These strings are
+    # used in the "quotetabs=0" tests.
+    STRINGS = (
+        # Some normal strings
+        (b'hello', b'hello'),
+        (b'''hello
+        there
+        world''', b'''hello
+        there
+        world'''),
+        (b'''hello
+        there
+        world
+''', b'''hello
+        there
+        world
+'''),
+        (b'\201\202\203', b'=81=82=83'),
+        # Add some trailing MUST QUOTE strings
+        (b'hello ', b'hello=20'),
+        (b'hello\t', b'hello=09'),
+        # Some long lines. First, a single line of 108 characters
+        (b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\xd8\xd9\xda\xdb\xdc\xdd\xde\xdfxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
+         b'''xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=D8=D9=DA=DB=DC=DD=DE=DFx=
+xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'''),
+        # A line of exactly 76 characters, no soft line break should be needed
+        (b'yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy',
+         b'yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'),
+        # A line of 77 characters, forcing a soft line break at position 75,
+        # and a second line of exactly 2 characters (because the soft line
+        # break `=' sign counts against the line length limit).
+        (b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz',
+         b'''zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
+zz'''),
+        # A line of 151 characters, forcing a soft line break at position 75,
+        # with a second line of exactly 76 characters and no trailing =
+        (b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz',
+         b'''zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz'''),
+        # A string containing a hard line break, but which the first line is
+        # 151 characters and the second line is exactly 76 characters. This
+        # should leave us with three lines, the first which has a soft line
+        # break, and which the second and third do not.
+        (b'''yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz''',
+         b'''yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy=
+yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz'''),
+        # Now some really complex stuff ;)
+        (DECSAMPLE, ENCSAMPLE),
+        )
+
+    # These are used in the "quotetabs=1" tests.
+    ESTRINGS = (
+        (b'hello world', b'hello=20world'),
+        (b'hello\tworld', b'hello=09world'),
+        )
+
+    # These are used in the "header=1" tests.
+    HSTRINGS = (
+        (b'hello world', b'hello_world'),
+        (b'hello_world', b'hello=5Fworld'),
+        )
+
+    @withpythonimplementation
+    def test_encodestring(self):
+        for p, e in self.STRINGS:
+            self.assertEqual(quopri.encodestring(p), e)
+
+    @withpythonimplementation
+    def test_decodestring(self):
+        for p, e in self.STRINGS:
+            self.assertEqual(quopri.decodestring(e), p)
+
+    @withpythonimplementation
+    def test_idempotent_string(self):
+        for p, e in self.STRINGS:
+            self.assertEqual(quopri.decodestring(quopri.encodestring(e)), e)
+
+    @withpythonimplementation
+    def test_encode(self):
+        for p, e in self.STRINGS:
+            infp = io.BytesIO(p)
+            outfp = io.BytesIO()
+            quopri.encode(infp, outfp, quotetabs=False)
+            self.assertEqual(outfp.getvalue(), e)
+
+    @withpythonimplementation
+    def test_decode(self):
+        for p, e in self.STRINGS:
+            infp = io.BytesIO(e)
+            outfp = io.BytesIO()
+            quopri.decode(infp, outfp)
+            self.assertEqual(outfp.getvalue(), p)
+
+    @withpythonimplementation
+    def test_embedded_ws(self):
+        for p, e in self.ESTRINGS:
+            self.assertEqual(quopri.encodestring(p, quotetabs=True), e)
+            self.assertEqual(quopri.decodestring(e), p)
+
+    @withpythonimplementation
+    def test_encode_header(self):
+        for p, e in self.HSTRINGS:
+            self.assertEqual(quopri.encodestring(p, header=True), e)
+
+    @withpythonimplementation
+    def test_decode_header(self):
+        for p, e in self.HSTRINGS:
+            self.assertEqual(quopri.decodestring(e, header=True), p)
+
+    def test_scriptencode(self):
+        (p, e) = self.STRINGS[-1]
+        process = subprocess.Popen([sys.executable, "-mquopri"],
+                                   stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+        self.addCleanup(process.stdout.close)
+        cout, cerr = process.communicate(p)
+        # On Windows, Python will output the result to stdout using
+        # CRLF, as the mode of stdout is text mode. To compare this
+        # with the expected result, we need to do a line-by-line comparison.
+        cout = cout.decode('latin-1').splitlines()
+        e = e.decode('latin-1').splitlines()
+        self.assertEqual(len(cout), len(e))
+        for got, want in zip(cout, e):
+            self.assertEqual(got, want)
+        self.assertEqual(cout, e)
+
+    def test_scriptdecode(self):
+        (p, e) = self.STRINGS[-1]
+        process = subprocess.Popen([sys.executable, "-mquopri", "-d"],
+                                   stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+        self.addCleanup(process.stdout.close)
+        cout, cerr = process.communicate(e)
+        cout = cout.decode('latin-1')
+        p = p.decode('latin-1')
+        self.assertEqual(cout.splitlines(), p.splitlines())
+
+def test_main():
+    support.run_unittest(QuopriTestCase)
+
+
+if __name__ == "__main__":
+    test_main()