import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:logging/logging.dart';
import 'package:result_monad/result_monad.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:stack_trace/stack_trace.dart';

import '../globals.dart';
import '../models/auth/profile.dart';
import '../models/connection.dart';
import '../models/exec_error.dart';
import '../models/networking/paged_response.dart';
import '../models/networking/pages_manager.dart';
import '../models/networking/paging_data.dart';
import '../models/user_notification.dart';
import '../serializers/mastodon/follow_request_mastodon_extensions.dart';
import '../utils/list_utils.dart';
import 'connection_manager_services.dart';
import 'direct_message_services.dart';
import 'feature_checker_services.dart';
import 'follow_requests_services.dart';
import 'networking/friendica_notifications_client_services.dart';

part 'notification_services.g.dart';

/// Page size requested from the notifications endpoint.
const _itemsPerQuery = 50;

/// Minimum time between two refreshes of direct messages or follow requests
/// triggered from [NotificationsManager._postFetchOperations].
const _minimumDmsAndCrsUpdateDuration = Duration(seconds: 30);

final _logger = Logger('NotificationManager');

/// In-memory store of the notifications of one [NotificationType] for one
/// [Profile].
///
/// The exposed state is always a freshly sorted snapshot of the backing set.
@Riverpod(keepAlive: true)
class _NotificationsStore extends _$NotificationsStore {
  final _values = <UserNotification>{};

  @override
  List<UserNotification> build(Profile profile, NotificationType type) {
    _logger.fine('Build _NotificationStoreProvider($profile) for $type');
    return _rval;
  }

  /// Inserts [notification], replacing any stored element that compares equal.
  void upsert(UserNotification notification) {
    // `Set.add` keeps the existing element when an equal one is present, so
    // the old instance has to be removed first for the new one to win.
    _values.remove(notification);
    _values.add(notification);
    state = _rval;
  }

  /// Removes every stored notification.
  void clear() {
    _values.clear();
    state = _rval;
  }

  /// Replaces every stored notification with a copy flagged as dismissed.
  void markAllRead() {
    // Materialize the copies before clearing; `map` is lazy over `_values`.
    final updated = _values.map((n) => n.copy(dismissed: true)).toList();
    _values.clear();
    _values.addAll(updated);
    state = _rval;
  }

  /// Sorted snapshot of the backing set.
  List<UserNotification> get _rval => _values.toList()..sort();

  /// Notifies listeners only when the two lists differ according to
  /// `exactlyEqual`, compared element by element.
  @override
  bool updateShouldNotify(
    List<UserNotification> previous,
    List<UserNotification> next,
  ) {
    final rval = !listEqualsWithComparer(
      previous,
      next,
      equals: (u1, u2) => u1.exactlyEqual(u2),
    );
    return rval;
  }
}

/// Notifications of [type] for [profile] whose `dismissed` flag equals
/// [isRead].
@riverpod
List<UserNotification> userNotificationsByType(
  Ref ref,
  Profile profile,
  NotificationType type,
  bool isRead,
) {
  _logger.fine('Build userNotificationsByTypeProvider($type,$isRead,$profile)');
  final notifications = ref.watch(_NotificationsStoreProvider(profile, type));
  return notifications.where((n) => n.dismissed == isRead).toList();
}

/// Sorted list of the notifications of every [NotificationType] for
/// [profile] whose `dismissed` flag equals [isRead].
@riverpod
List<UserNotification> allNotifications(
  Ref ref,
  Profile profile,
  bool isRead,
) {
  _logger.fine('Build allNotificationsProvider($profile)');
  final notifications = <UserNotification>[];
  for (NotificationType type in NotificationType.values) {
    notifications.addAll(
      ref.watch(userNotificationsByTypeProvider(profile, type, isRead)),
    );
  }
  return notifications..sort();
}

/// Whether [profile] has at least one notification whose `dismissed` flag
/// equals [isRead].
@riverpod
bool hasNotifications(Ref ref, Profile profile, bool isRead) {
  _logger.info('Build hasNotifications($profile)');
  var hasNotifications = false;
  // Deliberately no short-circuit: every per-type provider must be watched so
  // that a change in any of them rebuilds this provider.
  for (NotificationType type in NotificationType.values) {
    hasNotifications |= ref
        .watch(userNotificationsByTypeProvider(profile, type, isRead))
        .isNotEmpty;
  }
  return hasNotifications;
}

