Improve profile fetching for large groups.

fork-5.53.8
Cody Henthorne 2022-02-09 16:01:56 -05:00 zatwierdzone przez GitHub
rodzic bb1e6ffae0
commit 14db5ce349
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
5 zmienionych plików z 94 dodań i 61 usunięć

Wyświetl plik

@ -152,10 +152,13 @@ public class TextSecureIdentityKeyStore implements IdentityKeyStore {
public @NonNull Optional<IdentityRecord> getIdentityRecord(@NonNull RecipientId recipientId) {
Recipient recipient = Recipient.resolved(recipientId);
return getIdentityRecord(recipient);
}
public @NonNull Optional<IdentityRecord> getIdentityRecord(@NonNull Recipient recipient) {
if (recipient.hasServiceIdentifier()) {
IdentityStoreRecord record = cache.get(recipient.requireServiceId());
return Optional.fromNullable(record).transform(r -> r.toIdentityRecord(recipientId));
return Optional.fromNullable(record).transform(r -> r.toIdentityRecord(recipient.getId()));
} else {
Log.w(TAG, "[getIdentityRecord] No serviceId for " + recipient.getId());
return Optional.absent();

Wyświetl plik

@ -8,6 +8,8 @@ import androidx.annotation.VisibleForTesting;
import org.jetbrains.annotations.NotNull;
import org.signal.core.util.concurrent.SignalExecutors;
import org.thoughtcrime.securesms.database.model.MessageId;
import org.thoughtcrime.securesms.recipients.Recipient;
import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.util.concurrent.SerialExecutor;
import java.util.HashMap;
@ -37,6 +39,7 @@ public class DatabaseObserver {
private static final String KEY_MESSAGE_UPDATE = "MessageUpdate:";
private static final String KEY_MESSAGE_INSERT = "MessageInsert:";
private static final String KEY_NOTIFICATION_PROFILES = "NotificationProfiles";
private static final String KEY_RECIPIENT = "Recipient";
private final Application application;
private final Executor executor;
@ -253,6 +256,12 @@ public class DatabaseObserver {
});
}
public void notifyRecipientChanged(@NonNull RecipientId recipientId) {
runPostSuccessfulTransaction(KEY_RECIPIENT + recipientId.serialize(), () -> {
Recipient.live(recipientId).refresh();
});
}
private void runPostSuccessfulTransaction(@NonNull String dedupeKey, @NonNull Runnable runnable) {
SignalDatabase.runPostSuccessfulTransaction(dedupeKey, () -> {
executor.execute(runnable);

Wyświetl plik

@ -467,7 +467,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (transactionSuccessful) {
if (recipientsNeedingRefresh.isNotEmpty()) {
recipientsNeedingRefresh.forEach { Recipient.live(it).refresh() }
recipientsNeedingRefresh.forEach { ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(it) }
RetrieveProfileJob.enqueue(recipientsNeedingRefresh.toSet())
}
@ -752,7 +752,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
fun markNeedsSync(recipientId: RecipientId) {
rotateStorageId(recipientId)
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
fun applyStorageIdUpdates(storageIds: Map<RecipientId, StorageId>) {
@ -772,7 +772,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
for (id in storageIds.keys) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -850,7 +850,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
threads.applyStorageSyncUpdate(recipientId, update.new)
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
fun applyStorageSyncGroupV1Insert(insert: SignalGroupV1Record) {
@ -858,7 +858,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
val recipientId = RecipientId.from(id)
threads.applyStorageSyncUpdate(recipientId, insert)
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
fun applyStorageSyncGroupV1Update(update: StorageRecordUpdate<SignalGroupV1Record>) {
@ -1085,7 +1085,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
writableDatabase.update(TABLE_NAME, values, where, args)
for (recipientId in updated) {
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
}
}
@ -1112,7 +1112,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
writableDatabase.update(TABLE_NAME, values, where, args)
for (recipientId in updated) {
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
}
}
@ -1153,7 +1153,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
database.update(TABLE_NAME, values, where, args)
for (id in toUpdate) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1163,7 +1163,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(CUSTOM_CHAT_COLORS_ID, ChatColors.Id.NotSet.longValue)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1173,7 +1173,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(CUSTOM_CHAT_COLORS_ID, color.id.longValue)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1182,7 +1182,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(DEFAULT_SUBSCRIPTION_ID, defaultSubscriptionId)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1191,7 +1191,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(FORCE_SMS_SELECTION, if (forceSmsSelection) 1 else 0)
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1201,7 +1201,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, values)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1210,7 +1210,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(MESSAGE_RINGTONE, notification?.toString())
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1219,7 +1219,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(CALL_RINGTONE, ringtone?.toString())
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1228,7 +1228,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(MESSAGE_VIBRATE, enabled.id)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1237,7 +1237,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(CALL_VIBRATE, enabled.id)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1248,7 +1248,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (update(id, values)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
StorageSyncHelper.scheduleSyncForDataChange()
@ -1275,7 +1275,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
for (id in ids) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
StorageSyncHelper.scheduleSyncForDataChange()
@ -1301,7 +1301,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
writableDatabase.update(TABLE_NAME, values, query, args)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
fun setExpireMessages(id: RecipientId, expiration: Int) {
@ -1309,7 +1309,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(MESSAGE_EXPIRATION_TIME, expiration)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1318,7 +1318,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(UNIDENTIFIED_ACCESS_MODE, unidentifiedAccessMode.mode)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1360,7 +1360,8 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyNotificationProfileObservers()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1377,7 +1378,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1387,7 +1388,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, values)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
@ -1415,7 +1416,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (update(updateQuery, valuesToSet)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
return true
}
@ -1439,7 +1440,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (writableDatabase.update(TABLE_NAME, valuesToSet, selection, args) > 0) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
return true
} else {
return false
@ -1467,7 +1468,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
val updated = update(updateQuery, values)
if (updated) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
return updated
@ -1478,7 +1479,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
values.putNull(PROFILE_KEY_CREDENTIAL)
if (update(id, values)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1554,7 +1555,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(SYSTEM_JOINED_NAME, systemContactName)
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1566,7 +1567,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, contentValues)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
@ -1576,7 +1577,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(SIGNAL_PROFILE_AVATAR, profileAvatar)
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
if (id == Recipient.self().id) {
rotateStorageId(id)
StorageSyncHelper.scheduleSyncForDataChange()
@ -1591,7 +1592,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1610,7 +1611,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (profiledUpdated) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
@ -1620,7 +1621,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(NOTIFICATION_CHANNEL, notificationChannel)
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1655,7 +1656,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
val rowsUpdated = database.update(TABLE_NAME, values, where, null)
if (rowsUpdated == idWithWallpaper.size) {
for (pair in idWithWallpaper) {
Recipient.live(pair.first()).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(pair.first())
if (pair.second() != null) {
WallpaperStorage.onWallpaperDeselected(context, Uri.parse(pair.second()))
}
@ -1685,7 +1686,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, values)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
if (existingWallpaperUri != null) {
@ -1790,7 +1791,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, contentValues)) {
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
@ -1838,7 +1839,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
put(USERNAME, username)
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
StorageSyncHelper.scheduleSyncForDataChange()
}
}
@ -1905,7 +1906,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
if (update(id, contentValues)) {
setStorageIdIfNotSet(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -1915,7 +1916,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
putNull(STORAGE_SERVICE_ID)
}
if (update(id, contentValues)) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -2080,7 +2081,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
} finally {
db.setTransactionSuccessful()
db.endTransaction()
updates.entries.forEach { Recipient.live(it.key).refresh() }
updates.entries.forEach { ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(it.key) }
}
}
@ -2336,7 +2337,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
}
for (id in ids.keys) {
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -2369,7 +2370,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
writableDatabase.update(TABLE_NAME, values, query.where, query.whereArgs)
for (id in idsToUpdate) {
Recipient.live(RecipientId.from(id)).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(RecipientId.from(id))
}
}
}
@ -2398,7 +2399,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
val count = db.update(TABLE_NAME, values, query.where, query.whereArgs)
if (count > 0) {
for (id in idsToUpdate) {
Recipient.live(RecipientId.from(id)).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(RecipientId.from(id))
}
}
}
@ -2428,7 +2429,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
} finally {
db.endTransaction()
}
Recipient.live(recipientId).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(recipientId)
}
/**
@ -2473,7 +2474,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
if (update(query, values)) {
val id = getByGroupId(v2Id).get()
rotateStorageId(id)
Recipient.live(id).refresh()
ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id)
}
}
@ -3088,7 +3089,7 @@ open class RecipientDatabase(context: Context, databaseHelper: SignalDatabase) :
clearSystemDataForPendingInfo()
database.setTransactionSuccessful()
database.endTransaction()
pendingRecipients.forEach { id -> Recipient.live(id).refresh() }
pendingRecipients.forEach { id -> ApplicationDependencies.getDatabaseObserver().notifyRecipientChanged(id) }
}
private fun markAllRelevantEntriesDirty() {

Wyświetl plik

@ -295,6 +295,17 @@ open class SignalDatabase(private val context: Application, databaseSecret: Data
}
}
@JvmStatic
fun runInTransaction(operation: Runnable) {
instance!!.signalWritableDatabase.beginTransaction()
try {
operation.run()
instance!!.signalWritableDatabase.setTransactionSuccessful()
} finally {
instance!!.signalWritableDatabase.endTransaction()
}
}
@get:JvmStatic
@get:JvmName("attachments")
val attachments: AttachmentDatabase

Wyświetl plik

@ -285,14 +285,7 @@ public class RetrieveProfileJob extends BaseJob {
stopwatch.split("responses");
for (Pair<Recipient, ProfileAndCredential> profile : operationState.profiles) {
process(profile.first(), profile.second());
}
stopwatch.split("process");
Set<RecipientId> success = SetUtil.difference(recipientIds, operationState.retries);
recipientDatabase.markProfilesFetched(success, System.currentTimeMillis());
Map<RecipientId, ACI> newlyRegistered = Stream.of(operationState.profiles)
.map(Pair::first)
@ -300,6 +293,17 @@ public class RetrieveProfileJob extends BaseJob {
.collect(Collectors.toMap(Recipient::getId,
r -> r.getAci().orNull()));
//noinspection SimplifyStreamApiCallChains
Util.chunk(operationState.profiles, 150).stream().forEach(list -> {
SignalDatabase.runInTransaction(() -> {
for (Pair<Recipient, ProfileAndCredential> profile : list) {
process(profile.first(), profile.second());
}
});
});
recipientDatabase.markProfilesFetched(success, System.currentTimeMillis());
if (operationState.unregistered.size() > 0 || newlyRegistered.size() > 0) {
Log.i(TAG, "Marking " + newlyRegistered.size() + " users as registered and " + operationState.unregistered.size() + " users as unregistered.");
recipientDatabase.bulkUpdatedRegisteredStatus(newlyRegistered, operationState.unregistered);
@ -307,6 +311,12 @@ public class RetrieveProfileJob extends BaseJob {
stopwatch.split("process");
for (Pair<Recipient, ProfileAndCredential> profile : operationState.profiles) {
setIdentityKey(profile.first(), profile.second().getProfile().getIdentityKey());
}
stopwatch.split("identityKeys");
long keyCount = Stream.of(operationState.profiles).map(Pair::first).map(Recipient::getProfileKey).withoutNulls().count();
Log.d(TAG, String.format(Locale.US, "Started with %d recipient(s). Found %d profile(s), and had keys for %d of them. Will retry %d.", recipients.size(), operationState.profiles.size(), keyCount, operationState.retries.size()));
@ -338,7 +348,6 @@ public class RetrieveProfileJob extends BaseJob {
setProfileBadges(recipient, profile.getBadges());
clearUsername(recipient);
setProfileCapabilities(recipient, profile.getCapabilities());
setIdentityKey(recipient, profile.getIdentityKey());
setUnidentifiedAccessMode(recipient, profile.getUnidentifiedAccess(), profile.isUnrestrictedUnidentifiedAccess());
if (recipientProfileKey != null) {
@ -386,8 +395,8 @@ public class RetrieveProfileJob extends BaseJob {
IdentityKey identityKey = new IdentityKey(Base64.decode(identityKeyValue), 0);
if (!ApplicationDependencies.getProtocolStore().aci().identities().getIdentityRecord(recipient.getId()).isPresent()) {
Log.w(TAG, "Still first use...");
if (!ApplicationDependencies.getProtocolStore().aci().identities().getIdentityRecord(recipient).isPresent()) {
Log.w(TAG, "Still first use for " + recipient.getId());
return;
}
@ -454,7 +463,7 @@ public class RetrieveProfileJob extends BaseJob {
!localDisplayName.isEmpty() &&
!remoteDisplayName.equals(localDisplayName))
{
Log.i(TAG, "Writing a profile name change event.");
Log.i(TAG, "Writing a profile name change event for " + recipient.getId());
SignalDatabase.sms().insertProfileNameChangeMessages(recipient, remoteDisplayName, localDisplayName);
} else {
Log.i(TAG, String.format(Locale.US, "Name changed, but wasn't relevant to write an event. blocked: %s, group: %s, self: %s, firstSet: %s, displayChange: %s",
@ -463,7 +472,7 @@ public class RetrieveProfileJob extends BaseJob {
}
if (TextUtils.isEmpty(plaintextProfileName)) {
Log.i(TAG, "No profile name set.");
Log.i(TAG, "No profile name set for " + recipient.getId());
}
} catch (InvalidCiphertextException e) {
Log.w(TAG, "Bad profile key for " + recipient.getId());