Create a write-through cache for PendingRetryReceiptDatabase.

fork-5.53.8
Greyson Parrelli 2021-07-01 12:56:25 -04:00 zatwierdzone przez Alex Hart
rodzic 0921ebe5f1
commit 62040d06b4
9 zmienionych plików z 152 dodań i 34 usunięć

Wyświetl plik

@ -172,6 +172,20 @@ private static final String[] GROUP_PROJECTION = {
}
}
public Optional<GroupRecord> getGroupByDistributionId(@NonNull DistributionId distributionId) {
SQLiteDatabase db = databaseHelper.getReadableDatabase();
String query = DISTRIBUTION_ID + " = ?";
String[] args = SqlUtil.buildArgs(distributionId);
try (Cursor cursor = db.query(TABLE_NAME, null, query, args, null, null, null)) {
if (cursor.moveToFirst()) {
return getGroup(cursor);
} else {
return Optional.absent();
}
}
}
/**
* Removes the specified members from the list of 'unmigrated V1 members' -- the list of members
* that were either dropped or had to be invited when migrating the group from V1->V2.

Wyświetl plik

@ -0,0 +1,78 @@
package org.thoughtcrime.securesms.database
import android.content.Context
import androidx.annotation.VisibleForTesting
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel
import org.thoughtcrime.securesms.recipients.RecipientId
import org.thoughtcrime.securesms.util.FeatureFlags
/**
* A write-through cache for [PendingRetryReceiptDatabase].
*
* We have to read from this cache every time we process an incoming message. As a result, it's a very performance-sensitive operation.
*
* This cache is very similar to our job storage cache or our key-value store, in the sense that the first access of it will fetch all data from disk so all
* future reads can happen in memory.
*/
class PendingRetryReceiptCache @VisibleForTesting constructor(
private val database: PendingRetryReceiptDatabase
) {
constructor(context: Context) : this(DatabaseFactory.getPendingRetryReceiptDatabase(context))
private val pendingRetries: MutableMap<RemoteMessageId, PendingRetryReceiptModel> = HashMap()
private var populated: Boolean = false
fun insert(author: RecipientId, authorDevice: Int, sentTimestamp: Long, receivedTimestamp: Long, threadId: Long) {
if (!FeatureFlags.senderKey()) return
ensurePopulated()
synchronized(pendingRetries) {
val model: PendingRetryReceiptModel = database.insert(author, authorDevice, sentTimestamp, receivedTimestamp, threadId)
pendingRetries[RemoteMessageId(author, sentTimestamp)] = model
}
}
fun get(author: RecipientId, sentTimestamp: Long): PendingRetryReceiptModel? {
if (!FeatureFlags.senderKey()) return null
ensurePopulated()
synchronized(pendingRetries) {
return pendingRetries[RemoteMessageId(author, sentTimestamp)]
}
}
fun getOldest(): PendingRetryReceiptModel? {
if (!FeatureFlags.senderKey()) return null
ensurePopulated()
synchronized(pendingRetries) {
return pendingRetries.values.minByOrNull { it.receivedTimestamp }
}
}
fun delete(model: PendingRetryReceiptModel) {
if (!FeatureFlags.senderKey()) return
ensurePopulated()
synchronized(pendingRetries) {
pendingRetries.remove(RemoteMessageId(model.author, model.sentTimestamp))
database.delete(model)
}
}
private fun ensurePopulated() {
if (!populated) {
synchronized(pendingRetries) {
if (!populated) {
database.all.forEach { model ->
pendingRetries[RemoteMessageId(model.author, model.sentTimestamp)] = model
}
populated = true
}
}
}
}
data class RemoteMessageId(val author: RecipientId, val sentTimestamp: Long)
}

Wyświetl plik