/// Whether [profile] has any notification at all, read or unread.
@riverpod
bool hasAnyNotifications(Ref ref, Profile profile) {
  _logger.info('Build hasAnyNotifications($profile)');
  final hasRead = ref.watch(hasNotificationsProvider(profile, true));
  final hasUnread = ref.watch(hasNotificationsProvider(profile, false));
  return hasRead || hasUnread;
}

/// Lowest and highest numeric notification id, as computed by [calcLowHigh],
/// over the notifications of [profile] whose `dismissed` flag equals
/// [isRead].
@riverpod
(int low, int high) lowHighId(Ref ref, Profile profile, bool isRead) {
  _logger.fine('Build lowHighIdProvider($profile) for isRead? $isRead');
  final notifications = <UserNotification>[];
  for (NotificationType type in NotificationType.values) {
    notifications.addAll(
      ref.watch(userNotificationsByTypeProvider(profile, type, isRead)),
    );
  }

  final result = calcLowHigh(notifications);
  _logger.finest(
    'Result lowHighIdProvider($profile) for isRead? $isRead: $result',
  );
  return result;
}

/// Loads notifications for a [Profile] from the server, merges in synthetic
/// notifications for unread direct messages and follow requests, and pushes
/// the result into the per-type stores.
@Riverpod(keepAlive: true)
class NotificationsManager extends _$NotificationsManager {
  /// When the direct message threads were last refreshed by this manager.
  var lastDmsUpdate = DateTime(1900);

  /// When the follow requests were last refreshed by this manager.
  var lastCrUpdate = DateTime(1900);

  @override
  Future<bool> build(Profile profile) async {
    _logger.info('Building');
    await _initialize();
    return true;
  }

  /// Loads the unread notifications and, when that leaves the stores empty,
  /// falls back to loading older notifications.
  Future<void> _initialize() async {
    final result = await loadUnreadNotifications();
    final hasNoNotifications = !ref.read(hasAnyNotificationsProvider(profile));
    if (result.isSuccess && hasNoNotifications) {
      await loadOlderNotifications();
    }
  }

  /// Clears every per-type store and reloads from the server.
  ///
  /// Returns a future that completes once the reload has finished, so callers
  /// can await it and observe failures.
  Future<void> refreshNotifications() async {
    for (final t in NotificationType.values) {
      ref.read(_NotificationsStoreProvider(profile, t).notifier).clear();
    }
    await _initialize();
  }

  /// Empties the follow-request store and resets this manager's state to
  /// `AsyncData(true)`.
  Future<void> clearConnectionRequestNotifications() async {
    _logger.info('clearConnectionRequestNotifications');
    ref
        .read(
          _NotificationsStoreProvider(profile, NotificationType.follow_request)
              .notifier,
        )
        .clear();
    state = const AsyncData(true);
  }

  /// Empties the follow-request store and rebuilds it from the follow-request
  /// service.
  Future<void> refreshConnectionRequestNotifications() async {
    _logger.info('refreshConnectionRequestNotifications');
    await clearConnectionRequestNotifications();
    await _postFetchOperations(
      [],
      updateDms: false,
      updateFollowRequests: true,
    );
  }

  /// Empties the direct-message store and rebuilds it from the direct-message
  /// service.
  Future<void> refreshDms() async {
    _logger.info('refreshDms');
    ref
        .read(
          _NotificationsStoreProvider(profile, NotificationType.direct_message)
              .notifier,
        )
        .clear();
    await _postFetchOperations(
      [],
      updateDms: true,
      updateFollowRequests: false,
    );
  }

  /// Fetches up to three pages of unread notifications and stores them.
  ///
  /// The result value is `true` when, after merging in direct-message and
  /// follow-request notifications, at least one notification was stored.
  /// A failed page request is logged and treated as an empty final page, so
  /// the returned result is still a success in that case.
  FutureResult<bool, ExecError> loadUnreadNotifications() async {
    final notificationsFromRefresh = <UserNotification>[];

    final pm = _buildPageManager(ref, profile, false);
    final useActualRequests = ref.read(featureCheckProvider(
      profile,
      RelaticaFeatures.usingActualFollowRequests,
    ));
    var hasMore = true;
    var first = true;
    const maxCalls = 3;
    var count = 0;
    while (hasMore && count < maxCalls) {
      final result =
          first ? await pm.initialize(_itemsPerQuery) : await pm.nextFromEnd();

      first = false;
      // NOTE(review): `Trace.current()` lands in the `error` position of
      // `Logger.severe(message, [error, stackTrace])` — confirm that is what
      // the log handler expects.
      result.match(
        onSuccess: (nd) => _logger.fine('Got ${nd.data.length} notifications'),
        onError: (e) => _logger.severe(
          'Error getting notification: $e',
          Trace.current(),
        ),
      );
      final response = result.getValueOrElse(() => PagedResponse([]));
      // Drop follow-request notifications here when the dedicated
      // follow-request service is the source of truth for them.
      response.data
          .where((n) =>
              !useActualRequests || n.type != NotificationType.follow_request)
          .forEach(notificationsFromRefresh.add);
      hasMore = response.next != null;
      count++;
    }

    // Direct messages and (optionally) follow requests are merged in by
    // _postFetchOperations, which also writes everything to the stores.
    return await _postFetchOperations(
      notificationsFromRefresh,
    ).mapValue((value) => value.isNotEmpty);
  }

