17 | 17 |
18 | 18 | package org.apache.spark.sql.execution.streaming.state |
19 | 19 |
20 | | -import java.io.File |
| 20 | +import java.io.{File, IOException} |
| 21 | +import java.net.URI |
21 | 22 |
22 | 23 | import scala.collection.mutable |
23 | 24 | import scala.util.Random |
24 | 25 |
25 | 26 | import org.apache.hadoop.conf.Configuration |
26 | | -import org.apache.hadoop.fs.Path |
| 27 | +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} |
27 | 28 | import org.scalatest.{BeforeAndAfter, PrivateMethodTester} |
28 | 29 | import org.scalatest.concurrent.Eventually._ |
29 | 30 | import org.scalatest.time.SpanSugar._ |
@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth |
455 | 456 | } |
456 | 457 | } |
457 | 458 |
| 459 | + test("commit fails when rename fails") { |
| 460 | + import RenameReturnsFalseFileSystem._ |
| 461 | + val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString |
| 462 | + val conf = new Configuration() |
| 463 | + conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) |
| 464 | + val provider = newStoreProvider(dir = dir, hadoopConf = conf) |
| 465 | + val store = provider.getStore(0) |
| 466 | + put(store, "a", 0) |
| 467 | + val e = intercept[IllegalStateException](store.commit()) |
| 468 | + assert(e.getCause.getMessage.contains("Failed to rename")) |
| 469 | + } |
| 470 | + |
458 | 471 | def getDataFromFiles( |
459 | 472 | provider: HDFSBackedStateStoreProvider, |
460 | 473 | version: Int = -1): Set[(String, Int)] = { |
@@ -524,17 +537,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth |
524 | 537 | def newStoreProvider( |
525 | 538 | opId: Long = Random.nextLong, |
526 | 539 | partition: Int = 0, |
527 | | - minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get |
| 540 | + minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, |
| 541 | + dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString, |
| 542 | + hadoopConf: Configuration = new Configuration() |
528 | 543 | ): HDFSBackedStateStoreProvider = { |
529 | | - val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString |
530 | 544 | val sqlConf = new SQLConf() |
531 | 545 | sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) |
532 | 546 | new HDFSBackedStateStoreProvider( |
533 | 547 | StateStoreId(dir, opId, partition), |
534 | 548 | keySchema, |
535 | 549 | valueSchema, |
536 | 550 | new StateStoreConf(sqlConf), |
537 | | - new Configuration()) |
| 551 | + hadoopConf) |
538 | 552 | } |
539 | 553 |
540 | 554 | def remove(store: StateStore, condition: String => Boolean): Unit = { |
@@ -598,3 +612,19 @@ private[state] object StateStoreSuite { |
598 | 612 | }}.toSet |
599 | 613 | } |
600 | 614 | } |
| 615 | + |
| 616 | +/** Fake FileSystem whose `rename` always returns false, used to verify that
| 617 | + * the state store's `commit()` fails when the delta file cannot be renamed into place.
| 618 | + */
| 619 | +class RenameReturnsFalseFileSystem extends RawLocalFileSystem { |
| 620 | + import RenameReturnsFalseFileSystem._ |
| 621 | + override def getUri: URI = { |
| 622 | + URI.create(s"$scheme:///") |
| 623 | + } |
| 624 | + |
| 625 | + override def rename(src: Path, dst: Path): Boolean = false |
| 626 | +} |
| 627 | + |
| 628 | +object RenameReturnsFalseFileSystem { |
| 629 | + val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs" |
| 630 | +} |
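
For context, here is a minimal standalone sketch (not part of the patch) of the Hadoop plumbing the new test relies on: the `fs.<scheme>.impl` configuration key maps a URI scheme to a `FileSystem` class, so any path using that scheme resolves to `RenameReturnsFalseFileSystem`, whose `rename` always reports failure. The paths and the `RenameFailureSketch` object name below are illustrative only.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object RenameFailureSketch {
  def main(args: Array[String]): Unit = {
    import RenameReturnsFalseFileSystem.scheme
    val conf = new Configuration()
    // Register the fake implementation for its scheme; Hadoop instantiates it on demand.
    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)

    val src = new Path(s"$scheme:///tmp/state/temp-delta")   // hypothetical paths
    val dst = new Path(s"$scheme:///tmp/state/1.delta")
    val fs = src.getFileSystem(conf)
    // rename always reports failure, which is why commit() must check its return
    // value and raise an error instead of silently losing the delta file.
    assert(!fs.rename(src, dst))
  }
}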