From 8cbdb53a6bb37aa28888b365ec115414a2519719 Mon Sep 17 00:00:00 2001 From: dragonfly1033 <=> Date: Thu, 7 Aug 2025 18:20:00 +0100 Subject: [PATCH 1/2] feat(sdk): Add creation of indexes and indexing of messages. --- Cargo.lock | 1 + crates/matrix-sdk-search/src/index.rs | 15 ++- crates/matrix-sdk/Cargo.toml | 3 + crates/matrix-sdk/src/client/builder/mod.rs | 37 +++++- crates/matrix-sdk/src/client/mod.rs | 111 ++++++++++++++++++ crates/matrix-sdk/src/event_cache/mod.rs | 7 ++ crates/matrix-sdk/src/event_cache/room/mod.rs | 80 +++++++++++++ crates/matrix-sdk/src/room/mod.rs | 24 ++++ 8 files changed, 271 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96e302c8b95..b1e2576485b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3090,6 +3090,7 @@ dependencies = [ "matrix-sdk-common", "matrix-sdk-ffi-macros", "matrix-sdk-indexeddb", + "matrix-sdk-search", "matrix-sdk-sqlite", "matrix-sdk-test", "mime", diff --git a/crates/matrix-sdk-search/src/index.rs b/crates/matrix-sdk-search/src/index.rs index 9f4d5ef2b88..25b7851397d 100644 --- a/crates/matrix-sdk-search/src/index.rs +++ b/crates/matrix-sdk-search/src/index.rs @@ -193,7 +193,7 @@ mod tests { use std::{collections::HashSet, error::Error}; use matrix_sdk_test::event_factory::EventFactory; - use ruma::{event_id, owned_event_id, room_id, user_id}; + use ruma::{event_id, room_id, user_id}; use crate::index::RoomIndex; @@ -227,10 +227,14 @@ mod tests { let mut index = RoomIndex::new_in_ram(room_id).expect("failed to make index in ram: {index:?}"); + let event_id_1 = event_id!("$event_id_1:localhost"); + let event_id_2 = event_id!("$event_id_2:localhost"); + let event_id_3 = event_id!("$event_id_3:localhost"); + index.add_event( EventFactory::new() .text_msg("This is a sentence") - .event_id(event_id!("$event_id_1:localhost")) + .event_id(event_id_1) .room(room_id) .sender(user_id!("@user_id:localhost")) .into_any_message_like_event(), @@ -239,7 +243,7 @@ mod tests { index.add_event( EventFactory::new() .text_msg("All new words") - .event_id(event_id!("$event_id_2:localhost")) + .event_id(event_id_2) .room(room_id) .sender(user_id!("@user_id:localhost")) .into_any_message_like_event(), @@ -248,7 +252,7 @@ mod tests { index.add_event( EventFactory::new() .text_msg("A similar sentence") - .event_id(event_id!("$event_id_3:localhost")) + .event_id(event_id_3) .room(room_id) .sender(user_id!("@user_id:localhost")) .into_any_message_like_event(), @@ -259,8 +263,7 @@ mod tests { let result = index.search("sentence", 10).expect("search failed with: {result:?}"); let result: HashSet<_> = result.iter().collect(); - let true_value = - [owned_event_id!("$event_id_1:localhost"), owned_event_id!("$event_id_3:localhost")]; + let true_value = [event_id_1.to_owned(), event_id_3.to_owned()]; let true_value: HashSet<_> = true_value.iter().collect(); assert_eq!(result, true_value, "search result not correct: {result:?}"); diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index b1077fac54a..50e7be774d3 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -62,6 +62,8 @@ docsrs = ["e2e-encryption", "sqlite", "indexeddb", "sso-login", "qrcode"] # Add support for inline media galleries via msgtypes unstable-msc4274 = ["ruma/unstable-msc4274", "matrix-sdk-base/unstable-msc4274"] +experimental-search = ["matrix-sdk-search"] + [dependencies] anyhow = { workspace = true, optional = true } anymap2 = "0.13.0" @@ -92,6 +94,7 @@ matrix-sdk-ffi-macros = { workspace = true, optional = true } matrix-sdk-indexeddb = { workspace = true, optional = true } matrix-sdk-sqlite = { workspace = true, optional = true } matrix-sdk-test = { workspace = true, optional = true } +matrix-sdk-search = { workspace = true, optional = true } mime.workspace = true mime2ext = "0.1.53" oauth2.workspace = true diff --git a/crates/matrix-sdk/src/client/builder/mod.rs b/crates/matrix-sdk/src/client/builder/mod.rs index a601d526bd1..b0cd1add2c9 100644 --- a/crates/matrix-sdk/src/client/builder/mod.rs +++ b/crates/matrix-sdk/src/client/builder/mod.rs @@ -15,16 +15,24 @@ mod homeserver_config; +#[cfg(feature = "experimental-search")] +use std::collections::HashMap; #[cfg(feature = "sqlite")] use std::path::Path; +#[cfg(any(feature = "experimental-search", feature = "sqlite"))] +use std::path::PathBuf; use std::{collections::BTreeSet, fmt, sync::Arc}; use homeserver_config::*; #[cfg(feature = "e2e-encryption")] use matrix_sdk_base::crypto::DecryptionSettings; use matrix_sdk_base::{store::StoreConfig, BaseClient, ThreadingSupport}; +#[cfg(feature = "experimental-search")] +use matrix_sdk_search::index::RoomIndex; #[cfg(feature = "sqlite")] use matrix_sdk_sqlite::SqliteStoreConfig; +#[cfg(feature = "experimental-search")] +use ruma::OwnedRoomId; use ruma::{ api::{error::FromHttpResponseError, MatrixVersion, SupportedVersions}, OwnedServerName, ServerName, @@ -114,6 +122,8 @@ pub struct ClientBuilder { enable_share_history_on_invite: bool, cross_process_store_locks_holder_name: String, threading_support: ThreadingSupport, + #[cfg(feature = "experimental-search")] + index_base_dir: IndexBaseDir, } impl ClientBuilder { @@ -145,6 +155,8 @@ impl ClientBuilder { cross_process_store_locks_holder_name: Self::DEFAULT_CROSS_PROCESS_STORE_LOCKS_HOLDER_NAME.to_owned(), threading_support: ThreadingSupport::Disabled, + #[cfg(feature = "experimental-search")] + index_base_dir: IndexBaseDir::Ram, } } @@ -489,6 +501,13 @@ impl ClientBuilder { self } + /// The base directory in which each room's index directory will be stored. + #[cfg(feature = "experimental-search")] + pub fn index_base_directory(mut self, path: IndexBaseDir) -> Self { + self.index_base_dir = path; + self + } + /// Create a [`Client`] with the options set on this builder. /// /// # Errors @@ -590,6 +609,10 @@ impl ClientBuilder { let event_cache = OnceCell::new(); let latest_events = OnceCell::new(); + #[cfg(feature = "experimental-search")] + let room_indexes: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + let inner = ClientInner::new( auth_ctx, server, @@ -607,6 +630,10 @@ impl ClientBuilder { #[cfg(feature = "e2e-encryption")] self.enable_share_history_on_invite, self.cross_process_store_locks_holder_name, + #[cfg(feature = "experimental-search")] + room_indexes, + #[cfg(feature = "experimental-search")] + self.index_base_dir, ) .await; @@ -717,6 +744,14 @@ async fn build_indexeddb_store_config( panic!("the IndexedDB is only available on the 'wasm32' arch") } +#[cfg(feature = "experimental-search")] +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub enum IndexBaseDir { + Directory(PathBuf), + Ram, +} + #[derive(Clone, Debug)] enum HttpConfig { #[cfg(not(target_family = "wasm"))] @@ -755,7 +790,7 @@ enum BuilderStoreConfig { #[cfg(feature = "sqlite")] Sqlite { config: SqliteStoreConfig, - cache_path: Option, + cache_path: Option, }, #[cfg(feature = "indexeddb")] IndexedDb { diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index 73d41274dd4..edd6ae95b5e 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -14,6 +14,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "experimental-search")] +use std::collections::HashMap; use std::{ collections::{btree_map, BTreeMap, BTreeSet}, fmt::{self, Debug}, @@ -38,6 +40,12 @@ use matrix_sdk_base::{ StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport, }; use matrix_sdk_common::ttl_cache::TtlCache; +#[cfg(feature = "experimental-search")] +use matrix_sdk_search::error::IndexError; +#[cfg(feature = "experimental-search")] +use matrix_sdk_search::index::RoomIndex; +#[cfg(feature = "experimental-search")] +use ruma::events::AnyMessageLikeEvent; #[cfg(feature = "e2e-encryption")] use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent}; use ruma::{ @@ -72,11 +80,15 @@ use ruma::{ RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId, }; use serde::de::DeserializeOwned; +#[cfg(feature = "experimental-search")] +use tokio::sync::MutexGuard; use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard}; use tracing::{debug, error, instrument, trace, warn, Instrument, Span}; use url::Url; use self::futures::SendRequest; +#[cfg(feature = "experimental-search")] +use crate::client::builder::IndexBaseDir; use crate::{ authentication::{ matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback, @@ -350,6 +362,14 @@ pub(crate) struct ClientInner { /// /// [`LatestEvent`]: crate::latest_event::LatestEvent latest_events: OnceCell, + + /// HashMap that links each joined room to its RoomIndex + #[cfg(feature = "experimental-search")] + room_indexes: Arc>>, + + /// Base directory that stores the directories for each RoomIndex + #[cfg(feature = "experimental-search")] + index_base_dir: IndexBaseDir, } impl ClientInner { @@ -374,6 +394,10 @@ impl ClientInner { #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings, #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool, cross_process_store_locks_holder_name: String, + #[cfg(feature = "experimental-search")] room_indexes: Arc< + Mutex>, + >, + #[cfg(feature = "experimental-search")] index_base_dir: IndexBaseDir, ) -> Arc { let caches = ClientCaches { server_info: server_info.into(), @@ -409,6 +433,10 @@ impl ClientInner { #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite, server_max_upload_size: Mutex::new(OnceCell::new()), + #[cfg(feature = "experimental-search")] + room_indexes, + #[cfg(feature = "experimental-search")] + index_base_dir, }; #[allow(clippy::let_and_return)] @@ -2713,6 +2741,10 @@ impl Client { #[cfg(feature = "e2e-encryption")] self.inner.enable_share_history_on_invite, cross_process_store_locks_holder_name, + #[cfg(feature = "experimental-search")] + self.inner.room_indexes.clone(), + #[cfg(feature = "experimental-search")] + self.inner.index_base_dir.clone(), ) .await, }; @@ -2810,6 +2842,85 @@ impl Client { &self.base_client().decryption_settings } + /// Add [`AnyMessageLikeEvent`] to [`RoomIndex`] of given [`RoomId`] + #[cfg(feature = "experimental-search")] + pub async fn index_event( + &self, + event: AnyMessageLikeEvent, + room_id: &RoomId, + ) -> Result<(), IndexError> { + let mut hash_map = self.inner.room_indexes.lock().await; + + let result = if let Some(index) = hash_map.get_mut(room_id) { + index.add_event(event) + } else { + self.add_index_impl(room_id, &mut hash_map)?; + let index = hash_map.get_mut(room_id).expect("key just added"); + index.add_event(event) + }; + + match result { + Ok(_) => {} + Err(IndexError::CannotIndexRedactedMessage) + | Err(IndexError::EmptyMessage) + | Err(IndexError::MessageTypeNotSupported) => { + debug!("failed to parse event for indexing: {result:?}") + } + Err(IndexError::TantivyError(err)) => { + error!("failed to add/commit event to index: {err:?}") + } + Err(_) => error!("unexpected error during indexing: {result:?}"), + }; + Ok(()) + } + + /// Add [`RoomIndex`] for given [`RoomId`] to room_indexes + #[cfg(feature = "experimental-search")] + pub async fn add_index(&self, room_id: &RoomId) -> Result<(), IndexError> { + let mut hash_map = self.inner.room_indexes.lock().await; + self.add_index_impl(room_id, &mut hash_map)?; + Ok(()) + } + + #[cfg(feature = "experimental-search")] + fn add_index_impl( + &self, + room_id: &RoomId, + hash_map: &mut MutexGuard<'_, HashMap>, + ) -> Result<(), IndexError> { + if !hash_map.contains_key(room_id) { + let index = match &self.inner.index_base_dir { + IndexBaseDir::Directory(path) => RoomIndex::open_or_create(path, room_id)?, + IndexBaseDir::Ram => RoomIndex::new_in_ram(room_id)?, + }; + hash_map.insert(room_id.to_owned(), index); + } + Ok(()) + } + + /// Search a [`Room`]'s index for the query and return at most + /// max_number_of_results results. + #[cfg(feature = "experimental-search")] + pub async fn search_index( + &self, + query: &str, + max_number_of_results: usize, + room_id: &RoomId, + ) -> Option> { + let hash_map = self.inner.room_indexes.lock().await; + if let Some(index) = hash_map.get(room_id) { + index + .search(query, max_number_of_results) + .inspect_err(|err| { + error!("error occurred while searching index: {err:?}"); + }) + .ok() + } else { + warn!("Tried to search in a room with no index"); + None + } + } + /// Whether the client is configured to take thread subscriptions (MSC4306 /// and MSC4308) into account. /// diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 2b948b1a481..9ad521d179c 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -45,6 +45,8 @@ use matrix_sdk_base::{ timer, }; use matrix_sdk_common::executor::{spawn, JoinHandle}; +#[cfg(feature = "experimental-search")] +use matrix_sdk_search::error::IndexError; use room::RoomEventCacheState; use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId}; use tokio::sync::{ @@ -116,6 +118,11 @@ pub enum EventCacheError { /// A string containing details about the error. details: String, }, + + /// An error occurred in the index. + #[cfg(feature = "experimental-search")] + #[error(transparent)] + IndexError(#[from] IndexError), } /// A result using the [`EventCacheError`]. diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index d61b7874e20..890ec9214be 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -27,12 +27,18 @@ use std::{ use events::sort_positions_descending; use eyeball::SharedObservable; use eyeball_im::VectorDiff; +#[cfg(feature = "experimental-search")] +use futures_util::StreamExt; +#[cfg(feature = "experimental-search")] +use matrix_sdk_base::deserialized_responses::{TimelineEvent, TimelineEventKind}; use matrix_sdk_base::{ deserialized_responses::AmbiguityChange, event_cache::Event, linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; +#[cfg(feature = "experimental-search")] +use ruma::events::{AnyMessageLikeEvent, AnySyncTimelineEvent}; use ruma::{ api::Direction, events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent}, @@ -523,6 +529,49 @@ impl RoomEventCacheInner { Ok(()) } + #[cfg(feature = "experimental-search")] + fn parse_timeline_event(&self, event: &TimelineEvent) -> Option { + let maybe_try_event = match &event.kind { + TimelineEventKind::Decrypted(d) => Some(d.event.deserialize()), + TimelineEventKind::PlainText { event } => match event.deserialize() { + Ok(event_obj) => match event_obj { + AnySyncTimelineEvent::MessageLike(sync_event) => { + Some(Ok(sync_event.into_full_event(self.room_id.clone()))) + } + AnySyncTimelineEvent::State(_) => None, + }, + Err(e) => Some(Err(e)), + }, + + TimelineEventKind::UnableToDecrypt { event: _, utd_info: _ } => None, + }; + + match maybe_try_event { + Some(Ok(event)) => match event { + // TODO: Handle room redaction. + AnyMessageLikeEvent::RoomRedaction(_) | _ => Some(event), + }, + Some(Err(e)) => { + warn!("failed to index event: {e:?}"); + None + } + None => None, + } + } + + #[cfg(feature = "experimental-search")] + async fn index_timeline_event(&self, event: &TimelineEvent) -> Result<()> { + if let Some(message_event) = self.parse_timeline_event(event) { + if let Some(room) = self.weak_room.get() { + room.index_event(message_event).await.map_err(EventCacheError::from) + } else { + Err(EventCacheError::RoomNotFound { room_id: self.room_id.to_owned() }) + } + } else { + Ok(()) + } + } + /// Handle a [`Timeline`], i.e. new events received by a sync for this /// room. async fn handle_timeline( @@ -551,6 +600,37 @@ impl RoomEventCacheInner { self.pagination_batch_token_notifier.notify_one(); } + // We can also add the event to the index. + #[cfg(feature = "experimental-search")] + for diff in timeline_event_diffs.iter() { + match diff { + VectorDiff::Append { values } => { + let batch_size = 5; + let futures: Vec<_> = + values.iter().map(|event| self.index_timeline_event(event)).collect(); + let mut buffer = + futures_util::stream::iter(futures).buffer_unordered(batch_size); + + while let Some(result) = buffer.next().await { + if let Err(e) = result { + warn!("error while trying to index event: {e:?}"); + } + } + } + VectorDiff::Insert { index: _, value } => { + if let Err(e) = self.index_timeline_event(value).await { + warn!("error while trying to index event: {e:?}"); + } + } + VectorDiff::Remove { index: _ } => {} + VectorDiff::Clear => {} + _ => { + warn!("vector diff case shouldn't happen"); + continue; + } + } + } + let mut update_has_been_sent = false; // The order matters here: first send the timeline event diffs, then only the diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index d508ef26883..c70c30dc6fb 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -54,10 +54,17 @@ use matrix_sdk_common::{ executor::{spawn, JoinHandle}, timeout::timeout, }; +#[cfg(feature = "experimental-search")] +use matrix_sdk_search::error::IndexError; +#[cfg(feature = "experimental-search")] +#[cfg(doc)] +use matrix_sdk_search::index::RoomIndex; use mime::Mime; use reply::Reply; #[cfg(feature = "unstable-msc4274")] use ruma::events::room::message::GalleryItemType; +#[cfg(feature = "experimental-search")] +use ruma::events::AnyMessageLikeEvent; #[cfg(feature = "e2e-encryption")] use ruma::events::{ room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, @@ -3671,6 +3678,23 @@ impl Room { opts.send(self, event_id).await } + /// Add an [`AnyMessageLikeEvent`] to this room's [`RoomIndex`] + #[cfg(feature = "experimental-search")] + pub(crate) async fn index_event(&self, event: AnyMessageLikeEvent) -> Result<(), IndexError> { + self.client.index_event(event, self.room_id()).await + } + + /// Search this room's [`RoomIndex`] for query and return at most + /// max_number_of_results results. + #[cfg(feature = "experimental-search")] + pub async fn search_index( + &self, + query: &str, + max_number_of_results: usize, + ) -> Option> { + self.client.search_index(query, max_number_of_results, self.room_id()).await + } + /// Subscribe to a given thread in this room. /// /// This will subscribe the user to the thread, so that they will receive From 726784815db02e79868aa378349dfd6bfd30c88c Mon Sep 17 00:00:00 2001 From: dragonfly1033 <=> Date: Sat, 9 Aug 2025 00:50:49 +0100 Subject: [PATCH 2/2] test(sdk): Add integration test for search --- crates/matrix-sdk-search/src/error.rs | 13 -- crates/matrix-sdk-search/src/index.rs | 64 +++--- crates/matrix-sdk-search/src/schema.rs | 110 +++++------ crates/matrix-sdk-search/src/writer.rs | 13 -- crates/matrix-sdk/Cargo.toml | 2 +- crates/matrix-sdk/src/client/builder/mod.rs | 32 ++- crates/matrix-sdk/src/client/mod.rs | 115 ++--------- crates/matrix-sdk/src/client/search.rs | 137 +++++++++++++ .../matrix-sdk/src/event_cache/pagination.rs | 40 ++-- crates/matrix-sdk/src/event_cache/room/mod.rs | 184 +++++++++--------- crates/matrix-sdk/src/room/mod.rs | 25 ++- .../tests/integration/room/joined.rs | 30 +++ testing/matrix-sdk-test/src/event_factory.rs | 6 +- 13 files changed, 401 insertions(+), 370 deletions(-) create mode 100644 crates/matrix-sdk/src/client/search.rs diff --git a/crates/matrix-sdk-search/src/error.rs b/crates/matrix-sdk-search/src/error.rs index b7e3ba0a13d..a07b040d1da 100644 --- a/crates/matrix-sdk-search/src/error.rs +++ b/crates/matrix-sdk-search/src/error.rs @@ -12,19 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The event cache is an abstraction layer, sitting between the Rust SDK and a -//! final client, that acts as a global observer of all the rooms, gathering and -//! inferring some extra useful information about each room. In particular, this -//! doesn't require subscribing to a specific room to get access to this -//! information. -//! -//! It's intended to be fast, robust and easy to maintain, having learned from -//! previous endeavours at implementing middle to high level features elsewhere -//! in the SDK, notably in the UI's Timeline object. -//! -//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more -//! details about the historical reasons that led us to start writing this. - use tantivy::{ directory::error::OpenDirectoryError as TantivyOpenDirectoryError, query::QueryParserError as TantivyQueryParserError, diff --git a/crates/matrix-sdk-search/src/index.rs b/crates/matrix-sdk-search/src/index.rs index 25b7851397d..bfe5969801d 100644 --- a/crates/matrix-sdk-search/src/index.rs +++ b/crates/matrix-sdk-search/src/index.rs @@ -12,22 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The event cache is an abstraction layer, sitting between the Rust SDK and a -//! final client, that acts as a global observer of all the rooms, gathering and -//! inferring some extra useful information about each room. In particular, this -//! doesn't require subscribing to a specific room to get access to this -//! information. -//! -//! It's intended to be fast, robust and easy to maintain, having learned from -//! previous endeavours at implementing middle to high level features elsewhere -//! in the SDK, notably in the UI's Timeline object. -//! -//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more -//! details about the historical reasons that led us to start writing this. - use std::{fmt, fs, path::Path, sync::Arc}; -use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnyMessageLikeEvent}; +use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncMessageLikeEvent}; use tantivy::{ Index, IndexReader, TantivyDocument, collector::TopDocs, @@ -44,6 +31,11 @@ use crate::{ writer::SearchIndexWriter, }; +/// A struct to represent the operations on a [`RoomIndex`] +pub(crate) enum RoomIndexOperation { + Add(TantivyDocument), +} + /// A struct that holds all data pertaining to a particular room's /// message index. pub struct RoomIndex { @@ -91,9 +83,9 @@ impl RoomIndex { RoomIndex::new_with(index, schema, room_id) } - /// Create new [`RoomIndex`] which stores the index in RAM. + /// Create new [`RoomIndex`] which stores the index in memory. /// Intended for testing. - pub fn new_in_ram(room_id: &RoomId) -> Result { + pub fn new_in_memory(room_id: &RoomId) -> Result { let schema = RoomMessageSchema::new(); let index = Index::create_in_ram(schema.as_tantivy_schema()); RoomIndex::new_with(index, schema, room_id) @@ -130,10 +122,14 @@ impl RoomIndex { RoomIndex::new_with(index, schema, room_id) } - /// Add [`AnyMessageLikeEvent`] to [`RoomIndex`] - pub fn add_event(&mut self, event: AnyMessageLikeEvent) -> Result<(), IndexError> { - let doc = self.schema.make_doc(event)?; - self.writer.add_document(doc)?; // TODO: This is blocking. Handle it. + /// Handle [`AnySyncMessageLikeEvent`] + /// + /// This which will add/remove/edit an event in the index based on the + /// event type. + pub fn handle_event(&mut self, event: AnySyncMessageLikeEvent) -> Result<(), IndexError> { + match self.schema.handle_event(event)? { + RoomIndexOperation::Add(document) => self.writer.add_document(document)?, + }; Ok(()) } @@ -198,64 +194,64 @@ mod tests { use crate::index::RoomIndex; #[test] - fn test_make_index_in_ram() { + fn test_make_index_in_memory() { let room_id = room_id!("!room_id:localhost"); - let index = RoomIndex::new_in_ram(room_id); + let index = RoomIndex::new_in_memory(room_id); index.expect("failed to make index in ram: {index:?}"); } #[test] - fn test_add_event() { + fn test_handle_event() { let room_id = room_id!("!room_id:localhost"); let mut index = - RoomIndex::new_in_ram(room_id).expect("failed to make index in ram: {index:?}"); + RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}"); let event = EventFactory::new() .text_msg("event message") .event_id(event_id!("$event_id:localhost")) .room(room_id) .sender(user_id!("@user_id:localhost")) - .into_any_message_like_event(); + .into_any_sync_message_like_event(); - index.add_event(event).expect("failed to add event: {res:?}"); + index.handle_event(event).expect("failed to add event: {res:?}"); } #[test] fn test_search_populated_index() -> Result<(), Box> { let room_id = room_id!("!room_id:localhost"); let mut index = - RoomIndex::new_in_ram(room_id).expect("failed to make index in ram: {index:?}"); + RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}"); let event_id_1 = event_id!("$event_id_1:localhost"); let event_id_2 = event_id!("$event_id_2:localhost"); let event_id_3 = event_id!("$event_id_3:localhost"); - index.add_event( + index.handle_event( EventFactory::new() .text_msg("This is a sentence") .event_id(event_id_1) .room(room_id) .sender(user_id!("@user_id:localhost")) - .into_any_message_like_event(), + .into_any_sync_message_like_event(), )?; - index.add_event( + index.handle_event( EventFactory::new() .text_msg("All new words") .event_id(event_id_2) .room(room_id) .sender(user_id!("@user_id:localhost")) - .into_any_message_like_event(), + .into_any_sync_message_like_event(), )?; - index.add_event( + index.handle_event( EventFactory::new() .text_msg("A similar sentence") .event_id(event_id_3) .room(room_id) .sender(user_id!("@user_id:localhost")) - .into_any_message_like_event(), + .into_any_sync_message_like_event(), )?; index.commit_and_reload()?; @@ -275,7 +271,7 @@ mod tests { fn test_search_empty_index() -> Result<(), Box> { let room_id = room_id!("!room_id:localhost"); let mut index = - RoomIndex::new_in_ram(room_id).expect("failed to make index in ram: {index:?}"); + RoomIndex::new_in_memory(room_id).expect("failed to make index in ram: {index:?}"); index.commit_and_reload()?; diff --git a/crates/matrix-sdk-search/src/schema.rs b/crates/matrix-sdk-search/src/schema.rs index 64019e30ee8..3cee31aa44c 100644 --- a/crates/matrix-sdk-search/src/schema.rs +++ b/crates/matrix-sdk-search/src/schema.rs @@ -12,39 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The event cache is an abstraction layer, sitting between the Rust SDK and a -//! final client, that acts as a global observer of all the rooms, gathering and -//! inferring some extra useful information about each room. In particular, this -//! doesn't require subscribing to a specific room to get access to this -//! information. -//! -//! It's intended to be fast, robust and easy to maintain, having learned from -//! previous endeavours at implementing middle to high level features elsewhere -//! in the SDK, notably in the UI's Timeline object. -//! -//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more -//! details about the historical reasons that led us to start writing this. - -use ruma::{ - MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, - events::{ - AnyMessageLikeEvent, MessageLikeEvent, MessageLikeEventContent, RedactContent, - RedactedMessageLikeEventContent, room::message::MessageType, - }, +use ruma::events::{ + AnySyncMessageLikeEvent, MessageLikeEventContent, RedactContent, + RedactedMessageLikeEventContent, SyncMessageLikeEvent, room::message::MessageType, }; use tantivy::{ DateTime, TantivyDocument, doc, schema::{DateOptions, DateTimePrecision, Field, INDEXED, STORED, STRING, Schema, TEXT}, }; -use crate::error::{IndexError, IndexSchemaError}; +use crate::{ + error::{IndexError, IndexSchemaError}, + index::RoomIndexOperation, +}; pub(crate) trait MatrixSearchIndexSchema { fn new() -> Self; fn default_search_fields(&self) -> Vec; fn primary_key(&self) -> Field; fn as_tantivy_schema(&self) -> Schema; - fn make_doc(&self, event: AnyMessageLikeEvent) -> Result; + fn handle_event( + &self, + event: AnySyncMessageLikeEvent, + ) -> Result; } #[derive(Debug, Clone)] @@ -58,48 +48,31 @@ pub(crate) struct RoomMessageSchema { } impl RoomMessageSchema { - fn parse_event( + /// Given an [`AnySyncMessageLikeEvent`] and a function to convert the + /// content into a String to be indexed, return a [`TantivyDocument`] to + /// index. + fn make_doc( &self, - event: MessageLikeEvent, - get_body: F, - ) -> Result<(OwnedEventId, String, MilliSecondsSinceUnixEpoch, OwnedUserId), IndexError> + event: SyncMessageLikeEvent, + get_body_from_content: F, + ) -> Result where ::Redacted: RedactedMessageLikeEventContent, F: FnOnce(&C) -> Result, { let unredacted = event.as_original().ok_or(IndexError::CannotIndexRedactedMessage)?; - let body = get_body(&unredacted.content)?; + let body = get_body_from_content(&unredacted.content)?; - Ok(( - unredacted.event_id.clone(), - body, - unredacted.origin_server_ts, - unredacted.sender.clone(), + Ok(doc!( + self.event_id_field => unredacted.event_id.to_string(), + self.body_field => body, + self.date_field => + DateTime::from_timestamp_millis( + unredacted.origin_server_ts.get().into()), + self.sender_field => unredacted.sender.to_string(), )) } - - fn parse_any_event( - &self, - event: AnyMessageLikeEvent, - ) -> Result<(OwnedEventId, String, MilliSecondsSinceUnixEpoch, OwnedUserId), IndexError> { - match event { - // old m.room.message behaviour - AnyMessageLikeEvent::RoomMessage(event) => { - self.parse_event(event, |content| match &content.msgtype { - MessageType::Text(content) => Ok(content.body.clone()), - _ => Err(IndexError::MessageTypeNotSupported), - }) - } - - // new m.message behaviour - AnyMessageLikeEvent::Message(event) => self.parse_event(event, |content| { - content.text.find_plain().ok_or(IndexError::EmptyMessage).map(|v| v.to_owned()) - }), - - _ => Err(IndexError::MessageTypeNotSupported), - } - } } impl MatrixSearchIndexSchema for RoomMessageSchema { @@ -140,17 +113,28 @@ impl MatrixSearchIndexSchema for RoomMessageSchema { self.inner.clone() } - fn make_doc(&self, event: AnyMessageLikeEvent) -> Result { - let (event_id, body, timestamp, sender) = self.parse_any_event(event)?; + fn handle_event( + &self, + event: AnySyncMessageLikeEvent, + ) -> Result { + match event { + // m.room.message behaviour + AnySyncMessageLikeEvent::RoomMessage(event) => self + .make_doc(event, |content| match &content.msgtype { + MessageType::Text(content) => Ok(content.body.clone()), + _ => Err(IndexError::MessageTypeNotSupported), + }) + .map(RoomIndexOperation::Add), - Ok(doc!( - self.event_id_field => event_id.to_string(), - self.body_field => body, - self.date_field => - DateTime::from_timestamp_millis( - timestamp.get().into()), - self.sender_field => sender.to_string(), - )) + // new MSC-1767 m.message behaviour + AnySyncMessageLikeEvent::Message(event) => self + .make_doc(event, |content| { + content.text.find_plain().ok_or(IndexError::EmptyMessage).map(|v| v.to_owned()) + }) + .map(RoomIndexOperation::Add), + + _ => Err(IndexError::MessageTypeNotSupported), + } } } diff --git a/crates/matrix-sdk-search/src/writer.rs b/crates/matrix-sdk-search/src/writer.rs index b3905b72c5b..146f2d98a46 100644 --- a/crates/matrix-sdk-search/src/writer.rs +++ b/crates/matrix-sdk-search/src/writer.rs @@ -12,19 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The event cache is an abstraction layer, sitting between the Rust SDK and a -//! final client, that acts as a global observer of all the rooms, gathering and -//! inferring some extra useful information about each room. In particular, this -//! doesn't require subscribing to a specific room to get access to this -//! information. -//! -//! It's intended to be fast, robust and easy to maintain, having learned from -//! previous endeavours at implementing middle to high level features elsewhere -//! in the SDK, notably in the UI's Timeline object. -//! -//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more -//! details about the historical reasons that led us to start writing this. - use tantivy::{IndexWriter, TantivyDocument, TantivyError}; use crate::{OpStamp, error::IndexError}; diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 50e7be774d3..04cbcdc0560 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -92,9 +92,9 @@ matrix-sdk-base.workspace = true matrix-sdk-common.workspace = true matrix-sdk-ffi-macros = { workspace = true, optional = true } matrix-sdk-indexeddb = { workspace = true, optional = true } +matrix-sdk-search = { workspace = true, optional = true } matrix-sdk-sqlite = { workspace = true, optional = true } matrix-sdk-test = { workspace = true, optional = true } -matrix-sdk-search = { workspace = true, optional = true } mime.workspace = true mime2ext = "0.1.53" oauth2.workspace = true diff --git a/crates/matrix-sdk/src/client/builder/mod.rs b/crates/matrix-sdk/src/client/builder/mod.rs index b0cd1add2c9..af4b8d23d28 100644 --- a/crates/matrix-sdk/src/client/builder/mod.rs +++ b/crates/matrix-sdk/src/client/builder/mod.rs @@ -27,12 +27,8 @@ use homeserver_config::*; #[cfg(feature = "e2e-encryption")] use matrix_sdk_base::crypto::DecryptionSettings; use matrix_sdk_base::{store::StoreConfig, BaseClient, ThreadingSupport}; -#[cfg(feature = "experimental-search")] -use matrix_sdk_search::index::RoomIndex; #[cfg(feature = "sqlite")] use matrix_sdk_sqlite::SqliteStoreConfig; -#[cfg(feature = "experimental-search")] -use ruma::OwnedRoomId; use ruma::{ api::{error::FromHttpResponseError, MatrixVersion, SupportedVersions}, OwnedServerName, ServerName, @@ -42,6 +38,10 @@ use tokio::sync::{broadcast, Mutex, OnceCell}; use tracing::{debug, field::debug, instrument, Span}; use super::{Client, ClientInner}; +#[cfg(feature = "experimental-search")] +use crate::client::search::SearchIndex; +#[cfg(feature = "experimental-search")] +use crate::client::search::SearchIndexStoreKind; #[cfg(feature = "e2e-encryption")] use crate::crypto::{CollectStrategy, TrustRequirement}; #[cfg(feature = "e2e-encryption")] @@ -123,7 +123,7 @@ pub struct ClientBuilder { cross_process_store_locks_holder_name: String, threading_support: ThreadingSupport, #[cfg(feature = "experimental-search")] - index_base_dir: IndexBaseDir, + search_index_store_kind: SearchIndexStoreKind, } impl ClientBuilder { @@ -156,7 +156,7 @@ impl ClientBuilder { Self::DEFAULT_CROSS_PROCESS_STORE_LOCKS_HOLDER_NAME.to_owned(), threading_support: ThreadingSupport::Disabled, #[cfg(feature = "experimental-search")] - index_base_dir: IndexBaseDir::Ram, + search_index_store_kind: SearchIndexStoreKind::InMemory, } } @@ -503,8 +503,8 @@ impl ClientBuilder { /// The base directory in which each room's index directory will be stored. #[cfg(feature = "experimental-search")] - pub fn index_base_directory(mut self, path: IndexBaseDir) -> Self { - self.index_base_dir = path; + pub fn search_index_store(mut self, kind: SearchIndexStoreKind) -> Self { + self.search_index_store_kind = kind; self } @@ -610,8 +610,8 @@ impl ClientBuilder { let latest_events = OnceCell::new(); #[cfg(feature = "experimental-search")] - let room_indexes: Arc>> = - Arc::new(Mutex::new(HashMap::new())); + let search_index = + SearchIndex::new(Arc::new(Mutex::new(HashMap::new())), self.search_index_store_kind); let inner = ClientInner::new( auth_ctx, @@ -631,9 +631,7 @@ impl ClientBuilder { self.enable_share_history_on_invite, self.cross_process_store_locks_holder_name, #[cfg(feature = "experimental-search")] - room_indexes, - #[cfg(feature = "experimental-search")] - self.index_base_dir, + search_index, ) .await; @@ -744,14 +742,6 @@ async fn build_indexeddb_store_config( panic!("the IndexedDB is only available on the 'wasm32' arch") } -#[cfg(feature = "experimental-search")] -#[allow(dead_code)] -#[derive(Clone, Debug)] -pub enum IndexBaseDir { - Directory(PathBuf), - Ram, -} - #[derive(Clone, Debug)] enum HttpConfig { #[cfg(not(target_family = "wasm"))] diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index edd6ae95b5e..87230e7d561 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -14,8 +14,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "experimental-search")] -use std::collections::HashMap; use std::{ collections::{btree_map, BTreeMap, BTreeSet}, fmt::{self, Debug}, @@ -40,12 +38,6 @@ use matrix_sdk_base::{ StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport, }; use matrix_sdk_common::ttl_cache::TtlCache; -#[cfg(feature = "experimental-search")] -use matrix_sdk_search::error::IndexError; -#[cfg(feature = "experimental-search")] -use matrix_sdk_search::index::RoomIndex; -#[cfg(feature = "experimental-search")] -use ruma::events::AnyMessageLikeEvent; #[cfg(feature = "e2e-encryption")] use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent}; use ruma::{ @@ -80,15 +72,11 @@ use ruma::{ RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId, }; use serde::de::DeserializeOwned; -#[cfg(feature = "experimental-search")] -use tokio::sync::MutexGuard; use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard}; use tracing::{debug, error, instrument, trace, warn, Instrument, Span}; use url::Url; use self::futures::SendRequest; -#[cfg(feature = "experimental-search")] -use crate::client::builder::IndexBaseDir; use crate::{ authentication::{ matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback, @@ -123,8 +111,12 @@ use crate::{ mod builder; pub(crate) mod caches; pub(crate) mod futures; +#[cfg(feature = "experimental-search")] +pub(crate) mod search; pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder}; +#[cfg(feature = "experimental-search")] +use crate::client::search::SearchIndex; #[cfg(not(target_family = "wasm"))] type NotificationHandlerFut = Pin + Send>>; @@ -363,13 +355,9 @@ pub(crate) struct ClientInner { /// [`LatestEvent`]: crate::latest_event::LatestEvent latest_events: OnceCell, - /// HashMap that links each joined room to its RoomIndex #[cfg(feature = "experimental-search")] - room_indexes: Arc>>, - - /// Base directory that stores the directories for each RoomIndex - #[cfg(feature = "experimental-search")] - index_base_dir: IndexBaseDir, + /// Handler for [`RoomIndex`]'s of each room + search_index: SearchIndex, } impl ClientInner { @@ -394,10 +382,7 @@ impl ClientInner { #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings, #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool, cross_process_store_locks_holder_name: String, - #[cfg(feature = "experimental-search")] room_indexes: Arc< - Mutex>, - >, - #[cfg(feature = "experimental-search")] index_base_dir: IndexBaseDir, + #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex, ) -> Arc { let caches = ClientCaches { server_info: server_info.into(), @@ -434,9 +419,7 @@ impl ClientInner { enable_share_history_on_invite, server_max_upload_size: Mutex::new(OnceCell::new()), #[cfg(feature = "experimental-search")] - room_indexes, - #[cfg(feature = "experimental-search")] - index_base_dir, + search_index: search_index_handler, }; #[allow(clippy::let_and_return)] @@ -2742,9 +2725,7 @@ impl Client { self.inner.enable_share_history_on_invite, cross_process_store_locks_holder_name, #[cfg(feature = "experimental-search")] - self.inner.room_indexes.clone(), - #[cfg(feature = "experimental-search")] - self.inner.index_base_dir.clone(), + self.inner.search_index.clone(), ) .await, }; @@ -2842,83 +2823,9 @@ impl Client { &self.base_client().decryption_settings } - /// Add [`AnyMessageLikeEvent`] to [`RoomIndex`] of given [`RoomId`] #[cfg(feature = "experimental-search")] - pub async fn index_event( - &self, - event: AnyMessageLikeEvent, - room_id: &RoomId, - ) -> Result<(), IndexError> { - let mut hash_map = self.inner.room_indexes.lock().await; - - let result = if let Some(index) = hash_map.get_mut(room_id) { - index.add_event(event) - } else { - self.add_index_impl(room_id, &mut hash_map)?; - let index = hash_map.get_mut(room_id).expect("key just added"); - index.add_event(event) - }; - - match result { - Ok(_) => {} - Err(IndexError::CannotIndexRedactedMessage) - | Err(IndexError::EmptyMessage) - | Err(IndexError::MessageTypeNotSupported) => { - debug!("failed to parse event for indexing: {result:?}") - } - Err(IndexError::TantivyError(err)) => { - error!("failed to add/commit event to index: {err:?}") - } - Err(_) => error!("unexpected error during indexing: {result:?}"), - }; - Ok(()) - } - - /// Add [`RoomIndex`] for given [`RoomId`] to room_indexes - #[cfg(feature = "experimental-search")] - pub async fn add_index(&self, room_id: &RoomId) -> Result<(), IndexError> { - let mut hash_map = self.inner.room_indexes.lock().await; - self.add_index_impl(room_id, &mut hash_map)?; - Ok(()) - } - - #[cfg(feature = "experimental-search")] - fn add_index_impl( - &self, - room_id: &RoomId, - hash_map: &mut MutexGuard<'_, HashMap>, - ) -> Result<(), IndexError> { - if !hash_map.contains_key(room_id) { - let index = match &self.inner.index_base_dir { - IndexBaseDir::Directory(path) => RoomIndex::open_or_create(path, room_id)?, - IndexBaseDir::Ram => RoomIndex::new_in_ram(room_id)?, - }; - hash_map.insert(room_id.to_owned(), index); - } - Ok(()) - } - - /// Search a [`Room`]'s index for the query and return at most - /// max_number_of_results results. - #[cfg(feature = "experimental-search")] - pub async fn search_index( - &self, - query: &str, - max_number_of_results: usize, - room_id: &RoomId, - ) -> Option> { - let hash_map = self.inner.room_indexes.lock().await; - if let Some(index) = hash_map.get(room_id) { - index - .search(query, max_number_of_results) - .inspect_err(|err| { - error!("error occurred while searching index: {err:?}"); - }) - .ok() - } else { - warn!("Tried to search in a room with no index"); - None - } + pub(crate) fn search_index(&self) -> &SearchIndex { + &self.inner.search_index } /// Whether the client is configured to take thread subscriptions (MSC4306 diff --git a/crates/matrix-sdk/src/client/search.rs b/crates/matrix-sdk/src/client/search.rs new file mode 100644 index 00000000000..6c6850bc5fd --- /dev/null +++ b/crates/matrix-sdk/src/client/search.rs @@ -0,0 +1,137 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{collections::hash_map::HashMap, path::PathBuf, sync::Arc}; + +use matrix_sdk_search::{error::IndexError, index::RoomIndex}; +use ruma::{events::AnySyncMessageLikeEvent, OwnedEventId, OwnedRoomId, RoomId}; +use tokio::sync::{Mutex, MutexGuard}; +use tracing::{debug, error}; + +/// Type of location to store [`RoomIndex`] +#[derive(Clone, Debug)] +pub enum SearchIndexStoreKind { + /// Store in file system folder + Directory(PathBuf), + /// Store in memory + InMemory, +} + +/// Object that handles inteeraction with [`RoomIndex`]'s for search +#[derive(Clone, Debug)] +pub(crate) struct SearchIndex { + /// HashMap that links each joined room to its RoomIndex + room_indexes: Arc>>, + + /// Base directory that stores the directories for each RoomIndex + search_index_store_kind: SearchIndexStoreKind, +} + +impl SearchIndex { + /// Create a new [`SearchIndexHandler`] + pub fn new( + room_indexes: Arc>>, + search_index_store_kind: SearchIndexStoreKind, + ) -> Self { + Self { room_indexes, search_index_store_kind } + } + + pub async fn lock(&self) -> SearchIndexGuard<'_> { + SearchIndexGuard { + index_map: self.room_indexes.lock().await, + search_index_store_kind: &self.search_index_store_kind, + } + } +} + +pub(crate) struct SearchIndexGuard<'a> { + /// Guard around the [`RoomIndex`] map + index_map: MutexGuard<'a, HashMap>, + + /// Base directory that stores the directories for each RoomIndex + search_index_store_kind: &'a SearchIndexStoreKind, +} + +impl SearchIndexGuard<'_> { + fn create_index(&self, room_id: &RoomId) -> Result { + let index = match self.search_index_store_kind { + SearchIndexStoreKind::Directory(path) => RoomIndex::open_or_create(path, room_id)?, + SearchIndexStoreKind::InMemory => RoomIndex::new_in_memory(room_id)?, + }; + Ok(index) + } + + /// Handle an [`AnySyncMessageLikeEvent`] in the [`RoomIndex`] of a given + /// [`RoomId`] + /// + /// This which will add/remove/edit an event in the index based on the + /// event type. + pub(crate) fn handle_event( + &mut self, + event: AnySyncMessageLikeEvent, + room_id: &RoomId, + ) -> Result<(), IndexError> { + if !self.index_map.contains_key(room_id) { + let index = self.create_index(room_id)?; + self.index_map.insert(room_id.to_owned(), index); + } + + let index = self.index_map.get_mut(room_id).expect("index should exist"); + let result = index.handle_event(event); + + match result { + Ok(_) => {} + Err(IndexError::CannotIndexRedactedMessage) + | Err(IndexError::EmptyMessage) + | Err(IndexError::MessageTypeNotSupported) => { + debug!("failed to parse event for indexing: {result:?}") + } + Err(IndexError::TantivyError(err)) => { + error!("failed to handle event in index: {err:?}") + } + Err(_) => error!("unexpected error during indexing: {result:?}"), + } + Ok(()) + } + + /// Search a [`Room`]'s index for the query and return at most + /// max_number_of_results results. + pub(crate) fn search( + &self, + query: &str, + max_number_of_results: usize, + room_id: &RoomId, + ) -> Option> { + if let Some(index) = self.index_map.get(room_id) { + index + .search(query, max_number_of_results) + .inspect_err(|err| { + error!("error occurred while searching index: {err:?}"); + }) + .ok() + } else { + debug!("Tried to search in a room with no index"); + None + } + } + + /// Commit a [`Room`]'s [`RoomIndex`] and reload searchers + pub(crate) fn commit_and_reload(&mut self, room_id: &RoomId) { + if let Some(index) = self.index_map.get_mut(room_id) { + let _ = index.commit_and_reload().inspect_err(|err| { + error!("error occurred while committing: {err:?}"); + }); + } + } +} diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 992b3d35a7f..d658a6eea1c 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -263,32 +263,34 @@ impl RoomPagination { batch_size: u16, prev_token: Option, ) -> Result> { - let (events, new_token) = { - let Some(room) = self.inner.weak_room.get() else { - // The client is shutting down, return an empty default response. - return Ok(Some(BackPaginationOutcome { - reached_start: false, - events: Default::default(), - })); - }; - - let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref()); - options.limit = batch_size.into(); - - let response = room - .messages(options) - .await - .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?; - - (response.chunk, response.end) + let Some(room) = self.inner.weak_room.get() else { + // The client is shutting down, return an empty default response. + return Ok(Some(BackPaginationOutcome { + reached_start: false, + events: Default::default(), + })); }; + let mut options = MessagesOptions::new(Direction::Backward).from(prev_token.as_deref()); + options.limit = batch_size.into(); + + let response = room + .messages(options) + .await + .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?; + if let Some((outcome, timeline_event_diffs)) = self .inner .state .write() .await - .handle_backpagination(events, new_token, prev_token) + .handle_backpagination( + response.chunk, + response.end, + prev_token, + #[cfg(feature = "experimental-search")] + &room, + ) .await? { if !timeline_event_diffs.is_empty() { diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 890ec9214be..d970d8ada02 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -27,18 +27,12 @@ use std::{ use events::sort_positions_descending; use eyeball::SharedObservable; use eyeball_im::VectorDiff; -#[cfg(feature = "experimental-search")] -use futures_util::StreamExt; -#[cfg(feature = "experimental-search")] -use matrix_sdk_base::deserialized_responses::{TimelineEvent, TimelineEventKind}; use matrix_sdk_base::{ deserialized_responses::AmbiguityChange, event_cache::Event, linked_chunk::Position, sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline}, }; -#[cfg(feature = "experimental-search")] -use ruma::events::{AnyMessageLikeEvent, AnySyncTimelineEvent}; use ruma::{ api::Direction, events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent}, @@ -529,49 +523,6 @@ impl RoomEventCacheInner { Ok(()) } - #[cfg(feature = "experimental-search")] - fn parse_timeline_event(&self, event: &TimelineEvent) -> Option { - let maybe_try_event = match &event.kind { - TimelineEventKind::Decrypted(d) => Some(d.event.deserialize()), - TimelineEventKind::PlainText { event } => match event.deserialize() { - Ok(event_obj) => match event_obj { - AnySyncTimelineEvent::MessageLike(sync_event) => { - Some(Ok(sync_event.into_full_event(self.room_id.clone()))) - } - AnySyncTimelineEvent::State(_) => None, - }, - Err(e) => Some(Err(e)), - }, - - TimelineEventKind::UnableToDecrypt { event: _, utd_info: _ } => None, - }; - - match maybe_try_event { - Some(Ok(event)) => match event { - // TODO: Handle room redaction. - AnyMessageLikeEvent::RoomRedaction(_) | _ => Some(event), - }, - Some(Err(e)) => { - warn!("failed to index event: {e:?}"); - None - } - None => None, - } - } - - #[cfg(feature = "experimental-search")] - async fn index_timeline_event(&self, event: &TimelineEvent) -> Result<()> { - if let Some(message_event) = self.parse_timeline_event(event) { - if let Some(room) = self.weak_room.get() { - room.index_event(message_event).await.map_err(EventCacheError::from) - } else { - Err(EventCacheError::RoomNotFound { room_id: self.room_id.to_owned() }) - } - } else { - Ok(()) - } - } - /// Handle a [`Timeline`], i.e. new events received by a sync for this /// room. async fn handle_timeline( @@ -591,8 +542,22 @@ impl RoomEventCacheInner { // Add all the events to the backend. trace!("adding new events"); - let (stored_prev_batch_token, timeline_event_diffs) = - self.state.write().await.handle_sync(timeline).await?; + #[cfg(feature = "experimental-search")] + let Some(room) = self.weak_room.get() else { + trace!("Couldn't get room while handling timeline"); + return Ok(()); + }; + + let (stored_prev_batch_token, timeline_event_diffs) = self + .state + .write() + .await + .handle_sync( + timeline, + #[cfg(feature = "experimental-search")] + &room, + ) + .await?; // Now that all events have been added, we can trigger the // `pagination_token_notifier`. @@ -600,37 +565,6 @@ impl RoomEventCacheInner { self.pagination_batch_token_notifier.notify_one(); } - // We can also add the event to the index. - #[cfg(feature = "experimental-search")] - for diff in timeline_event_diffs.iter() { - match diff { - VectorDiff::Append { values } => { - let batch_size = 5; - let futures: Vec<_> = - values.iter().map(|event| self.index_timeline_event(event)).collect(); - let mut buffer = - futures_util::stream::iter(futures).buffer_unordered(batch_size); - - while let Some(result) = buffer.next().await { - if let Err(e) = result { - warn!("error while trying to index event: {e:?}"); - } - } - } - VectorDiff::Insert { index: _, value } => { - if let Err(e) = self.index_timeline_event(value).await { - warn!("error while trying to index event: {e:?}"); - } - } - VectorDiff::Remove { index: _ } => {} - VectorDiff::Clear => {} - _ => { - warn!("vector diff case shouldn't happen"); - continue; - } - } - } - let mut update_has_been_sent = false; // The order matters here: first send the timeline event diffs, then only the @@ -695,6 +629,8 @@ mod private { use eyeball::SharedObservable; use eyeball_im::VectorDiff; + #[cfg(feature = "experimental-search")] + use matrix_sdk_base::deserialized_responses::TimelineEvent; use matrix_sdk_base::{ apply_redaction, deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, @@ -710,10 +646,12 @@ mod private { sync::Timeline, }; use matrix_sdk_common::executor::spawn; + #[cfg(feature = "experimental-search")] + use ruma::events::AnyMessageLikeEvent; use ruma::{ events::{ - relation::RelationType, room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent, - MessageLikeEventType, + relation::RelationType, room::redaction::SyncRoomRedactionEvent, + AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, }, room_version_rules::RoomVersionRules, serde::Raw, @@ -731,6 +669,8 @@ mod private { deduplicator::filter_duplicate_events, room::threads::ThreadEventCache, BackPaginationOutcome, RoomPaginationStatus, ThreadEventCacheUpdate, }; + #[cfg(feature = "experimental-search")] + use crate::Room; /// State for a single room's event cache. /// @@ -1514,6 +1454,49 @@ mod private { Ok(Some((target, related))) } + #[cfg(feature = "experimental-search")] + fn parse_timeline_event(&self, event: &TimelineEvent) -> Option { + let maybe_try_event = match &event.kind { + TimelineEventKind::Decrypted(d) => { + Some(d.event.deserialize().map(AnyMessageLikeEvent::into)) + } + TimelineEventKind::PlainText { event } => match event.deserialize() { + Ok(event_obj) => match event_obj { + AnySyncTimelineEvent::MessageLike(sync_event) => Some(Ok(sync_event)), + AnySyncTimelineEvent::State(_) => None, + }, + Err(e) => Some(Err(e)), + }, + + TimelineEventKind::UnableToDecrypt { event: _, utd_info: _ } => None, + }; + + match maybe_try_event { + Some(Ok(event)) => Some(event), + Some(Err(e)) => { + warn!("failed to parse event: {e:?}"); + None + } + None => None, + } + } + + /// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the + /// given room which will add/remove/edit an event in the index based on + /// the event type. + #[cfg(feature = "experimental-search")] + async fn index_event( + &self, + event: &TimelineEvent, + room: &Room, + ) -> Result<(), EventCacheError> { + if let Some(message_event) = self.parse_timeline_event(event) { + room.index_event(message_event).await.map_err(EventCacheError::from) + } else { + Ok(()) + } + } + /// Post-process new events, after they have been added to the in-memory /// linked chunk. /// @@ -1522,6 +1505,7 @@ mod private { &mut self, events: Vec, is_sync: bool, + #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(), EventCacheError> { // Update the store before doing the post-processing. self.propagate_changes().await?; @@ -1529,7 +1513,13 @@ mod private { let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new(); for event in events { - self.maybe_apply_new_redaction(&event).await?; + self.maybe_apply_new_redaction(&event).await?; // TODO: Handle redaction for search index + + // We can also add the event to the index. + #[cfg(feature = "experimental-search")] + if let Err(err) = self.index_event(&event, room).await { + warn!("error while trying to index event: {err:?}"); + } if let Some(thread_root) = extract_thread_root(event.raw()) { new_events_by_thread.entry(thread_root).or_default().push(event.clone()); @@ -1678,9 +1668,9 @@ mod private { // It is a `m.room.redaction`! We can deserialize it entirely. - let Ok(AnySyncTimelineEvent::MessageLike( - ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction), - )) = raw_event.deserialize() + let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction( + redaction, + ))) = raw_event.deserialize() else { return Ok(()); }; @@ -1768,6 +1758,7 @@ mod private { pub async fn handle_sync( &mut self, mut timeline: Timeline, + #[cfg(feature = "experimental-search")] room: &Room, ) -> Result<(bool, Vec>), EventCacheError> { let mut prev_batch = timeline.prev_batch.take(); @@ -1860,7 +1851,13 @@ mod private { self.room_linked_chunk .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events); - self.post_process_new_events(events, true).await?; + self.post_process_new_events( + events, + true, + #[cfg(feature = "experimental-search")] + room, + ) + .await?; if timeline.limited && has_new_gap { // If there was a previous batch token for a limited timeline, unload the chunks @@ -1890,6 +1887,7 @@ mod private { events: Vec, mut new_token: Option, prev_token: Option, + #[cfg(feature = "experimental-search")] room: &Room, ) -> Result>)>, EventCacheError> { // Check that the previous token still exists; otherwise it's a sign that the @@ -1964,7 +1962,13 @@ mod private { ); // Note: this flushes updates to the store. - self.post_process_new_events(topo_ordered_events, false).await?; + self.post_process_new_events( + topo_ordered_events, + false, + #[cfg(feature = "experimental-search")] + room, + ) + .await?; let event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index c70c30dc6fb..dffc7826c04 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -63,12 +63,11 @@ use mime::Mime; use reply::Reply; #[cfg(feature = "unstable-msc4274")] use ruma::events::room::message::GalleryItemType; -#[cfg(feature = "experimental-search")] -use ruma::events::AnyMessageLikeEvent; +#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))] +use ruma::events::AnySyncMessageLikeEvent; #[cfg(feature = "e2e-encryption")] use ruma::events::{ - room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, - SyncMessageLikeEvent, + room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncTimelineEvent, SyncMessageLikeEvent, }; use ruma::{ api::client::{ @@ -3678,21 +3677,29 @@ impl Room { opts.send(self, event_id).await } - /// Add an [`AnyMessageLikeEvent`] to this room's [`RoomIndex`] + /// Handle an [`AnySyncMessageLikeEvent`] in this room's [`RoomIndex`]. + /// + /// This which will add/remove/edit an event in the index based on the + /// event type. #[cfg(feature = "experimental-search")] - pub(crate) async fn index_event(&self, event: AnyMessageLikeEvent) -> Result<(), IndexError> { - self.client.index_event(event, self.room_id()).await + pub(crate) async fn index_event( + &self, + event: AnySyncMessageLikeEvent, + ) -> Result<(), IndexError> { + self.client.search_index().lock().await.handle_event(event, self.room_id()) } /// Search this room's [`RoomIndex`] for query and return at most /// max_number_of_results results. #[cfg(feature = "experimental-search")] - pub async fn search_index( + pub async fn search( &self, query: &str, max_number_of_results: usize, ) -> Option> { - self.client.search_index(query, max_number_of_results, self.room_id()).await + let mut search_index_guard = self.client.search_index().lock().await; + search_index_guard.commit_and_reload(self.room_id()); + search_index_guard.search(query, max_number_of_results, self.room_id()) } /// Subscribe to a given thread in this room. diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index d9fc5b25592..b95f11178d1 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -740,6 +740,36 @@ async fn test_room_message_send() { assert_eq!(event_id!("$h29iv0s8:example.com"), response.event_id) } +#[cfg(feature = "experimental-search")] +#[async_test] +async fn test_sync_message_is_indexed() { + let mock_server = MatrixMockServer::new().await; + let client = mock_server.client_builder().build().await; + + client.event_cache().subscribe().unwrap(); + + let room_id = room_id!("!room_id:localhost"); + let event_id = event_id!("$event_id:localost"); + let user_id = user_id!("@user_id:localost"); + + let event_factory = EventFactory::new(); + let room = mock_server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id).add_timeline_bulk(vec![event_factory + .text_msg("this is a sentence") + .event_id(event_id) + .sender(user_id) + .into_raw_sync()]), + ) + .await; + + let response = room.search("this", 5).await.expect("search should have 1 result"); + + assert_eq!(response.len(), 1, "unexpected numbers of responses: {response:?}"); + assert_eq!(response[0], event_id, "event id doesn't match: {response:?}"); +} + #[async_test] async fn test_room_redact() { let server = MatrixMockServer::new().await; diff --git a/testing/matrix-sdk-test/src/event_factory.rs b/testing/matrix-sdk-test/src/event_factory.rs index 4c092e84973..e888075af52 100644 --- a/testing/matrix-sdk-test/src/event_factory.rs +++ b/testing/matrix-sdk-test/src/event_factory.rs @@ -28,8 +28,8 @@ use ruma::{ OwnedRoomId, OwnedTransactionId, OwnedUserId, OwnedVoipId, RoomId, RoomVersionId, TransactionId, UInt, UserId, VoipVersionId, events::{ - AnyMessageLikeEvent, AnyStateEvent, AnySyncMessageLikeEvent, AnySyncStateEvent, - AnySyncTimelineEvent, AnyTimelineEvent, BundledMessageLikeRelations, False, Mentions, + AnyStateEvent, AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, + AnyTimelineEvent, BundledMessageLikeRelations, False, Mentions, RedactedMessageLikeEventContent, RedactedStateEventContent, StateEventContent, StaticEventContent, beacon::BeaconEventContent, @@ -321,7 +321,7 @@ where self.into_raw() } - pub fn into_any_message_like_event(self) -> AnyMessageLikeEvent { + pub fn into_any_sync_message_like_event(self) -> AnySyncMessageLikeEvent { self.into_raw().deserialize().expect("expected message like event") }