diff --git a/.circleci/config.yml b/.circleci/config.yml
index e3a5de908f6cd..383f3e351c91c 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -8,6 +8,12 @@ defaults: &defaults
     TERM: dumb
     BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
+spark-25299-config: &spark-25299-config
+  machine:
+    image: circleci/classic:201808-01
+  environment: &defaults-environment
+    TERM: dumb
+    BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
 test-defaults: &test-defaults
   <<: *defaults
@@ -23,6 +29,12 @@ all-branches-and-tags: &all-branches-and-tags
   filters:
     tags:
       only: /.*/
+spark-25299-branch-only: &spark-25299-branch-only
+  filters:
+    branches:
+      only:
+        - spark-25299
+
 deployable-branches-and-tags: &deployable-branches-and-tags
   filters:
     tags:
@@ -452,6 +464,22 @@ jobs:
           key: v1-maven-dependency-cache-versioned-{{ checksum "pom.xml" }}
           paths: ~/.m2
 
+  run-spark-25299-benchmarks:
+    <<: *spark-25299-config
+    steps:
+      - *checkout-code
+      - attach_workspace:
+          at: .
+      - *restore-build-sbt-cache
+      - *link-in-build-sbt-cache
+      - *restore-ivy-cache
+      - *restore-build-binaries-cache
+      - *restore-home-sbt-cache
+      - run:
+          command: ./dev/run-spark-25299-benchmarks.sh -u
+      - store_artifacts:
+          path: /tmp/artifacts/
+
   deploy-gradle:
     <<: *defaults
     docker:
@@ -512,6 +540,10 @@ workflows:
         requires:
           - build-sbt
         <<: *all-branches-and-tags
+      - run-spark-25299-benchmarks:
+          requires:
+            - build-sbt
+          <<: *spark-25299-branch-only
       - run-scala-tests:
          requires:
            - build-sbt
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index bb389cdb39dfd..5fdb0d289b6b1 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -111,13 +111,15 @@ private[spark] class Benchmark(
     // The results are going to be processor specific so it is useful to include that.
     out.println(Benchmark.getJVMOSInfo())
     out.println(Benchmark.getProcessorName())
-    out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+    out.printf("%-40s %14s %14s %11s %12s %13s %10s\n", name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)",
       "Per Row(ns)", "Relative")
-    out.println("-" * 96)
+    out.println("-" * 120)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      out.printf("%-40s %16s %12s %13s %10s\n",
+      out.printf("%-40s %14s %14s %11s %12s %13s %10s\n",
         benchmark.name,
-        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%5.0f" format result.bestMs,
+        "%4.0f" format result.avgMs,
+        "%5.0f" format result.stdevMs,
         "%10.1f" format result.bestRate,
         "%6.1f" format (1000 / result.bestRate),
         "%3.1fX" format (firstBest / result.bestMs))
@@ -156,9 +158,13 @@
     // scalastyle:off
     println(s"  Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
     // scalastyle:on
+    assert(runTimes.nonEmpty)
     val best = runTimes.min
     val avg = runTimes.sum / runTimes.size
-    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
+    val stdev = if (runTimes.size > 1) {
+      math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
+    } else 0
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
   }
 }
 
@@ -191,7 +197,7 @@ private[spark] object Benchmark {
   }
 
   case class Case(name: String, fn: Timer => Unit, numIters: Int)
-  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
 
   /**
    * This should return a user helpful processor information. Getting at this depends on the OS.
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala
new file mode 100644
index 0000000000000..8e6a69fb7080c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.when
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
+import org.apache.spark.util.Utils
+
+abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {
+
+  protected val DEFAULT_DATA_STRING_SIZE = 5
+
+  // This is only used in the writer constructors, so it's ok to mock
+  @Mock(answer = RETURNS_SMART_NULLS) protected var dependency:
+    ShuffleDependency[String, String, String] = _
+  // This is only used in the stop() function, so we can safely mock this without affecting perf
+  @Mock(answer = RETURNS_SMART_NULLS) protected var taskContext: TaskContext = _
+  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEnv: RpcEnv = _
+  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEndpointRef: RpcEndpointRef = _
+
+  protected val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
+  protected val serializer: Serializer = new KryoSerializer(defaultConf)
+  protected val partitioner: HashPartitioner = new HashPartitioner(10)
+  protected val serializerManager: SerializerManager =
+    new SerializerManager(serializer, defaultConf)
+  protected val shuffleMetrics: TaskMetrics = new TaskMetrics
+
+  protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]
+  protected val filenameToFile: mutable.Map[String, File] = new mutable.HashMap[String, File]
+
+  class TestDiskBlockManager(tempDir: File) extends DiskBlockManager(defaultConf, false) {
+    override def getFile(filename: String): File = {
+      if (filenameToFile.contains(filename)) {
+        filenameToFile(filename)
+      } else {
+        val outputFile = File.createTempFile("shuffle", null, tempDir)
+        filenameToFile(filename) = outputFile
+        outputFile
+      }
+    }
+
+    override def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
+      var blockId = new TempShuffleBlockId(UUID.randomUUID())
+      val file = getFile(blockId)
+      tempFilesCreated += file
+      (blockId, file)
+    }
+  }
+
+  class TestBlockManager(tempDir: File, memoryManager: MemoryManager) extends BlockManager("0",
+    rpcEnv,
+    null,
+    serializerManager,
+    defaultConf,
+    memoryManager,
+    null,
+    null,
+    null,
+    null,
+    1) {
+    override val diskBlockManager = new TestDiskBlockManager(tempDir)
+    override val remoteBlockTempFileManager = null
+  }
+
+  protected var tempDir: File = _
+
+  protected var blockManager: BlockManager = _
+  protected var blockResolver: IndexShuffleBlockResolver = _
+
+  protected var memoryManager: TestMemoryManager = _
+  protected var taskMemoryManager: TaskMemoryManager = _
+
+  MockitoAnnotations.initMocks(this)
+  when(dependency.partitioner).thenReturn(partitioner)
+  when(dependency.serializer).thenReturn(serializer)
+  when(dependency.shuffleId).thenReturn(0)
+  when(taskContext.taskMetrics()).thenReturn(shuffleMetrics)
+  when(rpcEnv.setupEndpoint(any[String], any[RpcEndpoint])).thenReturn(rpcEndpointRef)
+
+  def setup(): Unit = {
+    memoryManager = new TestMemoryManager(defaultConf)
+    memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
+    taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    tempDir = Utils.createTempDir()
+    blockManager = new TestBlockManager(tempDir, memoryManager)
+    blockResolver = new IndexShuffleBlockResolver(
+      defaultConf,
+      blockManager)
+  }
+
+  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
+    benchmark.addTimerCase(name) { timer =>
+      setup()
+      func(timer)
+      teardown()
+    }
+  }
+
+  def teardown(): Unit = {
+    FileUtils.deleteDirectory(tempDir)
+    tempFilesCreated.clear()
+    filenameToFile.clear()
+  }
+
+  protected class DataIterator(size: Int)
+    extends Iterator[Product2[String, String]] {
+    val random = new Random(123)
+    var count = 0
+    override def hasNext: Boolean = {
+      count < size
+    }
+
+    override def next(): Product2[String, String] = {
+      count += 1
+      val string = random.alphanumeric.take(DEFAULT_DATA_STRING_SIZE).mkString
+      (string, string)
+    }
+  }
+
+
+  def createDataIterator(size: Int): DataIterator = {
+    new DataIterator(size)
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
new file mode 100644
index 0000000000000..317cd23279ede
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import org.mockito.Mockito.when
+
+import org.apache.spark.{Aggregator, SparkEnv}
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.shuffle.BaseShuffleHandle
+import org.apache.spark.util.Utils
+
+/**
+ * Benchmark to measure performance of SortShuffleWriter.
+ * {{{
+ *   To run this benchmark:
+ *   1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
+ *      Results will be written to "benchmarks/<this class>-results.txt".
+ * }}}
+ */
+object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
+
+  private val shuffleHandle: BaseShuffleHandle[String, String, String] =
+    new BaseShuffleHandle(
+      shuffleId = 0,
+      numMaps = 1,
+      dependency = dependency)
+
+  private val MIN_NUM_ITERS = 10
+  private val DATA_SIZE_SMALL = 1000
+  private val DATA_SIZE_LARGE =
+    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE
+
+  def getWriter(aggregator: Option[Aggregator[String, String, String]],
+      sorter: Option[Ordering[String]]): SortShuffleWriter[String, String, String] = {
+    // SortShuffleWriter reads several of its collaborators out of SparkEnv, so a stub
+    // SparkEnv has to be installed before the writer is constructed
+    SparkEnv.set(new SparkEnv(
+      "0",
+      null,
+      serializer,
+      null,
+      serializerManager,
+      null,
+      null,
+      null,
+      blockManager,
+      null,
+      null,
+      null,
+      null,
+      defaultConf
+    ))
+
+    if (aggregator.isEmpty && sorter.isEmpty) {
+      when(dependency.mapSideCombine).thenReturn(false)
+    } else {
+      when(dependency.mapSideCombine).thenReturn(false)
+      when(dependency.aggregator).thenReturn(aggregator)
+      when(dependency.keyOrdering).thenReturn(sorter)
+    }
+
+    when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager)
+
+    val shuffleWriter = new SortShuffleWriter[String, String, String](
+      blockResolver,
+      shuffleHandle,
+      0,
+      taskContext
+    )
+    shuffleWriter
+  }
+
+  def writeBenchmarkWithSmallDataset(): Unit = {
+    val size = DATA_SIZE_SMALL
+    val benchmark = new Benchmark("SortShuffleWriter without spills",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output)
+    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
+      val shuffleWriter = getWriter(Option.empty, Option.empty)
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        shuffleWriter.write(dataIterator)
+        timer.stopTiming()
+        assert(tempFilesCreated.isEmpty)
+      } finally {
+        shuffleWriter.stop(true)
+      }
+    }
+    benchmark.run()
+  }
+
+  def writeBenchmarkWithSpill(): Unit = {
+    val size = DATA_SIZE_LARGE
+
+    val benchmark = new Benchmark("SortShuffleWriter with spills",
+      size,
+      minNumIters = MIN_NUM_ITERS,
+      output = output,
+      outputPerIteration = true)
+    addBenchmarkCase(benchmark, "no map side combine") { timer =>
+      val shuffleWriter = getWriter(Option.empty, Option.empty)
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        shuffleWriter.write(dataIterator)
+        timer.stopTiming()
+        assert(tempFilesCreated.length == 7)
+      } finally {
+        shuffleWriter.stop(true)
+      }
+    }
+
+    def createCombiner(i: String): String = i
+    def mergeValue(i: String, j: String): String = if (Ordering.String.compare(i, j) > 0) i else j
+    def mergeCombiners(i: String, j: String): String =
+      if (Ordering.String.compare(i, j) > 0) i else j
+    val aggregator =
+      new Aggregator[String, String, String](createCombiner, mergeValue, mergeCombiners)
+    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
+      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        shuffleWriter.write(dataIterator)
+        timer.stopTiming()
+        assert(tempFilesCreated.length == 7)
+      } finally {
+        shuffleWriter.stop(true)
+      }
+    }
+
+    val sorter = Ordering.String
+    addBenchmarkCase(benchmark, "with map side sort") { timer =>
+      val shuffleWriter = getWriter(Option.empty, Some(sorter))
+      val dataIterator = createDataIterator(size)
+      try {
+        timer.startTiming()
+        shuffleWriter.write(dataIterator)
+        timer.stopTiming()
+        assert(tempFilesCreated.length == 7)
+      } finally {
+        shuffleWriter.stop(true)
+      }
+    }
+    benchmark.run()
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("SortShuffleWriter writer") {
+      writeBenchmarkWithSmallDataset()
+      writeBenchmarkWithSpill()
+    }
+  }
+}
diff --git a/dev/run-spark-25299-benchmarks.sh b/dev/run-spark-25299-benchmarks.sh
new file mode 100755
index 0000000000000..2a0fe2088f219
--- /dev/null
+++ b/dev/run-spark-25299-benchmarks.sh
@@ -0,0 +1,88 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Script to run the SPARK-25299 shuffle writer benchmarks.
+# The benchmark results are written to /tmp/artifacts so that CI can collect
+# them as build artifacts.
+# When invoked with -u, the results are also uploaded as a review comment on
+# the pull request that produced the current commit.
+
+set -euo pipefail
+
+
+function usage {
+  echo "Usage: $(basename $0) [-h] [-u]"
+  echo ""
+  echo "Runs the perf tests and optionally uploads the results as a comment to a PR"
+  echo ""
+  echo "  -h  help"
+  echo "  -u  Upload the perf results as a comment"
+  # Exit as error for nesting scripts
+  exit 1
+}
+
+UPLOAD=false
+while getopts "hu" opt; do
+  case $opt in
+    h)
+      usage
+      exit 0;;
+    u)
+      UPLOAD=true;;
+  esac
+done
+
+echo "Running SPARK-25299 benchmarks"
+
+SPARK_GENERATE_BENCHMARK_FILES=1 ./build/sbt "sql/test:runMain org.apache.spark.shuffle.sort.SortShuffleWriterBenchmark"
+
+SPARK_DIR=`pwd`
+
+mkdir -p /tmp/artifacts
+cp $SPARK_DIR/sql/core/benchmarks/SortShuffleWriterBenchmark-results.txt /tmp/artifacts/
+
+if [ "$UPLOAD" = false ]; then
+  exit 0
+fi
+
+IFS=
+RESULTS=""
+for benchmark_file in /tmp/artifacts/*.txt; do
+  RESULTS+=$(cat "$benchmark_file")
+  RESULTS+=$'\n\n'
+done
+
+echo "$RESULTS"
+# Get the last commit message, drop empty lines, and take the trailing PR number from its first line
+PULL_REQUEST_NUM=$(git log -1 --pretty=%B | awk NF | awk '{print $NF}' | head -1 | sed 's/(//g' | sed 's/)//g' | sed 's/#//g')
+
+
+USERNAME=svc-spark-25299
+PASSWORD=$SVC_SPARK_25299_PASSWORD
+message='{"body": "```'
+message+=$'\n'
+message+=$RESULTS
+message+=$'\n'
+json_message=$(echo "$message" | awk '{printf "%s\\n", $0}')
+json_message+='```", "event":"COMMENT"}'
+echo "$json_message" > benchmark_results.json
+
+echo "Sending benchmark results to PR $PULL_REQUEST_NUM"
+curl -XPOST https://${USERNAME}:${PASSWORD}@api.github.com/repos/palantir/spark/pulls/${PULL_REQUEST_NUM}/reviews -d @benchmark_results.json
+rm benchmark_results.json
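
Editorial note (not part of the patch above): ShuffleWriterBenchmarkBase is designed to be reused by benchmarks for the other shuffle writers. The following minimal sketch is hypothetical, with an illustrative object name and case name only; it shows how a subclass plugs into the hooks the base class provides. addBenchmarkCase wraps each timed closure in setup()/teardown(), createDataIterator supplies the records, and only the region between timer.startTiming() and timer.stopTiming() is measured.

package org.apache.spark.shuffle.sort

import org.apache.spark.benchmark.Benchmark

// Hypothetical example only: demonstrates the base-class hooks introduced in this diff.
object ExampleShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Example shuffle writer") {
      val size = 1000
      val benchmark = new Benchmark(
        "Example case without spills", size, minNumIters = 10, output = output)
      addBenchmarkCase(benchmark, "drain generated records") { timer =>
        // setup() and teardown() run around this closure; only the region between
        // startTiming() and stopTiming() counts toward the reported result.
        val data = createDataIterator(size)
        timer.startTiming()
        while (data.hasNext) { data.next() }
        timer.stopTiming()
      }
      benchmark.run()
    }
  }
}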