diff --git a/lib/Controller/ActivityPubController.php b/lib/Controller/ActivityPubController.php index e4915d45..0cab6d75 100644 --- a/lib/Controller/ActivityPubController.php +++ b/lib/Controller/ActivityPubController.php @@ -31,6 +31,8 @@ namespace OCA\Social\Controller; use daita\MySmallPhpTools\Traits\Nextcloud\TNCDataResponse; +use daita\MySmallPhpTools\Traits\TAsync; +use daita\MySmallPhpTools\Traits\TStringTools; use Exception; use OC\AppFramework\Http; use OCA\Social\AppInfo\Application; @@ -42,6 +44,7 @@ use OCA\Social\Service\FollowService; use OCA\Social\Service\ImportService; use OCA\Social\Service\MiscService; use OCA\Social\Service\SignatureService; +use OCA\Social\Service\StreamQueueService; use OCP\AppFramework\Controller; use OCP\AppFramework\Http\Response; use OCP\IRequest; @@ -51,6 +54,9 @@ class ActivityPubController extends Controller { use TNCDataResponse; + use TStringTools; + use TAsync; + /** @var SocialPubController */ @@ -62,6 +68,9 @@ class ActivityPubController extends Controller { /** @var SignatureService */ private $signatureService; + /** @var StreamQueueService */ + private $streamQueueService; + /** @var ImportService */ private $importService; @@ -79,6 +88,7 @@ class ActivityPubController extends Controller { * @param SocialPubController $socialPubController * @param CacheActorService $cacheActorService * @param SignatureService $signatureService + * @param StreamQueueService $streamQueueService * @param ImportService $importService * @param FollowService $followService * @param MiscService $miscService @@ -86,13 +96,15 @@ class ActivityPubController extends Controller { public function __construct( IRequest $request, SocialPubController $socialPubController, CacheActorService $cacheActorService, SignatureService $signatureService, - ImportService $importService, FollowService $followService, MiscService $miscService + StreamQueueService $streamQueueService, ImportService $importService, + FollowService $followService, MiscService $miscService ) { parent::__construct(Application::APP_NAME, $request); $this->socialPubController = $socialPubController; $this->cacheActorService = $cacheActorService; $this->signatureService = $signatureService; + $this->streamQueueService = $streamQueueService; $this->importService = $importService; $this->followService = $followService; $this->miscService = $miscService; @@ -178,7 +190,11 @@ class ActivityPubController extends Controller { } catch (ItemUnknownException $e) { } - return $this->success([]); + $this->async(); + $this->streamQueueService->cacheStreamByToken($activity->getRequestToken()); + + // or it will feed the logs. + exit(); } catch (SignatureIsGoneException $e) { return $this->fail($e, [], Http::STATUS_GONE, false); } catch (Exception $e) { diff --git a/lib/Db/NotesRequest.php b/lib/Db/NotesRequest.php index 306c85ff..3d42679e 100644 --- a/lib/Db/NotesRequest.php +++ b/lib/Db/NotesRequest.php @@ -30,11 +30,13 @@ declare(strict_types=1); namespace OCA\Social\Db; +use daita\MySmallPhpTools\Model\Cache; use DateTime; use OCA\Social\Exceptions\NoteNotFoundException; use OCA\Social\Model\ActivityPub\ACore; use OCA\Social\Model\ActivityPub\Actor\Person; use OCA\Social\Model\ActivityPub\Object\Note; +use OCA\Social\Model\ActivityPub\Stream; use OCA\Social\Service\ConfigService; use OCA\Social\Service\MiscService; use OCP\DB\QueryBuilder\IQueryBuilder; @@ -60,12 +62,17 @@ class NotesRequest extends NotesRequestBuilder { /** * Insert a new Note in the database. * - * @param Note $note + * @param Stream $note */ - public function save(Note $note) { + public function save(Stream $note) { $dTime = new DateTime(); $dTime->setTimestamp($note->getPublishedTime()); + $cache = '[]'; + if ($note->gotCache()) { + $cache = json_encode($note->getCache(), JSON_UNESCAPED_SLASHES); + } + $qb = $this->getNotesInsertSql(); $qb->setValue('id', $qb->createNamedParameter($note->getId())) ->setValue('type', $qb->createNamedParameter($note->getType())) @@ -94,6 +101,7 @@ class NotesRequest extends NotesRequestBuilder { ->setValue('attributed_to', $qb->createNamedParameter($note->getAttributedTo())) ->setValue('in_reply_to', $qb->createNamedParameter($note->getInReplyTo())) ->setValue('source', $qb->createNamedParameter($note->getSource())) + ->setValue('cache', $qb->createNamedParameter($cache)) ->setValue( 'instances', $qb->createNamedParameter( json_encode($note->getInstancePaths(), JSON_UNESCAPED_SLASHES) @@ -109,6 +117,20 @@ class NotesRequest extends NotesRequestBuilder { } + /** + * @param Stream $stream + * @param Cache $cache + */ + public function updateCache(Stream $stream, Cache $cache) { + $qb = $this->getNotesUpdateSql(); + $qb->set('cache', $qb->createNamedParameter(json_encode($cache, JSON_UNESCAPED_SLASHES))); + + $this->limitToIdString($qb, $stream->getId()); + + $qb->execute(); + } + + /** * @param string $id * diff --git a/lib/Db/StreamQueueRequest.php b/lib/Db/StreamQueueRequest.php new file mode 100644 index 00000000..70b1eb9d --- /dev/null +++ b/lib/Db/StreamQueueRequest.php @@ -0,0 +1,189 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Db; + + +use DateTime; +use OCA\Social\Exceptions\QueueStatusException; +use OCA\Social\Model\StreamQueue; +use OCP\DB\QueryBuilder\IQueryBuilder; + + +/** + * Class StreamQueueRequest + * + * @package OCA\Social\Db + */ +class StreamQueueRequest extends StreamQueueRequestBuilder { + + + /** + * create a new Queue in the database. + * + * @param StreamQueue $queue + */ + public function create(StreamQueue $queue) { + $qb = $this->getStreamQueueInsertSql(); + $qb->setValue('token', $qb->createNamedParameter($queue->getToken())) + ->setValue('stream_id', $qb->createNamedParameter($queue->getStreamId())) + ->setValue('type', $qb->createNamedParameter($queue->getType())) + ->setValue('status', $qb->createNamedParameter($queue->getStatus())) + ->setValue('tries', $qb->createNamedParameter($queue->getTries())); + $qb->execute(); + } + + + /** + * return Queue from database based on the status=0 + * + * @return StreamQueue[] + */ + public function getStandby(): array { + $qb = $this->getStreamQueueSelectSql(); + $this->limitToStatus($qb, StreamQueue::STATUS_STANDBY); + $qb->orderBy('id', 'asc'); + + $requests = []; + $cursor = $qb->execute(); + while ($data = $cursor->fetch()) { + $requests[] = $this->parseStreamQueueSelectSql($data); + } + $cursor->closeCursor(); + + return $requests; + } + + + /** + * return Queue from database based on the token + * + * @param string $token + * + * @return StreamQueue[] + */ + public function getFromToken(string $token): array { + $qb = $this->getStreamQueueSelectSql(); + $this->limitToToken($qb, $token); + + $queue = []; + $cursor = $qb->execute(); + while ($data = $cursor->fetch()) { + $queue[] = $this->parseStreamQueueSelectSql($data); + } + $cursor->closeCursor(); + + return $queue; + } + + + /** + * @param StreamQueue $queue + * + * @throws QueueStatusException + */ + public function setAsRunning(StreamQueue &$queue) { + $qb = $this->getStreamQueueUpdateSql(); + $qb->set('status', $qb->createNamedParameter(StreamQueue::STATUS_RUNNING)) + ->set( + 'last', + $qb->createNamedParameter(new DateTime('now'), IQueryBuilder::PARAM_DATE) + ); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, StreamQueue::STATUS_STANDBY); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(StreamQueue::STATUS_RUNNING); + } + + + /** + * @param StreamQueue $queue + * + * @throws QueueStatusException + */ + public function setAsSuccess(StreamQueue &$queue) { + $qb = $this->getStreamQueueUpdateSql(); + $qb->set('status', $qb->createNamedParameter(StreamQueue::STATUS_SUCCESS)); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, StreamQueue::STATUS_RUNNING); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(StreamQueue::STATUS_SUCCESS); + } + + + /** + * @param StreamQueue $queue + * + * @throws QueueStatusException + */ + public function setAsFailure(StreamQueue &$queue) { + $qb = $this->getStreamQueueUpdateSql(); + $func = $qb->func(); + $expr = $qb->expr(); + + $qb->set('status', $qb->createNamedParameter(StreamQueue::STATUS_STANDBY)) + ->set('tries', $func->add('tries', $expr->literal(1))); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, StreamQueue::STATUS_RUNNING); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(StreamQueue::STATUS_SUCCESS); + } + + + /** + * @param StreamQueue $queue + */ + public function delete(StreamQueue $queue) { + $qb = $this->getStreamQueueDeleteSql(); + $this->limitToId($qb, $queue->getId()); + + $qb->execute(); + } + +} + diff --git a/lib/Db/StreamQueueRequestBuilder.php b/lib/Db/StreamQueueRequestBuilder.php new file mode 100644 index 00000000..f4ac89a5 --- /dev/null +++ b/lib/Db/StreamQueueRequestBuilder.php @@ -0,0 +1,116 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCA\Social\Db; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use OCA\Social\Model\RequestQueue; +use OCA\Social\Model\StreamQueue; +use OCP\DB\QueryBuilder\IQueryBuilder; + +class StreamQueueRequestBuilder extends CoreRequestBuilder { + + + use TArrayTools; + + + /** + * Base of the Sql Insert request + * + * @return IQueryBuilder + */ + protected function getStreamQueueInsertSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->insert(self::TABLE_QUEUE_STREAM); + + return $qb; + } + + + /** + * Base of the Sql Update request + * + * @return IQueryBuilder + */ + protected function getStreamQueueUpdateSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->update(self::TABLE_QUEUE_STREAM); + + return $qb; + } + + + /** + * Base of the Sql Select request for Shares + * + * @return IQueryBuilder + */ + protected function getStreamQueueSelectSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + + /** @noinspection PhpMethodParametersCountMismatchInspection */ + $qb->select( + 'qs.id', 'qs.token', 'qs.stream_id', 'qs.type', 'qs.status', 'qs.tries', 'qs.last' + ) + ->from(self::TABLE_QUEUE_STREAM, 'qs'); + + $this->defaultSelectAlias = 'qs'; + + return $qb; + } + + + /** + * Base of the Sql Delete request + * + * @return IQueryBuilder + */ + protected function getStreamQueueDeleteSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->delete(self::TABLE_QUEUE_STREAM); + + return $qb; + } + + + /** + * @param array $data + * + * @return StreamQueue + */ + protected function parseStreamQueueSelectSql($data): StreamQueue { + $queue = new StreamQueue(); + $queue->importFromDatabase($data); + + return $queue; + } + +} + diff --git a/lib/Model/ActivityPub/ACore.php b/lib/Model/ActivityPub/ACore.php index 4cac3361..1a2f3cfa 100644 --- a/lib/Model/ActivityPub/ACore.php +++ b/lib/Model/ActivityPub/ACore.php @@ -32,6 +32,7 @@ namespace OCA\Social\Model\ActivityPub; use daita\MySmallPhpTools\Traits\TArrayTools; use daita\MySmallPhpTools\Traits\TPathTools; +use daita\MySmallPhpTools\Traits\TStringTools; use JsonSerializable; use OCA\Social\Exceptions\ActivityCantBeVerifiedException; use OCA\Social\Exceptions\InvalidOriginException; @@ -45,6 +46,7 @@ class ACore extends Item implements JsonSerializable { use TArrayTools; + use TStringTools; use TPathTools; @@ -64,6 +66,9 @@ class ACore extends Item implements JsonSerializable { /** @var null Item */ private $parent = null; + /** @var string */ + private $requestToken = ''; + /** @var array */ private $entries = []; @@ -92,6 +97,30 @@ class ACore extends Item implements JsonSerializable { } + /** + * @return string + */ + public function getRequestToken(): string { + if ($this->isRoot()) { + return $this->requestToken; + } else { + return $this->getRoot() + ->getRequestToken(); + } + } + + /** + * @param string $token + * + * @return ACore + */ + public function setRequestToken(string $token): ACore { + $this->requestToken = $token; + + return $this; + } + + /** * @param ACore $parent * @@ -237,12 +266,7 @@ class ACore extends Item implements JsonSerializable { $base = $this->withoutEndSlash($this->withBeginSlash($base)); } - $uuid = sprintf( - '%04x%04x-%04x-%04x-%04x-%04x%04x%04x', mt_rand(0, 0xffff), mt_rand(0, 0xffff), - mt_rand(0, 0xffff), mt_rand(0, 0xfff) | 0x4000, mt_rand(0, 0x3fff) | 0x8000, - mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff) - ); - + $uuid = $this->uuid(); $this->setId($url . $base . '/' . $uuid); } diff --git a/lib/Model/StreamQueue.php b/lib/Model/StreamQueue.php new file mode 100644 index 00000000..1fcf1fe4 --- /dev/null +++ b/lib/Model/StreamQueue.php @@ -0,0 +1,264 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Model; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use DateTime; +use JsonSerializable; + + +/** + * Class StreamQueue + * + * @package OCA\Social\Model + */ +class StreamQueue implements JsonSerializable { + + + use TArrayTools; + + + const TYPE_CACHE = 'Cache'; + const TYPE_VERIFY = 'Signature'; + + const STATUS_STANDBY = 0; + const STATUS_RUNNING = 1; + const STATUS_SUCCESS = 9; + + + /** @var integer */ + private $id = 0; + + /** @var string */ + private $token = ''; + + /** @var string */ + private $streamId = ''; + + /** @var string */ + private $type = ''; + + /** @var int */ + private $status = 0; + + /** @var int */ + private $tries = 0; + + /** @var int */ + private $last = 0; + + + /** + * StreamQueue constructor. + * + * @param string $token + * @param string $type + * @param string $streamId + */ + public function __construct(string $token = '', string $type = '', string $streamId = '') { + $this->token = $token; + $this->type = $type; + $this->streamId = $streamId; + } + + + /** + * @return int + */ + public function getId(): int { + return $this->id; + } + + /** + * @param int $id + * + * @return StreamQueue + */ + public function setId(int $id): StreamQueue { + $this->id = $id; + + return $this; + } + + + /** + * @return string + */ + public function getToken(): string { + return $this->token; + } + + /** + * @param string $token + * + * @return StreamQueue + */ + public function setToken(string $token): StreamQueue { + $this->token = $token; + + return $this; + } + + + /** + * @return string + */ + public function getStreamId(): string { + return $this->streamId; + } + + /** + * @param string $streamId + * + * @return StreamQueue + */ + public function setStreamId(string $streamId): StreamQueue { + $this->streamId = $streamId; + + return $this; + } + + + /** + * @return string + */ + public function getType(): string { + return $this->type; + } + + /** + * @param string $type + * + * @return StreamQueue + */ + public function setType(string $type): StreamQueue { + $this->type = $type; + + return $this; + } + + + /** + * @return int + */ + public function getStatus(): int { + return $this->status; + } + + /** + * @param int $status + * + * @return StreamQueue + */ + public function setStatus(int $status): StreamQueue { + $this->status = $status; + + return $this; + } + + + /** + * @return int + */ + public function getTries(): int { + return $this->tries; + } + + /** + * @param int $tries + * + * @return StreamQueue + */ + public function setTries(int $tries): StreamQueue { + $this->tries = $tries; + + return $this; + } + + + /** + * @return int + */ + public function getLast(): int { + return $this->last; + } + + /** + * @param int $last + * + * @return StreamQueue + */ + public function setLast(int $last): StreamQueue { + $this->last = $last; + + return $this; + } + + + /** + * @param array $data + */ + public function importFromDatabase(array $data) { + $this->setId($this->getInt('id', $data, 0)); + $this->setToken($this->get('token', $data, '')); + $this->setStreamId($this->get('stream_id', $data, '')); + $this->setType($this->get('type', $data, '')); + $this->setStatus($this->getInt('status', $data, 0)); + $this->setTries($this->getInt('tries', $data, 0)); + + $last = $this->get('last', $data, ''); + if ($last === '') { + $this->setLast(0); + } else { + $dTime = new DateTime($last); + $this->setLast($dTime->getTimestamp()); + } + } + + + /** + * @return array + */ + public function jsonSerialize(): array { + return [ + 'id' => $this->getId(), + 'token' => $this->getToken(), + 'streamId' => $this->getStreamId(), + 'type' => $this->getType(), + 'status' => $this->getStatus(), + 'tries' => $this->getTries(), + 'last' => $this->getLast() + ]; + } + +} + diff --git a/lib/Service/ImportService.php b/lib/Service/ImportService.php index 24d8390e..de25268a 100644 --- a/lib/Service/ImportService.php +++ b/lib/Service/ImportService.php @@ -32,6 +32,7 @@ namespace OCA\Social\Service; use daita\MySmallPhpTools\Traits\TArrayTools; +use daita\MySmallPhpTools\Traits\TStringTools; use Exception; use OCA\Social\AP; use OCA\Social\Exceptions\ActivityPubFormatException; @@ -46,6 +47,7 @@ class ImportService { use TArrayTools; + use TStringTools; /** @var ConfigService */ @@ -94,6 +96,7 @@ class ImportService { */ public function parseIncomingRequest(ACore $activity) { $activity->checkOrigin($activity->getId()); + $activity->setRequestToken($this->uuid()); $interface = AP::$activityPub->getInterfaceForItem($activity); try { diff --git a/lib/Service/SignatureService.php b/lib/Service/SignatureService.php index bb8b6102..505d9b9c 100644 --- a/lib/Service/SignatureService.php +++ b/lib/Service/SignatureService.php @@ -64,6 +64,7 @@ class SignatureService { const ORIGIN_HEADER = 1; const ORIGIN_SIGNATURE = 2; + const ORIGIN_REQUEST = 3; const DATE_HEADER = 'D, d M Y H:i:s T'; diff --git a/lib/Service/StreamQueueService.php b/lib/Service/StreamQueueService.php new file mode 100644 index 00000000..2fd19033 --- /dev/null +++ b/lib/Service/StreamQueueService.php @@ -0,0 +1,345 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCA\Social\Service; + + +use daita\MySmallPhpTools\Exceptions\MalformedArrayException; +use daita\MySmallPhpTools\Model\Cache; +use daita\MySmallPhpTools\Model\CacheItem; +use OCA\Social\AP; +use OCA\Social\Db\NotesRequest; +use OCA\Social\Db\StreamQueueRequest; +use OCA\Social\Exceptions\InvalidOriginException; +use OCA\Social\Exceptions\InvalidResourceException; +use OCA\Social\Exceptions\ItemNotFoundException; +use OCA\Social\Exceptions\ItemUnknownException; +use OCA\Social\Exceptions\NoteNotFoundException; +use OCA\Social\Exceptions\QueueStatusException; +use OCA\Social\Exceptions\RedundancyLimitException; +use OCA\Social\Exceptions\RequestContentException; +use OCA\Social\Exceptions\RequestNetworkException; +use OCA\Social\Exceptions\RequestResultNotJsonException; +use OCA\Social\Exceptions\RequestResultSizeException; +use OCA\Social\Exceptions\RequestServerException; +use OCA\Social\Exceptions\SocialAppConfigException; +use OCA\Social\Model\ActivityPub\Object\Note; +use OCA\Social\Model\ActivityPub\Stream; +use OCA\Social\Model\StreamQueue; + + +/** + * Class StreamQueueService + * + * @package OCA\Social\Service + */ +class StreamQueueService { + + + /** @var NotesRequest */ + private $notesRequest; + + /** @var StreamQueueRequest */ + private $streamQueueRequest; + + private $importService; + + /** @var CurlService */ + private $curlService; + + /** @var MiscService */ + private $miscService; + + + /** + * StreamQueueService constructor. + * + * @param NotesRequest $notesRequest + * @param StreamQueueRequest $streamQueueRequest + * @param ImportService $importService + * @param CurlService $curlService + * @param MiscService $miscService + */ + public function __construct( + NotesRequest $notesRequest, StreamQueueRequest $streamQueueRequest, + ImportService $importService, CurlService $curlService, MiscService $miscService + ) { + $this->notesRequest = $notesRequest; + $this->streamQueueRequest = $streamQueueRequest; + $this->importService = $importService; + $this->curlService = $curlService; + $this->miscService = $miscService; + } + + + /** + * @param string $token + * @param string $type + * @param string $streamId + */ + public function generateStreamQueue(string $token, string $type, string $streamId) { + $cache = new StreamQueue($token, $type, $streamId); + + $this->streamQueueRequest->create($cache); + } + + + /** + * @param int $total + * + * @return StreamQueue[] + */ + public function getRequestStandby(int &$total = 0): array { + $queue = $this->streamQueueRequest->getStandby(); + $total = sizeof($queue); + + $result = []; + foreach ($queue as $request) { + $delay = floor(pow($request->getTries(), 4) / 3); + if ($request->getLast() < (time() - $delay)) { + $result[] = $request; + } + } + + return $result; + } + + + /** + * @param string $token + */ + public function cacheStreamByToken(string $token) { + $this->miscService->log('Cache: ' . $token); + $items = $this->streamQueueRequest->getFromToken($token); + + foreach ($items as $item) { + $this->manageStreamQueue($item); + } + } + + + /** + * @param StreamQueue $queue + */ + public function manageStreamQueue(StreamQueue $queue) { + + try { +// $this->initRequest($queue); + } catch (QueueStatusException $e) { + return; + } + + switch ($queue->getType()) { + case 'Cache': + $this->manageStreamQueueCache($queue); + break; + + default: + $this->deleteRequest($queue); + break; + } + } + + + private function manageStreamQueueCache(StreamQueue $queue) { + try { + $stream = $this->notesRequest->getNoteById($queue->getStreamId()); + } catch (NoteNotFoundException $e) { + $this->deleteRequest($queue); + + return; + } + + if (!$stream->gotCache()) { + $this->deleteRequest($queue); + + return; + } + + $this->manageStreamCache($stream); + } + + + /** + * @param Stream $stream + */ + private function manageStreamCache(Stream $stream) { + $cache = $stream->getCache(); + foreach ($cache->getItems() as $item) { + $this->cacheItem($item); + $cache->updateItem($item); + } + + $this->updateCache($stream, $cache); + } + + + /** + * @param CacheItem $item + */ + private function cacheItem(CacheItem &$item) { + + try { + $data = $this->curlService->retrieveObject($item->getUrl()); + $object = AP::$activityPub->getItemFromData($data); + + $origin = parse_url($item->getUrl(), PHP_URL_HOST); + $object->setOrigin($origin, SignatureService::ORIGIN_REQUEST, time()); + + if ($object->getId() !== $item->getUrl()) { + throw new InvalidOriginException(); + } + + if ($object->getType() !== Note::TYPE) { + throw new InvalidResourceException(); + } + + $interface = AP::$activityPub->getInterfaceForItem($object); + $interface->save($object); + + $note = $this->notesRequest->getNoteById($object->getId()); + $item->setContent(json_encode($note, JSON_UNESCAPED_SLASHES)); + +//$this->endRequest($queue, true); + + } catch (NoteNotFoundException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($queue); + } catch (InvalidOriginException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($queue); + } catch (RequestContentException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($queue); + } catch (RequestNetworkException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->endRequest($queue, false); + } catch (RequestResultNotJsonException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->endRequest($queue, false); + } catch (RequestResultSizeException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($queue); + } catch (RequestServerException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->endRequest($queue, false); + } catch (MalformedArrayException $e) { +//$this->deleteRequest($item); + } catch (ItemUnknownException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($item); + } catch (RedundancyLimitException $e) { + $this->miscService->log( + 'Error caching stream: ' . json_encode($item) . ' ' . get_class($e) . ' ' + . $e->getMessage(), 1 + ); +// $this->deleteRequest($item); + } catch (SocialAppConfigException $e) { + } + + } + + + /** + * @param Stream $stream + * @param Cache $cache + */ + private function updateCache(Stream $stream, Cache $cache) { + $this->notesRequest->updateCache($stream, $cache); + } + + + /** + * @param StreamQueue $queue + * + * @throws QueueStatusException + */ + public function initRequest(StreamQueue $queue) { + $this->streamQueueRequest->setAsRunning($queue); + } + + + /** + * @param StreamQueue $queue + * @param bool $success + */ + public function endRequest(StreamQueue $queue, bool $success) { + try { + if ($success === true) { + $this->streamQueueRequest->setAsSuccess($queue); + } else { + $this->streamQueueRequest->setAsFailure($queue); + } + } catch (QueueStatusException $e) { + } + } + + + /** + * @param StreamQueue $queue + */ + public function deleteRequest(StreamQueue $queue) { +// try { +// $stream = $this->notesRequest->getNoteById($queue->getStreamId()); +// $cache = $stream->getCache(); + +// $cache->removeItem($queue->get) +// } catch (NoteNotFoundException $e) { +// } + + $this->streamQueueRequest->delete($queue); + } + + +} +