diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6df866..e8f59a7c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## 0.2.12 - Improved: Memories viewer redesigned with smoother animations and new quick-action controls. +- Fix: Reliability of receiving media files. ## 0.2.11 diff --git a/lib/src/database/daos/receipts.dao.dart b/lib/src/database/daos/receipts.dao.dart index ee966fa8..8e5ea2fb 100644 --- a/lib/src/database/daos/receipts.dao.dart +++ b/lib/src/database/daos/receipts.dao.dart @@ -191,6 +191,23 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { )..where((c) => c.receiptId.equals(receiptId))).write(updates); } + Future rotateReceiptId(String oldReceiptId) async { + final newReceiptId = uuid.v4(); + await updateReceipt( + oldReceiptId, + ReceiptsCompanion( + receiptId: Value(newReceiptId), + ), + ); + final updatedReceipt = await getReceiptById(newReceiptId); + if (updatedReceipt == null) { + Log.error( + 'Tried to change the receipt ID, but could not get the updated receipt...', + ); + } + return updatedReceipt; + } + Future updateReceiptByContactAndMessageId( int contactId, String messageId, diff --git a/lib/src/services/api/client2client/media.c2c.dart b/lib/src/services/api/client2client/media.c2c.dart index 88a041e0..303cd3e2 100644 --- a/lib/src/services/api/client2client/media.c2c.dart +++ b/lib/src/services/api/client2client/media.c2c.dart @@ -121,6 +121,7 @@ Future handleMedia( MediaFile? mediaFile; Message? message; + Log.info('Starting transaction for media message ${media.senderMessageId}'); await twonlyDB.transaction(() async { mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia( MediaFilesCompanion( @@ -163,6 +164,9 @@ Future handleMedia( ), ); }); + Log.info( + 'Finished transaction for media message ${media.senderMessageId}. Success: ${message != null}', + ); if (message != null && mediaFile != null) { await twonlyDB.groupsDao.increaseLastMessageExchange( diff --git a/lib/src/services/api/mediafiles/upload.api.dart b/lib/src/services/api/mediafiles/upload.api.dart index eb304f42..97c8a1fc 100644 --- a/lib/src/services/api/mediafiles/upload.api.dart +++ b/lib/src/services/api/mediafiles/upload.api.dart @@ -27,6 +27,16 @@ import 'package:twonly/src/utils/misc.dart'; import 'package:workmanager/workmanager.dart' hide TaskStatus; final lockRetransmission = Mutex(); +final Map _uploadMutexes = {}; + +Future _protectMediaUpload( + String mediaId, + Future Function() action, +) async { + final mutex = _uploadMutexes.putIfAbsent(mediaId, Mutex.new); + await mutex.protect(action); + _uploadMutexes.remove(mediaId); +} Future reuploadMediaFiles() async { return exclusiveAccess( @@ -42,18 +52,33 @@ Future reuploadMediaFiles() async { final contacts = {}; - for (final receipt in receipts) { + for (var receipt in receipts) { if (receipt.retryCount > 1 && receipt.lastRetry != null) { final twentyFourHoursAgo = DateTime.now().subtract( - const Duration(hours: 24), + const Duration(hours: 6), ); if (receipt.lastRetry!.isAfter(twentyFourHoursAgo)) { Log.info( - 'Ignoring ${receipt.receiptId} as it was retried in the last 24h', + 'Ignoring ${receipt.receiptId} as it was retried in the last 6h', ); continue; } } + + if (receipt.retryCount >= 2) { + // After two retries, change the receiptId. This addresses a bug where the receiver received the message and marked it as received, but the app was closed before the message was fully processed. Because the receipt was already stored, subsequent retries were detected as duplicates and rejected. + final oldReceiptId = receipt.receiptId; + final updatedReceipt = await twonlyDB.receiptsDao.rotateReceiptId( + oldReceiptId, + ); + if (updatedReceipt == null) continue; + + Log.info( + 'Changed receiptId $oldReceiptId to ${updatedReceipt.receiptId} as retryCount is ${receipt.retryCount}', + ); + receipt = updatedReceipt; + } + var messageId = receipt.messageId; if (receipt.messageId == null) { Log.info('Message not in receipt. Loading it from the content.'); @@ -146,7 +171,7 @@ Future reuploadMediaFiles() async { Log.info('Reuploading media file $messageId'); // the media file should be still on the server, so it should be enough // to just resend the message containing the download token. - await tryToSendCompleteMessage(receipt: receipt); + await tryToSendCompleteMessage(receiptId: receipt.receiptId); } } }, @@ -158,35 +183,43 @@ Future reuploadMediaFile( MediaFile mediaFile, String messageId, ) async { - Log.info('Reuploading media file: ${mediaFile.mediaId}'); + return _protectMediaUpload(mediaFile.mediaId, () async { + Log.info('Reuploading media file: ${mediaFile.mediaId}'); - await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId( - contactId, - messageId, - const ReceiptsCompanion( - markForRetry: Value(null), - markForRetryAfterAccepted: Value(null), - ), - ); + await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId( + contactId, + messageId, + const ReceiptsCompanion( + markForRetry: Value(null), + markForRetryAfterAccepted: Value(null), + ), + ); - final reuploadRequestedBy = (mediaFile.reuploadRequestedBy ?? []) - ..add(contactId); - await twonlyDB.mediaFilesDao.updateMedia( - mediaFile.mediaId, - MediaFilesCompanion( - uploadState: const Value(UploadState.preprocessing), - reuploadRequestedBy: Value(reuploadRequestedBy), - ), - ); - final mediaFileUpdated = await MediaFileService.fromMediaId( - mediaFile.mediaId, - ); - if (mediaFileUpdated != null) { - if (mediaFileUpdated.uploadRequestPath.existsSync()) { - mediaFileUpdated.uploadRequestPath.deleteSync(); + // Refresh media file to get latest reuploadRequestedBy + final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById( + mediaFile.mediaId, + ); + + final reuploadRequestedBy = (currentMedia?.reuploadRequestedBy ?? []) + ..add(contactId); + + await twonlyDB.mediaFilesDao.updateMedia( + mediaFile.mediaId, + MediaFilesCompanion( + uploadState: const Value(UploadState.preprocessing), + reuploadRequestedBy: Value(reuploadRequestedBy), + ), + ); + final mediaFileUpdated = await MediaFileService.fromMediaId( + mediaFile.mediaId, + ); + if (mediaFileUpdated != null) { + if (mediaFileUpdated.uploadRequestPath.existsSync()) { + mediaFileUpdated.uploadRequestPath.deleteSync(); + } + await _startBackgroundMediaUploadInternal(mediaFileUpdated); } - unawaited(startBackgroundMediaUpload(mediaFileUpdated)); - } + }); } final Mutex _lockPreprocessing = Mutex(); @@ -398,6 +431,18 @@ Future insertMediaFileInMessagesTable( } Future startBackgroundMediaUpload(MediaFileService mediaService) async { + return _protectMediaUpload( + mediaService.mediaFile.mediaId, + () => _startBackgroundMediaUploadInternal(mediaService), + ); +} + +Future _startBackgroundMediaUploadInternal( + MediaFileService mediaService, +) async { + // Refresh the media file state inside the mutex + await mediaService.updateFromDB(); + if (mediaService.mediaFile.uploadState == UploadState.initialized || mediaService.mediaFile.uploadState == UploadState.preprocessing) { await mediaService.setUploadState(UploadState.preprocessing); @@ -603,54 +648,50 @@ Future _createUploadRequest(MediaFileService media) async { await media.uploadRequestPath.writeAsBytes(uploadRequestBytes); } -Mutex protectUpload = Mutex(); - Future _uploadUploadRequest(MediaFileService media) async { - await protectUpload.protect(() async { - final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById( - media.mediaFile.mediaId, - ); + final currentMedia = await twonlyDB.mediaFilesDao.getMediaFileById( + media.mediaFile.mediaId, + ); - if (currentMedia == null || - currentMedia.uploadState == UploadState.backgroundUploadTaskStarted) { - Log.info('Download for ${media.mediaFile.mediaId} already started.'); - return null; - } + if (currentMedia == null || + currentMedia.uploadState == UploadState.backgroundUploadTaskStarted) { + Log.info('Download for ${media.mediaFile.mediaId} already started.'); + return; + } - final apiUrl = - 'http${apiService.apiSecure}://${apiService.apiHost}/api/upload'; + final apiUrl = + 'http${apiService.apiSecure}://${apiService.apiHost}/api/upload'; - Log.info('Starting upload from ${media.mediaFile.mediaId}'); + Log.info('Starting upload from ${media.mediaFile.mediaId}'); - final headers = await getAuthenticationHeader(); - if (headers == null) { - Log.error('Auth headers are empty. Returning'); - return; - } + final headers = await getAuthenticationHeader(); + if (headers == null) { + Log.error('Auth headers are empty. Returning'); + return; + } - final task = UploadTask.fromFile( - taskId: 'upload_${media.mediaFile.mediaId}', - displayName: media.mediaFile.type.name, - file: media.uploadRequestPath, - url: apiUrl, - priority: 0, - retries: 10, - headers: headers, - ); + final task = UploadTask.fromFile( + taskId: 'upload_${media.mediaFile.mediaId}', + displayName: media.mediaFile.type.name, + file: media.uploadRequestPath, + url: apiUrl, + priority: 0, + retries: 10, + headers: headers, + ); - final connectivityResult = await Connectivity().checkConnectivity(); + final connectivityResult = await Connectivity().checkConnectivity(); - if (AppState.isInBackgroundTask || - !connectivityResult.contains(ConnectivityResult.mobile) && - !connectivityResult.contains(ConnectivityResult.wifi)) { - // no internet, directly put it into the background... - await FileDownloader().enqueue(task); - await media.setUploadState(UploadState.backgroundUploadTaskStarted); - Log.info('Enqueue upload task: ${task.taskId}'); - } else { - unawaited(uploadFileFastOrEnqueue(task, media)); - } - }); + if (AppState.isInBackgroundTask || + !connectivityResult.contains(ConnectivityResult.mobile) && + !connectivityResult.contains(ConnectivityResult.wifi)) { + // no internet, directly put it into the background... + await FileDownloader().enqueue(task); + await media.setUploadState(UploadState.backgroundUploadTaskStarted); + Log.info('Enqueue upload task: ${task.taskId}'); + } else { + unawaited(uploadFileFastOrEnqueue(task, media)); + } } Future uploadFileFastOrEnqueue( diff --git a/lib/src/services/api/messages.api.dart b/lib/src/services/api/messages.api.dart index a9eabd3a..a1baadda 100644 --- a/lib/src/services/api/messages.api.dart +++ b/lib/src/services/api/messages.api.dart @@ -80,15 +80,29 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({ return null; } } - // ignore: parameter_assignments - receiptId = receipt.receiptId; + + if (receipt.retryCount >= 2) { + // After two retries, change the receiptId. This addresses a bug where the receiver received the message and marked it as received, + // but the app was closed before the message was fully processed. Because the receipt was already stored, subsequent retries were + // detected as duplicates and rejected. + final oldReceiptId = receipt.receiptId; + final updatedReceipt = await twonlyDB.receiptsDao.rotateReceiptId( + oldReceiptId, + ); + if (updatedReceipt != null) { + Log.info( + 'Changed receiptId $oldReceiptId to ${updatedReceipt.receiptId} as retryCount is ${receipt.retryCount}', + ); + receipt = updatedReceipt; + } + } final contact = await twonlyDB.contactsDao.getContactById( receipt.contactId, ); if (contact == null || contact.accountDeleted) { Log.warn('Will not send message again as user does not exist anymore.'); - await twonlyDB.receiptsDao.deleteReceipt(receiptId); + await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId); return null; } @@ -100,13 +114,13 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({ } final message = pb.Message.fromBuffer(receipt.message) - ..receiptId = receiptId; + ..receiptId = receipt.receiptId; final encryptedContent = pb.EncryptedContent.fromBuffer( message.encryptedContent, ); - Log.info('Uploading $receiptId.'); + Log.info('Uploading ${receipt.receiptId}.'); Uint8List? pushData; if (receipt.retryCount == 0) { @@ -164,7 +178,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({ if (resp.isError) { Log.warn('Could not transmit message got ${resp.error}.'); if (resp.error == ErrorCode.UserIdNotFound) { - await twonlyDB.receiptsDao.deleteReceipt(receiptId); + await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId); await twonlyDB.contactsDao.updateContact( receipt.contactId, const ContactsCompanion(accountDeleted: Value(true)), @@ -182,10 +196,10 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({ ); } if (!receipt.contactWillSendsReceipt) { - await twonlyDB.receiptsDao.deleteReceipt(receiptId); + await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId); } else { await twonlyDB.receiptsDao.updateReceipt( - receiptId, + receipt.receiptId, ReceiptsCompanion( ackByServerAt: Value(clock.now()), retryCount: Value(receipt.retryCount + 1), @@ -197,8 +211,8 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({ } } catch (e) { Log.error('Unknown Error when sending message: $e'); - if (receiptId != null) { - await twonlyDB.receiptsDao.deleteReceipt(receiptId); + if (receipt != null) { + await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId); } } return null; diff --git a/lib/src/services/api/server_messages.api.dart b/lib/src/services/api/server_messages.api.dart index 99ce4e36..8ff3aca6 100644 --- a/lib/src/services/api/server_messages.api.dart +++ b/lib/src/services/api/server_messages.api.dart @@ -4,6 +4,7 @@ import 'dart:io'; import 'package:clock/clock.dart'; import 'package:drift/drift.dart'; import 'package:hashlib/random.dart'; +import 'package:mutex/mutex.dart'; import 'package:twonly/globals.dart'; import 'package:twonly/locator.dart'; import 'package:twonly/src/database/daos/contacts.dao.dart'; @@ -77,24 +78,32 @@ Future handleServerMessage(server.ServerToClient msg) async { DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1)); +final Map _messageLocks = {}; + Future handleClient2ClientMessage(NewMessage newMessage) async { final body = Uint8List.fromList(newMessage.body); - final fromUserId = newMessage.fromUserId.toInt(); - final message = Message.fromBuffer(body); final receiptId = message.receiptId; + final mutex = _messageLocks.putIfAbsent(receiptId, Mutex.new); + await mutex.protect(() async { + await _handleClient2ClientMessage(newMessage, message); + }); + _messageLocks.remove(receiptId); +} + +Future _handleClient2ClientMessage( + NewMessage newMessage, + Message message, +) async { + final fromUserId = newMessage.fromUserId.toInt(); + final receiptId = message.receiptId; + if (await twonlyDB.receiptsDao.isDuplicated(receiptId)) { return; } - try { - await twonlyDB.receiptsDao.gotReceipt(receiptId); - Log.info('Got a message with receiptId $receiptId'); - } catch (e) { - Log.error(e); - return; - } + Log.info('Started processing message with receiptId $receiptId'); switch (message.type) { case Message_Type.SENDER_DELIVERY_RECEIPT: @@ -209,7 +218,14 @@ Future handleClient2ClientMessage(NewMessage newMessage) async { await tryToSendCompleteMessage(receiptId: receiptId); } case Message_Type.TEST_NOTIFICATION: - return; + break; + } + + try { + await twonlyDB.receiptsDao.gotReceipt(receiptId); + Log.info('Got a message with receiptId $receiptId'); + } catch (e) { + Log.error('Error marking message as received $receiptId: $e'); } } @@ -238,6 +254,8 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw( ); } + Log.info('Calling handleEncryptedMessage for $receiptId'); + final (a, b) = await handleEncryptedMessage( fromUserId, encryptedContent, @@ -245,6 +263,8 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw( receiptId, ); + Log.info('Finished handleEncryptedMessage for $receiptId'); + if (Platform.isAndroid && a == null && b == null) { // Message was handled without any error -> Show push notification to the user. await showPushNotificationFromServerMessages(fromUserId, encryptedContent); diff --git a/lib/src/services/signal/encryption.signal.dart b/lib/src/services/signal/encryption.signal.dart index 0065eb4a..abbb1449 100644 --- a/lib/src/services/signal/encryption.signal.dart +++ b/lib/src/services/signal/encryption.signal.dart @@ -41,8 +41,10 @@ signalDecryptMessage( int type, ) async { // Hold the lock only for the cryptographic operation, not for network I/O + Log.info('Acquiring lockingSignalProtocol for $fromUserId'); final (decryptedContent, errorType, needsResync) = await lockingSignalProtocol .protect(() async { + Log.info('Lock acquired for $fromUserId'); try { final session = SessionCipher.fromStore( (await getSignalStore())!, @@ -97,6 +99,8 @@ signalDecryptMessage( } }); + Log.info('Released lockingSignalProtocol for $fromUserId'); + // Handle session resync OUTSIDE the lock to avoid holding it during // network round-trips (which can block for up to 60 seconds) if (needsResync && !resyncedUsers.contains(fromUserId)) {