@ -5,7 +5,6 @@ import android.content.Context;
import android.database.Cursor;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel;
@ -13,8 +12,13 @@ import org.thoughtcrime.securesms.recipients.RecipientId;
import org.thoughtcrime.securesms.util.CursorUtil;
import org.thoughtcrime.securesms.util.SqlUtil;
import java.util.LinkedList;
import java.util.List;
/**
* Holds information about messages we've sent out retry receipts for.
*
* Do not use directly! The only class that should be accessing this is {@link PendingRetryReceiptCache}
*/
public final class PendingRetryReceiptDatabase extends Database {
@ -39,7 +43,7 @@ public final class PendingRetryReceiptDatabase extends Database {
super(context, databaseHelper);
}
public void insert(@NonNull RecipientId author, int authorDevice, long sentTimestamp, long receivedTimestamp, long threadId) {
@NonNull PendingRetryReceiptModel insert(@NonNull RecipientId author, int authorDevice, long sentTimestamp, long receivedTimestamp, long threadId) {
ContentValues values = new ContentValues();
values.put(AUTHOR, author.serialize());
values.put(DEVICE, authorDevice);
@ -47,37 +51,27 @@ public final class PendingRetryReceiptDatabase extends Database {
values.put(RECEIVED_TIMESTAMP, receivedTimestamp);
values.put(THREAD_ID, threadId);
databaseHelper.getWritableDatabase().insertWithOnConflict(TABLE_NAME, null, values, SQLiteDatabase.CONFLICT_REPLACE);
long id = databaseHelper.getWritableDatabase().insertWithOnConflict(TABLE_NAME, null, values, SQLiteDatabase.CONFLICT_REPLACE);
return new PendingRetryReceiptModel(id, author, authorDevice, sentTimestamp, receivedTimestamp, threadId);
}
public @Nullable PendingRetryReceiptModel get(@NonNull RecipientId author, long sentTimestamp) {
String query = AUTHOR + " = ? AND " + SENT_TIMESTAMP + " = ?";
String[] args = SqlUtil.buildArgs(author, sentTimestamp);
@NonNull List<PendingRetryReceiptModel> getAll() {
List<PendingRetryReceiptModel> models = new LinkedList<>();
try (Cursor cursor = databaseHelper.getReadableDatabase().query(TABLE_NAME, null, query, args, null, null, null)) {
if (cursor.moveToFirst()) {
return fromCursor(cursor);
try (Cursor cursor = databaseHelper.getReadableDatabase().query(TABLE_NAME, null, null, null, null, null, null)) {
while (cursor.moveToNext()) {
models.add(fromCursor(cursor));
}
}
return null;
return models;
}
public @Nullable PendingRetryReceiptModel getOldest() {
try (Cursor cursor = databaseHelper.getReadableDatabase().query(TABLE_NAME, null, null, null, null, null, RECEIVED_TIMESTAMP + " ASC", "1")) {
if (cursor.moveToFirst()) {
return fromCursor(cursor);
}
}
return null;
void delete(@NonNull PendingRetryReceiptModel model) {
databaseHelper.getWritableDatabase().delete(TABLE_NAME, ID_WHERE, SqlUtil.buildArgs(model.getId()));
}
public void delete(long id) {
databaseHelper.getWritableDatabase().delete(TABLE_NAME, ID_WHERE, SqlUtil.buildArgs(id));
}
private static @NonNull PendingRetryReceiptModel fromCursor(@NonNull Cursor cursor) {
return new PendingRetryReceiptModel(CursorUtil.requireLong(cursor, ID),
RecipientId.from(CursorUtil.requireString(cursor, AUTHOR)),

Wyświetl plik

@ -9,7 +9,6 @@ import org.thoughtcrime.securesms.KbsEnclave;
import org.thoughtcrime.securesms.components.TypingStatusRepository;
import org.thoughtcrime.securesms.components.TypingStatusSender;
import org.thoughtcrime.securesms.database.DatabaseObserver;
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel;
import org.thoughtcrime.securesms.groups.GroupsV2Authorization;
import org.thoughtcrime.securesms.groups.GroupsV2AuthorizationMemoryValueCache;
import org.thoughtcrime.securesms.groups.v2.processing.GroupsV2StateProcessor;
@ -19,7 +18,7 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.net.ContentProxySelector;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.net.StandardUserAgentInterceptor;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
@ -90,6 +89,7 @@ public class ApplicationDependencies {
private static volatile ShakeToReport shakeToReport;
private static volatile OkHttpClient okHttpClient;
private static volatile PendingRetryReceiptManager pendingRetryReceiptManager;
private static volatile PendingRetryReceiptCache pendingRetryReceiptCache;
@MainThread
public static void init(@NonNull Application application, @NonNull Provider provider) {
@ -480,6 +480,18 @@ public class ApplicationDependencies {
return appForegroundObserver;
}
public static @NonNull PendingRetryReceiptCache getPendingRetryReceiptCache() {
if (pendingRetryReceiptCache == null) {
synchronized (LOCK) {
if (pendingRetryReceiptCache == null) {
pendingRetryReceiptCache = provider.providePendingRetryReceiptCache();
}
}
}
return pendingRetryReceiptCache;
}
public interface Provider {
@NonNull PipeConnectivityListener providePipeListener();
@ -508,5 +520,6 @@ public class ApplicationDependencies {
@NonNull AppForegroundObserver provideAppForegroundObserver();
@NonNull SignalCallManager provideSignalCallManager();
@NonNull PendingRetryReceiptManager providePendingRetryReceiptManager();
@NonNull PendingRetryReceiptCache providePendingRetryReceiptCache();
}
}

Wyświetl plik

@ -33,6 +33,7 @@ import org.thoughtcrime.securesms.megaphone.MegaphoneRepository;
import org.thoughtcrime.securesms.messages.BackgroundMessageRetriever;
import org.thoughtcrime.securesms.messages.IncomingMessageObserver;
import org.thoughtcrime.securesms.messages.IncomingMessageProcessor;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.net.PipeConnectivityListener;
import org.thoughtcrime.securesms.notifications.MessageNotifier;
import org.thoughtcrime.securesms.notifications.OptimizedMessageNotifier;
@ -255,6 +256,11 @@ public class ApplicationDependencyProvider implements ApplicationDependencies.Pr
return new PendingRetryReceiptManager(context);
}
@Override
public @NonNull PendingRetryReceiptCache providePendingRetryReceiptCache() {
return new PendingRetryReceiptCache(context);
}
private static class DynamicCredentialsProvider implements CredentialsProvider {
private final Context context;

Wyświetl plik

@ -10,6 +10,9 @@ import org.signal.core.util.ThreadUtil;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.crypto.UnidentifiedAccessUtil;
import org.thoughtcrime.securesms.crypto.storage.SignalSenderKeyStore;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.GroupDatabase;
import org.thoughtcrime.securesms.database.GroupDatabase.GroupRecord;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.groups.GroupId;
import org.thoughtcrime.securesms.jobmanager.Data;
@ -129,6 +132,16 @@ public class ResendMessageJob extends BaseJob {
Content contentToSend = content;
if (distributionId != null) {
Optional<GroupRecord> groupRecord = DatabaseFactory.getGroupDatabase(context).getGroupByDistributionId(distributionId);
if (!groupRecord.isPresent()) {
Log.w(TAG, "Could not find a matching group for the distributionId! Skipping message send.");
return;
} else if (!groupRecord.get().getMembers().contains(recipientId)) {
Log.w(TAG, "The target user is no longer in the group! Skipping message send.");
return;
}
SenderKeyDistributionMessage senderKeyDistributionMessage = messageSender.getOrCreateNewGroupSession(distributionId);
ByteString distributionBytes = ByteString.copyFrom(senderKeyDistributionMessage.serialize());

Wyświetl plik

@ -232,7 +232,7 @@ public final class MessageContentProcessor {
}
RecipientId senderId = RecipientId.fromHighTrust(content.getSender());
PendingRetryReceiptModel pending = DatabaseFactory.getPendingRetryReceiptDatabase(context).get(senderId, content.getTimestamp());
PendingRetryReceiptModel pending = ApplicationDependencies.getPendingRetryReceiptCache().get(senderId, content.getTimestamp());
long receivedTime = handlePendingRetry(pending, content);
log(String.valueOf(content.getTimestamp()), "Beginning message processing.");
@ -350,7 +350,7 @@ public final class MessageContentProcessor {
if (pending != null) {
warn(content.getTimestamp(), "Pending retry was processed. Deleting.");
DatabaseFactory.getPendingRetryReceiptDatabase(context).delete(pending.getId());
ApplicationDependencies.getPendingRetryReceiptCache().delete(pending);
}
} catch (StorageFailedException e) {
warn(String.valueOf(content.getTimestamp()), e);

Wyświetl plik

@ -147,7 +147,7 @@ public final class MessageDecryptionUtil {
break;
case RESENDABLE:
Log.w(TAG, "[" + envelope.getTimestamp() + "] Inserting into pending retries store because it's " + contentHint);
DatabaseFactory.getPendingRetryReceiptDatabase(context).insert(sender.getId(), senderDevice, envelope.getTimestamp(), receivedTimestamp, threadId);
ApplicationDependencies.getPendingRetryReceiptCache().insert(sender.getId(), senderDevice, envelope.getTimestamp(), receivedTimestamp, threadId);
ApplicationDependencies.getPendingRetryReceiptManager().scheduleIfNecessary();
break;
case IMPLICIT:

Wyświetl plik

@ -13,9 +13,9 @@ import androidx.annotation.WorkerThread;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.DatabaseFactory;
import org.thoughtcrime.securesms.database.MessageDatabase;
import org.thoughtcrime.securesms.database.PendingRetryReceiptDatabase;
import org.thoughtcrime.securesms.database.model.PendingRetryReceiptModel;
import org.thoughtcrime.securesms.dependencies.ApplicationDependencies;
import org.thoughtcrime.securesms.database.PendingRetryReceiptCache;
import org.thoughtcrime.securesms.util.FeatureFlags;
@ -26,13 +26,13 @@ public final class PendingRetryReceiptManager extends TimedEventManager<PendingR
private static final String TAG = Log.tag(PendingRetryReceiptManager.class);
private final PendingRetryReceiptDatabase pendingDatabase;
private final MessageDatabase messageDatabase;
private final PendingRetryReceiptCache pendingCache;
private final MessageDatabase messageDatabase;
public PendingRetryReceiptManager(@NonNull Application application) {
super(application, "PendingRetryReceiptManager");
this.pendingDatabase = DatabaseFactory.getPendingRetryReceiptDatabase(application);
this.pendingCache = ApplicationDependencies.getPendingRetryReceiptCache();
this.messageDatabase = DatabaseFactory.getSmsDatabase(application);
scheduleIfNecessary();
@ -41,7 +41,7 @@ public final class PendingRetryReceiptManager extends TimedEventManager<PendingR
@WorkerThread
@Override
protected @Nullable PendingRetryReceiptModel getNextClosestEvent() {
PendingRetryReceiptModel model = pendingDatabase.getOldest();
PendingRetryReceiptModel model = pendingCache.getOldest();
if (model != null) {
Log.i(TAG, "Next closest expiration is in " + getDelayForEvent(model) + " ms for timestamp " + model.getSentTimestamp() + ".");
@ -57,7 +57,7 @@ public final class PendingRetryReceiptManager extends TimedEventManager<PendingR
protected void executeEvent(@NonNull PendingRetryReceiptModel event) {
Log.w(TAG, "It's been " + (System.currentTimeMillis() - event.getReceivedTimestamp()) + " ms since this retry receipt was received. Showing an error.");
messageDatabase.insertBadDecryptMessage(event.getAuthor(), event.getAuthorDevice(), event.getSentTimestamp(), event.getReceivedTimestamp(), event.getThreadId());
pendingDatabase.delete(event.getId());
pendingCache.delete(event);
}
@WorkerThread