diff --git a/lib/main.dart b/lib/main.dart index 0c5e3c1..0e45a9f 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -48,7 +48,7 @@ void main() async { purgeReceivedMediaFiles(); purgeSendMediaFiles(); - await initMediaUploader(); + await initFileDownloader(); runApp( MultiProvider( diff --git a/lib/src/database/daos/messages_dao.dart b/lib/src/database/daos/messages_dao.dart index 11728e7..68c8dc8 100644 --- a/lib/src/database/daos/messages_dao.dart +++ b/lib/src/database/daos/messages_dao.dart @@ -97,6 +97,18 @@ class MessagesDao extends DatabaseAccessor .get(); } + Future> getAllNonACKMessagesFromUser() { + return (select(messages) + ..where((t) => + t.acknowledgeByUser.equals(false) & + t.messageOtherId.isNull() & + t.errorWhileSending.equals(false) & + t.sendAt.isBiggerThanValue( + DateTime.now().subtract(Duration(minutes: 10)), + ))) + .get(); + } + Stream> getAllStoredMediaFiles() { return (select(messages) ..where((t) => t.mediaStored.equals(true)) @@ -191,10 +203,6 @@ class MessagesDao extends DatabaseAccessor } } - Future deleteMessageById(int messageId) { - return (delete(messages)..where((t) => t.messageId.equals(messageId))).go(); - } - Future deleteMessagesByContactId(int contactId) { return (delete(messages) ..where((t) => @@ -207,7 +215,9 @@ class MessagesDao extends DatabaseAccessor } Future containsOtherMessageId( - int fromUserId, int messageOtherId) async { + int fromUserId, + int messageOtherId, + ) async { final query = select(messages) ..where((t) => t.messageOtherId.equals(messageOtherId) & diff --git a/lib/src/services/api/media_received.dart b/lib/src/services/api/media_received.dart index ab3b478..c9b8231 100644 --- a/lib/src/services/api/media_received.dart +++ b/lib/src/services/api/media_received.dart @@ -1,5 +1,6 @@ import 'dart:convert'; import 'dart:io'; +import 'package:background_downloader/background_downloader.dart'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:drift/drift.dart'; import 'package:path/path.dart'; @@ -8,8 +9,6 @@ import 'package:twonly/globals.dart'; import 'package:twonly/src/database/twonly_database.dart'; import 'package:twonly/src/database/tables/messages_table.dart'; import 'package:twonly/src/model/json/message.dart'; -import 'package:http/http.dart' as http; -// import 'package:twonly/src/providers/api/api_utils.dart'; import 'package:twonly/src/services/api/media_send.dart'; import 'package:cryptography_plus/cryptography_plus.dart'; import 'package:twonly/src/services/api/utils.dart'; @@ -75,6 +74,34 @@ Future isAllowedToDownload(bool isVideo) async { return false; } +Future handleDownloadStatusUpdate(TaskStatusUpdate update) async { + bool failed = false; + int messageId = int.parse(update.task.taskId.replaceAll("download_", "")); + + if (update.status == TaskStatus.failed || + update.status == TaskStatus.canceled) { + Log.error("Download failed: ${update.status}"); + failed = true; + } else if (update.status == TaskStatus.complete) { + if (update.responseStatusCode == 200) { + Log.info("Download was successfully for $messageId"); + await handleEncryptedFile(messageId); + } else { + Log.error( + "Got invalid response status code: ${update.responseStatusCode}"); + } + } + + if (failed) { + Message? message = await twonlyDB.messagesDao + .getMessageByMessageId(messageId) + .getSingleOrNull(); + if (message != null) { + await handleMediaError(message); + } + } +} + Future startDownloadMedia(Message message, bool force, {int retryCounter = 0}) async { if (message.contentJson == null) return; @@ -90,6 +117,7 @@ Future startDownloadMedia(Message message, bool force, final content = MessageContent.fromJson(message.kind, jsonDecode(message.contentJson!)); + if (content is! MediaMessageContent) return; if (content.downloadToken == null) return; @@ -135,60 +163,42 @@ Future startDownloadMedia(Message message, bool force, String apiUrl = "http${apiService.apiSecure}://${apiService.apiHost}/api/download/$downloadToken"; - var httpClient = http.Client(); - var request = http.Request('GET', Uri.parse(apiUrl)); - var response = httpClient.send(request); + try { + final task = DownloadTask( + url: apiUrl, + taskId: "download_${media.messageId}", + directory: "media/received/", + baseDirectory: BaseDirectory.applicationSupport, + filename: "${media.messageId}.encrypted", + priority: 0, + retries: 10, + ); - List> chunks = []; - int downloaded = 0; + Log.info( + "Got media file. Starting download: ${downloadToken.substring(0, 10)}"); - response.asStream().listen((http.StreamedResponse r) { - r.stream.listen((List chunk) { - // Display percentage of completion - Log.info( - 'downloadPercentage: ${downloaded / (r.contentLength ?? 0) * 100}'); + final result = await FileDownloader().enqueue(task); - chunks.add(chunk); - downloaded += chunk.length; - }, onDone: () async { - if (r.statusCode != 200) { - Log.error("Download error: $r"); - if (r.statusCode == 418) { - Log.error("Got custom error code: ${chunks.toList()}"); - handleMediaError(message); - } - return; - } - - // Display percentage of completion - Log.info( - 'downloadPercentage: ${downloaded / (r.contentLength ?? 0) * 100}'); - - // Save the file - final Uint8List bytes = Uint8List(r.contentLength ?? 0); - int offset = 0; - for (List chunk in chunks) { - bytes.setRange(offset, offset + chunk.length, chunk); - offset += chunk.length; - } - await writeMediaFile(message.messageId, "encrypted", bytes); - handleEncryptedFile(message, - encryptedBytesTmp: bytes, retryCounter: retryCounter); - return; - }); - }); + return result; + } catch (e) { + Log.error("Exception during upload: $e"); + } } -Future handleEncryptedFile( - Message msg, { - Uint8List? encryptedBytesTmp, - int retryCounter = 0, -}) async { - Uint8List? encryptedBytes = - encryptedBytesTmp ?? await readMediaFile(msg.messageId, "encrypted"); +Future handleEncryptedFile(int messageId) async { + Message? msg = await twonlyDB.messagesDao + .getMessageByMessageId(messageId) + .getSingleOrNull(); + if (msg == null) { + Log.error("Not message for downloaded file found: $messageId"); + return; + } + + Uint8List? encryptedBytes = await readMediaFile(msg.messageId, "encrypted"); if (encryptedBytes == null) { Log.error("encrypted bytes are not found for ${msg.messageId}"); + return; } MediaMessageContent content = @@ -198,7 +208,7 @@ Future handleEncryptedFile( SecretKeyData secretKeyData = SecretKeyData(content.encryptionKey!); SecretBox secretBox = SecretBox( - encryptedBytes!, + encryptedBytes, nonce: content.encryptionNonce!, mac: Mac(content.encryptionMac!), ); @@ -216,15 +226,9 @@ Future handleEncryptedFile( await writeMediaFile(msg.messageId, "png", imageBytes); } catch (e) { - if (retryCounter >= 1) { - Log.error( - "could not decrypt the media file in the second try. reporting error to user: $e"); - handleMediaError(msg); - return; - } - Log.error("could not decrypt the media file trying again: $e"); - startDownloadMedia(msg, true, retryCounter: retryCounter + 1); - // try downloading again.... + Log.error( + "could not decrypt the media file in the second try. reporting error to user: $e"); + handleMediaError(msg); return; } @@ -252,6 +256,7 @@ Future getVideoPath(int mediaId) async { Future readMediaFile(int mediaId, String type) async { String basePath = await getMediaFilePath(mediaId, "received"); File file = File("$basePath.$type"); + Log.info("Reading: ${file}"); if (!await file.exists()) { return null; } @@ -365,3 +370,6 @@ Future purgeMediaFiles(Directory directory) async { } } } + +// /data/user/0/eu.twonly.testing/files/media/received/27.encrypted +// /data/user/0/eu.twonly.testing/app_flutter/data/user/0/eu.twonly.testing/files/media/received/27.encrypted diff --git a/lib/src/services/api/media_send.dart b/lib/src/services/api/media_send.dart index a94b057..00149fa 100644 --- a/lib/src/services/api/media_send.dart +++ b/lib/src/services/api/media_send.dart @@ -52,73 +52,18 @@ Future isAllowedToSend() async { return null; } -Future initMediaUploader() async { +Future initFileDownloader() async { FileDownloader().updates.listen((update) async { switch (update) { case TaskStatusUpdate(): - bool failed = false; - int mediaUploadId = int.parse(update.task.taskId); - MediaUpload? media = await twonlyDB.mediaUploadsDao - .getMediaUploadById(mediaUploadId) - .getSingleOrNull(); - if (media == null) { - Log.error( - "Got an upload task but no upload media in the media upload database", - ); - return; + if (update.task.taskId.contains("upload_")) { + await handleUploadStatusUpdate(update); } - - if (update.status == TaskStatus.failed || - update.status == TaskStatus.canceled) { - Log.error("Upload failed: ${update.status}"); - failed = true; - } else if (update.status == TaskStatus.complete) { - if (update.responseStatusCode == 200) { - Log.info("Upload of $mediaUploadId success!"); - - await twonlyDB.mediaUploadsDao.updateMediaUpload( - mediaUploadId, - MediaUploadsCompanion( - state: Value(UploadState.receiverNotified), - ), - ); - - for (final messageId in media.messageIds!) { - await twonlyDB.messagesDao.updateMessageByMessageId( - messageId, - MessagesCompanion( - acknowledgeByServer: Value(true), - errorWhileSending: Value(false), - ), - ); - } - return; - } else if (update.responseStatusCode != null) { - if (update.responseStatusCode! >= 400 && - update.responseStatusCode! < 500) { - failed = true; - } - Log.error( - "Got error while uploading: ${update.responseStatusCode}", - ); - } + if (update.task.taskId.contains("download_")) { + await handleDownloadStatusUpdate(update); } - - if (failed) { - for (final messageId in media.messageIds!) { - await twonlyDB.messagesDao.updateMessageByMessageId( - messageId, - MessagesCompanion( - acknowledgeByServer: Value(true), - errorWhileSending: Value(true), - ), - ); - } - } - - print('Status update for ${update.task} with status ${update.status}'); case TaskProgressUpdate(): - print( + Log.info( 'Progress update for ${update.task} with progress ${update.progress}'); } }); @@ -132,8 +77,8 @@ Future initMediaUploader() async { if (kDebugMode) { FileDownloader().configureNotification( running: TaskNotification( - 'Uploading', - 'Uploading your {filename} ({progress}).', + 'Uploading/Downloading', + '{filename} ({progress}).', ), complete: null, progressBar: true, @@ -325,8 +270,7 @@ Future encryptMediaFiles( state.encryptionMac = secretBox.mac.bytes; - final algorithm = Sha256(); - state.sha2Hash = (await algorithm.hash(secretBox.cipherText)).bytes; + state.sha2Hash = (await Sha256().hash(secretBox.cipherText)).bytes; final encryptedBytes = Uint8List.fromList(secretBox.cipherText); await writeSendMediaFile( @@ -444,6 +388,72 @@ Future handleNextMediaUploadSteps(int mediaUploadId) async { /// /// /// + +Future handleUploadStatusUpdate(TaskStatusUpdate update) async { + bool failed = false; + int mediaUploadId = int.parse(update.task.taskId.replaceAll("upload_", "")); + + MediaUpload? media = await twonlyDB.mediaUploadsDao + .getMediaUploadById(mediaUploadId) + .getSingleOrNull(); + if (media == null) { + Log.error( + "Got an upload task but no upload media in the media upload database", + ); + return; + } + + if (update.status == TaskStatus.failed || + update.status == TaskStatus.canceled) { + Log.error("Upload failed: ${update.status}"); + failed = true; + } else if (update.status == TaskStatus.complete) { + if (update.responseStatusCode == 200) { + Log.info("Upload of $mediaUploadId success!"); + + await twonlyDB.mediaUploadsDao.updateMediaUpload( + mediaUploadId, + MediaUploadsCompanion( + state: Value(UploadState.receiverNotified), + ), + ); + + for (final messageId in media.messageIds!) { + await twonlyDB.messagesDao.updateMessageByMessageId( + messageId, + MessagesCompanion( + acknowledgeByServer: Value(true), + errorWhileSending: Value(false), + ), + ); + } + return; + } else if (update.responseStatusCode != null) { + if (update.responseStatusCode! >= 400 && + update.responseStatusCode! < 500) { + failed = true; + } + Log.error( + "Got error while uploading: ${update.responseStatusCode}", + ); + } + } + + if (failed) { + for (final messageId in media.messageIds!) { + await twonlyDB.messagesDao.updateMessageByMessageId( + messageId, + MessagesCompanion( + acknowledgeByServer: Value(true), + errorWhileSending: Value(true), + ), + ); + } + } + Log.info( + 'Status update for ${update.task.taskId} with status ${update.status}'); +} + Future handleUploadError(MediaUpload mediaUpload) async { // if the messageIds are already there notify the user about this error... if (mediaUpload.messageIds != null) { @@ -496,6 +506,20 @@ Future handleMediaUpload(MediaUpload media) async { if (message == null) continue; + Contact? contact = await twonlyDB.contactsDao + .getContactByUserId(message.contactId) + .getSingleOrNull(); + + if (contact == null || contact.deleted) { + Log.warn( + "Contact deleted ${message.contactId} or not found in database."); + await twonlyDB.messagesDao.updateMessageByMessageId( + message.messageId, + MessagesCompanion(errorWhileSending: Value(true)), + ); + continue; + } + await twonlyDB.contactsDao.incFlameCounter( message.contactId, false, @@ -552,7 +576,7 @@ Future handleMediaUpload(MediaUpload media) async { try { final task = UploadTask.fromFile( - taskId: "${media.mediaUploadId}", + taskId: "upload_${media.mediaUploadId}", displayName: (media.metadata?.isVideo ?? false) ? "image" : "video", file: uploadRequestFile, url: apiUrl, diff --git a/lib/src/services/api/messages.dart b/lib/src/services/api/messages.dart index c42a31e..08131c6 100644 --- a/lib/src/services/api/messages.dart +++ b/lib/src/services/api/messages.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:drift/drift.dart'; @@ -13,6 +14,43 @@ import 'package:twonly/src/services/notification.service.dart'; import 'package:twonly/src/utils/log.dart'; import 'package:twonly/src/utils/storage.dart'; +class DirtyResendingItem { + DirtyResendingItem({required this.gotLastAck}); + DateTime gotLastAck; + Timer? timer; +} + +class DirtyResending { + static final Map _gotLastAck = {}; + + static Future gotAckFromUser(int contactID) async { + _gotLastAck[contactID]?.timer?.cancel(); + + _gotLastAck[contactID] = DirtyResendingItem(gotLastAck: DateTime.now()); + _gotLastAck[contactID]?.timer = Timer(Duration(seconds: 10), () async { + _gotLastAck.remove(contactID); + _handleNonACKMessagesForUser(contactID); + }); + } + + static Future _handleNonACKMessagesForUser(int contactID) async { + final List toResendMessages = + await twonlyDB.messagesDao.getAllNonACKMessagesFromUser(); + + for (final Message message in toResendMessages) { + Log.info("Got newer ACKs from user ${message.messageId}"); + await twonlyDB.messagesDao.updateMessageByMessageId( + message.messageId, + MessagesCompanion( + errorWhileSending: Value(true), + ), + ); + } + } +} + +Future handleOlderNonAckMessages() async {} + Future tryTransmitMessages() async { final retransIds = await twonlyDB.messageRetransmissionDao.getRetransmitAbleMessages(); @@ -36,6 +74,20 @@ Future sendRetransmitMessage(int retransId) async { return; } + Contact? contact = await twonlyDB.contactsDao + .getContactByUserId(retrans.contactId) + .getSingleOrNull(); + if (contact == null || contact.deleted) { + Log.warn("Contact deleted $retransId or not found in database."); + if (retrans.messageId != null) { + await twonlyDB.messagesDao.updateMessageByMessageId( + retrans.messageId!, + MessagesCompanion(errorWhileSending: Value(true)), + ); + } + return; + } + Uint8List? encryptedBytes = await signalEncryptMessage( retrans.contactId, retrans.plaintextContent, @@ -155,7 +207,9 @@ Future sendTextMessage( } Future notifyContactAboutOpeningMessage( - int fromUserId, List messageOtherIds) async { + int fromUserId, + List messageOtherIds, +) async { for (final messageOtherId in messageOtherIds) { await encryptAndSendMessageAsync( null, diff --git a/lib/src/services/api/server_messages.dart b/lib/src/services/api/server_messages.dart index 875ebd6..f11482a 100644 --- a/lib/src/services/api/server_messages.dart +++ b/lib/src/services/api/server_messages.dart @@ -1,4 +1,5 @@ import 'dart:convert'; +import 'package:cryptography_plus/cryptography_plus.dart'; import 'package:drift/drift.dart'; import 'package:fixnum/fixnum.dart'; import 'package:libsignal_protocol_dart/libsignal_protocol_dart.dart'; @@ -13,6 +14,7 @@ import 'package:twonly/src/model/protobuf/api/websocket/client_to_server.pbserve import 'package:twonly/src/model/protobuf/api/websocket/error.pb.dart'; import 'package:twonly/src/model/protobuf/api/websocket/server_to_client.pb.dart' as server; +import 'package:twonly/src/services/api/media_send.dart'; import 'package:twonly/src/services/api/messages.dart'; import 'package:twonly/src/services/api/utils.dart'; import 'package:twonly/src/services/api/media_received.dart'; @@ -34,6 +36,9 @@ Future handleServerMessage(server.ServerToClient msg) async { } else if (msg.v0.hasNewMessage()) { Uint8List body = Uint8List.fromList(msg.v0.newMessage.body); int fromUserId = msg.v0.newMessage.fromUserId.toInt(); + var hash = uint8ListToHex(Uint8List.fromList( + (await Sha256().hash(msg.v0.newMessage.body)).bytes)); + Log.info("Got new message from server: ${hash.substring(0, 10)}"); response = await handleNewMessage(fromUserId, body); } else { Log.error("Got a new message from the server: $msg"); @@ -59,6 +64,8 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { return client.Response()..ok = ok; } + Log.info("Got: ${message.kind}"); + switch (message.kind) { case MessageKind.contactRequest: return handleContactRequest(fromUserId, message); @@ -151,6 +158,10 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { message.messageId!, update, ); + + // search for older messages, that where not yet ack by the other party + DirtyResending.gotAckFromUser(fromUserId); + break; case MessageKind.pushKey: @@ -186,6 +197,8 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { // when a message is received doubled ignore it... if ((await twonlyDB.messagesDao .containsOtherMessageId(fromUserId, message.messageId!))) { + Log.error( + "Got a duplicated message from other user: ${message.messageId!}"); var ok = client.Response_Ok()..none = true; return client.Response()..ok = ok; } @@ -215,11 +228,11 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { fromUserId, responseToMessageId, MessagesCompanion( - errorWhileSending: Value(false), - openedAt: Value( - DateTime.now(), - ) // when a user reacted to the media file, it should be marked as opened - ), + errorWhileSending: Value(false), + openedAt: Value( + DateTime.now(), + ), // when a user reacted to the media file, it should be marked as opened + ), ); } @@ -247,6 +260,7 @@ Future handleNewMessage(int fromUserId, Uint8List body) async { if (messageId == null) { return client.Response()..error = ErrorCode.InternalError; } + if (message.kind == MessageKind.media) { twonlyDB.contactsDao.incFlameCounter( fromUserId, diff --git a/lib/src/services/signal/encryption.signal.dart b/lib/src/services/signal/encryption.signal.dart index 8eb9c9a..dada972 100644 --- a/lib/src/services/signal/encryption.signal.dart +++ b/lib/src/services/signal/encryption.signal.dart @@ -26,9 +26,7 @@ Future signalEncryptMessage( SignalContactPreKey? preKey = await getPreKeyByContactId(target); SignalContactSignedPreKey? signedPreKey = - await getSignedPreKeyByContactId( - target, - ); + await getSignedPreKeyByContactId(target); if (signedPreKey != null) { SessionBuilder sessionBuilder = SessionBuilder.fromSignalStore( @@ -127,10 +125,6 @@ Future signalDecryptMessage(int source, Uint8List msg) async { ), ), ); - } on InvalidKeyIdException catch (_) { - return null; // got the same message again - } on DuplicateMessageException catch (_) { - return null; // to the same message again } catch (e) { Log.error(e.toString()); return null; diff --git a/lib/src/utils/log.dart b/lib/src/utils/log.dart index c68beba..e385382 100644 --- a/lib/src/utils/log.dart +++ b/lib/src/utils/log.dart @@ -1,6 +1,7 @@ import 'dart:io'; import 'package:flutter/foundation.dart'; import 'package:logging/logging.dart'; +import 'package:mutex/mutex.dart'; import 'package:path_provider/path_provider.dart'; void initLogger() { @@ -28,6 +29,8 @@ class Log { } } +Mutex writeToLogGuard = Mutex(); + Future _writeLogToFile(LogRecord record) async { final directory = await getApplicationSupportDirectory(); final logFile = File('${directory.path}/app.log'); @@ -36,8 +39,10 @@ Future _writeLogToFile(LogRecord record) async { final logMessage = '${DateTime.now().toString().split(".")[0]} ${record.level.name} [twonly] ${record.loggerName} > ${record.message}\n'; - // Append the log message to the file - await logFile.writeAsString(logMessage, mode: FileMode.append); + writeToLogGuard.protect(() async { + // Append the log message to the file + await logFile.writeAsString(logMessage, mode: FileMode.append); + }); } String _getCallerSourceCodeFilename() {