import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:logging/logging.dart';
import 'package:result_monad/result_monad.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:stack_trace/stack_trace.dart';

import '../globals.dart';
import '../models/auth/profile.dart';
import '../models/connection.dart';
import '../models/exec_error.dart';
import '../models/networking/paged_response.dart';
import '../models/networking/pages_manager.dart';
import '../models/networking/paging_data.dart';
import '../models/user_notification.dart';
import '../serializers/mastodon/follow_request_mastodon_extensions.dart';
import 'connection_manager_services.dart';
import 'direct_message_services.dart';
import 'feature_checker_services.dart';
import 'follow_requests_services.dart';
import 'networking/friendica_notifications_client_services.dart';
import 'settings_services.dart';

part 'notification_services.g.dart';

/// Page size used for every notifications request issued from this file.
const _itemsPerQuery = 50;

/// Minimum interval between two refreshes of the direct-message thread list
/// or of the follow-request list triggered by [NotificationsManager].
const _minimumDmsAndCrsUpdateDuration = Duration(seconds: 30);

final _logger = Logger('NotificationManager');

/// Keeps the notification lists for one [Profile] and publishes their
/// concatenation as the provider state.
///
/// Notifications are held in four buckets: [connectionRequests], [dms],
/// [unread] and [read]. The published list is always in that order (see
/// [notifications]).
///
/// NOTE(review): several methods accept a `withListenerNotification` flag.
/// Nothing in this file reads it; state is published unconditionally by
/// [_processNewNotifications]. The parameters are kept for caller
/// compatibility.
@Riverpod(keepAlive: true)
class NotificationsManager extends _$NotificationsManager {
  /// Synthetic notifications for direct-message threads with unseen messages.
  final dms = <UserNotification>[];

  /// Follow/connection request notifications.
  final connectionRequests = <UserNotification>[];

  /// Undismissed notifications that are neither DMs nor follow requests.
  final unread = <UserNotification>[];

  /// Dismissed notifications.
  final read = <UserNotification>[];

  /// Last time the DM thread list was refreshed from [_postFetchOperations].
  var lastDmsUpdate = DateTime(1900);

  /// Last time the follow-request list was refreshed from
  /// [_postFetchOperations].
  var lastCrUpdate = DateTime(1900);

  /// Whether there is anything that still needs the user's attention.
  ///
  /// Dismissed ([read]) notifications are not counted.
  bool get hasNotifications =>
      dms.isNotEmpty || connectionRequests.isNotEmpty || unread.isNotEmpty;

  /// All buckets concatenated: connection requests, DMs, unread, then read.
  List<UserNotification> get notifications =>
      [...connectionRequests, ...dms, ...unread, ...read];

  @override
  Future<Result<List<UserNotification>, ExecError>> build(
    Profile profile,
  ) async {
    _logger.info('Building');
    await _initialize();
    return Result.ok(notifications);
  }

  /// Loads unread notifications and, if that succeeds but leaves both
  /// [unread] and [read] empty, loads one page of older notifications so the
  /// list is not blank.
  Future<void> _initialize() async {
    final result = await loadUnreadNotifications(false);
    if (result.isSuccess && unread.isEmpty && read.isEmpty) {
      await loadOlderNotifications(withListenerNotification: false);
    }
  }

  /// Drops everything held locally and reloads from the server.
  ///
  /// Returns a future (previously `void ... async`) so callers can await the
  /// reload and observe failures; ignoring the result keeps the old
  /// fire-and-forget behaviour.
  Future<void> refreshNotifications() async {
    await clear(withListenerNotification: true);
  }

  /// Empties [connectionRequests] and publishes the resulting list.
  Future<void> clearConnectionRequestNotifications() async {
    _logger.info('clearConnectionRequestNotifications');
    connectionRequests.clear();
    state = AsyncData(Result.ok(notifications));
  }

  /// Clears the connection-request bucket and rebuilds it from the
  /// follow-request service (subject to the refresh throttle in
  /// [_postFetchOperations]).
  Future<void> refreshConnectionRequestNotifications() async {
    _logger.info('refreshConnectionRequestNotifications');
    await clearConnectionRequestNotifications();
    await _postFetchOperations(
      [],
      true,
      updateDms: false,
      updateFollowRequests: true,
    );
  }

  /// Clears the DM bucket and rebuilds it from the direct-message services
  /// (subject to the refresh throttle in [_postFetchOperations]).
  Future<void> refreshDms() async {
    _logger.info('refreshDms');
    dms.clear();
    await _postFetchOperations(
      [],
      true,
      updateDms: true,
      updateFollowRequests: false,
    );
  }

