fixes issue with messsages not received
Some checks are pending
Flutter analyze & test / flutter_analyze_and_test (push) Waiting to run

This commit is contained in:
otsmr 2026-05-22 12:41:08 +02:00
parent a50c2ba7d7
commit 3499a08155
10 changed files with 101 additions and 61 deletions

3
.gitignore vendored
View file

@ -15,6 +15,9 @@
*.sqlite-wal *.sqlite-wal
migrate_working_dir/ migrate_working_dir/
fastlane/report.xml
fastlane/README.md
# IntelliJ related # IntelliJ related
*.iml *.iml
*.ipr *.ipr

View file

@ -1,6 +1,6 @@
# Changelog # Changelog
## 0.2.18 ## 0.2.20
- New: Adds an "Ask a Friend" button to new contact suggestions. - New: Adds an "Ask a Friend" button to new contact suggestions.
- New: Adds security profiles. - New: Adds security profiles.

View file

@ -28,7 +28,6 @@ Future<bool> handleNewContactRequest(int fromUserId) async {
await handleContactAccept(fromUserId); await handleContactAccept(fromUserId);
} }
// contact was already accepted, so just accept the request in the background.
await sendCipherText( await sendCipherText(
contact.userId, contact.userId,
EncryptedContent( EncryptedContent(
@ -36,6 +35,7 @@ Future<bool> handleNewContactRequest(int fromUserId) async {
type: EncryptedContent_ContactRequest_Type.ACCEPT, type: EncryptedContent_ContactRequest_Type.ACCEPT,
), ),
), ),
blocking: false,
); );
return true; return true;
} }
@ -238,6 +238,7 @@ Future<int?> checkForProfileUpdate(
type: EncryptedContent_ContactUpdate_Type.REQUEST, type: EncryptedContent_ContactUpdate_Type.REQUEST,
), ),
), ),
blocking: false,
); );
} }
} }

View file

@ -17,9 +17,7 @@ Future<void> handleGroupCreate(
EncryptedContent_GroupCreate newGroup, EncryptedContent_GroupCreate newGroup,
String receiptId, String receiptId,
) async { ) async {
final user = await twonlyDB.contactsDao final user = await twonlyDB.contactsDao.getContactByUserId(fromUserId).getSingleOrNull();
.getContactByUserId(fromUserId)
.getSingleOrNull();
if (user == null) { if (user == null) {
// Only contacts can invite other contacts, so this can (via the UI) not happen. // Only contacts can invite other contacts, so this can (via the UI) not happen.
Log.error( Log.error(
@ -229,6 +227,7 @@ Future<void> handleResendGroupPublicKey(
groupPublicKey: keyPair.getPublicKey().serialize(), groupPublicKey: keyPair.getPublicKey().serialize(),
), ),
), ),
blocking: false,
); );
} }

View file

@ -17,6 +17,7 @@ Future<void> checkForUserDiscoveryChanges(
List<int> receivedVersion, List<int> receivedVersion,
String receiptId, String receiptId,
) async { ) async {
Log.info('[$receiptId] Checking for a new user discovery version.');
final currentVersion = await UserDiscoveryService.shouldRequestNewMessages( final currentVersion = await UserDiscoveryService.shouldRequestNewMessages(
fromUserId, fromUserId,
receivedVersion, receivedVersion,
@ -36,6 +37,7 @@ Future<void> checkForUserDiscoveryChanges(
currentVersion: currentVersion.toList(), currentVersion: currentVersion.toList(),
), ),
), ),
blocking: false,
); );
} }
} }
@ -73,6 +75,7 @@ Future<void> handleUserDiscoveryRequest(
messages: newMessages, messages: newMessages,
), ),
), ),
blocking: false,
); );
} else { } else {
Log.info('[$receiptId] Got update request, but there are no new updates for the user'); Log.info('[$receiptId] Got update request, but there are no new updates for the user');

View file

@ -566,5 +566,5 @@ Future<void> sendContactMyProfileData(int contactId) async {
username: userService.currentUser.username, username: userService.currentUser.username,
), ),
); );
await sendCipherText(contactId, encryptedContent); await sendCipherText(contactId, encryptedContent, blocking: false);
} }

View file

