implement retransmission for encrypted text messages

This commit is contained in:
otsmr 2025-01-31 00:39:02 +01:00
parent dd31f18e07
commit 57309ae775
8 changed files with 110 additions and 63 deletions

View file

@ -72,10 +72,9 @@ class MessageSendStateIcon extends StatelessWidget {
bool isDownloading = false;
if (message.messageContent != null &&
message.messageContent!.downloadToken != null) {
isDownloading = context
.watch<DownloadChangeProvider>()
.currentlyDownloading
.contains(message.messageContent!.downloadToken!);
final test = context.watch<DownloadChangeProvider>().currentlyDownloading;
isDownloading =
test.contains(message.messageContent!.downloadToken.toString());
}
if (isDownloading) {

View file

@ -194,6 +194,15 @@ class DbMessages extends CvModelBase {
return messages;
}
static Future<List<DbMessage>> getAllMessagesForRetransmitting() async {
var rows = await dbProvider.db!.query(
tableName,
where: "$columnMessageAcknowledgeByServer = 0",
);
List<DbMessage> messages = await convertToDbMessage(rows);
return messages;
}
static Future<List<DbMessage>> getAllMessagesForUser(int otherUserId) async {
var rows = await dbProvider.db!.query(
tableName,

View file

@ -17,6 +17,33 @@ import 'package:twonly/src/utils/misc.dart';
import 'package:twonly/src/utils/signal.dart' as SignalHelper;
// this functions ensures that the message is received by the server and in case of errors will try again later
Future tryTransmitMessages() async {
List<DbMessage> retransmit =
await DbMessages.getAllMessagesForRetransmitting();
debugPrint("tryTransmitMessages: ${retransmit.length}");
Box box = await getMediaStorage();
for (int i = 0; i < retransmit.length; i++) {
int msgId = retransmit[i].messageId;
debugPrint("msgId=$msgId");
Uint8List? bytes = box.get("retransmit-$msgId");
debugPrint("bytes == null =${bytes == null}");
if (bytes != null) {
Result resp = await apiProvider.sendTextMessage(
Int64(retransmit[i].otherUserId), bytes);
if (resp.isSuccess) {
DbMessages.acknowledgeMessageByServer(msgId);
box.delete("retransmit-$msgId");
} else {
// in case of error do nothing. As the message is not removed the app will try again when relaunched
}
}
}
}
Future<Result> encryptAndSendMessage(Int64 userId, Message msg) async {
Uint8List? bytes = await SignalHelper.encryptMessage(msg, userId);
@ -25,18 +52,19 @@ Future<Result> encryptAndSendMessage(Int64 userId, Message msg) async {
return Result.error(ErrorCode.InternalError);
}
Logger("api.dart").shout(
"TODO: store encrypted message and send later again. STORE: userId, bytes and messageId");
Box box = await getMediaStorage();
if (msg.messageId != null) {
debugPrint("putting=${msg.messageId}");
box.put("retransmit-${msg.messageId}", bytes);
}
Result resp = await apiProvider.sendTextMessage(userId, bytes);
if (resp.isSuccess) {
if (msg.messageId != null) {
DbMessages.acknowledgeMessageByServer(msg.messageId!);
box.delete("retransmit-${msg.messageId}");
}
// TODO: remove encrypted tmp file
} else {
// in case of error do nothing. As the message is not removed the app will try again when relaunched
}
return resp;
@ -65,22 +93,6 @@ Future sendImageToSingleTarget(Int64 target, Uint8List imageBytes) async {
await DbMessages.insertMyMessage(target.toInt(), MessageKind.image);
if (messageId == null) return;
Result res = await apiProvider.getUploadToken();
if (res.isError || !res.value.hasUploadtoken()) {
Logger("api.dart").shout("Error getting upload token!");
// TODO store message for later and try again
return null;
}
List<int> uploadToken = res.value.uploadtoken;
Logger("sendImageToSingleTarget").fine("Got token: $uploadToken");
MessageContent content =
MessageContent(text: null, downloadToken: uploadToken);
Uint8List? encryptBytes = await SignalHelper.encryptBytes(imageBytes, target);
if (encryptBytes == null) {
await DbMessages.deleteMessageById(messageId);
@ -88,14 +100,25 @@ Future sendImageToSingleTarget(Int64 target, Uint8List imageBytes) async {
return;
}
List<int>? imageToken =
await apiProvider.uploadData(uploadToken, encryptBytes);
if (imageToken == null) {
Logger("api.dart").shout("handle error uploading like saving...");
return;
Result res = await apiProvider.getUploadToken();
if (res.isError || !res.value.hasUploadtoken()) {
print("store encryptBytes in box to retransmit without an upload token");
Logger("api.dart").shout("Error getting upload token!");
return null;
}
print("TODO: insert into DB and then create this MESSAGE");
List<int> uploadToken = res.value.uploadtoken;
MessageContent content =
MessageContent(text: null, downloadToken: uploadToken);
print("fragmentate the data");
if (!await apiProvider.uploadData(uploadToken, encryptBytes, 0)) {
Logger("api.dart").shout("error while uploading image");
return;
}
Message msg = Message(
kind: MessageKind.image,
@ -140,7 +163,7 @@ Future tryDownloadMedia(List<int> mediaToken, {bool force = false}) async {
if (media != null && media.isNotEmpty) {
offset = media.length;
}
globalCallBackOnDownloadChange(mediaToken, true);
//globalCallBackOnDownloadChange(mediaToken, true);
apiProvider.triggerDownload(mediaToken, offset);
}

View file

@ -8,6 +8,7 @@ import 'package:twonly/src/app.dart';
import 'package:twonly/src/proto/api/client_to_server.pbserver.dart';
import 'package:twonly/src/proto/api/error.pb.dart';
import 'package:twonly/src/proto/api/server_to_client.pb.dart' as server;
import 'package:twonly/src/providers/api/api.dart';
import 'package:twonly/src/providers/api/api_utils.dart';
import 'package:twonly/src/providers/api/server_messages.dart';
import 'package:twonly/src/utils/misc.dart';
@ -52,6 +53,14 @@ class ApiProvider {
}
}
Future onConnected() async {
await authenticate();
globalCallbackConnectionState(true);
_reconnectionDelay = 5;
tryTransmitMessages();
}
Future<bool> connect() async {
if (_channel != null && _channel!.closeCode != null) {
return true;
@ -63,17 +72,13 @@ class ApiProvider {
log.info("Trying to connect to the backend $apiUrl!");
if (await _connectTo(apiUrl)) {
await authenticate();
globalCallbackConnectionState(true);
_reconnectionDelay = 5;
onConnected();
return true;
}
if (backupApiUrl != null) {
log.info("Trying to connect to the backup backend $backupApiUrl!");
if (await _connectTo(backupApiUrl!)) {
globalCallbackConnectionState(true);
await authenticate();
_reconnectionDelay = 5;
onConnected();
return true;
}
}
@ -265,18 +270,16 @@ class ApiProvider {
return await _sendRequestV0(req);
}
Future<List<int>?> uploadData(List<int> uploadToken, Uint8List data) async {
log.shout("fragmentate the data");
Future<bool> uploadData(
List<int> uploadToken, Uint8List data, int offset) async {
var get = ApplicationData_UploadData()
..uploadToken = uploadToken
..data = data
..offset = 0;
..offset = offset;
var appData = ApplicationData()..uploaddata = get;
var req = createClientToServerFromApplicationData(appData);
final result = await _sendRequestV0(req);
return result.isSuccess ? uploadToken : null;
return result.isSuccess;
}
Future<Result> getUserData(String username) async {

View file

@ -2,15 +2,19 @@ import 'dart:collection';
import 'package:flutter/foundation.dart';
class DownloadChangeProvider with ChangeNotifier, DiagnosticableTreeMixin {
final HashSet<List<int>> _currentlyDownloading = HashSet<List<int>>();
final HashSet<String> _currentlyDownloading = HashSet<String>();
HashSet<List<int>> get currentlyDownloading => _currentlyDownloading;
HashSet<String> get currentlyDownloading => _currentlyDownloading;
void update(List<int> token, bool add) {
debugPrint("Downloading: $add : $token");
if (add) {
_currentlyDownloading.add(token);
_currentlyDownloading.add(token.toString());
} else {
_currentlyDownloading.remove(token);
_currentlyDownloading.remove(token.toString());
}
debugPrint("Downloading: $add : ${_currentlyDownloading.toList()}");
notifyListeners();
}
}

View file

@ -21,6 +21,7 @@ class MessagesChangeProvider with ChangeNotifier, DiagnosticableTreeMixin {
changeCounter[targetUserId] = 0;
}
changeCounter[targetUserId] = changeCounter[targetUserId]! + 1;
notifyListeners();
}
void init() async {
@ -33,5 +34,6 @@ class MessagesChangeProvider with ChangeNotifier, DiagnosticableTreeMixin {
_lastMessage[last.otherUserId] = last;
}
}
notifyListeners();
}
}

View file

@ -26,13 +26,13 @@ class ChatListEntry extends StatelessWidget {
MessageSendState state = message.getSendState();
bool isDownloading = false;
if (message.messageContent != null &&
message.messageContent!.downloadToken != null) {
isDownloading = context
.watch<DownloadChangeProvider>()
.currentlyDownloading
.contains(message.messageContent!.downloadToken!);
}
// if (message.messageContent != null &&
// message.messageContent!.downloadToken != null) {
// isDownloading = context
// .watch<DownloadChangeProvider>()
// .currentlyDownloading
// .contains(message.messageContent!.downloadToken!);
// }
Widget child = Container();
@ -148,7 +148,14 @@ class _ChatItemDetailsViewState extends State<ChatItemDetailsView> {
_messages.insertAll(0, toAppend);
}
setState(() {});
try {
if (context.mounted) {
setState(() {});
}
} catch (e) {
// state should be disposed
return;
}
if (updateOpenStatus) {
_messages.where((x) => x.messageOpenedAt == null).forEach((message) {

View file

@ -151,13 +151,13 @@ class _UserListItem extends State<UserListItem> {
MessageSendState state = widget.lastMessage.getSendState();
bool isDownloading = false;
if (widget.lastMessage.messageContent != null &&
widget.lastMessage.messageContent!.downloadToken != null) {
isDownloading = context
.watch<DownloadChangeProvider>()
.currentlyDownloading
.contains(widget.lastMessage.messageContent!.downloadToken!);
}
// if (widget.lastMessage.messageContent != null &&
// widget.lastMessage.messageContent!.downloadToken != null) {
// isDownloading = context
// .watch<DownloadChangeProvider>()
// .currentlyDownloading
// .contains(widget.lastMessage.messageContent!.downloadToken!);
// }
return UserContextMenu(
user: widget.user,