Skip to content

Commit 22e3433

Browse files
HeartSaVioR authored and cloud-fan committed
[SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
### What changes were proposed in this pull request?

This patch fixes the missed spot — the test initializes FileStreamSinkLog with its "output" directory instead of the "metadata" directory, hence the verification against the sink log was a no-op.

### Why are the changes needed?

Without the fix, the verification against the sink log was a no-op.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Checked with a debugger in the test, and verified that `allFiles()` returns non-zero entries. (It previously returned zero entries, as there was no metadata.)

Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 5472170)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 503e56a commit 22e3433

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
4545
val hdfsPath = new Path(singlePath)
4646
val fs = hdfsPath.getFileSystem(hadoopConf)
4747
if (fs.isDirectory(hdfsPath)) {
48-
val metadataPath = new Path(hdfsPath, metadataDir)
49-
checkEscapedMetadataPath(fs, metadataPath, sqlConf)
48+
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
5049
fs.exists(metadataPath)
5150
} else {
5251
false
@@ -55,6 +54,12 @@ object FileStreamSink extends Logging {
5554
}
5655
}
5756

57+
def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
58+
val metadataDir = new Path(path, FileStreamSink.metadataDir)
59+
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf)
60+
metadataDir
61+
}
62+
5863
def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
5964
if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
6065
&& StreamExecution.containsSpecialCharsInPath(metadataPath)) {
@@ -125,14 +130,12 @@ class FileStreamSink(
125130
partitionColumnNames: Seq[String],
126131
options: Map[String, String]) extends Sink with Logging {
127132

133+
import FileStreamSink._
134+
128135
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
129136
private val basePath = new Path(path)
130-
private val logPath = {
131-
val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
132-
val fs = metadataDir.getFileSystem(hadoopConf)
133-
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
134-
metadataDir
135-
}
137+
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
138+
sparkSession.sessionState.conf)
136139
private val fileLog =
137140
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
138141

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
555555
}
556556
}
557557

558-
val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
559-
spark.sessionState.newHadoopConf())
560-
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
561-
outputDir.getCanonicalPath)
558+
val outputDirPath = new Path(outputDir.getCanonicalPath)
559+
val hadoopConf = spark.sessionState.newHadoopConf()
560+
val fs = outputDirPath.getFileSystem(hadoopConf)
561+
val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)
562+
563+
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)
562564

563565
val allFiles = sinkLog.allFiles()
564566
// only files from non-empty partition should be logged

0 commit comments

Comments (0)