// `dart:math` is no longer referenced by the code in this section; the import
// is retained so nothing outside the reviewed text loses it.
import 'dart:math';

import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:logging/logging.dart';
import 'package:result_monad/result_monad.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
import 'package:stack_trace/stack_trace.dart';

import '../data/interfaces/connections_repo_intf.dart';
import '../data/objectbox/objectbox_cache.dart';
import '../data/objectbox/objectbox_connections_repo.dart';
import '../models/auth/profile.dart';
import '../models/connection.dart';
import '../models/exec_error.dart';
import '../models/networking/paging_data.dart';
import '../riverpod_controllers/networking/friendica_trending_client_services.dart';
import 'circles_repo_services.dart';
import 'networking/friendica_relationship_client_services.dart';
import 'persistent_info_services.dart';

part 'connection_manager_services.g.dart';

// NOTE(review): the generic type arguments in this file (for example
// `Future<IConnectionsRepo>` and `Result<Connection, ExecError>`) were
// reconstructed from how each value is used below; the reviewed text had lost
// them. `IConnectionsRepo` is assumed to be the interface exported by
// `connections_repo_intf.dart`. Confirm all of them against the generated
// part file before merging.

final _crLogger = Logger('ConnectionsRepoProvider');

/// Creates the connections repository for [profile].
///
/// Opens an [ObjectBoxCache] with base directory `profileboxcaches` and the
/// profile id as sub-directory, then wraps it in an
/// [ObjectBoxConnectionsRepo].
@Riverpod(keepAlive: true)
Future<IConnectionsRepo> _connectionsRepo(Ref ref, Profile profile) async {
  _crLogger.info('Creating Connections repo for $profile');
  final objectBox = await ObjectBoxCache.create(
    baseDir: 'profileboxcaches',
    subDir: profile.id,
  );
  _crLogger.info('Object box object returned for $profile');
  return ObjectBoxConnectionsRepo(objectBox);
}

final _crsLogger = Logger('ConnectionsRepoSyncProvider');

/// Synchronous view of [_connectionsRepoProvider].
///
/// The initial state is an error of type [ErrorType.riveprodProviderNotReady];
/// the state becomes `Result.ok(repo)` once the repository future completes.
@Riverpod(keepAlive: true)
class _ConnectionsRepoSync extends _$ConnectionsRepoSync {
  @override
  Result<IConnectionsRepo, ExecError> build(Profile profile) {
    _crsLogger.info('Build $profile');
    // `build` must return synchronously, so the future is observed with
    // `then` instead of being awaited.
    ref.watch(_connectionsRepoProvider(profile).future).then((repo) {
      state = Result.ok(repo);
      ref.notifyListeners();
    });
    return Result.error(ExecError(type: ErrorType.riveprodProviderNotReady));
  }
}

/// Exposes [clear] for wiping the connections repository of a profile.
@riverpod
class ConnectionsRepoClearer extends _$ConnectionsRepoClearer {
  @override
  bool build(Profile profile) => true;

  /// Clears the repository and returns whether the synchronous repository
  /// result was a success (it is an error while the repository is not ready).
  bool clear() => ref
      .read(_connectionsRepoSyncProvider(profile))
      .withResult((repo) => repo.clear())
      .isSuccess;
}

/// Completes with `true` once the connections repository for [profile] has
/// been created.
@riverpod
Future<bool> connectionRepoInit(Ref ref, Profile profile) async {
  await ref.read(_connectionsRepoProvider(profile).future);
  return true;
}

/// Returns the known users matching [userName].
///
/// When [matchingStatus] is given, only users whose status is contained in it
/// are returned. Returns an empty list while the repository is not available.
@riverpod
List<Connection> knownUsersByName(
  Ref ref,
  Profile profile,
  String userName, {
  Set<ConnectionStatus>? matchingStatus,
}) {
  return ref.watch(_connectionsRepoSyncProvider(profile)).fold(
        onSuccess: (repo) {
          final knownUsers = repo.getKnownUsersByName(userName);
          if (matchingStatus == null) {
            return knownUsers;
          }
          return knownUsers
              .where((u) => matchingStatus.contains(u.status))
              .toList();
        },
        onError: (_) => [],
      );
}

/// Returns the contacts stored in the connections repository of [profile].
@riverpod
Future<List<Connection>> myContacts(Ref ref, Profile profile) async {
  final repo = await ref.watch(_connectionsRepoProvider(profile).future);
  return repo.getMyContacts();
}

