From 1130d7ffbdd6d1a81051890586a8a8884321be03 Mon Sep 17 00:00:00 2001 From: Maxence Lange Date: Tue, 1 Oct 2019 13:53:26 +0200 Subject: [PATCH] migrate data to a specific table Signed-off-by: Maxence Lange --- lib/Command/CacheRefresh.php | 1 - lib/Db/CoreRequestBuilder.php | 2 + lib/Db/SocialCrossQueryBuilder.php | 42 ++++- lib/Db/SocialLimitsQueryBuilder.php | 4 +- lib/Db/StreamDestRequest.php | 2 +- lib/Db/StreamDestRequestBuilder.php | 8 +- lib/Db/StreamRequest.php | 37 ++-- lib/Db/StreamTagsRequest.php | 78 ++++++++ lib/Db/StreamTagsRequestBuilder.php | 105 +++++++++++ .../Version0002Date20190916000001.php | 2 +- .../Version0002Date20190925000001.php | 2 +- .../Version0002Date20191001000001.php | 177 ++++++++++++++++++ 12 files changed, 427 insertions(+), 33 deletions(-) create mode 100644 lib/Db/StreamTagsRequest.php create mode 100644 lib/Db/StreamTagsRequestBuilder.php create mode 100644 lib/Migration/Version0002Date20191001000001.php diff --git a/lib/Command/CacheRefresh.php b/lib/Command/CacheRefresh.php index 71165df2..349edc4f 100644 --- a/lib/Command/CacheRefresh.php +++ b/lib/Command/CacheRefresh.php @@ -128,6 +128,5 @@ class CacheRefresh extends Base { $output->writeLn($result . ' hashtags updated'); } - } diff --git a/lib/Db/CoreRequestBuilder.php b/lib/Db/CoreRequestBuilder.php index 4ddd6ece..dc347c64 100644 --- a/lib/Db/CoreRequestBuilder.php +++ b/lib/Db/CoreRequestBuilder.php @@ -65,6 +65,7 @@ class CoreRequestBuilder { const TABLE_ACTORS = 'social_a2_actors'; const TABLE_STREAM = 'social_a2_stream'; const TABLE_STREAM_DEST = 'social_a2_stream_dest'; + const TABLE_STREAM_TAGS = 'social_a2_stream_tags'; const TABLE_STREAM_QUEUE = 'social_a2_stream_queue'; const TABLE_STREAM_ACTIONS = 'social_a2_stream_action'; @@ -87,6 +88,7 @@ class CoreRequestBuilder { self::TABLE_CACHE_DOCUMENTS, self::TABLE_STREAM_QUEUE, self::TABLE_STREAM_DEST, + self::TABLE_STREAM_TAGS, self::TABLE_STREAM_ACTIONS ]; diff --git a/lib/Db/SocialCrossQueryBuilder.php b/lib/Db/SocialCrossQueryBuilder.php index bfd99de7..4cd50c5d 100644 --- a/lib/Db/SocialCrossQueryBuilder.php +++ b/lib/Db/SocialCrossQueryBuilder.php @@ -66,7 +66,24 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder { * @param string $alias * @param string $link */ - public function innerJoinCacheActors(string $alias = 'ca', string $link = '') { + public function linkToStreamTags(string $alias = 'st', string $link = '') { + if ($this->getType() !== QueryBuilder::SELECT) { + return; + } + + $this->from(CoreRequestBuilder::TABLE_STREAM_TAGS, $alias); + if ($link !== '') { + $expr = $this->expr(); + $this->andWhere($expr->eq($alias . '.stream_id', $link)); + } + } + + + /** + * @param string $alias + * @param string $link + */ + public function linkToCacheActors(string $alias = 'ca', string $link = '') { if ($this->getType() !== QueryBuilder::SELECT) { return; } @@ -173,15 +190,14 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder { /** * @param string $type - * @param string $subType * @param string $field * @param string $aliasDest * @param string $alias */ - public function innerJoinDest( + public function innerJoinSteamDest( string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $alias = '' ) { - $this->andWhere($this->exprInnerJoinDest($type, $field, $aliasDest, $alias)); + $this->andWhere($this->exprInnerJoinStreamDest($type, $field, $aliasDest, $alias)); } @@ -194,7 +210,7 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder { * * @return ICompositeExpression */ - public function exprInnerJoinDest( + public function exprInnerJoinStreamDest( string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $alias = '' ): ICompositeExpression { $expr = $this->expr(); @@ -207,12 +223,22 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder { } - public function innerJoinDestFollowing( + /** + * @param string $actorId + * @param string $type + * @param string $field + * @param string $aliasDest + * @param string $aliasFollowing + * @param string $alias + */ + public function innerJoinStreamDestFollowing( string $actorId, string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $aliasFollowing = 'f', string $alias = '' ) { $this->andWhere( - $this->exprInnerJoinDestFollowing($actorId, $type, $field, $aliasDest, $aliasFollowing, $alias) + $this->exprInnerJoinStreamDestFollowing( + $actorId, $type, $field, $aliasDest, $aliasFollowing, $alias + ) ); } @@ -227,7 +253,7 @@ class SocialCrossQueryBuilder extends SocialCoreQueryBuilder { * * @return ICompositeExpression */ - public function exprInnerJoinDestFollowing( + public function exprInnerJoinStreamDestFollowing( string $actorId, string $type, string $field = 'id_prim', string $aliasDest = 'sd', string $aliasFollowing = 'f', string $alias = '' ): ICompositeExpression { diff --git a/lib/Db/SocialLimitsQueryBuilder.php b/lib/Db/SocialLimitsQueryBuilder.php index f4c21bb5..e3cb83ae 100644 --- a/lib/Db/SocialLimitsQueryBuilder.php +++ b/lib/Db/SocialLimitsQueryBuilder.php @@ -365,7 +365,7 @@ class SocialLimitsQueryBuilder extends SocialCrossQueryBuilder { ) { if (!$this->hasViewer()) { $this->selectDestFollowing($aliasDest); - $this->innerJoinDest('recipient', 'id_prim', 'sd', 's'); + $this->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's'); $this->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', $aliasDest); return; @@ -376,7 +376,7 @@ class SocialLimitsQueryBuilder extends SocialCrossQueryBuilder { $orX = $expr->orX(); $actor = $this->getViewer(); - $following = $this->exprInnerJoinDestFollowing( + $following = $this->exprInnerJoinStreamDestFollowing( $actor->getId(), 'recipient', 'id_prim', $aliasDest, $aliasFollowing ); $orX->add($following); diff --git a/lib/Db/StreamDestRequest.php b/lib/Db/StreamDestRequest.php index 8ea6908d..78194f0d 100644 --- a/lib/Db/StreamDestRequest.php +++ b/lib/Db/StreamDestRequest.php @@ -87,7 +87,7 @@ class StreamDestRequest extends StreamDestRequestBuilder { $qb->execute(); } catch (UniqueConstraintViolationException $e) { \OC::$server->getLogger() - ->log(3, 'Social - Duplicate recipient on Stream ' . json_encode($stream)); + ->log(1, 'Social - Duplicate recipient on Stream ' . json_encode($stream)); } } } diff --git a/lib/Db/StreamDestRequestBuilder.php b/lib/Db/StreamDestRequestBuilder.php index 9bb60738..986e0215 100644 --- a/lib/Db/StreamDestRequestBuilder.php +++ b/lib/Db/StreamDestRequestBuilder.php @@ -64,7 +64,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder { * @return IQueryBuilder */ protected function getStreamDestUpdateSql(): IQueryBuilder { - $qb = $this->dbConnection->getQueryBuilder(); + $qb = $this->getQueryBuilder(); $qb->update(self::TABLE_STREAM_DEST); return $qb; @@ -77,7 +77,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder { * @return IQueryBuilder */ protected function getStreamDestSelectSql(): IQueryBuilder { - $qb = $this->dbConnection->getQueryBuilder(); + $qb = $this->getQueryBuilder(); /** @noinspection PhpMethodParametersCountMismatchInspection */ $qb->select('sd.actor_id', 'sd.stream_id', 'sd.type') @@ -95,7 +95,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder { * @return IQueryBuilder */ protected function getStreamDestDeleteSql(): IQueryBuilder { - $qb = $this->dbConnection->getQueryBuilder(); + $qb = $this->getQueryBuilder(); $qb->delete(self::TABLE_STREAM_DEST); return $qb; @@ -108,7 +108,7 @@ class StreamDestRequestBuilder extends CoreRequestBuilder { * @return IQueryBuilder */ protected function countStreamDestSelectSql(): IQueryBuilder { - $qb = $this->dbConnection->getQueryBuilder(); + $qb = $this->getQueryBuilder(); $qb->selectAlias($qb->createFunction('COUNT(*)'), 'count') ->from(self::TABLE_STREAM_DEST, 'sd'); diff --git a/lib/Db/StreamRequest.php b/lib/Db/StreamRequest.php index a5ea0dbc..c321f2a5 100644 --- a/lib/Db/StreamRequest.php +++ b/lib/Db/StreamRequest.php @@ -62,6 +62,9 @@ class StreamRequest extends StreamRequestBuilder { /** @var StreamDestRequest */ private $streamDestRequest; + /** @var StreamTagsRequest */ + private $streamTagsRequest; + /** * StreamRequest constructor. @@ -74,11 +77,12 @@ class StreamRequest extends StreamRequestBuilder { */ public function __construct( IDBConnection $connection, ILogger $logger, StreamDestRequest $streamDestRequest, - ConfigService $configService, MiscService $miscService + StreamTagsRequest $streamTagsRequest, ConfigService $configService, MiscService $miscService ) { parent::__construct($connection, $logger, $configService, $miscService); $this->streamDestRequest = $streamDestRequest; + $this->streamTagsRequest = $streamTagsRequest; } @@ -101,6 +105,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->execute(); $this->streamDestRequest->generateStreamDest($stream); + $this->streamTagsRequest->generateStreamTags($stream); } catch (UniqueConstraintViolationException $e) { } } @@ -231,7 +236,7 @@ class StreamRequest extends StreamRequestBuilder { $expr = $qb->expr(); $qb->limitToIdPrim($qb->prim($id)); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); if ($asViewer) { $qb->limitToViewer('sd', 'f', true); @@ -271,7 +276,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitPaginate($since, $limit); $expr = $qb->expr(); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->andWhere($expr->eq('s.attributed_to', 'ca.id_prim')); @@ -337,7 +342,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitToType(Note::TYPE); $qb->selectDestFollowing('sd', ''); - $qb->innerJoinDest('recipient', 'id_prim', 'sd', 's'); + $qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's'); $qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', 'sd'); $cursor = $qb->execute(); @@ -364,7 +369,7 @@ class StreamRequest extends StreamRequestBuilder { $qb = $this->getStreamSelectSql(); $expr = $qb->expr(); - $qb->innerJoinCacheActors('ca', 'f.object_id_prim'); + $qb->linkToCacheActors('ca', 'f.object_id_prim'); $qb->limitPaginate($since, $limit); $qb->andWhere($qb->exprLimitToDBField('type', SocialAppNotification::TYPE, false)); @@ -401,7 +406,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitToDest($actor->getId(), 'recipient', '', 'sd'); $qb->limitToType(SocialAppNotification::TYPE); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->leftJoinStreamAction(); return $this->getStreamsFromRequest($qb); @@ -427,10 +432,10 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitToAttributedTo($actorId); $qb->selectDestFollowing('sd', ''); - $qb->innerJoinDest('recipient', 'id_prim', 'sd', 's'); + $qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's'); $qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', '', 'sd'); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->leftJoinStreamAction(); return $this->getStreamsFromRequest($qb); @@ -455,10 +460,10 @@ class StreamRequest extends StreamRequestBuilder { $qb->filterType(SocialAppNotification::TYPE); $qb->limitPaginate($since, $limit); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->selectDestFollowing('sd', ''); - $qb->innerJoinDest('recipient', 'id_prim', 'sd', 's'); + $qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's'); $qb->limitToDest($actor->getId(), 'recipient', '', 'sd'); $qb->filterDest(ACore::CONTEXT_PUBLIC); @@ -488,11 +493,11 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitToLocal($localOnly); $qb->limitToType(Note::TYPE); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->leftJoinStreamAction(); $qb->selectDestFollowing('sd', ''); - $qb->innerJoinDest('recipient', 'id_prim', 'sd', 's'); + $qb->innerJoinSteamDest('recipient', 'id_prim', 'sd', 's'); $qb->limitToDest(ACore::CONTEXT_PUBLIC, 'recipient', 'to', 'sd'); return $this->getStreamsFromRequest($qb); @@ -521,7 +526,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->limitPaginate($since, $limit); $expr = $qb->expr(); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); $qb->selectStreamActions('sa'); $qb->andWhere($expr->eq('sa.stream_id_prim', 's.id_prim')); @@ -549,10 +554,12 @@ class StreamRequest extends StreamRequestBuilder { $qb = $this->getStreamSelectSql(); $expr = $qb->expr(); - $qb->innerJoinCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToCacheActors('ca', 's.attributed_to_prim'); + $qb->linkToStreamTags('st', 's.id_prim'); $qb->limitPaginate($since, $limit); $qb->andWhere($qb->exprLimitToDBField('type', Note::TYPE)); + $qb->andWhere($qb->exprLimitToDBField('hashtag', $hashtag, true, false, 'st')); $qb->limitToViewer('sd', 'f', true); $qb->andWhere($expr->eq('s.attributed_to_prim', 'ca.id_prim')); @@ -560,7 +567,7 @@ class StreamRequest extends StreamRequestBuilder { $qb->leftJoinStreamAction('sa'); // TODO: Sql optimisation - Create a table like stream_dest for to link 'hashtag' to 'stream_id' - $qb->andWhere($this->exprValueWithinJsonFormat($qb, 'hashtags', '' . $hashtag)); +// $qb->andWhere($this->exprValueWithinJsonFormat($qb, 'hashtags', '' . $hashtag)); return $this->getStreamsFromRequest($qb); } diff --git a/lib/Db/StreamTagsRequest.php b/lib/Db/StreamTagsRequest.php new file mode 100644 index 00000000..4b80d34a --- /dev/null +++ b/lib/Db/StreamTagsRequest.php @@ -0,0 +1,78 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Db; + + +use daita\MySmallPhpTools\Traits\TStringTools; +use Doctrine\DBAL\Exception\UniqueConstraintViolationException; +use OCA\Social\Model\ActivityPub\Stream; + + +/** + * Class StreamTagsRequest + * + * @package OCA\Social\Db + */ +class StreamTagsRequest extends StreamTagsRequestBuilder { + + + use TStringTools; + + + /** + * @param Stream $stream + */ + public function generateStreamTags(Stream $stream) { + $hashtags = $stream->getTags(); + + $this->miscService->log('$$$$ ' . json_encode($hashtags)); + foreach ($hashtags as $hashtag) { + $tag = $this->get('name', $hashtag); + if ($this->get('type', $hashtag) !== 'Hashtag' || $tag === '') { + continue; + } + + $tag = substr($tag, 1); + $qb = $this->getStreamTagsInsertSql(); + $streamId = $qb->prim($stream->getId()); + $qb->setValue('stream_id', $qb->createNamedParameter($streamId)); + $qb->setValue('hashtag', $qb->createNamedParameter($tag)); + try { + $qb->execute(); + } catch (UniqueConstraintViolationException $e) { + \OC::$server->getLogger() + ->log(1, 'Social - Duplicate hashtag on Stream ' . json_encode($stream)); + } + } + } + +} + diff --git a/lib/Db/StreamTagsRequestBuilder.php b/lib/Db/StreamTagsRequestBuilder.php new file mode 100644 index 00000000..25fe9f67 --- /dev/null +++ b/lib/Db/StreamTagsRequestBuilder.php @@ -0,0 +1,105 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OCA\Social\Db; + + +use daita\MySmallPhpTools\Traits\TArrayTools; +use OCP\DB\QueryBuilder\IQueryBuilder; + + +/** + * Class StreamDestRequestBuilder + * + * @package OCA\Social\Db + */ +class StreamTagsRequestBuilder extends CoreRequestBuilder { + + + use TArrayTools; + + + /** + * Base of the Sql Insert request + * + * @return SocialQueryBuilder + */ + protected function getStreamTagsInsertSql(): SocialQueryBuilder { + $qb = $this->getQueryBuilder(); + $qb->insert(self::TABLE_STREAM_TAGS); + + return $qb; + } + + + /** + * Base of the Sql Update request + * + * @return IQueryBuilder + */ + protected function getStreamTagsUpdateSql(): IQueryBuilder { + $qb = $this->getQueryBuilder(); + $qb->update(self::TABLE_STREAM_TAGS); + + return $qb; + } + + + /** + * Base of the Sql Select request for Shares + * + * @return IQueryBuilder + */ + protected function getStreamTagsSelectSql(): IQueryBuilder { + $qb = $this->getQueryBuilder(); + + /** @noinspection PhpMethodParametersCountMismatchInspection */ + $qb->select('st.stream_id', 'st.hashtag') + ->from(self::TABLE_STREAM_TAGS, 'st'); + + $this->defaultSelectAlias = 'st'; + + return $qb; + } + + + /** + * Base of the Sql Delete request + * + * @return IQueryBuilder + */ + protected function getStreamTagsDeleteSql(): IQueryBuilder { + $qb = $this->getQueryBuilder(); + $qb->delete(self::TABLE_STREAM_TAGS); + + return $qb; + } + +} + diff --git a/lib/Migration/Version0002Date20190916000001.php b/lib/Migration/Version0002Date20190916000001.php index 14894b6d..04bf2c5e 100644 --- a/lib/Migration/Version0002Date20190916000001.php +++ b/lib/Migration/Version0002Date20190916000001.php @@ -390,7 +390,7 @@ class Version0002Date20190916000001 extends SimpleMigrationStep { $insert->execute(); } catch (UniqueConstraintViolationException $e) { \OC::$server->getLogger() - ->log(3, 'Social - Duplicate recipient on Stream ' . json_encode($data)); + ->log(1, 'Social - Duplicate recipient on Stream ' . json_encode($data)); } } } diff --git a/lib/Migration/Version0002Date20190925000001.php b/lib/Migration/Version0002Date20190925000001.php index 50ea07bf..fc33b320 100644 --- a/lib/Migration/Version0002Date20190925000001.php +++ b/lib/Migration/Version0002Date20190925000001.php @@ -154,7 +154,7 @@ class Version0002Date20190925000001 extends SimpleMigrationStep { $update->set('stream_id_prim', $update->createNamedParameter(hash('sha512', $streamId))); $update->set('liked', $update->createNamedParameter($liked)); $update->set('boosted', $update->createNamedParameter($boosted)); - $update->setValue('replied', $update->createNamedParameter($replied)); + $update->set('replied', $update->createNamedParameter($replied)); $expr = $update->expr(); $update->where($expr->eq('id', $update->createNamedParameter($id))); diff --git a/lib/Migration/Version0002Date20191001000001.php b/lib/Migration/Version0002Date20191001000001.php new file mode 100644 index 00000000..86888a13 --- /dev/null +++ b/lib/Migration/Version0002Date20191001000001.php @@ -0,0 +1,177 @@ + + * @copyright 2018, Maxence Lange + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + + +namespace OCA\Social\Migration; + + +use Closure; +use Doctrine\DBAL\Exception\UniqueConstraintViolationException; +use Doctrine\DBAL\Schema\SchemaException; +use Exception; +use OCP\DB\ISchemaWrapper; +use OCP\IDBConnection; +use OCP\Migration\IOutput; +use OCP\Migration\SimpleMigrationStep; + + +/** + * Class Version0002Date20191001000001 + * + * @package OCA\Social\Migration + */ +class Version0002Date20191001000001 extends SimpleMigrationStep { + + + /** @var IDBConnection */ + private $connection; + + + /** + * @param IDBConnection $connection + */ + public function __construct(IDBConnection $connection) { + $this->connection = $connection; + } + + + /** + * @param IOutput $output + * @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper` + * @param array $options + * + * @return ISchemaWrapper + * @throws SchemaException + */ + public function changeSchema(IOutput $output, Closure $schemaClosure, array $options + ): ISchemaWrapper { + /** @var ISchemaWrapper $schema */ + $schema = $schemaClosure(); + + if (!$schema->hasTable('social_a2_stream_tags')) { + $table = $schema->createTable('social_a2_stream_tags'); + + $table->addColumn( + 'stream_id', 'string', + [ + 'notnull' => true, + 'length' => 128, + ] + ); + $table->addColumn( + 'hashtag', 'string', + [ + 'notnull' => false, + 'length' => 127, + ] + ); + + if (!$table->hasIndex('sh')) { + $table->addUniqueIndex(['stream_id', 'hashtag'], 'sh'); + } + } + + return $schema; + } + + + /** + * @param IOutput $output + * @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper` + * @param array $options + * + * @throws Exception + */ + public function postSchemaChange(IOutput $output, Closure $schemaClosure, array $options) { + /** @var ISchemaWrapper $schema */ + $schema = $schemaClosure(); + + $this->fillTableStreamHashtags($schema); + } + + /** + * @param ISchemaWrapper $schema + */ + private function fillTableStreamHashtags(ISchemaWrapper $schema) { + + $start = 0; + $limit = 1000; + while (true) { + $qb = $this->connection->getQueryBuilder(); + $qb->select('id', 'hashtags') + ->from('social_a2_stream') + ->setMaxResults(1000) + ->setFirstResult($start); + + $cursor = $qb->execute(); + $count = 0; + while ($data = $cursor->fetch()) { + $count++; + + $this->updateStreamTags($data); + } + $cursor->closeCursor(); + + $start += $count; + if ($count < $limit) { + break; + } + } + } + + + /** + * @param array $data + */ + private function updateStreamTags(array $data) { + if ($data['hashtags'] === '' || $data['hashtags'] === null) { + return; + } + + $id = $data['id']; + $tags = json_decode($data['hashtags'], true); + + foreach ($tags as $tag) { + if ($tag === '') { + continue; + } + $insert = $this->connection->getQueryBuilder(); + $insert->insert('social_a2_stream_tags'); + + $insert->setValue('stream_id', $insert->createNamedParameter(hash('sha512', $id))); + $insert->setValue('hashtag', $insert->createNamedParameter($tag)); + try { + $insert->execute(); + } catch (UniqueConstraintViolationException $e) { + } + } + + } + +}