  /// Merges [notificationsFromRefresh] with synthetic notifications for
  /// unread direct messages and follow requests, and upserts the result into
  /// the per-type stores.
  ///
  /// [updateDms] and [updateFollowRequests] control whether the respective
  /// backing service is refreshed first; each refresh is skipped when the
  /// previous one happened less than [_minimumDmsAndCrsUpdateDuration] ago.
  ///
  /// Note that [notificationsFromRefresh] is modified in place: direct-message
  /// entries (and follow-request entries when the dedicated service is used)
  /// are removed from it.
  ///
  /// Returns the merged notifications, de-duplicated by id.
  FutureResult<List<UserNotification>, ExecError> _postFetchOperations(
    List<UserNotification> notificationsFromRefresh, {
    bool updateDms = true,
    bool updateFollowRequests = true,
  }) async {
    if (updateDms) {
      if (DateTime.now().difference(lastDmsUpdate) >
          _minimumDmsAndCrsUpdateDuration) {
        await ref
            .read(directMessageThreadIdsProvider(profile).notifier)
            .update();
        lastDmsUpdate = DateTime.now();
      }
    }

    final useActualRequests = ref.read(featureCheckProvider(
      profile,
      RelaticaFeatures.usingActualFollowRequests,
    ));

    if (updateFollowRequests) {
      if (useActualRequests) {
        if (DateTime.now().difference(lastCrUpdate) >
            _minimumDmsAndCrsUpdateDuration) {
          await ref.read(followRequestsProvider(profile).notifier).update();
          lastCrUpdate = DateTime.now();
        }
      }
    }

    final notifications = <String, UserNotification>{};

    // Server-side entries of these types are superseded by the synthetic ones
    // built below.
    notificationsFromRefresh.removeWhere((n) =>
        n.type == NotificationType.direct_message ||
        (useActualRequests && n.type == NotificationType.follow_request));

    for (final n in notificationsFromRefresh) {
      notifications[n.id] = n;
    }

    for (final n in _buildUnreadMessageNotifications(useActualRequests)) {
      notifications[n.id] = n;
    }

    // Awaited so the stores are fully populated before callers inspect them
    // and so a failure while storing is not silently dropped.
    await _processNewNotifications(notifications.values);

    return Result.ok(notifications.values.toList());
  }

  /// Fetches notifications newer than the highest id currently stored, or the
  /// first page when nothing is stored yet.
  ///
  /// The result value is `true` when at least one notification was stored.
  FutureResult<bool, ExecError> loadNewerNotifications() async {
    final hasNoNotifications = !ref.read(hasAnyNotificationsProvider(profile));
    // Anchor on the unread ids when there are any, otherwise on the read ones.
    final useIsRead = !ref.read(hasNotificationsProvider(profile, false));

    final (_, highestId) = ref.read(lowHighIdProvider(profile, useIsRead));
    final pm = _buildPageManager(
      ref,
      profile,
      true,
      initialPages: hasNoNotifications
          ? []
          : [
              // NOTE(review): the element type of this placeholder list was
              // reconstructed as String (the id type) — confirm.
              PagedResponse(
                <String>[],
                previous: PagingData(
                  minId: highestId,
                  limit: _itemsPerQuery,
                ),
              )
            ],
    );

    final result = await (hasNoNotifications
            ? pm.initialize(_itemsPerQuery)
            : pm.previousFromBeginning())
        .andThenAsync(
          (page) async => await _postFetchOperations(page.data),
        )
        .withError(
          (error) => _logger.info('Error getting more updates: $error'),
        );
    return result.mapValue((value) => value.isNotEmpty).execErrorCast();
  }

