@@ -32,8 +32,9 @@ use crate::datasource::physical_plan::{
3232use crate :: error:: Result ;
3333use crate :: execution:: context:: SessionState ;
3434use crate :: physical_plan:: insert:: { DataSink , DataSinkExec } ;
35- use crate :: physical_plan:: { DisplayAs , DisplayFormatType , Statistics } ;
36- use crate :: physical_plan:: { ExecutionPlan , SendableRecordBatchStream } ;
35+ use crate :: physical_plan:: {
36+ DisplayAs , DisplayFormatType , ExecutionPlan , SendableRecordBatchStream , Statistics ,
37+ } ;
3738
3839use arrow:: array:: RecordBatch ;
3940use arrow:: csv:: WriterBuilder ;
@@ -136,12 +137,13 @@ impl CsvFormat {
136137 /// Set true to indicate that the first line is a header.
137138 /// - default to true
138139 pub fn with_has_header ( mut self , has_header : bool ) -> Self {
139- self . options . has_header = has_header;
140+ self . options . has_header = Some ( has_header) ;
140141 self
141142 }
142143
143- /// True if the first line is a header.
144- pub fn has_header ( & self ) -> bool {
144+ /// Returns `Some(true)` if the first line is a header, `Some(false)` if
145+ /// it is not, and `None` if it is not specified.
146+ pub fn has_header ( & self ) -> Option < bool > {
145147 self . options . has_header
146148 }
147149
@@ -200,7 +202,7 @@ impl FileFormat for CsvFormat {
200202
201203 async fn infer_schema (
202204 & self ,
203- _state : & SessionState ,
205+ state : & SessionState ,
204206 store : & Arc < dyn ObjectStore > ,
205207 objects : & [ ObjectMeta ] ,
206208 ) -> Result < SchemaRef > {
@@ -211,7 +213,7 @@ impl FileFormat for CsvFormat {
211213 for object in objects {
212214 let stream = self . read_to_delimited_chunks ( store, object) . await ;
213215 let ( schema, records_read) = self
214- . infer_schema_from_stream ( records_to_read, stream)
216+ . infer_schema_from_stream ( state , records_to_read, stream)
215217 . await ?;
216218 records_to_read -= records_read;
217219 schemas. push ( schema) ;
@@ -236,13 +238,17 @@ impl FileFormat for CsvFormat {
236238
237239 async fn create_physical_plan (
238240 & self ,
239- _state : & SessionState ,
241+ state : & SessionState ,
240242 conf : FileScanConfig ,
241243 _filters : Option < & Arc < dyn PhysicalExpr > > ,
242244 ) -> Result < Arc < dyn ExecutionPlan > > {
243245 let exec = CsvExec :: new (
244246 conf,
245- self . options . has_header ,
247+ // If the format options do not specify whether there is a header,
248+ // we consult the configuration options.
249+ self . options
250+ . has_header
251+ . unwrap_or ( state. config_options ( ) . catalog . has_header ) ,
246252 self . options . delimiter ,
247253 self . options . quote ,
248254 self . options . escape ,
@@ -286,6 +292,7 @@ impl CsvFormat {
286292 /// number of lines that were read
287293 async fn infer_schema_from_stream (
288294 & self ,
295+ state : & SessionState ,
289296 mut records_to_read : usize ,
290297 stream : impl Stream < Item = Result < Bytes > > ,
291298 ) -> Result < ( Schema , usize ) > {
@@ -298,7 +305,13 @@ impl CsvFormat {
298305
299306 while let Some ( chunk) = stream. next ( ) . await . transpose ( ) ? {
300307 let format = arrow:: csv:: reader:: Format :: default ( )
301- . with_header ( self . options . has_header && first_chunk)
308+ . with_header (
309+ first_chunk
310+ && self
311+ . options
312+ . has_header
313+ . unwrap_or ( state. config_options ( ) . catalog . has_header ) ,
314+ )
302315 . with_delimiter ( self . options . delimiter ) ;
303316
304317 let ( Schema { fields, .. } , records_read) =
@@ -538,6 +551,7 @@ mod tests {
538551 use datafusion_common:: cast:: as_string_array;
539552 use datafusion_common:: stats:: Precision ;
540553 use datafusion_common:: { internal_err, GetExt } ;
554+ use datafusion_execution:: runtime_env:: { RuntimeConfig , RuntimeEnv } ;
541555 use datafusion_expr:: { col, lit} ;
542556
543557 use chrono:: DateTime ;
@@ -554,7 +568,8 @@ mod tests {
554568 let task_ctx = state. task_ctx ( ) ;
555569 // skip column 9 that overflows the automatically discovered column type of i64 (u64 would work)
556570 let projection = Some ( vec ! [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 10 , 11 , 12 ] ) ;
557- let exec = get_exec ( & state, "aggregate_test_100.csv" , projection, None ) . await ?;
571+ let exec =
572+ get_exec ( & state, "aggregate_test_100.csv" , projection, None , true ) . await ?;
558573 let stream = exec. execute ( 0 , task_ctx) ?;
559574
560575 let tt_batches: i32 = stream
@@ -582,7 +597,7 @@ mod tests {
582597 let task_ctx = session_ctx. task_ctx ( ) ;
583598 let projection = Some ( vec ! [ 0 , 1 , 2 , 3 ] ) ;
584599 let exec =
585- get_exec ( & state, "aggregate_test_100.csv" , projection, Some ( 1 ) ) . await ?;
600+ get_exec ( & state, "aggregate_test_100.csv" , projection, Some ( 1 ) , true ) . await ?;
586601 let batches = collect ( exec, task_ctx) . await ?;
587602 assert_eq ! ( 1 , batches. len( ) ) ;
588603 assert_eq ! ( 4 , batches[ 0 ] . num_columns( ) ) ;
@@ -597,7 +612,8 @@ mod tests {
597612 let state = session_ctx. state ( ) ;
598613
599614 let projection = None ;
600- let exec = get_exec ( & state, "aggregate_test_100.csv" , projection, None ) . await ?;
615+ let exec =
616+ get_exec ( & state, "aggregate_test_100.csv" , projection, None , true ) . await ?;
601617
602618 let x: Vec < String > = exec
603619 . schema ( )
@@ -633,7 +649,8 @@ mod tests {
633649 let state = session_ctx. state ( ) ;
634650 let task_ctx = session_ctx. task_ctx ( ) ;
635651 let projection = Some ( vec ! [ 0 ] ) ;
636- let exec = get_exec ( & state, "aggregate_test_100.csv" , projection, None ) . await ?;
652+ let exec =
653+ get_exec ( & state, "aggregate_test_100.csv" , projection, None , true ) . await ?;
637654
638655 let batches = collect ( exec, task_ctx) . await . expect ( "Collect batches" ) ;
639656
@@ -716,6 +733,11 @@ mod tests {
716733 async fn query_compress_data (
717734 file_compression_type : FileCompressionType ,
718735 ) -> Result < ( ) > {
736+ let runtime = Arc :: new ( RuntimeEnv :: new ( RuntimeConfig :: new ( ) ) . unwrap ( ) ) ;
737+ let mut cfg = SessionConfig :: new ( ) ;
738+ cfg. options_mut ( ) . catalog . has_header = true ;
739+ let session_state = SessionState :: new_with_config_rt ( cfg, runtime) ;
740+
719741 let integration = LocalFileSystem :: new_with_prefix ( arrow_test_data ( ) ) . unwrap ( ) ;
720742
721743 let path = Path :: from ( "csv/aggregate_test_100.csv" ) ;
@@ -757,7 +779,7 @@ mod tests {
757779 . read_to_delimited_chunks_from_stream ( compressed_stream. unwrap ( ) )
758780 . await ;
759781 let ( schema, records_read) = compressed_csv
760- . infer_schema_from_stream ( records_to_read, decoded_stream)
782+ . infer_schema_from_stream ( & session_state , records_to_read, decoded_stream)
761783 . await ?;
762784
763785 assert_eq ! ( expected, schema) ;
@@ -803,9 +825,10 @@ mod tests {
803825 file_name : & str ,
804826 projection : Option < Vec < usize > > ,
805827 limit : Option < usize > ,
828+ has_header : bool ,
806829 ) -> Result < Arc < dyn ExecutionPlan > > {
807830 let root = format ! ( "{}/csv" , crate :: test_util:: arrow_test_data( ) ) ;
808- let format = CsvFormat :: default ( ) ;
831+ let format = CsvFormat :: default ( ) . with_has_header ( has_header ) ;
809832 scan_format ( state, & format, & root, file_name, projection, limit) . await
810833 }
811834
0 commit comments