Skip to content

Commit d5db95b

Browse files
committed
[HOTFIX][Streaming][MLlib] use temp folder for checkpoint
or Jenkins will complain about no Apache header in checkpoint files. tdas rxin Author: Xiangrui Meng <meng@databricks.com> Closes #2046 from mengxr/tmp-checkpoint and squashes the following commits: 0d3ec73 [Xiangrui Meng] remove ssc.stop 9797843 [Xiangrui Meng] change checkpointDir to lazy val 89964ab [Xiangrui Meng] use temp folder for checkpoint (cherry picked from commit fce5c0f) Signed-off-by: Xiangrui Meng <meng@databricks.com>
1 parent 148e45b commit d5db95b

2 files changed

Lines changed: 11 additions & 12 deletions

File tree

mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
4949

5050
// Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
5151
test("parameter accuracy") {
52-
5352
// create model
5453
val model = new StreamingLinearRegressionWithSGD()
5554
.setInitialWeights(Vectors.dense(0.0, 0.0))
@@ -82,7 +81,6 @@ class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
8281

8382
// Test that parameter estimates improve when learning Y = 10*X1 on streaming data
8483
test("parameter convergence") {
85-
8684
// create model
8785
val model = new StreamingLinearRegressionWithSGD()
8886
.setInitialWeights(Vectors.dense(0.0))
@@ -113,12 +111,10 @@ class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
113111
assert(deltas.forall(x => (x._1 - x._2) <= 0.1))
114112
// check that error shrunk on at least 2 batches
115113
assert(deltas.map(x => if ((x._1 - x._2) < 0) 1 else 0).sum > 1)
116-
117114
}
118115

119116
// Test predictions on a stream
120117
test("predictions") {
121-
122118
// create model initialized with true weights
123119
val model = new StreamingLinearRegressionWithSGD()
124120
.setInitialWeights(Vectors.dense(10.0, 10.0))
@@ -142,7 +138,5 @@ class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase {
142138
// compute the mean absolute error and check that it's always less than 0.1
143139
val errors = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints)
144140
assert(errors.forall(x => x <= 0.1))
145-
146141
}
147-
148142
}

streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
package org.apache.spark.streaming
1919

20-
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
21-
import org.apache.spark.streaming.util.ManualClock
20+
import java.io.{ObjectInputStream, IOException}
2221

2322
import scala.collection.mutable.ArrayBuffer
2423
import scala.collection.mutable.SynchronizedBuffer
2524
import scala.reflect.ClassTag
2625

27-
import java.io.{ObjectInputStream, IOException}
28-
2926
import org.scalatest.{BeforeAndAfter, FunSuite}
27+
import com.google.common.io.Files
3028

31-
import org.apache.spark.{SparkContext, SparkConf, Logging}
29+
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
30+
import org.apache.spark.streaming.util.ManualClock
31+
import org.apache.spark.{SparkConf, Logging}
3232
import org.apache.spark.rdd.RDD
3333

3434
/**
@@ -119,7 +119,12 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
119119
def batchDuration = Seconds(1)
120120

121121
// Directory where the checkpoint data will be saved
122-
def checkpointDir = "checkpoint"
122+
lazy val checkpointDir = {
123+
val dir = Files.createTempDir()
124+
logDebug(s"checkpointDir: $dir")
125+
dir.deleteOnExit()
126+
dir.toString
127+
}
123128

124129
// Number of partitions of the input parallel collections created for testing
125130
def numInputPartitions = 2

0 commit comments

Comments
 (0)