Skip to content

Commit bb8b691

Browse files
xupefeidongjoon-hyun
authored andcommitted
[SPARK-48139][CONNECT][TESTS] Try stabilising multi-thread tests in CI
### What changes were proposed in this pull request? This PR tries to stabilise flaky tests which involve thread pools. Some tests are failing due to the thread pool having 2 threads instead of 3 or 4. ### Why are the changes needed? To let CI pass. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI will tell. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48622 from xupefei/ci-threadpool. Authored-by: Paddy Xu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent e8c7963 commit bb8b691

3 files changed

Lines changed: 7 additions & 8 deletions

File tree

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.sql
1818

19-
import java.util.concurrent.ForkJoinPool
19+
import java.util.concurrent.Executors
2020

2121
import scala.collection.mutable
2222
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
@@ -137,15 +137,14 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
137137
assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
138138
}
139139

140-
// TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
141-
ignore("interrupt tag") {
140+
test("interrupt tag") {
142141
val session = spark
143142
import session.implicits._
144143

145144
// global ExecutionContext has only 2 threads in Apache Spark CI
146145
// create own thread pool for four Futures used in this test
147146
val numThreads = 4
148-
val fpool = new ForkJoinPool(numThreads)
147+
val fpool = Executors.newFixedThreadPool(numThreads)
149148
val executionContext = ExecutionContext.fromExecutorService(fpool)
150149

151150
val q1 = Future {

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark
1919

20-
import java.util.concurrent.{Semaphore, TimeUnit}
20+
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

2323
import scala.collection.mutable.ArrayBuffer
@@ -302,7 +302,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
302302
// global ExecutionContext has only 2 threads in Apache Spark CI
303303
// create own thread pool for four Futures used in this test
304304
val numThreads = 4
305-
val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
305+
val fpool = Executors.newFixedThreadPool(numThreads)
306306
val executionContext = ExecutionContext.fromExecutorService(fpool)
307307

308308
try {

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionJobTaggingAndCancellationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql
1919

20-
import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit}
20+
import java.util.concurrent.{ConcurrentHashMap, Executors, Semaphore, TimeUnit}
2121
import java.util.concurrent.atomic.AtomicInteger
2222

2323
import scala.concurrent.{ExecutionContext, Future}
@@ -121,7 +121,7 @@ class SparkSessionJobTaggingAndCancellationSuite
121121
// global ExecutionContext has only 2 threads in Apache Spark CI
122122
// create own thread pool for four Futures used in this test
123123
val numThreads = 3
124-
val fpool = ThreadUtils.newForkJoinPool("job-tags-test-thread-pool", numThreads)
124+
val fpool = Executors.newFixedThreadPool(numThreads)
125125
val executionContext = ExecutionContext.fromExecutorService(fpool)
126126

127127
try {

0 commit comments

Comments
 (0)