retry-on-fail by cron and cli

Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
pull/70/head
Maxence Lange 2018-11-28 17:18:37 -01:00
rodzic f08865eeed
commit 9a042eea45
8 zmienionych plików z 313 dodań i 25 usunięć

Wyświetl plik

@ -33,6 +33,7 @@
<commands>
<command>OCA\Social\Command\CacheRefresh</command>
<command>OCA\Social\Command\QueueStatus</command>
<command>OCA\Social\Command\QueueProcess</command>
<command>OCA\Social\Command\NoteCreate</command>
</commands>

8
composer.lock wygenerowano
Wyświetl plik

@ -12,12 +12,12 @@
"source": {
"type": "git",
"url": "https://github.com/daita/my-small-php-tools.git",
"reference": "12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997"
"reference": "56cff24fdde14d21e3903428c5ee629c839866af"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/daita/my-small-php-tools/zipball/12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997",
"reference": "12090dc3ae29d2eb49d5274ca3f6ebfb76ce5997",
"url": "https://api.github.com/repos/daita/my-small-php-tools/zipball/56cff24fdde14d21e3903428c5ee629c839866af",
"reference": "56cff24fdde14d21e3903428c5ee629c839866af",
"shasum": ""
},
"require": {
@ -40,7 +40,7 @@
}
],
"description": "My small PHP Tools",
"time": "2018-11-28T10:47:43+00:00"
"time": "2018-11-28T13:07:27+00:00"
}
],
"packages-dev": [],

Wyświetl plik

@ -0,0 +1,126 @@
<?php
declare(strict_types=1);
/**
* Nextcloud - Social Support
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Maxence Lange <maxence@artificial-owl.com>
* @copyright 2018, Maxence Lange <maxence@artificial-owl.com>
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCA\Social\Command;
use Exception;
use OC\Core\Command\Base;
use OCA\Social\Exceptions\ActorDoesNotExistException;
use OCA\Social\Exceptions\RequestException;
use OCA\Social\Exceptions\SocialAppConfigException;
use OCA\Social\Service\ActivityService;
use OCA\Social\Service\ConfigService;
use OCA\Social\Service\MiscService;
use OCA\Social\Service\QueueService;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class QueueProcess extends Base {
/** @var ActivityService */
private $activityService;
/** @var QueueService */
private $queueService;
/** @var ConfigService */
private $configService;
/** @var MiscService */
private $miscService;
/**
* NoteCreate constructor.
*
* @param ActivityService $activityService
* @param QueueService $queueService
* @param ConfigService $configService
* @param MiscService $miscService
*/
public function __construct(
ActivityService $activityService, QueueService $queueService, ConfigService $configService,
MiscService $miscService
) {
parent::__construct();
$this->activityService = $activityService;
$this->queueService = $queueService;
$this->configService = $configService;
$this->miscService = $miscService;
}
/**
*
*/
protected function configure() {
parent::configure();
$this->setName('social:queue:process')
->setDescription('Process the request queue');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$requests = $this->queueService->getRequestStandby($total = 0);
$output->writeLn('found a total of ' . $total . ' requests in the queue');
if ($total === 0) {
return;
}
$output->writeLn(sizeof($requests) . ' are processable at this time');
if (sizeof($requests) === 0) {
return;
}
foreach ($requests as $request) {
$output->write('.');
try {
$this->activityService->manageRequest($request);
} catch (ActorDoesNotExistException $e) {
} catch (RequestException $e) {
} catch (SocialAppConfigException $e) {
}
}
$output->writeLn('done');
}
}

109
lib/Cron/Queue.php 100644
Wyświetl plik

@ -0,0 +1,109 @@
<?php
declare(strict_types=1);
/**
* Nextcloud - Social Support
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Maxence Lange <maxence@artificial-owl.com>
* @copyright 2018, Maxence Lange <maxence@artificial-owl.com>
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCA\Social\Cron;
use Exception;
use OC\BackgroundJob\TimedJob;
use OCA\Social\AppInfo\Application;
use OCA\Social\Exceptions\ActorDoesNotExistException;
use OCA\Social\Exceptions\RequestException;
use OCA\Social\Exceptions\SocialAppConfigException;
use OCA\Social\Service\ActivityPub\DocumentService;
use OCA\Social\Service\ActivityPub\PersonService;
use OCA\Social\Service\ActivityService;
use OCA\Social\Service\ActorService;
use OCA\Social\Service\CacheService;
use OCA\Social\Service\ConfigService;
use OCA\Social\Service\MiscService;
use OCA\Social\Service\QueueService;
use OCP\AppFramework\QueryException;
/**
* Class Queue
*
* @package OCA\Social\Cron
*/
class Queue extends TimedJob {
/** @var ActivityService */
private $activityService;
/** @var QueueService */
private $queueService;
/** @var MiscService */
private $miscService;
/**
* Cache constructor.
*/
public function __construct() {
$this->setInterval(12 * 60); // 12 minutes
}
/**
* @param mixed $argument
*
* @throws QueryException
*/
protected function run($argument) {
$app = new Application();
$c = $app->getContainer();
$this->queueService = $c->query(QueueService::class);
$this->activityService = $c->query(ActivityService::class);
$this->miscService = $c->query(MiscService::class);
$this->manageQueue();
}
private function manageQueue() {
$requests = $this->queueService->getRequestStandby($total = 0);
foreach ($requests as $request) {
try {
$this->activityService->manageRequest($request);
} catch (ActorDoesNotExistException $e) {
} catch (RequestException $e) {
} catch (SocialAppConfigException $e) {
}
}
}
}

