fix deadlock issue

This commit is contained in:
otsmr 2026-05-04 23:33:42 +02:00
parent cdababa3c8
commit 0a972d023f
6 changed files with 122 additions and 82 deletions

View file

@ -145,6 +145,7 @@ class ApiService {
} }
Future<void> onClosed() async { Future<void> onClosed() async {
Log.info('websocket connection closed');
_channel = null; _channel = null;
isAuthenticated = false; isAuthenticated = false;
_connectionStateController.add(false); _connectionStateController.add(false);
@ -249,7 +250,7 @@ class ApiService {
completer.complete(msg); completer.complete(msg);
} }
} else { } else {
await handleServerMessage(msg); unawaited(handleServerMessage(msg));
} }
} catch (e) { } catch (e) {
Log.error('Error parsing the servers message: $e'); Log.error('Error parsing the servers message: $e');

View file

@ -67,6 +67,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
Receipt? receipt, Receipt? receipt,
bool onlyReturnEncryptedData = false, bool onlyReturnEncryptedData = false,
bool blocking = true, bool blocking = true,
bool useLock = true,
}) async { }) async {
try { try {
if (receiptId == null && receipt == null) return null; if (receiptId == null && receipt == null) return null;
@ -132,6 +133,7 @@ Future<(Uint8List, Uint8List?)?> tryToSendCompleteMessage({
final cipherText = await signalEncryptMessage( final cipherText = await signalEncryptMessage(
receipt.contactId, receipt.contactId,
Uint8List.fromList(message.encryptedContent), Uint8List.fromList(message.encryptedContent),
useLock: useLock,
); );
if (cipherText == null) { if (cipherText == null) {
Log.error('Could not encrypt the message. Aborting and trying again.'); Log.error('Could not encrypt the message. Aborting and trying again.');
@ -336,6 +338,7 @@ Future<(Uint8List, Uint8List?)?> sendCipherText(
bool blocking = true, bool blocking = true,
String? messageId, String? messageId,
bool onlySendIfNoReceiptsAreOpen = false, bool onlySendIfNoReceiptsAreOpen = false,
bool useLock = true,
}) async { }) async {
if (onlySendIfNoReceiptsAreOpen) { if (onlySendIfNoReceiptsAreOpen) {
final openReceipts = await twonlyDB.receiptsDao.getReceiptCountForContact( final openReceipts = await twonlyDB.receiptsDao.getReceiptCountForContact(
@ -397,6 +400,7 @@ Future<(Uint8List, Uint8List?)?> sendCipherText(
receipt: receipt, receipt: receipt,
onlyReturnEncryptedData: onlyReturnEncryptedData, onlyReturnEncryptedData: onlyReturnEncryptedData,
blocking: blocking, blocking: blocking,
useLock: useLock,
); );
if (!blocking) { if (!blocking) {
return null; return null;

View file

@ -40,6 +40,8 @@ final lockHandleServerMessage = Mutex();
Future<void> handleServerMessage(server.ServerToClient msg) async { Future<void> handleServerMessage(server.ServerToClient msg) async {
return lockHandleServerMessage.protect(() async { return lockHandleServerMessage.protect(() async {
Log.info('Processing a message from the server.');
/// Returns means, that the server can delete the message from the server. /// Returns means, that the server can delete the message from the server.
final ok = client.Response_Ok()..none = true; final ok = client.Response_Ok()..none = true;
var response = client.Response()..ok = ok; var response = client.Response()..ok = ok;
@ -48,8 +50,12 @@ Future<void> handleServerMessage(server.ServerToClient msg) async {
if (msg.v0.hasRequestNewPreKeys()) { if (msg.v0.hasRequestNewPreKeys()) {
response = await handleRequestNewPreKey(); response = await handleRequestNewPreKey();
} else if (msg.v0.hasNewMessage()) { } else if (msg.v0.hasNewMessage()) {
Log.info('Got 1 message from the server.');
await handleClient2ClientMessage(msg.v0.newMessage); await handleClient2ClientMessage(msg.v0.newMessage);
} else if (msg.v0.hasNewMessages()) { } else if (msg.v0.hasNewMessages()) {
Log.info(
'Got ${msg.v0.newMessages.newMessages.length} messages from the server.',
);
for (final newMessage in msg.v0.newMessages.newMessages) { for (final newMessage in msg.v0.newMessages.newMessages) {
try { try {
await handleClient2ClientMessage(newMessage); await handleClient2ClientMessage(newMessage);
@ -70,13 +76,12 @@ Future<void> handleServerMessage(server.ServerToClient msg) async {
await apiService.sendResponse(ClientToServer()..v0 = v0); await apiService.sendResponse(ClientToServer()..v0 = v0);
AppState.gotMessageFromServer = true; AppState.gotMessageFromServer = true;
Log.info('Message from server proccessed.');
}); });
} }
DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1)); DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1));
Mutex protectReceiptCheck = Mutex();
Future<void> handleClient2ClientMessage(NewMessage newMessage) async { Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
final body = Uint8List.fromList(newMessage.body); final body = Uint8List.fromList(newMessage.body);
final fromUserId = newMessage.fromUserId.toInt(); final fromUserId = newMessage.fromUserId.toInt();
@ -84,15 +89,15 @@ Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
final message = Message.fromBuffer(body); final message = Message.fromBuffer(body);
final receiptId = message.receiptId; final receiptId = message.receiptId;
final isDuplicated = await protectReceiptCheck.protect(() async { if (await twonlyDB.receiptsDao.isDuplicated(receiptId)) {
if (await twonlyDB.receiptsDao.isDuplicated(receiptId)) { return;
return true; }
}
await twonlyDB.receiptsDao.gotReceipt(receiptId);
return false;
});
if (isDuplicated) { try {
await twonlyDB.receiptsDao.gotReceipt(receiptId);
Log.info('Got a message with receiptId $receiptId');
} catch (e) {
Log.error(e);
return; return;
} }

View file

@ -11,20 +11,31 @@ import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
Future<CiphertextMessage?> signalEncryptMessage( Future<CiphertextMessage?> signalEncryptMessage(
int target,
Uint8List plaintextContent, {
bool useLock = true,
}) async {
if (useLock) {
return lockingSignalProtocol.protect<CiphertextMessage?>(() async {
return _signalEncryptMessage(target, plaintextContent);
});
}
return _signalEncryptMessage(target, plaintextContent);
}
Future<CiphertextMessage?> _signalEncryptMessage(
int target, int target,
Uint8List plaintextContent, Uint8List plaintextContent,
) async { ) async {
return lockingSignalProtocol.protect<CiphertextMessage?>(() async { try {
try { final signalStore = (await getSignalStore())!;
final signalStore = (await getSignalStore())!; final address = getSignalAddress(target);
final address = getSignalAddress(target); final session = SessionCipher.fromStore(signalStore, address);
final session = SessionCipher.fromStore(signalStore, address); return await session.encrypt(plaintextContent);
return await session.encrypt(plaintextContent); } catch (e) {
} catch (e) { Log.error(e.toString());
Log.error(e.toString()); return null;
return null; }
}
});
} }
Future<(EncryptedContent?, PlaintextContent_DecryptionErrorMessage_Type?)> Future<(EncryptedContent?, PlaintextContent_DecryptionErrorMessage_Type?)>
@ -67,8 +78,9 @@ signalDecryptMessage(
Log.info(e.toString()); Log.info(e.toString());
return (null, null); return (null, null);
} on InvalidMessageException catch (e) { } on InvalidMessageException catch (e) {
Log.warn(e);
if (!resyncedUsers.contains(fromUserId)) { if (!resyncedUsers.contains(fromUserId)) {
if (await handleSessionResync(fromUserId)) { if (await handleSessionResync(fromUserId, useLock: false)) {
// This flag prevents from resyncing the session the client received multiple new // This flag prevents from resyncing the session the client received multiple new
// messages from the server he could not decrypt // messages from the server he could not decrypt
resyncedUsers.add(fromUserId); resyncedUsers.add(fromUserId);
@ -81,10 +93,10 @@ signalDecryptMessage(
type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC, type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC,
), ),
), ),
useLock: false,
); );
} }
} }
Log.warn(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN); return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
} catch (e) { } catch (e) {
Log.error(e); Log.error(e);

View file

@ -91,9 +91,14 @@ Future<SignalIdentity?> getSignalIdentity() async {
} }
Future<Uint8List> getUserPublicKey() async { Future<Uint8List> getUserPublicKey() async {
Log.info('getUserPublicKey: getting identity');
final signalIdentity = (await getSignalIdentity())!; final signalIdentity = (await getSignalIdentity())!;
Log.info('getUserPublicKey: getting signal store');
final signalStore = await getSignalStoreFromIdentity(signalIdentity); final signalStore = await getSignalStoreFromIdentity(signalIdentity);
return (await signalStore.getIdentityKeyPair()).getPublicKey().serialize(); Log.info('getUserPublicKey: getting key pair');
final keyPair = await signalStore.getIdentityKeyPair();
Log.info('getUserPublicKey: serializing public key');
return keyPair.getPublicKey().serialize();
} }
Future<void> createIfNotExistsSignalIdentity() async { Future<void> createIfNotExistsSignalIdentity() async {

View file

@ -8,73 +8,83 @@ import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
Future<bool> processSignalUserData(Response_UserData userData) async { Future<bool> processSignalUserData(
return lockingSignalProtocol.protect(() async { Response_UserData userData, {
final SignalProtocolStore? signalStore = await getSignalStore(); bool useLock = true,
}) async {
if (useLock) {
return lockingSignalProtocol.protect(() async {
return _processSignalUserData(userData);
});
}
return _processSignalUserData(userData);
}
if (signalStore == null) { Future<bool> _processSignalUserData(Response_UserData userData) async {
return false; final SignalProtocolStore? signalStore = await getSignalStore();
}
final targetAddress = getSignalAddress(userData.userId.toInt()); if (signalStore == null) {
return false;
}
final sessionBuilder = SessionBuilder.fromSignalStore( final targetAddress = getSignalAddress(userData.userId.toInt());
signalStore,
targetAddress,
);
ECPublicKey? tempPrePublicKey; final sessionBuilder = SessionBuilder.fromSignalStore(
int? tempPreKeyId; signalStore,
targetAddress,
);
if (userData.prekeys.isNotEmpty) { ECPublicKey? tempPrePublicKey;
tempPrePublicKey = Curve.decodePoint( int? tempPreKeyId;
DjbECPublicKey(
Uint8List.fromList(userData.prekeys.first.prekey),
).serialize(),
1,
);
tempPreKeyId = userData.prekeys.first.id.toInt();
}
final tempSignedPreKeyId = userData.signedPrekeyId.toInt(); if (userData.prekeys.isNotEmpty) {
tempPrePublicKey = Curve.decodePoint(
final tempSignedPreKeyPublic = Curve.decodePoint( DjbECPublicKey(
DjbECPublicKey(Uint8List.fromList(userData.signedPrekey)).serialize(), Uint8List.fromList(userData.prekeys.first.prekey),
).serialize(),
1, 1,
); );
tempPreKeyId = userData.prekeys.first.id.toInt();
}
final tempSignedPreKeySignature = Uint8List.fromList( final tempSignedPreKeyId = userData.signedPrekeyId.toInt();
userData.signedPrekeySignature,
);
final tempIdentityKey = IdentityKey( final tempSignedPreKeyPublic = Curve.decodePoint(
Curve.decodePoint( DjbECPublicKey(Uint8List.fromList(userData.signedPrekey)).serialize(),
DjbECPublicKey( 1,
Uint8List.fromList(userData.publicIdentityKey), );
).serialize(),
1,
),
);
final preKeyBundle = PreKeyBundle( final tempSignedPreKeySignature = Uint8List.fromList(
userData.registrationId.toInt(), userData.signedPrekeySignature,
defaultDeviceId, );
tempPreKeyId,
tempPrePublicKey,
tempSignedPreKeyId,
tempSignedPreKeyPublic,
tempSignedPreKeySignature,
tempIdentityKey,
);
try { final tempIdentityKey = IdentityKey(
await sessionBuilder.processPreKeyBundle(preKeyBundle); Curve.decodePoint(
return true; DjbECPublicKey(
} catch (e) { Uint8List.fromList(userData.publicIdentityKey),
Log.error('could not process pre key bundle: $e'); ).serialize(),
return false; 1,
} ),
}); );
final preKeyBundle = PreKeyBundle(
userData.registrationId.toInt(),
defaultDeviceId,
tempPreKeyId,
tempPrePublicKey,
tempSignedPreKeyId,
tempSignedPreKeyPublic,
tempSignedPreKeySignature,
tempIdentityKey,
);
try {
await sessionBuilder.processPreKeyBundle(preKeyBundle);
return true;
} catch (e) {
Log.error('could not process pre key bundle: $e');
return false;
}
} }
Future<Uint8List?> getPublicKeyFromContact(int contactId) async { Future<Uint8List?> getPublicKeyFromContact(int contactId) async {
@ -96,11 +106,14 @@ Future<Uint8List?> getPublicKeyFromContact(int contactId) async {
} }
} }
Future<bool> handleSessionResync(int fromUserId) async { Future<bool> handleSessionResync(
int fromUserId, {
bool useLock = true,
}) async {
final userData = await apiService.getUserById(fromUserId); final userData = await apiService.getUserById(fromUserId);
if (userData != null) { if (userData != null) {
Log.info('Got new session data from the server to re-sync the session'); Log.info('Got new session data from the server to re-sync the session');
return processSignalUserData(userData); return processSignalUserData(userData, useLock: useLock);
} }
Log.info('Could not download userdata from the server.'); Log.info('Could not download userdata from the server.');
return false; return false;