added mutext protection for critical code

This commit is contained in:
otsmr 2025-04-07 23:56:02 +02:00
parent e005d01177
commit bdc4780d30
5 changed files with 113 additions and 100 deletions

1
ios/build/.last_build_id Normal file
View file

@ -0,0 +1 @@
ce49e7d90cd902197f9a9cbc84219d23

View file

@ -3,6 +3,7 @@ import 'dart:math';
import 'package:drift/drift.dart'; import 'package:drift/drift.dart';
import 'package:hive/hive.dart'; import 'package:hive/hive.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:mutex/mutex.dart';
import 'package:twonly/globals.dart'; import 'package:twonly/globals.dart';
import 'package:twonly/src/database/twonly_database.dart'; import 'package:twonly/src/database/twonly_database.dart';
import 'package:twonly/src/database/tables/messages_table.dart'; import 'package:twonly/src/database/tables/messages_table.dart';
@ -16,40 +17,44 @@ import 'package:twonly/src/services/notification_service.dart';
import 'package:twonly/src/utils/signal.dart' as SignalHelper; import 'package:twonly/src/utils/signal.dart' as SignalHelper;
import 'package:twonly/src/utils/storage.dart'; import 'package:twonly/src/utils/storage.dart';
final lockSendingMessages = Mutex();
Future tryTransmitMessages() async { Future tryTransmitMessages() async {
Map<String, dynamic> retransmit = await getAllMessagesForRetransmitting(); lockSendingMessages.protect(() async {
Map<String, dynamic> retransmit = await getAllMessagesForRetransmitting();
if (retransmit.isEmpty) return; if (retransmit.isEmpty) return;
Logger("api.dart").info("try sending messages: ${retransmit.length}"); Logger("api.dart").info("try sending messages: ${retransmit.length}");
Map<String, dynamic> failed = {}; Map<String, dynamic> failed = {};
for (String key in retransmit.keys) { for (String key in retransmit.keys) {
RetransmitMessage msg = RetransmitMessage msg =
RetransmitMessage.fromJson(jsonDecode(retransmit[key])); RetransmitMessage.fromJson(jsonDecode(retransmit[key]));
Result resp = await apiProvider.sendTextMessage( Result resp = await apiProvider.sendTextMessage(
msg.userId, msg.userId,
msg.bytes, msg.bytes,
msg.pushData, msg.pushData,
); );
if (resp.isSuccess) { if (resp.isSuccess) {
if (msg.messageId != null) { if (msg.messageId != null) {
await twonlyDatabase.messagesDao.updateMessageByMessageId( await twonlyDatabase.messagesDao.updateMessageByMessageId(
msg.messageId!, msg.messageId!,
MessagesCompanion( MessagesCompanion(
acknowledgeByServer: Value(true), acknowledgeByServer: Value(true),
), ),
); );
}
} else {
failed[key] = retransmit[key];
} }
} else {
failed[key] = retransmit[key];
} }
} Box box = await getMediaStorage();
Box box = await getMediaStorage(); box.put("messages-to-retransmit", jsonEncode(failed));
box.put("messages-to-retransmit", jsonEncode(failed)); });
} }
class RetransmitMessage { class RetransmitMessage {
@ -102,54 +107,57 @@ Future<Map<String, dynamic>> getAllMessagesForRetransmitting() async {
Future<Result> encryptAndSendMessage( Future<Result> encryptAndSendMessage(
int? messageId, int userId, MessageJson msg, int? messageId, int userId, MessageJson msg,
{PushKind? pushKind}) async { {PushKind? pushKind}) async {
Uint8List? bytes = await SignalHelper.encryptMessage(msg, userId); return await lockSendingMessages.protect<Result>(() async {
Uint8List? bytes = await SignalHelper.encryptMessage(msg, userId);
if (bytes == null) { if (bytes == null) {
Logger("api.dart").shout("Error encryption message!"); Logger("api.dart").shout("Error encryption message!");
return Result.error(ErrorCode.InternalError); return Result.error(ErrorCode.InternalError);
}
String stateId = (messageId ?? (60001 + Random().nextInt(100000))).toString();
Box box = await getMediaStorage();
List<int>? pushData;
if (pushKind != null) {
pushData = await getPushData(userId, pushKind);
}
{
var retransmit = await getAllMessagesForRetransmitting();
retransmit[stateId] = jsonEncode(RetransmitMessage(
messageId: messageId,
userId: userId,
bytes: bytes,
pushData: pushData,
).toJson());
box.put("messages-to-retransmit", jsonEncode(retransmit));
}
Result resp = await apiProvider.sendTextMessage(userId, bytes, pushData);
if (resp.isSuccess) {
if (messageId != null) {
await twonlyDatabase.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(acknowledgeByServer: Value(true)),
);
{
var retransmit = await getAllMessagesForRetransmitting();
retransmit.remove(stateId);
box.put("messages-to-retransmit", jsonEncode(retransmit));
}
box.delete("retransmit-$messageId-textmessage");
} }
}
return resp; String stateId =
(messageId ?? (60001 + Random().nextInt(100000))).toString();
Box box = await getMediaStorage();
List<int>? pushData;
if (pushKind != null) {
pushData = await getPushData(userId, pushKind);
}
{
var retransmit = await getAllMessagesForRetransmitting();
retransmit[stateId] = jsonEncode(RetransmitMessage(
messageId: messageId,
userId: userId,
bytes: bytes,
pushData: pushData,
).toJson());
box.put("messages-to-retransmit", jsonEncode(retransmit));
}
Result resp = await apiProvider.sendTextMessage(userId, bytes, pushData);
if (resp.isSuccess) {
if (messageId != null) {
await twonlyDatabase.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(acknowledgeByServer: Value(true)),
);
{
var retransmit = await getAllMessagesForRetransmitting();
retransmit.remove(stateId);
box.put("messages-to-retransmit", jsonEncode(retransmit));
}
box.delete("retransmit-$messageId-textmessage");
}
}
return resp;
});
} }
Future sendTextMessage( Future sendTextMessage(

View file

@ -5,6 +5,7 @@ import 'package:drift/drift.dart';
import 'package:fixnum/fixnum.dart'; import 'package:fixnum/fixnum.dart';
import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart'; import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:mutex/mutex.dart';
import 'package:twonly/globals.dart'; import 'package:twonly/globals.dart';
import 'package:twonly/src/app.dart'; import 'package:twonly/src/app.dart';
import 'package:twonly/src/database/twonly_database.dart'; import 'package:twonly/src/database/twonly_database.dart';
@ -23,42 +24,36 @@ import 'package:twonly/src/services/notification_service.dart';
// ignore: library_prefixes // ignore: library_prefixes
import 'package:twonly/src/utils/signal.dart' as SignalHelper; import 'package:twonly/src/utils/signal.dart' as SignalHelper;
bool isBlocked = false; final lockHandleServerMessage = Mutex();
Future handleServerMessage(server.ServerToClient msg) async { Future handleServerMessage(server.ServerToClient msg) async {
client.Response? response; return lockHandleServerMessage.protect(() async {
int maxCounter = 0; // only block for 2 seconds client.Response? response;
while (isBlocked && maxCounter < 200) {
await Future.delayed(Duration(milliseconds: 10));
maxCounter += 1;
}
isBlocked = true;
try { try {
if (msg.v0.hasRequestNewPreKeys()) { if (msg.v0.hasRequestNewPreKeys()) {
response = await handleRequestNewPreKey(); response = await handleRequestNewPreKey();
} else if (msg.v0.hasNewMessage()) { } else if (msg.v0.hasNewMessage()) {
Uint8List body = Uint8List.fromList(msg.v0.newMessage.body); Uint8List body = Uint8List.fromList(msg.v0.newMessage.body);
int fromUserId = msg.v0.newMessage.fromUserId.toInt(); int fromUserId = msg.v0.newMessage.fromUserId.toInt();
response = await handleNewMessage(fromUserId, body); response = await handleNewMessage(fromUserId, body);
} else if (msg.v0.hasDownloaddata()) { } else if (msg.v0.hasDownloaddata()) {
response = await handleDownloadData(msg.v0.downloaddata); response = await handleDownloadData(msg.v0.downloaddata);
} else { } else {
Logger("handleServerMessage") Logger("handleServerMessage")
.shout("Got a new message from the server: $msg"); .shout("Got a new message from the server: $msg");
response = client.Response()..error = ErrorCode.InternalError;
}
} catch (e) {
response = client.Response()..error = ErrorCode.InternalError; response = client.Response()..error = ErrorCode.InternalError;
} }
} catch (e) {
response = client.Response()..error = ErrorCode.InternalError;
}
isBlocked = false; var v0 = client.V0()
..seq = msg.v0.seq
..response = response;
var v0 = client.V0() apiProvider.sendResponse(ClientToServer()..v0 = v0);
..seq = msg.v0.seq });
..response = response;
apiProvider.sendResponse(ClientToServer()..v0 = v0);
} }
Future<client.Response> handleDownloadData(DownloadData data) async { Future<client.Response> handleDownloadData(DownloadData data) async {
@ -300,7 +295,7 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
return client.Response()..error = ErrorCode.InternalError; return client.Response()..error = ErrorCode.InternalError;
} }
encryptAndSendMessage( await encryptAndSendMessage(
message.messageId!, message.messageId!,
fromUserId, fromUserId,
MessageJson( MessageJson(

View file

@ -932,6 +932,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.0.0" version: "2.0.0"
mutex:
dependency: "direct main"
description:
name: mutex
sha256: "8827da25de792088eb33e572115a5eb0d61d61a3c01acbc8bcbe76ed78f1a1f2"
url: "https://pub.dev"
source: hosted
version: "3.1.0"
nested: nested:
dependency: transitive dependency: transitive
description: description:

View file

@ -61,6 +61,7 @@ dependencies:
flutter_svg: ^2.0.17 flutter_svg: ^2.0.17
flutter_volume_controller: ^1.3.3 flutter_volume_controller: ^1.3.3
fixnum: ^1.1.1 fixnum: ^1.1.1
mutex: ^3.1.0
# avatar_maker # avatar_maker
# avatar_maker: # avatar_maker:
# path: ./dependencies/avatar_maker/ # path: ./dependencies/avatar_maker/