change cli interface and add some inline docstrings

pull/9/head
Jeremy Carbaugh 2012-11-02 16:04:04 -04:00
rodzic d2415f42c5
commit 90c0a7ecd6
1 zmienionych plików z 92 dodań i 50 usunięć

Wyświetl plik

@ -1,5 +1,8 @@
import logging
import urllib
import urllib2
import requests
import rd
__version__ = '0.2'
@ -26,21 +29,28 @@ UNOFFICIAL_ENDPOINTS = {
'twitter.com': 'twitter-webfinger.appspot.com',
}
logger = logging.getLogger("webfinger")
class WebFingerException(Exception):
pass
class WebFingerResponse(object):
""" Response that wraps an RD object. This class provides a `secure`
parameter to indicate whether the response was returned via HTTP or
HTTPS. It also provides attribute-style access to links for specific
rels, responding with the href attribute of the matched element.
"""
def __init__(self, xrd, insecure):
self.insecure = insecure
self._xrd = xrd
def __init__(self, rd, secure=False):
self.secure = secure
self.rd = rd
def __getattr__(self, name):
if name in RELS:
return self._xrd.find_link(RELS[name], attr='href')
return getattr(self._xrd, name)
return self.rd.find_link(RELS[name], attr='href')
return getattr(self.rd, name)
class WebFingerClient(object):
@ -48,42 +58,59 @@ class WebFingerClient(object):
def __init__(self, host, timeout=None, official=False):
self._host = host
self._official = official
self._opener = urllib2.build_opener(urllib2.HTTPRedirectHandler())
self._opener.addheaders = [('User-agent', 'python-webfinger')]
self._session = requests.session(
timeout=timeout,
headers={'User-Agent': 'python-webfinger'})
self._timeout = timeout
def _hm_hosts(self, xrd):
hosts = [e.value for e in xrd.elements if e.name == 'hm:Host']
def _hm_hosts(self, rd):
hosts = [e.value for e in rd.elements if e.name == 'hm:Host']
if not self._official and hosts and self._host in UNOFFICIAL_ENDPOINTS:
hosts.append(UNOFFICIAL_ENDPOINTS[self._host])
return hosts
def xrd(self, url, raw=False):
def rd(self, url, raw=False):
from xrd import XRD
resp = self._session.get(url)
content = resp.content
conn = self._opener.open(url, timeout=self._timeout)
response = conn.read()
conn.close()
return content if raw else rd.loads(content, resp.headers.get('Content-Type'))
return response if raw else XRD.parse(response)
def hostmeta(self, secure=True):
protocol = "https" if secure else "http"
logger.debug("hostmeta() protocol:%s" % protocol)
def hostmeta(self, protocol):
if not self._official and self._host in UNOFFICIAL_ENDPOINTS:
host = UNOFFICIAL_ENDPOINTS[self._host]
else:
host = self._host
hostmeta_url = "%s://%s/.well-known/host-meta" % (protocol, host)
return self.xrd(hostmeta_url)
def finger(self, username):
logger.debug("hostmeta() host:%s" % host)
hostmeta_url = "%s://%s/.well-known/host-meta" % (protocol, host)
logger.debug("hostmeta() loading:%s.json" % hostmeta_url)
resp = self._session.get("%s.json" % hostmeta_url) # attempt JRD
if resp.status_code == 404:
logger.debug("hostmeta() loading:%s" % hostmeta_url)
resp = self._session.get(hostmeta_url, headers={"Accept": "application/json"}) # fall back to XRD
if resp.status_code != 200:
raise WebFingerException("host-meta not found")
logger.debug("hostmeta() content_type:%s" % resp.headers.get('Content-Type'))
return rd.loads(resp.content, resp.headers.get('Content-Type'))
def finger(self, username, rel=None):
try:
hm = self.hostmeta('https')
insecure = False
except (urllib2.URLError, urllib2.HTTPError):
hm = self.hostmeta('http')
insecure = True
hm = self.hostmeta()
secure = True
except (requests.RequestException, requests.HTTPError):
hm = self.hostmeta(secure=False)
secure = False
hm_hosts = self._hm_hosts(hm)
@ -91,44 +118,59 @@ class WebFingerClient(object):
raise WebFingerException("hostmeta host did not match account host")
template = hm.find_link(WEBFINGER_TYPES, attr='template')
if not template.startswith('https://'):
insecure = True
xrd_url = template.replace('{uri}',
secure = False
rd_url = template.replace('{uri}',
urllib.quote_plus('acct:%s@%s' % (username, self._host)))
data = self.xrd(xrd_url)
return WebFingerResponse(data, insecure)
data = self.rd(rd_url)
return WebFingerResponse(data, secure)
def finger(identifier, timeout=None, official=False):
def finger(identifier, rel=None, timeout=None, official=False):
if identifier.startswith('acct:'):
(acct, identifier) = identifier.split(':', 1)
(username, host) = identifier.split('@')
client = WebFingerClient(host, timeout=timeout, official=official)
return client.finger(username)
return client.finger(username, rel=rel)
if __name__ == '__main__':
# example usage
import argparse
import sys
parser = argparse.ArgumentParser(description="Simple webfinger client.")
parser.add_argument("acct", metavar="URI", help="account URI")
parser.add_argument("-d", "--debug", dest="debug", action="store_true", help="print debug logging output to console")
parser.add_argument("-r", "--rel", metavar="REL", dest="rel", help="desired relation")
try:
acct = sys.argv[1]
except IndexError:
acct = "jcarbaugh@twitter.com"
args = parser.parse_args()
wf = finger(acct)
if args.debug:
logging.basicConfig(level=logging.DEBUG)
print "Activity Streams: ", wf.activity_streams
print "Avatar: ", wf.avatar
print "HCard: ", wf.hcard
print "OpenID: ", wf.open_id
print "Open Social: ", wf.opensocial
print "Profile: ", wf.profile
print "Portable Contacts: ", wf.portable_contacts
print "XFN: ", wf.find_link('http://gmpg.org/xfn/11', attr='href')
wf = finger(args.acct, rel=args.rel)
if wf.insecure:
print "*** Warning: Data was retrieved over an insecure connection"
if args.rel:
link = wf.find_link(args.rel)
if link is None:
print "*** Link not found for rel=%s" % args.rel
print "%s:\n\t%s" % (link.rel, link.href)
else:
print "Activity Streams: ", wf.activity_streams
print "Avatar: ", wf.avatar
print "HCard: ", wf.hcard
print "OpenID: ", wf.open_id
print "Open Social: ", wf.opensocial
print "Portable Contacts: ", wf.portable_contacts
print "Profile: ", wf.profile
print "XFN: ", wf.find_link("http://gmpg.org/xfn/11", attr="href")
if not wf.secure:
print "\n*** Warning: Data was retrieved over an insecure connection"