diff --git a/appinfo/info.xml b/appinfo/info.xml index add173bd..8e1f2608 100644 --- a/appinfo/info.xml +++ b/appinfo/info.xml @@ -33,6 +33,7 @@ OCA\Social\Command\CacheRefresh OCA\Social\Command\QueueStatus + OCA\Social\Command\QueueProcess OCA\Social\Command\NoteCreate diff --git a/composer.lock b/composer.lock index b0c6dab8..1e26539f 100644 --- a/composer.lock +++ b/composer.lock @@ -12,12 +12,12 @@ "source": { "type": "git", "url": "https://github.com/daita/my-small-php-tools.git", - "reference": "12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997" + "reference": "56cff24fdde14d21e3903428c5ee629c839866af" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/daita/my-small-php-tools/zipball/12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997", - "reference": "12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997", + "url": "https://api.github.com/repos/daita/my-small-php-tools/zipball/56cff24fdde14d21e3903428c5ee629c839866af", + "reference": "56cff24fdde14d21e3903428c5ee629c839866af", "shasum": "" }, "require": { @@ -40,7 +40,7 @@ } ], "description": "My small PHP Tools", - "time": "2018-11-28T10:47:43+00:00" + "time": "2018-11-28T13:07:27+00:00" } ], "packages-dev": [], diff --git a/lib/Command/QueueProcess.php b/lib/Command/QueueProcess.php new file mode 100644 index 00000000..d7aeb1a9 --- /dev/null +++ b/lib/Command/QueueProcess.php @@ -0,0 +1,126 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Command; + + +use Exception; +use OC\Core\Command\Base; +use OCA\Social\Exceptions\ActorDoesNotExistException; +use OCA\Social\Exceptions\RequestException; +use OCA\Social\Exceptions\SocialAppConfigException; +use OCA\Social\Service\ActivityService; +use OCA\Social\Service\ConfigService; +use OCA\Social\Service\MiscService; +use OCA\Social\Service\QueueService; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Output\OutputInterface; + + +class QueueProcess extends Base { + + + /** @var ActivityService */ + private $activityService; + + /** @var QueueService */ + private $queueService; + + /** @var ConfigService */ + private $configService; + + /** @var MiscService */ + private $miscService; + + + /** + * NoteCreate constructor. + * + * @param ActivityService $activityService + * @param QueueService $queueService + * @param ConfigService $configService + * @param MiscService $miscService + */ + public function __construct( + ActivityService $activityService, QueueService $queueService, ConfigService $configService, + MiscService $miscService + ) { + parent::__construct(); + + $this->activityService = $activityService; + $this->queueService = $queueService; + $this->configService = $configService; + $this->miscService = $miscService; + } + + + /** + * + */ + protected function configure() { + parent::configure(); + $this->setName('social:queue:process') + ->setDescription('Process the request queue'); + } + + + /** + * @param InputInterface $input + * @param OutputInterface $output + */ + protected function execute(InputInterface $input, OutputInterface $output) { + + $requests = $this->queueService->getRequestStandby($total = 0); + + $output->writeLn('found a total of ' . $total . ' requests in the queue'); + if ($total === 0) { + return; + } + + $output->writeLn(sizeof($requests) . ' are processable at this time'); + if (sizeof($requests) === 0) { + return; + } + + foreach ($requests as $request) { + $output->write('.'); + try { + $this->activityService->manageRequest($request); + } catch (ActorDoesNotExistException $e) { + } catch (RequestException $e) { + } catch (SocialAppConfigException $e) { + } + } + + $output->writeLn('done'); + } + +} + diff --git a/lib/Cron/Queue.php b/lib/Cron/Queue.php new file mode 100644 index 00000000..d725b113 --- /dev/null +++ b/lib/Cron/Queue.php @@ -0,0 +1,109 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Cron; + + +use Exception; +use OC\BackgroundJob\TimedJob; +use OCA\Social\AppInfo\Application; +use OCA\Social\Exceptions\ActorDoesNotExistException; +use OCA\Social\Exceptions\RequestException; +use OCA\Social\Exceptions\SocialAppConfigException; +use OCA\Social\Service\ActivityPub\DocumentService; +use OCA\Social\Service\ActivityPub\PersonService; +use OCA\Social\Service\ActivityService; +use OCA\Social\Service\ActorService; +use OCA\Social\Service\CacheService; +use OCA\Social\Service\ConfigService; +use OCA\Social\Service\MiscService; +use OCA\Social\Service\QueueService; +use OCP\AppFramework\QueryException; + + +/** + * Class Queue + * + * @package OCA\Social\Cron + */ +class Queue extends TimedJob { + + + /** @var ActivityService */ + private $activityService; + + /** @var QueueService */ + private $queueService; + + /** @var MiscService */ + private $miscService; + + + /** + * Cache constructor. + */ + public function __construct() { + $this->setInterval(12 * 60); // 12 minutes + } + + + /** + * @param mixed $argument + * + * @throws QueryException + */ + protected function run($argument) { + $app = new Application(); + $c = $app->getContainer(); + + $this->queueService = $c->query(QueueService::class); + $this->activityService = $c->query(ActivityService::class); + $this->miscService = $c->query(MiscService::class); + + $this->manageQueue(); + } + + + private function manageQueue() { + $requests = $this->queueService->getRequestStandby($total = 0); + + foreach ($requests as $request) { + try { + $this->activityService->manageRequest($request); + } catch (ActorDoesNotExistException $e) { + } catch (RequestException $e) { + } catch (SocialAppConfigException $e) { + } + } + + } + +} + diff --git a/lib/Db/RequestQueueRequest.php b/lib/Db/RequestQueueRequest.php index 045e9c16..d358051e 100644 --- a/lib/Db/RequestQueueRequest.php +++ b/lib/Db/RequestQueueRequest.php @@ -57,18 +57,6 @@ class RequestQueueRequest extends RequestQueueRequestBuilder { public function multiple(array $queues) { foreach ($queues as $queue) { $this->create($queue); -// $qb->values( -// [ -// 'source' => $qb->createNamedParameter($queue->getSource()), -// 'activity' => $qb->createNamedParameter($queue->getActivity()), -// 'instance' => $qb->createNamedParameter( -// json_encode($queue->getInstance(), JSON_UNESCAPED_SLASHES) -// ), -// 'status' => $qb->createNamedParameter($queue->getStatus()), -// 'tries' => $qb->createNamedParameter($queue->getTries()), -// 'last' => $qb->createNamedParameter($queue->getLast()) -// ] -// ); } } @@ -92,14 +80,34 @@ class RequestQueueRequest extends RequestQueueRequestBuilder { ) ->setValue('priority', $qb->createNamedParameter($queue->getPriority())) ->setValue('status', $qb->createNamedParameter($queue->getStatus())) - ->setValue('tries', $qb->createNamedParameter($queue->getTries())) - ->setValue('last', $qb->createNamedParameter($queue->getLast())); + ->setValue('tries', $qb->createNamedParameter($queue->getTries())); $qb->execute(); } /** - * return Actor from database based on the username + * return Queue from database based on the status != 9 + * + * @return RequestQueue[] + */ + public function getStandby(): array { + $qb = $this->getQueueSelectSql(); + $this->limitToStatus($qb, RequestQueue::STATUS_STANDBY); + $this->orderByPriority($qb, 'desc'); + + $requests = []; + $cursor = $qb->execute(); + while ($data = $cursor->fetch()) { + $requests[] = $this->parseQueueSelectSql($data); + } + $cursor->closeCursor(); + + return $requests; + } + + + /** + * return Queue from database based on the token * * @param string $token * @param int $status @@ -197,5 +205,13 @@ class RequestQueueRequest extends RequestQueueRequestBuilder { $queue->setStatus(RequestQueue::STATUS_SUCCESS); } + + public function delete(RequestQueue $queue) { + $qb = $this->getQueueDeleteSql(); + $this->limitToId($qb, $queue->getId()); + + $qb->execute(); + } + } diff --git a/lib/Model/RequestQueue.php b/lib/Model/RequestQueue.php index 62f98155..d96253f3 100644 --- a/lib/Model/RequestQueue.php +++ b/lib/Model/RequestQueue.php @@ -32,6 +32,7 @@ namespace OCA\Social\Model; use daita\MySmallPhpTools\Traits\TArrayTools; +use DateTime; use JsonSerializable; @@ -299,9 +300,15 @@ class RequestQueue implements JsonSerializable { $this->setActivity($this->get('activity', $data, '')); $this->setStatus($this->getInt('status', $data, 0)); $this->setTries($this->getInt('tries', $data, 0)); - $this->setLast($this->getInt('last', $data, 0)); - } + $last = $this->get('last', $data, ''); + if ($last === '') { + $this->setLast(0); + } else { + $dTime = new DateTime($last); + $this->setLast($dTime->getTimestamp()); + } + } /** * @return array diff --git a/lib/Service/ActivityService.php b/lib/Service/ActivityService.php index 85b1ea8d..fcc8399a 100644 --- a/lib/Service/ActivityService.php +++ b/lib/Service/ActivityService.php @@ -251,7 +251,6 @@ class ActivityService { /** * @param RequestQueue $queue * - * @throws ActorDoesNotExistException * @throws RequestException * @throws SocialAppConfigException */ @@ -263,9 +262,16 @@ class ActivityService { return; } - $result = $this->generateRequest( - $queue->getInstance(), $queue->getActivity(), $queue->getAuthor() - ); + + try { + $result = $this->generateRequest( + $queue->getInstance(), $queue->getActivity(), $queue->getAuthor() + ); + } catch (ActorDoesNotExistException $e) { + $this->queueService->deleteRequest($queue); + } catch (Request410Exception $e) { + $this->queueService->deleteRequest($queue); + } try { if ($this->getint('_code', $result, 500) === 202) { @@ -341,6 +347,7 @@ class ActivityService { * * @return Request[] * @throws ActorDoesNotExistException + * @throws Request410Exception * @throws RequestException * @throws SocialAppConfigException */ diff --git a/lib/Service/QueueService.php b/lib/Service/QueueService.php index 6fc3b8f3..36cc46dd 100644 --- a/lib/Service/QueueService.php +++ b/lib/Service/QueueService.php @@ -147,6 +147,22 @@ class QueueService { } + public function getRequestStandby(int &$total = 0): array { + $requests = $this->requestQueueRequest->getStandby(); + $total = sizeof($requests); + + $result = []; + foreach ($requests as $request) { + $delay = floor(pow($request->getTries(), 4) / 3); + if ($request->getLast() < (time() - $delay)) { + $result[] = $request; + } + } + + return $result; + } + + /** * @param string $token * @param int $status @@ -186,6 +202,12 @@ class QueueService { } } + /** + * @param RequestQueue $queue + */ + public function deleteRequest(RequestQueue $queue) { + $this->requestQueueRequest->delete($queue); + } }