final _cbiLogger = Logger('ConnectionByIdProvider');

/// Looks up the connection with [id] in the repository of [profile].
///
/// On a hit with [forceUpdate] set, a refresh of that connection is started
/// in the background. On a miss (other than "repository not ready") a refresh
/// for a placeholder connection carrying only [id] is started; the refresh
/// invalidates this provider again when it stores a result.
@riverpod
Result<Connection, ExecError> connectionById(
  Ref ref,
  Profile profile,
  String id, {
  bool forceUpdate = false,
}) {
  _cbiLogger.finest('Build for $id for $profile');
  final result = ref
      .watch(_connectionsRepoSyncProvider(profile))
      .andThen((repo) => repo.getById(id))
      .transform((c) {
    if (forceUpdate) {
      ref
          .read(connectionModifierProvider(profile, c).notifier)
          .refreshConnection(true);
    }
    return c;
  }).withError((error) {
    // Nothing to fetch yet: the repository itself is still loading.
    if (error is ExecError &&
        error.type == ErrorType.riveprodProviderNotReady) {
      return;
    }

    final tmpConnection = Connection(id: id);
    ref
        .read(connectionModifierProvider(profile, tmpConnection).notifier)
        .refreshConnection(true);
  }).execErrorCast();

  return result;
}

final _cbhLogger = Logger('ConnectionByHandleProvider');

/// Looks up the connection with [handle] in the repository of [profile].
@riverpod
Result<Connection, ExecError> connectionByHandle(
  Ref ref,
  Profile profile,
  String handle,
) {
  _cbhLogger.finest('Build for $handle for $profile');
  return ref
      .watch(_connectionsRepoSyncProvider(profile))
      .andThen((repo) => repo.getByHandle(handle))
      .execErrorCast();
}

final _cmpLogger = Logger('ConnectionModifierProvider');

/// Holds one [Connection] and offers the operations that change it.
@riverpod
class ConnectionModifier extends _$ConnectionModifier {
  @override
  Connection build(Profile profile, Connection connection) => connection;

  /// Stores [update] in the repository and publishes it as the new state.
  ///
  /// If the repository already holds this connection and [update] has status
  /// [ConnectionStatus.unknown], the stored status is preserved.
  Future<void> upsertConnection(Connection update) async {
    final repo = await ref.read(_connectionsRepoProvider(profile).future);
    final forUpsert = repo.getById(connection.id).fold(
          onSuccess: (original) => update.status == ConnectionStatus.unknown
              ? update.copy(status: original.status)
              : update,
          onError: (_) => update,
        );
    repo.upsertConnection(forUpsert);
    state = forUpsert;
    _sendUpdateNotifications();
  }

  /// Authorizes the pending follow request of this connection.
  Future<void> acceptFollowRequest() =>
      _adjudicateFollowRequest(FollowAdjudication.authorize, 'accept');

  /// Rejects the pending follow request of this connection.
  Future<void> rejectFollowRequest() =>
      _adjudicateFollowRequest(FollowAdjudication.reject, 'reject');

  /// Ignores the pending follow request of this connection.
  Future<void> ignoreFollowRequest() =>
      _adjudicateFollowRequest(FollowAdjudication.ignore, 'ignore');

  /// Sends [adjudication] for this connection and stores the returned update.
  ///
  /// [action] is only used to label the log messages.
  Future<void> _adjudicateFollowRequest(
    FollowAdjudication adjudication,
    String action,
  ) async {
    _cmpLogger.finest(
      'Attempting to $action follow request ${connection.name}: '
      '${connection.status}',
    );
    await ref
        .read(
          adjudicateFollowProvider(profile, connection, adjudication).future,
        )
        .match(
      onSuccess: (update) async {
        _cmpLogger.finest(
          'Successful $action of follow request from ${update.name}: '
          '${update.status}',
        );
        await upsertConnection(update);
      },
      onError: (error) {
        _cmpLogger.severe(
          'Error trying to $action follow request from ${connection.name}: '
          '$error',
          error,
          Trace.current(),
        );
      },
    );
  }

  /// Follows this connection and stores the returned update.
  Future<void> follow() async {
    _cmpLogger.finest(
      'Attempting to follow ${connection.name}: ${connection.status}',
    );
    await ref.read(followConnectionProvider(profile, connection).future).match(
      onSuccess: (update) async {
        _cmpLogger
            .finest('Successfully followed ${update.name}: ${update.status}');
        await upsertConnection(update);
      },
      onError: (error) {
        _cmpLogger.severe('Error following ${connection.name}: $error');
      },
    );
  }