Wyświetl plik

@ -57,18 +57,6 @@ class RequestQueueRequest extends RequestQueueRequestBuilder {
public function multiple(array $queues) {
foreach ($queues as $queue) {
$this->create($queue);
// $qb->values(
// [
// 'source' => $qb->createNamedParameter($queue->getSource()),
// 'activity' => $qb->createNamedParameter($queue->getActivity()),
// 'instance' => $qb->createNamedParameter(
// json_encode($queue->getInstance(), JSON_UNESCAPED_SLASHES)
// ),
// 'status' => $qb->createNamedParameter($queue->getStatus()),
// 'tries' => $qb->createNamedParameter($queue->getTries()),
// 'last' => $qb->createNamedParameter($queue->getLast())
// ]
// );
}
}
@ -92,14 +80,34 @@ class RequestQueueRequest extends RequestQueueRequestBuilder {
)
->setValue('priority', $qb->createNamedParameter($queue->getPriority()))
->setValue('status', $qb->createNamedParameter($queue->getStatus()))
->setValue('tries', $qb->createNamedParameter($queue->getTries()))
->setValue('last', $qb->createNamedParameter($queue->getLast()));
->setValue('tries', $qb->createNamedParameter($queue->getTries()));
$qb->execute();
}
/**
* return Actor from database based on the username
* return Queue from database based on the status != 9
*
* @return RequestQueue[]
*/
public function getStandby(): array {
$qb = $this->getQueueSelectSql();
$this->limitToStatus($qb, RequestQueue::STATUS_STANDBY);
$this->orderByPriority($qb, 'desc');
$requests = [];
$cursor = $qb->execute();
while ($data = $cursor->fetch()) {
$requests[] = $this->parseQueueSelectSql($data);
}
$cursor->closeCursor();
return $requests;
}
/**
* return Queue from database based on the token
*
* @param string $token
* @param int $status
@ -197,5 +205,13 @@ class RequestQueueRequest extends RequestQueueRequestBuilder {
$queue->setStatus(RequestQueue::STATUS_SUCCESS);
}
public function delete(RequestQueue $queue) {
$qb = $this->getQueueDeleteSql();
$this->limitToId($qb, $queue->getId());
$qb->execute();
}
}

Wyświetl plik

@ -32,6 +32,7 @@ namespace OCA\Social\Model;
use daita\MySmallPhpTools\Traits\TArrayTools;
use DateTime;
use JsonSerializable;
@ -299,9 +300,15 @@ class RequestQueue implements JsonSerializable {
$this->setActivity($this->get('activity', $data, ''));
$this->setStatus($this->getInt('status', $data, 0));
$this->setTries($this->getInt('tries', $data, 0));
$this->setLast($this->getInt('last', $data, 0));
}
$last = $this->get('last', $data, '');
if ($last === '') {
$this->setLast(0);
} else {
$dTime = new DateTime($last);
$this->setLast($dTime->getTimestamp());
}
}
/**
* @return array

Wyświetl plik

@ -251,7 +251,6 @@ class ActivityService {
/**
* @param RequestQueue $queue
*
* @throws ActorDoesNotExistException
* @throws RequestException
* @throws SocialAppConfigException
*/
@ -263,9 +262,16 @@ class ActivityService {
return;
}
$result = $this->generateRequest(
$queue->getInstance(), $queue->getActivity(), $queue->getAuthor()
);
try {
$result = $this->generateRequest(
$queue->getInstance(), $queue->getActivity(), $queue->getAuthor()
);
} catch (ActorDoesNotExistException $e) {
$this->queueService->deleteRequest($queue);
} catch (Request410Exception $e) {
$this->queueService->deleteRequest($queue);
}
try {
if ($this->getint('_code', $result, 500) === 202) {
@ -341,6 +347,7 @@ class ActivityService {
*
* @return Request[]
* @throws ActorDoesNotExistException
* @throws Request410Exception
* @throws RequestException
* @throws SocialAppConfigException
*/

Wyświetl plik

@ -147,6 +147,22 @@ class QueueService {
}
public function getRequestStandby(int &$total = 0): array {
$requests = $this->requestQueueRequest->getStandby();
$total = sizeof($requests);
$result = [];
foreach ($requests as $request) {
$delay = floor(pow($request->getTries(), 4) / 3);
if ($request->getLast() < (time() - $delay)) {
$result[] = $request;
}
}
return $result;
}
/**
* @param string $token
* @param int $status
@ -186,6 +202,12 @@ class QueueService {
}
}
/**
* @param RequestQueue $queue
*/
public function deleteRequest(RequestQueue $queue) {
$this->requestQueueRequest->delete($queue);
}
}