  /// Empties every bucket and runs the initial load again.
  ///
  /// The returned future completes when the reload has finished. Previously
  /// the reload was started without being awaited, so its errors could not
  /// be observed by anyone.
  ///
  /// [withListenerNotification] is accepted for compatibility and is not
  /// consulted.
  Future<void> clear({bool withListenerNotification = true}) async {
    dms.clear();
    connectionRequests.clear();
    unread.clear();
    read.clear();
    await _initialize();
  }

  /// Fetches up to three pages of unread notifications and merges them into
  /// the local buckets.
  ///
  /// A failed page request is logged and treated as an empty page with no
  /// successor, which ends the paging loop. The result value is `true` when
  /// the merged batch (including synthesized DM / follow-request entries) is
  /// not empty.
  FutureResult<bool, ExecError> loadUnreadNotifications(
    bool withListenerNotification,
  ) async {
    final notificationsFromRefresh = <UserNotification>[];
    final pm = _buildPageManager(ref, profile, false);
    final useActualRequests = ref.read(featureCheckProvider(
      profile,
      RelaticaFeatures.usingActualFollowRequests,
    ));

    var hasMore = true;
    var first = true;
    const maxCalls = 3;
    var count = 0;
    while (hasMore && count < maxCalls) {
      final result =
          first ? await pm.initialize(_itemsPerQuery) : await pm.nextFromEnd();
      first = false;
      result.match(
        onSuccess: (nd) =>
            _logger.fine('Got ${nd.data.length} notifications'),
        onError: (e) => _logger.severe(
          'Error getting notification: $e',
          Trace.current(),
        ),
      );

      final response = result.getValueOrElse(() => PagedResponse([]));
      // When the dedicated follow-request service is in use, follow requests
      // reported by the notifications endpoint are dropped here; they are
      // added back from that service in _postFetchOperations.
      notificationsFromRefresh.addAll(
        response.data.where(
          (n) =>
              !useActualRequests ||
              n.type != NotificationType.follow_request,
        ),
      );
      hasMore = response.next != null;
      count++;
    }

    return await _postFetchOperations(
      notificationsFromRefresh,
      withListenerNotification,
    ).mapValue((value) => value.isNotEmpty);
  }

  /// Merges a freshly fetched batch into the local buckets.
  ///
  /// Steps:
  ///  1. Optionally refresh the DM thread ids and/or follow requests, each at
  ///     most once per [_minimumDmsAndCrsUpdateDuration].
  ///  2. Drop DM notifications (and follow requests, when the dedicated
  ///     service is in use) from the batch, then add the locally synthesized
  ///     DM / follow-request notifications, de-duplicating by id.
  ///  3. Hand the merged batch to [_processNewNotifications] and wait for it,
  ///     so the buckets and provider state are up to date on return.
  ///
  /// [notificationsFromRefresh] is not modified. Returns the merged batch.
  FutureResult<List<UserNotification>, ExecError> _postFetchOperations(
    List<UserNotification> notificationsFromRefresh,
    bool withListenerNotification, {
    bool updateDms = true,
    bool updateFollowRequests = true,
  }) async {
    if (updateDms &&
        DateTime.now().difference(lastDmsUpdate) >
            _minimumDmsAndCrsUpdateDuration) {
      await ref.read(directMessageThreadIdsProvider(profile).notifier).update();
      lastDmsUpdate = DateTime.now();
    }

    final useActualRequests = ref.read(featureCheckProvider(
      profile,
      RelaticaFeatures.usingActualFollowRequests,
    ));

    if (updateFollowRequests &&
        useActualRequests &&
        DateTime.now().difference(lastCrUpdate) >
            _minimumDmsAndCrsUpdateDuration) {
      await ref.read(followRequestsProvider(profile).notifier).update();
      lastCrUpdate = DateTime.now();
    }

    final merged = <String, UserNotification>{};
    for (final n in notificationsFromRefresh) {
      // Filter instead of removeWhere(): the argument is frequently the
      // `data` list of a page owned by the pager and must not be mutated.
      final replacedLocally = n.type == NotificationType.direct_message ||
          (useActualRequests && n.type == NotificationType.follow_request);
      if (replacedLocally) {
        continue;
      }
      merged[n.id] = n;
    }
    for (final n in _buildUnreadMessageNotifications(useActualRequests)) {
      merged[n.id] = n;
    }

    // Must be awaited: the processing step can yield to the event loop, and
    // callers (e.g. _initialize) inspect the buckets right after this returns.
    await _processNewNotifications(merged.values);
    return Result.ok(merged.values.toList());
  }