  /// Unfollows this connection and stores the returned update.
  Future<void> unfollow() async {
    _cmpLogger.finest(
      'Attempting to unfollow ${connection.name}: ${connection.status}',
    );
    await ref
        .read(unFollowConnectionProvider(profile, connection).future)
        .match(
      onSuccess: (update) async {
        _cmpLogger
            .finest('Successfully unfollowed ${update.name}: ${update.status}');
        await upsertConnection(update);
      },
      onError: (error) {
        _cmpLogger.severe('Error unfollowing ${connection.name}: $error');
      },
    );
  }

  /// Invalidates the providers that expose this connection.
  void _sendUpdateNotifications() {
    ref.invalidate(myContactsProvider);
    ref.invalidate(connectionByIdProvider(profile, connection.id));
    ref.invalidate(connectionByHandleProvider(profile, connection.handle));
  }

  /// Refreshes the circles, this connection's circle data and the connection
  /// itself, then notifies listeners when [withNotifications] is set.
  Future<void> fullRefresh({
    bool withNotifications = true,
  }) async {
    await ref.read(circlesProvider(profile).notifier).refresh();
    await ref
        .read(circlesProvider(profile).notifier)
        .refreshConnectionCircleData(connection.id, false);
    await ref
        .read(connectionModifierProvider(profile, connection).notifier)
        .refreshConnection(false);

    if (withNotifications) {
      ref.notifyListeners();
    }
  }

  /// Fetches the current data for this connection and stores it.
  Future<void> refreshConnection(bool withNotification) async {
    _cmpLogger.finest(
      'Refreshing connection data for ${connection.name} with notifications? '
      '$withNotification',
    );
    await ref
        .read(connectionWithStatusClientProvider(profile, connection).future)
        .match(
      onSuccess: (update) async {
        // upsertConnection already invalidates the dependent providers; the
        // extra call below is kept so the flag behaves exactly as before.
        await upsertConnection(update);
        if (withNotification) {
          _sendUpdateNotifications();
        }
      },
      onError: (error) {
        _cmpLogger
            .severe('Error getting updates for ${connection.name}: $error');
      },
    );
  }
}

/// Writable holder for the contact-update progress message of a profile.
@Riverpod(keepAlive: true)
class _UpdateStatusInternal extends _$UpdateStatusInternal {
  @override
  String build(Profile profile) => '';

  /// Replaces the current progress message with [update].
  void update(String update) {
    state = update;
  }
}

/// Read-only view of the contact-update progress message of [profile].
@riverpod
String updateStatus(Ref ref, Profile profile) {
  return ref.watch(_updateStatusInternalProvider(profile));
}

final _acuLogger = Logger('AllContactsUpdaterProvider');

/// Synchronises all followers and followings of a profile into the repository.
///
/// The state is `true` while an update is running.
@Riverpod(keepAlive: true)
class AllContactsUpdater extends _$AllContactsUpdater {
  @override
  bool build(Profile profile) => false;

  /// Pages through followers and followings, upserts every entry and finally
  /// marks contacts that appeared in neither list as [ConnectionStatus.none].
  ///
  /// Does nothing if an update is already running. [backgroundUpdate] selects
  /// a five minute pause between pages instead of ten seconds and prefixes
  /// the progress messages with `Background `.
  Future<void> updateAllContacts(bool backgroundUpdate) async {
    // TODO check if profile is no longer same and stop if it is between pagings and cancel if so
    if (state) {
      _acuLogger.info(
        'updateAllContacts called but believe it is already running so skipping',
      );
      return;
    }

    state = true;
    try {
      await _runUpdate(backgroundUpdate);
    } finally {
      // Always release the "running" flag, even if the update throws.
      state = false;
    }
  }

