From 06280aa5a334d80f26d4d10c08df07bdabc61915 Mon Sep 17 00:00:00 2001 From: Michael Date: Sun, 24 Jul 2022 09:26:52 +0000 Subject: [PATCH] Recursively delete failed worker tasks --- src/Protocol/ActivityPub/Processor.php | 34 ++++++++++-------- src/Protocol/ActivityPub/Queue.php | 48 ++++++++++++++++++++++++-- src/Protocol/ActivityPub/Receiver.php | 12 +++---- src/Worker/Cron.php | 7 ++-- src/Worker/FetchMissingActivity.php | 11 ++++-- 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/src/Protocol/ActivityPub/Processor.php b/src/Protocol/ActivityPub/Processor.php index aa169ab028..9d5f3f2f6e 100644 --- a/src/Protocol/ActivityPub/Processor.php +++ b/src/Protocol/ActivityPub/Processor.php @@ -551,16 +551,13 @@ class Processor } } - if ($activity['target_id'] != $actor['featured']) { - return null; + $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]); + if (empty($parent['uri-id'])) { + if (self::fetchMissingActivity($activity['object_id'], $activity, '', Receiver::COMPLETION_AUTO)) { + $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id']]); + } } - $id = Contact::getIdForURL($activity['actor']); - if (empty($id)) { - return null; - } - - $parent = Post::selectFirst(['uri-id'], ['uri' => $activity['object_id'], 'author-id' => $id]); if (!empty($parent['uri-id'])) { return $parent['uri-id']; } @@ -1191,20 +1188,27 @@ class Processor return ''; } + $signer = []; + + if (!empty($object['attributedTo'])) { + $attributed_to = $object['attributedTo']; + if (is_array($attributed_to)) { + $compacted = JsonLD::compact($object); + $attributed_to = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id'); + } + $signer[] = $attributed_to; + } + if (!empty($object['actor'])) { $object_actor = $object['actor']; - } elseif (!empty($object['attributedTo'])) { - $object_actor = $object['attributedTo']; - if (is_array($object_actor)) { - $compacted = JsonLD::compact($object); - $object_actor = JsonLD::fetchElement($compacted, 'as:attributedTo', '@id'); - } + } elseif (!empty($attributed_to)) { + $object_actor = $attributed_to; } else { // Shouldn't happen $object_actor = ''; } - $signer = [$object_actor]; + $signer[] = $object_actor; if (!empty($child['author'])) { $actor = $child['author']; diff --git a/src/Protocol/ActivityPub/Queue.php b/src/Protocol/ActivityPub/Queue.php index d150f9f9cc..3d40d71638 100644 --- a/src/Protocol/ActivityPub/Queue.php +++ b/src/Protocol/ActivityPub/Queue.php @@ -95,6 +95,42 @@ class Queue DBA::delete('inbox-entry', ['id' => $activity['entry-id']]); } + /** + * Delete all entries that depend on the given worker id + * + * @param integer $wid + * @return void + */ + public static function deleteByWorkerId(int $wid) + { + $entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]); + while ($entry = DBA::fetch($entries)) { + self::deleteById($entry['id']); + } + DBA::close($entries); + } + + /** + * Delete recursively an entry and all their children + * + * @param integer $id + * @return void + */ + private static function deleteById(int $id) + { + $entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]); + if (empty($entry)) { + return; + } + + $children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]); + while ($child = DBA::fetch($children)) { + self::deleteById($child['id']); + } + DBA::close($children); + DBA::delete('inbox-entry', ['id' => $entry['id']]); + } + /** * Set the worker id for the queue entry * @@ -143,8 +179,9 @@ class Queue $type = $entry['type']; $push = $entry['push']; - $activity['entry-id'] = $entry['id']; - $activity['worker-id'] = $entry['wid']; + $activity['entry-id'] = $entry['id']; + $activity['worker-id'] = $entry['wid']; + $activity['recursion-depth'] = 0; $receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]); while ($receiver = DBA::fetch($receivers)) { @@ -166,8 +203,13 @@ class Queue */ public static function processAll() { - $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]); + $entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`wid` IS NULL"], ['order' => ['id' => true]]); while ($entry = DBA::fetch($entries)) { + // We don't need to process entries that depend on already existing entries. + if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ['object-id' => $entry['in-reply-to-id']])) { + continue; + } + Logger::debug('Process leftover entry', $entry); self::process($entry['id']); } } diff --git a/src/Protocol/ActivityPub/Receiver.php b/src/Protocol/ActivityPub/Receiver.php index 92ed31922b..3e34510749 100644 --- a/src/Protocol/ActivityPub/Receiver.php +++ b/src/Protocol/ActivityPub/Receiver.php @@ -517,10 +517,6 @@ class Receiver } } - if (($type == 'as:Add') && is_array($activity['as:object']) && (count($activity['as:object']) == 1)) { - $trust_source = false; - } - // $trust_source is called by reference and is set to true if the content was retrieved successfully $object_data = self::prepareObjectData($activity, $uid, $push, $trust_source); if (empty($object_data)) { @@ -556,10 +552,6 @@ class Receiver $object_data['thread-children-type'] = $activity['thread-children-type']; } - if (!empty($activity['recursion-depth'])) { - $object_data['recursion-depth'] = $activity['recursion-depth']; - } - // Internal flag for posts that arrived via relay if (!empty($activity['from-relay'])) { $object_data['from-relay'] = $activity['from-relay']; @@ -571,6 +563,10 @@ class Receiver $object_data = Queue::add($object_data, $type, $uid, $http_signer, $push); + if (!empty($activity['recursion-depth'])) { + $object_data['recursion-depth'] = $activity['recursion-depth']; + } + if (in_array('as:Question', [$object_data['object_type'] ?? '', $object_data['object_object_type'] ?? ''])) { self::storeUnhandledActivity(false, $type, $object_data, $activity, $body, $uid, $trust_source, $push, $signer); } diff --git a/src/Worker/Cron.php b/src/Worker/Cron.php index 68ad218014..c2109e66a3 100644 --- a/src/Worker/Cron.php +++ b/src/Worker/Cron.php @@ -128,11 +128,14 @@ class Cron if (DI::config()->get('system', 'optimize_tables')) { Worker::add(PRIORITY_LOW, 'OptimizeTables'); } - - DI::config()->set('system', 'last_cron_daily', time()); + + // Process all unprocessed entries + Queue::processAll(); // Resubscribe to relay servers Relay::reSubscribe(); + + DI::config()->set('system', 'last_cron_daily', time()); } Logger::notice('end'); diff --git a/src/Worker/FetchMissingActivity.php b/src/Worker/FetchMissingActivity.php index ae0f8a7fbf..91b473a6ef 100644 --- a/src/Worker/FetchMissingActivity.php +++ b/src/Worker/FetchMissingActivity.php @@ -23,6 +23,7 @@ namespace Friendica\Worker; use Friendica\Core\Logger; use Friendica\Core\Worker; +use Friendica\DI; use Friendica\Protocol\ActivityPub; use Friendica\Protocol\ActivityPub\Queue; use Friendica\Protocol\ActivityPub\Receiver; @@ -32,6 +33,8 @@ class FetchMissingActivity /** * Fetch missing activities * @param string $url Contact URL + * + * @return void */ public static function execute(string $url, array $child = [], string $relay_actor = '', int $completion = Receiver::COMPLETION_MANUAL) { @@ -39,10 +42,14 @@ class FetchMissingActivity $result = ActivityPub\Processor::fetchMissingActivity($url, $child, $relay_actor, $completion); if ($result) { Logger::info('Successfully fetched missing activity', ['url' => $url]); - Queue::processReplyByUri($url); } elseif (!Worker::defer()) { - // @todo perform recursive deletion of all entries Logger::info('Activity could not be fetched', ['url' => $url]); + + // recursively delete all entries that belong to this worker task + $queue = DI::app()->getQueue(); + if (!empty($queue['id'])) { + Queue::deleteByWorkerId($queue['id']); + } } else { Logger::info('Fetching deferred', ['url' => $url]); }