diff --git a/ios/build/.last_build_id b/ios/build/.last_build_id new file mode 100644 index 0000000..7769d20 --- /dev/null +++ b/ios/build/.last_build_id @@ -0,0 +1 @@ +ce49e7d90cd902197f9a9cbc84219d23 \ No newline at end of file diff --git a/lib/src/providers/api/api.dart b/lib/src/providers/api/api.dart index 59edb6a..ea32000 100644 --- a/lib/src/providers/api/api.dart +++ b/lib/src/providers/api/api.dart @@ -3,6 +3,7 @@ import 'dart:math'; import 'package:drift/drift.dart'; import 'package:hive/hive.dart'; import 'package:logging/logging.dart'; +import 'package:mutex/mutex.dart'; import 'package:twonly/globals.dart'; import 'package:twonly/src/database/twonly_database.dart'; import 'package:twonly/src/database/tables/messages_table.dart'; @@ -16,40 +17,44 @@ import 'package:twonly/src/services/notification_service.dart'; import 'package:twonly/src/utils/signal.dart' as SignalHelper; import 'package:twonly/src/utils/storage.dart'; +final lockSendingMessages = Mutex(); + Future tryTransmitMessages() async { - Map retransmit = await getAllMessagesForRetransmitting(); + lockSendingMessages.protect(() async { + Map retransmit = await getAllMessagesForRetransmitting(); - if (retransmit.isEmpty) return; + if (retransmit.isEmpty) return; - Logger("api.dart").info("try sending messages: ${retransmit.length}"); + Logger("api.dart").info("try sending messages: ${retransmit.length}"); - Map failed = {}; + Map failed = {}; - for (String key in retransmit.keys) { - RetransmitMessage msg = - RetransmitMessage.fromJson(jsonDecode(retransmit[key])); + for (String key in retransmit.keys) { + RetransmitMessage msg = + RetransmitMessage.fromJson(jsonDecode(retransmit[key])); - Result resp = await apiProvider.sendTextMessage( - msg.userId, - msg.bytes, - msg.pushData, - ); + Result resp = await apiProvider.sendTextMessage( + msg.userId, + msg.bytes, + msg.pushData, + ); - if (resp.isSuccess) { - if (msg.messageId != null) { - await twonlyDatabase.messagesDao.updateMessageByMessageId( - msg.messageId!, - MessagesCompanion( - acknowledgeByServer: Value(true), - ), - ); + if (resp.isSuccess) { + if (msg.messageId != null) { + await twonlyDatabase.messagesDao.updateMessageByMessageId( + msg.messageId!, + MessagesCompanion( + acknowledgeByServer: Value(true), + ), + ); + } + } else { + failed[key] = retransmit[key]; } - } else { - failed[key] = retransmit[key]; } - } - Box box = await getMediaStorage(); - box.put("messages-to-retransmit", jsonEncode(failed)); + Box box = await getMediaStorage(); + box.put("messages-to-retransmit", jsonEncode(failed)); + }); } class RetransmitMessage { @@ -102,54 +107,57 @@ Future> getAllMessagesForRetransmitting() async { Future encryptAndSendMessage( int? messageId, int userId, MessageJson msg, {PushKind? pushKind}) async { - Uint8List? bytes = await SignalHelper.encryptMessage(msg, userId); + return await lockSendingMessages.protect(() async { + Uint8List? bytes = await SignalHelper.encryptMessage(msg, userId); - if (bytes == null) { - Logger("api.dart").shout("Error encryption message!"); - return Result.error(ErrorCode.InternalError); - } - - String stateId = (messageId ?? (60001 + Random().nextInt(100000))).toString(); - Box box = await getMediaStorage(); - - List? pushData; - if (pushKind != null) { - pushData = await getPushData(userId, pushKind); - } - - { - var retransmit = await getAllMessagesForRetransmitting(); - - retransmit[stateId] = jsonEncode(RetransmitMessage( - messageId: messageId, - userId: userId, - bytes: bytes, - pushData: pushData, - ).toJson()); - - box.put("messages-to-retransmit", jsonEncode(retransmit)); - } - - Result resp = await apiProvider.sendTextMessage(userId, bytes, pushData); - - if (resp.isSuccess) { - if (messageId != null) { - await twonlyDatabase.messagesDao.updateMessageByMessageId( - messageId, - MessagesCompanion(acknowledgeByServer: Value(true)), - ); - - { - var retransmit = await getAllMessagesForRetransmitting(); - retransmit.remove(stateId); - box.put("messages-to-retransmit", jsonEncode(retransmit)); - } - - box.delete("retransmit-$messageId-textmessage"); + if (bytes == null) { + Logger("api.dart").shout("Error encryption message!"); + return Result.error(ErrorCode.InternalError); } - } - return resp; + String stateId = + (messageId ?? (60001 + Random().nextInt(100000))).toString(); + Box box = await getMediaStorage(); + + List? pushData; + if (pushKind != null) { + pushData = await getPushData(userId, pushKind); + } + + { + var retransmit = await getAllMessagesForRetransmitting(); + + retransmit[stateId] = jsonEncode(RetransmitMessage( + messageId: messageId, + userId: userId, + bytes: bytes, + pushData: pushData, + ).toJson()); + + box.put("messages-to-retransmit", jsonEncode(retransmit)); + } + + Result resp = await apiProvider.sendTextMessage(userId, bytes, pushData); + + if (resp.isSuccess) { + if (messageId != null) { + await twonlyDatabase.messagesDao.updateMessageByMessageId( + messageId, + MessagesCompanion(acknowledgeByServer: Value(true)), + ); + + { + var retransmit = await getAllMessagesForRetransmitting(); + retransmit.remove(stateId); + box.put("messages-to-retransmit", jsonEncode(retransmit)); + } + + box.delete("retransmit-$messageId-textmessage"); + } + } + + return resp; + }); } Future sendTextMessage( diff --git a/lib/src/providers/api/server_messages.dart b/lib/src/providers/api/server_messages.dart index 2e90367..a7227f1 100644 --- a/lib/src/providers/api/server_messages.dart +++ b/lib/src/providers/api/server_messages.dart @@ -5,6 +5,7 @@ import 'package:drift/drift.dart'; import 'package:fixnum/fixnum.dart'; import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart'; import 'package:logging/logging.dart'; +import 'package:mutex/mutex.dart'; import 'package:twonly/globals.dart'; import 'package:twonly/src/app.dart'; import 'package:twonly/src/database/twonly_database.dart'; @@ -23,42 +24,36 @@ import 'package:twonly/src/services/notification_service.dart'; // ignore: library_prefixes import 'package:twonly/src/utils/signal.dart' as SignalHelper; -bool isBlocked = false; +final lockHandleServerMessage = Mutex(); Future handleServerMessage(server.ServerToClient msg) async { - client.Response? response; - int maxCounter = 0; // only block for 2 seconds - while (isBlocked && maxCounter < 200) { - await Future.delayed(Duration(milliseconds: 10)); - maxCounter += 1; - } - isBlocked = true; + return lockHandleServerMessage.protect(() async { + client.Response? response; - try { - if (msg.v0.hasRequestNewPreKeys()) { - response = await handleRequestNewPreKey(); - } else if (msg.v0.hasNewMessage()) { - Uint8List body = Uint8List.fromList(msg.v0.newMessage.body); - int fromUserId = msg.v0.newMessage.fromUserId.toInt(); - response = await handleNewMessage(fromUserId, body); - } else if (msg.v0.hasDownloaddata()) { - response = await handleDownloadData(msg.v0.downloaddata); - } else { - Logger("handleServerMessage") - .shout("Got a new message from the server: $msg"); + try { + if (msg.v0.hasRequestNewPreKeys()) { + response = await handleRequestNewPreKey(); + } else if (msg.v0.hasNewMessage()) { + Uint8List body = Uint8List.fromList(msg.v0.newMessage.body); + int fromUserId = msg.v0.newMessage.fromUserId.toInt(); + response = await handleNewMessage(fromUserId, body); + } else if (msg.v0.hasDownloaddata()) { + response = await handleDownloadData(msg.v0.downloaddata); + } else { + Logger("handleServerMessage") + .shout("Got a new message from the server: $msg"); + response = client.Response()..error = ErrorCode.InternalError; + } + } catch (e) { response = client.Response()..error = ErrorCode.InternalError; } - } catch (e) { - response = client.Response()..error = ErrorCode.InternalError; - } - isBlocked = false; + var v0 = client.V0() + ..seq = msg.v0.seq + ..response = response; - var v0 = client.V0() - ..seq = msg.v0.seq - ..response = response; - - apiProvider.sendResponse(ClientToServer()..v0 = v0); + apiProvider.sendResponse(ClientToServer()..v0 = v0); + }); } Future handleDownloadData(DownloadData data) async { @@ -300,7 +295,7 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { return client.Response()..error = ErrorCode.InternalError; } - encryptAndSendMessage( + await encryptAndSendMessage( message.messageId!, fromUserId, MessageJson( diff --git a/pubspec.lock b/pubspec.lock index e21fa18..d3aebfe 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -932,6 +932,14 @@ packages: url: "https://pub.dev" source: hosted version: "2.0.0" + mutex: + dependency: "direct main" + description: + name: mutex + sha256: "8827da25de792088eb33e572115a5eb0d61d61a3c01acbc8bcbe76ed78f1a1f2" + url: "https://pub.dev" + source: hosted + version: "3.1.0" nested: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 805d00a..36fc262 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -61,6 +61,7 @@ dependencies: flutter_svg: ^2.0.17 flutter_volume_controller: ^1.3.3 fixnum: ^1.1.1 + mutex: ^3.1.0 # avatar_maker # avatar_maker: # path: ./dependencies/avatar_maker/