Merge branch 'notifications-overhaul' into 'main'

Refactor notifications system

See merge request mysocialportal/relatica!43
codemagic-setup
HankG 2023-08-04 17:15:16 +00:00
commit 3dbcc9622b
7 zmienionych plików z 264 dodań i 191 usunięć

Wyświetl plik

@ -421,9 +421,10 @@ class NotificationsClient extends FriendicaClient {
NotificationsClient(super.credentials) : super();
FutureResult<PagedResponse<List<UserNotification>>, ExecError>
getNotifications(PagingData page) async {
getNotifications(PagingData page, bool includeAll) async {
_networkStatusService.startNotificationUpdate();
final url = 'https://$serverName/api/v1/notifications?include_all=true';
final url =
'https://$serverName/api/v1/notifications?include_all=$includeAll';
final request = Uri.parse('$url&${page.toQueryParameters()}');
_logger.finest(() => 'Getting new notifications');
final result =

Wyświetl plik

@ -1,12 +1,14 @@
import 'connection.dart';
class FollowRequest {
final String id;
final Connection connection;
final DateTime createdAt;
final int createdAtEpochSeconds;
const FollowRequest({
required this.id,
required this.connection,
required this.createdAt,
required this.createdAtEpochSeconds,
});
@override
@ -14,8 +16,11 @@ class FollowRequest {
identical(this, other) ||
other is FollowRequest &&
runtimeType == other.runtimeType &&
connection == other.connection;
id == other.id &&
connection == other.connection &&
createdAtEpochSeconds == other.createdAtEpochSeconds;
@override
int get hashCode => connection.hashCode;
int get hashCode =>
id.hashCode ^ connection.hashCode ^ createdAtEpochSeconds.hashCode;
}

Wyświetl plik

@ -75,6 +75,22 @@ class UserNotification implements Comparable<UserNotification> {
required this.link,
});
UserNotification copy({
bool? dismissed,
}) =>
UserNotification(
id: id,
type: type,
fromId: fromId,
fromName: fromName,
fromUrl: fromUrl,
timestamp: timestamp,
iid: iid,
dismissed: dismissed ?? this.dismissed,
content: content,
link: link,
);
@override
String toString() {
return 'UserNotification{id: $id, seen: $dismissed, fromName: $fromName, content: $content}';
@ -92,4 +108,14 @@ class UserNotification implements Comparable<UserNotification> {
return -1;
}
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is UserNotification &&
runtimeType == other.runtimeType &&
id == other.id;
@override
int get hashCode => id.hashCode;
}

Wyświetl plik

@ -262,6 +262,6 @@ class _FollowRequestAdjudicationScreenState
followRequestsManager.update();
getIt<ActiveProfileSelector<NotificationsManager>>()
.activeEntry
.andThenSuccess((m) => m.updateNotifications());
.andThenSuccess((m) => m.refreshNotifications());
}
}

Wyświetl plik

@ -19,9 +19,8 @@ class NotificationsScreen extends StatelessWidget {
const NotificationsScreen({super.key});
Future<void> update(NotificationsManager manager) async {
await manager.updateNotifications();
await manager.loadNewerNotifications();
void update(NotificationsManager manager) {
manager.refreshNotifications();
}
@override
@ -61,9 +60,7 @@ class NotificationsScreen extends StatelessWidget {
)),
);
} else {
final unreadCount = notifications.where((e) => !e.dismissed).length;
title =
'Notifications'; //TODO wire in the summary count data if has that endpoint
title = 'Notifications';
body = RefreshIndicator(
onRefresh: () async {
update(manager);

Wyświetl plik

@ -2,24 +2,35 @@ import 'package:uuid/uuid.dart';
import '../../models/follow_request.dart';
import '../../models/user_notification.dart';
import '../../utils/dateutils.dart';
import 'connection_mastodon_extensions.dart';
extension FollowRequestMastodonExtension on FollowRequest {
static FollowRequest fromJson(Map<String, dynamic> json) {
final connection = ConnectionMastodonExtensions.fromJson(json);
final createdAt =
DateTime.tryParse(json['created_at'] ?? '') ?? DateTime.now();
return FollowRequest(connection: connection, createdAt: createdAt);
final id = json['id'] ?? Uuid().v4();
final int timestamp = json.containsKey('created_at')
? OffsetDateTimeUtils.epochSecTimeFromTimeZoneString(json['created_at'])
.fold(
onSuccess: (value) => value,
onError: (error) => 0,
)
: 0;
return FollowRequest(
id: id,
connection: connection,
createdAtEpochSeconds: timestamp,
);
}
UserNotification toUserNotification() {
return UserNotification(
id: Uuid().v4(),
id: id,
type: NotificationType.follow_request,
fromId: connection.id,
fromName: connection.name,
fromUrl: connection.profileUrl,
timestamp: createdAt.millisecondsSinceEpoch,
timestamp: createdAtEpochSeconds,
iid: '',
dismissed: false,
content:

Wyświetl plik

@ -20,34 +20,31 @@ import 'follow_requests_manager.dart';
import 'network_status_service.dart';
class NotificationsManager extends ChangeNotifier {
static const itemsPerQuery = 50;
static const minimumDmsAndCrsUpdateDuration = Duration(seconds: 30);
static final _logger = Logger('NotificationManager');
late final PagesManager<List<UserNotification>, String> _pm;
final Profile profile;
final dms = <UserNotification>[];
final connectionRequests = <UserNotification>[];
final unread = <UserNotification>[];
final read = <UserNotification>[];
var lastDmsUpdate = DateTime(1900);
var lastCrUpdate = DateTime(1900);
NotificationsManager(this.profile) {
_pm = PagesManager<List<UserNotification>, String>(
idMapper: (nn) => nn.map((n) => n.id).toList(),
onRequest: (pd) async =>
await _clientGetNotificationsRequest(profile, pd));
}
NotificationsManager(this.profile);
var _firstLoad = true;
List<UserNotification> get notifications {
if (_firstLoad) {
updateNotifications();
loadUnreadNotifications(true);
_firstLoad = false;
}
return [...connectionRequests, ...dms, ...unread, ...read];
}
void clear() {
_pm.clear();
void clear({bool withListenerNotification = true}) {
dms.clear();
connectionRequests.clear();
unread.clear();
@ -56,148 +53,148 @@ class NotificationsManager extends ChangeNotifier {
notifyListeners();
}
FutureResult<List<UserNotification>, ExecError> updateNotifications() async {
const initialPull = 25;
void refreshNotifications() async {
clear(withListenerNotification: false);
await loadUnreadNotifications(false);
if (unread.isEmpty && read.isEmpty) {
await loadOlderNotifications(withListenerNotification: false);
}
notifyListeners();
}
FutureResult<List<UserNotification>, ExecError> loadUnreadNotifications(
bool withListenerNotification) async {
final notificationsFromRefresh = <UserNotification>[];
if (_pm.pages.isEmpty) {
final result = await _pm.initialize(initialPull);
result.andThenSuccess(
(response) => notificationsFromRefresh.addAll(response.data));
} else {
for (var i = 0; i < _pm.pages.length; i++) {
if (i > 0 && i == _pm.pages.length - 1) {
continue;
}
final page = _pm.pages[i];
if (i == 0) {
PagingData? pd;
bool initializedFirstPage = false;
if (page.next != null) {
final response = await _clientGetNotificationsRequest(
profile,
page.next!,
);
response.match(
onSuccess: (response) => pd = response.previous,
onError: (error) =>
_logger.severe('Error getting previous page: $error'));
if (pd != null) {
final response = await _clientGetNotificationsRequest(
profile,
pd!,
);
response.match(
onSuccess: (response) {
initializedFirstPage = true;
notificationsFromRefresh.addAll(response.data);
},
onError: (error) =>
_logger.severe('Error getting previous page: $error'));
} else if (pd == null && page.previous != null) {
final response = await _clientGetNotificationsRequest(
profile,
page.previous!,
).andThenAsync((previousData) async => previousData.next != null
? await _clientGetNotificationsRequest(
profile,
previousData.next!,
)
: buildErrorResult(
type: ErrorType.rangeError,
message: 'No "next" page from previous data either'));
response.match(
onSuccess: (response) {
initializedFirstPage = true;
notificationsFromRefresh.addAll(response.data);
},
onError: (error) =>
_logger.severe('Error getting previous page: $error'));
} else if (pd == null && page.previous == null) {
_logger.severe(
'Next page returned no results and no previous page so will need to re-initalize');
}
} else {
_logger.severe(
'There is no next page to query so will be forced to reset');
}
if (!initializedFirstPage) {
_logger.severe(
'Unable to determine call to rebuild initial page so resetting');
_pm.clear();
final result = await _pm.initialize(initialPull);
result.andThenSuccess(
(response) => notificationsFromRefresh.addAll(response.data));
}
}
final pm = _buildPageManager(profile, false);
final useActualRequests = getIt<FriendicaVersionChecker>()
.canUseFeature(RelaticaFeatures.usingActualFollowRequests);
var hasMore = true;
var first = true;
while (hasMore) {
final result =
first ? await pm.initialize(itemsPerQuery) : await pm.nextFromEnd();
if (page.next == null) {
if (i != _pm.pages.length - 2) {
_logger
.severe('No forward paging data in middle page but expected');
}
continue;
}
final response = await _clientGetNotificationsRequest(
profile,
page.next!,
);
response.match(
onSuccess: (response) =>
notificationsFromRefresh.addAll(response.data),
onError: (error) =>
_logger.severe('Error getting next page: $error'));
}
first = false;
result.match(
onSuccess: (nd) => print('Got ${nd.data.length} notifications'),
onError: (e) => debugPrint('Error getting notification: $e'));
final response = result.getValueOrElse(() => PagedResponse([]));
response.data
.where((n) =>
!useActualRequests || n.type != NotificationType.follow_request)
.forEach(notificationsFromRefresh.add);
hasMore = response.next != null;
}
return await _postFetchOperations(notificationsFromRefresh, true);
// filter out connection requests if going to use the real service for that when doing the query
// get earliest and latest notification ID from unread notifications
// query all notifications over that in page increments of 25
// query unread notifications in increments of 25 after the latest ID
return await _postFetchOperations(
notificationsFromRefresh,
withListenerNotification,
);
}
FutureResult<List<UserNotification>, ExecError>
loadNewerNotifications() async {
final result = await _pm
.previousFromBeginning()
FutureResult<List<UserNotification>, ExecError> loadNewerNotifications({
bool withListenerNotification = true,
}) async {
final (_, highestId) =
unread.isNotEmpty ? calcLowHigh(unread) : calcLowHigh(read);
final pm = await _buildPageManager(
profile,
true,
initialPages: [
PagedResponse(
<String>[],
next: PagingData(minId: highestId),
)
],
);
final result = await (unread.isEmpty && read.isEmpty
? pm.initialize(itemsPerQuery)
: pm.previousFromBeginning())
.andThenAsync(
(page) async => await _postFetchOperations(page.data, false),
(page) async =>
await _postFetchOperations(page.data, withListenerNotification),
)
.withError(
(error) => _logger.info('Error getting more updates: $error'));
return result.execErrorCast();
}
FutureResult<List<UserNotification>, ExecError>
loadOlderNotifications() async {
final result = await _pm
.nextFromEnd()
.andThenAsync(
(page) async => await _postFetchOperations(page.data, false),
)
.withError(
(error) => _logger.info('Error getting more updates: $error'));
return result.execErrorCast();
FutureResult<List<UserNotification>, ExecError> loadOlderNotifications(
{bool withListenerNotification = true}) async {
final (lowestId, _) =
read.isNotEmpty ? calcLowHigh(read) : calcLowHigh(unread);
final pm = _buildPageManager(
profile,
true,
initialPages: read.isEmpty
? []
: [
PagedResponse(
<String>[],
next: PagingData(maxId: lowestId),
)
],
);
final notifications = <UserNotification>[];
if (read.isEmpty) {
var hasReadNotification = false;
var hasMorePages = false;
do {
await (notifications.isEmpty
? pm.initialize(itemsPerQuery)
: pm.nextFromEnd())
.match(onSuccess: (r) {
notifications.addAll(r.data);
hasMorePages = r.next != null;
hasReadNotification = r.data.map((e) => e.dismissed).firstWhere(
(t) => t == true,
orElse: () => false,
);
}, onError: (e) {
hasMorePages = false;
print('Error getting older notifications: $e');
});
} while (!hasReadNotification && hasMorePages);
} else {
await pm.nextFromEnd().withResult((r) => notifications.addAll(r.data));
}
return _postFetchOperations(notifications, withListenerNotification);
}
FutureResult<bool, ExecError> markSeen(UserNotification notification) async {
final result =
await NotificationsClient(profile).clearNotification(notification);
if (result.isSuccess) {
final result = await NotificationsClient(profile)
.clearNotification(notification)
.withResult((_) {
unread.remove(notification);
read.add(notification.copy(dismissed: true));
read.sort();
notifyListeners();
}
});
updateNotifications();
return result;
return result.execErrorCast();
}
FutureResult<List<UserNotification>, ExecError> markAllAsRead() async {
FutureResult<bool, ExecError> markAllAsRead() async {
final result =
await NotificationsClient(getIt<AccountsService>().currentProfile)
.clearNotifications();
if (result.isFailure) {
return result.errorCast();
}
.clearNotifications()
.withResult((_) {
unread.map((n) => n.copy(dismissed: true)).forEach(read.add);
unread.clear();
read.sort();
notifyListeners();
});
return updateNotifications();
return result.execErrorCast();
}
List<UserNotification> buildUnreadMessageNotifications(
@ -238,22 +235,32 @@ class NotificationsManager extends ChangeNotifier {
return [...dmsResult, ...followRequestResult];
}
void updateNotification(UserNotification notification) {}
FutureResult<List<UserNotification>, ExecError> _postFetchOperations(
List<UserNotification> notificationsFromRefresh,
bool clearAtStart,
bool withListenerNotification,
) async {
getIt<NetworkStatusService>().startNotificationUpdate();
await getIt<ActiveProfileSelector<DirectMessageService>>()
.getForProfile(profile)
.transformAsync((dms) async => await dms.updateThreads());
if (DateTime.now().difference(lastDmsUpdate) >
minimumDmsAndCrsUpdateDuration) {
await getIt<ActiveProfileSelector<DirectMessageService>>()
.getForProfile(profile)
.transformAsync((dms) async => await dms.updateThreads());
lastDmsUpdate = DateTime.now();
}
final useActualRequests = getIt<FriendicaVersionChecker>()
.canUseFeature(RelaticaFeatures.usingActualFollowRequests);
if (useActualRequests) {
await getIt<ActiveProfileSelector<FollowRequestsManager>>()
.getForProfile(profile)
.transformAsync((fm) async => fm.update());
if (DateTime.now().difference(lastCrUpdate) >
minimumDmsAndCrsUpdateDuration) {
await getIt<ActiveProfileSelector<FollowRequestsManager>>()
.getForProfile(profile)
.transformAsync((fm) async => fm.update());
lastCrUpdate = DateTime.now();
}
}
final notifications = <String, UserNotification>{};
@ -270,51 +277,51 @@ class NotificationsManager extends ChangeNotifier {
notifications[n.id] = n;
}
_processNewNotifications(notifications.values, clearAtStart: clearAtStart);
_processNewNotifications(notifications.values);
notifyListeners();
if (withListenerNotification) {
notifyListeners();
}
return Result.ok(notifications.values.toList());
}
Future<void> _processNewNotifications(
Iterable<UserNotification> notifications, {
bool clearAtStart = false,
}) async {
Iterable<UserNotification> notifications) async {
final dmsMap = <String, UserNotification>{};
final crMap = <String, UserNotification>{};
final unreadMap = <String, UserNotification>{};
final readMap = <String, UserNotification>{};
final st = Stopwatch()..start();
if (!clearAtStart) {
for (int i = 0; i < dms.length; i++) {
dmsMap[dms[i].id] = dms[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < connectionRequests.length; i++) {
crMap[connectionRequests[i].id] = connectionRequests[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < unread.length; i++) {
unreadMap[unread[i].id] = unread[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < read.length; i++) {
readMap[read[i].id] = read[i];
}
for (int i = 0; i < dms.length; i++) {
dmsMap[dms[i].id] = dms[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < connectionRequests.length; i++) {
crMap[connectionRequests[i].id] = connectionRequests[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < unread.length; i++) {
unreadMap[unread[i].id] = unread[i];
}
if (st.elapsedMilliseconds > maxProcessingMillis) {
await Future.delayed(processingSleep, () => st.reset());
}
for (int i = 0; i < read.length; i++) {
readMap[read[i].id] = read[i];
}
dms.clear();
connectionRequests.clear();
unread.clear();
@ -337,7 +344,6 @@ class NotificationsManager extends ChangeNotifier {
case NotificationType.direct_message:
dmsMap[n.id] = n;
break;
case NotificationType.follow:
case NotificationType.follow_request:
crMap[n.id] = n;
break;
@ -359,9 +365,36 @@ class NotificationsManager extends ChangeNotifier {
..addAll(readMap.values)
..sort();
}
static FutureResult<PagedResponse<List<UserNotification>>, ExecError>
_clientGetNotificationsRequest(Profile profile, PagingData page) async {
return NotificationsClient(profile).getNotifications(page);
}
}
(int lowest, int highest) calcLowHigh(List<UserNotification> notifications) {
int highestNotificationId = -1;
int lowestNotificationId = 0x7FFFFFFFFFFFFFFF;
final ids = notifications
.where((n) =>
n.type != NotificationType.direct_message &&
n.type != NotificationType.follow_request)
.map((n) => int.parse(n.id));
for (var id in ids) {
if (id > highestNotificationId) {
highestNotificationId = id;
}
if (id < lowestNotificationId) {
lowestNotificationId = id;
}
}
return (lowestNotificationId, highestNotificationId);
}
PagesManager<List<UserNotification>, String> _buildPageManager(
Profile profile, bool includeAll,
{List<PagedResponse> initialPages = const []}) =>
PagesManager<List<UserNotification>, String>(
initialPages: initialPages,
idMapper: (nn) => nn.map((n) => n.id).toList(),
onRequest: (pd) async =>
await NotificationsClient(profile).getNotifications(pd, includeAll),
);