Improve efficiency of bulk receipt processing.

If there were N receipts for a single thread, we were previously
updating that thread N times.

This change bundles updates together so we will only update each thread
once after all receipts in a bundle are processed.
fork-5.53.8
Greyson Parrelli 2021-03-25 20:10:53 -04:00 zatwierdzone przez Alex Hart
rodzic 3162f04937
commit e068fde8f2
4 zmienionych plików z 140 dodań i 99 usunięć

Wyświetl plik

@ -49,6 +49,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@ -118,7 +119,7 @@ public abstract class MessageDatabase extends Database implements MmsSmsColumns
public abstract void markDownloadState(long messageId, long state);
public abstract void markIncomingNotificationReceived(long threadId);
public abstract boolean incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType);
public abstract Set<ThreadUpdate> incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType);
public abstract List<Pair<Long, Long>> setTimestampRead(SyncMessageId messageId, long proposedExpireStarted);
public abstract List<MarkedMessageInfo> setEntireThreadRead(long threadId);
public abstract List<MarkedMessageInfo> setMessagesReadSince(long threadId, long timestamp);
@ -717,6 +718,39 @@ public abstract class MessageDatabase extends Database implements MmsSmsColumns
}
}
static class ThreadUpdate {
private final long threadId;
private final boolean verbose;
ThreadUpdate(long threadId, boolean verbose) {
this.threadId = threadId;
this.verbose = verbose;
}
public long getThreadId() {
return threadId;
}
public boolean isVerbose() {
return verbose;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadUpdate that = (ThreadUpdate) o;
return threadId == that.threadId &&
verbose == that.verbose;
}
@Override
public int hashCode() {
return Objects.hash(threadId, verbose);
}
}
public interface InsertListener {
void onComplete();
}

Wyświetl plik

@ -619,13 +619,14 @@ public class MmsDatabase extends MessageDatabase {
}
@Override
public boolean incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType) {
SQLiteDatabase database = databaseHelper.getWritableDatabase();
boolean found = false;
public Set<ThreadUpdate> incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType) {
SQLiteDatabase database = databaseHelper.getWritableDatabase();
Set<ThreadUpdate> threadUpdates = new HashSet<>();
try (Cursor cursor = database.query(TABLE_NAME, new String[] {ID, THREAD_ID, MESSAGE_BOX, RECIPIENT_ID, receiptType.getColumnName()},
DATE_SENT + " = ?", new String[] {String.valueOf(messageId.getTimetamp())},
null, null, null, null)) {
null, null, null, null))
{
while (cursor.moveToNext()) {
if (Types.isOutgoingMessageType(cursor.getLong(cursor.getColumnIndexOrThrow(MESSAGE_BOX)))) {
RecipientId theirRecipientId = RecipientId.from(cursor.getLong(cursor.getColumnIndexOrThrow(RECIPIENT_ID)));
@ -638,30 +639,22 @@ public class MmsDatabase extends MessageDatabase {
int status = receiptType.getGroupStatus();
boolean isFirstIncrement = cursor.getLong(cursor.getColumnIndexOrThrow(columnName)) == 0;
found = true;
database.execSQL("UPDATE " + TABLE_NAME + " SET " +
columnName + " = " + columnName + " + 1 WHERE " + ID + " = ?",
new String[] {String.valueOf(id)});
DatabaseFactory.getGroupReceiptDatabase(context).update(ourRecipientId, id, status, timestamp);
DatabaseFactory.getThreadDatabase(context).update(threadId, false);
if (isFirstIncrement) {
notifyConversationListeners(threadId);
} else {
notifyVerboseConversationListeners(threadId);
}
threadUpdates.add(new ThreadUpdate(threadId, !isFirstIncrement));
}
}
}
if (!found && receiptType == ReceiptType.DELIVERY) {
if (threadUpdates.size() > 0 && receiptType == ReceiptType.DELIVERY) {
earlyDeliveryReceiptCache.increment(messageId.getTimetamp(), messageId.getRecipientId());
return true;
}
return found;
return threadUpdates;
}
}

Wyświetl plik

