From cfc6e945dada42fe2b04909ffa62162348609058 Mon Sep 17 00:00:00 2001 From: otsmr Date: Sun, 12 Apr 2026 02:01:31 +0200 Subject: [PATCH] fix: reupload of media files --- CHANGELOG.md | 6 + lib/src/database/daos/mediafiles.dao.dart | 6 +- lib/src/database/daos/messages.dao.dart | 2 +- lib/src/database/daos/receipts.dao.dart | 47 ++++- lib/src/services/api.service.dart | 6 +- .../services/api/client2client/media.c2c.dart | 52 +++--- .../api/mediafiles/upload.service.dart | 160 +++++++++++++++++- lib/src/services/api/messages.dart | 37 +++- .../save_to_gallery.dart | 15 +- lib/src/views/chats/chat_messages.view.dart | 2 +- .../typing_indicator.dart | 2 +- .../data_and_storage/import_media.view.dart | 2 +- 12 files changed, 287 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2814eef..8b582b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.1.5 + +- Fix: Reupload of media files was not working properly +- Fix: Chats where ordered wrongly +- Fix: Typing indicator was not shown always + ## 0.1.4 - New: Typing and chat open indicator diff --git a/lib/src/database/daos/mediafiles.dao.dart b/lib/src/database/daos/mediafiles.dao.dart index 7f4ff8c..01752ed 100644 --- a/lib/src/database/daos/mediafiles.dao.dart +++ b/lib/src/database/daos/mediafiles.dao.dart @@ -14,7 +14,7 @@ class MediaFilesDao extends DatabaseAccessor // ignore: matching_super_parameters MediaFilesDao(super.db); - Future insertMedia(MediaFilesCompanion mediaFile) async { + Future insertOrUpdateMedia(MediaFilesCompanion mediaFile) async { try { var insertMediaFile = mediaFile; @@ -24,7 +24,9 @@ class MediaFilesDao extends DatabaseAccessor ); } - final rowId = await into(mediaFiles).insert(insertMediaFile); + final rowId = await into( + mediaFiles, + ).insertOnConflictUpdate(insertMediaFile); return await (select( mediaFiles, diff --git a/lib/src/database/daos/messages.dao.dart b/lib/src/database/daos/messages.dao.dart index 2b58186..7a1c36a 100644 --- a/lib/src/database/daos/messages.dao.dart +++ b/lib/src/database/daos/messages.dao.dart @@ -318,7 +318,7 @@ class MessagesDao extends DatabaseAccessor with _$MessagesDaoMixin { ); } - final rowId = await into(messages).insert(insertMessage); + final rowId = await into(messages).insertOnConflictUpdate(insertMessage); await twonlyDB.groupsDao.updateGroup( message.groupId.value, diff --git a/lib/src/database/daos/receipts.dao.dart b/lib/src/database/daos/receipts.dao.dart index d3e4b78..34b9a7a 100644 --- a/lib/src/database/daos/receipts.dao.dart +++ b/lib/src/database/daos/receipts.dao.dart @@ -31,7 +31,7 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { if (receipt == null) return; if (receipt.messageId != null) { - await into(messageActions).insert( + await into(messageActions).insertOnConflictUpdate( MessageActionsCompanion( messageId: Value(receipt.messageId!), contactId: Value(fromUserId), @@ -113,6 +113,16 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { } } + Future> getReceiptsByContactAndMessageId( + int contactId, + String messageId, + ) async { + return (select(receipts)..where( + (t) => t.contactId.equals(contactId) & t.messageId.equals(messageId), + )) + .get(); + } + Future> getReceiptsForRetransmission() async { final markedRetriesTime = clock.now().subtract( const Duration( @@ -132,6 +142,24 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { .get(); } + Future> getReceiptsForMediaRetransmissions() async { + final markedRetriesTime = clock.now().subtract( + const Duration( + // give the server time to transmit all messages to the client + seconds: 20, + ), + ); + return (select(receipts)..where( + (t) => + (t.markForRetry.isSmallerThanValue(markedRetriesTime) | + t.markForRetryAfterAccepted.isSmallerThanValue( + markedRetriesTime, + )) & + t.willBeRetriedByMediaUpload.equals(true), + )) + .get(); + } + Stream> watchAll() { return select(receipts).watch(); } @@ -155,6 +183,19 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { )..where((c) => c.receiptId.equals(receiptId))).write(updates); } + Future updateReceiptByContactAndMessageId( + int contactId, + String messageId, + ReceiptsCompanion updates, + ) async { + await (update( + receipts, + )..where( + (c) => c.contactId.equals(contactId) & c.messageId.equals(messageId), + )) + .write(updates); + } + Future updateReceiptWidthUserId( int fromUserId, String receiptId, @@ -168,9 +209,7 @@ class ReceiptsDao extends DatabaseAccessor with _$ReceiptsDaoMixin { Future markMessagesForRetry(int contactId) async { await (update(receipts)..where( - (c) => - c.contactId.equals(contactId) & - c.willBeRetriedByMediaUpload.equals(false), + (c) => c.contactId.equals(contactId) & c.markForRetry.isNull(), )) .write( ReceiptsCompanion( diff --git a/lib/src/services/api.service.dart b/lib/src/services/api.service.dart index 4b65f32..432b16b 100644 --- a/lib/src/services/api.service.dart +++ b/lib/src/services/api.service.dart @@ -92,12 +92,14 @@ class ApiService { if (globalIsInBackgroundTask) { await retransmitRawBytes(); - await tryTransmitMessages(); + await retransmitAllMessages(); + await reuploadMediaFiles(); await tryDownloadAllMediaFiles(); } else if (!globalIsAppInBackground) { unawaited(retransmitRawBytes()); - unawaited(tryTransmitMessages()); + unawaited(retransmitAllMessages()); unawaited(tryDownloadAllMediaFiles()); + unawaited(reuploadMediaFiles()); twonlyDB.markUpdated(); unawaited(syncFlameCounters()); unawaited(setupNotificationWithUsers()); diff --git a/lib/src/services/api/client2client/media.c2c.dart b/lib/src/services/api/client2client/media.c2c.dart index 0369eb9..4f08d91 100644 --- a/lib/src/services/api/client2client/media.c2c.dart +++ b/lib/src/services/api/client2client/media.c2c.dart @@ -73,12 +73,38 @@ Future handleMedia( mediaType = MediaType.audio; } + var mediaIdValue = const Value.absent(); + final messageTmp = await twonlyDB.messagesDao .getMessageById(media.senderMessageId) .getSingleOrNull(); if (messageTmp != null) { - Log.warn('This message already exit. Message is dropped.'); - return; + if (messageTmp.senderId != fromUserId) { + Log.warn( + '$fromUserId tried to modify the message from ${messageTmp.senderId}.', + ); + return; + } + if (messageTmp.mediaId == null) { + Log.warn( + 'This message already exit without a mediaId. Message is dropped.', + ); + return; + } + final mediaFile = await twonlyDB.mediaFilesDao.getMediaFileById( + messageTmp.mediaId!, + ); + if (mediaFile?.downloadState != DownloadState.reuploadRequested) { + Log.warn( + 'This message and media file already exit and was not requested again. Dropping it.', + ); + return; + } + + if (mediaFile != null) { + // media file is reuploaded use the same mediaId + mediaIdValue = Value(mediaFile.mediaId); + } } int? displayLimitInMilliseconds; @@ -95,8 +121,9 @@ Future handleMedia( late Message? message; await twonlyDB.transaction(() async { - mediaFile = await twonlyDB.mediaFilesDao.insertMedia( + mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia( MediaFilesCompanion( + mediaId: mediaIdValue, downloadState: const Value(DownloadState.pending), type: Value(mediaType), requiresAuthentication: Value(media.requiresAuthentication), @@ -205,23 +232,6 @@ Future handleMediaUpdate( case EncryptedContent_MediaUpdate_Type.DECRYPTION_ERROR: Log.info('Got media file decryption error ${mediaFile.mediaId}'); - final reuploadRequestedBy = mediaFile.reuploadRequestedBy ?? []; - reuploadRequestedBy.add(fromUserId); - await twonlyDB.mediaFilesDao.updateMedia( - mediaFile.mediaId, - MediaFilesCompanion( - uploadState: const Value(UploadState.preprocessing), - reuploadRequestedBy: Value(reuploadRequestedBy), - ), - ); - final mediaFileUpdated = await MediaFileService.fromMediaId( - mediaFile.mediaId, - ); - if (mediaFileUpdated != null) { - if (mediaFileUpdated.uploadRequestPath.existsSync()) { - mediaFileUpdated.uploadRequestPath.deleteSync(); - } - unawaited(startBackgroundMediaUpload(mediaFileUpdated)); - } + await reuploadMediaFile(fromUserId, mediaFile, message.messageId); } } diff --git a/lib/src/services/api/mediafiles/upload.service.dart b/lib/src/services/api/mediafiles/upload.service.dart index 16f745e..806bce5 100644 --- a/lib/src/services/api/mediafiles/upload.service.dart +++ b/lib/src/services/api/mediafiles/upload.service.dart @@ -26,6 +26,148 @@ import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/misc.dart'; import 'package:workmanager/workmanager.dart' hide TaskStatus; +final lockRetransmission = Mutex(); + +Future reuploadMediaFiles() async { + return lockRetransmission.protect(() async { + final receipts = await twonlyDB.receiptsDao + .getReceiptsForMediaRetransmissions(); + + if (receipts.isEmpty) return; + + Log.info('Reuploading ${receipts.length} media files to the server.'); + + final contacts = {}; + + for (final receipt in receipts) { + if (receipt.retryCount > 1 && receipt.lastRetry != null) { + final twentyFourHoursAgo = DateTime.now().subtract( + const Duration(hours: 24), + ); + if (receipt.lastRetry!.isAfter(twentyFourHoursAgo)) { + Log.info( + 'Ignoring ${receipt.receiptId} as it was retried in the last 24h', + ); + continue; + } + } + var messageId = receipt.messageId; + if (receipt.messageId == null) { + Log.info('Message not in receipt. Loading it from the content.'); + try { + final content = EncryptedContent.fromBuffer(receipt.message); + if (content.hasMedia()) { + messageId = content.media.senderMessageId; + await twonlyDB.receiptsDao.updateReceipt( + receipt.receiptId, + ReceiptsCompanion( + messageId: Value(messageId), + ), + ); + } + } catch (e) { + Log.error(e); + } + } + if (messageId == null) { + Log.error('MessageId is empty for media file receipts'); + continue; + } + if (receipt.markForRetryAfterAccepted != null) { + if (!contacts.containsKey(receipt.contactId)) { + final contact = await twonlyDB.contactsDao + .getContactByUserId(receipt.contactId) + .getSingleOrNull(); + if (contact == null) { + Log.error( + 'Contact does not exists, but has a record in receipts, this should not be possible, because of the DELETE CASCADE relation.', + ); + continue; + } + contacts[receipt.contactId] = contact; + } + if (!(contacts[receipt.contactId]?.accepted ?? true)) { + Log.warn( + 'Could not send message as contact has still not yet accepted.', + ); + continue; + } + } + + if (receipt.ackByServerAt == null) { + // media file must be reuploaded again in case the media files + // was deleted by the server, the receiver will request a new media reupload + + final message = await twonlyDB.messagesDao + .getMessageById(messageId) + .getSingleOrNull(); + if (message == null || message.mediaId == null) { + Log.error( + 'Message not found for reupload of the receipt (${message == null} - ${message?.mediaId}).', + ); + continue; + } + + final mediaFile = await twonlyDB.mediaFilesDao.getMediaFileById( + message.mediaId!, + ); + if (mediaFile == null) { + Log.error( + 'Mediafile not found for reupload of the receipt (${message.messageId} - ${message.mediaId}).', + ); + continue; + } + await reuploadMediaFile( + receipt.contactId, + mediaFile, + message.messageId, + ); + } else { + Log.info('Reuploading media file $messageId'); + // the media file should be still on the server, so it should be enough + // to just resend the message containing the download token. + await tryToSendCompleteMessage(receipt: receipt); + } + } + }); +} + +Future reuploadMediaFile( + int contactId, + MediaFile mediaFile, + String messageId, +) async { + Log.info('Reuploading media file: ${mediaFile.mediaId}'); + + await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId( + contactId, + messageId, + const ReceiptsCompanion( + markForRetry: Value(null), + markForRetryAfterAccepted: Value(null), + ), + ); + + final reuploadRequestedBy = (mediaFile.reuploadRequestedBy ?? []) + ..add(contactId); + await twonlyDB.mediaFilesDao.updateMedia( + mediaFile.mediaId, + MediaFilesCompanion( + uploadState: const Value(UploadState.preprocessing), + reuploadRequestedBy: Value(reuploadRequestedBy), + ), + ); + final mediaFileUpdated = await MediaFileService.fromMediaId( + mediaFile.mediaId, + ); + if (mediaFileUpdated != null) { + if (mediaFileUpdated.uploadRequestPath.existsSync()) { + mediaFileUpdated.uploadRequestPath.deleteSync(); + } + unawaited(startBackgroundMediaUpload(mediaFileUpdated)); + } +} + Future finishStartedPreprocessing() async { final mediaFiles = await twonlyDB.mediaFilesDao .getAllMediaFilesPendingUpload(); @@ -62,7 +204,7 @@ Future finishStartedPreprocessing() async { /// It can happen, that a media files is uploaded but not yet marked for been uploaded. /// For example because the background_downloader plugin has not yet reported the finished upload. -/// In case the the message receipts or a reaction was received, mark the media file as been uploaded. +/// In case the message receipts or a reaction was received, mark the media file as been uploaded. Future handleMediaRelatedResponseFromReceiver(String messageId) async { final message = await twonlyDB.messagesDao .getMessageById(messageId) @@ -100,6 +242,16 @@ Future markUploadAsSuccessful(MediaFile media) async { message.messageId, clock.now(), ); + await twonlyDB.receiptsDao.updateReceiptByContactAndMessageId( + contact.contactId, + message.messageId, + ReceiptsCompanion( + ackByServerAt: Value(clock.now()), + retryCount: const Value(1), + lastRetry: Value(clock.now()), + markForRetry: const Value(null), + ), + ); } } } @@ -122,7 +274,7 @@ Future initializeMediaUpload( const MediaFilesCompanion(isDraftMedia: Value(false)), ); - final mediaFile = await twonlyDB.mediaFilesDao.insertMedia( + final mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia( MediaFilesCompanion( uploadState: const Value(UploadState.initialized), displayLimitInMilliseconds: Value(displayLimitInMilliseconds), @@ -313,7 +465,8 @@ Future _createUploadRequest(MediaFileService media) async { } if (media.mediaFile.reuploadRequestedBy != null) { - type = EncryptedContent_Media_Type.REUPLOAD; + // not used any more... Receiver detects automatically if it is an reupload... + // type = EncryptedContent_Media_Type.REUPLOAD; } final notEncryptedContent = EncryptedContent( @@ -340,6 +493,7 @@ Future _createUploadRequest(MediaFileService media) async { final cipherText = await sendCipherText( groupMember.contactId, notEncryptedContent, + messageId: message.messageId, onlyReturnEncryptedData: true, ); diff --git a/lib/src/services/api/messages.dart b/lib/src/services/api/messages.dart index e4fe6d9..07865ec 100644 --- a/lib/src/services/api/messages.dart +++ b/lib/src/services/api/messages.dart @@ -23,7 +23,7 @@ import 'package:twonly/src/utils/misc.dart'; final lockRetransmission = Mutex(); -Future tryTransmitMessages() async { +Future retransmitAllMessages() async { return lockRetransmission.protect(() async { final receipts = await twonlyDB.receiptsDao.getReceiptsForRetransmission(); @@ -304,7 +304,11 @@ Future sendCipherTextToGroup( }) async { final groupMembers = await twonlyDB.groupsDao.getGroupNonLeftMembers(groupId); - if (!onlySendIfNoReceiptsAreOpen) { + if (messageId != null || + encryptedContent.hasReaction() || + encryptedContent.hasMedia() || + encryptedContent.hasTextMessage()) { + // only update the counter in case this is a actual message await twonlyDB.groupsDao.increaseLastMessageExchange(groupId, clock.now()); } @@ -330,11 +334,11 @@ Future<(Uint8List, Uint8List?)?> sendCipherText( bool onlySendIfNoReceiptsAreOpen = false, }) async { if (onlySendIfNoReceiptsAreOpen) { - if (await twonlyDB.receiptsDao.getReceiptCountForContact( - contactId, - ) > - 0) { - // this prevents that this message is send in case the receiver is not online + final openReceipts = await twonlyDB.receiptsDao.getReceiptCountForContact( + contactId, + ); + if (openReceipts > 2) { + // this prevents that these types of messages are send in case the receiver is offline return null; } } @@ -344,12 +348,31 @@ Future<(Uint8List, Uint8List?)?> sendCipherText( ..type = pb.Message_Type.CIPHERTEXT ..encryptedContent = encryptedContent.writeToBuffer(); + var retryCounter = 0; + DateTime? lastRetry; + + if (messageId != null) { + final receipts = await twonlyDB.receiptsDao + .getReceiptsByContactAndMessageId(contactId, messageId); + + for (final receipt in receipts) { + if (receipt.lastRetry != null) { + lastRetry = receipt.lastRetry; + } + retryCounter += 1; + Log.info('Removing duplicated receipt for message $messageId'); + await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId); + } + } + final receipt = await twonlyDB.receiptsDao.insertReceipt( ReceiptsCompanion( contactId: Value(contactId), message: Value(response.writeToBuffer()), messageId: Value(messageId), willBeRetriedByMediaUpload: Value(onlyReturnEncryptedData), + retryCount: Value(retryCounter), + lastRetry: Value(lastRetry), ), ); diff --git a/lib/src/views/camera/camera_preview_components/save_to_gallery.dart b/lib/src/views/camera/camera_preview_components/save_to_gallery.dart index 282ccc5..efb57d4 100644 --- a/lib/src/views/camera/camera_preview_components/save_to_gallery.dart +++ b/lib/src/views/camera/camera_preview_components/save_to_gallery.dart @@ -52,13 +52,14 @@ class SaveToGalleryButtonState extends State { await widget.storeImageAsOriginal!(); } - final newMediaFile = await twonlyDB.mediaFilesDao.insertMedia( - MediaFilesCompanion( - type: Value(widget.mediaService.mediaFile.type), - createdAt: Value(clock.now()), - stored: const Value(true), - ), - ); + final newMediaFile = await twonlyDB.mediaFilesDao + .insertOrUpdateMedia( + MediaFilesCompanion( + type: Value(widget.mediaService.mediaFile.type), + createdAt: Value(clock.now()), + stored: const Value(true), + ), + ); if (newMediaFile != null) { final newService = MediaFileService(newMediaFile); diff --git a/lib/src/views/chats/chat_messages.view.dart b/lib/src/views/chats/chat_messages.view.dart index c3c3ac1..dcb1c2b 100644 --- a/lib/src/views/chats/chat_messages.view.dart +++ b/lib/src/views/chats/chat_messages.view.dart @@ -124,7 +124,7 @@ class _ChatMessagesViewState extends State { if (gUser.typingIndicators) { unawaited(sendTypingIndication(widget.groupId, false)); - _nextTypingIndicator = Timer.periodic(const Duration(seconds: 5), ( + _nextTypingIndicator = Timer.periodic(const Duration(seconds: 4), ( _, ) async { await sendTypingIndication(widget.groupId, false); diff --git a/lib/src/views/chats/chat_messages_components/typing_indicator.dart b/lib/src/views/chats/chat_messages_components/typing_indicator.dart index 695b897..da1f725 100644 --- a/lib/src/views/chats/chat_messages_components/typing_indicator.dart +++ b/lib/src/views/chats/chat_messages_components/typing_indicator.dart @@ -112,7 +112,7 @@ class _TypingIndicatorState extends State member.lastChatOpened!, ) .inSeconds <= - 8; + 6; } @override diff --git a/lib/src/views/settings/data_and_storage/import_media.view.dart b/lib/src/views/settings/data_and_storage/import_media.view.dart index a844643..bd8e8a5 100644 --- a/lib/src/views/settings/data_and_storage/import_media.view.dart +++ b/lib/src/views/settings/data_and_storage/import_media.view.dart @@ -111,7 +111,7 @@ class _ImportMediaViewState extends State { continue; } - final mediaFile = await twonlyDB.mediaFilesDao.insertMedia( + final mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia( MediaFilesCompanion( type: Value(type), createdAt: Value(file.lastModDateTime),