use mutex over all signal operations

This commit is contained in:
otsmr 2026-04-22 20:56:28 +02:00
parent 0c8bd0a7b4
commit dde339d1b3
7 changed files with 160 additions and 147 deletions

View file

@ -35,6 +35,7 @@ import 'package:twonly/src/services/group.services.dart';
import 'package:twonly/src/services/notifications/fcm.notifications.dart'; import 'package:twonly/src/services/notifications/fcm.notifications.dart';
import 'package:twonly/src/services/notifications/pushkeys.notifications.dart'; import 'package:twonly/src/services/notifications/pushkeys.notifications.dart';
import 'package:twonly/src/services/signal/identity.signal.dart'; import 'package:twonly/src/services/signal/identity.signal.dart';
import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/services/subscription.service.dart'; import 'package:twonly/src/services/subscription.service.dart';
import 'package:twonly/src/services/user.service.dart'; import 'package:twonly/src/services/user.service.dart';
@ -121,6 +122,7 @@ class ApiService {
unawaited(syncFlameCounters()); unawaited(syncFlameCounters());
unawaited(setupNotificationWithUsers()); unawaited(setupNotificationWithUsers());
unawaited(signalHandleNewServerConnection()); unawaited(signalHandleNewServerConnection());
resetResyncedUsers();
unawaited(fetchGroupStatesForUnjoinedGroups()); unawaited(fetchGroupStatesForUnjoinedGroups());
unawaited(fetchMissingGroupPublicKey()); unawaited(fetchMissingGroupPublicKey());
unawaited(checkForDeletedUsernames()); unawaited(checkForDeletedUsernames());

View file

@ -26,6 +26,8 @@ Future<void> handleErrorMessage(
requested: Value(true), requested: Value(true),
), ),
); );
case EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC:
break; // The other user initiated a new signal session, so ignore the error in this case, as the new session works...
// ignore: no_default_cases // ignore: no_default_cases
default: default:
break; break;

View file

@ -74,7 +74,6 @@ Future<void> handleServerMessage(server.ServerToClient msg) async {
} }
DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1)); DateTime lastPushKeyRequest = clock.now().subtract(const Duration(hours: 1));
bool alreadyPerformedAnResync = false;
Mutex protectReceiptCheck = Mutex(); Mutex protectReceiptCheck = Mutex();

View file

