Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion influxdb3/src/commands/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,16 @@ pub struct Config {
#[clap(long = "accept-partial")]
accept_partial_writes: bool,

/// Flag to request the server not wait for sync before ACK'ing
///
/// This option returns a success before a write is durable.
#[clap(long = "no-sync")]
no_sync_writes: bool,

/// Give a quoted line protocol line via the command line
line_protocol: Option<Vec<String>>,

/// Specify a supported precision (eg: ns, us, ms, s).
/// Specify a supported precision (eg: auto, ns, us, ms, s).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe auto is left out as server sets it to Precision::Auto when it's not passed through, so users can leave out this argument when calling influxdb3 write.

params.precision.unwrap_or(Precision::Auto),

I don't think there's any harm in exposing it, but setting explicitly to auto on client might be redundant.

Copy link
Copy Markdown
Contributor Author

@philjb philjb Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think including it in the help is useful so users know it is an option; they might like to be explicit or future proof against changes to the default; but i don't feel strongly about it and its a minor thing. I'm inclined to leave it as in the pr.

#[clap(short = 'p', long = "precision")]
precision: Option<Precision>,

Expand Down Expand Up @@ -99,6 +105,9 @@ pub(crate) async fn command(config: Config) -> Result<()> {
if config.accept_partial_writes {
req = req.accept_partial(true);
}
if config.no_sync_writes {
req = req.no_sync(true);
}
req.body(writes).send().await?;

println!("success");
Expand Down
45 changes: 45 additions & 0 deletions influxdb3/tests/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,48 @@ async fn configure_last_caches() {
.await
.expect("should delete the cache");
}

#[tokio::test]
async fn write_with_no_sync() {
let server = TestServer::spawn().await;
let db_name = "foo";
let tbl_name = "bar";
let client = influxdb3_client::Client::new(
server.client_addr(),
Some("../testing-certs/rootCA.pem".into()),
)
.unwrap();

// Test with no_sync(true) - should succeed without waiting for fsync
client
.api_v3_write_lp(db_name)
.precision(Precision::Nanosecond)
.accept_partial(false)
.no_sync(true)
.body(format!("{tbl_name},t1=a,t2=aa f1=123 1000"))
.send()
.await
.expect("write with no_sync(true)");

// Test with no_sync(false) - default behavior, waits for fsync
client
.api_v3_write_lp(db_name)
.precision(Precision::Nanosecond)
.accept_partial(false)
.no_sync(false)
.body(format!("{tbl_name},t1=b,t2=bb f1=456 2000"))
.send()
.await
.expect("write with no_sync(false)");

// Verify both writes were successful by querying
let result = client
.api_v3_query_sql(db_name, format!("SELECT COUNT(*) FROM {tbl_name}"))
.format(Format::Csv)
.send()
.await
.expect("query to verify writes");
let s = String::from_utf8(result.as_ref().to_vec()).unwrap();
// Both writes should be present
assert_eq!(s, "count(*)\n2\n", "Expected query results");
}
2 changes: 1 addition & 1 deletion influxdb3/tests/server/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ async fn writes_with_different_schema_should_fail() {
#[tokio::test]
/// Check that the no_sync param can be used on any endpoint. However, this only means that serde
/// will parse it just fine. It is only able to be used in the v3 endpoint and will
/// default to requiring the WAL to synce before returning.
/// default to requiring the WAL to sync before returning.
async fn api_no_sync_param() {
let server = TestServer::spawn().await;
let client = server.http_client();
Expand Down
6 changes: 6 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,12 @@ impl<B> WriteRequestBuilder<'_, B> {
self.params.accept_partial = Some(set_to);
self
}

/// Set the `no_sync` parameter
pub fn no_sync(mut self, set_to: bool) -> Self {
self.params.no_sync = Some(set_to);
self
}
}

impl<'c> WriteRequestBuilder<'c, NoBody> {
Expand Down
1 change: 1 addition & 0 deletions influxdb3_types/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl std::str::FromStr for Precision {

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let p = match s {
"auto" => Self::Auto,
"s" => Self::Second,
"ms" => Self::Millisecond,
"us" => Self::Microsecond,
Expand Down