Issue with receiving messages when user closed app while decrypting

This commit is contained in:
otsmr 2026-05-19 01:54:42 +02:00
parent d7e4da0e55
commit fc5c74eaed
18 changed files with 240 additions and 143 deletions

View file

@ -90,6 +90,11 @@ class ApiService {
StreamSubscription<List<ConnectivityResult>>? _connectivitySubscription;
Future<bool> _connectTo(String apiUrl) async {
if (kDebugMode) {
print(
'DEBUG: ApiService._connectTo called with: $apiUrl (appIsOutdated=$appIsOutdated)',
);
}
if (appIsOutdated) return false;
try {
final channel = IOWebSocketChannel.connect(
@ -101,7 +106,10 @@ class ApiService {
await _channel!.ready;
Log.info('websocket connected to $apiUrl');
return true;
} catch (_) {
} catch (e, s) {
if (kDebugMode) {
print('DEBUG: _connectTo caught exception: $e\n$s');
}
return false;
}
}

View file

@ -10,9 +10,10 @@ Future<void> handleAdditionalDataMessage(
int fromUserId,
String groupId,
EncryptedContent_AdditionalDataMessage message,
String receiptId,
) async {
Log.info(
'Got a additional data message: ${message.senderMessageId} from $groupId',
'[$receiptId] Got a additional data message: ${message.senderMessageId} from $groupId',
);
// Prevent message overwrite: reject if a message with this ID already
@ -22,7 +23,7 @@ Future<void> handleAdditionalDataMessage(
.getSingleOrNull();
if (existing != null && existing.senderId != fromUserId) {
Log.warn(
'$fromUserId tried to overwrite message from ${existing.senderId}. Dropping.',
'[$receiptId] $fromUserId tried to overwrite message from ${existing.senderId}. Dropping.',
);
return;
}
@ -45,6 +46,6 @@ Future<void> handleAdditionalDataMessage(
fromTimestamp(message.timestamp),
);
if (msg != null) {
Log.info('Inserted a new text message with ID: ${msg.messageId}');
Log.info('[$receiptId] Inserted a new text message with ID: ${msg.messageId}');
}
}

View file

@ -88,16 +88,17 @@ Future<void> handleContactAccept(int fromUserId) async {
Future<bool> handleContactRequest(
int fromUserId,
EncryptedContent_ContactRequest contactRequest,
String receiptId,
) async {
switch (contactRequest.type) {
case EncryptedContent_ContactRequest_Type.REQUEST:
Log.info('Got a contact request from $fromUserId');
Log.info('[$receiptId] Got a contact request from $fromUserId');
return handleNewContactRequest(fromUserId);
case EncryptedContent_ContactRequest_Type.ACCEPT:
Log.info('Got a contact accept from $fromUserId');
Log.info('[$receiptId] Got a contact accept from $fromUserId');
await handleContactAccept(fromUserId);
case EncryptedContent_ContactRequest_Type.REJECT:
Log.info('Got a contact reject from $fromUserId');
Log.info('[$receiptId] Got a contact reject from $fromUserId');
await twonlyDB.contactsDao.updateContact(
fromUserId,
const ContactsCompanion(
@ -114,14 +115,15 @@ Future<void> handleContactUpdate(
int fromUserId,
EncryptedContent_ContactUpdate contactUpdate,
int? senderProfileCounter,
String receiptId,
) async {
switch (contactUpdate.type) {
case EncryptedContent_ContactUpdate_Type.REQUEST:
Log.info('Got a contact update request from $fromUserId');
Log.info('[$receiptId] Got a contact update request from $fromUserId');
await sendContactMyProfileData(fromUserId);
case EncryptedContent_ContactUpdate_Type.UPDATE:
Log.info('Got a contact update $fromUserId');
Log.info('[$receiptId] Got a contact update $fromUserId');
Uint8List? avatarSvgCompressed;
if (contactUpdate.hasAvatarSvgCompressed()) {
avatarSvgCompressed = Uint8List.fromList(
@ -188,8 +190,9 @@ Future<void> handleContactUpdate(
Future<void> handleFlameSync(
String groupId,
EncryptedContent_FlameSync flameSync,
String receiptId,
) async {
Log.info('Got a flameSync for group $groupId');
Log.info('[$receiptId] Got a flameSync for group $groupId');
final group = await twonlyDB.groupsDao.getGroup(groupId);
if (group == null || group.lastFlameCounterChange == null) return;

View file

@ -8,8 +8,9 @@ import 'package:twonly/src/utils/log.dart';
Future<void> handleErrorMessage(
int fromUserId,
EncryptedContent_ErrorMessages error,
String receiptId,
) async {
Log.error('Got error from $fromUserId: $error');
Log.error('[$receiptId] Got error from $fromUserId: $error');
switch (error.type) {
case EncryptedContent_ErrorMessages_Type

View file

@ -15,6 +15,7 @@ Future<void> handleGroupCreate(
int fromUserId,
String groupId,
EncryptedContent_GroupCreate newGroup,
String receiptId,
) async {
final user = await twonlyDB.contactsDao
.getContactByUserId(fromUserId)
@ -22,7 +23,7 @@ Future<void> handleGroupCreate(
if (user == null) {
// Only contacts can invite other contacts, so this can (via the UI) not happen.
Log.error(
'User is not a contact. Aborting.',
'[$receiptId] User is not a contact. Aborting.',
);
return;
}
@ -66,7 +67,7 @@ Future<void> handleGroupCreate(
if (group == null) {
Log.error(
'Could not create new group. Probably because the group already existed.',
'[$receiptId] Could not create new group. Probably because the group already existed.',
);
return;
}
@ -108,12 +109,13 @@ Future<void> handleGroupUpdate(
int fromUserId,
String groupId,
EncryptedContent_GroupUpdate update,
String receiptId,
) async {
Log.info('Got group update for $groupId from $fromUserId');
Log.info('[$receiptId] Got group update for $groupId from $fromUserId');
final actionType = groupActionTypeFromString(update.groupActionType);
if (actionType == null) {
Log.error('Group action ${update.groupActionType} is unknown ignoring.');
Log.error('[$receiptId] Group action ${update.groupActionType} is unknown ignoring.');
return;
}
@ -189,10 +191,11 @@ Future<bool> handleGroupJoin(
int fromUserId,
String groupId,
EncryptedContent_GroupJoin join,
String receiptId,
) async {
if (await twonlyDB.contactsDao.getContactById(fromUserId) == null) {
if (!await addNewHiddenContact(fromUserId)) {
Log.error('Got group join, but could not load contact.');
Log.error('[$receiptId] Got group join, but could not load contact.');
// This can happen in case the group join was received before the group create.
// In this case return false, which will cause the receipt to fail and the user
// will resend this message.
@ -213,6 +216,7 @@ Future<void> handleResendGroupPublicKey(
int fromUserId,
String groupId,
EncryptedContent_GroupJoin join,
String receiptId,
) async {
final group = await twonlyDB.groupsDao.getGroup(groupId);
if (group == null || group.myGroupPrivateKey == null) return;
@ -232,6 +236,7 @@ Future<void> handleTypingIndicator(
int fromUserId,
String groupId,
EncryptedContent_TypingIndicator indicator,
String receiptId,
) async {
var lastTypeIndicator = const Value<DateTime?>.absent();

View file

@ -18,9 +18,10 @@ Future<void> handleMedia(
int fromUserId,
String groupId,
EncryptedContent_Media media,
String receiptId,
) async {
Log.info(
'Got a media message: ${media.senderMessageId} from $groupId with type ${media.type}',
'[$receiptId] Got a media message: ${media.senderMessageId} from $groupId with type ${media.type}',
);
late MediaType mediaType;
@ -33,7 +34,7 @@ Future<void> handleMedia(
message.senderId != fromUserId ||
message.mediaId == null) {
Log.warn(
'Got reupload from $fromUserId for a message that either does not exists (${message == null}) or senderId = ${message?.senderId}',
'[$receiptId] Got reupload for a message that either does not exists (${message == null}) or senderId = ${message?.senderId}',
);
return;
}
@ -82,13 +83,13 @@ Future<void> handleMedia(
if (messageTmp != null) {
if (messageTmp.senderId != fromUserId) {
Log.warn(
'$fromUserId tried to modify the message from ${messageTmp.senderId}.',
'[$receiptId] $fromUserId tried to modify the message from ${messageTmp.senderId}.',
);
return;
}
if (messageTmp.mediaId == null) {
Log.warn(
'This message already exit without a mediaId. Message is dropped.',
'[$receiptId] This message already exit without a mediaId. Message is dropped.',
);
return;
}
@ -97,7 +98,7 @@ Future<void> handleMedia(
);
if (mediaFile?.downloadState != DownloadState.reuploadRequested) {
Log.warn(
'This message and media file already exit and was not requested again. Dropping it.',
'[$receiptId] This message and media file already exit and was not requested again. Dropping it.',
);
return;
}
@ -121,7 +122,9 @@ Future<void> handleMedia(
MediaFile? mediaFile;
Message? message;
Log.info('Starting transaction for media message ${media.senderMessageId}');
Log.info(
'[$receiptId] Starting transaction for media message ${media.senderMessageId}',
);
await twonlyDB.transaction(() async {
mediaFile = await twonlyDB.mediaFilesDao.insertOrUpdateMedia(
MediaFilesCompanion(
@ -141,7 +144,7 @@ Future<void> handleMedia(
);
if (mediaFile == null) {
Log.error('Could not insert media file into database');
Log.error('[$receiptId] Could not insert media file into database');
return;
}
@ -165,7 +168,7 @@ Future<void> handleMedia(
);
});
Log.info(
'Finished transaction for media message ${media.senderMessageId}. Success: ${message != null}',
'[$receiptId] Finished transaction for media message ${media.senderMessageId}. Success: ${message != null}',
);
if (message != null && mediaFile != null) {
@ -173,7 +176,9 @@ Future<void> handleMedia(
groupId,
fromTimestamp(media.timestamp),
);
Log.info('Inserted a new media message with ID: ${message!.messageId}');
Log.info(
'[$receiptId] Inserted a new media message with ID: ${message!.messageId}',
);
await incFlameCounter(
message!.groupId,
true,
@ -184,12 +189,16 @@ Future<void> handleMedia(
} else {
if (mediaFile == null && message == null) {
Log.error(
'Could not insert new message as both the message and mediaFile are empty.',
'[$receiptId] Could not insert new message as both the message and mediaFile are empty.',
);
} else if (mediaFile == null) {
Log.error('Could not insert new message as the mediaFile is empty.');
Log.error(
'[$receiptId] Could not insert new message as the mediaFile is empty.',
);
} else {
Log.error('Could not insert new message as the message is empty.');
Log.error(
'[$receiptId] Could not insert new message as the message is empty.',
);
}
}
}
@ -197,6 +206,7 @@ Future<void> handleMedia(
Future<void> handleMediaUpdate(
int fromUserId,
EncryptedContent_MediaUpdate mediaUpdate,
String receiptId,
) async {
final message = await twonlyDB.messagesDao
.getMessageById(mediaUpdate.targetMessageId)
@ -204,14 +214,14 @@ Future<void> handleMediaUpdate(
if (message == null) {
// this can happen, in case the message was already deleted.
Log.info(
'Got media update to message ${mediaUpdate.targetMessageId} but message not found.',
'[$receiptId] Got media update to message ${mediaUpdate.targetMessageId} but message not found.',
);
return;
}
if (message.mediaId == null) {
// this can happen, in case the message was already deleted.
Log.warn(
'Got media update for message ${mediaUpdate.targetMessageId} which does not have a mediaId defined.',
'[$receiptId] Got media update for message ${mediaUpdate.targetMessageId} which does not have a mediaId defined.',
);
return;
}
@ -220,14 +230,14 @@ Future<void> handleMediaUpdate(
);
if (mediaFile == null) {
Log.info(
'Got media file update, but media file was not found ${message.mediaId}',
'[$receiptId] Got media file update, but media file was not found ${message.mediaId}',
);
return;
}
switch (mediaUpdate.type) {
case EncryptedContent_MediaUpdate_Type.REOPENED:
Log.info('Got media file reopened ${mediaFile.mediaId}');
Log.info('[$receiptId] Got media file reopened ${mediaFile.mediaId}');
await twonlyDB.messagesDao.updateMessageId(
message.messageId,
const MessagesCompanion(
@ -235,7 +245,7 @@ Future<void> handleMediaUpdate(
),
);
case EncryptedContent_MediaUpdate_Type.STORED:
Log.info('Got media file stored ${mediaFile.mediaId}');
Log.info('[$receiptId] Got media file stored ${mediaFile.mediaId}');
final mediaService = MediaFileService(mediaFile);
await mediaService.storeMediaFile();
await twonlyDB.messagesDao.updateMessageId(
@ -246,7 +256,9 @@ Future<void> handleMediaUpdate(
);
case EncryptedContent_MediaUpdate_Type.DECRYPTION_ERROR:
Log.info('Got media file decryption error ${mediaFile.mediaId}');
Log.info(
'[$receiptId] Got media file decryption error ${mediaFile.mediaId}',
);
await reuploadMediaFile(fromUserId, mediaFile, message.messageId);
}
}

View file

@ -7,11 +7,12 @@ import 'package:twonly/src/utils/log.dart';
Future<void> handleMessageUpdate(
int contactId,
EncryptedContent_MessageUpdate messageUpdate,
String receiptId,
) async {
switch (messageUpdate.type) {
case EncryptedContent_MessageUpdate_Type.OPENED:
Log.info(
'Opened message ${messageUpdate.multipleTargetMessageIds}',
'[$receiptId] Opened message ${messageUpdate.multipleTargetMessageIds}',
);
try {
await twonlyDB.messagesDao.handleMessagesOpened(
@ -20,13 +21,13 @@ Future<void> handleMessageUpdate(
fromTimestamp(messageUpdate.timestamp),
);
} catch (e) {
Log.warn(e);
Log.warn('[$receiptId] Error handling messages opened: $e');
}
case EncryptedContent_MessageUpdate_Type.DELETE:
if (!await isSender(contactId, messageUpdate.senderMessageId)) {
if (!await isSender(contactId, messageUpdate.senderMessageId, receiptId)) {
return;
}
Log.info('Delete message ${messageUpdate.senderMessageId}');
Log.info('[$receiptId] Delete message ${messageUpdate.senderMessageId}');
try {
await twonlyDB.messagesDao.handleMessageDeletion(
contactId,
@ -34,13 +35,13 @@ Future<void> handleMessageUpdate(
fromTimestamp(messageUpdate.timestamp),
);
} catch (e) {
Log.warn(e);
Log.warn('[$receiptId] Error handling message deletion: $e');
}
case EncryptedContent_MessageUpdate_Type.EDIT_TEXT:
if (!await isSender(contactId, messageUpdate.senderMessageId)) {
if (!await isSender(contactId, messageUpdate.senderMessageId, receiptId)) {
return;
}
Log.info('Edit message ${messageUpdate.senderMessageId}');
Log.info('[$receiptId] Edit message ${messageUpdate.senderMessageId}');
try {
await twonlyDB.messagesDao.handleTextEdit(
contactId,
@ -49,12 +50,12 @@ Future<void> handleMessageUpdate(
fromTimestamp(messageUpdate.timestamp),
);
} catch (e) {
Log.warn(e);
Log.warn('[$receiptId] Error handling text edit: $e');
}
}
}
Future<bool> isSender(int fromUserId, String messageId) async {
Future<bool> isSender(int fromUserId, String messageId, String receiptId) async {
final message = await twonlyDB.messagesDao
.getMessageById(messageId)
.getSingleOrNull();
@ -62,6 +63,6 @@ Future<bool> isSender(int fromUserId, String messageId) async {
if (message.senderId == fromUserId) {
return true;
}
Log.error('Contact $fromUserId tried to modify the message $messageId');
Log.error('[$receiptId] Contact $fromUserId tried to modify the message $messageId');
return false;
}

View file

@ -10,10 +10,11 @@ DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1));
Future<void> handlePushKey(
int contactId,
EncryptedContent_PushKeys pushKeys,
String receiptId,
) async {
switch (pushKeys.type) {
case EncryptedContent_PushKeys_Type.REQUEST:
Log.info('Got a pushkey request from $contactId');
Log.info('[$receiptId] Got a pushkey request from $contactId');
if (lastPushKeyRequest.isBefore(
clock.now().subtract(const Duration(seconds: 60)),
)) {
@ -22,7 +23,7 @@ Future<void> handlePushKey(
}
case EncryptedContent_PushKeys_Type.UPDATE:
Log.info('Got a pushkey update from $contactId');
Log.info('[$receiptId] Got a pushkey update from $contactId');
await handleNewPushKey(contactId, pushKeys.keyId.toInt(), pushKeys.key);
}
}

View file

@ -8,8 +8,9 @@ Future<void> handleReaction(
int fromUserId,
String groupId,
EncryptedContent_Reaction reaction,
String receiptId,
) async {
Log.info('Got a reaction from $fromUserId (remove=${reaction.remove})');
Log.info('[$receiptId] Got a reaction from $fromUserId (remove=${reaction.remove})');
await twonlyDB.reactionsDao.updateReaction(
fromUserId,
reaction.targetMessageId,

View file

@ -11,9 +11,10 @@ Future<void> handleTextMessage(
int fromUserId,
String groupId,
EncryptedContent_TextMessage textMessage,
String receiptId,
) async {
Log.info(
'Got a text message: ${textMessage.senderMessageId} from $groupId',
'[$receiptId] Got a text message: ${textMessage.senderMessageId} from $groupId',
);
// Prevent message overwrite: reject if a message with this ID already
@ -23,7 +24,7 @@ Future<void> handleTextMessage(
.getSingleOrNull();
if (existing != null && existing.senderId != fromUserId) {
Log.warn(
'$fromUserId tried to overwrite message from ${existing.senderId}. Dropping.',
'[$receiptId] $fromUserId tried to overwrite message from ${existing.senderId}. Dropping.',
);
return;
}
@ -47,6 +48,6 @@ Future<void> handleTextMessage(
fromTimestamp(textMessage.timestamp),
);
if (message != null) {
Log.info('Inserted a new text message with ID: ${message.messageId}');
Log.info('[$receiptId] Inserted a new text message with ID: ${message.messageId}');
}
}

View file

@ -15,6 +15,7 @@ void resetUserDiscoveryRequestUpdates() {
Future<void> checkForUserDiscoveryChanges(
int fromUserId,
List<int> receivedVersion,
String receiptId,
) async {
final currentVersion = await UserDiscoveryService.shouldRequestNewMessages(
fromUserId,
@ -26,7 +27,7 @@ Future<void> checkForUserDiscoveryChanges(
// Only request a new version once per app session
return;
}
Log.info('Having old version from contact. Requesting new version.');
Log.info('[$receiptId] Having old version from contact. Requesting new version.');
_requestedUpdates.add(fromUserId);
await sendCipherText(
fromUserId,
@ -42,18 +43,19 @@ Future<void> checkForUserDiscoveryChanges(
Future<void> handleUserDiscoveryRequest(
int fromUserId,
EncryptedContent_UserDiscoveryRequest request,
String receiptId,
) async {
Log.info('Got a user discovery request');
Log.info('[$receiptId] Got a user discovery request');
if (!userService.currentUser.isUserDiscoveryEnabled) {
Log.warn('Got a user discovery request while it is disabled');
Log.warn('[$receiptId] Got a user discovery request while it is disabled');
return;
}
final contact = await twonlyDB.contactsDao.getContactById(fromUserId);
if (!UserDiscoveryService.isContactAllowed(contact)) {
Log.warn(
'Got a request to update user discovery, but mediaSendCounter (${contact?.mediaSendCounter}) < ${userService.currentUser.requiredSendImages} or user is excluded ${contact?.userDiscoveryExcluded}',
'[$receiptId] Got a request to update user discovery, but mediaSendCounter (${contact?.mediaSendCounter}) < ${userService.currentUser.requiredSendImages} or user is excluded ${contact?.userDiscoveryExcluded}',
);
return;
}
@ -63,7 +65,7 @@ Future<void> handleUserDiscoveryRequest(
request.currentVersion,
);
if (newMessages != null && newMessages.isNotEmpty) {
Log.info('Sending ${newMessages.length} user discovery messages');
Log.info('[$receiptId] Sending ${newMessages.length} user discovery messages');
await sendCipherText(
fromUserId,
EncryptedContent(
@ -73,19 +75,20 @@ Future<void> handleUserDiscoveryRequest(
),
);
} else {
Log.info('Got update request, but there are no new updates for the user');
Log.info('[$receiptId] Got update request, but there are no new updates for the user');
}
}
Future<void> handleUserDiscoveryUpdate(
int fromUserId,
EncryptedContent_UserDiscoveryUpdate update,
String receiptId,
) async {
if (!userService.currentUser.isUserDiscoveryEnabled) {
Log.warn('Got a user discovery update while it is disabled');
Log.warn('[$receiptId] Got a user discovery update while it is disabled');
return;
}
Log.info('Got ${update.messages.length} user discovery messages');
Log.info('[$receiptId] Got ${update.messages.length} user discovery messages');
await UserDiscoveryService.handleNewMessages(
fromUserId,
update.messages.map(Uint8List.fromList).toList(),

View file

@ -35,7 +35,6 @@ Future<void> _protectMediaUpload(
) async {
final mutex = _uploadMutexes.putIfAbsent(mediaId, Mutex.new);
await mutex.protect(action);
_uploadMutexes.remove(mediaId);
}
Future<void> reuploadMediaFiles() async {

View file

@ -61,6 +61,8 @@ Future<void> retransmitAllMessages() async {
});
}
final Map<String, Mutex> _tryToSendLocks = {};
// When the ackByServerAt is set this value is written in the receipted
Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
String? receiptId,
@ -68,15 +70,41 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
bool onlyReturnEncryptedData = false,
bool blocking = true,
}) async {
final rId = receiptId ?? receipt?.receiptId;
if (rId == null) {
Log.error(
'Cannot try to send complete message as both receiptId and receipt are null.',
);
return null;
}
final mutex = _tryToSendLocks.putIfAbsent(rId, Mutex.new);
return mutex.protect(() async {
return _tryToSendCompleteMessageInternal(
receiptId: receiptId,
receipt: receipt,
onlyReturnEncryptedData: onlyReturnEncryptedData,
blocking: blocking,
);
});
}
Future<(Uint8List, Uint8List?)?> _tryToSendCompleteMessageInternal({
String? receiptId,
Receipt? receipt,
bool onlyReturnEncryptedData = false,
bool blocking = true,
}) async {
// this should have a lock for every receiptID, split the function into a _internal withou the lock and a normal with the lock
if (apiService.appIsOutdated) return null;
if (receiptId == null && receipt == null) return null;
try {
if (receiptId == null && receipt == null) return null;
if (receipt == null) {
// ignore: parameter_assignments
receipt = await twonlyDB.receiptsDao.getReceiptById(receiptId!);
if (receipt == null) {
Log.warn('Receipt not found.');
Log.warn('[$receiptId] Receipt not found.');
return null;
}
}
@ -120,7 +148,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
message.encryptedContent,
);
Log.info('Uploading ${receipt.receiptId}.');
Log.info('Uploading message with receiptID ${receipt.receiptId}.');
Uint8List? pushData;
if (receipt.retryCount == 0) {
@ -176,7 +204,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
);
if (resp.isError) {
Log.warn('Could not transmit message got ${resp.error}.');
Log.warn('Could not transmit ${receipt.receiptId} got ${resp.error}.');
if (resp.error == ErrorCode.UserIdNotFound) {
await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
await twonlyDB.contactsDao.updateContact(
@ -210,7 +238,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
}
}
} catch (e) {
Log.error('Unknown Error when sending message: $e');
Log.error('[$receiptId] unknown error when sending message: $e');
if (receipt != null) {
await twonlyDB.receiptsDao.deleteReceipt(receipt.receiptId);
}

View file

@ -73,7 +73,7 @@ Future<void> handleServerMessage(server.ServerToClient msg) async {
await apiService.sendResponse(ClientToServer()..v0 = v0);
AppState.gotMessageFromServer = true;
Log.info('Message from server proccessed.');
Log.info('All messages from the server proccessed.');
}
DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1));
@ -89,7 +89,6 @@ Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
await mutex.protect(() async {
await _handleClient2ClientMessage(newMessage, message);
});
_messageLocks.remove(receiptId);
}
Future<void> _handleClient2ClientMessage(
@ -103,11 +102,11 @@ Future<void> _handleClient2ClientMessage(
return;
}
Log.info('Started processing message with receiptId $receiptId');
Log.info('[$receiptId] Started processing message');
switch (message.type) {
case Message_Type.SENDER_DELIVERY_RECEIPT:
Log.info('Got delivery receipt for $receiptId!');
Log.info('[$receiptId] Got delivery receipt!');
await twonlyDB.receiptsDao.confirmReceipt(receiptId, fromUserId);
case Message_Type.PLAINTEXT_CONTENT:
@ -120,13 +119,13 @@ Future<void> _handleClient2ClientMessage(
await handleSessionResync(fromUserId);
}
Log.info(
'Got decryption error: ${message.plaintextContent.decryptionErrorMessage.type} for $receiptId',
'[$receiptId] Got decryption error: ${message.plaintextContent.decryptionErrorMessage.type}',
);
retry = true;
}
if (message.plaintextContent.hasRetryControlError()) {
Log.info(
'Got access control error for $receiptId. Resending message.',
'[$receiptId] Got access control error. Resending message.',
);
retry = true;
}
@ -141,6 +140,9 @@ Future<void> _handleClient2ClientMessage(
ackByServerAt: const Value(null),
),
);
Log.info(
'[$receiptId] Sending error message to the original sender with receiptId $newReceiptId.',
);
await tryToSendCompleteMessage(receiptId: newReceiptId);
}
@ -197,7 +199,6 @@ Future<void> _handleClient2ClientMessage(
receiptIdDB = const Value.absent();
} else {
// Message was successful processed
//
}
}
@ -213,7 +214,7 @@ Future<void> _handleClient2ClientMessage(
),
);
} catch (e) {
Log.warn(e);
Log.warn('[$receiptId] Error inserting receipt: $e');
}
await tryToSendCompleteMessage(receiptId: receiptId);
}
@ -223,9 +224,9 @@ Future<void> _handleClient2ClientMessage(
try {
await twonlyDB.receiptsDao.gotReceipt(receiptId);
Log.info('Got a message with receiptId $receiptId');
Log.info('[$receiptId] Finished processing');
} catch (e) {
Log.error('Error marking message as received $receiptId: $e');
Log.error('[$receiptId] Error marking message as received: $e');
}
}
@ -235,26 +236,26 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw(
Message_Type messageType,
String receiptId,
) async {
final (encryptedContent, decryptionErrorType) = await signalDecryptMessage(
Log.info('[$receiptId] calling signalDecryptMessage');
var (encryptedContent, decryptionErrorType) = await signalDecryptMessage(
fromUserId,
encryptedContentRaw,
messageType.value,
);
if (encryptedContent == null) {
if (decryptionErrorType == null) {
// Duplicate message
return (null, null);
}
return (
null,
PlaintextContent()
..decryptionErrorMessage = (PlaintextContent_DecryptionErrorMessage()
..type = decryptionErrorType),
PlaintextContent(
decryptionErrorMessage: PlaintextContent_DecryptionErrorMessage(
type: decryptionErrorType ??=
PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN,
),
),
);
}
Log.info('Calling handleEncryptedMessage for $receiptId');
Log.info('[$receiptId] Calling handleEncryptedMessage');
final (a, b) = await handleEncryptedMessage(
fromUserId,
@ -263,7 +264,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessageRaw(
receiptId,
);
Log.info('Finished handleEncryptedMessage for $receiptId');
Log.info('[$receiptId] Finished handleEncryptedMessage');
if (Platform.isAndroid && a == null && b == null) {
// Message was handled without any error -> Show push notification to the user.
@ -294,11 +295,16 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await checkForUserDiscoveryChanges(
fromUserId,
content.senderUserDiscoveryVersion,
receiptId,
);
}
if (content.hasContactRequest()) {
if (!await handleContactRequest(fromUserId, content.contactRequest)) {
if (!await handleContactRequest(
fromUserId,
content.contactRequest,
receiptId,
)) {
return (
null,
PlaintextContent()
@ -312,6 +318,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await handleErrorMessage(
fromUserId,
content.errorMessages,
receiptId,
);
return (null, null);
}
@ -321,6 +328,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.contactUpdate,
senderProfileCounter,
receiptId,
);
return (null, null);
}
@ -329,6 +337,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await handleUserDiscoveryRequest(
fromUserId,
content.userDiscoveryRequest,
receiptId,
);
return (null, null);
}
@ -337,12 +346,13 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await handleUserDiscoveryUpdate(
fromUserId,
content.userDiscoveryUpdate,
receiptId,
);
return (null, null);
}
if (content.hasPushKeys()) {
await handlePushKey(fromUserId, content.pushKeys);
await handlePushKey(fromUserId, content.pushKeys, receiptId);
return (null, null);
}
@ -350,6 +360,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await handleMessageUpdate(
fromUserId,
content.messageUpdate,
receiptId,
);
return (null, null);
}
@ -366,12 +377,13 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
await handleMediaUpdate(
fromUserId,
content.mediaUpdate,
receiptId,
);
return (null, null);
}
if (!content.hasGroupId()) {
Log.error('Messages should have a groupId $fromUserId.');
Log.error('[$receiptId] Messages should have a groupId $fromUserId.');
return (null, null);
}
@ -380,6 +392,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.groupCreate,
receiptId,
);
return (null, null);
}
@ -392,12 +405,12 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
.getContactByUserId(fromUserId)
.getSingleOrNull();
Log.info(
'Contact exists?: ${contact != null} Is deleted? ${contact?.deletedByUser} Accepted? (${contact?.accepted})',
'[$receiptId] Contact exists?: ${contact != null} Is deleted? ${contact?.deletedByUser} Accepted? (${contact?.accepted})',
);
if (contact == null || !contact.accepted || contact.deletedByUser) {
await handleNewContactRequest(fromUserId);
Log.error(
'User tries to send message to direct chat while the user does not exists !',
'[$receiptId] User tries to send message to direct chat while the user does not exist!',
);
return (
EncryptedContent(
@ -411,7 +424,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
);
}
Log.info(
'Creating new DirectChat between two users',
'[$receiptId] Creating new DirectChat between two users',
);
await twonlyDB.groupsDao.createNewDirectChat(
fromUserId,
@ -422,7 +435,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
} else {
if (content.hasGroupJoin()) {
Log.error(
'Got group join message, but group does not exists yet, retry later. As probably the GroupCreate was not yet received.',
'[$receiptId] Got group join message, but group does not exist yet, retry later. As probably the GroupCreate was not yet received.',
);
// In case the group join was received before the GroupCreate the sender should send it later again.
return (
@ -432,13 +445,15 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
);
}
Log.error('User $fromUserId tried to access group ${content.groupId}.');
Log.error(
'[$receiptId] User $fromUserId tried to access group ${content.groupId}.',
);
return (null, null);
}
}
if (content.hasFlameSync()) {
await handleFlameSync(content.groupId, content.flameSync);
await handleFlameSync(content.groupId, content.flameSync, receiptId);
return (null, null);
}
@ -447,6 +462,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.groupUpdate,
receiptId,
);
return (null, null);
}
@ -456,6 +472,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.groupJoin,
receiptId,
)) {
return (
null,
@ -471,6 +488,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.groupJoin,
receiptId,
);
return (null, null);
}
@ -480,6 +498,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.additionalDataMessage,
receiptId,
);
return (null, null);
}
@ -489,6 +508,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.textMessage,
receiptId,
);
return (null, null);
}
@ -498,6 +518,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.reaction,
receiptId,
);
return (null, null);
}
@ -507,6 +528,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.media,
receiptId,
);
return (null, null);
}
@ -516,6 +538,7 @@ Future<(EncryptedContent?, PlaintextContent?)> handleEncryptedMessage(
fromUserId,
content.groupId,
content.typingIndicator,
receiptId,
);
}

View file

@ -42,8 +42,11 @@ signalDecryptMessage(
) async {
// Hold the lock only for the cryptographic operation, not for network I/O
Log.info('Acquiring lockingSignalProtocol for $fromUserId');
final (decryptedContent, errorType, needsResync) = await lockingSignalProtocol
.protect(() async {
final (
decryptedContent,
errorType,
needsResync,
) = await lockingSignalProtocol.protect(() async {
Log.info('Lock acquired for $fromUserId');
try {
final session = SessionCipher.fromStore(
@ -80,8 +83,15 @@ signalDecryptMessage(
false,
);
} on DuplicateMessageException catch (e) {
Log.info(e.toString());
return (null, null, false);
// This is normal behavior: This can happen in case a message was decrypted, but before further processing
// the user killed the app. This results in a new transmission from the server, but as the message was already
// decrypted, this error happens. In this case, request the message again.
Log.info(e);
return (
null,
PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN,
false,
);
} on InvalidMessageException catch (e) {
Log.warn(e);
return (

View file

@ -136,7 +136,7 @@ Future<void> cleanLogFile() async {
}
final lines = await logFile.readAsLines();
final twoWeekAgo = clock.now().subtract(const Duration(days: 14));
final twoWeekAgo = clock.now().subtract(const Duration(days: 3));
var keepStartIndex = -1;
for (var i = 0; i < lines.length; i += 100) {

View file

@ -2185,7 +2185,7 @@ packages:
source: hosted
version: "0.9.1+2"
workmanager_platform_interface:
dependency: transitive
dependency: "direct dev"
description:
name: workmanager_platform_interface
sha256: f40422f10b970c67abb84230b44da22b075147637532ac501729256fcea10a47

View file

@ -3,7 +3,7 @@ description: "twonly, a privacy-friendly way to connect with friends through sec
publish_to: 'none'
version: 0.2.15+124
version: 0.2.16+125
environment:
sdk: ^3.11.0