use also for download background manager

This commit is contained in:
otsmr 2025-06-15 20:47:57 +02:00
parent 88649c0dc0
commit 6946164d1e
8 changed files with 254 additions and 145 deletions

View file

@ -48,7 +48,7 @@ void main() async {
purgeReceivedMediaFiles();
purgeSendMediaFiles();
await initMediaUploader();
await initFileDownloader();
runApp(
MultiProvider(

View file

@ -97,6 +97,18 @@ class MessagesDao extends DatabaseAccessor<TwonlyDatabase>
.get();
}
Future<List<Message>> getAllNonACKMessagesFromUser() {
return (select(messages)
..where((t) =>
t.acknowledgeByUser.equals(false) &
t.messageOtherId.isNull() &
t.errorWhileSending.equals(false) &
t.sendAt.isBiggerThanValue(
DateTime.now().subtract(Duration(minutes: 10)),
)))
.get();
}
Stream<List<Message>> getAllStoredMediaFiles() {
return (select(messages)
..where((t) => t.mediaStored.equals(true))
@ -191,10 +203,6 @@ class MessagesDao extends DatabaseAccessor<TwonlyDatabase>
}
}
Future deleteMessageById(int messageId) {
return (delete(messages)..where((t) => t.messageId.equals(messageId))).go();
}
Future deleteMessagesByContactId(int contactId) {
return (delete(messages)
..where((t) =>
@ -207,7 +215,9 @@ class MessagesDao extends DatabaseAccessor<TwonlyDatabase>
}
Future<bool> containsOtherMessageId(
int fromUserId, int messageOtherId) async {
int fromUserId,
int messageOtherId,
) async {
final query = select(messages)
..where((t) =>
t.messageOtherId.equals(messageOtherId) &

View file

@ -1,5 +1,6 @@
import 'dart:convert';
import 'dart:io';
import 'package:background_downloader/background_downloader.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:drift/drift.dart';
import 'package:path/path.dart';
@ -8,8 +9,6 @@ import 'package:twonly/globals.dart';
import 'package:twonly/src/database/twonly_database.dart';
import 'package:twonly/src/database/tables/messages_table.dart';
import 'package:twonly/src/model/json/message.dart';
import 'package:http/http.dart' as http;
// import 'package:twonly/src/providers/api/api_utils.dart';
import 'package:twonly/src/services/api/media_send.dart';
import 'package:cryptography_plus/cryptography_plus.dart';
import 'package:twonly/src/services/api/utils.dart';
@ -75,6 +74,34 @@ Future<bool> isAllowedToDownload(bool isVideo) async {
return false;
}
Future handleDownloadStatusUpdate(TaskStatusUpdate update) async {
bool failed = false;
int messageId = int.parse(update.task.taskId.replaceAll("download_", ""));
if (update.status == TaskStatus.failed ||
update.status == TaskStatus.canceled) {
Log.error("Download failed: ${update.status}");
failed = true;
} else if (update.status == TaskStatus.complete) {
if (update.responseStatusCode == 200) {
Log.info("Download was successfully for $messageId");
await handleEncryptedFile(messageId);
} else {
Log.error(
"Got invalid response status code: ${update.responseStatusCode}");
}
}
if (failed) {
Message? message = await twonlyDB.messagesDao
.getMessageByMessageId(messageId)
.getSingleOrNull();
if (message != null) {
await handleMediaError(message);
}
}
}
Future startDownloadMedia(Message message, bool force,
{int retryCounter = 0}) async {
if (message.contentJson == null) return;
@ -90,6 +117,7 @@ Future startDownloadMedia(Message message, bool force,
final content =
MessageContent.fromJson(message.kind, jsonDecode(message.contentJson!));
if (content is! MediaMessageContent) return;
if (content.downloadToken == null) return;
@ -135,60 +163,42 @@ Future startDownloadMedia(Message message, bool force,
String apiUrl =
"http${apiService.apiSecure}://${apiService.apiHost}/api/download/$downloadToken";
var httpClient = http.Client();
var request = http.Request('GET', Uri.parse(apiUrl));
var response = httpClient.send(request);
try {
final task = DownloadTask(
url: apiUrl,
taskId: "download_${media.messageId}",
directory: "media/received/",
baseDirectory: BaseDirectory.applicationSupport,
filename: "${media.messageId}.encrypted",
priority: 0,
retries: 10,
);
List<List<int>> chunks = [];
int downloaded = 0;
Log.info(
"Got media file. Starting download: ${downloadToken.substring(0, 10)}");
response.asStream().listen((http.StreamedResponse r) {
r.stream.listen((List<int> chunk) {
// Display percentage of completion
Log.info(
'downloadPercentage: ${downloaded / (r.contentLength ?? 0) * 100}');
final result = await FileDownloader().enqueue(task);
chunks.add(chunk);
downloaded += chunk.length;
}, onDone: () async {
if (r.statusCode != 200) {
Log.error("Download error: $r");
if (r.statusCode == 418) {
Log.error("Got custom error code: ${chunks.toList()}");
handleMediaError(message);
}
return;
}
// Display percentage of completion
Log.info(
'downloadPercentage: ${downloaded / (r.contentLength ?? 0) * 100}');
// Save the file
final Uint8List bytes = Uint8List(r.contentLength ?? 0);
int offset = 0;
for (List<int> chunk in chunks) {
bytes.setRange(offset, offset + chunk.length, chunk);
offset += chunk.length;
}
await writeMediaFile(message.messageId, "encrypted", bytes);
handleEncryptedFile(message,
encryptedBytesTmp: bytes, retryCounter: retryCounter);
return;
});
});
return result;
} catch (e) {
Log.error("Exception during upload: $e");
}
}
Future handleEncryptedFile(
Message msg, {
Uint8List? encryptedBytesTmp,
int retryCounter = 0,
}) async {
Uint8List? encryptedBytes =
encryptedBytesTmp ?? await readMediaFile(msg.messageId, "encrypted");
Future handleEncryptedFile(int messageId) async {
Message? msg = await twonlyDB.messagesDao
.getMessageByMessageId(messageId)
.getSingleOrNull();
if (msg == null) {
Log.error("Not message for downloaded file found: $messageId");
return;
}
Uint8List? encryptedBytes = await readMediaFile(msg.messageId, "encrypted");
if (encryptedBytes == null) {
Log.error("encrypted bytes are not found for ${msg.messageId}");
return;
}
MediaMessageContent content =
@ -198,7 +208,7 @@ Future handleEncryptedFile(
SecretKeyData secretKeyData = SecretKeyData(content.encryptionKey!);
SecretBox secretBox = SecretBox(
encryptedBytes!,
encryptedBytes,
nonce: content.encryptionNonce!,
mac: Mac(content.encryptionMac!),
);
@ -216,15 +226,9 @@ Future handleEncryptedFile(
await writeMediaFile(msg.messageId, "png", imageBytes);
} catch (e) {
if (retryCounter >= 1) {
Log.error(
"could not decrypt the media file in the second try. reporting error to user: $e");
handleMediaError(msg);
return;
}
Log.error("could not decrypt the media file trying again: $e");
startDownloadMedia(msg, true, retryCounter: retryCounter + 1);
// try downloading again....
Log.error(
"could not decrypt the media file in the second try. reporting error to user: $e");
handleMediaError(msg);
return;
}
@ -252,6 +256,7 @@ Future<File?> getVideoPath(int mediaId) async {
Future<Uint8List?> readMediaFile(int mediaId, String type) async {
String basePath = await getMediaFilePath(mediaId, "received");
File file = File("$basePath.$type");
Log.info("Reading: ${file}");
if (!await file.exists()) {
return null;
}
@ -365,3 +370,6 @@ Future<void> purgeMediaFiles(Directory directory) async {
}
}
}
// /data/user/0/eu.twonly.testing/files/media/received/27.encrypted
// /data/user/0/eu.twonly.testing/app_flutter/data/user/0/eu.twonly.testing/files/media/received/27.encrypted

View file

@ -52,73 +52,18 @@ Future<ErrorCode?> isAllowedToSend() async {
return null;
}
Future initMediaUploader() async {
Future initFileDownloader() async {
FileDownloader().updates.listen((update) async {
switch (update) {
case TaskStatusUpdate():
bool failed = false;
int mediaUploadId = int.parse(update.task.taskId);
MediaUpload? media = await twonlyDB.mediaUploadsDao
.getMediaUploadById(mediaUploadId)
.getSingleOrNull();
if (media == null) {
Log.error(
"Got an upload task but no upload media in the media upload database",
);
return;
if (update.task.taskId.contains("upload_")) {
await handleUploadStatusUpdate(update);
}
if (update.status == TaskStatus.failed ||
update.status == TaskStatus.canceled) {
Log.error("Upload failed: ${update.status}");
failed = true;
} else if (update.status == TaskStatus.complete) {
if (update.responseStatusCode == 200) {
Log.info("Upload of $mediaUploadId success!");
await twonlyDB.mediaUploadsDao.updateMediaUpload(
mediaUploadId,
MediaUploadsCompanion(
state: Value(UploadState.receiverNotified),
),
);
for (final messageId in media.messageIds!) {
await twonlyDB.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(
acknowledgeByServer: Value(true),
errorWhileSending: Value(false),
),
);
}
return;
} else if (update.responseStatusCode != null) {
if (update.responseStatusCode! >= 400 &&
update.responseStatusCode! < 500) {
failed = true;
}
Log.error(
"Got error while uploading: ${update.responseStatusCode}",
);
}
if (update.task.taskId.contains("download_")) {
await handleDownloadStatusUpdate(update);
}
if (failed) {
for (final messageId in media.messageIds!) {
await twonlyDB.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(
acknowledgeByServer: Value(true),
errorWhileSending: Value(true),
),
);
}
}
print('Status update for ${update.task} with status ${update.status}');
case TaskProgressUpdate():
print(
Log.info(
'Progress update for ${update.task} with progress ${update.progress}');
}
});
@ -132,8 +77,8 @@ Future initMediaUploader() async {
if (kDebugMode) {
FileDownloader().configureNotification(
running: TaskNotification(
'Uploading',
'Uploading your {filename} ({progress}).',
'Uploading/Downloading',
'{filename} ({progress}).',
),
complete: null,
progressBar: true,
@ -325,8 +270,7 @@ Future encryptMediaFiles(
state.encryptionMac = secretBox.mac.bytes;
final algorithm = Sha256();
state.sha2Hash = (await algorithm.hash(secretBox.cipherText)).bytes;
state.sha2Hash = (await Sha256().hash(secretBox.cipherText)).bytes;
final encryptedBytes = Uint8List.fromList(secretBox.cipherText);
await writeSendMediaFile(
@ -444,6 +388,72 @@ Future handleNextMediaUploadSteps(int mediaUploadId) async {
///
///
///
Future handleUploadStatusUpdate(TaskStatusUpdate update) async {
bool failed = false;
int mediaUploadId = int.parse(update.task.taskId.replaceAll("upload_", ""));
MediaUpload? media = await twonlyDB.mediaUploadsDao
.getMediaUploadById(mediaUploadId)
.getSingleOrNull();
if (media == null) {
Log.error(
"Got an upload task but no upload media in the media upload database",
);
return;
}
if (update.status == TaskStatus.failed ||
update.status == TaskStatus.canceled) {
Log.error("Upload failed: ${update.status}");
failed = true;
} else if (update.status == TaskStatus.complete) {
if (update.responseStatusCode == 200) {
Log.info("Upload of $mediaUploadId success!");
await twonlyDB.mediaUploadsDao.updateMediaUpload(
mediaUploadId,
MediaUploadsCompanion(
state: Value(UploadState.receiverNotified),
),
);
for (final messageId in media.messageIds!) {
await twonlyDB.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(
acknowledgeByServer: Value(true),
errorWhileSending: Value(false),
),
);
}
return;
} else if (update.responseStatusCode != null) {
if (update.responseStatusCode! >= 400 &&
update.responseStatusCode! < 500) {
failed = true;
}
Log.error(
"Got error while uploading: ${update.responseStatusCode}",
);
}
}
if (failed) {
for (final messageId in media.messageIds!) {
await twonlyDB.messagesDao.updateMessageByMessageId(
messageId,
MessagesCompanion(
acknowledgeByServer: Value(true),
errorWhileSending: Value(true),
),
);
}
}
Log.info(
'Status update for ${update.task.taskId} with status ${update.status}');
}
Future handleUploadError(MediaUpload mediaUpload) async {
// if the messageIds are already there notify the user about this error...
if (mediaUpload.messageIds != null) {
@ -496,6 +506,20 @@ Future<bool> handleMediaUpload(MediaUpload media) async {
if (message == null) continue;
Contact? contact = await twonlyDB.contactsDao
.getContactByUserId(message.contactId)
.getSingleOrNull();
if (contact == null || contact.deleted) {
Log.warn(
"Contact deleted ${message.contactId} or not found in database.");
await twonlyDB.messagesDao.updateMessageByMessageId(
message.messageId,
MessagesCompanion(errorWhileSending: Value(true)),
);
continue;
}
await twonlyDB.contactsDao.incFlameCounter(
message.contactId,
false,
@ -552,7 +576,7 @@ Future<bool> handleMediaUpload(MediaUpload media) async {
try {
final task = UploadTask.fromFile(
taskId: "${media.mediaUploadId}",
taskId: "upload_${media.mediaUploadId}",
displayName: (media.metadata?.isVideo ?? false) ? "image" : "video",
file: uploadRequestFile,
url: apiUrl,

View file

@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:drift/drift.dart';
@ -13,6 +14,43 @@ import 'package:twonly/src/services/notification.service.dart';
import 'package:twonly/src/utils/log.dart';
import 'package:twonly/src/utils/storage.dart';
class DirtyResendingItem {
DirtyResendingItem({required this.gotLastAck});
DateTime gotLastAck;
Timer? timer;
}
class DirtyResending {
static final Map<int, DirtyResendingItem> _gotLastAck = {};
static Future gotAckFromUser(int contactID) async {
_gotLastAck[contactID]?.timer?.cancel();
_gotLastAck[contactID] = DirtyResendingItem(gotLastAck: DateTime.now());
_gotLastAck[contactID]?.timer = Timer(Duration(seconds: 10), () async {
_gotLastAck.remove(contactID);
_handleNonACKMessagesForUser(contactID);
});
}
static Future _handleNonACKMessagesForUser(int contactID) async {
final List<Message> toResendMessages =
await twonlyDB.messagesDao.getAllNonACKMessagesFromUser();
for (final Message message in toResendMessages) {
Log.info("Got newer ACKs from user ${message.messageId}");
await twonlyDB.messagesDao.updateMessageByMessageId(
message.messageId,
MessagesCompanion(
errorWhileSending: Value(true),
),
);
}
}
}
Future handleOlderNonAckMessages() async {}
Future tryTransmitMessages() async {
final retransIds =
await twonlyDB.messageRetransmissionDao.getRetransmitAbleMessages();
@ -36,6 +74,20 @@ Future sendRetransmitMessage(int retransId) async {
return;
}
Contact? contact = await twonlyDB.contactsDao
.getContactByUserId(retrans.contactId)
.getSingleOrNull();
if (contact == null || contact.deleted) {
Log.warn("Contact deleted $retransId or not found in database.");
if (retrans.messageId != null) {
await twonlyDB.messagesDao.updateMessageByMessageId(
retrans.messageId!,
MessagesCompanion(errorWhileSending: Value(true)),
);
}
return;
}
Uint8List? encryptedBytes = await signalEncryptMessage(
retrans.contactId,
retrans.plaintextContent,
@ -155,7 +207,9 @@ Future sendTextMessage(
}
Future notifyContactAboutOpeningMessage(
int fromUserId, List<int> messageOtherIds) async {
int fromUserId,
List<int> messageOtherIds,
) async {
for (final messageOtherId in messageOtherIds) {
await encryptAndSendMessageAsync(
null,

View file

@ -1,4 +1,5 @@
import 'dart:convert';
import 'package:cryptography_plus/cryptography_plus.dart';
import 'package:drift/drift.dart';
import 'package:fixnum/fixnum.dart';
import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart';
@ -13,6 +14,7 @@ import 'package:twonly/src/model/protobuf/api/websocket/client_to_server.pbserve
import 'package:twonly/src/model/protobuf/api/websocket/error.pb.dart';
import 'package:twonly/src/model/protobuf/api/websocket/server_to_client.pb.dart'
as server;
import 'package:twonly/src/services/api/media_send.dart';
import 'package:twonly/src/services/api/messages.dart';
import 'package:twonly/src/services/api/utils.dart';
import 'package:twonly/src/services/api/media_received.dart';
@ -34,6 +36,9 @@ Future handleServerMessage(server.ServerToClient msg) async {
} else if (msg.v0.hasNewMessage()) {
Uint8List body = Uint8List.fromList(msg.v0.newMessage.body);
int fromUserId = msg.v0.newMessage.fromUserId.toInt();
var hash = uint8ListToHex(Uint8List.fromList(
(await Sha256().hash(msg.v0.newMessage.body)).bytes));
Log.info("Got new message from server: ${hash.substring(0, 10)}");
response = await handleNewMessage(fromUserId, body);
} else {
Log.error("Got a new message from the server: $msg");
@ -59,6 +64,8 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
return client.Response()..ok = ok;
}
Log.info("Got: ${message.kind}");
switch (message.kind) {
case MessageKind.contactRequest:
return handleContactRequest(fromUserId, message);
@ -151,6 +158,10 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
message.messageId!,
update,
);
// search for older messages, that where not yet ack by the other party
DirtyResending.gotAckFromUser(fromUserId);
break;
case MessageKind.pushKey:
@ -186,6 +197,8 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
// when a message is received doubled ignore it...
if ((await twonlyDB.messagesDao
.containsOtherMessageId(fromUserId, message.messageId!))) {
Log.error(
"Got a duplicated message from other user: ${message.messageId!}");
var ok = client.Response_Ok()..none = true;
return client.Response()..ok = ok;
}
@ -215,11 +228,11 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
fromUserId,
responseToMessageId,
MessagesCompanion(
errorWhileSending: Value(false),
openedAt: Value(
DateTime.now(),
) // when a user reacted to the media file, it should be marked as opened
),
errorWhileSending: Value(false),
openedAt: Value(
DateTime.now(),
), // when a user reacted to the media file, it should be marked as opened
),
);
}
@ -247,6 +260,7 @@ Future<client.Response> handleNewMessage(int fromUserId, Uint8List body) async {
if (messageId == null) {
return client.Response()..error = ErrorCode.InternalError;
}
if (message.kind == MessageKind.media) {
twonlyDB.contactsDao.incFlameCounter(
fromUserId,

View file

@ -26,9 +26,7 @@ Future<Uint8List?> signalEncryptMessage(
SignalContactPreKey? preKey = await getPreKeyByContactId(target);
SignalContactSignedPreKey? signedPreKey =
await getSignedPreKeyByContactId(
target,
);
await getSignedPreKeyByContactId(target);
if (signedPreKey != null) {
SessionBuilder sessionBuilder = SessionBuilder.fromSignalStore(
@ -127,10 +125,6 @@ Future<MessageJson?> signalDecryptMessage(int source, Uint8List msg) async {
),
),
);
} on InvalidKeyIdException catch (_) {
return null; // got the same message again
} on DuplicateMessageException catch (_) {
return null; // to the same message again
} catch (e) {
Log.error(e.toString());
return null;

View file

@ -1,6 +1,7 @@
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:logging/logging.dart';
import 'package:mutex/mutex.dart';
import 'package:path_provider/path_provider.dart';
void initLogger() {
@ -28,6 +29,8 @@ class Log {
}
}
Mutex writeToLogGuard = Mutex();
Future<void> _writeLogToFile(LogRecord record) async {
final directory = await getApplicationSupportDirectory();
final logFile = File('${directory.path}/app.log');
@ -36,8 +39,10 @@ Future<void> _writeLogToFile(LogRecord record) async {
final logMessage =
'${DateTime.now().toString().split(".")[0]} ${record.level.name} [twonly] ${record.loggerName} > ${record.message}\n';
// Append the log message to the file
await logFile.writeAsString(logMessage, mode: FileMode.append);
writeToLogGuard.protect(() async {
// Append the log message to the file
await logFile.writeAsString(logMessage, mode: FileMode.append);
});
}
String _getCallerSourceCodeFilename() {