maybe fixing the send error

This commit is contained in:
otsmr 2025-05-31 19:38:12 +02:00
parent 33ab69b1b7
commit a9f94e46a3
4 changed files with 64 additions and 7 deletions

View file

@ -17,7 +17,7 @@ class MediaUploadsDao extends DatabaseAccessor<TwonlyDatabase>
.get(); .get();
} }
Future updateMediaUpload( Future<int> updateMediaUpload(
int mediaUploadId, MediaUploadsCompanion updatedValues) { int mediaUploadId, MediaUploadsCompanion updatedValues) {
return (update(mediaUploads) return (update(mediaUploads)
..where((c) => c.mediaUploadId.equals(mediaUploadId))) ..where((c) => c.mediaUploadId.equals(mediaUploadId)))

View file

@ -88,15 +88,15 @@ class MessagesDao extends DatabaseAccessor<TwonlyDatabase>
.watch(); .watch();
} }
Future<List<Message>> getAllMessagesPendingUploadOlderThanAMinute() { Future<List<Message>> getAllMessagesPendingUpload() {
return (select(messages) return (select(messages)
..where( ..where(
(t) => (t) =>
t.acknowledgeByServer.equals(false) & t.acknowledgeByServer.equals(false) &
t.messageOtherId.isNull() & t.messageOtherId.isNull() &
t.mediaUploadId.isNotNull() &
t.downloadState.equals(DownloadState.pending.index) &
t.errorWhileSending.equals(false) & t.errorWhileSending.equals(false) &
t.sendAt.isSmallerThanValue(
DateTime.now().subtract(Duration(minutes: 5))) &
t.kind.equals(MessageKind.media.name), t.kind.equals(MessageKind.media.name),
)) ))
.get(); .get();

View file

@ -317,6 +317,7 @@ Future<void> purgeMediaFiles(Directory directory) async {
} }
} }
if (canBeDeleted) { if (canBeDeleted) {
Log.info("purged media file ${file.path} ");
file.deleteSync(); file.deleteSync();
} }
} else { } else {

View file

@ -63,11 +63,55 @@ Future<ErrorCode?> isAllowedToSend() async {
/// Create a new entry in the database /// Create a new entry in the database
Future<bool> checkForFailedUploads() async {
final messages = await twonlyDB.messagesDao.getAllMessagesPendingUpload();
List<int> mediaUploadIds = [];
for (Message message in messages) {
if (mediaUploadIds.contains(message.mediaUploadId)) {
continue;
}
int affectedRows = await twonlyDB.mediaUploadsDao.updateMediaUpload(
message.mediaUploadId!,
MediaUploadsCompanion(
uploadTokens: Value(null), // reupload them
state: Value(UploadState.pending),
encryptionData: Value(
null, // start from scratch e.q. encrypt the files again if already happen
),
),
);
if (affectedRows == 0) {
Log.error(
"The media from message ${message.messageId} already deleted.",
);
await twonlyDB.messagesDao.updateMessageByMessageId(
message.messageId,
MessagesCompanion(
errorWhileSending: Value(true),
),
);
} else {
mediaUploadIds.add(message.mediaUploadId!);
}
}
Log.error(
"Got ${messages.length} messages (${mediaUploadIds.length} media upload files) that are not correctly uploaded. Trying from scratch again.",
);
return mediaUploadIds.isNotEmpty; // return true if there are affected
}
final lockingHandleMediaFile = Mutex(); final lockingHandleMediaFile = Mutex();
Future retryMediaUpload({int maxRetries = 3}) async { Future retryMediaUpload({int maxRetries = 3}) async {
await lockingHandleMediaFile.protect(() async { if (maxRetries == 0) {
Log.error("retried media upload 3 times. abort retrying");
return;
}
bool retry = await lockingHandleMediaFile.protect<bool>(() async {
final mediaFiles = await twonlyDB.mediaUploadsDao.getMediaUploadsForRetry(); final mediaFiles = await twonlyDB.mediaUploadsDao.getMediaUploadsForRetry();
if (mediaFiles.isEmpty) return; if (mediaFiles.isEmpty) {
return checkForFailedUploads();
}
Log.info("re uploading ${mediaFiles.length} media files.");
for (final mediaFile in mediaFiles) { for (final mediaFile in mediaFiles) {
if (mediaFile.messageIds == null || mediaFile.metadata == null) { if (mediaFile.messageIds == null || mediaFile.metadata == null) {
// the media upload was canceled, // the media upload was canceled,
@ -89,7 +133,11 @@ Future retryMediaUpload({int maxRetries = 3}) async {
await handlePreProcessingState(mediaFile); await handlePreProcessingState(mediaFile);
} }
} }
return false;
}); });
if (retry) {
await retryMediaUpload(maxRetries: maxRetries - 1);
}
} }
Future<int?> initMediaUpload() async { Future<int?> initMediaUpload() async {
@ -151,12 +199,17 @@ Future handlePreProcessingState(MediaUpload media) async {
videoHandler, videoHandler,
); );
} catch (e) { } catch (e) {
Log.error("${media.mediaUploadId} got error in pre processing: $e");
await handleUploadError(media); await handleUploadError(media);
} }
} }
Future encryptAndPreUploadMediaFiles( Future encryptAndPreUploadMediaFiles(
int mediaUploadId, Future imageHandler, Future<bool>? videoHandler) async { int mediaUploadId,
Future imageHandler,
Future<bool>? videoHandler,
) async {
Log.info("$mediaUploadId encrypting files");
Uint8List dataToEncrypt = await imageHandler; Uint8List dataToEncrypt = await imageHandler;
/// if there is a video wait until it is finished with compression /// if there is a video wait until it is finished with compression
@ -268,6 +321,7 @@ Future handleNextMediaUploadSteps(int mediaUploadId) async {
if (mediaUpload == null) return false; if (mediaUpload == null) return false;
if (mediaUpload.state == UploadState.receiverNotified) { if (mediaUpload.state == UploadState.receiverNotified) {
/// Upload done and all users are notified :) /// Upload done and all users are notified :)
Log.info("$mediaUploadId is already done");
return false; return false;
} }
try { try {
@ -386,6 +440,8 @@ Future<bool> handleMediaUpload(int mediaUploadId) async {
filename: "upload", filename: "upload",
)); ));
Log.info("Starting upload from $mediaUploadId ");
try { try {
var streamedResponse = await requestMultipart.send(); var streamedResponse = await requestMultipart.send();