migrate data to a specific table

Signed-off-by: Maxence Lange <maxence@artificial-owl.com>
pull/772/head
Maxence Lange 2019-10-01 13:53:26 +02:00
rodzic 6ba6d86fa7
commit 1130d7ffbd
12 zmienionych plików z 427 dodań i 33 usunięć

Wyświetl plik

@ -128,6 +128,5 @@ class CacheRefresh extends Base {
$output->writeLn($result . ' hashtags updated');
}
}

Wyświetl plik

@ -65,6 +65,7 @@ class CoreRequestBuilder {
const TABLE_ACTORS = 'social_a2_actors';
const TABLE_STREAM = 'social_a2_stream';
const TABLE_STREAM_DEST = 'social_a2_stream_dest';
const TABLE_STREAM_TAGS = 'social_a2_stream_tags';
const TABLE_STREAM_QUEUE = 'social_a2_stream_queue';
const TABLE_STREAM_ACTIONS = 'social_a2_stream_action';
@ -87,6 +88,7 @@ class CoreRequestBuilder {
self::TABLE_CACHE_DOCUMENTS,
self::TABLE_STREAM_QUEUE,
self::TABLE_STREAM_DEST,
self::TABLE_STREAM_TAGS,
self::TABLE_STREAM_ACTIONS
];

Wyświetl plik

@ -66,7 +66,24 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder {
* @param string $alias
* @param string $link
*/
public function innerJoinCacheActors(string $alias = 'ca', string $link = '') {
public function linkToStreamTags(string $alias = 'st', string $link = '') {
if ($this->getType() !== QueryBuilder::SELECT) {
return;
}
$this->from(CoreRequestBuilder::TABLE_STREAM_TAGS, $alias);
if ($link !== '') {
$expr = $this->expr();
$this->andWhere($expr->eq($alias . '.stream_id', $link));
}
}
/**
* @param string $alias
* @param string $link
*/
public function linkToCacheActors(string $alias = 'ca', string $link = '') {
if ($this->getType() !== QueryBuilder::SELECT) {
return;
}
@ -173,15 +190,14 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder {
/**
* @param string $type
* @param string $subType
* @param string $field
* @param string $aliasDest
* @param string $alias
*/
public function innerJoinDest(
public function innerJoinSteamDest(
string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $alias = ''
) {
$this->andWhere($this->exprInnerJoinDest($type, $field, $aliasDest, $alias));
$this->andWhere($this->exprInnerJoinStreamDest($type, $field, $aliasDest, $alias));
}
@ -194,7 +210,7 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder {
*
* @return ICompositeExpression
*/
public function exprInnerJoinDest(
public function exprInnerJoinStreamDest(
string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $alias = ''
): ICompositeExpression {
$expr = $this->expr();
@ -207,12 +223,22 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder {
}
public function innerJoinDestFollowing(
/**
* @param string $actorId
* @param string $type
* @param string $field
* @param string $aliasDest
* @param string $aliasFollowing
* @param string $alias
*/
public function innerJoinStreamDestFollowing(
string $actorId, string $type, string $field = 'id_prim', string $aliasDest = 'sd',
string $aliasFollowing = 'f', string $alias = ''
) {
$this->andWhere(
$this->exprInnerJoinDestFollowing($actorId, $type, $field, $aliasDest, $aliasFollowing, $alias)
$this->exprInnerJoinStreamDestFollowing(
$actorId, $type, $field, $aliasDest, $aliasFollowing, $alias
)
);
}
@ -227,7 +253,7 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder {
*
* @return ICompositeExpression
*/
public function exprInnerJoinDestFollowing(
public function exprInnerJoinStreamDestFollowing(
string $actorId, string $type, string $field = 'id_prim', string $aliasDest = 'sd',
string $aliasFollowing = 'f', string $alias = ''
): ICompositeExpression {

Wyświetl plik

@ -365,7 +365,7 @@ class SocialLimitsQueryBuilder extends SocialCrossQueryBuilder {
) {
if (!$this->hasViewer()) {
$this->selectDestFollowing($aliasDest);
$this->innerJoinDest('recipient', 'id_prim', 'sd', 's');
$this->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's');
$this->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', $aliasDest);
return;
@ -376,7 +376,7 @@ class SocialLimitsQueryBuilder extends SocialCrossQueryBuilder {
$orX = $expr->orX();
$actor = $this->getViewer();
$following = $this->exprInnerJoinDestFollowing(
$following = $this->exprInnerJoinStreamDestFollowing(
$actor->getId(), 'recipient', 'id_prim', $aliasDest, $aliasFollowing
);
$orX->add($following);

Wyświetl plik

@ -87,7 +87,7 @@ class StreamDestRequest extends StreamDestRequestBuilder {
$qb->execute();
} catch (UniqueConstraintViolationException $e) {
\OC::$server->getLogger()
->log(3, 'Social - Duplicate recipient on Stream ' . json_encode($stream));
->log(1, 'Social - Duplicate recipient on Stream ' . json_encode($stream));
}
}
}

Wyświetl plik

@ -64,7 +64,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder {
* @return IQueryBuilder
*/
protected function getStreamDestUpdateSql(): IQueryBuilder {
$qb = $this->dbConnection->getQueryBuilder();
$qb = $this->getQueryBuilder();
$qb->update(self::TABLE_STREAM_DEST);
return $qb;
@ -77,7 +77,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder {
* @return IQueryBuilder
*/
protected function getStreamDestSelectSql(): IQueryBuilder {
$qb = $this->dbConnection->getQueryBuilder();
$qb = $this->getQueryBuilder();
/** @noinspection PhpMethodParametersCountMismatchInspection */
$qb->select('sd.actor_id', 'sd.stream_id', 'sd.type')
@ -95,7 +95,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder {
* @return IQueryBuilder
*/
protected function getStreamDestDeleteSql(): IQueryBuilder {
$qb = $this->dbConnection->getQueryBuilder();
$qb = $this->getQueryBuilder();
$qb->delete(self::TABLE_STREAM_DEST);
return $qb;
@ -108,7 +108,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder {
* @return IQueryBuilder
*/
protected function countStreamDestSelectSql(): IQueryBuilder {
$qb = $this->dbConnection->getQueryBuilder();
$qb = $this->getQueryBuilder();
$qb->selectAlias($qb->createFunction('COUNT(*)'), 'count')
->from(self::TABLE_STREAM_DEST, 'sd');

Wyświetl plik

@ -62,6 +62,9 @@ class StreamRequest extends StreamRequestBuilder {
/** @var StreamDestRequest */
private $streamDestRequest;
/** @var StreamTagsRequest */
private $streamTagsRequest;
/**
* StreamRequest constructor.
@ -74,11 +77,12 @@ class StreamRequest extends StreamRequestBuilder {
*/
public function __construct(
IDBConnection $connection, ILogger $logger, StreamDestRequest $streamDestRequest,
ConfigService $configService, MiscService $miscService
StreamTagsRequest $streamTagsRequest, ConfigService $configService, MiscService $miscService
) {
parent::__construct($connection, $logger, $configService, $miscService);
$this->streamDestRequest = $streamDestRequest;
$this->streamTagsRequest = $streamTagsRequest;
}
@ -101,6 +105,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->execute();
$this->streamDestRequest->generateStreamDest($stream);
$this->streamTagsRequest->generateStreamTags($stream);
} catch (UniqueConstraintViolationException $e) {
}
}
@ -231,7 +236,7 @@ class StreamRequest extends StreamRequestBuilder {
$expr = $qb->expr();
$qb->limitToIdPrim($qb->prim($id));
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
if ($asViewer) {
$qb->limitToViewer('sd', 'f', true);
@ -271,7 +276,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitPaginate($since, $limit);
$expr = $qb->expr();
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->andWhere($expr->eq('s.attributed_to', 'ca.id_prim'));
@ -337,7 +342,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitToType(Note::TYPE);
$qb->selectDestFollowing('sd', '');
$qb->innerJoinDest('recipient', 'id_prim', 'sd', 's');
$qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's');
$qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', 'sd');
$cursor = $qb->execute();
@ -364,7 +369,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb = $this->getStreamSelectSql();
$expr = $qb->expr();
$qb->innerJoinCacheActors('ca', 'f.object_id_prim');
$qb->linkToCacheActors('ca', 'f.object_id_prim');
$qb->limitPaginate($since, $limit);
$qb->andWhere($qb->exprLimitToDBField('type', SocialAppNotification::TYPE, false));
@ -401,7 +406,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitToDest($actor->getId(), 'recipient', '', 'sd');
$qb->limitToType(SocialAppNotification::TYPE);
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->leftJoinStreamAction();
return $this->getStreamsFromRequest($qb);
@ -427,10 +432,10 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitToAttributedTo($actorId);
$qb->selectDestFollowing('sd', '');
$qb->innerJoinDest('recipient', 'id_prim', 'sd', 's');
$qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's');
$qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', 'sd');
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->leftJoinStreamAction();
return $this->getStreamsFromRequest($qb);
@ -455,10 +460,10 @@ class StreamRequest extends StreamRequestBuilder {
$qb->filterType(SocialAppNotification::TYPE);
$qb->limitPaginate($since, $limit);
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->selectDestFollowing('sd', '');
$qb->innerJoinDest('recipient', 'id_prim', 'sd', 's');
$qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's');
$qb->limitToDest($actor->getId(), 'recipient', '', 'sd');
$qb->filterDest(ACore::CONTEXT_PUBLIC);
@ -488,11 +493,11 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitToLocal($localOnly);
$qb->limitToType(Note::TYPE);
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->leftJoinStreamAction();
$qb->selectDestFollowing('sd', '');
$qb->innerJoinDest('recipient', 'id_prim', 'sd', 's');
$qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's');
$qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', 'to', 'sd');
return $this->getStreamsFromRequest($qb);
@ -521,7 +526,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->limitPaginate($since, $limit);
$expr = $qb->expr();
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->selectStreamActions('sa');
$qb->andWhere($expr->eq('sa.stream_id_prim', 's.id_prim'));
@ -549,10 +554,12 @@ class StreamRequest extends StreamRequestBuilder {
$qb = $this->getStreamSelectSql();
$expr = $qb->expr();
$qb->innerJoinCacheActors('ca', 's.attributed_to_prim');
$qb->linkToCacheActors('ca', 's.attributed_to_prim');
$qb->linkToStreamTags('st', 's.id_prim');
$qb->limitPaginate($since, $limit);
$qb->andWhere($qb->exprLimitToDBField('type', Note::TYPE));
$qb->andWhere($qb->exprLimitToDBField('hashtag', $hashtag, true, false, 'st'));
$qb->limitToViewer('sd', 'f', true);
$qb->andWhere($expr->eq('s.attributed_to_prim', 'ca.id_prim'));
@ -560,7 +567,7 @@ class StreamRequest extends StreamRequestBuilder {
$qb->leftJoinStreamAction('sa');
// TODO: Sql optimisation - Create a table like stream_dest for to link 'hashtag' to 'stream_id'
$qb->andWhere($this->exprValueWithinJsonFormat($qb, 'hashtags', '' . $hashtag));
// $qb->andWhere($this->exprValueWithinJsonFormat($qb, 'hashtags', '' . $hashtag));
return $this->getStreamsFromRequest($qb);
}

Wyświetl plik

@ -0,0 +1,78 @@
<?php
declare(strict_types=1);
/**
* Nextcloud - Social Support
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Maxence Lange <maxence@artificial-owl.com>
* @copyright 2018, Maxence Lange <maxence@artificial-owl.com>
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCA\Social\Db;
use daita\MySmallPhpTools\Traits\TStringTools;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use OCA\Social\Model\ActivityPub\Stream;
/**
* Class StreamTagsRequest
*
* @package OCA\Social\Db
*/
class StreamTagsRequest extends StreamTagsRequestBuilder {
use TStringTools;
/**
* @param Stream $stream
*/
public function generateStreamTags(Stream $stream) {
$hashtags = $stream->getTags();
$this->miscService->log('$$$$ ' . json_encode($hashtags));
foreach ($hashtags as $hashtag) {
$tag = $this->get('name', $hashtag);
if ($this->get('type', $hashtag) !== 'Hashtag' || $tag === '') {
continue;
}
$tag = substr($tag, 1);
$qb = $this->getStreamTagsInsertSql();
$streamId = $qb->prim($stream->getId());
$qb->setValue('stream_id', $qb->createNamedParameter($streamId));
$qb->setValue('hashtag', $qb->createNamedParameter($tag));
try {
$qb->execute();
} catch (UniqueConstraintViolationException $e) {
\OC::$server->getLogger()
->log(1, 'Social - Duplicate hashtag on Stream ' . json_encode($stream));
}
}
}
}

Wyświetl plik

@ -0,0 +1,105 @@
<?php
declare(strict_types=1);
/**
* Nextcloud - Social Support
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Maxence Lange <maxence@artificial-owl.com>
* @copyright 2018, Maxence Lange <maxence@artificial-owl.com>
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCA\Social\Db;
use daita\MySmallPhpTools\Traits\TArrayTools;
use OCP\DB\QueryBuilder\IQueryBuilder;
/**
* Class StreamDestRequestBuilder
*
* @package OCA\Social\Db
*/
class StreamTagsRequestBuilder extends CoreRequestBuilder {
use TArrayTools;
/**
* Base of the Sql Insert request
*
* @return SocialQueryBuilder
*/
protected function getStreamTagsInsertSql(): SocialQueryBuilder {
$qb = $this->getQueryBuilder();
$qb->insert(self::TABLE_STREAM_TAGS);
return $qb;
}
/**
* Base of the Sql Update request
*
* @return IQueryBuilder
*/
protected function getStreamTagsUpdateSql(): IQueryBuilder {
$qb = $this->getQueryBuilder();
$qb->update(self::TABLE_STREAM_TAGS);
return $qb;
}
/**
* Base of the Sql Select request for Shares
*
* @return IQueryBuilder
*/
protected function getStreamTagsSelectSql(): IQueryBuilder {
$qb = $this->getQueryBuilder();
/** @noinspection PhpMethodParametersCountMismatchInspection */
$qb->select('st.stream_id', 'st.hashtag')
->from(self::TABLE_STREAM_TAGS, 'st');
$this->defaultSelectAlias = 'st';
return $qb;
}
/**
* Base of the Sql Delete request
*
* @return IQueryBuilder
*/
protected function getStreamTagsDeleteSql(): IQueryBuilder {
$qb = $this->getQueryBuilder();
$qb->delete(self::TABLE_STREAM_TAGS);
return $qb;
}
}

Wyświetl plik

@ -390,7 +390,7 @@ class Version0002Date20190916000001 extends SimpleMigrationStep {
$insert->execute();
} catch (UniqueConstraintViolationException $e) {
\OC::$server->getLogger()
->log(3, 'Social - Duplicate recipient on Stream ' . json_encode($data));
->log(1, 'Social - Duplicate recipient on Stream ' . json_encode($data));
}
}
}

Wyświetl plik

@ -154,7 +154,7 @@ class Version0002Date20190925000001 extends SimpleMigrationStep {
$update->set('stream_id_prim', $update->createNamedParameter(hash('sha512', $streamId)));
$update->set('liked', $update->createNamedParameter($liked));
$update->set('boosted', $update->createNamedParameter($boosted));
$update->setValue('replied', $update->createNamedParameter($replied));
$update->set('replied', $update->createNamedParameter($replied));
$expr = $update->expr();
$update->where($expr->eq('id', $update->createNamedParameter($id)));

Wyświetl plik

@ -0,0 +1,177 @@
<?php
declare(strict_types=1);
/**
* Nextcloud - Social Support
*
* This file is licensed under the Affero General Public License version 3 or
* later. See the COPYING file.
*
* @author Maxence Lange <maxence@artificial-owl.com>
* @copyright 2018, Maxence Lange <maxence@artificial-owl.com>
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCA\Social\Migration;
use Closure;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Schema\SchemaException;
use Exception;
use OCP\DB\ISchemaWrapper;
use OCP\IDBConnection;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;
/**
* Class Version0002Date20191001000001
*
* @package OCA\Social\Migration
*/
class Version0002Date20191001000001 extends SimpleMigrationStep {
/** @var IDBConnection */
private $connection;
/**
* @param IDBConnection $connection
*/
public function __construct(IDBConnection $connection) {
$this->connection = $connection;
}
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
*
* @return ISchemaWrapper
* @throws SchemaException
*/
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options
): ISchemaWrapper {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
if (!$schema->hasTable('social_a2_stream_tags')) {
$table = $schema->createTable('social_a2_stream_tags');
$table->addColumn(
'stream_id', 'string',
[
'notnull' => true,
'length' => 128,
]
);
$table->addColumn(
'hashtag', 'string',
[
'notnull' => false,
'length' => 127,
]
);
if (!$table->hasIndex('sh')) {
$table->addUniqueIndex(['stream_id', 'hashtag'], 'sh');
}
}
return $schema;
}
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
*
* @throws Exception
*/
public function postSchemaChange(IOutput $output, Closure $schemaClosure, array $options) {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
$this->fillTableStreamHashtags($schema);
}
/**
* @param ISchemaWrapper $schema
*/
private function fillTableStreamHashtags(ISchemaWrapper $schema) {
$start = 0;
$limit = 1000;
while (true) {
$qb = $this->connection->getQueryBuilder();
$qb->select('id', 'hashtags')
->from('social_a2_stream')
->setMaxResults(1000)
->setFirstResult($start);
$cursor = $qb->execute();
$count = 0;
while ($data = $cursor->fetch()) {
$count++;
$this->updateStreamTags($data);
}
$cursor->closeCursor();
$start += $count;
if ($count < $limit) {
break;
}
}
}
/**
* @param array $data
*/
private function updateStreamTags(array $data) {
if ($data['hashtags'] === '' || $data['hashtags'] === null) {
return;
}
$id = $data['id'];
$tags = json_decode($data['hashtags'], true);
foreach ($tags as $tag) {
if ($tag === '') {
continue;
}
$insert = $this->connection->getQueryBuilder();
$insert->insert('social_a2_stream_tags');
$insert->setValue('stream_id', $insert->createNamedParameter(hash('sha512', $id)));
$insert->setValue('hashtag', $insert->createNamedParameter($tag));
try {
$insert->execute();
} catch (UniqueConstraintViolationException $e) {
}
}
}
}