@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* A class used to report information about the progress of a [[ContinuousQuery]].
*
* @param name The [[ContinuousQuery]] name
* @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
* @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
*/
@Experimental
class ContinuousQueryInfo private[sql](
val name: String,
val sourceStatuses: Seq[SourceStatus],
val sinkStatus: SinkStatus)
@@ -22,7 +22,7 @@ package org.apache.spark.sql.execution.streaming
* [[Source]]s that are present in a streaming query. This is similar to a simplified, single-instance
* vector clock that must progress linearly forward.
*/
-case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+case class CompositeOffset(offsets: Array[Option[Offset]]) extends Offset {

Member Author: Change the type to Array as jackson-module-scala doesn't support Seq[Option[X]], probably because of type erasure.
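For illustration, the erasure problem in plain Scala (hypothetical element types, nothing Spark-specific): nested type parameters do not survive to runtime, so a reflection-driven serializer sees the same runtime class for both values.

```scala
// Nested type parameters are erased at runtime, so a reflection-based
// (de)serializer cannot tell these two values apart by their classes.
val a: Seq[Option[Int]] = Seq(Some(1), None)
val b: Seq[Option[String]] = Seq(Some("x"), None)
println(a.getClass == b.getClass) // true
```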

/**
* Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
* or greater than the specified object.
@@ -67,6 +67,15 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {

override def toString: String =
offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")

override def equals(other: Any): Boolean = other match {
case that: CompositeOffset => offsets != null && offsets.sameElements(that.offsets)

Member Author: Need to redefine equals and hashCode as offsets is an array.

case _ => false
}

override def hashCode(): Int = {
offsets.toSeq.hashCode()
}
}
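The equals/hashCode comment above is easy to demonstrate in plain Scala (a standalone sketch, not part of the patch): arrays inherit reference equality from Java, so a case class with an Array field loses structural equality by default.

```scala
val x = Array(Option(1L), None)
val y = Array(Option(1L), None)
println(x == y)             // false: arrays compare by reference
println(x.sameElements(y))  // true: element-wise comparison, as in equals above
println(x.toSeq == y.toSeq) // true: Seq equality and hashCode are structural,
                            // which is why hashCode delegates to offsets.toSeq
```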

object CompositeOffset {
@@ -75,6 +84,10 @@ object CompositeOffset {
* `nulls` in the sequence are converted to `None`s.
*/
def fill(offsets: Offset*): CompositeOffset = {
-CompositeOffset(offsets.map(Option(_)))
+new CompositeOffset(offsets.map(Option(_)).toArray)
}

def apply(offsets: Seq[Option[Offset]]): CompositeOffset = {
new CompositeOffset(offsets.toArray)
}
}
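As a usage sketch (not part of the patch; LongOffset is the simple Offset implementation exercised by the tests below), fill converts nulls to Nones:

```scala
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}

val c = CompositeOffset.fill(LongOffset(0), null, LongOffset(2))
// Compare with sameElements, since offsets is now an Array:
assert(c.offsets.sameElements(Array(Some(LongOffset(0)), None, Some(LongOffset(2)))))
```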
@@ -69,14 +69,15 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
}
}

/**
* Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to the
* Spark listener bus.
*/
private case class WrappedContinuousQueryListenerEvent(
streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent {
}

// Do not log streaming events in event log as history server does not support these events.
protected[spark] override def logEvent: Boolean = false
}
/**
* Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to the
* Spark listener bus.
*/
case class WrappedContinuousQueryListenerEvent(

Member Author: Move it out of ContinuousQueryListenerBus in order to eliminate the reference.


Contributor: I don't actually understand the structure of all the different event busses, but why do we need a different event type? Why don't all the events just extend SparkListenerEvent?


Member Author: Good point. This pattern is from the Streaming events. But that's for binary compatibility. We don't need this pattern in the new APIs.

streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent {

// Do not log streaming events in event log as history server does not support these events.

Contributor: The history server isn't robust to unknown messages? Seems bad that we always drop them. What happens when we do add support?


Member Author: It's robust; we just don't log unnecessary events since we don't have any UI for them.

protected[spark] override def logEvent: Boolean = false
}
@@ -17,11 +17,14 @@

package org.apache.spark.sql.execution.streaming

import com.fasterxml.jackson.annotation.JsonTypeInfo

/**
* An offset is a monotonically increasing metric used to track progress in the computation of a
* stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
* with `equals` and `hashCode`.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")

Contributor: What does this mean? What JSON does this produce? @vanzin, I find this very hard to reason about relative to constructing the JSON by hand.


Contributor: That's why Jackson has javadocs. :-) You read it once instead of having to understand how every single type ends up in JSON by reading the specific manual serialization code for that type.

Anyway, to avoid copy & paste of this kind of code everywhere, it would probably be better to have a base trait for "traits that will be serialized in events and might have multiple implementations", and have that trait carry this annotation (like SparkListenerEvent does).


Member Author: > What does this mean? What JSON does this produce?

This is for polymorphic serialization. It will write the class name to the field @class (set by property = "@class"). When deserializing an object, it needs the class name to know the concrete class, since the reflection APIs will only provide the interface name.
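A minimal, self-contained sketch of what the annotation does (hypothetical Pet/Dog types; assumes jackson-module-scala is on the classpath):

```scala
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
trait Pet
case class Dog(name: String) extends Pet

object JsonTypeInfoDemo extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  val json = mapper.writeValueAsString(Dog("rex"))
  println(json) // e.g. {"@class":"com.example.Dog","name":"rex"}
  // Deserializing against the trait works because "@class" names the concrete class:
  val pet = mapper.readValue(json, classOf[Pet])
  println(pet) // Dog(rex)
}
```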


marmbrus (Contributor, Jun 1, 2016): In my experience (with Jackson specifically, and with people who have read the docs) you end up with surprises down the road due to magic.

@zsxwing In this particular case, maybe we should remove the offsets from the status messages, or turn them into strings or something. These are going to be implemented by specific sinks, so I'm not sure we want to expose the details of arbitrary classes to the listener bus.


Contributor: I've had way more problems with manual serialization than with Jackson, so, well, YMMV I guess.

trait Offset extends Serializable {

/**
@@ -167,7 +167,7 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
-postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
+postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
@@ -206,7 +206,7 @@
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-postEvent(new QueryTerminated(this))
+postEvent(new QueryTerminated(this.toInfo, this.exception.map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
@@ -374,7 +374,7 @@
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
-postEvent(new QueryProgress(this))
+postEvent(new QueryProgress(this.toInfo))
}

private def postEvent(event: ContinuousQueryListener.Event) {
@@ -484,6 +484,13 @@
""".stripMargin
}

private def toInfo: ContinuousQueryInfo = {

Contributor: Do we need to make this a deep copy, especially for sourceStatuses and sinkStatus?


Member Author: No. They are immutable.

new ContinuousQueryInfo(
this.name,
this.sourceStatuses,
this.sinkStatus)
}

trait State
case object INITIALIZED extends State
case object ACTIVE extends State
@@ -17,8 +17,10 @@

package org.apache.spark.sql.util

import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.ContinuousQuery
+import org.apache.spark.sql._
import org.apache.spark.sql.util.ContinuousQueryListener._

/**
@@ -62,14 +64,26 @@ abstract class ContinuousQueryListener {
object ContinuousQueryListener {

/** Base type of [[ContinuousQueryListener]] events */
@JsonTypeInfo(
use = JsonTypeInfo.Id.CLASS,
include = JsonTypeInfo.As.PROPERTY,
property = "@class")
trait Event

/** Event representing the start of a query */
-class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/** Event representing any progress updates in a query */
-class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event

-/** Event representing that termination of a query */
-class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+/**
+ * Event representing the termination of a query
+ *
+ * @param queryInfo Information about the [[ContinuousQuery]] that was terminated
+ * @param exception The exception information of the [[ContinuousQuery]] if any. Otherwise, it
+ * will be `None`.
+ */
+class QueryTerminated private[sql](
+ val queryInfo: ContinuousQueryInfo,
+ val exception: Option[String]) extends Event

Contributor: I think we want the stacktrace too, as well as how far it made it before it failed. It might be okay to serialize the exception itself; we just need to make sure it's not pulling in the whole query.

}
@@ -69,7 +69,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
AssertOnQuery(_.sourceStatuses(0).offset === None),
AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
+AssertOnQuery(_.sinkStatus.offset === CompositeOffset(None :: Nil)),
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
@@ -26,10 +26,12 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.util.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}
import org.apache.spark.util.JsonProtocol

class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {

@@ -52,7 +54,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
-assert(status.active == true)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))

@@ -74,7 +75,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
-assert(status.active == true)
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))

@@ -87,8 +87,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
eventually(Timeout(streamingTimeout)) {
val status = listener.terminationStatus
assert(status != null)

-assert(status.active === false) // must be inactive by the time onQueryTerm is called
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
}
@@ -142,6 +140,95 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
}
}

test("exception should be reported in QueryTerminated") {
val listener = new QueryStatusCollector
withListenerAdded(listener) {
val input = MemoryStream[Int]
testStream(input.toDS.map(_ / 0))(
StartStream(),
AddData(input, 1),
ExpectFailure[SparkException](),
Assert {
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
assert(listener.terminationStatus !== null)
assert(listener.terminationException.isDefined &&
listener.terminationException.get.contains("java.lang.ArithmeticException"))
}
)
}
}

test("QueryStarted serialization") {
val queryStartedInfo = new ContinuousQueryInfo(
"name",
Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
new SinkStatus("sink", CompositeOffset(None :: None :: Nil)))
val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
val json = JsonProtocol.sparkEventToJson(WrappedContinuousQueryListenerEvent(queryStarted))
val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[WrappedContinuousQueryListenerEvent]
.streamingListenerEvent.asInstanceOf[ContinuousQueryListener.QueryStarted]
assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
}

test("QueryProgress serialization") {
val queryProcessInfo = new ContinuousQueryInfo(
"name",
Seq(
new SourceStatus("source1", Some(LongOffset(0))),
new SourceStatus("source2", Some(LongOffset(1)))),
new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1))))))
val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
val json = JsonProtocol.sparkEventToJson(WrappedContinuousQueryListenerEvent(queryProcess))
val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[WrappedContinuousQueryListenerEvent]
.streamingListenerEvent.asInstanceOf[ContinuousQueryListener.QueryProgress]
assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
}

test("QueryTerminated serialization") {
val queryTerminatedInfo = new ContinuousQueryInfo(
"name",
Seq(
new SourceStatus("source1", Some(LongOffset(0))),
new SourceStatus("source2", Some(LongOffset(1)))),
new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1))))))
val queryQueryTerminated =
new ContinuousQueryListener.QueryTerminated(queryTerminatedInfo, Some("exception"))
val json =
JsonProtocol.sparkEventToJson(WrappedContinuousQueryListenerEvent(queryQueryTerminated))
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[WrappedContinuousQueryListenerEvent]
.streamingListenerEvent.asInstanceOf[ContinuousQueryListener.QueryTerminated]
assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
assert(queryQueryTerminated.exception === newQueryTerminated.exception)
}

private def assertContinuousQueryInfoEquals(
expected: ContinuousQueryInfo,
actual: ContinuousQueryInfo): Unit = {
assert(expected.name === actual.name)
assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
case (expectedSource, actualSource) =>
assertSourceStatus(expectedSource, actualSource)
}
assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
}

private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
assert(expected.description === actual.description)
assert {
expected.offset.isEmpty && actual.offset.isEmpty ||
(expected.offset.isDefined && actual.offset.isDefined &&
expected.offset.get.compareTo(actual.offset.get) === 0)
}
}

private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
assert(expected.description === actual.description)
assert(expected.offset.compareTo(actual.offset) === 0)
}

private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
try {
@@ -165,9 +252,10 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter

-@volatile var startStatus: QueryStatus = null
-@volatile var terminationStatus: QueryStatus = null
-val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
+@volatile var startStatus: ContinuousQueryInfo = null
+@volatile var terminationStatus: ContinuousQueryInfo = null
+@volatile var terminationException: Option[String] = null
+val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]

def reset(): Unit = {
startStatus = null
@@ -183,35 +271,24 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with

override def onQueryStarted(queryStarted: QueryStarted): Unit = {
asyncTestWaiter {
-startStatus = QueryStatus(queryStarted.query)
+startStatus = queryStarted.queryInfo
}
}

override def onQueryProgress(queryProgress: QueryProgress): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-progressStatuses.add(QueryStatus(queryProgress.query))
+progressStatuses.add(queryProgress.queryInfo)
}
}

override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-terminationStatus = QueryStatus(queryTerminated.query)
+terminationStatus = queryTerminated.queryInfo

Contributor: The exception is not tested.


Member Author: Added a test for the exception.

+terminationException = queryTerminated.exception
}
asyncTestWaiter.dismiss()
}
}

-case class QueryStatus(
-active: Boolean,
-exception: Option[Exception],
-sourceStatuses: Array[SourceStatus],
-sinkStatus: SinkStatus)
-
-object QueryStatus {
-def apply(query: ContinuousQuery): QueryStatus = {
-QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
-}
-}
}