  /// Fetches notifications older than the ones currently stored.
  ///
  /// When unread notifications exist, older unread ones are tried first; if
  /// that yields anything other than direct messages and follow requests the
  /// result is `true` immediately. Otherwise older notifications of any read
  /// state are fetched, and the result is whether any were stored.
  FutureResult<bool, ExecError> loadOlderNotifications() async {
    final hasUnread = ref.read(hasNotificationsProvider(profile, false));
    if (hasUnread) {
      final result = await _loadOlderUnreadNotifications();
      final nonDmAndConnectionNotifications = result
          .getValueOrElse(() => [])
          .where((n) =>
              n.type != NotificationType.follow_request &&
              n.type != NotificationType.direct_message)
          .toList();
      if (nonDmAndConnectionNotifications.isNotEmpty) {
        return Result.ok(true);
      }
    }

    return _loadOlderReadAndUnreadNotifications()
        .mapValue((value) => value.isNotEmpty);
  }

  /// Clears [notification] through `clearNotificationProvider` and, on
  /// success, stores a dismissed copy of it.
  ///
  /// NOTE(review): the `bool` success type was reconstructed; confirm against
  /// `clearNotificationProvider`.
  FutureResult<bool, ExecError> markSeen(UserNotification notification) async {
    _logger.fine('Marking Notification Seen: $notification');
    final result = await ref
        .read(clearNotificationProvider(profile, notification).future)
        .withResult((_) {
      ref
          .read(
            _NotificationsStoreProvider(profile, notification.type).notifier,
          )
          .upsert(notification.copy(dismissed: true));
    });

    return result.execErrorCast();
  }

  /// Clears all notifications through `clearNotificationsProvider` and, on
  /// success, marks every stored notification as dismissed.
  ///
  /// NOTE(review): the `bool` success type was reconstructed; confirm against
  /// `clearNotificationsProvider`.
  FutureResult<bool, ExecError> markAllAsRead() async {
    final result = await ref
        .read(clearNotificationsProvider(profile).future)
        .withResult((_) {
      for (final t in NotificationType.values) {
        ref
            .read(_NotificationsStoreProvider(profile, t).notifier)
            .markAllRead();
      }
    });

    return result.execErrorCast();
  }

  /// Builds synthetic notifications for direct-message threads that are not
  /// fully seen and, when [useActualRequests] is set, for the pending follow
  /// requests.
  ///
  /// NOTE(review): this uses `ref.watch` outside `build`; kept as-is because
  /// switching to `ref.read` would change when this notifier rebuilds.
  List<UserNotification> _buildUnreadMessageNotifications(
    bool useActualRequests,
  ) {
    final myId = profile.userId;
    final dmsResult = ref
        .watch(directMessageThreadIdsProvider(profile))
        .map((id) => ref.read(directMessageThreadServiceProvider(profile, id)))
        // Threads without messages are skipped: `reduce` below would throw a
        // StateError on an empty collection and abort the whole fetch.
        .where((t) => !t.allSeen && t.messages.isNotEmpty)
        .map((t) {
      // Throws StateError when the thread has no participant other than the
      // current user.
      final fromAccountId = t.participantIds.firstWhere((pid) => pid != myId);
      final fromAccount = ref
          .watch(connectionByIdProvider(profile, fromAccountId))
          .getValueOrElse(() => Connection());
      final latestMessage =
          t.messages.reduce((s, m) => s.createdAt > m.createdAt ? s : m);
      return UserNotification(
        id: (fromAccount.hashCode ^ t.parentUri.hashCode ^ t.title.hashCode)
            .toString(),
        type: NotificationType.direct_message,
        fromId: fromAccount.id,
        fromName: fromAccount.name,
        fromUrl: fromAccount.profileUrl,
        timestamp: latestMessage.createdAt,
        iid: t.parentUri,
        dismissed: false,
        content: '${fromAccount.name} sent you a direct message',
        link: '',
      );
    });

    final followRequestResult = !useActualRequests
        ? <UserNotification>[]
        : ref
            .watch(followRequestListProvider(profile))
            .map((r) => r.toUserNotification())
            .toList();

    return [...dmsResult, ...followRequestResult];
  }