@ -28,6 +28,7 @@ import net.sqlcipher.database.SQLiteQueryBuilder;
import org.signal.core.util.logging.Log;
import org.thoughtcrime.securesms.database.MessageDatabase.SyncMessageId;
import org.thoughtcrime.securesms.database.MessageDatabase.ThreadUpdate;
import org.thoughtcrime.securesms.database.helpers.SQLCipherOpenHelper;
import org.thoughtcrime.securesms.database.model.MessageRecord;
import org.thoughtcrime.securesms.recipients.Recipient;
@ -320,102 +321,123 @@ public class MmsSmsDatabase extends Database {
}
public void incrementDeliveryReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
incrementDeliveryReceiptCount(id, timestamp);
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
incrementReceiptCounts(syncMessageIds, timestamp, MessageDatabase.ReceiptType.DELIVERY);
}
public void incrementDeliveryReceiptCount(SyncMessageId syncMessageId, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.beginTransaction();
try {
DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.DELIVERY);
DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.DELIVERY);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.DELIVERY);
}
/**
* @return A list of ID's that were not updated.
*/
public @NonNull Collection<SyncMessageId> incrementReadReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
List<SyncMessageId> unhandled = new LinkedList<>();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
boolean handled = incrementReadReceiptCount(id, timestamp);
if (!handled) {
unhandled.add(id);
}
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
return unhandled;
return incrementReceiptCounts(syncMessageIds, timestamp, MessageDatabase.ReceiptType.READ);
}
public boolean incrementReadReceiptCount(SyncMessageId syncMessageId, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
db.beginTransaction();
try {
boolean handled = false;
handled |= DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.READ);
handled |= DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.READ);
db.setTransactionSuccessful();
return handled;
} finally {
db.endTransaction();
}
return incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.READ);
}
/**
* @return A list of ID's that were not updated.
*/
public @NonNull Collection<SyncMessageId> incrementViewedReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
List<SyncMessageId> unhandled = new LinkedList<>();
return incrementReceiptCounts(syncMessageIds, timestamp, MessageDatabase.ReceiptType.VIEWED);
}
public boolean incrementViewedReceiptCount(SyncMessageId syncMessageId, long timestamp) {
return incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.VIEWED);
}
/**
* Wraps a single receipt update in a transaction and triggers the proper updates.
*
* @return Whether or not some thread was updated.
*/
private boolean incrementReceiptCount(SyncMessageId syncMessageId, long timestamp, @NonNull MessageDatabase.ReceiptType receiptType) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
ThreadDatabase threadDatabase = DatabaseFactory.getThreadDatabase(context);
Set<ThreadUpdate> threadUpdates = new HashSet<>();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
boolean handled = incrementViewedReceiptCount(id, timestamp);
threadUpdates = incrementReceiptCountInternal(syncMessageId, timestamp, receiptType);
if (!handled) {
unhandled.add(id);
}
for (ThreadUpdate threadUpdate : threadUpdates) {
threadDatabase.update(threadUpdate.getThreadId(), false);
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
for (ThreadUpdate threadUpdate : threadUpdates) {
if (threadUpdate.isVerbose()) {
notifyVerboseConversationListeners(threadUpdate.getThreadId());
} else {
notifyConversationListeners(threadUpdate.getThreadId());
}
}
}
return threadUpdates.size() > 0;
}
/**
* Wraps multiple receipt updates in a transaction and triggers the proper updates.
*
* @return All of the messages that didn't result in updates.
*/
private @NonNull Collection<SyncMessageId> incrementReceiptCounts(@NonNull List<SyncMessageId> syncMessageIds, long timestamp, @NonNull MessageDatabase.ReceiptType receiptType) {
SQLiteDatabase db = databaseHelper.getWritableDatabase();
ThreadDatabase threadDatabase = DatabaseFactory.getThreadDatabase(context);
Set<ThreadUpdate> threadUpdates = new HashSet<>();
Collection<SyncMessageId> unhandled = new HashSet<>();
db.beginTransaction();
try {
for (SyncMessageId id : syncMessageIds) {
Set<ThreadUpdate> updates = incrementReceiptCountInternal(id, timestamp, receiptType);
if (updates.size() > 0) {
threadUpdates.addAll(updates);
} else {
unhandled.add(id);
}
}
for (ThreadUpdate update : threadUpdates) {
threadDatabase.update(update.getThreadId(), false);
}
db.setTransactionSuccessful();
} finally {
db.endTransaction();
for (ThreadUpdate threadUpdate : threadUpdates) {
if (threadUpdate.isVerbose()) {
notifyVerboseConversationListeners(threadUpdate.getThreadId());
} else {
notifyConversationListeners(threadUpdate.getThreadId());
}
}
}
return unhandled;
}
public boolean incrementViewedReceiptCount(SyncMessageId syncMessageId, long timestamp) {
return DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, MessageDatabase.ReceiptType.VIEWED);
/**
* Doesn't do any transactions or updates, so we can re-use the method safely.
*/
private @NonNull Set<ThreadUpdate> incrementReceiptCountInternal(SyncMessageId syncMessageId, long timestamp, MessageDatabase.ReceiptType receiptType) {
Set<ThreadUpdate> threadUpdates = new HashSet<>();
threadUpdates.addAll(DatabaseFactory.getSmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, receiptType));
threadUpdates.addAll(DatabaseFactory.getMmsDatabase(context).incrementReceiptCount(syncMessageId, timestamp, receiptType));
return threadUpdates;
}
public int getQuotedMessagePosition(long threadId, long quoteId, @NonNull RecipientId recipientId) {

Wyświetl plik

@ -68,6 +68,7 @@ import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -468,13 +469,13 @@ public class SmsDatabase extends MessageDatabase {
}
@Override
public boolean incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType) {
public @NonNull Set<ThreadUpdate> incrementReceiptCount(SyncMessageId messageId, long timestamp, @NonNull ReceiptType receiptType) {
if (receiptType == ReceiptType.VIEWED) {
return false;
return Collections.emptySet();
}
SQLiteDatabase database = databaseHelper.getWritableDatabase();
boolean foundMessage = false;
SQLiteDatabase database = databaseHelper.getWritableDatabase();
Set<ThreadUpdate> threadUpdates = new HashSet<>();
try (Cursor cursor = database.query(TABLE_NAME, new String[] {ID, THREAD_ID, RECIPIENT_ID, TYPE, DELIVERY_RECEIPT_COUNT, READ_RECEIPT_COUNT},
DATE_SENT + " = ?", new String[] {String.valueOf(messageId.getTimetamp())},
@ -484,36 +485,27 @@ public class SmsDatabase extends MessageDatabase {
if (Types.isOutgoingMessageType(cursor.getLong(cursor.getColumnIndexOrThrow(TYPE)))) {
RecipientId theirRecipientId = messageId.getRecipientId();
RecipientId outRecipientId = RecipientId.from(cursor.getLong(cursor.getColumnIndexOrThrow(RECIPIENT_ID)));
String columnName = receiptType.getColumnName();
boolean isFirstIncrement = cursor.getLong(cursor.getColumnIndexOrThrow(columnName)) == 0;
if (outRecipientId.equals(theirRecipientId)) {
long threadId = cursor.getLong(cursor.getColumnIndexOrThrow(THREAD_ID));
long threadId = cursor.getLong(cursor.getColumnIndexOrThrow(THREAD_ID));
String columnName = receiptType.getColumnName();
boolean isFirstIncrement = cursor.getLong(cursor.getColumnIndexOrThrow(columnName)) == 0;
database.execSQL("UPDATE " + TABLE_NAME +
" SET " + columnName + " = " + columnName + " + 1 WHERE " +
ID + " = ?",
new String[] {String.valueOf(cursor.getLong(cursor.getColumnIndexOrThrow(ID)))});
DatabaseFactory.getThreadDatabase(context).update(threadId, false);
if (isFirstIncrement) {
notifyConversationListeners(threadId);
} else {
notifyVerboseConversationListeners(threadId);
}
foundMessage = true;
threadUpdates.add(new ThreadUpdate(threadId, !isFirstIncrement));
}
}
}
if (!foundMessage && receiptType == ReceiptType.DELIVERY) {
if (threadUpdates.size() > 0 && receiptType == ReceiptType.DELIVERY) {
earlyDeliveryReceiptCache.increment(messageId.getTimetamp(), messageId.getRecipientId());
return true;
}
return foundMessage;
return threadUpdates;
}
}