Fix: Reliability of receiving media files.
Some checks are pending
Flutter analyze & test / flutter_analyze_and_test (push) Waiting to run

This commit is contained in:
otsmr 2026-05-15 23:17:40 +02:00
parent d52f1eefea
commit ebc643cbe4
7 changed files with 191 additions and 90 deletions

View file

@ -3,6 +3,7 @@
## 0.2.12 ## 0.2.12
- Improved: Memories viewer redesigned with smoother animations and new quick-action controls. - Improved: Memories viewer redesigned with smoother animations and new quick-action controls.
- Fix: Reliability of receiving media files.
## 0.2.11 ## 0.2.11

View file

@ -191,6 +191,23 @@ class ReceiptsDao extends DatabaseAccessor<TwonlyDB> with _$ReceiptsDaoMixin {
)..where((c) => c.receiptId.equals(receiptId))).write(updates); )..where((c) => c.receiptId.equals(receiptId))).write(updates);
} }
Future<Receipt?> rotateReceiptId(String oldReceiptId) async {
final newReceiptId = uuid.v4();
await updateReceipt(
oldReceiptId,
ReceiptsCompanion(
receiptId: Value(newReceiptId),
),
);
final updatedReceipt = await getReceiptById(newReceiptId);
if (updatedReceipt == null) {
Log.error(
'Tried to change the receipt ID, but could not get the updated receipt...',
);
}
return updatedReceipt;
}
Future<void> updateReceiptByContactAndMessageId( Future<void> updateReceiptByContactAndMessageId(
int contactId, int contactId,
String messageId, String messageId,

View file

@ -121,6 +121,7 @@ Future<void> handleMedia(
MediaFile? mediaFile; MediaFile? mediaFile;
Message? message; Message? message;
Log.info('Starting transaction for media message ${media.senderMessageId}');
await twonlyDB.transaction(() async { await twonlyDB.transaction(() async {
mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia( mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia(
MediaFilesCompanion( MediaFilesCompanion(
@ -163,6 +164,9 @@ Future<void> handleMedia(
), ),
); );
}); });
Log.info(
'Finished transaction for media message ${media.senderMessageId}. Success: ${message != null}',
);
if (message != null && mediaFile != null) { if (message != null && mediaFile != null) {
await twonlyDB.groupsDao.increaseLastMessageExchange( await twonlyDB.groupsDao.increaseLastMessageExchange(

View file

@ -27,6 +27,16 @@ import 'package:twonly/src/utils/misc.dart';
import 'package:workmanager/workmanager.dart' hide TaskStatus; import 'package:workmanager/workmanager.dart' hide TaskStatus;
final lockRetransmission = Mutex(); final lockRetransmission = Mutex();
final Map<String, Mutex> _uploadMutexes = {};
Future<void> _protectMediaUpload(
String mediaId,
Future<void> Function() action,
) async {
final mutex = _uploadMutexes.putIfAbsent(mediaId, Mutex.new);
await mutex.protect(action);
_uploadMutexes.remove(mediaId);
}
Future<void> reuploadMediaFiles() async { Future<void> reuploadMediaFiles() async {
return exclusiveAccess( return exclusiveAccess(
@ -42,18 +52,33 @@ Future<void> reuploadMediaFiles() async {
final contacts = <int, Contact>{}; final contacts = <int, Contact>{};
for (final receipt in receipts) { for (var receipt in receipts) {
if (receipt.retryCount > 1 && receipt.lastRetry != null) { if (receipt.retryCount > 1 && receipt.lastRetry != null) {
final twentyFourHoursAgo = DateTime.now().subtract( final twentyFourHoursAgo = DateTime.now().subtract(
const Duration(hours: 24), const Duration(hours: 6),
); );
if (receipt.lastRetry!.isAfter(twentyFourHoursAgo)) { if (receipt.lastRetry!.isAfter(twentyFourHoursAgo)) {
Log.info( Log.info(
'Ignoring ${receipt.receiptId} as it was retried in the last 24h', 'Ignoring ${receipt.receiptId} as it was retried in the last 6h',
); );
continue; continue;
} }
} }
if (receipt.retryCount >= 2) {
// After two retries, change the receiptId. This addresses a bug where the receiver received the message and marked it as received, but the app was closed before the message was fully processed. Because the receipt was already stored, subsequent retries were detected as duplicates and rejected.
final oldReceiptId = receipt.receiptId;
final updatedReceipt = await twonlyDB.receiptsDao.rotateReceiptId(
oldReceiptId,
);
if (updatedReceipt == null) continue;
Log.info(
'Changed receiptId $oldReceiptId to ${updatedReceipt.receiptId} as retryCount is ${receipt.retryCount}',
);
receipt = updatedReceipt;
}
var messageId = receipt.messageId; var messageId = receipt.messageId;
if (receipt.messageId == null) { if (receipt.messageId == null) {
Log.info('Message not in receipt. Loading it from the content.'); Log.info('Message not in receipt. Loading it from the content.');
@ -146,7 +171,7 @@ Future<void> reuploadMediaFiles() async {
Log.info('Reuploading media file $messageId'); Log.info('Reuploading media file $messageId');
// the media file should be still on the server, so it should be enough // the media file should be still on the server, so it should be enough
// to just resend the message containing the download token. // to just resend the message containing the download token.
await tryToSendCompleteMessage(receipt: receipt); await tryToSendCompleteMessage(receiptId: receipt.receiptId);
} }
} }
}, },
@ -158,6 +183,7 @@ Future<void> reuploadMediaFile(
MediaFile mediaFile, MediaFile mediaFile,
String messageId, String messageId,
) async { ) async {
return _protectMediaUpload(mediaFile.mediaId, () async {
Log.info('Reuploading media file: ${mediaFile.mediaId}'); Log.info('Reuploading media file: ${mediaFile.mediaId}');
await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId( await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId(
@ -169,8 +195,14 @@ Future<void> reuploadMediaFile(
), ),
); );
final reuploadRequestedBy = (mediaFile.reuploadRequestedBy ?? []) // Refresh media file to get latest reuploadRequestedBy
final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById(
mediaFile.mediaId,
);
final reuploadRequestedBy = (currentMedia?.reuploadRequestedBy ?? [])
..add(contactId); ..add(contactId);
await twonlyDB.mediaFilesDao.updateMedia( await twonlyDB.mediaFilesDao.updateMedia(
mediaFile.mediaId, mediaFile.mediaId,
MediaFilesCompanion( MediaFilesCompanion(
@ -185,8 +217,9 @@ Future<void> reuploadMediaFile(
if (mediaFileUpdated.uploadRequestPath.existsSync()) { if (mediaFileUpdated.uploadRequestPath.existsSync()) {
mediaFileUpdated.uploadRequestPath.deleteSync(); mediaFileUpdated.uploadRequestPath.deleteSync();
} }
unawaited(startBackgroundMediaUpload(mediaFileUpdated)); await _startBackgroundMediaUploadInternal(mediaFileUpdated);
} }
});
} }
final Mutex _lockPreprocessing = Mutex(); final Mutex _lockPreprocessing = Mutex();
@ -398,6 +431,18 @@ Future<void> insertMediaFileInMessagesTable(
} }
Future<void> startBackgroundMediaUpload(MediaFileService mediaService) async { Future<void> startBackgroundMediaUpload(MediaFileService mediaService) async {
return _protectMediaUpload(
mediaService.mediaFile.mediaId,
() => _startBackgroundMediaUploadInternal(mediaService),
);
}
Future<void> _startBackgroundMediaUploadInternal(
MediaFileService mediaService,
) async {
// Refresh the media file state inside the mutex
await mediaService.updateFromDB();
if (mediaService.mediaFile.uploadState == UploadState.initialized || if (mediaService.mediaFile.uploadState == UploadState.initialized ||
mediaService.mediaFile.uploadState == UploadState.preprocessing) { mediaService.mediaFile.uploadState == UploadState.preprocessing) {
await mediaService.setUploadState(UploadState.preprocessing); await mediaService.setUploadState(UploadState.preprocessing);
@ -603,10 +648,7 @@ Future<void> _createUploadRequest(MediaFileService media) async {
await media.uploadRequestPath.writeAsBytes(uploadRequestBytes); await media.uploadRequestPath.writeAsBytes(uploadRequestBytes);
} }
Mutex protectUpload = Mutex();
Future<void> _uploadUploadRequest(MediaFileService media) async { Future<void> _uploadUploadRequest(MediaFileService media) async {
await protectUpload.protect(() async {
final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById( final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById(
media.mediaFile.mediaId, media.mediaFile.mediaId,
); );
@ -614,7 +656,7 @@ Future<void> _uploadUploadRequest(MediaFileService media) async {
if (currentMedia == null || if (currentMedia == null ||
currentMedia.uploadState == UploadState.backgroundUploadTaskStarted) { currentMedia.uploadState == UploadState.backgroundUploadTaskStarted) {
Log.info('Download for ${media.mediaFile.mediaId} already started.'); Log.info('Download for ${media.mediaFile.mediaId} already started.');
return null; return;
} }
final apiUrl = final apiUrl =
@ -650,7 +692,6 @@ Future<void> _uploadUploadRequest(MediaFileService media) async {
} else { } else {
unawaited(uploadFileFastOrEnqueue(task, media)); unawaited(uploadFileFastOrEnqueue(task, media));
} }
});
} }
Future<void> uploadFileFastOrEnqueue( Future<void> uploadFileFastOrEnqueue(

View file

@ -80,15 +80,29 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
return null; return null;
} }
} }
// ignore: parameter_assignments
receiptId = receipt.receiptId; if (receipt.retryCount >= 2) {
// After two retries, change the receiptId. This addresses a bug where the receiver received the message and marked it as received,
// but the app was closed before the message was fully processed. Because the receipt was already stored, subsequent retries were
// detected as duplicates and rejected.
final oldReceiptId = receipt.receiptId;
final updatedReceipt = await twonlyDB.receiptsDao.rotateReceiptId(
oldReceiptId,
);
if (updatedReceipt != null) {
Log.info(
'Changed receiptId $oldReceiptId to ${updatedReceipt.receiptId} as retryCount is ${receipt.retryCount}',
);
receipt = updatedReceipt;
}
}
final contact = await twonlyDB.contactsDao.getContactById( final contact = await twonlyDB.contactsDao.getContactById(
receipt.contactId, receipt.contactId,
); );
if (contact == null || contact.accountDeleted) { if (contact == null || contact.accountDeleted) {
Log.warn('Will not send message again as user does not exist anymore.'); Log.warn('Will not send message again as user does not exist anymore.');
await twonlyDB.receiptsDao.deleteReceipt(receiptId); await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
return null; return null;
} }
@ -100,13 +114,13 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
} }
final message = pb.Message.fromBuffer(receipt.message) final message = pb.Message.fromBuffer(receipt.message)
..receiptId = receiptId; ..receiptId = receipt.receiptId;
final encryptedContent = pb.EncryptedContent.fromBuffer( final encryptedContent = pb.EncryptedContent.fromBuffer(
message.encryptedContent, message.encryptedContent,
); );
Log.info('Uploading $receiptId.'); Log.info('Uploading ${receipt.receiptId}.');
Uint8List? pushData; Uint8List? pushData;
if (receipt.retryCount == 0) { if (receipt.retryCount == 0) {
@ -164,7 +178,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
if (resp.isError) { if (resp.isError) {
Log.warn('Could not transmit message got ${resp.error}.'); Log.warn('Could not transmit message got ${resp.error}.');
if (resp.error == ErrorCode.UserIdNotFound) { if (resp.error == ErrorCode.UserIdNotFound) {
await twonlyDB.receiptsDao.deleteReceipt(receiptId); await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
await twonlyDB.contactsDao.updateContact( await twonlyDB.contactsDao.updateContact(
receipt.contactId, receipt.contactId,
const ContactsCompanion(accountDeleted: Value(true)), const ContactsCompanion(accountDeleted: Value(true)),
@ -182,10 +196,10 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
); );
} }
if (!receipt.contactWillSendsReceipt) { if (!receipt.contactWillSendsReceipt) {
await twonlyDB.receiptsDao.deleteReceipt(receiptId); await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
} else { } else {
await twonlyDB.receiptsDao.updateReceipt( await twonlyDB.receiptsDao.updateReceipt(
receiptId, receipt.receiptId,
ReceiptsCompanion( ReceiptsCompanion(
ackByServerAt: Value(clock.now()), ackByServerAt: Value(clock.now()),
retryCount: Value(receipt.retryCount + 1), retryCount: Value(receipt.retryCount + 1),
@ -197,8 +211,8 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
} }
} catch (e) { } catch (e) {
Log.error('Unknown Error when sending message: $e'); Log.error('Unknown Error when sending message: $e');
if (receiptId != null) { if (receipt != null) {
await twonlyDB.receiptsDao.deleteReceipt(receiptId); await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
} }
} }
return null; return null;

View file

@ -4,6 +4,7 @@ import 'dart:io';
import 'package:clock/clock.dart'; import 'package:clock/clock.dart';
import 'package:drift/drift.dart'; import 'package:drift/drift.dart';
import 'package:hashlib/random.dart'; import 'package:hashlib/random.dart';
import 'package:mutex/mutex.dart';
import 'package:twonly/globals.dart'; import 'package:twonly/globals.dart';
import 'package:twonly/locator.dart'; import 'package:twonly/locator.dart';
import 'package:twonly/src/database/daos/contacts.dao.dart'; import 'package:twonly/src/database/daos/contacts.dao.dart';
@ -77,24 +78,32 @@ Future<void> handleServerMessage(server.ServerToClient msg) async {
DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1)); DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1));
final Map<String, Mutex> _messageLocks = {};
Future<void> handleClient2ClientMessage(NewMessage newMessage) async { Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
final body = Uint8List.fromList(newMessage.body); final body = Uint8List.fromList(newMessage.body);
final fromUserId = newMessage.fromUserId.toInt();
final message = Message.fromBuffer(body); final message = Message.fromBuffer(body);
final receiptId = message.receiptId; final receiptId = message.receiptId;
final mutex = _messageLocks.putIfAbsent(receiptId, Mutex.new);
await mutex.protect(() async {
await _handleClient2ClientMessage(newMessage, message);
});
_messageLocks.remove(receiptId);
}
Future<void> _handleClient2ClientMessage(
NewMessage newMessage,
Message message,
) async {
final fromUserId = newMessage.fromUserId.toInt();
final receiptId = message.receiptId;
if (await twonlyDB.receiptsDao.isDuplicated(receiptId)) { if (await twonlyDB.receiptsDao.isDuplicated(receiptId)) {
return; return;
} }
try { Log.info('Started processing message with receiptId $receiptId');
await twonlyDB.receiptsDao.gotReceipt(receiptId);
Log.info('Got a message with receiptId $receiptId');
} catch (e) {
Log.error(e);
return;
}
switch (message.type) { switch (message.type) {
case Message_Type.SENDER_DELIVERY_RECEIPT: case Message_Type.SENDER_DELIVERY_RECEIPT:
@ -209,7 +218,14 @@ Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
await tryToSendCompleteMessage(receiptId: receiptId); await tryToSendCompleteMessage(receiptId: receiptId);
} }
case Message_Type.TEST_NOTIFICATION: case Message_Type.TEST_NOTIFICATION:
return; break;
}
try {
await twonlyDB.receiptsDao.gotReceipt(receiptId);
Log.info('Got a message with receiptId $receiptId');
} catch (e) {
Log.error('Error marking message as received $receiptId: $e');
} }
} }
@ -238,6 +254,8 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw(
); );
} }
Log.info('Calling handleEncryptedMessage for $receiptId');
final (a, b) = await handleEncryptedMessage( final (a, b) = await handleEncryptedMessage(
fromUserId, fromUserId,
encryptedContent, encryptedContent,
@ -245,6 +263,8 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw(
receiptId, receiptId,
); );
Log.info('Finished handleEncryptedMessage for $receiptId');
if (Platform.isAndroid && a == null && b == null) { if (Platform.isAndroid && a == null && b == null) {
// Message was handled without any error -> Show push notification to the user. // Message was handled without any error -> Show push notification to the user.
await showPushNotificationFromServerMessages(fromUserId, encryptedContent); await showPushNotificationFromServerMessages(fromUserId, encryptedContent);

View file

@ -41,8 +41,10 @@ signalDecryptMessage(
int type, int type,
) async { ) async {
// Hold the lock only for the cryptographic operation, not for network I/O // Hold the lock only for the cryptographic operation, not for network I/O
Log.info('Acquiring lockingSignalProtocol for $fromUserId');
final (decryptedContent, errorType, needsResync) = await lockingSignalProtocol final (decryptedContent, errorType, needsResync) = await lockingSignalProtocol
.protect(() async { .protect(() async {
Log.info('Lock acquired for $fromUserId');
try { try {
final session = SessionCipher.fromStore( final session = SessionCipher.fromStore(
(await getSignalStore())!, (await getSignalStore())!,
@ -97,6 +99,8 @@ signalDecryptMessage(
} }
}); });
Log.info('Released lockingSignalProtocol for $fromUserId');
// Handle session resync OUTSIDE the lock to avoid holding it during // Handle session resync OUTSIDE the lock to avoid holding it during
// network round-trips (which can block for up to 60 seconds) // network round-trips (which can block for up to 60 seconds)
if (needsResync && !resyncedUsers.contains(fromUserId)) { if (needsResync && !resyncedUsers.contains(fromUserId)) {