@ -3,21 +3,18 @@ import 'dart:typed_data';
import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart'; import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart';
// ignore: implementation_imports // ignore: implementation_imports
import 'package:libsignal_protocol_dart/src/invalid_message_exception.dart'; import 'package:libsignal_protocol_dart/src/invalid_message_exception.dart';
import 'package:mutex/mutex.dart';
import 'package:twonly/src/model/protobuf/client/generated/messages.pb.dart'; import 'package:twonly/src/model/protobuf/client/generated/messages.pb.dart';
import 'package:twonly/src/services/api/messages.api.dart'; import 'package:twonly/src/services/api/messages.api.dart';
import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/session.signal.dart'; import 'package:twonly/src/services/signal/session.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
/// This caused some troubles, so protection the encryption...
final lockingSignalEncryption = Mutex();
Future<CiphertextMessage?> signalEncryptMessage( Future<CiphertextMessage?> signalEncryptMessage(
int target, int target,
Uint8List plaintextContent, Uint8List plaintextContent,
) async { ) async {
return lockingSignalEncryption.protect<CiphertextMessage?>(() async { return lockingSignalProtocol.protect<CiphertextMessage?>(() async {
try { try {
final signalStore = (await getSignalStore())!; final signalStore = (await getSignalStore())!;
final address = getSignalAddress(target); final address = getSignalAddress(target);
@ -30,62 +27,62 @@ Future<CiphertextMessage?> signalEncryptMessage(
}); });
} }
bool alreadyPerformedAnResync = false;
Future<(EncryptedContent?, PlaintextContent_DecryptionErrorMessage_Type?)> Future<(EncryptedContent?, PlaintextContent_DecryptionErrorMessage_Type?)>
signalDecryptMessage( signalDecryptMessage(
int fromUserId, int fromUserId,
Uint8List encryptedContentRaw, Uint8List encryptedContentRaw,
int type, int type,
) async { ) async {
try { return lockingSignalProtocol.protect(() async {
final session = SessionCipher.fromStore( try {
(await getSignalStore())!, final session = SessionCipher.fromStore(
getSignalAddress(fromUserId), (await getSignalStore())!,
); getSignalAddress(fromUserId),
);
Uint8List plaintext; Uint8List plaintext;
switch (type) { switch (type) {
case CiphertextMessage.prekeyType: case CiphertextMessage.prekeyType:
plaintext = await session.decrypt( plaintext = await session.decrypt(
PreKeySignalMessage(encryptedContentRaw), PreKeySignalMessage(encryptedContentRaw),
); );
case CiphertextMessage.whisperType: case CiphertextMessage.whisperType:
plaintext = await session.decryptFromSignal( plaintext = await session.decryptFromSignal(
SignalMessage.fromSerialized(encryptedContentRaw), SignalMessage.fromSerialized(encryptedContentRaw),
); );
default: default:
Log.error('Unknown Message Decryption Type: $type'); Log.error('Unknown Message Decryption Type: $type');
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN); return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
}
return (EncryptedContent.fromBuffer(plaintext), null);
} on InvalidKeyIdException catch (e) {
Log.warn(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.PREKEY_UNKNOWN);
} on InvalidMessageException catch (e) {
if (!alreadyPerformedAnResync) {
if (await handleSessionResync(fromUserId)) {
// This flag prevents from resyncing the session the client received multiple new
// messages from the server he could not decrypt
alreadyPerformedAnResync = true;
// This message contains a new PreKeyBundle establishing a new signal session
await sendCipherText(
fromUserId,
EncryptedContent(
errorMessages: EncryptedContent_ErrorMessages(
type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC,
),
),
);
} }
return (EncryptedContent.fromBuffer(plaintext), null);
} on InvalidKeyIdException catch (e) {
Log.warn(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.PREKEY_UNKNOWN);
} on InvalidMessageException catch (e) {
if (!resyncedUsers.contains(fromUserId)) {
if (await handleSessionResync(fromUserId)) {
// This flag prevents from resyncing the session the client received multiple new
// messages from the server he could not decrypt
resyncedUsers.add(fromUserId);
// This message contains a new PreKeyBundle establishing a new signal session
await sendCipherText(
fromUserId,
EncryptedContent(
errorMessages: EncryptedContent_ErrorMessages(
type: EncryptedContent_ErrorMessages_Type.SESSION_OUT_OF_SYNC,
),
),
);
}
}
Log.warn(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
} catch (e) {
Log.error(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
} }
Log.warn(e); });
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
} catch (e) {
Log.error(e);
return (null, PlaintextContent_DecryptionErrorMessage_Type.UNKNOWN);
}
} }

View file

@ -9,6 +9,7 @@ import 'package:twonly/src/constants/secure_storage.keys.dart';
import 'package:twonly/src/database/signal/signal_protocol_store.dart'; import 'package:twonly/src/database/signal/signal_protocol_store.dart';
import 'package:twonly/src/model/json/signal_identity.model.dart'; import 'package:twonly/src/model/json/signal_identity.model.dart';
import 'package:twonly/src/services/signal/consts.signal.dart'; import 'package:twonly/src/services/signal/consts.signal.dart';
import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/services/user.service.dart'; import 'package:twonly/src/services/user.service.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
@ -57,21 +58,23 @@ Future<void> signalHandleNewServerConnection() async {
} }
Future<List<PreKeyRecord>> signalGetPreKeys() async { Future<List<PreKeyRecord>> signalGetPreKeys() async {
final user = await getUser(); return lockingSignalProtocol.protect(() async {
if (user == null) return []; final user = await getUser();
if (user == null) return [];
final start = user.currentPreKeyIndexStart; final start = user.currentPreKeyIndexStart;
await updateUser((user) { await updateUser((user) {
user.currentPreKeyIndexStart = user.currentPreKeyIndexStart =
(user.currentPreKeyIndexStart + 200) % maxValue; (user.currentPreKeyIndexStart + 200) % maxValue;
});
final preKeys = generatePreKeys(start, 200);
final signalStore = await getSignalStore();
if (signalStore == null) return [];
for (final p in preKeys) {
await signalStore.preKeyStore.storePreKey(p.id, p);
}
return preKeys;
}); });
final preKeys = generatePreKeys(start, 200);
final signalStore = await getSignalStore();
if (signalStore == null) return [];
for (final p in preKeys) {
await signalStore.preKeyStore.storePreKey(p.id, p);
}
return preKeys;
} }
Future<SignalIdentity?> getSignalIdentity() async { Future<SignalIdentity?> getSignalIdentity() async {
@ -136,26 +139,28 @@ Future<void> createIfNotExistsSignalIdentity() async {
} }
Future<SignedPreKeyRecord?> _getNewSignalSignedPreKey() async { Future<SignedPreKeyRecord?> _getNewSignalSignedPreKey() async {
var identityKeyPair = await getSignalIdentityKeyPair(); return lockingSignalProtocol.protect(() async {
final user = await getUser(); var identityKeyPair = await getSignalIdentityKeyPair();
final signalStore = await getSignalStore(); final user = await getUser();
if (identityKeyPair == null || signalStore == null || user == null) { final signalStore = await getSignalStore();
return null; if (identityKeyPair == null || signalStore == null || user == null) {
} return null;
}
final signedPreKeyId = user.currentSignedPreKeyIndexStart; final signedPreKeyId = user.currentSignedPreKeyIndexStart;
await updateUser((user) { await updateUser((user) {
user.currentSignedPreKeyIndexStart += 1; user.currentSignedPreKeyIndexStart += 1;
});
final signedPreKey = generateSignedPreKey(
identityKeyPair,
signedPreKeyId,
);
identityKeyPair = null;
await signalStore.storeSignedPreKey(signedPreKeyId, signedPreKey);
return signedPreKey;
}); });
final signedPreKey = generateSignedPreKey(
identityKeyPair,
signedPreKeyId,
);
identityKeyPair = null;
await signalStore.storeSignedPreKey(signedPreKeyId, signedPreKey);
return signedPreKey;
} }

View file

@ -0,0 +1,12 @@
import 'package:mutex/mutex.dart';
/// Unified lock for all Signal protocol operations (encryption, decryption, session management).
final lockingSignalProtocol = Mutex();
/// Tracking users who have already been resynced in the current session.
final resyncedUsers = <int>{};
/// Reset the resync tracking set.
void resetResyncedUsers() {
resyncedUsers.clear();
}

View file

@ -4,81 +4,77 @@ import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart';
import 'package:twonly/locator.dart'; import 'package:twonly/locator.dart';
import 'package:twonly/src/model/protobuf/api/websocket/server_to_client.pb.dart'; import 'package:twonly/src/model/protobuf/api/websocket/server_to_client.pb.dart';
import 'package:twonly/src/services/signal/consts.signal.dart'; import 'package:twonly/src/services/signal/consts.signal.dart';
import 'package:twonly/src/services/signal/protocol_state.signal.dart';
import 'package:twonly/src/services/signal/utils.signal.dart'; import 'package:twonly/src/services/signal/utils.signal.dart';
import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/log.dart';
Future<bool> processSignalUserData(Response_UserData userData) async { Future<bool> processSignalUserData(Response_UserData userData) async {
final SignalProtocolStore? signalStore = await getSignalStore(); return lockingSignalProtocol.protect(() async {
final SignalProtocolStore? signalStore = await getSignalStore();
if (signalStore == null) { if (signalStore == null) {
return false; return false;
} }
final targetAddress = getSignalAddress(userData.userId.toInt()); final targetAddress = getSignalAddress(userData.userId.toInt());
final sessionBuilder = SessionBuilder.fromSignalStore( final sessionBuilder = SessionBuilder.fromSignalStore(
signalStore, signalStore,
targetAddress, targetAddress,
); );
ECPublicKey? tempPrePublicKey; ECPublicKey? tempPrePublicKey;
int? tempPreKeyId; int? tempPreKeyId;
if (userData.prekeys.isNotEmpty) { if (userData.prekeys.isNotEmpty) {
tempPrePublicKey = Curve.decodePoint( tempPrePublicKey = Curve.decodePoint(
DjbECPublicKey( DjbECPublicKey(
Uint8List.fromList(userData.prekeys.first.prekey), Uint8List.fromList(userData.prekeys.first.prekey),
).serialize(), ).serialize(),
1,
);
tempPreKeyId = userData.prekeys.first.id.toInt();
}
final tempSignedPreKeyId = userData.signedPrekeyId.toInt();
final tempSignedPreKeyPublic = Curve.decodePoint(
DjbECPublicKey(Uint8List.fromList(userData.signedPrekey)).serialize(),
1, 1,
); );
tempPreKeyId = userData.prekeys.first.id.toInt();
}
final tempSignedPreKeyId = userData.signedPrekeyId.toInt(); final tempSignedPreKeySignature = Uint8List.fromList(
userData.signedPrekeySignature,
);
final tempSignedPreKeyPublic = Curve.decodePoint( final tempIdentityKey = IdentityKey(
DjbECPublicKey(Uint8List.fromList(userData.signedPrekey)).serialize(), Curve.decodePoint(
1, DjbECPublicKey(
); Uint8List.fromList(userData.publicIdentityKey),
).serialize(),
1,
),
);
final tempSignedPreKeySignature = Uint8List.fromList( final preKeyBundle = PreKeyBundle(
userData.signedPrekeySignature, userData.registrationId.toInt(),
); defaultDeviceId,
tempPreKeyId,
tempPrePublicKey,
tempSignedPreKeyId,
tempSignedPreKeyPublic,
tempSignedPreKeySignature,
tempIdentityKey,
);
final tempIdentityKey = IdentityKey( try {
Curve.decodePoint( await sessionBuilder.processPreKeyBundle(preKeyBundle);
DjbECPublicKey( return true;
Uint8List.fromList(userData.publicIdentityKey), } catch (e) {
).serialize(), Log.error('could not process pre key bundle: $e');
1, return false;
), }
); });
final preKeyBundle = PreKeyBundle(
userData.registrationId.toInt(),
defaultDeviceId,
tempPreKeyId,
tempPrePublicKey,
tempSignedPreKeyId,
tempSignedPreKeyPublic,
tempSignedPreKeySignature,
tempIdentityKey,
);
try {
await sessionBuilder.processPreKeyBundle(preKeyBundle);
return true;
} catch (e) {
Log.error('could not process pre key bundle: $e');
return false;
}
}
Future<void> deleteSessionWithTarget(int target) async {
final signalStore = await getSignalStore();
if (signalStore == null) return;
final address = SignalProtocolAddress(target.toString(), defaultDeviceId);
await signalStore.sessionStore.deleteSession(address);
} }
Future<Uint8List?> getPublicKeyFromContact(int contactId) async { Future<Uint8List?> getPublicKeyFromContact(int contactId) async {