  /// Fetches notifications newer than the newest one currently held.
  ///
  /// With nothing held locally this is a plain first-page request. The result
  /// value is `true` when the merged batch is not empty.
  FutureResult<bool, ExecError> loadNewerNotifications({
    bool withListenerNotification = true,
  }) async {
    final nothingLoaded = read.isEmpty && unread.isEmpty;
    final (_, highestId) =
        unread.isNotEmpty ? calcLowHigh(unread) : calcLowHigh(read);
    final pm = _buildPageManager(
      ref,
      profile,
      true,
      initialPages: nothingLoaded
          ? []
          : [
              // Placeholder page whose "previous" link points just above the
              // newest id we already have.
              PagedResponse(
                <String>[],
                previous: PagingData(
                  minId: highestId,
                  limit: _itemsPerQuery,
                ),
              )
            ],
    );

    final result = await (nothingLoaded
            ? pm.initialize(_itemsPerQuery)
            : pm.previousFromBeginning())
        .andThenAsync(
          (page) async =>
              await _postFetchOperations(page.data, withListenerNotification),
        )
        .withError(
            (error) => _logger.info('Error getting more updates: $error'));
    return result.mapValue((value) => value.isNotEmpty).execErrorCast();
  }

  /// Fetches one page of older notifications.
  ///
  /// While unread notifications are held, older *unread* ones are requested
  /// first. If that produces at least one real notification the call stops
  /// there; otherwise it falls through to requesting older read-and-unread
  /// notifications.
  FutureResult<bool, ExecError> loadOlderNotifications({
    bool withListenerNotification = true,
  }) async {
    if (unread.isNotEmpty) {
      final result =
          await _loadOlderUnreadNotifications(withListenerNotification);
      // The merged batch always carries the synthesized DM / follow-request
      // entries, so those must be ignored when deciding whether the request
      // actually produced anything.
      final gotRealNotifications = result.getValueOrElse(() => []).any(
            (n) =>
                n.type != NotificationType.follow_request &&
                n.type != NotificationType.direct_message,
          );
      if (gotRealNotifications) {
        return Result.ok(true);
      }
    }

    return _loadOlderReadAndUnreadNotifications(withListenerNotification)
        .mapValue((value) => value.isNotEmpty);
  }

  /// Dismisses [notification] on the server and, on success, moves it from
  /// [unread] to [read] and publishes the new list.
  ///
  /// NOTE(review): [read] is re-sorted with the natural ordering here, while
  /// [_processNewNotifications] sorts it with the grouping-aware comparator —
  /// confirm which is intended.
  FutureResult<bool, ExecError> markSeen(UserNotification notification) async {
    final result = await ref
        .read(clearNotificationProvider(profile, notification).future)
        .withResult((_) {
      unread.remove(notification);
      read.add(notification.copy(dismissed: true));
      read.sort();
      state = AsyncData(Result.ok(notifications));
    });

    return result.execErrorCast();
  }

  /// Dismisses every notification on the server and, on success, moves all
  /// of [unread] into [read] (marked dismissed) and publishes the new list.
  FutureResult<bool, ExecError> markAllAsRead() async {
    final result = await ref
        .read(clearNotificationsProvider(profile).future)
        .withResult((_) {
      read.addAll(unread.map((n) => n.copy(dismissed: true)));
      unread.clear();
      read.sort();
      state = AsyncData(Result.ok(notifications));
    });

    return result.execErrorCast();
  }

  /// Builds the locally synthesized notifications: one per direct-message
  /// thread that is not fully seen and, when [useActualRequests] is set, one
  /// per pending follow request.
  ///
  /// NOTE(review): this uses `ref.watch` outside of `build`; kept as-is
  /// because switching to `ref.read` would change when this notifier is
  /// rebuilt.
  List<UserNotification> _buildUnreadMessageNotifications(
    bool useActualRequests,
  ) {
    final myId = profile.userId;
    final dmsResult = ref
        .watch(directMessageThreadIdsProvider(profile))
        .map((id) => ref.watch(directMessageThreadServiceProvider(profile, id)))
        .where((t) => !t.allSeen)
        .map((t) {
      // NOTE(review): firstWhere without orElse throws if the thread has no
      // participant other than the current user, and reduce throws on an
      // empty message list — confirm neither can happen for an unseen thread.
      final fromAccountId = t.participantIds.firstWhere((pid) => pid != myId);
      final fromAccount = ref
          .watch(connectionByIdProvider(profile, fromAccountId))
          .getValueOrElse(() => Connection());
      final latestMessage =
          t.messages.reduce((s, m) => s.createdAt > m.createdAt ? s : m);
      return UserNotification(
        // Synthetic id derived from sender and thread so repeated builds for
        // the same thread map onto the same entry.
        id: (fromAccount.hashCode ^ t.parentUri.hashCode ^ t.title.hashCode)
            .toString(),
        type: NotificationType.direct_message,
        fromId: fromAccount.id,
        fromName: fromAccount.name,
        fromUrl: fromAccount.profileUrl,
        timestamp: latestMessage.createdAt,
        iid: t.parentUri,
        dismissed: false,
        content: '${fromAccount.name} sent you a direct message',
        link: '',
      );
    });

    final followRequestResult = !useActualRequests
        ? <UserNotification>[]
        : ref
            .watch(followRequestListProvider(profile))
            .map((r) => r.toUserNotification())
            .toList();

    return [...dmsResult, ...followRequestResult];
  }