@ -86,8 +86,18 @@ Future<void> handleClient2ClientMessage(NewMessage newMessage) async {
final receiptId = message.receiptId; final receiptId = message.receiptId;
final mutex = _messageLocks.putIfAbsent(receiptId, Mutex.new); final mutex = _messageLocks.putIfAbsent(receiptId, Mutex.new);
if (mutex.isLocked) {
Log.info(
'[$receiptId] Skipping — already being processed by another handler',
);
return;
}
await mutex.protect(() async { await mutex.protect(() async {
await _handleClient2ClientMessage(newMessage, message); try {
await _handleClient2ClientMessage(newMessage, message);
} finally {
_messageLocks.remove(receiptId);
}
}); });
} }
@ -143,7 +153,7 @@ Future<void> _handleClient2ClientMessage(
Log.info( Log.info(
'[$receiptId] Sending error message to the original sender with receiptId $newReceiptId.', '[$receiptId] Sending error message to the original sender with receiptId $newReceiptId.',
); );
await tryToSendCompleteMessage(receiptId: newReceiptId); await tryToSendCompleteMessage(receiptId: newReceiptId, blocking: false);
} }
case Message_Type.CIPHERTEXT: case Message_Type.CIPHERTEXT:
@ -216,7 +226,7 @@ Future<void> _handleClient2ClientMessage(
} catch (e) { } catch (e) {
Log.warn('[$receiptId] Error inserting receipt: $e'); Log.warn('[$receiptId] Error inserting receipt: $e');
} }
await tryToSendCompleteMessage(receiptId: receiptId); await tryToSendCompleteMessage(receiptId: receiptId, blocking: false);
} }
case Message_Type.TEST_NOTIFICATION: case Message_Type.TEST_NOTIFICATION:
break; break;

View file

