Skip to content

Commit e82dc21

Browse files
authored
feat: add TRUNCATE TABLE support (#19633)
## Which issue does this PR close?

- Related to #19617

## Rationale for this change

DataFusion recently added `TableProvider` hooks for row-level DML operations such as DELETE and UPDATE, but TRUNCATE TABLE was still unsupported.

## What changes are included in this PR?

This PR adds planning and integration support for TRUNCATE TABLE in DataFusion, completing another part of the DML surface alongside existing DELETE and UPDATE support.

Specifically, it includes:

- SQL parsing support for TRUNCATE TABLE
- Logical plan support via a new `WriteOp::Truncate` DML operation
- Physical planner routing for TRUNCATE statements
- A new `TableProvider::truncate()` hook for storage-native implementations
- Protobuf / DML node support for serializing and deserializing TRUNCATE operations
- SQL logic tests validating logical and physical planning behavior

The implementation follows the same structure and conventions as the existing DELETE and UPDATE DML support. Execution semantics are delegated to individual `TableProvider` implementations via the new hook.

## Are these changes tested?

Yes. The PR includes SQL logic tests that verify:

- Parsing of TRUNCATE TABLE
- Correct logical plan generation
- Correct physical planner routing
- Clear and consistent errors for providers that do not yet support TRUNCATE

These tests mirror the existing testing strategy used for unsupported DELETE and UPDATE operations.

## Are there any user-facing changes?

Yes. Users can now execute TRUNCATE TABLE statements in DataFusion for tables whose `TableProvider` supports the new `truncate()` hook. Tables that do not support TRUNCATE will return a clear `NotImplemented` error.
1 parent 4d8d48c commit e82dc21

File tree

12 files changed

+280
-2
lines changed

12 files changed

+280
-2
lines changed

datafusion/catalog/src/table.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send {
353353
) -> Result<Arc<dyn ExecutionPlan>> {
354354
not_impl_err!("UPDATE not supported for {} table", self.table_type())
355355
}
356+
357+
/// Remove all rows from the table.
358+
///
359+
/// Should return an [ExecutionPlan] producing a single row with count (UInt64),
360+
/// representing the number of rows removed.
///
/// The default implementation performs no work and returns a "not implemented"
/// error that names the table type, so a provider must override this method
/// to support `TRUNCATE`. The `_state` argument is unused by the default.
361+
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
362+
not_impl_err!("TRUNCATE not supported for {} table", self.table_type())
363+
}
356364
}
357365

358366
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].

