From 7e702305bf55123cc3b0f6485002ad5f076af00d Mon Sep 17 00:00:00 2001 From: Ryan Barrett Date: Mon, 1 Jan 2024 12:22:23 -0800 Subject: [PATCH] Web: add poll_feed_task for ingesting Atom/RSS feeds also demotes Web.atom to informational only, we're switching to populate feed entries into our_as1 because we can't easily extract individual items from feedparser. for #550 --- models.py | 13 +++++++-- tests/test_models.py | 33 +++++------------------ tests/test_web.py | 41 ++++++++++++++++++++++++---- web.py | 64 +++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 114 insertions(+), 37 deletions(-) diff --git a/models.py b/models.py index d20bce2..958cfb5 100644 --- a/models.py +++ b/models.py @@ -537,13 +537,19 @@ class Object(StringIdModel): # TODO: switch back to ndb.JsonProperty if/when they fix it for the web console # https://github.com/googleapis/python-ndb/issues/874 as2 = JsonProperty() # only one of the rest will be populated... - atom = ndb.TextProperty() # Atom XML, usually from Superfeedr bsky = JsonProperty() # Bluesky / AT Protocol mf2 = JsonProperty() # HTML microformats2 item (ie _not_ the top level # parse object with items inside an 'items' field) our_as1 = JsonProperty() # AS1 for activities that we generate or modify ourselves raw = JsonProperty() # other standalone data format, eg DID document + # these are full feeds with multiple items, not just this one, so they're + # stored as audit records only. they're not used in to_as1. for Atom/RSS + # based Objects, our_as1 will be populated with a feed_index top-level + # integer field that indexes into one of these. 
+ atom = ndb.TextProperty() # Atom XML + rss = ndb.TextProperty() # RSS XML + deleted = ndb.BooleanProperty() delivered = ndb.StructuredProperty(Target, repeated=True) @@ -587,6 +593,8 @@ class Object(StringIdModel): if self.our_as1: obj = self.our_as1 + if self.atom or self.rss: + use_urls_as_ids(obj) elif self.as2: obj = as2.to_as1(self.as2) @@ -603,6 +611,7 @@ class Object(StringIdModel): rel_urls=self.mf2.get('rel-urls')) use_urls_as_ids(obj) + # TODO: remove once we drop superfeedr elif self.atom: obj = atom.atom_to_activity(self.atom)['object'] use_urls_as_ids(obj) @@ -687,7 +696,7 @@ class Object(StringIdModel): 'new': self.new, 'changed': self.changed, }) - for prop in 'as2', 'bsky', 'mf2', 'our_as1', 'raw': + for prop in 'as2', 'atom', 'bsky', 'mf2', 'our_as1', 'raw', 'rss': if props.get(prop): props[prop] = "..." for prop in 'created', 'updated', 'as1', 'expire': diff --git a/tests/test_models.py b/tests/test_models.py index 8acb6d4..7067e17 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -574,34 +574,13 @@ class ObjectTest(TestCase): self.assertEqual({'id': 'x', 'foo': 'bar'}, Object(id='x', our_as1={'foo': 'bar'}).as1) - def test_as1_from_atom(self): - self.assert_equals({ + def test_atom_url_overrides_id(self): + obj = { 'objectType': 'note', - 'id': 'http://user/post', - 'url': 'http://user/post', - 'content': 'I hereby ☕ reply.', - }, Object(atom="""\ - - -http://user/post -I hereby ☕ reply. - -""").as1) - - def test_as1_from_atom_url_overrides_id(self): - self.assert_equals({ - 'objectType': 'note', - 'id': 'http://user/post', - 'url': 'http://user/post', - 'content': 'I hereby ☕ reply.', - }, Object(atom="""\ - - -unused -http://user/post -I hereby ☕ reply. 
- -""").as1) + 'id': 'bad', + 'url': 'good', + } + self.assert_equals('good', Object(our_as1=obj, atom='trigger').as1['id']) @patch('requests.get', return_value=requests_response(DID_DOC)) def test_as1_from_bsky(self, mock_get): diff --git a/tests/test_web.py b/tests/test_web.py index 6ca9928..aa03df1 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -1829,19 +1829,50 @@ class WebTest(TestCase): logs.output) @patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task') - def test_superfeedr_notify_make_task(self, mock_create_task, *_): + def test_poll_feed_atom(self, mock_create_task, mock_get, _): common.RUN_TASKS_INLINE = False + self.user.obj.mf2 = ACTOR_MF2_REL_FEED_URL + self.user.obj.put() - got = self.post('/superfeedr/notify/user.com', data="""\ + feed = """\ https://user.com/post -I hereby ☕ post. +I hereby ☕ post -""", headers={'Content-Type': atom.CONTENT_TYPE}) +""" + mock_get.return_value = requests_response( + feed, headers={'Content-Type': atom.CONTENT_TYPE}) + + got = self.post('/queue/poll-feed', data={'domain': 'user.com'}) self.assertEqual(200, got.status_code) + + mock_get.assert_has_calls(( + self.req('https://foo/atom'), + )) + obj = self.assert_object('https://user.com/post', + users=[self.user.key], + source_protocol='web', + status='new', + atom=feed, + our_as1={ + 'objectType': 'activity', + 'verb': 'post', + 'id': 'https://user.com/post', + 'url': 'https://user.com/post', + 'object':{ + 'objectType': 'note', + 'id': 'https://user.com/post', + 'url': 'https://user.com/post', + 'content': 'I hereby ☕ post', + }, + }, + type='post', + object_ids=['https://user.com/post'], + labels=['user', 'activity'], + ) self.assert_task(mock_create_task, 'receive', '/queue/receive', - obj=Object(id='https://user.com/post').key.urlsafe(), + obj=obj.key.urlsafe(), authed_as='user.com') def test_superfeedr_notify_no_user(self, *_): diff --git a/web.py b/web.py index fea99d7..2e56db8 100644 --- a/web.py +++ b/web.py @@ -58,7 +58,10 @@ 
NON_TLDS = frozenset(( SUPERFEEDR_PUSH_API = 'https://push.superfeedr.com' SUPERFEEDR_USERNAME = util.read('superfeedr_username') SUPERFEEDR_TOKEN = util.read('superfeedr_token') -FEED_TYPES = [type.split(';')[0] for type in (atom.CONTENT_TYPE, rss.CONTENT_TYPE)] +FEED_TYPES = { + atom.CONTENT_TYPE.split(';')[0]: 'atom', + rss.CONTENT_TYPE.split(';')[0]: 'rss', +} def is_valid_domain(domain): @@ -620,7 +623,7 @@ def maybe_superfeedr_subscribe(user): # discover feed for url, info in user.obj.mf2.get('rel-urls', {}).items(): if ('alternate' in info.get('rels', []) - and info.get('type', '').split(';')[0] in FEED_TYPES): + and info.get('type', '').split(';')[0] in FEED_TYPES.keys()): break else: logger.info(f"User {user.key.id()} has no feed URL, can't subscribe") @@ -683,6 +686,57 @@ def maybe_superfeedr_unsubscribe(user): resp.raise_for_status() +@app.post(f'/queue/poll-feed') +def poll_feed_task(): + """Fetches a :class:`Web` site's feed and delivers new/updated posts. + + Params: + ``domain`` (str): key id of the :class:`Web` user + """ + user = Web.get_by_id(flask_util.get_required_param('domain')) + if not user: + error(f'No Web user found for domain {domain}', status=304) + + # discover feed URL + for url, info in user.obj.mf2.get('rel-urls', {}).items(): + if ('alternate' in info.get('rels', []) + and info.get('type', '').split(';')[0] in FEED_TYPES.keys()): + break + else: + msg = f"User {user.key.id()} has no feed URL, can't fetch feed" + logger.info(msg) + return msg + + # fetch feed + resp = util.requests_get(url) + content_type = resp.headers.get('Content-Type') + type = FEED_TYPES.get(content_type.split(';')[0]) + if type == 'atom': + activities = atom.atom_to_activities(resp.text) + elif type == 'rss': + activities = rss.to_activities(resp.text) + else: + msg = f'Unknown feed type {content_type}' + logger.info(msg) + return msg + + # create Objects and receive tasks + for activity in activities: + logger.info(f'Converted to AS1: {json_dumps(activity, 
indent=2)}') + + id = Object(our_as1=activity).as1.get('id') + if not id: + logger.warning('No id or URL!') + continue + + obj = Object.get_or_create(id=id, our_as1=activity, atom=resp.text, + source_protocol=Web.ABBREV, users=[user.key], + status='new') + common.create_task(queue='receive', obj=obj.key.urlsafe(), + authed_as=user.key.id()) + + return 'OK' + # generate/check per-user token for auth? # or https://documentation.superfeedr.com/subscribers.html#http-authentication ? @app.post(f'/superfeedr/notify/') @@ -728,7 +782,11 @@ def _superfeedr_notify(doc, user): @app.post('/queue/webmention') @cloud_tasks_only def webmention_task(): - """Handles inbound webmention task.""" + """Handles inbound webmention task. + + Params: + ``source`` (str): URL + """ logger.info(f'Params: {list(request.form.items())}') # load user