Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.{ContinuousQueryInfo, ContinuousQueryListener}
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

Expand Down Expand Up @@ -167,7 +167,7 @@ class StreamExecution(
// Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
// so must mark this as ACTIVE first.
state = ACTIVE
postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
Expand Down Expand Up @@ -206,7 +206,7 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(new QueryTerminated(this))
postEvent(new QueryTerminated(this.toInfo, this.exception.map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
Expand Down Expand Up @@ -374,7 +374,7 @@ class StreamExecution(
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets
postEvent(new QueryProgress(this))
postEvent(new QueryProgress(this.toInfo))
}

private def postEvent(event: ContinuousQueryListener.Event) {
Expand Down Expand Up @@ -484,6 +484,13 @@ class StreamExecution(
""".stripMargin
}

private def toInfo: ContinuousQueryInfo = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make this a deep copy, especially for sourceStatuses and sinkStatus?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. They are immutable.

new ContinuousQueryInfo(
this.name,
this.sourceStatuses,
this.sinkStatus)
}

trait State
case object INITIALIZED extends State
case object ACTIVE extends State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.util

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.ContinuousQuery
import org.apache.spark.sql._
import org.apache.spark.sql.util.ContinuousQueryListener._

/**
Expand Down Expand Up @@ -65,11 +65,33 @@ object ContinuousQueryListener {
trait Event

/** Event representing the start of a query */
class QueryStarted private[sql](val query: ContinuousQuery) extends Event
class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/** Event representing any progress updates in a query */
class QueryProgress private[sql](val query: ContinuousQuery) extends Event
class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event

/** Event representing that termination of a query */
class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
/**
* Event representing the termination of a query
*
* @param queryInfo Information about the terminated [[ContinuousQuery]].
* @param exception The exception information of the [[ContinuousQuery]] if any. Otherwise, it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exception

* will be `None`.
*/
class QueryTerminated private[sql](
val queryInfo: ContinuousQueryInfo,
val exception: Option[String]) extends Event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want the stack trace as well as how far it made it before it failed. It might be okay to serialize the exception itself; we just need to make sure it's not pulling in the whole query.

}

/**
* :: Experimental ::
* A class that contains information about [[ContinuousQuery]].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A class used to report information about the progress of a [[ContinuousQuery]].

*
* @param name The [[ContinuousQuery]] name
* @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
* @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
*/
@Experimental
class ContinuousQueryInfo private[sql](
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more questions:

  • Is there a reason this isn't in the object with the other listener messages? At a minimum it should probably be defined near SourceStatus (which is currently defined at the top level). We should find a place to colocate all of them.
  • Is name enough to uniquely identify a streaming query?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is unique within each session, I believe. Maybe we should have a string representing the session (maybe sessionState.toString for a start)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly I'm thinking you will want some way to go from this to the actual query object, but maybe that's not a requirement (or could still be added later).

val name: String,
val sourceStatuses: Seq[SourceStatus],
val sinkStatus: SinkStatus)
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
assert(status.active == true)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))

Expand All @@ -74,7 +73,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
assert(status.active == true)
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))

Expand All @@ -87,8 +85,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
eventually(Timeout(streamingTimeout)) {
val status = listener.terminationStatus
assert(status != null)

assert(status.active === false) // must be inactive by the time onQueryTerm is called
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
}
Expand Down Expand Up @@ -165,9 +161,9 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
// to catch errors in the async listener events
@volatile private var asyncTestWaiter = new Waiter

@volatile var startStatus: QueryStatus = null
@volatile var terminationStatus: QueryStatus = null
val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
@volatile var startStatus: ContinuousQueryInfo = null
@volatile var terminationStatus: ContinuousQueryInfo = null
val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]

def reset(): Unit = {
startStatus = null
Expand All @@ -183,35 +179,23 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with

override def onQueryStarted(queryStarted: QueryStarted): Unit = {
asyncTestWaiter {
startStatus = QueryStatus(queryStarted.query)
startStatus = queryStarted.queryInfo
}
}

override def onQueryProgress(queryProgress: QueryProgress): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryProgress called before onQueryStarted")
progressStatuses.add(QueryStatus(queryProgress.query))
progressStatuses.add(queryProgress.queryInfo)
}
}

override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
asyncTestWaiter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
terminationStatus = QueryStatus(queryTerminated.query)
terminationStatus = queryTerminated.queryInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception is not tested.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for the exception

}
asyncTestWaiter.dismiss()
}
}

/**
 * Plain holder for a query's state as recorded by the test listener.
 *
 * @param active whether the query was active when the snapshot was taken
 * @param exception the query's exception, if any
 * @param sourceStatuses the statuses of the query's sources
 * @param sinkStatus the status of the query's sink
 */
case class QueryStatus(
active: Boolean,
exception: Option[Exception],
sourceStatuses: Array[SourceStatus],
sinkStatus: SinkStatus)

/** Companion providing a factory that builds a [[QueryStatus]] from a [[ContinuousQuery]]. */
object QueryStatus {
/**
 * Copies `isActive`, `exception`, `sourceStatuses` and `sinkStatus` from `query` into a new
 * [[QueryStatus]] at the time of the call.
 */
def apply(query: ContinuousQuery): QueryStatus = {
QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
}
}
}