datafusion/core/src/physical_planner.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,30 @@ impl DefaultPhysicalPlanner {
655655
);
656656
}
657657
}
658+
LogicalPlan::Dml(DmlStatement {
659+
table_name,
660+
target,
661+
op: WriteOp::Truncate,
662+
..
663+
}) => {
664+
if let Some(provider) =
665+
target.as_any().downcast_ref::<DefaultTableSource>()
666+
{
667+
provider
668+
.table_provider
669+
.truncate(session_state)
670+
.await
671+
.map_err(|e| {
672+
e.context(format!(
673+
"TRUNCATE operation on table '{table_name}'"
674+
))
675+
})?
676+
} else {
677+
return exec_err!(
678+
"Table source can't be downcasted to DefaultTableSource"
679+
);
680+
}
681+
}
658682
LogicalPlan::Window(Window { window_expr, .. }) => {
659683
assert_or_internal_err!(
660684
!window_expr.is_empty(),

datafusion/core/tests/custom_sources_cases/dml_planning.rs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction.
18+
//! Tests for DELETE, UPDATE, and TRUNCATE planning: filter and assignment extraction
//! for DELETE/UPDATE, and dispatch to the `TableProvider` hook for TRUNCATE.
1919
2020
use std::any::Any;
2121
use std::sync::{Arc, Mutex};
@@ -24,9 +24,10 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2424
use async_trait::async_trait;
2525
use datafusion::datasource::{TableProvider, TableType};
2626
use datafusion::error::Result;
27-
use datafusion::execution::context::SessionContext;
27+
use datafusion::execution::context::{SessionConfig, SessionContext};
2828
use datafusion::logical_expr::Expr;
2929
use datafusion_catalog::Session;
30+
use datafusion_common::ScalarValue;
3031
use datafusion_physical_plan::ExecutionPlan;
3132
use datafusion_physical_plan::empty::EmptyExec;
3233

@@ -165,6 +166,66 @@ impl TableProvider for CaptureUpdateProvider {
165166
}
166167
}
167168

169+
/// A TableProvider that captures whether truncate() was called.
///
/// Test-only helper: it stores a schema and a shared flag that `truncate()`
/// sets, so a test can check afterwards that the hook was invoked.
170+
struct CaptureTruncateProvider {
171+
// Schema reported by `schema()` and used for the plan returned by `scan()`.
schema: SchemaRef,
172+
// Starts as `false`; set to `true` by `truncate()`.
truncate_called: Arc<Mutex<bool>>,
173+
}
174+
175+
// Constructor and flag accessor for the truncate-capturing test provider.
impl CaptureTruncateProvider {
176+
/// Creates a provider with the given schema and the "truncate called" flag
/// initialised to `false`.
fn new(schema: SchemaRef) -> Self {
177+
Self {
178+
schema,
179+
truncate_called: Arc::new(Mutex::new(false)),
180+
}
181+
}
182+
183+
/// Returns the current value of the "truncate called" flag.
///
/// Panics if the mutex is poisoned (`lock().unwrap()`).
fn was_truncated(&self) -> bool {
184+
*self.truncate_called.lock().unwrap()
185+
}
186+
}
187+
188+
// Hand-written `Debug` implementation: only the `schema` field is printed;
// the `truncate_called` flag is left out of the output.
impl std::fmt::Debug for CaptureTruncateProvider {
189+
/// Formats the provider as `CaptureTruncateProvider { schema: .. }`.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190+
f.debug_struct("CaptureTruncateProvider")
191+
.field("schema", &self.schema)
192+
.finish()
193+
}
194+
}
195+
196+
// `TableProvider` implementation for the truncate-capturing test provider.
// `scan()` returns an `EmptyExec` over the stored schema; `truncate()` records
// that it was called and returns an `EmptyExec` with a `count` column.
#[async_trait]
197+
impl TableProvider for CaptureTruncateProvider {
198+
/// Returns `self` as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
199+
self
200+
}
201+
202+
/// Returns a new reference-counted handle to the stored schema.
fn schema(&self) -> SchemaRef {
203+
Arc::clone(&self.schema)
204+
}
205+
206+
/// Always reports `TableType::Base`.
fn table_type(&self) -> TableType {
207+
TableType::Base
208+
}
209+
210+
/// Ignores every argument and returns an `EmptyExec` built from the
/// provider's schema.
async fn scan(
211+
&self,
212+
_state: &dyn Session,
213+
_projection: Option<&Vec<usize>>,
214+
_filters: &[Expr],
215+
_limit: Option<usize>,
216+
) -> Result<Arc<dyn ExecutionPlan>> {
217+
Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
218+
}
219+
220+
/// Sets the "truncate called" flag and returns an `EmptyExec` whose schema
/// is a single non-nullable `UInt64` column named `count`.
///
/// Panics if the flag's mutex is poisoned (`lock().unwrap()`).
async fn truncate(&self, _state: &dyn Session) -> Result<Arc<dyn ExecutionPlan>> {
221+
// Record the call before building the plan so the test can observe it.
*self.truncate_called.lock().unwrap() = true;
222+
223+
// NOTE(review): presumably an `EmptyExec` yields no rows, whereas the
// trait docs ask for a single count row — acceptable for this test,
// which only inspects the flag; confirm if the output is ever checked.
Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
224+
Field::new("count", DataType::UInt64, false),
225+
])))))
226+
}
227+
}
228+
168229
fn test_schema() -> SchemaRef {
169230
Arc::new(Schema::new(vec![
170231
Field::new("id", DataType::Int32, false),
@@ -269,6 +330,28 @@ async fn test_update_assignments() -> Result<()> {
269330
Ok(())
270331
}
271332

333+
// Verifies that running `TRUNCATE TABLE t` end to end (SQL -> plan -> collect)
// invokes `truncate()` on the registered `CaptureTruncateProvider`.
#[tokio::test]
334+
async fn test_truncate_calls_provider() -> Result<()> {
335+
let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
336+
// NOTE(review): sets `datafusion.optimizer.max_passes` to 0, presumably so
// no optimizer passes run before physical planning — confirm the intent.
let config = SessionConfig::new().set(
337+
"datafusion.optimizer.max_passes",
338+
&ScalarValue::UInt64(Some(0)),
339+
);
340+
341+
let ctx = SessionContext::new_with_config(config);
342+
343+
// Register a clone of the `Arc` so `provider` stays usable for the
// assertion below.
ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
344+
345+
// Any error from planning or execution fails the test via `?`.
ctx.sql("TRUNCATE TABLE t").await?.collect().await?;
346+
347+
assert!(
348+
provider.was_truncated(),
349+
"truncate() should be called on the TableProvider"
350+
);
351+
352+
Ok(())
353+
}
354+
272355
#[tokio::test]
273356
async fn test_unsupported_table_delete() -> Result<()> {
274357
let schema = test_schema();
@@ -295,3 +378,18 @@ async fn test_unsupported_table_update() -> Result<()> {
295378
assert!(result.is_err() || result.unwrap().collect().await.is_err());
296379
Ok(())
297380
}
381+
382+
// Verifies that `TRUNCATE TABLE` against an `EmptyTable` fails, either when
// the statement is planned by `ctx.sql` or when the result is collected.
#[tokio::test]
383+
async fn test_unsupported_table_truncate() -> Result<()> {
384+
let schema = test_schema();
385+
let ctx = SessionContext::new();
386+
387+
let empty_table = datafusion::datasource::empty::EmptyTable::new(schema);
388+
ctx.register_table("empty_t", Arc::new(empty_table))?;
389+
390+
let result = ctx.sql("TRUNCATE TABLE empty_t").await;
391+
392+
// Accepts an error at either stage; the error kind and message are not
// inspected, so any failure satisfies this assertion.
assert!(result.is_err() || result.unwrap().collect().await.is_err());
393+
394+
Ok(())
395+
}

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ pub enum WriteOp {
237237
Update,
238238
/// `CREATE TABLE AS SELECT` operation
239239
Ctas,
240+
/// `TRUNCATE` operation
241+
Truncate,
240242
}
241243

242244
impl WriteOp {
@@ -247,6 +249,7 @@ impl WriteOp {
247249
WriteOp::Delete => "Delete",
248250
WriteOp::Update => "Update",
249251
WriteOp::Ctas => "Ctas",
252+
WriteOp::Truncate => "Truncate",
250253
}
251254
}
252255
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ message DmlNode{
278278
INSERT_APPEND = 3;
279279
INSERT_OVERWRITE = 4;
280280
INSERT_REPLACE = 5;
281+
TRUNCATE = 6;
281282
}
282283
Type dml_type = 1;
283284
LogicalPlanNode input = 2;

datafusion/proto/src/generated/pbjson.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ impl From<protobuf::dml_node::Type> for WriteOp {
239239
}
240240
protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
241241
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
242+
protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
242243
}
243244
}
244245
}

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type {
729729
WriteOp::Delete => protobuf::dml_node::Type::Delete,
730730
WriteOp::Update => protobuf::dml_node::Type::Update,
731731
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
732+
WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
732733
}
733734
}
734735
}

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ async fn roundtrip_logical_plan_dml() -> Result<()> {
413413
"DELETE FROM T1",
414414
"UPDATE T1 SET a = 1",
415415
"CREATE TABLE T2 AS SELECT * FROM T1",
416+
"TRUNCATE TABLE T1",
416417
];
417418
for query in queries {
418419
let plan = ctx.sql(query).await?.into_optimized_plan()?;

0 commit comments

Comments
 (0)