diff --git a/lib/src/services/api.service.dart b/lib/src/services/api.service.dart index e3af08e0..a78e709e 100644 --- a/lib/src/services/api.service.dart +++ b/lib/src/services/api.service.dart @@ -7,7 +7,6 @@ import 'dart:io'; import 'dart:math'; import 'dart:ui' as ui; -import 'package:clock/clock.dart'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:drift/drift.dart'; import 'package:fixnum/fixnum.dart'; @@ -80,7 +79,8 @@ class ApiService { Timer? reconnectionTimer; int _reconnectionDelay = 5; - final HashMap messagesV0 = HashMap(); + final HashMap> _pendingRequests = + HashMap(); IOWebSocketChannel? _channel; // ignore: cancel_subscriptions StreamSubscription>? connectivitySubscription; @@ -144,6 +144,13 @@ class ApiService { _channel = null; isAuthenticated = false; _connectionStateController.add(false); + // Complete all pending requests with null on disconnect + for (final completer in _pendingRequests.values) { + if (!completer.isCompleted) { + completer.complete(null); + } + } + _pendingRequests.clear(); await twonlyDB.mediaFilesDao.resetPendingDownloadState(); await startReconnectionTimer(); } @@ -233,7 +240,10 @@ class ApiService { final msg = server.ServerToClient.fromBuffer(msgBuffer as Uint8List); if (msg.v0.hasResponse()) { await removeFromRetransmissionBuffer(msg.v0.seq); - messagesV0[msg.v0.seq] = msg; + final completer = _pendingRequests.remove(msg.v0.seq); + if (completer != null && !completer.isCompleted) { + completer.complete(msg); + } } else { await handleServerMessage(msg); } @@ -243,22 +253,20 @@ class ApiService { } Future _waitForResponse(Int64 seq) async { - final startTime = clock.now(); + final completer = Completer(); + _pendingRequests[seq] = completer; - const timeout = Duration(seconds: 60); - - while (true) { - if (messagesV0[seq] != null) { - final tmp = messagesV0[seq]; - messagesV0.remove(seq); - return tmp; - } - if (clock.now().difference(startTime) > timeout) { + final timer = Timer(const Duration(seconds: 60), () { + if (!completer.isCompleted) { Log.warn('Timeout for message $seq'); - return null; + _pendingRequests.remove(seq); + completer.complete(null); } - await Future.delayed(const Duration(milliseconds: 10)); - } + }); + + final result = await completer.future; + timer.cancel(); + return result; } Future sendResponse(ClientToServer response) async { @@ -315,7 +323,7 @@ class ApiService { int? contactId, }) async { var seq = Int64(Random().nextInt(4294967296)); - while (messagesV0.containsKey(seq)) { + while (_pendingRequests.containsKey(seq)) { seq = Int64(Random().nextInt(4294967296)); }