From eb87c63f653917af5a221b780681bb7de6fd217c Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 2 Jul 2017 17:28:30 +0900
Subject: [PATCH 01/10] insert assert to ensure Unsafe.sizeInBytes is a multiple of 8

---
 .../org/apache/spark/sql/catalyst/expressions/UnsafeRow.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de90984ca0..5e52c5b54624 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -167,6 +167,7 @@ public UnsafeRow() {}
    */
   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.baseObject = baseObject;
     this.baseOffset = baseOffset;
     this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public void pointTo(byte[] buf, int sizeInBytes) {
   }
 
   public void setTotalSize(int sizeInBytes) {
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.sizeInBytes = sizeInBytes;
   }
 
@@ -538,6 +540,7 @@ public void copyFrom(UnsafeRow row) {
       row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
     // update the sizeInBytes.
     this.sizeInBytes = row.sizeInBytes;
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
   }
 
   /**
@@ -664,6 +667,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     this.baseOffset = BYTE_ARRAY_OFFSET;
     this.sizeInBytes = in.readInt();
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
     this.baseObject = new byte[sizeInBytes];
@@ -682,6 +686,7 @@ public void write(Kryo kryo, Output out) {
   public void read(Kryo kryo, Input in) {
     this.baseOffset = BYTE_ARRAY_OFFSET;
     this.sizeInBytes = in.readInt();
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
     this.baseObject = new byte[sizeInBytes];

From 6cde32fc7b8f28af2f8f07032337fe19af883575 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 3 Jul 2017 00:55:42 +0900
Subject: [PATCH 02/10] fix in ExternalSorter

---
 .../spark/sql/execution/UnsafeExternalRowSorter.java       | 7 ++++---
 .../apache/spark/sql/execution/UnsafeKVExternalSorter.java | 6 +++---
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index c29b002a998c..5d1f8136bd96 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -208,9 +208,10 @@ private static final class RowComparator extends RecordComparator {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // TODO: Why are the sizes -1?
-      row1.pointTo(baseObj1, baseOff1, -1);
-      row2.pointTo(baseObj2, baseOff2, -1);
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
+      // into the row.
+      row1.pointTo(baseObj1, baseOff1, 0);
+      row2.pointTo(baseObj2, baseOff2, 0);
       return ordering.compare(row1, row2);
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd02c79..d8acf11a9791 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -238,10 +238,10 @@ private static final class KVComparator extends RecordComparator {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // Note that since ordering doesn't need the total length of the record, we just pass -1
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
       // into the row.
-      row1.pointTo(baseObj1, baseOff1 + 4, -1);
-      row2.pointTo(baseObj2, baseOff2 + 4, -1);
+      row1.pointTo(baseObj1, baseOff1 + 4, 0);
+      row2.pointTo(baseObj2, baseOff2 + 4, 0);
       return ordering.compare(row1, row2);
     }
   }

From ccc820f1589fa991a0fca781f5eaae51434349a1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 3 Jul 2017 00:58:04 +0900
Subject: [PATCH 03/10] fix in RowBasedKeyValueBatch

---
 .../expressions/FixedLengthRowBasedKeyValueBatch.java    | 6 +++---
 .../expressions/VariableLengthRowBasedKeyValueBatch.java | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index a88a315bf479..1f671ffd5065 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset, klen);
-    valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + klen, vlen + 8);
     numRows++;
     return valueRow;
   }
@@ -95,7 +95,7 @@ protected UnsafeRow getValueFromKey(int rowId) {
       getKeyRow(rowId);
     }
     assert(rowId >= 0);
-    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
+    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 8);
     return valueRow;
   }
 
@@ -131,7 +131,7 @@ public boolean next() {
       }
 
       key.pointTo(base, offsetInPage, klen);
-      value.pointTo(base, offsetInPage + klen, vlen + 4);
+      value.pointTo(base, offsetInPage + klen, vlen + 8);
 
       offsetInPage += recordLength;
       recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index ea4f984be24e..0b0547123934 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset + 8, klen);
-    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 8);
     numRows++;
     return valueRow;
   }
@@ -102,7 +102,7 @@ public UnsafeRow getValueFromKey(int rowId) {
     long offset = keyRow.getBaseOffset();
     int klen = keyRow.getSizeInBytes();
     int vlen = Platform.getInt(base, offset - 8) - klen - 4;
-    valueRow.pointTo(base, offset + klen, vlen + 4);
+    valueRow.pointTo(base, offset + klen, vlen + 8);
     return valueRow;
   }
 
@@ -146,7 +146,7 @@ public boolean next() {
       currentvlen = totalLength - currentklen;
 
       key.pointTo(base, offsetInPage + 8, currentklen);
-      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
+      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 8);
 
       offsetInPage += 8 + totalLength + 8;
       recordsInPage -= 1;

From 3bd72c98bbd943c77e00b4518f6d022396a534c1 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 5 Jul 2017 18:16:00 +0900
Subject: [PATCH 04/10] fix size of a byte array in UnsafeRow since 4-byte area is used only in a page

---
 .../expressions/FixedLengthRowBasedKeyValueBatch.java    | 6 +++---
 .../expressions/VariableLengthRowBasedKeyValueBatch.java | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index 1f671ffd5065..df52f9c2d549 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset, klen);
-    valueRow.pointTo(base, recordOffset + klen, vlen + 8);
+    valueRow.pointTo(base, recordOffset + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -95,7 +95,7 @@ protected UnsafeRow getValueFromKey(int rowId) {
       getKeyRow(rowId);
     }
     assert(rowId >= 0);
-    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 8);
+    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
     return valueRow;
   }
 
@@ -131,7 +131,7 @@ public boolean next() {
       }
 
      key.pointTo(base, offsetInPage, klen);
-      value.pointTo(base, offsetInPage + klen, vlen + 8);
+      value.pointTo(base, offsetInPage + klen, vlen);
 
       offsetInPage += recordLength;
       recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index 0b0547123934..905e6820ce6e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset + 8, klen);
-    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 8);
+    valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -102,7 +102,7 @@ public UnsafeRow getValueFromKey(int rowId) {
     long offset = keyRow.getBaseOffset();
     int klen = keyRow.getSizeInBytes();
     int vlen = Platform.getInt(base, offset - 8) - klen - 4;
-    valueRow.pointTo(base, offset + klen, vlen + 8);
+    valueRow.pointTo(base, offset + klen, vlen);
     return valueRow;
   }
 
@@ -146,7 +146,7 @@ public boolean next() {
      currentvlen = totalLength - currentklen;
 
       key.pointTo(base, offsetInPage + 8, currentklen);
-      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 8);
+      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);
 
       offsetInPage += 8 + totalLength + 8;
       recordsInPage -= 1;

From 8a7a9483ba727d61486b8fa111a026a86abdfb64 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 5 Jul 2017 18:24:41 +0900
Subject: [PATCH 05/10] round up size of key/value in UnsafeRow to a multiple of 8

---
 .../state/HDFSBackedStateStoreProvider.scala | 24 ++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index bae7a15165e4..e23a5e1b9297 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -350,20 +350,24 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           throw new IOException(
             s"Error reading delta file $fileToRead of $this: key size cannot be $keySize")
         } else {
-          val keyRowBuffer = new Array[Byte](keySize)
+          // If key size in an existing file is not a multiple of 8, round it to multiple of 8
+          val keyAllocationSize = ((keySize + 7) / 8) * 8
+          val keyRowBuffer = new Array[Byte](keyAllocationSize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
-          keyRow.pointTo(keyRowBuffer, keySize)
+          keyRow.pointTo(keyRowBuffer, keyAllocationSize)
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
            map.remove(keyRow)
           } else {
-            val valueRowBuffer = new Array[Byte](valueSize)
+            // If value size in an existing file is not a multiple of 8, round it to multiple of 8
+            val valueAllocationSize = ((valueSize + 7) / 8) * 8
+            val valueRowBuffer = new Array[Byte](valueAllocationSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            valueRow.pointTo(valueRowBuffer, valueAllocationSize)
             map.put(keyRow, valueRow)
           }
         }
@@ -413,21 +417,25 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           throw new IOException(
             s"Error reading snapshot file $fileToRead of $this: key size cannot be $keySize")
         } else {
-          val keyRowBuffer = new Array[Byte](keySize)
+          // If key size in an existing file is not a multiple of 8, round it to multiple of 8
+          val keyAllocationSize = ((keySize + 7) / 8) * 8
+          val keyRowBuffer = new Array[Byte](keyAllocationSize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
-          keyRow.pointTo(keyRowBuffer, keySize)
+          keyRow.pointTo(keyRowBuffer, keyAllocationSize)
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
             throw new IOException(
               s"Error reading snapshot file $fileToRead of $this: value size cannot be $valueSize")
           } else {
-            val valueRowBuffer = new Array[Byte](valueSize)
+            // If value size in an existing file is not a multiple of 8, round it to multiple of 8
+            val valueAllocationSize = ((valueSize + 7) / 8) * 8
+            val valueRowBuffer = new Array[Byte](valueAllocationSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueSize)
+            valueRow.pointTo(valueRowBuffer, valueAllocationSize)
             map.put(keyRow, valueRow)
           }
         }

From 7f5a269c70c7dfe22d7d990dc075061682c09804 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 13 Jul 2017 19:44:01 +0900
Subject: [PATCH 06/10] add a test suite for storing to and recovering from a checkpoint

---
 .../spark/sql/streaming/StreamSuite.scala | 55 +++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6f7b9d35a6bb..e0bb211a534c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -479,6 +479,61 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
+  testQuietly("store to and recover from a checkpoint") {
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    def query(data: MemoryStream[Int], checkpointDir: String, queryName: String):
+      DataStreamWriter[Row] = {
+      data.toDF
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .option("checkpointLocation", checkpointDir)
+        .format("memory")
+        .queryName(queryName)
+    }
+
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+      var writeQuery: StreamingQuery = null
+      try {
+        val data = MemoryStream[Int]
+        writeQuery = query(data, checkpointDir, "write").start()
+
+        data.addData(1, 2, 3, 4)
+        writeQuery.processAllAvailable()
+        data.addData(3, 4, 5, 6)
+        writeQuery.processAllAvailable()
+        data.addData(5, 6, 7, 8)
+        writeQuery.processAllAvailable()
+      } finally {
+        assert(writeQuery != null)
+        writeQuery.stop()
+      }
+
+      var restartQuery: StreamingQuery = null
+      try {
+        val data = MemoryStream[Int]
+        data.addData(1, 2, 3, 4)
+        data.addData(3, 4, 5, 6)
+        data.addData(5, 6, 7, 8)
+
+        restartQuery = query(data, checkpointDir, "counts").start()
+        restartQuery.processAllAvailable()
+        data.addData(9)
+        restartQuery.processAllAvailable()
+
+        QueryTest.checkAnswer(spark.table("counts").toDF,
+          Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+          Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+      } finally {
+        assert(restartQuery != null)
+        restartQuery.stop()
+      }
+    }
+  }
+
   testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null

From 762f02a2c9211ab953a2dc4b2d9938911f2e883d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 20 Jul 2017 03:55:55 +0900
Subject: [PATCH 07/10] address review comments

---
 .../sql/catalyst/expressions/UnsafeRow.java  |  3 ---
 .../state/HDFSBackedStateStoreProvider.scala | 26 +++++++------------
 2 files changed, 10 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 5e52c5b54624..655b1d0d4b41 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -540,7 +540,6 @@ public void copyFrom(UnsafeRow row) {
       row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
     // update the sizeInBytes.
     this.sizeInBytes = row.sizeInBytes;
-    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
   }
 
   /**
@@ -667,7 +666,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     this.baseOffset = BYTE_ARRAY_OFFSET;
     this.sizeInBytes = in.readInt();
-    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
     this.baseObject = new byte[sizeInBytes];
@@ -686,7 +684,6 @@ public void write(Kryo kryo, Output out) {
   public void read(Kryo kryo, Input in) {
     this.baseOffset = BYTE_ARRAY_OFFSET;
     this.sizeInBytes = in.readInt();
-    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.numFields = in.readInt();
     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
     this.baseObject = new byte[sizeInBytes];
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index e23a5e1b9297..e667e5889693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -350,24 +350,21 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           throw new IOException(
             s"Error reading delta file $fileToRead of $this: key size cannot be $keySize")
         } else {
-          // If key size in an existing file is not a multiple of 8, round it to multiple of 8
-          val keyAllocationSize = ((keySize + 7) / 8) * 8
-          val keyRowBuffer = new Array[Byte](keyAllocationSize)
+          val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
-          keyRow.pointTo(keyRowBuffer, keyAllocationSize)
+          keyRow.pointTo(keyRowBuffer, keySize)
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
             map.remove(keyRow)
           } else {
-            // If value size in an existing file is not a multiple of 8, round it to multiple of 8
-            val valueAllocationSize = ((valueSize + 7) / 8) * 8
-            val valueRowBuffer = new Array[Byte](valueAllocationSize)
+            val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueAllocationSize)
+            // If valueSize in existing file is not multiple of 8, round it down to multiple of 8
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }
@@ -417,25 +414,22 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           throw new IOException(
             s"Error reading snapshot file $fileToRead of $this: key size cannot be $keySize")
         } else {
-          // If key size in an existing file is not a multiple of 8, round it to multiple of 8
-          val keyAllocationSize = ((keySize + 7) / 8) * 8
-          val keyRowBuffer = new Array[Byte](keyAllocationSize)
+          val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
           val keyRow = new UnsafeRow(keySchema.fields.length)
-          keyRow.pointTo(keyRowBuffer, keyAllocationSize)
+          keyRow.pointTo(keyRowBuffer, keySize)
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
             throw new IOException(
               s"Error reading snapshot file $fileToRead of $this: value size cannot be $valueSize")
           } else {
-            // If value size in an existing file is not a multiple of 8, round it to multiple of 8
-            val valueAllocationSize = ((valueSize + 7) / 8) * 8
-            val valueRowBuffer = new Array[Byte](valueAllocationSize)
+            val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            valueRow.pointTo(valueRowBuffer, valueAllocationSize)
+            // If valueSize in existing file is not multiple of 8, round it down to multiple of 8
+            valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
         }

From 01597010145f7769506cfa51c0450a2db779fc07 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 25 Jul 2017 16:52:20 +0900
Subject: [PATCH 08/10] address review comments

---
 .../state/HDFSBackedStateStoreProvider.scala | 11 +++-
 .../spark/sql/streaming/StreamSuite.scala    | 55 -------------------
 2 files changed, 9 insertions(+), 57 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index e667e5889693..d885ebfb22a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -363,7 +363,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            // If valueSize in existing file is not multiple of 8, round it down to multiple of 8
+            // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
+            // This is work around for the following.
+            // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
+            // `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
+            assert(valueSize % 8 == 0)
             valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
@@ -428,7 +432,10 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
-            // If valueSize in existing file is not multiple of 8, round it down to multiple of 8
+            // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
+            // This is work around for the following.
+            // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
+            // `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
             valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e0bb211a534c..6f7b9d35a6bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -479,61 +479,6 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  testQuietly("store to and recover from a checkpoint") {
-    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
-
-    def query(data: MemoryStream[Int], checkpointDir: String, queryName: String):
-      DataStreamWriter[Row] = {
-      data.toDF
-        .groupBy($"value")
-        .agg(count("*"))
-        .writeStream
-        .outputMode("complete")
-        .option("checkpointLocation", checkpointDir)
-        .format("memory")
-        .queryName(queryName)
-    }
-
-    withSQLConf(
-      SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
-      var writeQuery: StreamingQuery = null
-      try {
-        val data = MemoryStream[Int]
-        writeQuery = query(data, checkpointDir, "write").start()
-
-        data.addData(1, 2, 3, 4)
-        writeQuery.processAllAvailable()
-        data.addData(3, 4, 5, 6)
-        writeQuery.processAllAvailable()
-        data.addData(5, 6, 7, 8)
-        writeQuery.processAllAvailable()
-      } finally {
-        assert(writeQuery != null)
-        writeQuery.stop()
-      }
-
-      var restartQuery: StreamingQuery = null
-      try {
-        val data = MemoryStream[Int]
-        data.addData(1, 2, 3, 4)
-        data.addData(3, 4, 5, 6)
-        data.addData(5, 6, 7, 8)
-
-        restartQuery = query(data, checkpointDir, "counts").start()
-        restartQuery.processAllAvailable()
-        data.addData(9)
-        restartQuery.processAllAvailable()
-
-        QueryTest.checkAnswer(spark.table("counts").toDF,
-          Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
-          Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
-      } finally {
-        assert(restartQuery != null)
-        restartQuery.stop()
-      }
-    }
-  }
-
   testQuietly("recover from a Spark v2.1 checkpoint") {
     var inputData: MemoryStream[Int] = null
     var query: DataStreamWriter[Row] = null

From 54be80ef4849fddb3ff51c53a64173883d1ed026 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 25 Jul 2017 18:58:18 +0900
Subject: [PATCH 09/10] fix test failure by removing an assert for debugging

---
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index d885ebfb22a9..ec156ca06bda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -367,7 +367,6 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             // This is work around for the following.
             // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
             // `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
-            assert(valueSize % 8 == 0)
             valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }

From cc467de59a5cb7de0e8bdf3421e4a019b31feaff Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 26 Jul 2017 13:38:24 +0900
Subject: [PATCH 10/10] address review comments

---
 .../state/HDFSBackedStateStoreProvider.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index ec156ca06bda..5f4161bf28e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -364,9 +364,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
-            // This is work around for the following.
-            // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
-            // `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
+            // This is a workaround for the following:
+            // Prior to Spark 2.3 mistakenly append 4 bytes to the value row in
+            // `RowBasedKeyValueBatch`, which gets persisted into the checkpoint data
             valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
@@ -432,9 +432,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
             val valueRow = new UnsafeRow(valueSchema.fields.length)
             // If valueSize in existing file is not multiple of 8, floor it to multiple of 8.
-            // This is work around for the following.
-            // Pre-Spark 2.3 mistakenly append 4 bytes to the value row in
-            // `FixedLengthRowBasedKeyValueBatch`, which gets persisted into the checkpoint data
+            // This is a workaround for the following:
+            // Prior to Spark 2.3 mistakenly append 4 bytes to the value row in
+            // `RowBasedKeyValueBatch`, which gets persisted into the checkpoint data
             valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
             map.put(keyRow, valueRow)
           }
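Reference note on the size arithmetic used in these patches: allocations are rounded up to a multiple of 8, while a value size read back from an existing checkpoint (which may still carry the stray 4 trailing bytes described in the comments above) is floored to a multiple of 8 before being passed to UnsafeRow.pointTo. The sketch below shows only that arithmetic; it is illustrative, not part of the patch series, and the class and method names are invented for the example.

  public final class EightByteAlignmentExample {

    // Round down to a multiple of 8, as done for a valueSize read from an existing
    // checkpoint file before calling UnsafeRow.pointTo in the patches above.
    static int floorToMultipleOf8(int size) {
      return (size / 8) * 8;
    }

    // Round up to a multiple of 8, as an intermediate revision of the patch did when
    // sizing the backing byte array: ((size + 7) / 8) * 8.
    static int ceilToMultipleOf8(int size) {
      return ((size + 7) / 8) * 8;
    }

    public static void main(String[] args) {
      int persistedValueSize = 36; // e.g. a 32-byte row plus 4 stray bytes from an old writer
      System.out.println(floorToMultipleOf8(persistedValueSize)); // 32 -> size handed to pointTo
      System.out.println(ceilToMultipleOf8(persistedValueSize));  // 40 -> an aligned allocation size
    }
  }

Flooring is safe here because, as the subject of PATCH 04 notes, the extra 4-byte area is used only inside a page and is not part of the row payload, so dropping it still leaves the complete, 8-byte-aligned UnsafeRow data.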