From 2a0bbf0d5da9e8cb76fc5093b11f2bdf876f5b49 Mon Sep 17 00:00:00 2001 From: Osma Ahvenlampi Date: Sun, 27 Aug 2023 00:16:14 +0300 Subject: [PATCH] One more try to get the fetch_account/sync_pins/post relationship and parallelism fixed (#634) --- activities/models/post.py | 50 ++++++++++++++++++++------------------- users/models/identity.py | 33 +++++++++++++++++++++----- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/activities/models/post.py b/activities/models/post.py index 7d46463..2cb3be7 100644 --- a/activities/models/post.py +++ b/activities/models/post.py @@ -834,25 +834,20 @@ class Post(StatorModel): # If the author is not fetched yet, try again later if author.domain is None: if fetch_author: - author.fetch_actor() - # perhaps the entire "try again" logic below - # could be replaced with TryAgainLater for - # _all_ fetches, to let it handle pinned posts? - if author.domain is None: + if not author.fetch_actor() or author.domain is None: raise TryAgainLater() else: raise TryAgainLater() # If the post is from a blocked domain, stop and drop if author.domain.recursively_blocked(): raise cls.DoesNotExist("Post is from a blocked domain") + # parallelism may cause another simultaneous worker thread + # to try to create the same post - so watch for that and + # try to avoid failing the entire transaction try: - # try again, because fetch_actor() also fetches pinned posts - post = cls.objects.select_related("author__domain").get( - object_uri=data["id"] - ) - except cls.DoesNotExist: - # finally, create a stub - try: + # wrapped in a transaction to avoid breaking the outer + # transaction + with transaction.atomic(): post = cls.objects.create( object_uri=data["id"], author=author, @@ -861,24 +856,23 @@ class Post(StatorModel): type=data["type"], ) created = True - except IntegrityError as dupe: - # there's still some kind of race condition here - # it's far more rare, but sometimes we fire an - # IntegrityError on activities_post_object_uri_key - # this transaction is now aborted and anything following - # in the caller function will fail in the database. - raise TryAgainLater() from dupe + except IntegrityError: + # despite previous checks, a parallel thread managed + # to create the same object already + post = cls.by_object_uri(object_uri=data["id"]) else: raise cls.DoesNotExist(f"No post with ID {data['id']}", data) if update or created: post.type = data["type"] + post.url = data.get("url", data["id"]) if post.type in (cls.Types.article, cls.Types.question): post.type_data = PostTypeData(__root__=data).__root__ try: # apparently sometimes posts (Pages?) in the fediverse - # don't have content?! + # don't have content, but this shouldn't be a total failure post.content = get_value_or_map(data, "content", "contentMap") - except KeyError: + except ActivityPubFormatError as err: + capture_message(f"{err} on {post.url}") post.content = None # Document types have names, not summaries post.summary = data.get("summary") or data.get("name") @@ -886,7 +880,6 @@ class Post(StatorModel): post.content = post.summary post.summary = None post.sensitive = data.get("sensitive", False) - post.url = data.get("url", data["id"]) post.published = parse_ld_date(data.get("published")) post.edited = parse_ld_date(data.get("updated")) post.in_reply_to = data.get("inReplyTo") @@ -930,6 +923,9 @@ class Post(StatorModel): # These have no IDs, so we have to wipe them each time post.attachments.all().delete() for attachment in get_list(data, "attachment"): + if "url" not in attachment and "href" in attachment: + # Links have hrefs, while other Objects have urls + attachment["url"] = attachment["href"] if "focalPoint" in attachment: try: focal_x, focal_y = attachment["focalPoint"] @@ -940,7 +936,9 @@ class Post(StatorModel): mimetype = attachment.get("mediaType") if not mimetype or not isinstance(mimetype, str): if "url" not in attachment: - raise ActivityPubFormatError("No URL present on attachment") + raise ActivityPubFormatError( + f"No URL present on attachment in {post.url}" + ) mimetype, _ = mimetypes.guess_type(attachment["url"]) if not mimetype: mimetype = "application/octet-stream" @@ -956,7 +954,11 @@ class Post(StatorModel): ) # Calculate stats in case we have existing replies post.calculate_stats(save=False) - post.save() + with transaction.atomic(): + # if we don't commit the transaction here, there's a chance + # the parent fetch below goes into an infinite loop + post.save() + # Potentially schedule a fetch of the reply parent, and recalculate # its stats if it's here already. if post.in_reply_to: diff --git a/users/models/identity.py b/users/models/identity.py index 97f4fcb..41c2f42 100644 --- a/users/models/identity.py +++ b/users/models/identity.py @@ -6,7 +6,7 @@ from urllib.parse import urlparse import httpx import urlman from django.conf import settings -from django.db import IntegrityError, models +from django.db import IntegrityError, models, transaction from django.utils import timezone from django.utils.functional import lazy from lxml import etree @@ -439,7 +439,13 @@ class Identity(StatorModel): # to the DB until the fetch succeeds return cls(actor_uri=uri, local=False) else: - return cls.objects.create(actor_uri=uri, local=False) + # parallelism may cause another simultaneous worker thread + # to try to create the same identity - so use database level + # constructs to avoid an integrity error + identity, created = cls.objects.update_or_create( + actor_uri=uri, local=False + ) + return identity else: raise cls.DoesNotExist(f"No identity found with actor_uri {uri}") @@ -860,8 +866,19 @@ class Identity(StatorModel): }, ) return False - - document = canonicalise(response.json(), include_security=True) + try: + document = canonicalise(response.json(), include_security=True) + except ValueError: + # servers with empty or invalid responses are inevitable + capture_message( + f"Invalid response fetching actor at {self.actor_uri}", + extras={ + "identity": self.pk, + "domain": self.domain_id, + "content": response.content, + }, + ) + return False if "type" not in document: return False self.name = document.get("name") @@ -923,7 +940,10 @@ class Identity(StatorModel): # Mark as fetched self.fetched = timezone.now() try: - self.save() + with transaction.atomic(): + # if we don't wrap this in its own transaction, the exception + # handler is guaranteed to fail + self.save() except IntegrityError as e: # See if we can fetch a PK and save there if self.pk is None: @@ -934,7 +954,8 @@ class Identity(StatorModel): f"Could not save Identity at end of actor fetch: {e}" ) self.pk: int | None = other_row.pk - self.save() + with transaction.atomic(): + self.save() # Fetch pinned posts after identity has been fetched and saved if self.featured_collection_uri: