Skip to content

Commit 965b318

Browse files
authored
feat: Protobuf serde for Json file sink (#8062)
* Protobuf serde for Json file sink * Fix tests * Fix test
1 parent 21b2af1 commit 965b318

File tree

13 files changed

+1703
-208
lines changed

13 files changed

+1703
-208
lines changed

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl BatchSerializer for JsonSerializer {
230230
}
231231

232232
/// Implements [`DataSink`] for writing to a Json file.
233-
struct JsonSink {
233+
pub struct JsonSink {
234234
/// Config options for writing data
235235
config: FileSinkConfig,
236236
}
@@ -258,10 +258,16 @@ impl DisplayAs for JsonSink {
258258
}
259259

260260
impl JsonSink {
261-
fn new(config: FileSinkConfig) -> Self {
261+
/// Create from config.
262+
pub fn new(config: FileSinkConfig) -> Self {
262263
Self { config }
263264
}
264265

266+
/// Retrieve the inner [`FileSinkConfig`].
267+
pub fn config(&self) -> &FileSinkConfig {
268+
&self.config
269+
}
270+
265271
async fn append_all(
266272
&self,
267273
data: SendableRecordBatchStream,

datafusion/physical-plan/src/insert.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,21 @@ impl FileSinkExec {
151151
}
152152
}
153153

154+
/// Input execution plan
155+
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
156+
&self.input
157+
}
158+
154159
/// Returns insert sink
155160
pub fn sink(&self) -> &dyn DataSink {
156161
self.sink.as_ref()
157162
}
158163

164+
/// Optional sort order for output data
165+
pub fn sort_order(&self) -> &Option<Vec<PhysicalSortRequirement>> {
166+
&self.sort_order
167+
}
168+
159169
/// Returns the metrics of the underlying [DataSink]
160170
pub fn metrics(&self) -> Option<MetricsSet> {
161171
self.sink.metrics()
@@ -170,7 +180,7 @@ impl DisplayAs for FileSinkExec {
170180
) -> std::fmt::Result {
171181
match t {
172182
DisplayFormatType::Default | DisplayFormatType::Verbose => {
173-
write!(f, "InsertExec: sink=")?;
183+
write!(f, "FileSinkExec: sink=")?;
174184
self.sink.fmt_as(t, f)
175185
}
176186
}

datafusion/proto/proto/datafusion.proto

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,9 +1130,63 @@ message PhysicalPlanNode {
11301130
SortPreservingMergeExecNode sort_preserving_merge = 21;
11311131
NestedLoopJoinExecNode nested_loop_join = 22;
11321132
AnalyzeExecNode analyze = 23;
1133+
JsonSinkExecNode json_sink = 24;
11331134
}
11341135
}
11351136

1137+
enum FileWriterMode {
1138+
APPEND = 0;
1139+
PUT = 1;
1140+
PUT_MULTIPART = 2;
1141+
}
1142+
1143+
enum CompressionTypeVariant {
1144+
GZIP = 0;
1145+
BZIP2 = 1;
1146+
XZ = 2;
1147+
ZSTD = 3;
1148+
UNCOMPRESSED = 4;
1149+
}
1150+
1151+
message PartitionColumn {
1152+
string name = 1;
1153+
ArrowType arrow_type = 2;
1154+
}
1155+
1156+
message FileTypeWriterOptions {
1157+
oneof FileType {
1158+
JsonWriterOptions json_options = 1;
1159+
}
1160+
}
1161+
1162+
message JsonWriterOptions {
1163+
CompressionTypeVariant compression = 1;
1164+
}
1165+
1166+
message FileSinkConfig {
1167+
string object_store_url = 1;
1168+
repeated PartitionedFile file_groups = 2;
1169+
repeated string table_paths = 3;
1170+
Schema output_schema = 4;
1171+
repeated PartitionColumn table_partition_cols = 5;
1172+
FileWriterMode writer_mode = 6;
1173+
bool single_file_output = 7;
1174+
bool unbounded_input = 8;
1175+
bool overwrite = 9;
1176+
FileTypeWriterOptions file_type_writer_options = 10;
1177+
}
1178+
1179+
message JsonSink {
1180+
FileSinkConfig config = 1;
1181+
}
1182+
1183+
message JsonSinkExecNode {
1184+
PhysicalPlanNode input = 1;
1185+
JsonSink sink = 2;
1186+
Schema sink_schema = 3;
1187+
PhysicalSortExprNodeCollection sort_order = 4;
1188+
}
1189+
11361190
message PhysicalExtensionNode {
11371191
bytes node = 1;
11381192
repeated PhysicalPlanNode inputs = 2;

0 commit comments

Comments
 (0)