1515 * limitations under the License.
1616 */
1717
18- package org .apache .spark .sql
19- package parquet
18+ package org .apache .spark .sql .parquet
2019
21- import _root_ . parquet .io .InvalidRecordException
22- import _root_ . parquet .schema .MessageType
23- import _root_ . parquet .hadoop .{ParquetOutputFormat , ParquetInputFormat }
24- import _root_ . parquet .hadoop .util .ContextUtil
20+ import parquet .io .InvalidRecordException
21+ import parquet .schema .MessageType
22+ import parquet .hadoop .{ParquetOutputFormat , ParquetInputFormat }
23+ import parquet .hadoop .util .ContextUtil
2524
2625import org .apache .spark .rdd .RDD
27- import org .apache .spark .api .java .JavaPairRDD
28- import org .apache .spark .SparkContext
26+ import org .apache .spark .{TaskContext , SerializableWritable , SparkContext }
2927import org .apache .spark .sql .catalyst .expressions .{Row , Attribute , Expression }
3028import org .apache .spark .sql .execution .{SparkPlan , UnaryNode , LeafNode }
3129
32- import org .apache .hadoop .mapreduce .Job
30+ import org .apache .hadoop .mapreduce .lib .output .{FileOutputFormat => NewFileOutputFormat }
31+ import org .apache .hadoop .mapreduce ._
3332import org .apache .hadoop .conf .Configuration
3433import org .apache .hadoop .fs .Path
3534
3635import java .io .IOException
36+ import java .text .SimpleDateFormat
37+ import java .util .Date
3738
3839/**
3940 * Parquet table scan operator. Imports the file that backs the given
4041 * [[ParquetRelation ]] as a RDD[Row].
4142 */
4243case class ParquetTableScan (
43- output : Seq [Attribute ],
44- relation : ParquetRelation ,
45- columnPruningPred : Option [Expression ])(
44+ @ transient output : Seq [Attribute ],
45+ @ transient relation : ParquetRelation ,
46+ @ transient columnPruningPred : Option [Expression ])(
4647 @ transient val sc : SparkContext )
4748 extends LeafNode {
4849
@@ -56,12 +57,18 @@ case class ParquetTableScan(
5657 RowReadSupport .PARQUET_ROW_REQUESTED_SCHEMA ,
5758 ParquetTypesConverter .convertFromAttributes(output).toString)
5859 // TODO: think about adding record filters
60+ /* Comments regarding record filters: it would be nice to push down as much filtering
61+ to Parquet as possible. However, currently it seems we cannot pass enough information
62+ to materialize an (arbitrary) Catalyst [[Predicate]] inside Parquet's
63+ ``FilteredRecordReader`` (via Configuration, for example). Simple
64+ filter-rows-by-column-values however should be supported.
65+ */
5966 sc.newAPIHadoopFile(
6067 relation.path,
6168 classOf [ParquetInputFormat [Row ]],
6269 classOf [Void ], classOf [Row ],
6370 conf)
64- .map(_._2)
71+ .map(_._2)
6572 }
6673
6774 /**
@@ -72,7 +79,7 @@ case class ParquetTableScan(
7279 */
7380 def pruneColumns (prunedAttributes : Seq [Attribute ]): ParquetTableScan = {
7481 val success = validateProjection(prunedAttributes)
75- if (success) {
82+ if (success) {
7683 ParquetTableScan (prunedAttributes, relation, columnPruningPred)(sc)
7784 } else {
7885 sys.error(" Warning: Could not validate Parquet schema projection in pruneColumns" )
@@ -102,10 +109,10 @@ case class ParquetTableScan(
102109}
103110
104111case class InsertIntoParquetTable (
105- relation : ParquetRelation ,
106- child : SparkPlan )(
112+ @ transient relation : ParquetRelation ,
113+ @ transient child : SparkPlan )(
107114 @ transient val sc : SparkContext )
108- extends UnaryNode {
115+ extends UnaryNode with SparkHadoopMapReduceUtil {
109116
110117 /**
111118 * Inserts all the rows in the Parquet file. Note that OVERWRITE is implicit, since
@@ -142,18 +149,64 @@ case class InsertIntoParquetTable(
142149 s " Unable to clear output directory ${fspath.toString} prior "
143150 + s " to InsertIntoParquetTable: \n ${e.toString}" )
144151 }
145-
146- JavaPairRDD .fromRDD(childRdd.map(Tuple2 (null , _))).saveAsNewAPIHadoopFile(
147- relation.path.toString,
148- classOf [Void ],
149- classOf [ParquetRelation .RowType ],
150- classOf [_root_.parquet.hadoop.ParquetOutputFormat [ParquetRelation .RowType ]],
151- conf)
152+ saveAsHadoopFile(childRdd, relation.path.toString, conf)
152153
153154 // We return the child RDD to allow chaining (alternatively, one could return nothing).
154155 childRdd
155156 }
156157
157158 override def output = child.output
159+
160+ // based on ``saveAsNewAPIHadoopFile`` in [[PairRDDFunctions]]
161+ // TODO: Maybe PairRDDFunctions should use Product2 instead of Tuple2?
162+ // .. then we could use the default one and could use [[MutablePair]]
163+ // instead of ``Tuple2``
164+ private def saveAsHadoopFile (
165+ rdd : RDD [Row ],
166+ path : String ,
167+ conf : Configuration ) {
168+ val job = new Job (conf)
169+ val keyType = classOf [Void ]
170+ val outputFormatType = classOf [parquet.hadoop.ParquetOutputFormat [Row ]]
171+ job.setOutputKeyClass(keyType)
172+ job.setOutputValueClass(classOf [Row ])
173+ val wrappedConf = new SerializableWritable (job.getConfiguration)
174+ NewFileOutputFormat .setOutputPath(job, new Path (path))
175+ val formatter = new SimpleDateFormat (" yyyyMMddHHmm" )
176+ val jobtrackerID = formatter.format(new Date ())
177+ val stageId = sc.newRddId()
178+
179+ def writeShard (context : TaskContext , iter : Iterator [Row ]): Int = {
180+ // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
181+ // around by taking a mod. We expect that no task will be attempted 2 billion times.
182+ val attemptNumber = (context.attemptId % Int .MaxValue ).toInt
183+ /* "reduce task" <split #> <attempt # = spark task #> */
184+ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false , context.partitionId,
185+ attemptNumber)
186+ val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
187+ val format = outputFormatType.newInstance
188+ val committer = format.getOutputCommitter(hadoopContext)
189+ committer.setupTask(hadoopContext)
190+ val writer = format.getRecordWriter(hadoopContext)
191+ while (iter.hasNext) {
192+ val row = iter.next()
193+ writer.write(null , row)
194+ }
195+ writer.close(hadoopContext)
196+ committer.commitTask(hadoopContext)
197+ return 1
198+ }
199+ val jobFormat = outputFormatType.newInstance
200+ /* apparently we need a TaskAttemptID to construct an OutputCommitter;
201+ * however we're only going to use this local OutputCommitter for
202+ * setupJob/commitJob, so we just use a dummy "map" task.
203+ */
204+ val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true , 0 , 0 )
205+ val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
206+ val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
207+ jobCommitter.setupJob(jobTaskContext)
208+ sc.runJob(rdd, writeShard _)
209+ jobCommitter.commitJob(jobTaskContext)
210+ }
158211}
159212
0 commit comments