diff --git a/influxdb3_cache/src/distinct_cache/cache.rs b/influxdb3_cache/src/distinct_cache/cache.rs index 2ab6a3a9b52..febce30f3d4 100644 --- a/influxdb3_cache/src/distinct_cache/cache.rs +++ b/influxdb3_cache/src/distinct_cache/cache.rs @@ -167,7 +167,7 @@ impl DistinctCache { ?predicates, ?projection, ?limit, - ">>> distinct cache record batches" + "distinct cache record batches" ); let n_columns = projection .as_ref() @@ -410,6 +410,7 @@ impl Node { } else if next_predicates.is_empty() && next_builders.is_empty() { if let Some(builder) = builder { builder.append_value(value.0); + total_count += 1; } } if let Some(new_limit) = limit.checked_sub(count) { @@ -417,10 +418,8 @@ impl Node { } else { break; } - } else { - if let Some(builder) = builder { - builder.append_value(value.0); - } + } else if let Some(builder) = builder { + builder.append_value(value.0); total_count += 1; } } diff --git a/influxdb3_cache/src/distinct_cache/mod.rs b/influxdb3_cache/src/distinct_cache/mod.rs index 08177c808a6..6ac4186b107 100644 --- a/influxdb3_cache/src/distinct_cache/mod.rs +++ b/influxdb3_cache/src/distinct_cache/mod.rs @@ -1015,4 +1015,159 @@ mod tests { &results ); } + + #[test_log::test(tokio::test)] + async fn test_distinct_with_where_clause_bug() { + let writer = TestWriter::new().await; + writer + .catalog() + .create_table( + TestWriter::DB_NAME, + "bar", + &["t1", "t2", "t3"], + &[("f1", FieldDataType::Float), ("f2", FieldDataType::Float)], + ) + .await + .unwrap(); + writer + .catalog() + .create_distinct_cache( + TestWriter::DB_NAME, + "bar", + Some("foo"), + &["t1", "t2", "t3"], + Default::default(), + Default::default(), + ) + .await + .unwrap(); + + let dvc = DistinctCacheProvider::new_from_catalog( + writer.catalog().time_provider(), + writer.catalog(), + ) + .await + .unwrap(); + + let write_batch = writer + .write_lp_to_write_batch( + "\ + bar,t1=AA,t2=\"(1\\,\\ 2]\",t3=None f1=-0.0621216848940003,f2=160.75626873391323\n\ + bar,t1=BB,t2=\"(0\\,\\ 1]\",t3=\"(2.0\\,\\ 4.0]\" f1=0.0183506941911869,f2=60.72371267622072\n\ + ", + 100, + ) + .await; + let wal_contents = influxdb3_wal::create::wal_contents( + (0, 100, 1), + [influxdb3_wal::create::write_batch_op(write_batch)], + ); + dvc.write_wal_contents_to_cache(&wal_contents); + + let ctx = SessionContext::new(); + let distinct_func = DistinctCacheFunction::new(writer.db_schema().id, Arc::clone(&dvc)); + ctx.register_udtf(DISTINCT_CACHE_UDTF_NAME, Arc::new(distinct_func)); + + // should be able to do a basic query to distinct cache: + let results = ctx + .sql("select * from distinct_cache('bar', 'foo')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+----+----------+--------------+", + "| t1 | t2 | t3 |", + "+----+----------+--------------+", + "| AA | \"(1, 2]\" | None |", + "| BB | \"(0, 1]\" | \"(2.0, 4.0]\" |", + "+----+----------+--------------+", + ], + &results + ); + + // should be able to query with a WHERE clause: + let results = ctx + .sql("select * from distinct_cache('bar', 'foo') where t1 = 'BB'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+----+----------+--------------+", + "| t1 | t2 | t3 |", + "+----+----------+--------------+", + "| BB | \"(0, 1]\" | \"(2.0, 4.0]\" |", + "+----+----------+--------------+", + ], + &results + ); + + // should be able to query with a projection: + let results = ctx + .sql("select t2 from distinct_cache('bar', 'foo')") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+----------+", + "| t2 |", + "+----------+", + "| \"(1, 2]\" |", + "| \"(0, 1]\" |", + "+----------+", + ], + &results + ); + + // should be able to query with projection and a WHERE clause: + let results = ctx + .sql("select t2 from distinct_cache('bar', 'foo') where t1 = 'BB'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+----------+", + "| t2 |", + "+----------+", + "| \"(0, 1]\" |", + "+----------+", + ], + &results + ); + + // should be able to query with projection and a WHERE clause: + let results = ctx + .sql("select t2, t3 from distinct_cache('bar', 'foo') where t1 = 'BB'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_batches_eq!( + [ + "+----------+--------------+", + "| t2 | t3 |", + "+----------+--------------+", + "| \"(0, 1]\" | \"(2.0, 4.0]\" |", + "+----------+--------------+", + ], + &results + ); + } } diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 59360e66e69..7e13f8bdd26 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -224,6 +224,10 @@ impl Catalog { Ok(catalog) } + pub fn time_provider(&self) -> Arc { + Arc::clone(&self.time_provider) + } + pub fn set_state_shutdown(&self) { *self.state.lock() = CatalogState::Shutdown; }