@ -104,7 +104,8 @@ class UserDiscoveryService {
static Future<Uint8List?> getCurrentVersion() async { static Future<Uint8List?> getCurrentVersion() async {
try { try {
return await FlutterUserDiscovery.getCurrentVersion(); return await FlutterUserDiscovery.getCurrentVersion()
.timeout(const Duration(seconds: 5));
} catch (e) { } catch (e) {
Log.error(e); Log.error(e);
return null; return null;
@ -140,7 +141,7 @@ class UserDiscoveryService {
return await FlutterUserDiscovery.shouldRequestNewMessages( return await FlutterUserDiscovery.shouldRequestNewMessages(
contactId: fromUserId, contactId: fromUserId,
version: receivedVersion, version: receivedVersion,
); ).timeout(const Duration(seconds: 5));
} catch (e) { } catch (e) {
Log.error(e); Log.error(e);
return null; return null;
@ -155,7 +156,7 @@ class UserDiscoveryService {
return await FlutterUserDiscovery.getNewMessages( return await FlutterUserDiscovery.getNewMessages(
contactId: fromUserId, contactId: fromUserId,
receivedVersion: receivedVersion, receivedVersion: receivedVersion,
); ).timeout(const Duration(seconds: 5));
} catch (e) { } catch (e) {
Log.error(e); Log.error(e);
return null; return null;
@ -175,7 +176,7 @@ class UserDiscoveryService {
messages: messages, messages: messages,
publicKeyVerifiedTimestamp: publicKeyVerifiedTimestamp:
verifications.lastOrNull?.createdAt.millisecondsSinceEpoch, verifications.lastOrNull?.createdAt.millisecondsSinceEpoch,
); ).timeout(const Duration(seconds: 5));
} catch (e) { } catch (e) {
Log.error(e); Log.error(e);
} }

View file

@ -3,7 +3,7 @@ description: "twonly, a privacy-friendly way to connect with friends through sec
publish_to: 'none' publish_to: 'none'
version: 0.2.17+126 version: 0.2.19+128
environment: environment:
sdk: ^3.11.0 sdk: ^3.11.0

View file

@ -10,7 +10,7 @@ use std::u8;
use blahaj::{Share, Sharks}; use blahaj::{Share, Sharks};
use prost::Message; use prost::Message;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, MutexGuard}; use tokio::sync::Mutex;
use crate::user_discovery::error::{Result, UserDiscoveryError}; use crate::user_discovery::error::{Result, UserDiscoveryError};
use crate::user_discovery::traits::{AnnouncedUser, OtherPromotion, UserDiscoveryUtils}; use crate::user_discovery::traits::{AnnouncedUser, OtherPromotion, UserDiscoveryUtils};
use crate::user_discovery::user_discovery_message::{UserDiscoveryAnnouncement, UserDiscoveryPromotion}; use crate::user_discovery::user_discovery_message::{UserDiscoveryAnnouncement, UserDiscoveryPromotion};
@ -56,7 +56,7 @@ where
{ {
store: Store, store: Store,
utils: Utils, utils: Utils,
config_lock: Arc<Mutex<bool>>, config_lock: Arc<Mutex<()>>,
} }
impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store, Utils> { impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store, Utils> {
@ -101,6 +101,7 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
Err(_) => UserDiscoveryConfig { Err(_) => UserDiscoveryConfig {
threshold, threshold,
user_id, user_id,
total_number_of_shares: 255,
..Default::default() ..Default::default()
}, },
}; };
@ -126,24 +127,27 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
debug_assert_eq!(verification_shares.len(), threshold as usize - 1); debug_assert_eq!(verification_shares.len(), threshold as usize - 1);
tracing::info!("Protocols: updating config in store"); tracing::info!("Protocols: updating config in store");
let config_lock = self.config_lock.lock().await;
let mut final_config = match self.store.get_config().await {
Ok(c) => serde_json::from_str(&c)?,
Err(_) => UserDiscoveryConfig {
threshold,
user_id,
..Default::default()
},
};
final_config.public_id = public_id; {
final_config.announcement_version += 1; let mut final_config = match self.store.get_config().await {
final_config.verification_shares = verification_shares; Ok(c) => serde_json::from_str(&c)?,
final_config.share_promotion = share_promotion; Err(_) => UserDiscoveryConfig {
final_config.threshold = threshold; threshold,
user_id,
..Default::default()
},
};
self.update_config(final_config, config_lock).await?; final_config.public_id = public_id;
final_config.announcement_version += 1;
final_config.verification_shares = verification_shares;
final_config.share_promotion = share_promotion;
final_config.threshold = threshold;
self.store
.update_config(serde_json::to_string_pretty(&final_config)?)
.await?;
}
tracing::info!("Protocols: initialize_or_update finished"); tracing::info!("Protocols: initialize_or_update finished");
Ok(()) Ok(())
@ -164,7 +168,7 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
/// * `Err(UserDiscoveryError)` - If there where errors in the store. /// * `Err(UserDiscoveryError)` - If there where errors in the store.
/// ///
pub async fn get_current_version(&self) -> Result<Vec<u8>> { pub async fn get_current_version(&self) -> Result<Vec<u8>> {
let (config, _) = self.get_config().await?; let config = self.get_config_snapshot().await?;
Ok(UserDiscoveryVersion { Ok(UserDiscoveryVersion {
announcement: config.announcement_version, announcement: config.announcement_version,
promotion: config.promotion_version, promotion: config.promotion_version,
@ -207,7 +211,7 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
let mut messages = vec![]; let mut messages = vec![];
let received_version = UserDiscoveryVersion::decode(received_version)?; let received_version = UserDiscoveryVersion::decode(received_version)?;
let (config, _) = self.get_config().await?; let config = self.get_config_snapshot().await?;
let version = Some(UserDiscoveryVersion { let version = Some(UserDiscoveryVersion {
announcement: config.announcement_version, announcement: config.announcement_version,
promotion: config.promotion_version, promotion: config.promotion_version,
@ -361,9 +365,6 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
contact_id: UserID, contact_id: UserID,
public_key_verified_timestamp: Option<i64>, public_key_verified_timestamp: Option<i64>,
) -> Result<()> { ) -> Result<()> {
let (mut config, config_lock) = self.get_config().await?;
config.promotion_version += 1;
let Some(current_promotion) = self.store.get_contact_promotion(contact_id).await? else { let Some(current_promotion) = self.store.get_contact_promotion(contact_id).await? else {
// User does not participate... // User does not participate...
return Ok(()); return Ok(());
@ -376,10 +377,19 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
return Ok(()); return Ok(());
}; };
// Read-modify-write the config to get the new promotion_version
let mut new_promotion_version = 0u32;
let mut announcement_version = 0u32;
self.read_modify_write_config(|config| {
config.promotion_version += 1;
new_promotion_version = config.promotion_version;
announcement_version = config.announcement_version;
}).await?;
let message = UserDiscoveryMessage { let message = UserDiscoveryMessage {
version: Some(UserDiscoveryVersion { version: Some(UserDiscoveryVersion {
announcement: config.announcement_version, announcement: announcement_version,
promotion: config.promotion_version, promotion: new_promotion_version,
}), }),
user_discovery_promotion: Some(UserDiscoveryPromotion { user_discovery_promotion: Some(UserDiscoveryPromotion {
promotion_id: rand::random(), promotion_id: rand::random(),
@ -394,13 +404,11 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
self.store self.store
.push_own_promotion_and_clear_old_version( .push_own_promotion_and_clear_old_version(
contact_id, contact_id,
config.promotion_version, new_promotion_version,
message.encode_to_vec(), message.encode_to_vec(),
) )
.await?; .await?;
self.update_config(config, config_lock).await?;
Ok(()) Ok(())
} }
@ -455,21 +463,31 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
Ok(verification_shares) Ok(verification_shares)
} }
async fn get_config(&self) -> Result<(UserDiscoveryConfig, MutexGuard<'_, bool>)> { /// Reads the config from the store without holding any lock.
let mut lock = self.config_lock.lock().await; /// Use this for read-only access to the config.
*lock = true; async fn get_config_snapshot(&self) -> Result<UserDiscoveryConfig> {
Ok((serde_json::from_str(&self.store.get_config().await?)?, lock)) Ok(serde_json::from_str(&self.store.get_config().await?)?)
} }
async fn update_config( /// Atomically reads the config, applies the mutation, and writes it back.
&self, /// The config_lock is only held during the read-modify-write cycle,
config: UserDiscoveryConfig, /// NOT across any async Dart callbacks from the caller.
mut config_lock: MutexGuard<'_, bool>, async fn read_modify_write_config<F>(&self, mutate: F) -> Result<()>
) -> Result<()> { where
F: FnOnce(&mut UserDiscoveryConfig),
{
let _lock = tokio::time::timeout(
std::time::Duration::from_secs(10),
self.config_lock.lock(),
)
.await
.ok();
let mut config: UserDiscoveryConfig =
serde_json::from_str(&self.store.get_config().await?)?;
mutate(&mut config);
self.store self.store
.update_config(serde_json::to_string_pretty(&config)?) .update_config(serde_json::to_string_pretty(&config)?)
.await?; .await?;
*config_lock = false;
Ok(()) Ok(())
} }
@ -542,13 +560,19 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
// Only add this user to the promotions if the users enabled this feature // Only add this user to the promotions if the users enabled this feature
if uda.share_promotion { if uda.share_promotion {
let (mut config, config_lock) = self.get_config().await?; // Read-modify-write the config to get the new promotion_version
config.promotion_version += 1; let mut new_promotion_version = 0u32;
let mut announcement_version = 0u32;
self.read_modify_write_config(|config| {
config.promotion_version += 1;
new_promotion_version = config.promotion_version;
announcement_version = config.announcement_version;
}).await?;
let message = UserDiscoveryMessage { let message = UserDiscoveryMessage {
version: Some(UserDiscoveryVersion { version: Some(UserDiscoveryVersion {
announcement: config.announcement_version, announcement: announcement_version,
promotion: config.promotion_version, promotion: new_promotion_version,
}), }),
user_discovery_promotion: Some(UserDiscoveryPromotion { user_discovery_promotion: Some(UserDiscoveryPromotion {
promotion_id: rand::random(), promotion_id: rand::random(),
@ -563,12 +587,10 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
self.store self.store
.push_own_promotion_and_clear_old_version( .push_own_promotion_and_clear_old_version(
contact_id, contact_id,
config.promotion_version, new_promotion_version,
message.encode_to_vec(), message.encode_to_vec(),
) )
.await?; .await?;
self.update_config(config, config_lock).await?;
} }
let announced_user = AnnouncedUser { let announced_user = AnnouncedUser {
@ -722,7 +744,7 @@ impl<Store: UserDiscoveryStore, Utils: UserDiscoveryUtils> UserDiscovery<Store,
public_id: udp.public_id, public_id: udp.public_id,
}; };
let (config, _) = self.get_config().await?; let config = self.get_config_snapshot().await?;
let user_id = config.user_id; let user_id = config.user_id;
for promotion in unique_promotions { for promotion in unique_promotions {
@ -758,7 +780,7 @@ impl Default for UserDiscoveryConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
threshold: 2, threshold: 2,
total_number_of_shares: u8::MAX, total_number_of_shares: 255,
announcement_version: 0, announcement_version: 0,
promotion_version: 0, promotion_version: 0,
verification_shares: vec![], verification_shares: vec![],
@ -768,3 +790,4 @@ impl Default for UserDiscoveryConfig {
} }
} }
} }