  /// Performs one full update run; see [updateAllContacts].
  Future<void> _runUpdate(bool backgroundUpdate) async {
    _acuLogger.info('Updating all contacts');
    final messageStart = backgroundUpdate ? 'Background ' : '';
    final delay = backgroundUpdate
        ? const Duration(minutes: 5)
        : const Duration(seconds: 10);

    void report(String message) => ref
        .read(_updateStatusInternalProvider(profile).notifier)
        .update('$messageStart$message');

    report('Updating Connections Data');

    final originalTime = ref.read(persistentInfoProvider(profile));
    await ref
        .read(persistentInfoProvider(profile).notifier)
        .updateLastMyConnectionUpdate(DateTime.now());

    var completed = false;
    try {
      final results = <String, Connection>{};
      const limit = 50;
      var fetchFailed = false;

      // Contacts still in this set after both passes are no longer related.
      final originalContacts = <Connection>{
        ...?ref.read(myContactsProvider(profile)).valueOrNull,
      };

      var moreResults = true;
      var currentPage = const PagingData(limit: limit);
      var count = 0;
      while (moreResults) {
        await ref
            .read(getMyFollowersProvider(profile, currentPage).future)
            .match(
          onSuccess: (followers) {
            count += followers.data.length;
            for (final f in followers.data) {
              originalContacts.remove(f);
              final c = f.copy(status: ConnectionStatus.theyFollowYou);
              results[c.id] = c;
              ref
                  .read(connectionModifierProvider(profile, c).notifier)
                  .upsertConnection(c);
            }
            final next = followers.next;
            if (next != null) {
              currentPage = next;
            }
            moreResults = next != null;
            report('Updating Followers: $count processed');
          },
          onError: (error) {
            _acuLogger.severe(
              'Error getting followers data: $error',
              error,
              Trace.current(),
            );
            fetchFailed = true;
            moreResults = false;
          },
        );
        await Future<void>.delayed(delay);
      }

      moreResults = true;
      currentPage = const PagingData(limit: limit);
      count = 0;
      while (moreResults) {
        await ref
            .read(myFollowingClientProvider(profile, currentPage).future)
            .match(
          onSuccess: (following) {
            count += following.data.length;
            for (final f in following.data) {
              originalContacts.remove(f);
              // Anyone already seen as a follower is a mutual connection.
              final newStatus = results.containsKey(f.id)
                  ? ConnectionStatus.mutual
                  : ConnectionStatus.youFollowThem;
              final c = f.copy(status: newStatus);
              ref
                  .read(connectionModifierProvider(profile, c).notifier)
                  .upsertConnection(c);
            }
            final next = following.next;
            if (next != null) {
              currentPage = next;
            }
            moreResults = next != null;
            report('Updating Followings: $count processed');
          },
          onError: (error) {
            _acuLogger.severe(
              'Error getting your following data: $error',
              error,
              Trace.current(),
            );
            fetchFailed = true;
            // Stop paging; without this the same failing page is requested
            // forever.
            moreResults = false;
          },
        );
        await Future<void>.delayed(delay);
      }

      if (fetchFailed) {
        // The fetched lists are incomplete, so pruning would wrongly demote
        // contacts that simply were not retrieved.
        _acuLogger.warning(
          'Contact update incomplete; skipping pruning of contacts',
        );
      } else {
        report('Pruning no longer followed contacts');
        for (final noLongerFollowed in originalContacts) {
          final nf = noLongerFollowed.copy(status: ConnectionStatus.none);
          ref
              .read(connectionModifierProvider(profile, nf).notifier)
              .upsertConnection(nf);
        }

        await ref
            .read(persistentInfoProvider(profile).notifier)
            .updateLastMyConnectionUpdate(DateTime.now());
        completed = true;

        final contactsLength =
            ref.read(myContactsProvider(profile)).valueOrNull?.length ?? 0;
        _acuLogger.info('Done updating # Contacts:$contactsLength');
      }
    } catch (e, st) {
      _acuLogger.severe(
        'Exception thrown trying to update contacts: $e',
        e,
        st,
      );
    }

    if (!completed) {
      // Only an unfinished run rolls the timestamp back; a finished run keeps
      // the completion time recorded above.
      await ref
          .read(persistentInfoProvider(profile).notifier)
          .updateLastMyConnectionUpdate(originalTime);
    }
  }
}

/// Fetches suggested connections for [profile] and stores each of them in the
/// repository before returning the fetch result unchanged.
@riverpod
Future<Result<List<Connection>, ExecError>> suggestedConnections(
  Ref ref,
  Profile profile,
) async {
  final result =
      await ref.watch(suggestedConnectionsClientProvider(profile).future);

  await result.withResultAsync((suggestions) async {
    for (final s in suggestions) {
      await ref
          .read(connectionModifierProvider(profile, s).notifier)
          .upsertConnection(s);
    }
  });

  return result;
}