  */
 package com.databricks.spark.csv
 
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.types.StructType
@@ -117,12 +116,11 @@ class CsvParser extends Serializable {
     this
   }
 
-  /** Returns a Schema RDD for the given CSV path. */
-  @throws[RuntimeException]
-  def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
-    val relation: CsvRelation = CsvRelation(
-      () => TextFile.withCharset(sqlContext.sparkContext, path, charset),
-      Some(path),
+  /** Returns a CsvRelation instance based on the current configuration of this CSV parser. */
+  private[csv] def csvRelation(sqlContext: SQLContext, csvRDD: RDD[String], path: Option[String]): CsvRelation = {
+    CsvRelation(
+      () => csvRDD,
+      path,
       useHeader,
       delimiter,
       quote,
@@ -137,27 +135,16 @@ class CsvParser extends Serializable {
       inferSchema,
       codec,
       nullValue)(sqlContext)
+  }
+  /** Returns a Schema RDD for the given CSV path. */
+  @throws[RuntimeException]
+  def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
+    val relation: CsvRelation = csvRelation(sqlContext, TextFile.withCharset(sqlContext.sparkContext, path, charset), Some(path))
     sqlContext.baseRelationToDataFrame(relation)
   }
 
   def csvRdd(sqlContext: SQLContext, csvRDD: RDD[String]): DataFrame = {
-    val relation: CsvRelation = CsvRelation(
-      () => csvRDD,
-      None,
-      useHeader,
-      delimiter,
-      quote,
-      escape,
-      comment,
-      parseMode,
-      parserLib,
-      ignoreLeadingWhiteSpace,
-      ignoreTrailingWhiteSpace,
-      treatEmptyValuesAsNulls,
-      schema,
-      inferSchema,
-      codec,
-      nullValue)(sqlContext)
+    val relation: CsvRelation = csvRelation(sqlContext, csvRDD, None)
     sqlContext.baseRelationToDataFrame(relation)
   }
 }
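
For context, here is a minimal usage sketch of the two public entry points that now share the private csvRelation helper. The caller code below is illustrative and not part of this commit; the builder methods follow the documented CsvParser API, while the sample path and rows are hypothetical.

// Usage sketch (assumed caller code, not part of this commit): both public
// entry points delegate to the shared csvRelation helper, so a file path and
// an in-memory RDD of the same lines produce equivalent DataFrames.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv.CsvParser

object CsvParserUsage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("csv-usage").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val parser = new CsvParser()
      .withUseHeader(true)
      .withInferSchema(true)

    // Path-based entry point: csvRelation receives Some(path) plus the
    // file's lines read via TextFile.withCharset. "cars.csv" is hypothetical.
    val fromFile = parser.csvFile(sqlContext, "cars.csv")
    fromFile.printSchema()

    // RDD-based entry point: csvRelation receives None for the path.
    val lines = sc.parallelize(Seq("year,make", "2012,Tesla"))
    val fromRdd = parser.csvRdd(sqlContext, lines)
    fromRdd.show()
  }
}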