From 4ebd167c45326d86ee5764b379bc4b61cb2a751d Mon Sep 17 00:00:00 2001
From: "wangguangxin.cn"
Date: Thu, 22 Jun 2023 09:21:34 +0800
Subject: [PATCH 1/2] Make ColumnarBatchSerializer support relocation so that
 continuous shuffle block fetching can be enabled

---
 cpp/core/jni/JniWrapper.cc                            |  2 ++
 cpp/core/shuffle/LocalPartitionWriter.cc              |  4 +++-
 cpp/core/shuffle/ShuffleWriter.h                      |  1 +
 .../vectorized/ShuffleWriterJniWrapper.java           | 12 ++++++------
 .../vectorized/ColumnarBatchSerializer.scala          |  9 +++++++++
 .../apache/spark/shuffle/ColumnarShuffleWriter.scala  |  3 +++
 .../main/scala/io/glutenproject/GlutenConfig.scala    |  8 ++++++++
 7 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index da3e3e679b58..5f619eea6839 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -754,6 +754,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
     jboolean preferEvict,
     jlong allocatorId,
     jboolean writeSchema,
+    jboolean writeEOS,
     jlong firstBatchHandle,
     jlong taskAttemptId,
     jint pushBufferMaxSize,
@@ -816,6 +817,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper
   }
 
   shuffleWriterOptions.write_schema = writeSchema;
+  shuffleWriterOptions.write_eos = writeEOS;
   shuffleWriterOptions.prefer_evict = preferEvict;
 
   if (numSubDirs > 0) {
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index d7c401c22c60..1972824aaa11 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -96,7 +96,9 @@ class PreferEvictPartitionWriter::LocalPartitionWriterInstance {
     }
 
     RETURN_NOT_OK(writeRecordBatchPayload(dataFileOs.get()));
-    RETURN_NOT_OK(writeEos(dataFileOs.get()));
+    if (shuffleWriter_->options().write_eos) {
+      RETURN_NOT_OK(writeEos(dataFileOs.get()));
+    }
     clearCache();
 
     ARROW_ASSIGN_OR_RAISE(auto after_write, dataFileOs->Tell());
diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h
index 77a26ba8fad9..ee3928339265 100644
--- a/cpp/core/shuffle/ShuffleWriter.h
+++ b/cpp/core/shuffle/ShuffleWriter.h
@@ -45,6 +45,7 @@ struct ShuffleWriterOptions {
   bool prefer_evict = true;
   bool write_schema = false;
   bool buffered_write = false;
+  bool write_eos = true;
 
   std::string data_file;
   std::string partition_writer_type = "local";
diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
index 34cd2ffec99e..5a8864d3e428 100644
--- a/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-data/src/main/java/io/glutenproject/vectorized/ShuffleWriterJniWrapper.java
@@ -44,12 +44,12 @@ public long make(NativePartitioning part, long offheapPerTask, int bufferSize,
                    int bufferCompressThreshold, String dataFile,
                    int subDirsPerLocalDir, String localDirs,
                    boolean preferEvict, long memoryPoolId, boolean writeSchema,
-                   long handle, long taskAttemptId) {
+                   boolean writeEOS, long handle, long taskAttemptId) {
     return nativeMake(part.getShortName(), part.getNumPartitions(),
                       offheapPerTask, bufferSize, codec, codecBackend,
                       bufferCompressThreshold, dataFile, subDirsPerLocalDir,
-                      localDirs, preferEvict, memoryPoolId, writeSchema, handle,
-                      taskAttemptId, 0, null, "local");
+                      localDirs, preferEvict, memoryPoolId, writeSchema, writeEOS,
+                      handle, taskAttemptId, 0, null, "local");
   }
 
   /**
@@ -70,7 +70,7 @@ public long makeForRSS(NativePartitioning part, long offheapPerTask,
     return nativeMake(part.getShortName(), part.getNumPartitions(),
                       offheapPerTask, bufferSize, codec, null,
                       bufferCompressThreshold, null, 0, null, true,
-                      memoryPoolId, false, handle, taskAttemptId,
+                      memoryPoolId, false, true, handle, taskAttemptId,
                       pushBufferMaxSize, pusher, partitionWriterType);
   }
 
@@ -80,8 +80,8 @@ public native long nativeMake(String shortName, int numPartitions,
                                 int bufferCompressThreshold, String dataFile,
                                 int subDirsPerLocalDir, String localDirs,
                                 boolean preferEvict, long memoryPoolId,
-                                boolean writeSchema, long handle,
-                                long taskAttemptId, int pushBufferMaxSize,
+                                boolean writeSchema, boolean writeEOS,
+                                long handle, long taskAttemptId, int pushBufferMaxSize,
                                 Object pusher, String partitionWriterType);
 
   /**
diff --git a/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala b/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala
index 51d347c262fd..2a4645a1cafb 100644
--- a/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/vectorized/ColumnarBatchSerializer.scala
@@ -17,6 +17,8 @@
 package io.glutenproject.vectorized
 
+import io.glutenproject.GlutenConfig
+
 import java.io._
 import java.nio.ByteBuffer
 
@@ -50,10 +52,17 @@ class ColumnarBatchSerializer(
   extends Serializer
   with Serializable {
 
+  // If we write neither the schema nor EOS in the shuffle writer, the serializer supports relocation
+  private val supportsRelocation =
+    !GlutenConfig.getConf.columnarShuffleWriteSchema &&
+      !GlutenConfig.getConf.columnarShuffleWriteEOS
+
   /** Creates a new [[SerializerInstance]]. */
   override def newInstance(): SerializerInstance = {
     new ColumnarBatchSerializerInstance(schema, readBatchNumRows, numOutputRows, decompressTime)
   }
+
+  override def supportsRelocationOfSerializedObjects: Boolean = supportsRelocation
 }
 
 private class ColumnarBatchSerializerInstance(
diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 65cfea512795..088637b6ba9d 100644
--- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -78,6 +78,8 @@ class ColumnarShuffleWriter[K, V](
 
   private val writeSchema = GlutenConfig.getConf.columnarShuffleWriteSchema
 
+  private val writeEOS = GlutenConfig.getConf.columnarShuffleWriteEOS
+
   private val jniWrapper = new ShuffleWriterJniWrapper
 
   private var nativeShuffleWriter: Long = -1L
@@ -152,6 +154,7 @@ class ColumnarShuffleWriter[K, V](
         })
         .getNativeInstanceId,
       writeSchema,
+      writeEOS,
       handle,
       taskContext.taskAttemptId()
     )
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index c3f8c62648d6..c80a8f4e9da4 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -125,6 +125,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def columnarShuffleWriteSchema: Boolean = conf.getConf(COLUMNAR_SHUFFLE_WRITE_SCHEMA_ENABLED)
 
+  def columnarShuffleWriteEOS: Boolean = conf.getConf(COLUMNAR_SHUFFLE_WRITE_EOS_ENABLED)
+
   def columnarShuffleCodec: Option[String] = conf.getConf(COLUMNAR_SHUFFLE_CODEC)
 
   def columnarShuffleCodecBackend: Option[String] = conf
@@ -705,6 +707,12 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(false)
 
+  val COLUMNAR_SHUFFLE_WRITE_EOS_ENABLED =
+    buildConf("spark.gluten.sql.columnar.shuffle.writeEOS")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_SHUFFLE_CODEC =
     buildConf("spark.gluten.sql.columnar.shuffle.codec")
       .internal()

From fc2f101fb6078cc2b51f8a7424b70819b3cdcaf3 Mon Sep 17 00:00:00 2001
From: Yuan Zhou
Date: Fri, 21 Jul 2023 10:11:07 +0800
Subject: [PATCH 2/2] Fix the prefer-cache code path in shuffle

Signed-off-by: Yuan Zhou
---
 cpp/core/shuffle/LocalPartitionWriter.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index 1972824aaa11..6118d78e07b1 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -382,7 +382,9 @@ arrow::Status PreferCachePartitionWriter::stop() {
     }
     // 8. Write EOS if any payload written.
    if (!firstWrite) {
-      RETURN_NOT_OK(writeEos(dataFileOs_.get()));
+      if (shuffleWriter_->options().write_eos) {
+        RETURN_NOT_OK(writeEos(dataFileOs_.get()));
+      }
     }
 
     ARROW_ASSIGN_OR_RAISE(auto endInFinalFile, dataFileOs_->Tell());
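
Note (not part of the patches above): the property these patches enable, relocation of serialized objects, is what lets Spark treat serialized shuffle output as plain bytes that can be moved and concatenated without deserialization, which is the precondition for fetching continuous shuffle blocks in batch. With patch 1 applied, ColumnarBatchSerializer reports relocation support only when both schema writing and EOS writing are disabled: the new flag spark.gluten.sql.columnar.shuffle.writeEOS defaults to true, so it must be set to false, together with the existing write-schema option (its config key is not named in this diff; its getter defaults to false per the context shown above).

The Scala snippet below is only an illustration of what relocation means; it is not Gluten or Spark code, and every name in it (RelocationDemo, writeBlock, readAll) is made up for the example. If each value is written as a self-contained, length-prefixed record with no stream header (schema) and no end-of-stream marker, two independently written streams can be concatenated as raw bytes and still decode correctly:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object RelocationDemo {
      // Write each value as <length><bytes>: no schema header up front, no EOS marker at the end.
      def writeBlock(values: Seq[String]): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new DataOutputStream(buffer)
        values.foreach { v =>
          val bytes = v.getBytes("UTF-8")
          out.writeInt(bytes.length)
          out.write(bytes)
        }
        out.flush()
        buffer.toByteArray
      }

      // Read records until the bytes run out; no EOS marker is needed to find the end.
      def readAll(block: Array[Byte]): Seq[String] = {
        val in = new DataInputStream(new ByteArrayInputStream(block))
        val result = Seq.newBuilder[String]
        while (in.available() > 0) {
          val len = in.readInt()
          val bytes = new Array[Byte](len)
          in.readFully(bytes)
          result += new String(bytes, "UTF-8")
        }
        result.result()
      }

      def main(args: Array[String]): Unit = {
        // Two blocks written independently can be fetched and concatenated as raw bytes,
        // which is exactly what batched fetching of continuous shuffle blocks relies on.
        val merged = writeBlock(Seq("a", "b")) ++ writeBlock(Seq("c"))
        assert(readAll(merged) == Seq("a", "b", "c"))
        // A per-stream schema header or an EOS marker in the middle would break this,
        // which is why both are turned off before relocation support is reported.
      }
    }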