6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -372,7 +372,11 @@ object MimaExcludes {

// [SPARK-26616][MLlib] Expose document frequency in IDFModel
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"),

// [SPARK-28199][SS] Remove deprecated ProcessingTime
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
)

// Exclude rules for 2.4.x
@@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import scala.concurrent.duration.Duration;

import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
@@ -40,7 +41,7 @@ public class Trigger {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long intervalMs) {
return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
}

/**
@@ -56,7 +57,7 @@ public static Trigger ProcessingTime(long intervalMs) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
return ProcessingTime.create(interval, timeUnit);
return ProcessingTimeTrigger.create(interval, timeUnit);
}

/**
@@ -71,7 +72,7 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(Duration interval) {
return ProcessingTime.apply(interval);
return ProcessingTimeTrigger.apply(interval);
}

/**
@@ -84,7 +85,7 @@ public static Trigger ProcessingTime(Duration interval) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(String interval) {
return ProcessingTime.apply(interval);
return ProcessingTimeTrigger.apply(interval);
}
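Since the public Trigger.ProcessingTime factories keep their signatures and now only delegate to the internal ProcessingTimeTrigger, existing caller code stays source-compatible. A minimal sketch of such caller code follows; the rate source, console sink, and object name are placeholder choices for illustration, not part of this change.

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("trigger-sketch").getOrCreate()
    // A toy streaming query; only the .trigger(...) call matters here.
    val counts = spark.readStream.format("rate").load().groupBy("value").count()
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))  // still the supported entry point
      .start()
    query.awaitTermination(30000)
    spark.stop()
  }
}

The string and Duration overloads (for example Trigger.ProcessingTime("10 seconds")) keep working the same way, since they route through the same factories shown in this diff.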

@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class MicroBatchExecution(
@@ -51,7 +51,7 @@
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, SystemClock}

trait TriggerExecutor {
@@ -43,7 +42,9 @@ case class OneTimeExecutor() extends TriggerExecutor {
/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
case class ProcessingTimeExecutor(
processingTime: ProcessingTimeTrigger,
Member

Please rename the variable as well:

  • processingTime -> processingTimeTrigger.
  • private val intervalMs = processingTime.intervalMs -> private val intervalMs = processingTimeTrigger.intervalMs

clock: Clock = new SystemClock())
extends TriggerExecutor with Logging {

private val intervalMs = processingTime.intervalMs
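A sketch of what the reviewer's rename suggestion would look like against the new parameter type; the names follow the comment above, and the rest of the executor body is omitted and unchanged.

case class ProcessingTimeExecutor(
    processingTimeTrigger: ProcessingTimeTrigger,
    clock: Clock = new SystemClock())
  extends TriggerExecutor with Logging {

  private val intervalMs = processingTimeTrigger.intervalMs

  // ... execute() and the rest of the class stay as they are ...
}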
@@ -17,8 +17,13 @@

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.spark.annotation.{Evolving, Experimental}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval

/**
* A [[Trigger]] that processes only one batch of data in a streaming query then terminates
@@ -27,3 +32,34 @@ import org.apache.spark.sql.streaming.Trigger
@Experimental
@Evolving
Member

Shall we remove the annotations? It's private, but the annotations say it's an API.

Contributor Author

Oh right. These classes are no longer intended to be exposed, so we should remove the annotations. Thanks for catching it!

Contributor Author

@HeartSaVioR Jun 10, 2020

Well... in reality that was done in #25200. Let's make sure we check the latest code (not the code diff) while doing a post-hoc review after a long delay.

Member

Ah, sure. Thanks :D.

case object OneTimeTrigger extends Trigger

/**
* A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*/
@Evolving
private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
Member

@dongjoon-hyun Jul 5, 2019

Could you put this into another file like ContinuousTrigger.scala and the previous ProcessingTime.scala?
If possible, please do git mv for renaming ProcessingTime to ProcessingTimeTrigger. Then, update the new file.

Contributor Author

I moved this per @srowen's suggestion (#24996 (comment)), since OneTimeTrigger is there without its own file.

I'm still not sure, but if the intention of the deprecation is to hide implementations from end users, I'd actually also like to move ContinuousTrigger to Triggers.scala, so they can be controlled together.

If we change our minds and go with one file per implementation, Triggers.scala would be better renamed to OneTimeTrigger.scala too.

Member

Ya. Moving ContinuousTrigger to Triggers.scala is also a possible way to be consistent.

Contributor Author

We might need to keep the original class, as we haven't deprecated it yet; and to let end users create Trigger implementations only via Trigger.xxx, we may also want to deprecate some more classes as well.

The change may look like the commit below:
HeartSaVioR@f8488cf

IMHO this could be considered a separate issue, since more deprecations would happen. WDYT?

Member

@dongjoon-hyun Jul 6, 2019

Ur, moving looks okay, but the new deprecation of OneTimeTrigger there is unexpected to me.

@deprecated("use Trigger.Once()", "3.0.0")
// NOTE: In later release, we can change this to `private[sql]` and remove deprecated.
case object OneTimeTrigger extends Trigger

Please make another PR for the deprecation of OneTimeTrigger if we really need that.

If the PR has multiple themes unexpectedly, we cannot merge it.

Member

My suggestion is we follow the comment above by moving OneTimeTrigger too, and leave it there. That's more consistent, and consistent with the intent. This leaves the class potentially 'public' in the bytecode, but it was before, and that's true of lots of private[spark] classes anyway. While I wouldn't argue with further moving things to sql/execution, I think that just putting the implementations in their natural home right now sounds coherent and an improvement, and doesn't expand the change much.

Contributor Author

@HeartSaVioR Jul 9, 2019

Sorry, I guess I mixed things up. My bad: not moving OneTimeTrigger but moving ContinuousTrigger. Let me enumerate the necessary changes as I understand them:

  • OneTimeTrigger is already in Triggers.scala but it's public, so we need to add a deprecation and plan to hide it.
    • I guess adding private[sql] would work since Triggers.scala is already in the sql.execution.streaming package. I'm not sure whether the whole sql.execution package will be hidden.
    • Otherwise we can add a replacement the same way we did for ProcessingTime, but that requires a bigger changeset.
  • Move ContinuousTrigger to Triggers.scala with private[sql] scope, and deprecate the original.

I guess both moving and deprecating make the changeset look verbose, but even in a major release we may not want to remove public classes which haven't been deprecated.

I guess my commit (HeartSaVioR@f8488cf) mentioned above already covers this, so please take a look at the commit. If we're OK to go, or would like to continue reviewing against that commit, I'll add it to the PR.

Member

Oops, I misspoke; I meant "move the implementations to Triggers.scala".
I'm personally OK with not even deprecating, just moving, as it's a major release and a small detail, but I'm OK with deprecation too.

Contributor Author

@HeartSaVioR Jul 9, 2019

Yeah, either is fine for me too. If we want the simpler option, skipping deprecation would work. If we want the safer one (this is possibly a user-facing API), deprecation would work. I'd like to leave the decision to committers/PMC members, as it seems related to project policy.

Member

To move this forward, I suggest we just move the class and skip deprecation. A note in the Spark 3.0 migration guide about streaming would be good, as we're removing a deprecated class anyway.

require(intervalMs >= 0, "the interval of trigger should not be negative")
}

private[sql] object ProcessingTimeTrigger {
def apply(interval: String): ProcessingTimeTrigger = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
new ProcessingTimeTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
}

def apply(interval: Duration): ProcessingTimeTrigger = {
ProcessingTimeTrigger(interval.toMillis)
}

def create(interval: String): ProcessingTimeTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
ProcessingTimeTrigger(unit.toMillis(interval))
}
}
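For reference, a quick sketch of how these factories normalize intervals to milliseconds; the values follow from the CalendarInterval parsing and TimeUnit conversion above. The object name is hypothetical, and it sits in the same package only so the private[sql] class is visible.

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

// Throwaway sketch, not part of this change.
object ProcessingTimeTriggerSketch extends App {
  // Each overload ends up as a plain millisecond interval.
  assert(ProcessingTimeTrigger("10 seconds").intervalMs == 10000L)
  assert(ProcessingTimeTrigger(10.seconds).intervalMs == 10000L)
  assert(ProcessingTimeTrigger.create(10, TimeUnit.SECONDS).intervalMs == 10000L)
  assert(ProcessingTimeTrigger(0).intervalMs == 0L)  // 0 means run batches as fast as possible

  // Month/year intervals are rejected because their length is not a fixed number of millis:
  // ProcessingTimeTrigger("1 month") throws IllegalArgumentException.
}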
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class ContinuousExecution(
@@ -93,7 +93,7 @@ class ContinuousExecution(
}

private val triggerExecutor = trigger match {
case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock)
case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
}
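The match above builds the executor from ContinuousTrigger's single interval field. For reference, the follow-up the reviewers discuss (moving ContinuousTrigger into Triggers.scala with private[sql] scope) might look roughly like the sketch below; this is only the shape of that proposal, mirroring ProcessingTimeTrigger, and is not part of this change.

// Sketch of the discussed follow-up; not part of this PR.
private[sql] case class ContinuousTrigger(intervalMs: Long) extends Trigger {
  require(intervalMs >= 0, "the interval of trigger should not be negative")
}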


This file was deleted (the deprecated org.apache.spark.sql.streaming.ProcessingTime).

@@ -22,12 +22,15 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
import org.apache.spark.sql.streaming.Trigger

class ProcessingTimeSuite extends SparkFunSuite {

test("create") {
def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
def getIntervalMs(trigger: Trigger): Long = {
trigger.asInstanceOf[ProcessingTimeTrigger].intervalMs
}

assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
@@ -24,7 +24,6 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.util.StreamManualClock

class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
@@ -35,7 +34,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
val timeout = 10.seconds

test("nextBatchTime") {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(100))
assert(processingTimeExecutor.nextBatchTime(0) === 100)
assert(processingTimeExecutor.nextBatchTime(1) === 100)
assert(processingTimeExecutor.nextBatchTime(99) === 100)
@@ -49,7 +48,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
val clock = new StreamManualClock()
@volatile var continueExecuting = true
@volatile var clockIncrementInTrigger = 0L
val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock)
val executor = ProcessingTimeExecutor(ProcessingTimeTrigger("1000 milliseconds"), clock)
val executorThread = new Thread() {
override def run(): Unit = {
executor.execute(() => {
@@ -97,7 +96,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {

test("calling nextBatchTime with the result of a previous call should return the next interval") {
val intervalMS = 100
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMS))

val ITERATION = 10
var nextBatchTime: Long = 0
Expand All @@ -111,7 +110,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {

private def testBatchTermination(intervalMs: Long): Unit = {
var batchCounts = 0
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMs))
processingTimeExecutor.execute(() => {
batchCounts += 1
// If the batch termination works correctly, batchCounts should be 3 after `execute`
@@ -130,7 +129,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
@volatile var batchFallingBehindCalled = false
val t = new Thread() {
override def run(): Unit = {
val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) {
val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTimeTrigger(100), clock) {
override def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
batchFallingBehindCalled = true
}