diff --git a/rust_dependencies/protocols/src/user_discovery/mod.rs b/rust_dependencies/protocols/src/user_discovery/mod.rs new file mode 100644 index 00000000..68bccd63 --- /dev/null +++ b/rust_dependencies/protocols/src/user_discovery/mod.rs @@ -0,0 +1,770 @@ +pub mod error; +pub mod stores; +#[cfg(test)] +pub mod tests; +pub mod traits; + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::u8; +use blahaj::{Share, Sharks}; +use prost::Message; +use serde::{Deserialize, Serialize}; +use tokio::sync::{Mutex, MutexGuard}; +use crate::user_discovery::error::{Result, UserDiscoveryError}; +use crate::user_discovery::traits::{AnnouncedUser, OtherPromotion, UserDiscoveryUtils}; +use crate::user_discovery::user_discovery_message::{UserDiscoveryAnnouncement, UserDiscoveryPromotion}; +use crate::user_discovery::user_discovery_message::user_discovery_promotion::AnnouncementShareDecrypted; +use crate::user_discovery::user_discovery_message::user_discovery_promotion::announcement_share_decrypted::SignedData; +pub use traits::UserDiscoveryStore; + +/// Type of the user id, this must be consistent with the user id defined in +/// the types.proto +pub type UserID = i64; + +include!(concat!(env!("OUT_DIR"), "/user_discovery.rs")); + +#[derive(Serialize, Deserialize, Clone, Debug)] +struct UserDiscoveryConfig { + /// The number of required shares to get the secret + threshold: u8, + /// Currently limited to <= 255 as GF 256 is used + total_number_of_shares: u8, + /// Version of announcements + announcement_version: u32, + /// Version of promotions + promotion_version: u32, + /// This is a random public_id associated with a single announcement. + public_id: i64, + /// Verification shares + verification_shares: Vec>, + // The users' id: + user_id: UserID, + // If others user should promote the promotion to other users + share_promotion: bool, +} + +/// +/// The main struct to access the user discovery functionality. +/// +/// As generic values it requires a UserDiscoveryStore and a UserDiscoveryUtils. +/// +pub struct UserDiscovery +where + Store: UserDiscoveryStore, + Utils: UserDiscoveryUtils, +{ + store: Store, + utils: Utils, + config_lock: Arc>, +} + +impl UserDiscovery { + /// Creates a new instance of the user discovery. + pub fn new(store: Store, utils: Utils) -> Result { + Ok(Self { + store, + utils, + config_lock: Arc::default(), + }) + } + + /// Initializes or updates the user discovery. + /// + /// This function will generate new verification shares and update the config. + /// + /// # Arguments + /// + /// * `threshold` - The number of required shares to get the secret + /// * `user_id` - The owner's user id + /// * `public_key` - The owner's public key + /// + /// # Returns + /// + /// * `Ok(())` - If the user discovery was initialized or updated successfully + /// * `Err(UserDiscoveryError)` - If the user discovery was not initialized or updated successfully + /// + pub async fn initialize_or_update( + &self, + threshold: u8, + user_id: UserID, + public_key: Vec, + share_promotion: bool, + ) -> Result<()> { + tracing::info!("Protocols: initialize_or_update started, getting config from store"); + let config = match self.store.get_config().await { + Ok(config) => { + let mut config: UserDiscoveryConfig = serde_json::from_str(&config)?; + config.threshold = threshold; + config + } + Err(_) => UserDiscoveryConfig { + threshold, + user_id, + ..Default::default() + }, + }; + + let public_id = rand::random(); + + let signed_data = SignedData { + public_id, + user_id, + public_key, + }; + + tracing::info!("Protocols: signing data"); + let signature = self.utils.sign_data(&signed_data.encode_to_vec()).await?; + + debug_assert_eq!(threshold, config.threshold); + + tracing::info!("Protocols: setting up announcements"); + let verification_shares = self + .setup_announcements(&config, signed_data, signature) + .await?; + + debug_assert_eq!(verification_shares.len(), threshold as usize - 1); + + tracing::info!("Protocols: updating config in store"); + + let config_lock = self.config_lock.lock().await; + let mut final_config = match self.store.get_config().await { + Ok(c) => serde_json::from_str(&c)?, + Err(_) => UserDiscoveryConfig { + threshold, + user_id, + ..Default::default() + }, + }; + + final_config.public_id = public_id; + final_config.announcement_version += 1; + final_config.verification_shares = verification_shares; + final_config.share_promotion = share_promotion; + final_config.threshold = threshold; + + self.update_config(final_config, config_lock).await?; + + tracing::info!("Protocols: initialize_or_update finished"); + Ok(()) + } + + /// + /// Returns the current version of the owner's user discovery state. + /// + /// The version is incremented every time the user discovery is initialized or updated. + /// It should be send contacts which participate in the user discovery, so they can + /// check if there is new data available for them using the `get_new_messages` function + /// on there side. + /// + /// + /// # Returns + /// + /// * `Ok(Vec)` - The current version of the user discovery + /// * `Err(UserDiscoveryError)` - If there where errors in the store. + /// + pub async fn get_current_version(&self) -> Result> { + let (config, _) = self.get_config().await?; + Ok(UserDiscoveryVersion { + announcement: config.announcement_version, + promotion: config.promotion_version, + } + .encode_to_vec()) + } + + /// + /// Returns all users discovery though the user discovery and there relations + /// + /// # Returns + /// + /// * `Ok(HashMap)>>)` - All connections the user has discovered + /// * `Err(UserDiscoveryError)` - If there where erros in the store. + /// + pub async fn get_all_announced_users( + &self, + ) -> Result)>>> { + self.store.get_all_announced_users().await + } + + /// + /// Returns all new user discovery messages for the provided contact and his current version. + /// + /// # Arguments + /// + /// * `contact_id` - The contact id of the user + /// * `received_version` - The version of the user discovery the contact has received so far + /// + /// # Returns + /// + /// * `Ok(Vec>)` - The new user discovery messages + /// * `Err(UserDiscoveryError)` - If there where errors in the store or if the received version is invalid. + /// + pub async fn get_new_messages( + &self, + contact_id: UserID, + received_version: &[u8], + ) -> Result>> { + let mut messages = vec![]; + let received_version = UserDiscoveryVersion::decode(received_version)?; + + let (config, _) = self.get_config().await?; + let version = Some(UserDiscoveryVersion { + announcement: config.announcement_version, + promotion: config.promotion_version, + }); + + if received_version.announcement < config.announcement_version { + tracing::info!("New announcement message available for {}", contact_id); + + let announcement_share = self.store.get_share_for_contact(contact_id).await?; + + let user_discovery_announcement = Some(UserDiscoveryAnnouncement { + public_id: config.public_id, + threshold: config.threshold as u32, + announcement_share, + verification_shares: config.verification_shares, + share_promotion: config.share_promotion, + }); + + messages.push( + UserDiscoveryMessage { + user_discovery_announcement, + version, + ..Default::default() + } + .encode_to_vec(), + ); + } + if received_version.promotion < config.promotion_version { + tracing::info!("New promotion message available for user {}", contact_id); + let promoting_messages = self + .store + .get_own_promotions_after_version(received_version.promotion) + .await?; + + let size = promoting_messages.len(); + let mut filtered: Vec> = promoting_messages + .into_iter() + .filter(|x| !x.is_empty()) // filter ignored versions + .collect(); + + if filtered.len() != size { + // ensure the receiver will get the later version in case the last message was filtered out + filtered.push( + UserDiscoveryMessage { + version: Some(UserDiscoveryVersion { + announcement: config.announcement_version, + promotion: config.promotion_version, + }), + ..Default::default() + } + .encode_to_vec(), + ); + } + messages.extend_from_slice(&filtered); + } + Ok(messages) + } + + /// + /// Checks if the provided user has new announcements and a request of update should be send. + /// + /// # Arguments + /// + /// * `contact_id` - The contact id of the user + /// * `version` - The current version of the user discovery from the contact + /// + /// # Returns + /// + /// * `Ok(bool)` - True if the user has new announcements + /// * `Err(UserDiscoveryError)` - If there where errors in the store or if the received version is invalid. + /// + pub async fn should_request_new_messages( + &self, + contact_id: UserID, + version: &[u8], + ) -> Result>> { + let received_version = UserDiscoveryVersion::decode(version)?; + let stored_version = match self.store.get_contact_version(contact_id).await? { + Some(buf) => UserDiscoveryVersion::decode(buf.as_slice())?, + None => UserDiscoveryVersion { + announcement: 0, + promotion: 0, + }, + }; + tracing::debug!( + received.announcement = %received_version.announcement, + received.promotion = %received_version.promotion, + stored.announcement = %stored_version.announcement, + stored.promotion = %stored_version.promotion, + "Comparing version numbers" + ); + if received_version.announcement > stored_version.announcement + || received_version.promotion > stored_version.promotion + { + Ok(Some(stored_version.encode_to_vec())) + } else { + Ok(None) + } + } + + #[cfg(test)] + pub(crate) async fn get_contact_version(&self, contact_id: UserID) -> Result>> { + self.store.get_contact_version(contact_id).await + } + + /// Returns the latest version for this discovery. + /// Before calling this function the application must sure that contact_id is qualified to be announced. + pub async fn handle_new_messages( + &self, + contact_id: UserID, + public_key_verified_timestamp: Option, + messages: Vec>, + ) -> Result<()> { + for message in messages { + let Ok(message) = UserDiscoveryMessage::decode(message.as_slice()) else { + tracing::error!("Could not parse the message. Continue to the next message..."); + continue; + }; + let Some(version) = message.version else { + continue; + }; + + if let Some(uda) = message.user_discovery_announcement { + if let Err(err) = self + .handle_user_discovery_announcement( + contact_id, + public_key_verified_timestamp, + uda, + ) + .await + { + tracing::warn!("Ignoring: {err}"); + } + } else if let Some(udp) = message.user_discovery_promotion { + if let Err(err) = self.handle_user_discovery_promotion(contact_id, udp).await { + tracing::warn!("Ignoring: {err}"); + } + } + + // Always update the version... + self.store + .set_contact_version(contact_id, version.encode_to_vec()) + .await?; + } + + Ok(()) + } + + pub async fn update_verification_state_for_user( + &self, + contact_id: UserID, + public_key_verified_timestamp: Option, + ) -> Result<()> { + let (mut config, config_lock) = self.get_config().await?; + config.promotion_version += 1; + + let Some(current_promotion) = self.store.get_contact_promotion(contact_id).await? else { + // User does not participate... + return Ok(()); + }; + + let old_message = UserDiscoveryMessage::decode(current_promotion.as_slice())?; + + let Some(old_promotion) = old_message.user_discovery_promotion else { + tracing::error!("A contact should only have a promotion message..."); + return Ok(()); + }; + + let message = UserDiscoveryMessage { + version: Some(UserDiscoveryVersion { + announcement: config.announcement_version, + promotion: config.promotion_version, + }), + user_discovery_promotion: Some(UserDiscoveryPromotion { + promotion_id: rand::random(), + public_id: old_promotion.public_id, + threshold: old_promotion.threshold, + announcement_share: old_promotion.announcement_share, + public_key_verified_timestamp, + }), + ..Default::default() + }; + + self.store + .push_own_promotion_and_clear_old_version( + contact_id, + config.promotion_version, + message.encode_to_vec(), + ) + .await?; + + self.update_config(config, config_lock).await?; + + Ok(()) + } + + async fn setup_announcements( + &self, + config: &UserDiscoveryConfig, + signed_data: SignedData, + signature: Vec, + ) -> Result>> { + tracing::debug!( + "Initializing user discovery with {} total shares and with a threshold of {}", + config.total_number_of_shares, + config.threshold + ); + + let encrypted_announcement = AnnouncementShareDecrypted { + signed_data: Some(signed_data), + signature, + } + .encode_to_vec(); + + let sharks = Sharks(config.threshold as u8); + let dealer = sharks.dealer(&encrypted_announcement); + + let mut shares: Vec> = dealer + .take(config.total_number_of_shares as usize) + .map(|x| Vec::from(&x)) + .collect(); + + if shares.len() != config.total_number_of_shares as usize + || shares.is_empty() + || shares.len() <= (config.threshold as usize * 2) + { + return Err(UserDiscoveryError::ShamirsSecret( + "Invalid length of shares where generated".to_string(), + )); + } + + tracing::debug!( + "Generated {} shares each with a size of: {}", + shares.len(), + shares[0].len() + ); + + let mut verification_shares = vec![]; + + let split_index = shares.len() - (config.threshold - 1) as usize; + verification_shares.extend(shares.drain(split_index..)); + + self.store.set_shares(shares).await?; + + Ok(verification_shares) + } + + async fn get_config(&self) -> Result<(UserDiscoveryConfig, MutexGuard<'_, bool>)> { + let mut lock = self.config_lock.lock().await; + *lock = true; + Ok((serde_json::from_str(&self.store.get_config().await?)?, lock)) + } + + async fn update_config( + &self, + config: UserDiscoveryConfig, + mut config_lock: MutexGuard<'_, bool>, + ) -> Result<()> { + self.store + .update_config(serde_json::to_string_pretty(&config)?) + .await?; + *config_lock = false; + Ok(()) + } + + async fn handle_user_discovery_announcement( + &self, + contact_id: UserID, + public_key_verified_timestamp: Option, + uda: UserDiscoveryAnnouncement, + ) -> Result<()> { + tracing::info!("Got a user discovery announcement from {contact_id}."); + + if uda.threshold as usize != uda.verification_shares.len() + 1 { + tracing::error!( + "UDA contains to few shares to verify: {} != {} + 1.", + uda.threshold, + uda.verification_shares.len(), + ); + return Ok(()); + } + + let sharks = Sharks(uda.threshold as u8); + + let mut all_shares = uda.verification_shares.clone(); + all_shares.push(uda.announcement_share.clone()); + let shares: Vec<_> = all_shares + .iter() + .filter_map(|x| Share::try_from(x.as_slice()).ok()) + .collect(); + + match sharks.recover(&shares) { + Ok(secret) => { + let asd = AnnouncementShareDecrypted::decode(secret.as_slice())?; + + let Some(signed_data) = asd.signed_data else { + return Err(UserDiscoveryError::MaliciousAnnouncementData( + "missing signed data".into(), + )); + }; + + if contact_id != signed_data.user_id { + return Err(UserDiscoveryError::MaliciousAnnouncementData(format!( + "contact_id ({contact_id}) != signed_data.user_id ({})", + signed_data.user_id + ))); + } + + if !self + .utils + .verify_stored_pubkey(contact_id, &signed_data.public_key) + .await? + { + return Err(UserDiscoveryError::MaliciousAnnouncementData(format!( + "public key does not match with stored one", + ))); + } + + if !self + .utils + .verify_signature( + &signed_data.encode_to_vec(), + &signed_data.public_key, + &asd.signature, + ) + .await? + { + return Err(UserDiscoveryError::MaliciousAnnouncementData(format!( + "signature invalid", + ))); + } + + // Only add this user to the promotions if the users enabled this feature + if uda.share_promotion { + let (mut config, config_lock) = self.get_config().await?; + config.promotion_version += 1; + + let message = UserDiscoveryMessage { + version: Some(UserDiscoveryVersion { + announcement: config.announcement_version, + promotion: config.promotion_version, + }), + user_discovery_promotion: Some(UserDiscoveryPromotion { + promotion_id: rand::random(), + public_id: signed_data.public_id, + threshold: uda.threshold, + announcement_share: uda.announcement_share, + public_key_verified_timestamp, + }), + ..Default::default() + }; + + self.store + .push_own_promotion_and_clear_old_version( + contact_id, + config.promotion_version, + message.encode_to_vec(), + ) + .await?; + + self.update_config(config, config_lock).await?; + } + + let announced_user = AnnouncedUser { + user_id: signed_data.user_id, + public_key: signed_data.public_key, + public_id: uda.public_id, + }; + + tracing::debug!( + "NEW PROMOTION 3: {} knows {}", + contact_id, + announced_user.user_id + ); + + // User is known, so add him to thr users relations + self.store + .push_new_user_relation( + contact_id, + announced_user.clone(), + public_key_verified_timestamp, + ) + .await?; + + // As we no now the public_id from the user, all promotions up to this point are also known, so add these to the relations database as well + let promotions = self + .store + .get_other_promotions_by_public_id(uda.public_id) + .await?; + + for promotion in promotions { + self.store + .push_new_user_relation( + promotion.from_contact_id, + announced_user, + promotion.public_key_verified_timestamp, + ) + .await?; + return Ok(()); + } + + Ok(()) + } + Err(err) => Err(UserDiscoveryError::ShamirsSecret(err.to_string())), + } + } + + async fn handle_user_discovery_promotion( + &self, + from_contact_id: UserID, + udp: UserDiscoveryPromotion, + ) -> Result<()> { + tracing::debug!("Received a new UDP with public_id = {}.", &udp.public_id); + + if udp.announcement_share.is_empty() { + tracing::info!("Got empty announcement share. Ignoring it.."); + return Ok(()); + } + + self.store + .store_other_promotion(OtherPromotion { + from_contact_id, + promotion_id: udp.promotion_id, + threshold: udp.threshold as u8, + public_id: udp.public_id, + announcement_share: udp.announcement_share, + public_key_verified_timestamp: udp.public_key_verified_timestamp, + }) + .await?; + + if let Some(contact) = self + .store + .get_announced_user_by_public_id(udp.public_id) + .await? + { + tracing::debug!( + "NEW PROMOTION 2: {} knows {}", + from_contact_id, + contact.user_id + ); + // The user is already known, just propagate the relation ship + self.store + .push_new_user_relation(from_contact_id, contact, udp.public_key_verified_timestamp) + .await?; + return Ok(()); + } + + let promotions = self + .store + .get_other_promotions_by_public_id(udp.public_id) + .await?; + + // Deduplicate shares by their raw bytes to prevent invalid Shamir's Secret Sharing recoveries. + // Multiple identical shares (e.g. due to contact resending promotions, or DB duplicate writes) + // will cause `recover` to interpolate incorrectly and return garbage bytes. + let mut unique_shares_set = HashSet::new(); + let mut unique_promotions = Vec::new(); + for p in promotions { + if unique_shares_set.insert(p.announcement_share.clone()) { + unique_promotions.push(p); + } + } + + if unique_promotions.len() < udp.threshold as usize { + tracing::debug!( + "Not enough unique shares ({} < {}) to decrypt announcement. Waiting for next share.", + unique_promotions.len(), + udp.threshold + ); + return Ok(()); + } + + tracing::debug!("Enough shares decrypting announcement."); + + let shares: Vec<_> = unique_promotions + .iter() + .map(|x| x.announcement_share.to_owned()) + .filter_map(|x| Share::try_from(x.as_slice()).ok()) + .collect(); + + match Sharks(udp.threshold as u8).recover(&shares) { + Ok(secret) => { + tracing::debug!("Could decrypt announcement."); + let asd = AnnouncementShareDecrypted::decode(secret.as_slice())?; + if let Some(signed_data) = asd.signed_data { + if udp.public_id != signed_data.public_id { + tracing::error!( + "Mismatch of the announced public id and the signed public id " + ); + return Ok(()); + } + + if !self + .utils + .verify_signature( + &signed_data.encode_to_vec(), + &signed_data.public_key, + &asd.signature, + ) + .await? + { + return Err(UserDiscoveryError::MaliciousAnnouncementData(format!( + "signature is invalid", + ))); + } + + tracing::debug!("Announcement valid."); + + let announced_user = AnnouncedUser { + user_id: signed_data.user_id, + public_key: signed_data.public_key, + public_id: udp.public_id, + }; + + let (config, _) = self.get_config().await?; + + let user_id = config.user_id; + for promotion in unique_promotions { + // Do not store the announcement of the users itself. + // Or in case the promotion promotes myself + if promotion.from_contact_id == announced_user.user_id + || announced_user.user_id == user_id + { + continue; + } + tracing::debug!( + "NEW PROMOTION: {:x} knows {:x}", + promotion.from_contact_id, + announced_user.user_id + ); + self.store + .push_new_user_relation( + promotion.from_contact_id, + announced_user.clone(), + promotion.public_key_verified_timestamp, + ) + .await?; + } + } + Ok(()) + } + Err(err) => Err(UserDiscoveryError::ShamirsSecret(err.to_string())), + } + } +} + +impl Default for UserDiscoveryConfig { + fn default() -> Self { + Self { + threshold: 2, + total_number_of_shares: u8::MAX, + announcement_version: 0, + promotion_version: 0, + verification_shares: vec![], + public_id: 0, + share_promotion: true, + user_id: 0, + } + } +}