replace for loop with Completer

This commit is contained in:
otsmr 2026-04-22 19:47:56 +02:00
parent 1371cf80cb
commit fcb93830e1

View file

@ -7,7 +7,6 @@ import 'dart:io';
import 'dart:math'; import 'dart:math';
import 'dart:ui' as ui; import 'dart:ui' as ui;
import 'package:clock/clock.dart';
import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:drift/drift.dart'; import 'package:drift/drift.dart';
import 'package:fixnum/fixnum.dart'; import 'package:fixnum/fixnum.dart';
@ -80,7 +79,8 @@ class ApiService {
Timer? reconnectionTimer; Timer? reconnectionTimer;
int _reconnectionDelay = 5; int _reconnectionDelay = 5;
final HashMap<Int64, server.ServerToClient?> messagesV0 = HashMap(); final HashMap<Int64, Completer<server.ServerToClient?>> _pendingRequests =
HashMap();
IOWebSocketChannel? _channel; IOWebSocketChannel? _channel;
// ignore: cancel_subscriptions // ignore: cancel_subscriptions
StreamSubscription<List<ConnectivityResult>>? connectivitySubscription; StreamSubscription<List<ConnectivityResult>>? connectivitySubscription;
@ -144,6 +144,13 @@ class ApiService {
_channel = null; _channel = null;
isAuthenticated = false; isAuthenticated = false;
_connectionStateController.add(false); _connectionStateController.add(false);
// Complete all pending requests with null on disconnect
for (final completer in _pendingRequests.values) {
if (!completer.isCompleted) {
completer.complete(null);
}
}
_pendingRequests.clear();
await twonlyDB.mediaFilesDao.resetPendingDownloadState(); await twonlyDB.mediaFilesDao.resetPendingDownloadState();
await startReconnectionTimer(); await startReconnectionTimer();
} }
@ -233,7 +240,10 @@ class ApiService {
final msg = server.ServerToClient.fromBuffer(msgBuffer as Uint8List); final msg = server.ServerToClient.fromBuffer(msgBuffer as Uint8List);
if (msg.v0.hasResponse()) { if (msg.v0.hasResponse()) {
await removeFromRetransmissionBuffer(msg.v0.seq); await removeFromRetransmissionBuffer(msg.v0.seq);
messagesV0[msg.v0.seq] = msg; final completer = _pendingRequests.remove(msg.v0.seq);
if (completer != null && !completer.isCompleted) {
completer.complete(msg);
}
} else { } else {
await handleServerMessage(msg); await handleServerMessage(msg);
} }
@ -243,22 +253,20 @@ class ApiService {
} }
Future<server.ServerToClient?> _waitForResponse(Int64 seq) async { Future<server.ServerToClient?> _waitForResponse(Int64 seq) async {
final startTime = clock.now(); final completer = Completer<server.ServerToClient?>();
_pendingRequests[seq] = completer;
const timeout = Duration(seconds: 60); final timer = Timer(const Duration(seconds: 60), () {
if (!completer.isCompleted) {
while (true) {
if (messagesV0[seq] != null) {
final tmp = messagesV0[seq];
messagesV0.remove(seq);
return tmp;
}
if (clock.now().difference(startTime) > timeout) {
Log.warn('Timeout for message $seq'); Log.warn('Timeout for message $seq');
return null; _pendingRequests.remove(seq);
} completer.complete(null);
await Future.delayed(const Duration(milliseconds: 10));
} }
});
final result = await completer.future;
timer.cancel();
return result;
} }
Future<void> sendResponse(ClientToServer response) async { Future<void> sendResponse(ClientToServer response) async {
@ -315,7 +323,7 @@ class ApiService {
int? contactId, int? contactId,
}) async { }) async {
var seq = Int64(Random().nextInt(4294967296)); var seq = Int64(Random().nextInt(4294967296));
while (messagesV0.containsKey(seq)) { while (_pendingRequests.containsKey(seq)) {
seq = Int64(Random().nextInt(4294967296)); seq = Int64(Random().nextInt(4294967296));
} }