# Copyright (C) 2002-2007 Python Software Foundation # Contact: email-sig@python.org """Email address parsing code. Lifted directly from rfc822.py. This should eventually be rewritten. """ __all__ = [ "mktime_tz", "parsedate", "parsedate_tz", "quote", ] import time, calendar SPACE = " " EMPTYSTRING = "" COMMASPACE = ", " # Parse a date field _monthnames = [ "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec", "january", "february", "march", "april", "may", "june", "july", "august", "september", "october", "november", "december", ] _daynames = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] # The timezone table does not include the military time zones defined # in RFC822, other than Z. According to RFC1123, the description in # RFC822 gets the signs wrong, so we can't rely on any such time # zones. RFC1123 recommends that numeric timezone indicators be used # instead of timezone names. _timezones = { "UT": 0, "UTC": 0, "GMT": 0, "Z": 0, "AST": -400, "ADT": -300, # Atlantic (used in Canada) "EST": -500, "EDT": -400, # Eastern "CST": -600, "CDT": -500, # Central "MST": -700, "MDT": -600, # Mountain "PST": -800, "PDT": -700, # Pacific } def parsedate_tz(data): """Convert a date string to a time tuple. Accounts for military timezones. """ res = _parsedate_tz(data) if not res: return if res[9] is None: res[9] = 0 return tuple(res) def _parsedate_tz(data): """Convert date to extended time tuple. The last (additional) element is the time zone offset in seconds, except if the timezone was specified as -0000. In that case the last element is None. This indicates a UTC timestamp that explicitly declaims knowledge of the source timezone, as opposed to a +0000 timestamp that indicates the source timezone really was UTC. """ if not data: return data = data.split() # The FWS after the comma after the day-of-week is optional, so search and # adjust for this. if data[0].endswith(",") or data[0].lower() in _daynames: # There's a dayname here. Skip it del data[0] else: i = data[0].rfind(",") if i >= 0: data[0] = data[0][i + 1 :] if len(data) == 3: # RFC 850 date, deprecated stuff = data[0].split("-") if len(stuff) == 3: data = stuff + data[1:] if len(data) == 4: s = data[3] i = s.find("+") if i == -1: i = s.find("-") if i > 0: data[3:] = [s[:i], s[i:]] else: data.append("") # Dummy tz if len(data) < 5: return None data = data[:5] [dd, mm, yy, tm, tz] = data mm = mm.lower() if mm not in _monthnames: dd, mm = mm, dd.lower() if mm not in _monthnames: return None mm = _monthnames.index(mm) + 1 if mm > 12: mm -= 12 if dd[-1] == ",": dd = dd[:-1] i = yy.find(":") if i > 0: yy, tm = tm, yy if yy[-1] == ",": yy = yy[:-1] if not yy[0].isdigit(): yy, tz = tz, yy if tm[-1] == ",": tm = tm[:-1] tm = tm.split(":") if len(tm) == 2: [thh, tmm] = tm tss = "0" elif len(tm) == 3: [thh, tmm, tss] = tm elif len(tm) == 1 and "." in tm[0]: # Some non-compliant MUAs use '.' to separate time elements. tm = tm[0].split(".") if len(tm) == 2: [thh, tmm] = tm tss = 0 elif len(tm) == 3: [thh, tmm, tss] = tm else: return None try: yy = int(yy) dd = int(dd) thh = int(thh) tmm = int(tmm) tss = int(tss) except ValueError: return None # Check for a yy specified in two-digit format, then convert it to the # appropriate four-digit format, according to the POSIX standard. RFC 822 # calls for a two-digit yy, but RFC 2822 (which obsoletes RFC 822) # mandates a 4-digit yy. For more information, see the documentation for # the time module. if yy < 100: # The year is between 1969 and 1999 (inclusive). if yy > 68: yy += 1900 # The year is between 2000 and 2068 (inclusive). else: yy += 2000 tzoffset = None tz = tz.upper() if tz in _timezones: tzoffset = _timezones[tz] else: try: tzoffset = int(tz) except ValueError: pass if tzoffset == 0 and tz.startswith("-"): tzoffset = None # Convert a timezone offset into seconds ; -0500 -> -18000 if tzoffset: if tzoffset < 0: tzsign = -1 tzoffset = -tzoffset else: tzsign = 1 tzoffset = tzsign * ((tzoffset // 100) * 3600 + (tzoffset % 100) * 60) # Daylight Saving Time flag is set to -1, since DST is unknown. return [yy, mm, dd, thh, tmm, tss, 0, 1, -1, tzoffset] def parsedate(data): """Convert a time string to a time tuple.""" t = parsedate_tz(data) if isinstance(t, tuple): return t[:9] else: return t def mktime_tz(data): """Turn a 10-tuple as returned by parsedate_tz() into a POSIX timestamp.""" if data[9] is None: # No zone info, so localtime is better assumption than GMT return time.mktime(data[:8] + (-1,)) else: t = calendar.timegm(data) return t - data[9] def quote(str): """Prepare string to be used in a quoted string. Turns backslash and double quote characters into quoted pairs. These are the only characters that need to be quoted inside a quoted string. Does not add the surrounding double quotes. """ return str.replace("\\", "\\\\").replace('"', '\\"') class AddrlistClass: """Address parser class by Ben Escoto. To understand what this class does, it helps to have a copy of RFC 2822 in front of you. Note: this class interface is deprecated and may be removed in the future. Use email.utils.AddressList instead. """ def __init__(self, field): """Initialize a new instance. `field' is an unparsed address header field, containing one or more addresses. """ self.specials = '()<>@,:;."[]' self.pos = 0 self.LWS = " \t" self.CR = "\r\n" self.FWS = self.LWS + self.CR self.atomends = self.specials + self.LWS + self.CR # Note that RFC 2822 now specifies `.' as obs-phrase, meaning that it # is obsolete syntax. RFC 2822 requires that we recognize obsolete # syntax, so allow dots in phrases. self.phraseends = self.atomends.replace(".", "") self.field = field self.commentlist = [] def gotonext(self): """Skip white space and extract comments.""" wslist = [] while self.pos < len(self.field): if self.field[self.pos] in self.LWS + "\n\r": if self.field[self.pos] not in "\n\r": wslist.append(self.field[self.pos]) self.pos += 1 elif self.field[self.pos] == "(": self.commentlist.append(self.getcomment()) else: break return EMPTYSTRING.join(wslist) def getaddrlist(self): """Parse all addresses. Returns a list containing all of the addresses. """ result = [] while self.pos < len(self.field): ad = self.getaddress() if ad: result += ad else: result.append(("", "")) return result def getaddress(self): """Parse the next address.""" self.commentlist = [] self.gotonext() oldpos = self.pos oldcl = self.commentlist plist = self.getphraselist() self.gotonext() returnlist = [] if self.pos >= len(self.field): # Bad email address technically, no domain. if plist: returnlist = [(SPACE.join(self.commentlist), plist[0])] elif self.field[self.pos] in ".@": # email address is just an addrspec # this isn't very efficient since we start over self.pos = oldpos self.commentlist = oldcl addrspec = self.getaddrspec() returnlist = [(SPACE.join(self.commentlist), addrspec)] elif self.field[self.pos] == ":": # address is a group returnlist = [] fieldlen = len(self.field) self.pos += 1 while self.pos < len(self.field): self.gotonext() if self.pos < fieldlen and self.field[self.pos] == ";": self.pos += 1 break returnlist = returnlist + self.getaddress() elif self.field[self.pos] == "<": # Address is a phrase then a route addr routeaddr = self.getrouteaddr() if self.commentlist: returnlist = [ (SPACE.join(plist) + " (" + " ".join(self.commentlist) + ")", routeaddr) ] else: returnlist = [(SPACE.join(plist), routeaddr)] else: if plist: returnlist = [(SPACE.join(self.commentlist), plist[0])] elif self.field[self.pos] in self.specials: self.pos += 1 self.gotonext() if self.pos < len(self.field) and self.field[self.pos] == ",": self.pos += 1 return returnlist def getrouteaddr(self): """Parse a route address (Return-path value). This method just skips all the route stuff and returns the addrspec. """ if self.field[self.pos] != "<": return expectroute = False self.pos += 1 self.gotonext() adlist = "" while self.pos < len(self.field): if expectroute: self.getdomain() expectroute = False elif self.field[self.pos] == ">": self.pos += 1 break elif self.field[self.pos] == "@": self.pos += 1 expectroute = True elif self.field[self.pos] == ":": self.pos += 1 else: adlist = self.getaddrspec() self.pos += 1 break self.gotonext() return adlist def getaddrspec(self): """Parse an RFC 2822 addr-spec.""" aslist = [] self.gotonext() while self.pos < len(self.field): preserve_ws = True if self.field[self.pos] == ".": if aslist and not aslist[-1].strip(): aslist.pop() aslist.append(".") self.pos += 1 preserve_ws = False elif self.field[self.pos] == '"': aslist.append('"%s"' % quote(self.getquote())) elif self.field[self.pos] in self.atomends: if aslist and not aslist[-1].strip(): aslist.pop() break else: aslist.append(self.getatom()) ws = self.gotonext() if preserve_ws and ws: aslist.append(ws) if self.pos >= len(self.field) or self.field[self.pos] != "@": return EMPTYSTRING.join(aslist) aslist.append("@") self.pos += 1 self.gotonext() return EMPTYSTRING.join(aslist) + self.getdomain() def getdomain(self): """Get the complete domain name from an address.""" sdlist = [] while self.pos < len(self.field): if self.field[self.pos] in self.LWS: self.pos += 1 elif self.field[self.pos] == "(": self.commentlist.append(self.getcomment()) elif self.field[self.pos] == "[": sdlist.append(self.getdomainliteral()) elif self.field[self.pos] == ".": self.pos += 1 sdlist.append(".") elif self.field[self.pos] in self.atomends: break else: sdlist.append(self.getatom()) return EMPTYSTRING.join(sdlist) def getdelimited(self, beginchar, endchars, allowcomments=True): """Parse a header fragment delimited by special characters. `beginchar' is the start character for the fragment. If self is not looking at an instance of `beginchar' then getdelimited returns the empty string. `endchars' is a sequence of allowable end-delimiting characters. Parsing stops when one of these is encountered. If `allowcomments' is non-zero, embedded RFC 2822 comments are allowed within the parsed fragment. """ if self.field[self.pos] != beginchar: return "" slist = [""] quote = False self.pos += 1 while self.pos < len(self.field): if quote: slist.append(self.field[self.pos]) quote = False elif self.field[self.pos] in endchars: self.pos += 1 break elif allowcomments and self.field[self.pos] == "(": slist.append(self.getcomment()) continue # have already advanced pos from getcomment elif self.field[self.pos] == "\\": quote = True else: slist.append(self.field[self.pos]) self.pos += 1 return EMPTYSTRING.join(slist) def getquote(self): """Get a quote-delimited fragment from self's field.""" return self.getdelimited('"', '"\r', False) def getcomment(self): """Get a parenthesis-delimited fragment from self's field.""" return self.getdelimited("(", ")\r", True) def getdomainliteral(self): """Parse an RFC 2822 domain-literal.""" return "[%s]" % self.getdelimited("[", "]\r", False) def getatom(self, atomends=None): """Parse an RFC 2822 atom. Optional atomends specifies a different set of end token delimiters (the default is to use self.atomends). This is used e.g. in getphraselist() since phrase endings must not include the `.' (which is legal in phrases).""" atomlist = [""] if atomends is None: atomends = self.atomends while self.pos < len(self.field): if self.field[self.pos] in atomends: break else: atomlist.append(self.field[self.pos]) self.pos += 1 return EMPTYSTRING.join(atomlist) def getphraselist(self): """Parse a sequence of RFC 2822 phrases. A phrase is a sequence of words, which are in turn either RFC 2822 atoms or quoted-strings. Phrases are canonicalized by squeezing all runs of continuous whitespace into one space. """ plist = [] while self.pos < len(self.field): if self.field[self.pos] in self.FWS: self.pos += 1 elif self.field[self.pos] == '"': plist.append(self.getquote()) elif self.field[self.pos] == "(": self.commentlist.append(self.getcomment()) elif self.field[self.pos] in self.phraseends: break else: plist.append(self.getatom(self.phraseends)) return plist class AddressList(AddrlistClass): """An AddressList encapsulates a list of parsed RFC 2822 addresses.""" def __init__(self, field): AddrlistClass.__init__(self, field) if field: self.addresslist = self.getaddrlist() else: self.addresslist = [] def __len__(self): return len(self.addresslist) def __add__(self, other): # Set union newaddr = AddressList(None) newaddr.addresslist = self.addresslist[:] for x in other.addresslist: if not x in self.addresslist: newaddr.addresslist.append(x) return newaddr def __iadd__(self, other): # Set union, in-place for x in other.addresslist: if not x in self.addresslist: self.addresslist.append(x) return self def __sub__(self, other): # Set difference newaddr = AddressList(None) for x in self.addresslist: if not x in other.addresslist: newaddr.addresslist.append(x) return newaddr def __isub__(self, other): # Set difference, in-place for x in other.addresslist: if x in self.addresslist: self.addresslist.remove(x) return self def __getitem__(self, index): # Make indexing, slices, and 'in' work return self.addresslist[index]