  /// Folds [notifications] into the four buckets and publishes the result.
  ///
  /// Existing entries are indexed by id, each incoming notification replaces
  /// any entry with the same id (in whichever bucket it was), and is then
  /// placed by its `dismissed` flag and type. Buckets are re-sorted at the
  /// end; [unread] and [read] honour the notification grouping setting.
  ///
  /// The work is split by time checks against `maxProcessingMillis`; when the
  /// budget is exceeded the method sleeps for `processingSleep` and resets
  /// the stopwatch (both constants are defined outside this file).
  Future<void> _processNewNotifications(
    Iterable<UserNotification> notifications,
  ) async {
    final groupNotifications = ref.watch(notificationGroupingSettingProvider);
    final dmsMap = <String, UserNotification>{};
    final crMap = <String, UserNotification>{};
    final unreadMap = <String, UserNotification>{};
    final readMap = <String, UserNotification>{};

    final st = Stopwatch()..start();

    for (final n in dms) {
      dmsMap[n.id] = n;
    }

    if (st.elapsedMilliseconds > maxProcessingMillis) {
      await Future.delayed(processingSleep, () => st.reset());
    }

    for (final n in connectionRequests) {
      crMap[n.id] = n;
    }

    if (st.elapsedMilliseconds > maxProcessingMillis) {
      await Future.delayed(processingSleep, () => st.reset());
    }

    for (final n in unread) {
      unreadMap[n.id] = n;
    }

    if (st.elapsedMilliseconds > maxProcessingMillis) {
      await Future.delayed(processingSleep, () => st.reset());
    }

    for (final n in read) {
      readMap[n.id] = n;
    }

    dms.clear();
    connectionRequests.clear();
    unread.clear();
    read.clear();

    for (final n in notifications) {
      if (st.elapsedMilliseconds > maxProcessingMillis) {
        await Future.delayed(processingSleep, () => st.reset());
      }

      // Remove from every bucket first: the notification may have changed
      // bucket since it was last seen (e.g. it was dismissed).
      dmsMap.remove(n.id);
      crMap.remove(n.id);
      unreadMap.remove(n.id);
      readMap.remove(n.id);

      if (n.dismissed) {
        readMap[n.id] = n;
        continue;
      }

      switch (n.type) {
        case NotificationType.direct_message:
          dmsMap[n.id] = n;
          break;
        case NotificationType.follow_request:
          crMap[n.id] = n;
          break;
        default:
          unreadMap[n.id] = n;
      }
    }

    dms
      ..addAll(dmsMap.values)
      ..sort();
    connectionRequests
      ..addAll(crMap.values)
      ..sort();
    unread
      ..addAll(unreadMap.values)
      ..sort(
        (n1, n2) => _compareByTypeStatusAndDate(n1, n2, groupNotifications),
      );
    read
      ..addAll(readMap.values)
      ..sort(
        (n1, n2) => _compareByTypeStatusAndDate(n1, n2, groupNotifications),
      );

    // `this.` is required: the parameter shadows the getter.
    state = AsyncData(Result.ok(this.notifications));
  }

  /// Requests the page of unread notifications older than the oldest id in
  /// [unread] and merges it. Returns the merged batch.
  FutureResult<List<UserNotification>, ExecError>
      _loadOlderUnreadNotifications(bool withListenerNotification) async {
    _logger.finest('Loading Older Unread Notifications');
    final (lowestId, _) = calcLowHigh(unread);
    final pm = _buildPageManager(
      ref,
      profile,
      false,
      initialPages: [
        // Placeholder page whose "next" link points just below the oldest
        // unread id we already have.
        PagedResponse(
          <String>[],
          next: PagingData(
            maxId: lowestId,
            limit: _itemsPerQuery,
          ),
        )
      ],
    );

    final result = await pm
        .nextFromEnd()
        .andThenAsync(
          (page) async =>
              await _postFetchOperations(page.data, withListenerNotification),
        )
        .withError(
            (error) => _logger.info('Error getting more updates: $error'));

    _logger.finest(
        'Loaded Older Unread Notifications: ${result.getValueOrElse(() => []).length}');
    return result.execErrorCast();
  }

