Merged

Changes from 23 commits

Commits (70 total)
c7abec6
add initial bypass merge sort shuffle writer benchmarks
yifeih Feb 23, 2019
22ef648
add unsafe shuffle writer benchmarks
yifeih Feb 25, 2019
4084e27
changes in bypassmergesort benchmarks
yifeih Feb 25, 2019
fb8266d
cleanup
yifeih Feb 26, 2019
89104e2
add circle script
yifeih Feb 26, 2019
b90b381
add this branch for testing
yifeih Feb 26, 2019
5e13dd8
fix circle attempt 1
yifeih Feb 26, 2019
845e645
checkout code
yifeih Feb 26, 2019
a68f459
add some caches?
yifeih Feb 26, 2019
757f6fe
why is it not pull caches...
yifeih Feb 26, 2019
0bcd5d9
save as artifact instead of publishing
yifeih Feb 26, 2019
26c01ec
mkdir
yifeih Feb 26, 2019
0d7a036
typo
yifeih Feb 26, 2019
3fc5331
try uploading artifacts again
yifeih Feb 26, 2019
8c33701
try print per iteration to avoid circle erroring out on idle
yifeih Feb 26, 2019
9546397
blah (#495)
yifeih Feb 27, 2019
d72ba73
make a PR comment
yifeih Feb 27, 2019
1859805
actually delete files
yifeih Feb 27, 2019
c20f0be
run benchmarks on test build branch
yifeih Feb 27, 2019
444d46a
oops forgot to enable upload
yifeih Feb 27, 2019
2322933
add sort shuffle writer benchmarks
yifeih Feb 27, 2019
da0d91c
add stdev
yifeih Feb 27, 2019
e590917
cleanup sort a bit
yifeih Feb 27, 2019
cbfdb99
fix stdev text
yifeih Feb 27, 2019
cbe38c6
fix sort shuffle
yifeih Feb 27, 2019
acdda71
initial code for read side
yifeih Feb 28, 2019
fd7a7c5
format
yifeih Feb 28, 2019
d82618b
use times and sample stdev
yifeih Feb 28, 2019
610ea1d
add assert for at least one iteration
yifeih Feb 28, 2019
295d7f3
cleanup shuffle write to use fewer mocks and single base interface
yifeih Mar 1, 2019
0c696dc
shuffle read works with transport client... needs lots of cleaning
yifeih Mar 1, 2019
323a296
test running in cicle
yifeih Mar 1, 2019
85836c2
scalastyle
yifeih Mar 1, 2019
b67d1f3
dont publish results yet
yifeih Mar 1, 2019
252963d
cleanup writer code
yifeih Mar 4, 2019
f72afb2
get only git message
yifeih Mar 4, 2019
3bcd35e
fix command to get PR number
yifeih Mar 4, 2019
d8b5d79
add SortshuffleWriterBenchmark
yifeih Mar 4, 2019
d9fb78a
writer code
yifeih Mar 4, 2019
b142951
cleanup
yifeih Mar 5, 2019
d0466b8
Merge remote-tracking branch 'origin' into yh/add-benchmarks-and-ci
yifeih Mar 5, 2019
f91dfad
fix benchmark script
yifeih Mar 5, 2019
5839b1d
use ArgumentMatchers
yifeih Mar 5, 2019
0b8c7ed
also in shufflewriterbenchmarkbase
yifeih Mar 5, 2019
d11f87f
scalastyle
yifeih Mar 5, 2019
6f2779f
add apache license
yifeih Mar 5, 2019
bbe9edc
fix some scale stuff
yifeih Mar 5, 2019
567d372
fix up tests
yifeih Mar 5, 2019
47c1938
only copy benchmarks we care about
yifeih Mar 5, 2019
e79ac28
increase size for reader again
yifeih Mar 5, 2019
c3858df
delete two writers and reader for PR
yifeih Mar 5, 2019
9d46fae
SPARK-25299: Add shuffle reader benchmarks (#506)
yifeih Mar 5, 2019
9f51758
Revert "SPARK-25299: Add shuffle reader benchmarks (#506)"
yifeih Mar 5, 2019
bcb09c5
add -e to bash script
yifeih Mar 5, 2019
25da723
blah
yifeih Mar 5, 2019
13703fa
enable upload as a PR comment and prevent running benchmarks on this …
yifeih Mar 6, 2019
e3751cd
Revert "enable upload as a PR comment and prevent running benchmarks …
yifeih Mar 6, 2019
33a1b72
try machine execution
yifeih Mar 6, 2019
fa1b96c
try uploading benchmarks (#498)
yifeih Mar 7, 2019
37cef1f
only upload results when merging into the feature branch
yifeih Mar 11, 2019
459e1b5
lock down machine image
yifeih Mar 12, 2019
4cabdbd
don't write input data to disk
yifeih Mar 13, 2019
47d2dcf
run benchmark test
yifeih Mar 13, 2019
c78e491
stop creating file cleanup threads for every block manager
yifeih Mar 13, 2019
f28b75c
use alphanumeric again
yifeih Mar 13, 2019
a85acf4
use a new random everytime
yifeih Mar 13, 2019
f26ab40
close the writers -__________-
yifeih Mar 13, 2019
103c660
delete branch and publish results as comment
yifeih Mar 13, 2019
c3e58c5
close in finally
yifeih Mar 14, 2019
96f1d0d
Merge branch 'yh/add-benchmarks-and-ci' of github.com:yifeih/spark in…
yifeih Mar 14, 2019
29 changes: 29 additions & 0 deletions .circleci/config.yml
@@ -23,6 +23,13 @@ all-branches-and-tags: &all-branches-and-tags
tags:
only: /.*/

spark-25299-branch-only: &spark-25299-branch-only
filters:
branches:
only:
- spark-25299
- spark-25299-test-build
Review comment:

Do we need to build on this branch?

Author:

No, I was just using this to test the comment posting, see this merge: #502

deployable-branches-and-tags: &deployable-branches-and-tags
filters:
tags:
@@ -452,6 +459,24 @@ jobs:
key: v1-maven-dependency-cache-versioned-{{ checksum "pom.xml" }}
paths: ~/.m2

run-spark-25299-benchmarks:
<<: *defaults
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
steps:
- *checkout-code
- attach_workspace:
at: .
- *restore-build-sbt-cache
- *link-in-build-sbt-cache
- *restore-ivy-cache
- *restore-build-binaries-cache
- *restore-home-sbt-cache
Author:

not sure if I'm using a superset or a subset of the useful caches or some random other combination... just sort of copied things that seemed related to sbt because that's what I'm running in this step

- run:
command: ./dev/run-spark-25299-benchmarks.sh -u
- store_artifacts:
path: /tmp/artifacts/

deploy-gradle:
<<: *defaults
docker:
@@ -512,6 +537,10 @@ workflows:
requires:
- build-sbt
<<: *all-branches-and-tags
- run-spark-25299-benchmarks:
requires:
- build-sbt
<<: *spark-25299-branch-only
- run-scala-tests:
requires:
- build-sbt
14 changes: 8 additions & 6 deletions core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -111,16 +111,17 @@ private[spark] class Benchmark(
// The results are going to be processor specific so it is useful to include that.
out.println(Benchmark.getJVMOSInfo())
out.println(Benchmark.getProcessorName())
out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative")
out.printf("%-40s %16s %12s %13s %10s %16s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
"Per Row(ns)", "Relative", "Stdev (ms)")
out.println("-" * 96)
results.zip(benchmarks).foreach { case (result, benchmark) =>
out.printf("%-40s %16s %12s %13s %10s\n",
out.printf("%-40s %16s %12s %13s %10s %16s\n",
benchmark.name,
"%5.0f / %4.0f" format (result.bestMs, result.avgMs),
"%10.1f" format result.bestRate,
"%6.1f" format (1000 / result.bestRate),
"%3.1fX" format (firstBest / result.bestMs))
"%3.1fX" format (firstBest / result.bestMs),
"%5.0f" format result.stdevMs)
}
out.println
// scalastyle:on
@@ -158,7 +159,8 @@ private[spark] class Benchmark(
// scalastyle:on
val best = runTimes.min
val avg = runTimes.sum / runTimes.size
Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
val stdev = math.sqrt(runTimes.map(time => math.pow(time - avg, 2)).sum / runTimes.size)
Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
}
}

@@ -191,7 +193,7 @@ private[spark] object Benchmark {
}

case class Case(name: String, fn: Timer => Unit, numIters: Int)
case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)

/**
* This should return a user helpful processor information. Getting at this depends on the OS.
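For reference, the stdev added above is the population standard deviation of the per-iteration run times (the sum of squared deviations is divided by runTimes.size, not runTimes.size - 1), converted from nanoseconds to milliseconds like the other Result fields. A minimal standalone sketch with hypothetical timings:

```scala
// Standalone sketch of the stdev computation in the diff above:
// population standard deviation of the per-iteration run times (ns),
// reported in ms like the other Result fields. Timings are hypothetical.
val runTimes = Seq(12.0e6, 15.0e6, 11.0e6) // nanoseconds per iteration
val avg = runTimes.sum / runTimes.size
val stdev = math.sqrt(runTimes.map(t => math.pow(t - avg, 2)).sum / runTimes.size)
println(f"stdev = ${stdev / 1000000.0}%.1f ms") // ~1.7 ms for these values
```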
210 changes: 210 additions & 0 deletions core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala
@@ -0,0 +1,210 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.Matchers.{any, anyInt}
import org.mockito.Mockito.{doAnswer, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter, TempShuffleBlockId}
import org.apache.spark.util.Utils

/**
 * Benchmark to measure the performance of BypassMergeSortShuffleWriter.
 * {{{
 * To run this benchmark:
 * 1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
 * 2. build/sbt "core/test:runMain <this class>"
 * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
 * Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends BenchmarkBase {

@Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
@Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
@Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
@Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
@Mock(answer = RETURNS_SMART_NULLS) private var dependency:
ShuffleDependency[String, String, String] = _

private var tempDir: File = _
private var shuffleHandle: BypassMergeSortShuffleHandle[String, String] = _
private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File]
private val partitioner: HashPartitioner = new HashPartitioner(10)
private val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
private val javaSerializer: JavaSerializer = new JavaSerializer(defaultConf)

private val MIN_NUM_ITERS = 10

def setup(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
MockitoAnnotations.initMocks(this)
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.file.transferTo", String.valueOf(transferTo))
conf.set("spark.shuffle.file.buffer", "32k")

if (shuffleHandle == null) {
shuffleHandle = new BypassMergeSortShuffleHandle[String, String](
shuffleId = 0,
numMaps = 1,
dependency = dependency
)
}

val taskMetrics = new TaskMetrics
when(dependency.partitioner).thenReturn(partitioner)
when(dependency.serializer).thenReturn(javaSerializer)
when(dependency.shuffleId).thenReturn(0)

// Create the temporary directory to write local shuffle and temp files
tempDir = Utils.createTempDir()
val outputFile = File.createTempFile("shuffle", null, tempDir)
// Final mapper data file output
when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)

// Create the temporary writers (backed by files), one for each partition.
when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
(invocation: InvocationOnMock) => {
val blockId = new TempShuffleBlockId(UUID.randomUUID)
val file = new File(tempDir, blockId.name)
blockIdToFileMap.put(blockId, file)
(blockId, file)
})
when(blockManager.getDiskWriter(
any[BlockId],
any[File],
any[SerializerInstance],
anyInt(),
any[ShuffleWriteMetrics]
)).thenAnswer(new Answer[DiskBlockObjectWriter] {
override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
val args = invocation.getArguments
val manager = new SerializerManager(javaSerializer, conf)
new DiskBlockObjectWriter(
args(1).asInstanceOf[File],
manager,
args(2).asInstanceOf[SerializerInstance],
args(3).asInstanceOf[Int],
syncWrites = false,
args(4).asInstanceOf[ShuffleWriteMetrics],
blockId = args(0).asInstanceOf[BlockId]
)
}
})

// writing the index file
doAnswer(new Answer[Void] {
def answer(invocationOnMock: InvocationOnMock): Void = {
val tmp: File = invocationOnMock.getArguments()(3).asInstanceOf[File]
if (tmp != null) {
outputFile.delete
tmp.renameTo(outputFile)
}
null
}
}).when(blockResolver)
.writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]), any(classOf[File]))

val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
blockManager,
blockResolver,
shuffleHandle,
0,
conf,
taskMetrics.shuffleWriteMetrics
)

shuffleWriter
}

def cleanupTempFiles(): Unit = {
FileUtils.deleteDirectory(tempDir)
}

def writeBenchmarkWithLargeDataset(): Unit = {
val size = 10000000
Review comment:

Make these constants, either in the class or in a companion object.

val random = new Random(123)
Review comment:

Just use a shared Random instance across the whole test.

val data = (1 to size).map { i => {
val x = random.alphanumeric.take(5).mkString
Tuple2(x, x)
} }.toArray
Review comment:

This can be factored out as it's used in both benchmarks.
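Taken together, the three comments above describe one small refactor: named constants for the dataset sizes, a single shared Random, and the data generation pulled into a helper. A minimal sketch of what that might look like (hypothetical object and names, not code from this PR):

```scala
import scala.util.Random

// Hypothetical sketch of the three suggestions applied together: named
// size constants, one shared Random, and the data generation factored
// into a helper used by both benchmarks.
object ShuffleBenchmarkData {
  val LARGE_DATASET_SIZE = 10000000
  val SMALL_DATASET_SIZE = 10000
  private val random = new Random(123)

  def createDataset(size: Int): Array[(String, String)] = {
    Array.fill(size) {
      val x = random.alphanumeric.take(5).mkString
      (x, x)
    }
  }
}
```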

val benchmark = new Benchmark(
"BypassMergeSortShuffleWrite (with spill) " + size,
size,
minNumIters = MIN_NUM_ITERS,
output = output)
benchmark.addTimerCase("without transferTo") { timer =>
val shuffleWriter = setup(false)
timer.startTiming()
shuffleWriter.write(data.iterator)
timer.stopTiming()
cleanupTempFiles()
}
benchmark.addTimerCase("with transferTo") { timer =>
val shuffleWriter = setup(true)
timer.startTiming()
shuffleWriter.write(data.iterator)
timer.stopTiming()
cleanupTempFiles()
}
benchmark.run()
}

def writeBenchmarkWithSmallDataset(): Unit = {
val size = 10000
val random = new Random(123)
val data = (1 to size).map { i => {
val x = random.alphanumeric.take(5).mkString
Tuple2(x, x)
} }.toArray
val benchmark = new Benchmark("BypassMergeSortShuffleWrite (in memory buffer) " + size,
size,
minNumIters = MIN_NUM_ITERS,
output = output)
benchmark.addTimerCase("small dataset without spills on disk") { timer =>
val shuffleWriter = setup(false)
timer.startTiming()
shuffleWriter.write(data.iterator)
timer.stopTiming()
cleanupTempFiles()
}
benchmark.run()
}

override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark("BypassMergeSortShuffleWriter write") {
writeBenchmarkWithSmallDataset()
writeBenchmarkWithLargeDataset()
}
}
}