fix deadlock

This commit is contained in:
otsmr 2026-05-13 12:34:40 +02:00
parent 09129639e1
commit f45638c58d
5 changed files with 89 additions and 83 deletions

View file

@ -7,6 +7,8 @@
- Improved: Redesigned snackbar notifications - Improved: Redesigned snackbar notifications
- Improved: New backup mechanism to allow larger backup files - Improved: New backup mechanism to allow larger backup files
- Improved: Move keys into a centralized Rust-owned structure stored in secure storage - Improved: Move keys into a centralized Rust-owned structure stored in secure storage
- Fix: Messages occasionally not received until app restart
- Fix: Multiple smaller issues
## 0.2.10 ## 0.2.10

View file

@ -95,6 +95,7 @@ class ApiService {
try { try {
final channel = IOWebSocketChannel.connect( final channel = IOWebSocketChannel.connect(
Uri.parse(apiUrl), Uri.parse(apiUrl),
pingInterval: const Duration(seconds: 30),
); );
_channel = channel; _channel = channel;
_channel!.stream.listen(_onData, onDone: _onDone, onError: _onError); _channel!.stream.listen(_onData, onDone: _onDone, onError: _onError);
@ -247,11 +248,11 @@ class ApiService {
try { try {
final msg = server.ServerToClient.fromBuffer(msgBuffer as Uint8List); final msg = server.ServerToClient.fromBuffer(msgBuffer as Uint8List);
if (msg.v0.hasResponse()) { if (msg.v0.hasResponse()) {
await removeFromRetransmissionBuffer(msg.v0.seq);
final completer = _pendingRequests.remove(msg.v0.seq); final completer = _pendingRequests.remove(msg.v0.seq);
if (completer != null && !completer.isCompleted) { if (completer != null && !completer.isCompleted) {
completer.complete(msg); completer.complete(msg);
} }
unawaited(removeFromRetransmissionBuffer(msg.v0.seq));
} else { } else {
unawaited(handleServerMessage(msg)); unawaited(handleServerMessage(msg));
} }

View file

@ -67,7 +67,6 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
Receipt? receipt, Receipt? receipt,
bool onlyReturnEncryptedData = false, bool onlyReturnEncryptedData = false,
bool blocking = true, bool blocking = true,
bool useLock = true,
}) async { }) async {
if (apiService.appIsOutdated) return null; if (apiService.appIsOutdated) return null;
@ -135,7 +134,6 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
final cipherText = await signalEncryptMessage( final cipherText = await signalEncryptMessage(
receipt.contactId, receipt.contactId,
Uint8List.fromList(message.encryptedContent), Uint8List.fromList(message.encryptedContent),
useLock: useLock,
); );
if (cipherText == null) { if (cipherText == null) {
Log.error('Could not encrypt the message. Aborting and trying again.'); Log.error('Could not encrypt the message. Aborting and trying again.');
@ -340,7 +338,6 @@ Future<(Uint8List, Uint8List?)?> sendCipherText(
bool blocking = true, bool blocking = true,
String? messageId, String? messageId,
bool onlySendIfNoReceiptsAreOpen = false, bool onlySendIfNoReceiptsAreOpen = false,
bool useLock = true,
}) async { }) async {
if (onlySendIfNoReceiptsAreOpen) { if (onlySendIfNoReceiptsAreOpen) {
final openReceipts = await twonlyDB.receiptsDao.getReceiptCountForContact( final openReceipts = await twonlyDB.receiptsDao.getReceiptCountForContact(
@ -402,7 +399,6 @@ Future<(Uint8List, Uint8List?)?> sendCipherText(
receipt: receipt, receipt: receipt,
onlyReturnEncryptedData: onlyReturnEncryptedData, onlyReturnEncryptedData: onlyReturnEncryptedData,
blocking: blocking, blocking: blocking,
useLock: useLock,
); );
if (!blocking) { if (!blocking) {
return null; return null;

View file

@ -12,15 +12,11 @@ import 'package:twonly/src/utils/log.dart';
Future<CiphertextMessage?> signalEncryptMessage( Future<CiphertextMessage?> signalEncryptMessage(
int target, int target,
Uint8List plaintextContent, { Uint8List plaintextContent,
bool useLock = true, ) async {
}) async {
if (useLock) {
return lockingSignalProtocol.protect<CiphertextMessage?>(() async { return lockingSignalProtocol.protect<CiphertextMessage?>(() async {
return _signalEncryptMessage(target, plaintextContent); return _signalEncryptMessage(target, plaintextContent);
}); });
}
return _signalEncryptMessage(target, plaintextContent);
} }
Future<CiphertextMessage?> _signalEncryptMessage( Future<CiphertextMessage?> _signalEncryptMessage(
@ -44,7 +40,9 @@ signalDecryptMessage(
Uint8List encryptedContentRaw, Uint8List encryptedContentRaw,
int type, int type,
) async { ) async {
return lockingSignalProtocol.protect(() async { // Hold the lock only for the cryptographic operation, not for network I/O
final (decryptedContent, errorType, needsResync) = await lockingSignalProtocol
.protect(() async {
try { try {
final session = SessionCipher.fromStore( final session = SessionCipher.fromStore(
(await getSignalStore())!, (await getSignalStore())!,
@ -64,28 +62,51 @@ signalDecryptMessage(
); );
default: default:
Log.error('Unknown Message Decryption Type: $type'); Log.error('Unknown Message Decryption Type: $type');
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN); return (
null,
PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN,
false,
);
} }
return (EncryptedContent.fromBuffer(plaintext), null); return (EncryptedContent.fromBuffer(plaintext), null, false);
} on InvalidKeyIdException catch (e) { } on InvalidKeyIdException catch (e) {
Log.warn(e); Log.warn(e);
return ( return (
null, null,
PlaintextContent_DecryptionErrorMessage_Type.PREKEY_UNKNOWN, PlaintextContent_DecryptionErrorMessage_Type.PREKEY_UNKNOWN,
false,
); );
} on DuplicateMessageException catch (e) { } on DuplicateMessageException catch (e) {
Log.info(e.toString()); Log.info(e.toString());
return (null, null); return (null, null, false);
} on InvalidMessageException catch (e) { } on InvalidMessageException catch (e) {
Log.warn(e); Log.warn(e);
if (!resyncedUsers.contains(fromUserId)) { return (
if (await handleSessionResync(fromUserId, useLock: false)) { null,
// This flag prevents from resyncing the session the client received multiple new PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN,
// messages from the server he could not decrypt true,
);
} catch (e) {
Log.error(e);
return (
null,
PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN,
false,
);
}
});
// Handle session resync OUTSIDE the lock to avoid holding it during
// network round-trips (which can block for up to 60 seconds)
if (needsResync && !resyncedUsers.contains(fromUserId)) {
if (await handleSessionResync(fromUserId)) {
// This flag prevents from resyncing the session the client received
// multiple new messages from the server he could not decrypt
resyncedUsers.add(fromUserId); resyncedUsers.add(fromUserId);
// This message contains a new PreKeyBundle establishing a new signal session // This message contains a new PreKeyBundle establishing a new signal
// session
await sendCipherText( await sendCipherText(
fromUserId, fromUserId,
EncryptedContent( EncryptedContent(
@ -93,14 +114,9 @@ signalDecryptMessage(
type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC, type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC,
), ),
), ),
useLock: false,
); );
} }
} }
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
} catch (e) { return (decryptedContent, errorType);
Log.error(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
}
});
} }

View file

@ -8,16 +8,10 @@ import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
Future<bool> processSignalUserData( Future<bool> processSignalUserData(Response_UserData userData) async {
Response_UserData userData, {
bool useLock = true,
}) async {
if (useLock) {
return lockingSignalProtocol.protect(() async { return lockingSignalProtocol.protect(() async {
return _processSignalUserData(userData); return _processSignalUserData(userData);
}); });
}
return _processSignalUserData(userData);
} }
Future<bool> _processSignalUserData(Response_UserData userData) async { Future<bool> _processSignalUserData(Response_UserData userData) async {
@ -106,14 +100,11 @@ Future<Uint8List?> getPublicKeyFromContact(int contactId) async {
} }
} }
Future<bool> handleSessionResync( Future<bool> handleSessionResync(int fromUserId) async {
int fromUserId, {
bool useLock = true,
}) async {
final userData = await apiService.getUserById(fromUserId); final userData = await apiService.getUserById(fromUserId);
if (userData != null) { if (userData != null) {
Log.info('Got new session data from the server to re-sync the session'); Log.info('Got new session data from the server to re-sync the session');
return processSignalUserData(userData, useLock: useLock); return processSignalUserData(userData);
} }
Log.info('Could not download userdata from the server.'); Log.info('Could not download userdata from the server.');
return false; return false;