  /// Requests the page of notifications (read and unread) older than the
  /// oldest id held locally and merges it. Returns the merged batch.
  ///
  /// The oldest id is taken from [read] when it has entries, otherwise from
  /// [unread]. With nothing held locally this is a first-page request.
  FutureResult<List<UserNotification>, ExecError>
      _loadOlderReadAndUnreadNotifications(
    bool withListenerNotification,
  ) async {
    _logger.finest('Loading Older Read and Unread Notifications');
    final nothingLoaded = read.isEmpty && unread.isEmpty;
    final (lowestId, _) =
        read.isNotEmpty ? calcLowHigh(read) : calcLowHigh(unread);
    final pm = _buildPageManager(
      ref,
      profile,
      true,
      initialPages: nothingLoaded
          ? []
          : [
              PagedResponse(
                <String>[],
                next: PagingData(
                  maxId: lowestId,
                  limit: _itemsPerQuery,
                ),
              )
            ],
    );
    final result = await (nothingLoaded
            ? pm.initialize(_itemsPerQuery)
            : pm.nextFromEnd())
        .andThenAsync(
          (page) async =>
              await _postFetchOperations(page.data, withListenerNotification),
        )
        .withError(
            (error) => _logger.info('Error getting more updates: $error'));
    _logger.finest(
        'Loaded Older Read and Unread Notifications: ${result.getValueOrElse(() => []).length}');
    return result.execErrorCast();
  }
}

/// Returns the lowest and highest numeric id among [notifications], ignoring
/// direct-message and follow-request entries (their ids are synthesized
/// locally rather than issued by the notifications endpoint).
///
/// When no entry qualifies the sentinels are returned unchanged:
/// `(0x7FFFFFFFFFFFFFFF, -1)`. Throws a [FormatException] if a qualifying id
/// is not an integer.
(int lowest, int highest) calcLowHigh(List<UserNotification> notifications) {
  int highestNotificationId = -1;
  int lowestNotificationId = 0x7FFFFFFFFFFFFFFF;

  final ids = notifications
      .where((n) =>
          n.type != NotificationType.direct_message &&
          n.type != NotificationType.follow_request)
      .map((n) => int.parse(n.id));

  for (final id in ids) {
    if (id > highestNotificationId) {
      highestNotificationId = id;
    }

    if (id < lowestNotificationId) {
      lowestNotificationId = id;
    }
  }

  return (lowestNotificationId, highestNotificationId);
}

/// Creates a pager over the notifications endpoint for [profile].
///
/// [includeAll] is forwarded to the notifications client provider; the
/// callers in this file pass `false` when they want unread notifications
/// only. [initialPages] seeds the pager so the first request can continue
/// from a known id.
PagesManager<List<UserNotification>, String> _buildPageManager(
  Ref ref,
  Profile profile,
  bool includeAll, {
  List<PagedResponse> initialPages = const [],
}) =>
    PagesManager<List<UserNotification>, String>(
      initialPages: initialPages,
      idMapper: (nn) => nn.map((n) => n.id).toList(),
      onRequest: (pd) async => await ref
          .read(notificationsClientProvider(profile, pd, includeAll).future),
    );

/// Comparator for the unread/read buckets.
///
/// With grouping off, or when both notifications have the same type weight,
/// the notifications' natural ordering decides. Otherwise the heavier type
/// sorts first.
int _compareByTypeStatusAndDate(
  UserNotification n1,
  UserNotification n2,
  bool groupNotifications,
) {
  final n1Weight = _notificationTypeToWeight(n1.type);
  final n2Weight = _notificationTypeToWeight(n2.type);
  if (!groupNotifications || n1Weight == n2Weight) {
    return n1.compareTo(n2);
  }

  return (n2Weight - n1Weight).sign.toInt();
}

/// Display priority of a notification type; larger values sort earlier when
/// grouping is enabled.
num _notificationTypeToWeight(NotificationType type) {
  return switch (type) {
    NotificationType.follow_request => 1000,
    NotificationType.follow => 100,
    NotificationType.direct_message => 50,
    NotificationType.mention => 10,
    NotificationType.status => 4,
    NotificationType.reshare => 3,
    NotificationType.reblog => 3,
    NotificationType.favourite => 2,
    NotificationType.unknown => 1,
  };
}