Web: add poll_feed_task for ingesting Atom/RSS feeds

Also demotes Object.atom to informational only. We're switching to populating feed entries into our_as1 because we can't easily extract individual items from feedparser.

for #550
pull/777/head
Ryan Barrett 2024-01-01 12:22:23 -08:00
rodzic 4652ac49f4
commit 7e702305bf
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 6BE31FDF4776E9D4
4 zmienionych plików z 114 dodań i 37 usunięć

Wyświetl plik

@ -537,13 +537,19 @@ class Object(StringIdModel):
# TODO: switch back to ndb.JsonProperty if/when they fix it for the web console
# https://github.com/googleapis/python-ndb/issues/874
as2 = JsonProperty() # only one of the rest will be populated...
atom = ndb.TextProperty() # Atom XML, usually from Superfeedr
bsky = JsonProperty() # Bluesky / AT Protocol
mf2 = JsonProperty() # HTML microformats2 item (ie _not_ the top level
# parse object with items inside an 'items' field)
our_as1 = JsonProperty() # AS1 for activities that we generate or modify ourselves
raw = JsonProperty() # other standalone data format, eg DID document
# these are full feeds with multiple items, not just this one, so they're
# stored as audit records only. they're not used in to_as1. for Atom/RSS
# based Objects, our_as1 will be populated with a feed_index top-level
# integer field that indexes into one of these.
atom = ndb.TextProperty() # Atom XML
rss = ndb.TextProperty() # RSS XML
deleted = ndb.BooleanProperty()
delivered = ndb.StructuredProperty(Target, repeated=True)
@ -587,6 +593,8 @@ class Object(StringIdModel):
if self.our_as1:
obj = self.our_as1
if self.atom or self.rss:
use_urls_as_ids(obj)
elif self.as2:
obj = as2.to_as1(self.as2)
@ -603,6 +611,7 @@ class Object(StringIdModel):
rel_urls=self.mf2.get('rel-urls'))
use_urls_as_ids(obj)
# TODO: remove once we drop superfeedr
elif self.atom:
obj = atom.atom_to_activity(self.atom)['object']
use_urls_as_ids(obj)
@ -687,7 +696,7 @@ class Object(StringIdModel):
'new': self.new,
'changed': self.changed,
})
for prop in 'as2', 'bsky', 'mf2', 'our_as1', 'raw':
for prop in 'as2', 'atom', 'bsky', 'mf2', 'our_as1', 'raw', 'rss':
if props.get(prop):
props[prop] = "..."
for prop in 'created', 'updated', 'as1', 'expire':

Wyświetl plik

@ -574,34 +574,13 @@ class ObjectTest(TestCase):
self.assertEqual({'id': 'x', 'foo': 'bar'},
Object(id='x', our_as1={'foo': 'bar'}).as1)
def test_as1_from_atom(self):
self.assert_equals({
def test_atom_url_overrides_id(self):
obj = {
'objectType': 'note',
'id': 'http://user/post',
'url': 'http://user/post',
'content': 'I hereby ☕ reply.',
}, Object(atom="""\
<?xml version="1.0" encoding="UTF-8"?>
<entry xmlns="http://www.w3.org/2005/Atom">
<uri>http://user/post</uri>
<content>I hereby reply.</content>
</entry>
""").as1)
def test_as1_from_atom_url_overrides_id(self):
self.assert_equals({
'objectType': 'note',
'id': 'http://user/post',
'url': 'http://user/post',
'content': 'I hereby ☕ reply.',
}, Object(atom="""\
<?xml version="1.0" encoding="UTF-8"?>
<entry xmlns="http://www.w3.org/2005/Atom">
<id>unused</id>
<uri>http://user/post</uri>
<content>I hereby reply.</content>
</entry>
""").as1)
'id': 'bad',
'url': 'good',
}
self.assert_equals('good', Object(our_as1=obj, atom='trigger').as1['id'])
@patch('requests.get', return_value=requests_response(DID_DOC))
def test_as1_from_bsky(self, mock_get):

Wyświetl plik

