From b2a045cb696d85c5e21a7c25ad72619a5ca607b1 Mon Sep 17 00:00:00 2001 From: Maxence Lange Date: Tue, 27 Nov 2018 15:59:19 -0100 Subject: [PATCH] queue and async Signed-off-by: Maxence Lange --- appinfo/database.xml | 74 ++++ appinfo/info.xml | 3 +- appinfo/routes.php | 39 ++- lib/Command/NoteCreate.php | 4 +- lib/Command/QueueStatus.php | 110 ++++++ lib/Controller/QueueController.php | 115 +++++++ lib/Db/CoreRequestBuilder.php | 19 +- lib/Db/NotesRequestBuilder.php | 4 +- lib/Db/RequestQueueRequest.php | 199 +++++++++++ lib/Db/RequestQueueRequestBuilder.php | 116 +++++++ lib/Exceptions/EmptyQueueException.php | 8 + .../NoHighPriorityRequestException.php | 8 + lib/Exceptions/QueueStatusException.php | 8 + lib/Model/RequestQueue.php | 323 ++++++++++++++++++ lib/Service/ActivityService.php | 178 ++++++---- lib/Service/ConfigService.php | 7 + lib/Service/CurlService.php | 40 ++- lib/Service/QueueService.php | 191 +++++++++++ 18 files changed, 1370 insertions(+), 76 deletions(-) create mode 100644 lib/Command/QueueStatus.php create mode 100644 lib/Controller/QueueController.php create mode 100644 lib/Db/RequestQueueRequest.php create mode 100644 lib/Db/RequestQueueRequestBuilder.php create mode 100644 lib/Exceptions/EmptyQueueException.php create mode 100644 lib/Exceptions/NoHighPriorityRequestException.php create mode 100644 lib/Exceptions/QueueStatusException.php create mode 100644 lib/Model/RequestQueue.php create mode 100644 lib/Service/QueueService.php diff --git a/appinfo/database.xml b/appinfo/database.xml index 9ccbc389..c3b073fa 100644 --- a/appinfo/database.xml +++ b/appinfo/database.xml @@ -468,5 +468,79 @@ + + *dbprefix*social_request_queue + + + + id + integer + 11 + true + true + true + true + + + + token + text + 63 + true + + + + author + text + 1270 + true + + + + activity + text + 6000 + true + + + + instance + text + 500 + false + + + + priority + integer + 1 + 0 + false + + + + status + integer + 1 + 0 + false + + + + tries + integer + 2 + 0 + false + + + + last + timestamp + + + +
+ diff --git a/appinfo/info.xml b/appinfo/info.xml index 1283707a..29f0a4a3 100644 --- a/appinfo/info.xml +++ b/appinfo/info.xml @@ -5,7 +5,7 @@ Social 🎉 Nextcloud becomes part of the federated social networks! - 0.0.43 + 0.0.50 agpl Maxence Lange Julius Härtl @@ -32,6 +32,7 @@ OCA\Social\Command\CacheRefresh + OCA\Social\Command\QueueStatus OCA\Social\Command\NoteCreate diff --git a/appinfo/routes.php b/appinfo/routes.php index 6f86395e..0a7aad2b 100644 --- a/appinfo/routes.php +++ b/appinfo/routes.php @@ -1,12 +1,39 @@ OCA\MailTest\Controller\PageController->index() + * Nextcloud - Social Support + * + * This file is licensed under the Affero General Public License version 3 or + * later. See the COPYING file. + * + * @author Maxence Lange + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . * - * The controller class has to be registered in the application.php file since - * it's instantiated in there */ + + +namespace OCA\Social\AppInfo; + + +use OCA\Social\Service\CurlService; + + return [ 'routes' => [ ['name' => 'Navigation#navigate', 'url' => '/', 'verb' => 'GET'], @@ -52,6 +79,8 @@ return [ ['name' => 'Local#actorInfo', 'url' => '/api/v1/actor/info', 'verb' => 'GET'], ['name' => 'Local#documentsCache', 'url' => '/api/v1/documents/cache', 'verb' => 'POST'], + ['name' => 'Queue#asyncWithToken', 'url' => CurlService::ASYNC_TOKEN, 'verb' => 'POST'], + [ 'name' => 'Config#setCloudAddress', 'url' => '/api/v1/config/cloudAddress', 'verb' => 'POST' diff --git a/lib/Command/NoteCreate.php b/lib/Command/NoteCreate.php index 0d4f6f72..307fa4af 100644 --- a/lib/Command/NoteCreate.php +++ b/lib/Command/NoteCreate.php @@ -135,10 +135,10 @@ class NoteCreate extends Base { $post->setReplyTo(($replyTo === null) ? '' : $replyTo); $post->addTo(($to === null) ? '' : $to); - $result = $this->postService->createPost($post, $activity); + $token = $this->postService->createPost($post, $activity); echo 'object: ' . json_encode($activity, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) . "\n"; - echo 'result: ' . json_encode($result, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES) . "\n"; + echo 'token: ' . $token . "\n"; } } diff --git a/lib/Command/QueueStatus.php b/lib/Command/QueueStatus.php new file mode 100644 index 00000000..932b7ea2 --- /dev/null +++ b/lib/Command/QueueStatus.php @@ -0,0 +1,110 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Command; + +use Exception; +use OC\Core\Command\Base; +use OCA\Social\Service\ConfigService; +use OCA\Social\Service\MiscService; +use OCA\Social\Service\QueueService; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + + +class QueueStatus extends Base { + + + /** @var ConfigService */ + private $configService; + + /** @var QueueService */ + private $queueService; + + /** @var MiscService */ + private $miscService; + + + /** + * NoteCreate constructor. + * + * @param QueueService $queueService + * @param ConfigService $configService + * @param MiscService $miscService + */ + public function __construct( + QueueService $queueService, ConfigService $configService, MiscService $miscService + ) { + parent::__construct(); + + $this->queueService = $queueService; + $this->configService = $configService; + $this->miscService = $miscService; + } + + + /** + * + */ + protected function configure() { + parent::configure(); + $this->setName('social:queue:status') + ->addOption( + 'token', 't', InputOption::VALUE_OPTIONAL, 'token of a request' + ) + ->setDescription('Return status on the request queue'); + } + + + /** + * @param InputInterface $input + * @param OutputInterface $output + * + * @throws Exception + */ + protected function execute(InputInterface $input, OutputInterface $output) { + + $token = $input->getOption('token'); + + if ($token === null) { + throw new Exception('As of today, --token is mandatory'); + } + + $requests = $this->queueService->getRequestFromToken($token); + + foreach ($requests as $request) { + $output->writeLn(json_encode($request)); + } + + } + +} + diff --git a/lib/Controller/QueueController.php b/lib/Controller/QueueController.php new file mode 100644 index 00000000..f1594c85 --- /dev/null +++ b/lib/Controller/QueueController.php @@ -0,0 +1,115 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCA\Social\Controller; + + +use daita\MySmallPhpTools\Traits\TAsync; +use OCA\Social\AppInfo\Application; +use OCA\Social\Exceptions\ActorDoesNotExistException; +use OCA\Social\Exceptions\RequestException; +use OCA\Social\Exceptions\SocialAppConfigException; +use OCA\Social\Model\RequestQueue; +use OCA\Social\Service\ActivityService; +use OCA\Social\Service\CurlService; +use OCA\Social\Service\MiscService; +use OCA\Social\Service\QueueService; +use OCP\AppFramework\Controller; +use OCP\IRequest; + + +/** + * Class QueueController + * + * @package OCA\Social\Controller + */ +class QueueController extends Controller { + + + use TAsync; + + /** @var QueueService */ + private $queueService; + + /** @var ActivityService */ + private $activityService; + + /** @var MiscService */ + private $miscService; + + + /** + * QueueController constructor. + * + * @param IRequest $request + * @param QueueService $queueService + * @param ActivityService $activityService + * @param MiscService $miscService + */ + public function __construct( + IRequest $request, QueueService $queueService, ActivityService $activityService, + MiscService $miscService + ) { + parent::__construct(Application::APP_NAME, $request); + + $this->queueService = $queueService; + $this->activityService = $activityService; + $this->miscService = $miscService; + } + + + /** + * // TODO: Delete the NoCSRF check + * + * @PublicPage + * @NoCSRFRequired + * @NoAdminRequired + * @NoSubAdminRequired + * + * @param string $token + */ + public function asyncWithToken(string $token) { + $this->async(); + + $requests = $this->queueService->getRequestFromToken($token, RequestQueue::STATUS_STANDBY); + foreach ($requests as $request) { + try { + $this->activityService->manageRequest($request); + } catch (ActorDoesNotExistException $e) { + } catch (RequestException $e) { + } catch (SocialAppConfigException $e) { + } + } + + // or it will feed the logs. + exit(); + } + +} + diff --git a/lib/Db/CoreRequestBuilder.php b/lib/Db/CoreRequestBuilder.php index 9224600d..c22dde13 100644 --- a/lib/Db/CoreRequestBuilder.php +++ b/lib/Db/CoreRequestBuilder.php @@ -53,6 +53,8 @@ use OCP\IDBConnection; class CoreRequestBuilder { + const TABLE_REQUEST_QUEUE = 'social_request_queue'; + const TABLE_SERVER_ACTORS = 'social_server_actors'; const TABLE_SERVER_NOTES = 'social_server_notes'; const TABLE_SERVER_FOLLOWS = 'social_server_follows'; @@ -158,6 +160,17 @@ class CoreRequestBuilder { } + /** + * Limit the request to the token + * + * @param IQueryBuilder $qb + * @param string $token + */ + protected function limitToToken(IQueryBuilder &$qb, string $token) { + $this->limitToDBField($qb, 'token', $token); + } + + /** * Limit the request to the ActorId * @@ -262,10 +275,10 @@ class CoreRequestBuilder { * Limit the request to the status * * @param IQueryBuilder $qb - * @param string $status + * @param int $status */ - protected function limitToStatus(IQueryBuilder &$qb, $status) { - $this->limitToDBField($qb, 'status', $status); + protected function limitToStatus(IQueryBuilder &$qb, int $status) { + $this->limitToDBFieldInt($qb, 'status', $status); } diff --git a/lib/Db/NotesRequestBuilder.php b/lib/Db/NotesRequestBuilder.php index 16c5999d..90ff24e1 100644 --- a/lib/Db/NotesRequestBuilder.php +++ b/lib/Db/NotesRequestBuilder.php @@ -177,7 +177,9 @@ class NotesRequestBuilder extends CoreRequestBuilder { $instances = json_decode($data['instances'], true); if (is_array($instances)) { foreach ($instances as $instance) { - $note->addInstancePath(new InstancePath($instance['uri'], $instance['type'])); + $instancePath = new InstancePath(); + $instancePath->import($instance); + $note->addInstancePath($instancePath); } } diff --git a/lib/Db/RequestQueueRequest.php b/lib/Db/RequestQueueRequest.php new file mode 100644 index 00000000..33ca298c --- /dev/null +++ b/lib/Db/RequestQueueRequest.php @@ -0,0 +1,199 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Db; + + +use DateTime; +use Exception; +use OCA\Social\Exceptions\QueueStatusException; +use OCA\Social\Model\RequestQueue; +use OCA\Social\Service\QueueService; +use OCP\DB\QueryBuilder\IQueryBuilder; + + +/** + * Class RequestQueueRequest + * + * @package OCA\Social\Db + */ +class RequestQueueRequest extends RequestQueueRequestBuilder { + + + /** + * create a new Queue in the database. + * + * @param RequestQueue[] $queues + * + * @throws Exception + */ + public function multiple(array $queues) { + foreach ($queues as $queue) { + $this->create($queue); +// $qb->values( +// [ +// 'source' => $qb->createNamedParameter($queue->getSource()), +// 'activity' => $qb->createNamedParameter($queue->getActivity()), +// 'instance' => $qb->createNamedParameter( +// json_encode($queue->getInstance(), JSON_UNESCAPED_SLASHES) +// ), +// 'status' => $qb->createNamedParameter($queue->getStatus()), +// 'tries' => $qb->createNamedParameter($queue->getTries()), +// 'last' => $qb->createNamedParameter($queue->getLast()) +// ] +// ); + } + } + + + /** + * create a new Queue in the database. + * + * @param RequestQueue $queue + * + * @throws Exception + */ + public function create(RequestQueue $queue) { + $qb = $this->getQueueInsertSql(); + $qb->setValue('token', $qb->createNamedParameter($queue->getToken())) + ->setValue('author', $qb->createNamedParameter($queue->getAuthor())) + ->setValue('activity', $qb->createNamedParameter($queue->getActivity())) + ->setValue( + 'instance', $qb->createNamedParameter( + json_encode($queue->getInstance(), JSON_UNESCAPED_SLASHES) + ) + ) + ->setValue('priority', $qb->createNamedParameter($queue->getPriority())) + ->setValue('status', $qb->createNamedParameter($queue->getStatus())) + ->setValue('tries', $qb->createNamedParameter($queue->getTries())) + ->setValue('last', $qb->createNamedParameter($queue->getLast())); + $qb->execute(); + } + + + /** + * return Actor from database based on the username + * + * @param string $token + * @param int $status + * + * @return RequestQueue[] + */ + public function getFromToken(string $token, int $status = -1): array { + $qb = $this->getQueueSelectSql(); + $this->limitToToken($qb, $token); + + if ($status > -1) { + $this->limitToStatus($qb, $status); + } + + $this->orderByPriority($qb); + + $requests = []; + $cursor = $qb->execute(); + while ($data = $cursor->fetch()) { + $requests[] = $this->parseQueueSelectSql($data); + } + $cursor->closeCursor(); + + return $requests; + } + + + /** + * @param RequestQueue $queue + * + * @throws QueueStatusException + */ + public function setAsRunning(RequestQueue &$queue) { + $qb = $this->getQueueUpdateSql(); + $qb->set('status', $qb->createNamedParameter(RequestQueue::STATUS_RUNNING)) + ->set( + 'last', + $qb->createNamedParameter(new DateTime('now'), IQueryBuilder::PARAM_DATE) + ); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, RequestQueue::STATUS_STANDBY); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(RequestQueue::STATUS_RUNNING); + } + + + /** + * @param RequestQueue $queue + * + * @throws QueueStatusException + */ + public function setAsSuccess(RequestQueue &$queue) { + $qb = $this->getQueueUpdateSql(); + $qb->set('status', $qb->createNamedParameter(RequestQueue::STATUS_SUCCESS)); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, RequestQueue::STATUS_RUNNING); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(RequestQueue::STATUS_SUCCESS); + } + + + /** + * @param RequestQueue $queue >ll + * + * @throws QueueStatusException + */ + public function setAsFailure(RequestQueue &$queue) { + $qb = $this->getQueueUpdateSql(); + $qb->set('status', $qb->createNamedParameter(RequestQueue::STATUS_STANDBY)); + // TODO - increment tries++ +// ->set('tries', 'tries+1'); + $this->limitToId($qb, $queue->getId()); + $this->limitToStatus($qb, RequestQueue::STATUS_RUNNING); + + $count = $qb->execute(); + + if ($count === 0) { + throw new QueueStatusException(); + } + + $queue->setStatus(RequestQueue::STATUS_SUCCESS); + } + +} + diff --git a/lib/Db/RequestQueueRequestBuilder.php b/lib/Db/RequestQueueRequestBuilder.php new file mode 100644 index 00000000..41060f80 --- /dev/null +++ b/lib/Db/RequestQueueRequestBuilder.php @@ -0,0 +1,116 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCA\Social\Db; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use OCA\Social\Model\RequestQueue; +use OCP\DB\QueryBuilder\IQueryBuilder; + +class RequestQueueRequestBuilder extends CoreRequestBuilder { + + + use TArrayTools; + + + /** + * Base of the Sql Insert request + * + * @return IQueryBuilder + */ + protected function getQueueInsertSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->insert(self::TABLE_REQUEST_QUEUE); + + return $qb; + } + + + /** + * Base of the Sql Update request + * + * @return IQueryBuilder + */ + protected function getQueueUpdateSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->update(self::TABLE_REQUEST_QUEUE); + + return $qb; + } + + + /** + * Base of the Sql Select request for Shares + * + * @return IQueryBuilder + */ + protected function getQueueSelectSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + + /** @noinspection PhpMethodParametersCountMismatchInspection */ + $qb->select( + 'rq.id', 'rq.token', 'rq.author', 'rq.activity', 'rq.instance', 'rq.priority', + 'rq.status', 'rq.tries', 'rq.last' + ) + ->from(self::TABLE_REQUEST_QUEUE, 'rq'); + + $this->defaultSelectAlias = 'rq'; + + return $qb; + } + + + /** + * Base of the Sql Delete request + * + * @return IQueryBuilder + */ + protected function getQueueDeleteSql(): IQueryBuilder { + $qb = $this->dbConnection->getQueryBuilder(); + $qb->delete(self::TABLE_REQUEST_QUEUE); + + return $qb; + } + + + /** + * @param array $data + * + * @return RequestQueue + */ + protected function parseQueueSelectSql($data): RequestQueue { + $queue = new RequestQueue(); + $queue->importFromDatabase($data); + + return $queue; + } + +} + diff --git a/lib/Exceptions/EmptyQueueException.php b/lib/Exceptions/EmptyQueueException.php new file mode 100644 index 00000000..1170b586 --- /dev/null +++ b/lib/Exceptions/EmptyQueueException.php @@ -0,0 +1,8 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Model; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use JsonSerializable; + + +/** + * Class RequestQueue + * + * @package OCA\Social\Model + */ +class RequestQueue implements JsonSerializable { + + + use TArrayTools; + + + const STATUS_STANDBY = 0; + const STATUS_RUNNING = 1; + const STATUS_SUCCESS = 9; + + + /** @var integer */ + private $id = 0; + + /** @var string */ + private $token = ''; + + /** @var string */ + private $author = ''; + + /** @var string */ + private $activity = ''; + + /** @var InstancePath */ + private $instance; + + /** @var int */ + private $priority = 0; + + /** @var int */ + private $status = 0; + + /** @var int */ + private $tries = 0; + + /** @var int */ + private $last = 0; + + + /** + * RequestQueue constructor. + * + * @param string $activity + * @param InstancePath $instance + * @param string $author + */ + public function __construct(string $activity = '', $instance = null, string $author = '') { + $this->setActivity($activity); + if ($instance instanceof InstancePath) { + $this->setInstance($instance); + } + + $this->setAuthor($author); + $this->resetToken(); + } + + + /** + * @return int + */ + public function getId(): int { + return $this->id; + } + + /** + * @param int $id + * + * @return RequestQueue + */ + public function setId(int $id): RequestQueue { + $this->id = $id; + + return $this; + } + + + /** + * @return string + */ + public function getToken(): string { + return $this->token; + } + + /** + * @param string $token + * + * @return RequestQueue + */ + public function setToken(string $token): RequestQueue { + $this->token = $token; + + return $this; + } + + + /** + * @return string + */ + public function getAuthor(): string { + return $this->author; + } + + /** + * @param string $author + * + * @return RequestQueue + */ + public function setAuthor(string $author): RequestQueue { + $this->author = $author; + + return $this; + } + + + /** + * @return RequestQueue + */ + public function resetToken(): RequestQueue { + $uuid = sprintf( + '%04x%04x-%04x-%04x-%04x-%04x%04x%04x', mt_rand(0, 0xffff), mt_rand(0, 0xffff), + mt_rand(0, 0xffff), mt_rand(0, 0xfff) | 0x4000, mt_rand(0, 0x3fff) | 0x8000, + mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff) + ); + + $this->setToken($uuid); + + return $this; + } + + /** + * @return string + */ + public function getActivity(): string { + return $this->activity; + } + + /** + * @param string $activity + * + * @return RequestQueue + */ + public function setActivity(string $activity): RequestQueue { + $this->activity = $activity; + + return $this; + } + + + /** + * @return InstancePath + */ + public function getInstance(): InstancePath { + return $this->instance; + } + + /** + * @param InstancePath $instance + * + * @return RequestQueue + */ + public function setInstance(InstancePath $instance): RequestQueue { + $this->setPriority($instance->getPriority()); + $this->instance = $instance; + + return $this; + } + + + /** + * @return int + */ + public function getPriority(): int { + return $this->priority; + } + + /** + * @param int $priority + * + * @return RequestQueue + */ + public function setPriority(int $priority): RequestQueue { + $this->priority = $priority; + + return $this; + } + + + /** + * @return int + */ + public function getStatus(): int { + return $this->status; + } + + /** + * @param int $status + * + * @return RequestQueue + */ + public function setStatus(int $status): RequestQueue { + $this->status = $status; + + return $this; + } + + + /** + * @return int + */ + public function getTries(): int { + return $this->tries; + } + + /** + * @param int $tries + * + * @return RequestQueue + */ + public function setTries(int $tries): RequestQueue { + $this->tries = $tries; + + return $this; + } + + + /** + * @return int + */ + public function getLast(): int { + return $this->last; + } + + /** + * @param int $last + * + * @return RequestQueue + */ + public function setLast(int $last): RequestQueue { + $this->last = $last; + + return $this; + } + + + /** + * @param array $data + */ + public function importFromDatabase(array $data) { + $instance = new InstancePath(); + $instance->import(json_decode($this->get('instance', $data, '{}'), true)); + + $this->setId($this->getInt('id', $data, 0)); + $this->setToken($this->get('token', $data, '')); + $this->setAuthor($this->get('author', $data, '')); + $this->setInstance($instance); + $this->setPriority($this->getInt('priority', $data, 0)); + $this->setActivity($this->get('activity', $data, '')); + $this->setStatus($this->getInt('status', $data, 0)); + $this->setTries($this->getInt('tries', $data, 0)); + $this->setLast($this->getInt('last', $data, 0)); + } + + + /** + * @return array + */ + public function jsonSerialize(): array { + return [ + 'id' => $this->getId(), + 'token' => $this->getToken(), + 'author' => $this->getAuthor(), + 'instance' => $this->getInstance(), + 'priority' => $this->getPriority(), + 'status' => $this->getStatus(), + 'tries' => $this->getTries(), + 'last' => $this->getLast() + ]; + } + +} + diff --git a/lib/Service/ActivityService.php b/lib/Service/ActivityService.php index ac87e854..03cd437f 100644 --- a/lib/Service/ActivityService.php +++ b/lib/Service/ActivityService.php @@ -39,16 +39,21 @@ use OCA\Social\Db\ActorsRequest; use OCA\Social\Db\FollowsRequest; use OCA\Social\Db\NotesRequest; use OCA\Social\Exceptions\ActorDoesNotExistException; +use OCA\Social\Exceptions\EmptyQueueException; use OCA\Social\Exceptions\InvalidResourceException; +use OCA\Social\Exceptions\NoHighPriorityRequestException; +use OCA\Social\Exceptions\QueueStatusException; use OCA\Social\Exceptions\RequestException; use OCA\Social\Exceptions\SignatureException; use OCA\Social\Exceptions\SocialAppConfigException; +use OCA\Social\Exceptions\UrlCloudException; use OCA\Social\Model\ActivityPub\ACore; use OCA\Social\Model\ActivityPub\Activity\Create; use OCA\Social\Model\ActivityPub\Activity\Delete; use OCA\Social\Model\ActivityPub\Activity\Tombstone; use OCA\Social\Model\ActivityPub\Person; use OCA\Social\Model\InstancePath; +use OCA\Social\Model\RequestQueue; use OCA\Social\Service\ActivityPub\PersonService; use OCP\IRequest; @@ -68,6 +73,7 @@ class ActivityService { const DATE_FORMAT = 'D, d M Y H:i:s T'; const DATE_DELAY = 30; + /** @var ActorsRequest */ private $actorsRequest; @@ -77,6 +83,9 @@ class ActivityService { /** @var FollowsRequest */ private $followsRequest; + /** @var QueueService */ + private $queueService; + /** @var ActorService */ private $actorService; @@ -99,6 +108,7 @@ class ActivityService { /** * ActivityService constructor. * + * @param QueueService $queueService * @param ActorsRequest $actorsRequest * @param NotesRequest $notesRequest * @param FollowsRequest $followsRequest @@ -111,15 +121,15 @@ class ActivityService { */ public function __construct( ActorsRequest $actorsRequest, NotesRequest $notesRequest, FollowsRequest $followsRequest, - CurlService $curlService, ActorService $actorService, + QueueService $queueService, CurlService $curlService, ActorService $actorService, PersonService $personService, InstanceService $instanceService, - ConfigService $configService, - MiscService $miscService + ConfigService $configService, MiscService $miscService ) { - $this->curlService = $curlService; $this->actorsRequest = $actorsRequest; $this->notesRequest = $notesRequest; $this->followsRequest = $followsRequest; + $this->queueService = $queueService; + $this->curlService = $curlService; $this->actorService = $actorService; $this->personService = $personService; $this->instanceService = $instanceService; @@ -133,13 +143,11 @@ class ActivityService { * @param ACore $item * @param ACore $activity * - * @return array - * @throws RequestException - * @throws SocialAppConfigException - * @throws ActorDoesNotExistException + * @return string + * @throws Exception */ public function createActivity(Person $actor, ACore $item, ACore &$activity = null - ): array { + ): string { $activity = new Create(); $item->setParent($activity); @@ -158,20 +166,17 @@ class ActivityService { $activity->setActor($actor); - $result = $this->request($activity); - - return $result; + return $this->request($activity); } /** * @param ACore $item * - * @throws ActorDoesNotExistException - * @throws RequestException - * @throws SocialAppConfigException + * @return string + * @throws Exception */ - public function deleteActivity(ACore $item) { + public function deleteActivity(ACore $item): string { $delete = new Delete(); $delete->setId($item->getId() . '#delete'); $delete->setActorId($item->getActorId()); @@ -182,7 +187,7 @@ class ActivityService { $delete->setObject($tombstone); $delete->addInstancePaths($item->getInstancePaths()); - $this->request($delete); + return $this->request($delete); } @@ -217,61 +222,91 @@ class ActivityService { /** * @param ACore $activity * + * @return string + * @throws Exception + */ + public function request(ACore $activity): string { + $this->setupCore($activity); + + $author = $this->getAuthorFromItem($activity); + $instancePaths = $this->generateInstancePaths($activity); + $token = $this->queueService->generateRequestQueue($instancePaths, $activity, $author); + + try { + $directRequest = $this->queueService->getPriorityRequest($token); + $this->manageRequest($directRequest); + } catch (NoHighPriorityRequestException $e) { + } catch (EmptyQueueException $e) { + return ''; + } + + $this->curlService->asyncWithToken($token); + + return $token; + } + + + /** + * @param RequestQueue $queue + * + * @throws ActorDoesNotExistException * @throws RequestException * @throws SocialAppConfigException - * @throws ActorDoesNotExistException */ - public function manageRequest(ACore $activity) { - $result = $this->request($activity); - $this->miscService->log('Activity: ' . json_encode($activity)); - $this->miscService->log('Result: ' . json_encode($result)); + public function manageRequest(RequestQueue $queue) { + + try { + $this->queueService->initRequest($queue); + } catch (QueueStatusException $e) { + return; + } + + $result = $this->generateRequest( + $queue->getInstance(), $queue->getActivity(), $queue->getAuthor() + ); + + try { + if ($this->getint('_code', $result, 500) === 202) { + $this->queueService->endRequest($queue, true); + } else { + $this->queueService->endRequest($queue, false); + } + } catch (QueueStatusException $e) { + } } /** * @param ACore $activity * - * - * @return array - * @throws RequestException - * @throws SocialAppConfigException - * @throws ActorDoesNotExistException + * @return InstancePath[] */ - public function request(ACore &$activity) { - $this->setupCore($activity); -// $hosts = $this->instanceService->getInstancesFromActivity($activity); - - $result = []; -// foreach ($hosts as $host) { -// foreach ($host->getInstancePaths() as $path) { + private function generateInstancePaths(ACore $activity): array { + $instancePaths = []; foreach ($activity->getInstancePaths() as $instancePath) { if ($instancePath->getType() === InstancePath::TYPE_FOLLOWERS) { - $result = array_merge($result, $this->requestToFollowers($activity, $instancePath)); + $instancePaths = array_merge( + $instancePaths, $this->generateInstancePathsFollowers($instancePath) + ); } else { - $result[] = $this->generateRequest($instancePath, $activity); + $instancePaths[] = $instancePath; } } -// } - - return $result; + return $instancePaths; } /** - * @param ACore $activity * @param InstancePath $instancePath * - * @return array - * @throws ActorDoesNotExistException - * @throws RequestException - * @throws SocialAppConfigException + * @return InstancePath[] */ - private function requestToFollowers(ACore &$activity, InstancePath $instancePath): array { - $result = []; + private function generateInstancePathsFollowers(InstancePath $instancePath): array { + $follows = $this->followsRequest->getByFollowId($instancePath->getUri()); $sharedInboxes = []; - $follows = $this->followsRequest->getByFollowId($instancePath->getUri()); + $instancePaths = []; foreach ($follows as $follow) { if (!$follow->gotActor()) { // TODO - check if cache can be empty at this point ? @@ -285,28 +320,32 @@ class ActivityService { } $sharedInboxes[] = $sharedInbox; - $result[] = $this->generateRequest( - new InstancePath($sharedInbox, InstancePath::TYPE_GLOBAL), $activity + $instancePaths[] = new InstancePath( + $sharedInbox, InstancePath::TYPE_GLOBAL, $instancePath->getPriority() ); +// $result[] = $this->generateRequest( +// new InstancePath($sharedInbox, InstancePath::TYPE_GLOBAL), $activity +// ); } - return $result; + return $instancePaths; } /** * @param InstancePath $path - * @param ACore $activity + * @param string $activity + * @param string $author * * @return Request[] * @throws ActorDoesNotExistException * @throws RequestException * @throws SocialAppConfigException */ - public function generateRequest(InstancePath $path, ACore $activity): array { - $document = json_encode($activity); + public function generateRequest(InstancePath $path, string $activity, string $author): array { +// $document = json_encode($activity); $date = gmdate(self::DATE_FORMAT); - $localActor = $this->getActorFromItem($activity); + $localActor = $this->getActorFromAuthor($author); $localActorLink = $this->configService->getUrlSocial() . '@' . $localActor->getPreferredUsername(); @@ -332,7 +371,7 @@ class ActivityService { $request->addHeader('Date: ' . $date); $request->addHeader('Signature: ' . $header); - $request->setDataJson($document); + $request->setDataJson($activity); $request->setAddress($path->getAddress()); return $this->curlService->request($request); @@ -362,18 +401,27 @@ class ActivityService { /** * @param ACore $activity * - * @return Person - * @throws SocialAppConfigException - * @throws ActorDoesNotExistException + * @return string */ - private function getActorFromItem(Acore $activity): Person { + private function getAuthorFromItem(Acore $activity): string { if ($activity->gotActor()) { - return $activity->getActor(); + return $activity->getActor() + ->getId(); } - $actorId = $activity->getActorId(); + return $activity->getActorId(); + } - return $this->actorService->getActorById($actorId); + + /** + * @param string $author + * + * @return Person + * @throws ActorDoesNotExistException + * @throws SocialAppConfigException + */ + private function getActorFromAuthor(string $author): Person { + return $this->actorService->getActorById($author); } @@ -456,8 +504,10 @@ class ActivityService { * @param $keyId * * @return string - * @throws RequestException * @throws InvalidResourceException + * @throws RequestException + * @throws SocialAppConfigException + * @throws UrlCloudException */ private function retrieveKey($keyId): string { $actor = $this->personService->getFromId($keyId); @@ -467,6 +517,8 @@ class ActivityService { /** + * @deprecated !??? - do we need this !? + * * @param ACore $activity */ private function setupCore(ACore $activity) { diff --git a/lib/Service/ConfigService.php b/lib/Service/ConfigService.php index 34d4bb16..968859dd 100644 --- a/lib/Service/ConfigService.php +++ b/lib/Service/ConfigService.php @@ -52,11 +52,18 @@ class ConfigService { const SOCIAL_ADDRESS = 'address'; + const SOCIAL_SERVICE = 'service'; const SOCIAL_MAX_SIZE = 'max_size'; + const BACKGROUND_CRON = 1; + const BACKGROUND_ASYNC = 2; + const BACKGROUND_SERVICE = 3; + const BACKGROUND_FULL_SERVICE = 4; + /** @var array */ public $defaults = [ self::SOCIAL_ADDRESS => '', + self::SOCIAL_SERVICE => 1, self::SOCIAL_MAX_SIZE => 25 ]; diff --git a/lib/Service/CurlService.php b/lib/Service/CurlService.php index a2cdef98..3a420560 100644 --- a/lib/Service/CurlService.php +++ b/lib/Service/CurlService.php @@ -31,11 +31,24 @@ namespace OCA\Social\Service; use daita\MySmallPhpTools\Model\Request; +use daita\MySmallPhpTools\Traits\TArrayTools; +use daita\MySmallPhpTools\Traits\TPathTools; use OCA\Social\Exceptions\RequestException; +use OCA\Social\Exceptions\SocialAppConfigException; class CurlService { + use TArrayTools; + use TPathTools; + + + const ASYNC_TOKEN = '/async/token/{token}'; + + + /** @var ConfigService */ + private $configService; + /** @var MiscService */ private $miscService; @@ -43,9 +56,11 @@ class CurlService { /** * CurlService constructor. * + * @param ConfigService $configService * @param MiscService $miscService */ - public function __construct(MiscService $miscService) { + public function __construct(ConfigService $configService, MiscService $miscService) { + $this->configService = $configService; $this->miscService = $miscService; } @@ -89,6 +104,29 @@ class CurlService { } + /** + * @param string $token + * + * @throws SocialAppConfigException + */ + public function asyncWithToken(string $token) { + $address = $this->configService->getUrlSocial(); + $parse = parse_url($address); + $host = $this->get('host', $parse, ''); + $path = $this->withEndSlash($this->get('path', $parse, '')) . $this->withoutBeginSlash( + self::ASYNC_TOKEN + ); + $path = str_replace('{token}', $token, $path); + + $request = new Request($path, Request::TYPE_POST); + $request->setAddress($host); + try { + $this->request($request); + } catch (RequestException $e) { + } + } + + /** * @param Request $request * diff --git a/lib/Service/QueueService.php b/lib/Service/QueueService.php new file mode 100644 index 00000000..6fc3b8f3 --- /dev/null +++ b/lib/Service/QueueService.php @@ -0,0 +1,191 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Service; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use Exception; +use OCA\Social\Db\RequestQueueRequest; +use OCA\Social\Exceptions\EmptyQueueException; +use OCA\Social\Exceptions\NoHighPriorityRequestException; +use OCA\Social\Exceptions\QueueStatusException; +use OCA\Social\Model\ActivityPub\ACore; +use OCA\Social\Model\InstancePath; +use OCA\Social\Model\RequestQueue; + + +class QueueService { + + + use TArrayTools; + + + /** @var RequestQueueRequest */ + private $requestQueueRequest; + + /** @var ConfigService */ + private $configService; + + /** @var MiscService */ + private $miscService; + + + /** + * QueueService constructor. + * + * @param RequestQueueRequest $requestQueueRequest + * @param ConfigService $configService + * @param MiscService $miscService + */ + public function __construct( + RequestQueueRequest $requestQueueRequest, ConfigService $configService, + MiscService $miscService + ) { + $this->requestQueueRequest = $requestQueueRequest; + $this->configService = $configService; + $this->miscService = $miscService; + } + + + /** + * @param array $instancePaths + * @param ACore $item + * + * @return string + * @throws Exception + */ + public function generateRequestQueue(array $instancePaths, ACore $item, string $author + ): string { + $activity = json_encode($item, JSON_UNESCAPED_SLASHES); + + $token = ''; + $requests = []; + foreach ($instancePaths as $instancePath) { + $request = new RequestQueue($activity, $instancePath, $author); + if ($token === '') { + $token = $request->getToken(); + } else { + $request->setToken($token); + } + + $requests[] = $request; + } + + $this->requestQueueRequest->multiple($requests); + + return $token; + } + + + /** + * @param string $token + * + * @return RequestQueue + * @throws EmptyQueueException + * @throws NoHighPriorityRequestException + */ + public function getPriorityRequest(string $token): RequestQueue { + $requests = $this->requestQueueRequest->getFromToken($token); + + if (sizeof($requests) === 0) { + throw new EmptyQueueException(); + } + + $request = $requests[0]; + switch ($request->getPriority()) { + + case InstancePath::PRIORITY_TOP: + return $request; + + case InstancePath::PRIORITY_HIGH: + if (sizeof($requests) === 1) { + return $request; + } + + $next = $requests[1]; + if ($next->getStatus() < InstancePath::PRIORITY_HIGH) { + return $request; + } + break; + + case InstancePath::PRIORITY_MEDIUM: + if (sizeof($requests) === 1) { + return $request; + } + break; + } + + throw new NoHighPriorityRequestException(); + } + + + /** + * @param string $token + * @param int $status + * + * @return array + */ + public function getRequestFromToken(string $token, int $status = -1): array { + if ($token === '') { + return []; + } + + return $this->requestQueueRequest->getFromToken($token, $status); + } + + + /** + * @param RequestQueue $queue + * + * @throws QueueStatusException + */ + public function initRequest(RequestQueue $queue) { + $this->requestQueueRequest->setAsRunning($queue); + } + + + /** + * @param RequestQueue $queue + * @param bool $success + * + * @throws QueueStatusException + */ + public function endRequest(RequestQueue $queue, bool $success) { + if ($success === true) { + $this->requestQueueRequest->setAsSuccess($queue); + } else { + $this->requestQueueRequest->setAsFailure($queue); + } + } + + +} +