6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,8 +215,8 @@ class HadoopRDD[K, V](

// Sets the thread local variable for the file's name
split.inputSplit.value match {
case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDD.unsetInputFileName()
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -256,7 +256,7 @@ class HadoopRDD[K, V](

override def close() {
if (reader != null) {
SqlNewHadoopRDD.unsetInputFileName()
SqlNewHadoopRDDState.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
41 changes: 41 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import org.apache.spark.unsafe.types.UTF8String

/**
* State for SqlNewHadoopRDD objects. This is kept in a separate object because of the package split between core and sql.
* TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
*/
private[spark] object SqlNewHadoopRDDState {
/**
* The thread-local variable for the name of the current file being read. This is used by
* the InputFileName function in Spark SQL.
*/
private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] {
override protected def initialValue(): UTF8String = UTF8String.fromString("")
}

def getInputFileName(): UTF8String = inputFileName.get()

private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file))

private[spark] def unsetInputFileName(): Unit = inputFileName.remove()

}
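
The new object is just a process-wide holder for a per-thread file name: HadoopRDD and SqlNewHadoopRDD set it before handing records downstream and clear it when the reader is closed, and the InputFileName expression reads it. A minimal sketch of that contract, assuming it is compiled inside the `org.apache.spark.rdd` package so the `private[spark]` setters are visible (the object and method names below are hypothetical):

```scala
package org.apache.spark.rdd

import org.apache.spark.unsafe.types.UTF8String

// Sketch only: the set / read / unset contract that HadoopRDD and
// SqlNewHadoopRDD follow around each split.
private[spark] object InputFileNameSketch {
  // Run `body` (e.g. iterating a RecordReader) with the thread-local set,
  // and always clear it afterwards so a reused executor thread does not
  // leak a stale file name into the next task.
  def withInputFile[T](path: String)(body: => T): T = {
    SqlNewHadoopRDDState.setInputFileName(path)
    try body finally SqlNewHadoopRDDState.unsetInputFileName()
  }

  // What InputFileName() evaluates to on this thread ("" when unset).
  def currentFile(): UTF8String = SqlNewHadoopRDDState.getInputFileName()
}
```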
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -17,7 +17,11 @@

package org.apache.spark.sql.catalyst.expressions;

import java.io.*;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -26,22 +30,50 @@
import java.util.HashSet;
import java.util.Set;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.spark.sql.types.*;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.CalendarIntervalType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.NullType;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.types.DataTypes.*;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.ByteType;
import static org.apache.spark.sql.types.DataTypes.DateType;
import static org.apache.spark.sql.types.DataTypes.DoubleType;
import static org.apache.spark.sql.types.DataTypes.FloatType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.NullType;
import static org.apache.spark.sql.types.DataTypes.ShortType;
import static org.apache.spark.sql.types.DataTypes.TimestampType;
import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
*
@@ -116,11 +148,6 @@ public static boolean isMutable(DataType dt) {
/** The size of this row's backing data, in bytes */
private int sizeInBytes;

private void setNotNullAt(int i) {
assertIndexIsValid(i);
BitSetMethods.unset(baseObject, baseOffset, i);
}

/** The width of the null tracking bit set, in bytes */
private int bitSetWidthInBytes;

@@ -187,6 +214,12 @@ public void pointTo(byte[] buf, int sizeInBytes) {
pointTo(buf, numFields, sizeInBytes);
}


public void setNotNullAt(int i) {
assertIndexIsValid(i);
BitSetMethods.unset(baseObject, baseOffset, i);
}

@Override
public void setNullAt(int i) {
assertIndexIsValid(i);
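The only behavioral change to UnsafeRow is that setNotNullAt moves from private to public, so the new Parquet reader can clear a column's null bit after writing that column's bytes through a row writer (see decodeBinaryBatch below). A short sketch of what the method does and does not do; the schema and values are made up for illustration:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

object NullBitSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("id", LongType), StructField("score", DoubleType)))
    val toUnsafe = UnsafeProjection.create(schema)
    val row: UnsafeRow = toUnsafe(InternalRow(1L, 2.5)).copy()

    row.setNullAt(1)      // sets the null bit and zeroes the field's word
    assert(row.isNullAt(1))
    row.setNotNullAt(1)   // only clears the bit; it writes no field bytes itself
    assert(!row.isNullAt(1) && row.getDouble(1) == 0.0)
  }
}
```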
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.rdd.SqlNewHadoopRDD
import org.apache.spark.rdd.SqlNewHadoopRDDState
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{DataType, StringType}
@@ -37,13 +37,13 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
override protected def initInternal(): Unit = {}

override protected def evalInternal(input: InternalRow): UTF8String = {
SqlNewHadoopRDD.getInputFileName()
SqlNewHadoopRDDState.getInputFileName()
}

override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
ev.isNull = "false"
s"final ${ctx.javaType(dataType)} ${ev.value} = " +
"org.apache.spark.rdd.SqlNewHadoopRDD.getInputFileName();"
"org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
}

}
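
InputFileName simply surfaces the thread-local above to SQL: both the interpreted path (evalInternal) and the generated code read SqlNewHadoopRDDState.getInputFileName(). A hedged end-to-end sketch of how a user would observe it, assuming a Spark 1.6-era local setup, a hypothetical Parquet directory, and the expression being registered as the input_file_name() SQL function:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object InputFileNameExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("input-file-name").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Each row reports the file it was read from; the value comes from the
    // thread-local that the Hadoop-based RDDs set per split.
    sqlContext.read.parquet("/tmp/events")   // hypothetical path
      .selectExpr("input_file_name() AS source_file", "*")
      .show()

    sc.stop()
  }
}
```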
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -108,6 +108,19 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
*/
private static final int DEFAULT_VAR_LEN_SIZE = 32;

/**
* Tries to initialize the reader for this split. Returns true if this reader supports reading
* this split and false otherwise.
*/
public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
try {
initialize(inputSplit, taskAttemptContext);
return true;
} catch (Exception e) {
return false;
}
}

/**
* Implementation of RecordReader API.
*/
@@ -326,6 +339,7 @@ private void decodeBinaryBatch(int col, int num) throws IOException {
} else {
rowWriters[n].write(col, bytes.array(), bytes.position(), len);
}
rows[n].setNotNullAt(col);
} else {
rows[n].setNullAt(col);
}
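tryInitialize turns "can this specialized reader handle the split?" into a boolean instead of an exception the caller has to catch. A sketch of the intended caller pattern, mirroring what SqlNewHadoopRDD does further down (the ReaderSelectionSketch object and its fallback parameter are hypothetical):

```scala
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader

object ReaderSelectionSketch {
  // Try the specialized UnsafeRow reader first; fall back to the generic
  // reader when the split's schema is not supported.
  def chooseReader[V](
      split: InputSplit,
      context: TaskAttemptContext,
      fallback: () => RecordReader[Void, V]): RecordReader[Void, V] = {
    val fast = new UnsafeRowParquetRecordReader()
    if (fast.tryInitialize(split, context)) {
      fast.asInstanceOf[RecordReader[Void, V]]
    } else {
      fast.close()   // release anything the failed initialize may have opened
      fallback()
    }
  }
}
```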
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -323,6 +323,11 @@ private[spark] object SQLConf {
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")

val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
key = "spark.sql.parquet.enableUnsafeRowRecordReader",
defaultValue = Some(true),
doc = "Enables using the custom ParquetUnsafeRowRecordReader.")

val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "When true, enable filter pushdown for ORC files.")
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date

import scala.reflect.ClassTag

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -28,13 +30,12 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{Utils, SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.{Partition => SparkPartition, _}

import scala.reflect.ClassTag


private[spark] class SqlNewHadoopPartition(
rddId: Int,
@@ -61,13 +62,13 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
sc : SparkContext,
sqlContext: SQLContext,
broadcastedConf: Broadcast[SerializableConfiguration],
@transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
extends RDD[V](sc, Nil)
extends RDD[V](sqlContext.sparkContext, Nil)
with SparkHadoopMapReduceUtil
with Logging {

@@ -99,7 +100,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
protected val enableUnsafeRowParquetReader: Boolean =
sc.conf.getBoolean("spark.parquet.enableUnsafeRowRecordReader", true)
sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean

override def getPartitions: Array[SparkPartition] = {
val conf = getConf(isDriverSide = true)
@@ -120,8 +121,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}

override def compute(
theSplit: SparkPartition,
context: TaskContext): Iterator[V] = {
theSplit: SparkPartition,
context: TaskContext): Iterator[V] = {
val iter = new Iterator[V] {
val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)
@@ -132,8 +133,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](

// Sets the thread local variable for the file's name
split.serializableHadoopSplit.value match {
case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDD.unsetInputFileName()
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -163,15 +164,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
* TODO: plumb this through a different way?
*/
if (enableUnsafeRowParquetReader &&
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
// TODO: move this class to sql.execution and remove this.
reader = Utils.classForName(
"org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader")
.newInstance().asInstanceOf[RecordReader[Void, V]]
try {
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
} catch {
case e: Exception => reader = null
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader()
if (!parquetReader.tryInitialize(
split.serializableHadoopSplit.value, hadoopAttemptContext)) {
parquetReader.close()
} else {
reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
}
}

Expand Down Expand Up @@ -217,7 +216,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](

private def close() {
if (reader != null) {
SqlNewHadoopRDD.unsetInputFileName()
SqlNewHadoopRDDState.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
Expand All @@ -235,7 +234,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
@@ -276,23 +275,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}
super.persist(storageLevel)
}
}

private[spark] object SqlNewHadoopRDD {

/**
* The thread variable for the name of the current file being read. This is used by
* the InputFileName function in Spark SQL.
*/
private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] {
override protected def initialValue(): UTF8String = UTF8String.fromString("")
}

def getInputFileName(): UTF8String = inputFileName.get()

private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file))

private[spark] def unsetInputFileName(): Unit = inputFileName.remove()

/**
* Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -319,7 +319,7 @@ private[sql] class ParquetRelation(

Utils.withDummyCallSite(sqlContext.sparkContext) {
new SqlNewHadoopRDD(
sc = sqlContext.sparkContext,
sqlContext = sqlContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),