@ -1829,19 +1829,50 @@ class WebTest(TestCase):
logs.output)
@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_superfeedr_notify_make_task(self, mock_create_task, *_):
def test_poll_feed_atom(self, mock_create_task, mock_get, _):
common.RUN_TASKS_INLINE = False
self.user.obj.mf2 = ACTOR_MF2_REL_FEED_URL
self.user.obj.put()
got = self.post('/superfeedr/notify/user.com', data="""\
feed = """\
<?xml version="1.0" encoding="UTF-8"?>
<entry xmlns="http://www.w3.org/2005/Atom">
<uri>https://user.com/post</uri>
<content>I hereby post.</content>
<content>I hereby post</content>
</entry>
""", headers={'Content-Type': atom.CONTENT_TYPE})
"""
mock_get.return_value = requests_response(
feed, headers={'Content-Type': atom.CONTENT_TYPE})
got = self.post('/queue/poll-feed', data={'domain': 'user.com'})
self.assertEqual(200, got.status_code)
mock_get.assert_has_calls((
self.req('https://foo/atom'),
))
obj = self.assert_object('https://user.com/post',
users=[self.user.key],
source_protocol='web',
status='new',
atom=feed,
our_as1={
'objectType': 'activity',
'verb': 'post',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'object':{
'objectType': 'note',
'id': 'https://user.com/post',
'url': 'https://user.com/post',
'content': 'I hereby ☕ post',
},
},
type='post',
object_ids=['https://user.com/post'],
labels=['user', 'activity'],
)
self.assert_task(mock_create_task, 'receive', '/queue/receive',
obj=Object(id='https://user.com/post').key.urlsafe(),
obj=obj.key.urlsafe(),
authed_as='user.com')
def test_superfeedr_notify_no_user(self, *_):

64
web.py
Wyświetl plik

@ -58,7 +58,10 @@ NON_TLDS = frozenset((
SUPERFEEDR_PUSH_API = 'https://push.superfeedr.com'
SUPERFEEDR_USERNAME = util.read('superfeedr_username')
SUPERFEEDR_TOKEN = util.read('superfeedr_token')
FEED_TYPES = [type.split(';')[0] for type in (atom.CONTENT_TYPE, rss.CONTENT_TYPE)]
# Maps each supported feed MIME type (anything after the first ';' dropped)
# to its short feed kind, 'atom' or 'rss'.
FEED_TYPES = {
atom.CONTENT_TYPE.split(';')[0]: 'atom',
rss.CONTENT_TYPE.split(';')[0]: 'rss',
}
def is_valid_domain(domain):
@ -620,7 +623,7 @@ def maybe_superfeedr_subscribe(user):
# discover feed
for url, info in user.obj.mf2.get('rel-urls', {}).items():
if ('alternate' in info.get('rels', [])
and info.get('type', '').split(';')[0] in FEED_TYPES):
and info.get('type', '').split(';')[0] in FEED_TYPES.keys()):
break
else:
logger.info(f"User {user.key.id()} has no feed URL, can't subscribe")
@ -683,6 +686,57 @@ def maybe_superfeedr_unsubscribe(user):
resp.raise_for_status()
@app.post('/queue/poll-feed')
def poll_feed_task():
    """Fetches a :class:`Web` site's feed and delivers new/updated posts.

    Looks up the feed URL in the ``rel-urls`` of the user's stored mf2,
    fetches it, converts its entries to AS1 activities, stores each one as an
    :class:`Object`, and enqueues a ``receive`` task for it.

    Params:
      ``domain`` (str): key id of the :class:`Web` user

    Returns:
      str: ``'OK'`` once all entries have been processed, otherwise a message
      saying why the feed was skipped (no feed URL, or unknown feed type)
    """
    # bind the param to a name first so that the error message below can use it
    domain = flask_util.get_required_param('domain')
    user = Web.get_by_id(domain)
    if not user:
        # NOTE(review): error() is assumed to abort the request here - confirm
        error(f'No Web user found for domain {domain}', status=304)

    # discover feed URL: first rel="alternate" link with a known feed MIME type
    for url, info in user.obj.mf2.get('rel-urls', {}).items():
        if ('alternate' in info.get('rels', [])
                and info.get('type', '').split(';')[0] in FEED_TYPES):
            break
    else:
        msg = f"User {user.key.id()} has no feed URL, can't fetch feed"
        logger.info(msg)
        return msg

    # fetch feed
    resp = util.requests_get(url)
    content_type = resp.headers.get('Content-Type')
    # the header may be missing entirely; treat that as an unknown feed type
    # instead of crashing on None.split()
    mime_type = (content_type or '').split(';')[0].strip()
    feed_type = FEED_TYPES.get(mime_type)
    if feed_type == 'atom':
        activities = atom.atom_to_activities(resp.text)
    elif feed_type == 'rss':
        activities = rss.to_activities(resp.text)
    else:
        msg = f'Unknown feed type {content_type}'
        logger.info(msg)
        return msg

    # store the raw feed in the Object property that matches its format
    # (Object.atom or Object.rss), not always in atom
    feed_prop = {feed_type: resp.text}

    # create Objects and receive tasks
    for activity in activities:
        logger.info(f'Converted to AS1: {json_dumps(activity, indent=2)}')

        # go through Object.as1 to determine the id for this activity
        obj_id = Object(our_as1=activity).as1.get('id')
        if not obj_id:
            logger.warning('No id or URL!')
            continue

        obj = Object.get_or_create(id=obj_id, our_as1=activity,
                                   source_protocol=Web.ABBREV, users=[user.key],
                                   status='new', **feed_prop)
        common.create_task(queue='receive', obj=obj.key.urlsafe(),
                           authed_as=user.key.id())

    return 'OK'
# generate/check per-user token for auth?
# or https://documentation.superfeedr.com/subscribers.html#http-authentication ?
@app.post(f'/superfeedr/notify/<regex("{DOMAIN_RE}"):domain>')
@ -728,7 +782,11 @@ def _superfeedr_notify(doc, user):
@app.post('/queue/webmention')
@cloud_tasks_only
def webmention_task():
"""Handles inbound webmention task."""
"""Handles inbound webmention task.
Params:
``source`` (str): URL
"""
logger.info(f'Params: {list(request.form.items())}')
# load user