From 5496952662983b1dd1a1993161943667fc6fd4a0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sun, 23 Feb 2020 22:53:01 -0800 Subject: [PATCH 1/4] Enable FAIL_ON_UNKNOWN_PROPERTIES by default to parse Spark events --- .../org/apache/spark/util/JsonProtocol.scala | 3 ++- .../apache/spark/util/JsonProtocolSuite.scala | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 53824735d2fc..9254ac94005f 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -22,7 +22,7 @@ import java.util.{Properties, UUID} import scala.collection.JavaConverters._ import scala.collection.Map -import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.json4s.DefaultFormats import org.json4s.JsonAST._ @@ -59,6 +59,7 @@ private[spark] object JsonProtocol { private implicit val format = DefaultFormats private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) /** ------------------------------------------------- * * JSON serialization methods for SparkListenerEvents | diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index edc0662a0f73..0c534bcf52c7 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -483,6 +483,28 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), blocks, JString(blocks.toString)) testAccumValue(Some("anything"), 123, JString("123")) } + + test("forwards compatibility: ignore unknown fields") { + val expected = TestListenerEvent("foo", 123) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo", + | "bar" : 123, + | "unknown" : "unknown" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } + + test("backwards compatibility: set default values for missing fields") { + val expected = TestListenerEvent("foo", 0) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } } @@ -2313,3 +2335,5 @@ private[spark] object JsonProtocolSuite extends Assertions { |} """.stripMargin } + +case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent From 41726768d19c3b48d05e2a3eb54c53d4561677d8 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sun, 23 Feb 2020 23:01:16 -0800 Subject: [PATCH 2/4] update test names --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 0c534bcf52c7..eb7f3079bee3 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -484,7 +484,7 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), 123, JString("123")) } - test("forwards compatibility: ignore unknown fields") { + test("SPARK-30936: forwards compatibility - ignore unknown fields") { val expected = TestListenerEvent("foo", 123) val unknownFieldsJson = """{ @@ -496,7 +496,7 @@ class JsonProtocolSuite extends SparkFunSuite { assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) } - test("backwards compatibility: set default values for missing fields") { + test("SPARK-30936: backwards compatibility - set default values for missing fields") { val expected = TestListenerEvent("foo", 0) val unknownFieldsJson = """{ From 7938701af1a9e166b20a6e51daf7c179990db90d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 24 Feb 2020 10:25:56 -0800 Subject: [PATCH 3/4] fix tests --- .../StreamingQueryListenerSuite.scala | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9d0f829ac968..4ca5f9581eba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") { // query-event-logs-version-2.0.0.txt has all types of events generated by - // Structured Streaming in Spark 2.0.0. + // Structured Streaming in Spark 2.0.0. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") { // query-event-logs-version-2.0.1.txt has all types of events generated by - // Structured Streaming in Spark 2.0.1. + // Structured Streaming in Spark 2.0.1. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by - // Structured Streaming in Spark 2.0.2. - // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it - // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt") + // Structured Streaming in Spark 2.0.2. We added a new `runId` field in 2.1.0. But we should + // still be able to load events generated by Spark 2.0.2. + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) } test("listener propagates observable metrics") { @@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = { + private def testReplayListenerBusWithBorkenEventJsons( + fileName: String, + expectedEventSize: Int): Unit = { val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName") val events = mutable.ArrayBuffer[SparkListenerEvent]() try { @@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { replayer.addListener(new SparkListener {}) replayer.replay(input, fileName) // SparkListenerApplicationEnd is the only valid event - assert(events.size === 1) - assert(events(0).isInstanceOf[SparkListenerApplicationEnd]) + assert(events.size === expectedEventSize) + assert(events.last.isInstanceOf[SparkListenerApplicationEnd]) } finally { input.close() } From 9b18998c786debecae53d7eaf2c6f4529487a985 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 24 Feb 2020 10:42:15 -0800 Subject: [PATCH 4/4] fix comment --- .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 4ca5f9581eba..6bb1646becf2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -400,8 +400,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by - // Structured Streaming in Spark 2.0.2. We added a new `runId` field in 2.1.0. But we should - // still be able to load events generated by Spark 2.0.2. + // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events + // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2. testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) }