  /// Upserts [notifications] into their per-type stores, pausing for
  /// `processingSleep` whenever more than `maxProcessingMillis` have elapsed
  /// since the last pause.
  Future<void> _processNewNotifications(
    Iterable<UserNotification> notifications,
  ) async {
    final st = Stopwatch()..start();

    for (final n in notifications) {
      if (st.elapsedMilliseconds > maxProcessingMillis) {
        await Future.delayed(processingSleep, () => st.reset());
      }

      ref.read(_NotificationsStoreProvider(profile, n.type).notifier).upsert(n);
    }
  }

  /// Fetches one page of unread notifications older than the lowest unread id
  /// currently stored, and stores them.
  FutureResult<List<UserNotification>, ExecError>
      _loadOlderUnreadNotifications() async {
    _logger.finest('Loading Older Unread Notifications');
    final (lowestId, _) = ref.read(lowHighIdProvider(profile, false));
    final pm = _buildPageManager(
      ref,
      profile,
      false,
      initialPages: [
        // NOTE(review): the element type of this placeholder list was
        // reconstructed as String (the id type) — confirm.
        PagedResponse(
          <String>[],
          next: PagingData(
            maxId: lowestId,
            limit: _itemsPerQuery,
          ),
        )
      ],
    );
    final result = await pm
        .nextFromEnd()
        .andThenAsync(
          (page) async => await _postFetchOperations(page.data),
        )
        .withError(
          (error) => _logger.info('Error getting more updates: $error'),
        );

    _logger.finest(
      'Loaded Older Unread Notifications: ${result.getValueOrElse(() => []).length}',
    );
    return result.execErrorCast();
  }

  /// Fetches one page of notifications of any read state older than the
  /// lowest id currently stored (or the first page when nothing is stored),
  /// and stores them.
  FutureResult<List<UserNotification>, ExecError>
      _loadOlderReadAndUnreadNotifications() async {
    _logger.finest('Loading Older Read and Unread Notifications');
    final hasNoNotifications = !ref.read(hasAnyNotificationsProvider(profile));
    // Anchor on the read ids when there are any, otherwise on the unread ones.
    final useIsRead = ref.read(hasNotificationsProvider(profile, true));
    final (lowestId, _) = ref.read(lowHighIdProvider(profile, useIsRead));
    final pm = _buildPageManager(
      ref,
      profile,
      true,
      initialPages: hasNoNotifications
          ? []
          : [
              // NOTE(review): the element type of this placeholder list was
              // reconstructed as String (the id type) — confirm.
              PagedResponse(
                <String>[],
                next: PagingData(
                  maxId: lowestId,
                  limit: _itemsPerQuery,
                ),
              )
            ],
    );
    final result = await (hasNoNotifications
            ? pm.initialize(_itemsPerQuery)
            : pm.nextFromEnd())
        .andThenAsync(
          (page) async => await _postFetchOperations(page.data),
        )
        .withError(
          (error) => _logger.info('Error getting more updates: $error'),
        );
    _logger.finest(
      'Loaded Older Read and Unread Notifications: ${result.getValueOrElse(() => []).length}',
    );
    return result.execErrorCast();
  }
}

/// Returns the lowest and highest numeric id among [notifications], ignoring
/// direct-message and follow-request entries.
///
/// When no entry qualifies the result is `(0x7FFFFFFFFFFFFFFF, -1)`.
/// Throws a [FormatException] when a qualifying id is not an integer.
(int lowest, int highest) calcLowHigh(List<UserNotification> notifications) {
  int highestNotificationId = -1;
  int lowestNotificationId = 0x7FFFFFFFFFFFFFFF;

  final ids = notifications
      .where((n) =>
          n.type != NotificationType.direct_message &&
          n.type != NotificationType.follow_request)
      .map((n) => int.parse(n.id));

  for (var id in ids) {
    if (id > highestNotificationId) {
      highestNotificationId = id;
    }

    if (id < lowestNotificationId) {
      lowestNotificationId = id;
    }
  }

  return (lowestNotificationId, highestNotificationId);
}

/// Creates a [PagesManager] that requests notification pages for [profile]
/// through `notificationsClientProvider`, passing [includeAll] along, and
/// that is seeded with [initialPages].
PagesManager<List<UserNotification>, String> _buildPageManager(
  Ref ref,
  Profile profile,
  bool includeAll, {
  List<PagedResponse> initialPages = const [],
}) =>
    PagesManager<List<UserNotification>, String>(
      initialPages: initialPages,
      idMapper: (nn) => nn.map((n) => n.id).toList(),
      onRequest: (pd) async => await ref
          .read(notificationsClientProvider(profile, pd, includeAll).future),
    );