Commit 55d238f

[SPARK-30946][SS] Serde entry with UnsafeRow on FileStream(Source/Sink)Log with LZ4 compression

1 parent 5866bc7 · commit 55d238f
8 files changed: 246 additions & 97 deletions

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
4 additions & 1 deletion

```diff
@@ -135,8 +135,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
   private[this] val defaultSeed: Int = 0x9747b28c // LZ4BlockOutputStream.DEFAULT_SEED
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
+    compressedOutputStream(s, syncFlush = false)
+  }
+
+  def compressedOutputStream(s: OutputStream, syncFlush: Boolean): OutputStream = {
     val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
-    val syncFlush = false
     new LZ4BlockOutputStream(
       s,
       blockSize,
```
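The new flag exists because the log serializers below flush, but deliberately never close, the compressed stream (the caller owns and closes the underlying stream). A minimal sketch of the behavior — not part of the commit, assuming a default `SparkConf`:

```scala
import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

val codec = new LZ4CompressionCodec(new SparkConf())
val sink = new ByteArrayOutputStream()

// With syncFlush = true, flush() forces the partially filled LZ4 block out to
// `sink`, so a caller that flushes but never closes still persists its data.
val out = codec.compressedOutputStream(sink, syncFlush = true)
out.write("some metadata".getBytes("UTF-8"))
out.flush()
assert(sink.size() > 0) // compressed bytes are visible without close()
```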

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
102 additions & 4 deletions

```diff
@@ -17,17 +17,21 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.{InputStream, IOException, OutputStream}
+import java.io.{DataInputStream, DataOutputStream, InputStream, IOException, OutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import scala.collection.mutable
 import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
+import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 /**
  * An abstract class for compactible metadata logs. It will write one log file for each batch.
@@ -105,6 +109,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     interval
   }
 
+  private val sparkConf = sparkSession.sparkContext.getConf
+
   /**
    * Filter out the obsolete logs.
    */
@@ -131,24 +137,89 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
     }
   }
 
+  def dataToUnsafeRow(data: T): UnsafeRow
+  def unsafeRowToData(row: UnsafeRow): T
+  def numFieldsForUnsafeRow: Int
+
   override def serialize(logData: Array[T], out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(("v" + metadataLogVersion).getBytes(UTF_8))
+    metadataLogVersion match {
+      case 1 => serializeToV1(out, logData)
+      case 2 => serializeToV2(out, logData)
+      case _ =>
+        throw new IllegalStateException(s"UnsupportedLogVersion: unknown log version is provided" +
+          s", v$metadataLogVersion.")
+    }
+  }
+
+  private def serializeToV1(out: OutputStream, logData: Array[T]): Unit = {
     logData.foreach { data =>
       out.write('\n')
       out.write(Serialization.write(data).getBytes(UTF_8))
     }
   }
 
+  private def serializeToV2(out: OutputStream, logData: Array[T]): Unit = {
+    out.write('\n')
+    val dos = compressStream(out)
+    logData.foreach { data =>
+      val row = dataToUnsafeRow(data)
+      val rowBytes = row.getBytes
+      dos.writeInt(rowBytes.size)
+      dos.write(rowBytes)
+    }
+    dos.writeInt(-1)
+    dos.flush()
+  }
+
   override def deserialize(in: InputStream): Array[T] = {
-    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
-    if (!lines.hasNext) {
+    val line = readLine(in)
+    if (line == null || line.isEmpty) {
       throw new IllegalStateException("Incomplete log file")
     }
-    validateVersion(lines.next(), metadataLogVersion)
+
+    val version = parseVersion(line)
+    version match {
+      case 1 if version <= metadataLogVersion => deserializeFromV1(in)
+      case 2 if version <= metadataLogVersion => deserializeFromV2(in)
+      case version =>
+        throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+          s"is v${metadataLogVersion}, but encountered v$version. The log file was produced " +
+          s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+    }
+  }
+
+  private def deserializeFromV1(in: InputStream): Array[T] = {
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     lines.map(Serialization.read[T]).toArray
   }
 
+  private def deserializeFromV2(in: InputStream): Array[T] = {
+    val list = new scala.collection.mutable.ArrayBuffer[T]
+
+    val dis = decompressStream(in)
+    var eof = false
+
+    while (!eof) {
+      val size = dis.readInt()
+      if (size == -1) {
+        eof = true
+      } else if (size < 0) {
+        throw new IOException(
+          s"Error to deserialize file: size cannot be $size")
+      } else {
+        val rowBuffer = new Array[Byte](size)
+        ByteStreams.readFully(dis, rowBuffer, 0, size)
+        val row = new UnsafeRow(numFieldsForUnsafeRow)
+        row.pointTo(rowBuffer, size)
+        list += unsafeRowToData(row)
+      }
+    }
+
+    list.toArray
+  }
+
   override def add(batchId: Long, logs: Array[T]): Boolean = {
     val batchAdded =
       if (isCompactionBatch(batchId, compactInterval)) {
@@ -264,6 +335,33 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
       }
     }
   }
+
+  private def readLine(in: InputStream): String = {
+    val line = new mutable.ArrayBuffer[Byte]()
+    var eol = false
+    while (!eol) {
+      val b = in.read()
+      if (b == -1 || b == '\n') {
+        eol = true
+      } else {
+        line += b.toByte
+      }
+    }
+
+    new String(line.toArray, UTF_8)
+  }
+
+  private def compressStream(outputStream: OutputStream): DataOutputStream = {
+    // set syncFlush to true since we don't call close for compressed stream but call flush instead
+    val compressed = new LZ4CompressionCodec(sparkConf)
+      .compressedOutputStream(outputStream, syncFlush = true)
+    new DataOutputStream(compressed)
+  }
+
+  private def decompressStream(inputStream: InputStream): DataInputStream = {
+    val compressed = new LZ4CompressionCodec(sparkConf).compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
 }
 
 object CompactibleFileStreamLog {
```
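Taken together, the V2 layout is the version header line (`v2`), a newline, then an LZ4-compressed stream of `[int length][entry bytes]` records terminated by a length of `-1`. A standalone sketch of that framing, using string payloads as stand-ins for `UnsafeRow` bytes — illustrative only, not the commit's code:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.LZ4CompressionCodec

val codec = new LZ4CompressionCodec(new SparkConf())

// Write: "v2", newline, then compressed length-prefixed records plus a -1 sentinel.
val bytes = new ByteArrayOutputStream()
bytes.write("v2".getBytes("UTF-8"))
bytes.write('\n')
val dos = new DataOutputStream(codec.compressedOutputStream(bytes, syncFlush = true))
Seq("entry-1", "entry-2").foreach { payload => // stand-ins for UnsafeRow bytes
  val b = payload.getBytes("UTF-8")
  dos.writeInt(b.length)
  dos.write(b)
}
dos.writeInt(-1)
dos.flush() // flush, not close, mirroring serializeToV2

// Read: consume the header line byte-wise (as readLine does), then decompress
// and iterate until the -1 sentinel.
val in = new ByteArrayInputStream(bytes.toByteArray)
while (in.read() != '\n') {}
val dis = new DataInputStream(codec.compressedInputStream(in))
var size = dis.readInt()
while (size != -1) {
  val buf = new Array[Byte](size)
  dis.readFully(buf)
  println(new String(buf, "UTF-8")) // prints entry-1, then entry-2
  size = dis.readInt()
}
```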

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
54 additions & 1 deletion

```diff
@@ -17,14 +17,24 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{DataInputStream, DataOutputStream, InputStream, IOException, OutputStream}
 import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
 
+import scala.collection.mutable
+import scala.io.{Source => IOSource}
+
+import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * The status of a file outputted by [[FileStreamSink]]. A file is visible only if it appears in
@@ -66,6 +76,41 @@ object SinkFileStatus {
   }
 }
 
+object SinkFileStatusV2 {
+  val SCHEMA = new StructType(
+    Array(
+      StructField("path", StringType),
+      StructField("size", LongType),
+      StructField("isDir", BooleanType),
+      StructField("modificationTime", LongType),
+      StructField("blockReplication", IntegerType),
+      StructField("blockSize", LongType),
+      StructField("action", StringType)
+    )
+  )
+
+  val PROJ_UNSAFE_ROW = UnsafeProjection.create(SCHEMA.fields.map(_.dataType))
+
+  def fromRow(row: UnsafeRow): SinkFileStatus = {
+    SinkFileStatus(
+      row.getString(0),
+      row.getLong(1),
+      row.getBoolean(2),
+      row.getLong(3),
+      row.getInt(4),
+      row.getLong(5),
+      row.getString(6)
+    )
+  }
+
+  def toRow(entry: SinkFileStatus): UnsafeRow = {
+    val row = new GenericInternalRow(Array[Any](
+      UTF8String.fromString(entry.path), entry.size, entry.isDir, entry.modificationTime,
+      entry.blockReplication, entry.blockSize, UTF8String.fromString(entry.action)))
+    PROJ_UNSAFE_ROW.apply(row).copy()
+  }
+}
+
 /**
  * A special log for [[FileStreamSink]]. It will write one log file for each batch. The first line
  * of the log file is the version number, and there are multiple JSON lines following. Each JSON
@@ -93,6 +138,8 @@ class FileStreamSinkLog(
   protected override val defaultCompactInterval =
     sparkSession.sessionState.conf.fileSinkLogCompactInterval
 
+  private val sparkConf = sparkSession.sparkContext.getConf
+
   require(defaultCompactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
       "to a positive value.")
@@ -105,10 +152,16 @@ class FileStreamSinkLog(
       logs.filter(f => !deletedFiles.contains(f.path))
     }
   }
+
+  override def dataToUnsafeRow(data: SinkFileStatus): UnsafeRow = SinkFileStatusV2.toRow(data)
+
+  override def unsafeRowToData(row: UnsafeRow): SinkFileStatus = SinkFileStatusV2.fromRow(row)
+
+  override def numFieldsForUnsafeRow: Int = SinkFileStatusV2.SCHEMA.fields.length
 }
 
 object FileStreamSinkLog {
-  val VERSION = 1
+  val VERSION = 2
   val DELETE_ACTION = "delete"
   val ADD_ACTION = "add"
 }
```
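A hypothetical round trip through the new converters; the field values below are made up, and `ADD_ACTION` comes from the `FileStreamSinkLog` companion:

```scala
import org.apache.spark.sql.execution.streaming.{FileStreamSinkLog, SinkFileStatus, SinkFileStatusV2}

val status = SinkFileStatus(
  "/output/part-00000",         // path (values are illustrative)
  1024L,                        // size
  false,                        // isDir
  1583020800000L,               // modificationTime
  3,                            // blockReplication
  134217728L,                   // blockSize
  FileStreamSinkLog.ADD_ACTION) // action

val row = SinkFileStatusV2.toRow(status)        // 7-field UnsafeRow per SCHEMA
assert(SinkFileStatusV2.fromRow(row) == status) // lossless round trip
```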

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
26 additions & 1 deletion

```diff
@@ -29,12 +29,14 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -313,6 +315,29 @@ object FileStreamSource {
 
   case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable
 
+  object FileEntryV2 {
+    val SCHEMA = new StructType(
+      Array(
+        StructField("path", StringType),
+        StructField("timestamp", LongType),
+        StructField("batchId", LongType)
+      )
+    )
+
+    val PROJ_UNSAFE_ROW = UnsafeProjection.create(SCHEMA.fields.map(_.dataType))
+
+    def fromRow(row: UnsafeRow): FileEntry = {
+      FileEntry(row.getString(0), row.getLong(1), row.getLong(2))
+    }
+
+    def toRow(entry: FileEntry): UnsafeRow = {
+      val row = new GenericInternalRow(Array[Any](
+        UTF8String.fromString(entry.path), entry.timestamp, entry.batchId
+      ))
+      PROJ_UNSAFE_ROW.apply(row).copy()
+    }
+  }
+
   /**
    * A custom hash map used to track the list of files seen. This map is not thread-safe.
    *
```
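A matching hypothetical round trip for `FileEntry` (`timestamp` is `FileStreamSource.Timestamp`, an alias for `Long`; values are illustrative):

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, FileEntryV2}

val entry = FileEntry("/input/events-001.json", 1583020800000L, 7L)

val row = FileEntryV2.toRow(entry)        // 3-field UnsafeRow per FileEntryV2.SCHEMA
assert(FileEntryV2.fromRow(row) == entry) // lossless round trip
```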

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
16 additions & 2 deletions

```diff
@@ -17,16 +17,22 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.{DataInputStream, DataOutputStream, InputStream, IOException, OutputStream}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{LinkedHashMap => JLinkedHashMap}
 import java.util.Map.Entry
 
 import scala.collection.mutable
+import scala.io.{Source => IOSource}
 
+import com.google.common.io.ByteStreams
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, FileEntryV2}
 import org.apache.spark.sql.internal.SQLConf
 
 class FileStreamSourceLog(
@@ -52,6 +58,8 @@ class FileStreamSourceLog(
 
   private implicit val formats = Serialization.formats(NoTypeHints)
 
+  private val sparkConf = sparkSession.sparkContext.getConf
+
   // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
   // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
@@ -122,8 +130,14 @@ class FileStreamSourceLog(
     }
     batches
   }
+
+  override def dataToUnsafeRow(data: FileEntry): UnsafeRow = FileEntryV2.toRow(data)
+
+  override def unsafeRowToData(row: UnsafeRow): FileEntry = FileEntryV2.fromRow(row)
+
+  override def numFieldsForUnsafeRow: Int = FileEntryV2.SCHEMA.fields.length
 }
 
 object FileStreamSourceLog {
-  val VERSION = 1
+  val VERSION = 2
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
12 additions & 9 deletions

```diff
@@ -240,6 +240,16 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * "v123xyz" etc.)
    */
   private[sql] def validateVersion(text: String, maxSupportedVersion: Int): Int = {
+    val version = parseVersion(text)
+    if (version > maxSupportedVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+        s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+        s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+    }
+    version
+  }
+
+  private[sql] def parseVersion(text: String): Int = {
     if (text.length > 0 && text(0) == 'v') {
       val version =
         try {
@@ -249,15 +259,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
             s"version from $text.")
         }
-      if (version > 0) {
-        if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
-        } else {
-          return version
-        }
-      }
+
+      if (version > 0) return version
     }
 
     // reaching here means we failed to read the correct log version
```