diff --git a/influxdb3/src/commands/write.rs b/influxdb3/src/commands/write.rs index 36c68e241e9..9128ab7c9f0 100644 --- a/influxdb3/src/commands/write.rs +++ b/influxdb3/src/commands/write.rs @@ -53,10 +53,16 @@ pub struct Config { #[clap(long = "accept-partial")] accept_partial_writes: bool, + /// Flag to request the server not wait for sync before ACK'ing + /// + /// This option returns a success before a write is durable. + #[clap(long = "no-sync")] + no_sync_writes: bool, + /// Give a quoted line protocol line via the command line line_protocol: Option>, - /// Specify a supported precision (eg: ns, us, ms, s). + /// Specify a supported precision (eg: auto, ns, us, ms, s). #[clap(short = 'p', long = "precision")] precision: Option, @@ -99,6 +105,9 @@ pub(crate) async fn command(config: Config) -> Result<()> { if config.accept_partial_writes { req = req.accept_partial(true); } + if config.no_sync_writes { + req = req.no_sync(true); + } req.body(writes).send().await?; println!("success"); diff --git a/influxdb3/tests/server/client.rs b/influxdb3/tests/server/client.rs index b4801e2ac48..45c7e7329c4 100644 --- a/influxdb3/tests/server/client.rs +++ b/influxdb3/tests/server/client.rs @@ -97,3 +97,48 @@ async fn configure_last_caches() { .await .expect("should delete the cache"); } + +#[tokio::test] +async fn write_with_no_sync() { + let server = TestServer::spawn().await; + let db_name = "foo"; + let tbl_name = "bar"; + let client = influxdb3_client::Client::new( + server.client_addr(), + Some("../testing-certs/rootCA.pem".into()), + ) + .unwrap(); + + // Test with no_sync(true) - should succeed without waiting for fsync + client + .api_v3_write_lp(db_name) + .precision(Precision::Nanosecond) + .accept_partial(false) + .no_sync(true) + .body(format!("{tbl_name},t1=a,t2=aa f1=123 1000")) + .send() + .await + .expect("write with no_sync(true)"); + + // Test with no_sync(false) - default behavior, waits for fsync + client + .api_v3_write_lp(db_name) + .precision(Precision::Nanosecond) + .accept_partial(false) + .no_sync(false) + .body(format!("{tbl_name},t1=b,t2=bb f1=456 2000")) + .send() + .await + .expect("write with no_sync(false)"); + + // Verify both writes were successful by querying + let result = client + .api_v3_query_sql(db_name, format!("SELECT COUNT(*) FROM {tbl_name}")) + .format(Format::Csv) + .send() + .await + .expect("query to verify writes"); + let s = String::from_utf8(result.as_ref().to_vec()).unwrap(); + // Both writes should be present + assert_eq!(s, "count(*)\n2\n", "Expected query results"); +} diff --git a/influxdb3/tests/server/write.rs b/influxdb3/tests/server/write.rs index 5d9efe3fb7b..2cc86b8aeb5 100644 --- a/influxdb3/tests/server/write.rs +++ b/influxdb3/tests/server/write.rs @@ -345,7 +345,7 @@ async fn writes_with_different_schema_should_fail() { #[tokio::test] /// Check that the no_sync param can be used on any endpoint. However, this only means that serde /// will parse it just fine. It is only able to be used in the v3 endpoint and will -/// default to requiring the WAL to synce before returning. +/// default to requiring the WAL to sync before returning. async fn api_no_sync_param() { let server = TestServer::spawn().await; let client = server.http_client(); diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 2da3a6df8df..2614525fa5a 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -982,6 +982,12 @@ impl WriteRequestBuilder<'_, B> { self.params.accept_partial = Some(set_to); self } + + /// Set the `no_sync` parameter + pub fn no_sync(mut self, set_to: bool) -> Self { + self.params.no_sync = Some(set_to); + self + } } impl<'c> WriteRequestBuilder<'c, NoBody> { diff --git a/influxdb3_types/src/write.rs b/influxdb3_types/src/write.rs index 57ad3f3b475..bf49d46c072 100644 --- a/influxdb3_types/src/write.rs +++ b/influxdb3_types/src/write.rs @@ -33,6 +33,7 @@ impl std::str::FromStr for Precision { fn from_str(s: &str) -> std::result::Result { let p = match s { + "auto" => Self::Auto, "s" => Self::Second, "ms" => Self::Millisecond, "us" => Self::Microsecond,