diff --git a/flow/sliding_window.go b/flow/sliding_window.go index aed2bec..59ebe03 100644 --- a/flow/sliding_window.go +++ b/flow/sliding_window.go @@ -8,10 +8,26 @@ import ( "github.com/reugn/go-streams" ) -// timedElement stores an incoming element along with its timestamp. +// SlidingWindowOpts represents SlidingWindow configuration options. +type SlidingWindowOpts[T any] struct { + // EventTimeExtractor is a function that extracts the event time from an element. + // Event time is the time at which the event occurred on its producing device. + // Using event time enables correct windowing even when events arrive out of order + // or with delays. + // + // If EventTimeExtractor is not specified, processing time is used. Processing time + // refers to the system time of the machine executing the window operation. + EventTimeExtractor func(T) time.Time + // EmitPartialWindow determines whether to emit window elements before the first + // full window duration has elapsed. If false, the first window will only be + // emitted after the full window duration. + EmitPartialWindow bool +} + +// timedElement stores an incoming element along with its event time. type timedElement[T any] struct { element T - timestamp int64 + eventTime time.Time } // SlidingWindow assigns elements to windows of fixed length configured by the window @@ -21,14 +37,16 @@ type timedElement[T any] struct { // In this case elements are assigned to multiple windows. // T indicates the incoming element type, and the outgoing element type is []T. type SlidingWindow[T any] struct { - mu sync.Mutex - windowSize time.Duration - slidingInterval time.Duration - queue []timedElement[T] - in chan any - out chan any - done chan struct{} - timestampExtractor func(T) int64 + mu sync.Mutex + windowSize time.Duration + slidingInterval time.Duration + queue []timedElement[T] + + in chan any + out chan any + done chan struct{} + + opts SlidingWindowOpts[T] } // Verify SlidingWindow satisfies the Flow interface. @@ -43,40 +61,36 @@ var _ streams.Flow = (*SlidingWindow[any])(nil) // slidingInterval is the sliding interval of generated windows. // // NewSlidingWindow panics if slidingInterval is larger than windowSize. -func NewSlidingWindow[T any]( - windowSize time.Duration, - slidingInterval time.Duration) *SlidingWindow[T] { - return NewSlidingWindowWithExtractor[T](windowSize, slidingInterval, nil) +func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *SlidingWindow[T] { + return NewSlidingWindowWithOpts[T](windowSize, slidingInterval, SlidingWindowOpts[T]{}) } -// NewSlidingWindowWithExtractor returns a new SlidingWindow operator based on event time. -// Event time is the time that each individual event occurred on its producing device. -// Gives correct results on out-of-order events, late events, or on replays of data. +// NewSlidingWindowWithOpts returns a new SlidingWindow operator configured with the +// provided configuration options. // T specifies the incoming element type, and the outgoing element type is []T. // // windowSize is the Duration of generated windows. // slidingInterval is the sliding interval of generated windows. -// timestampExtractor is the record timestamp (in nanoseconds) extractor. +// opts are the sliding window configuration options. // -// NewSlidingWindowWithExtractor panics if slidingInterval is larger than windowSize. -func NewSlidingWindowWithExtractor[T any]( - windowSize time.Duration, - slidingInterval time.Duration, - timestampExtractor func(T) int64) *SlidingWindow[T] { +// NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize. +func NewSlidingWindowWithOpts[T any]( + windowSize, slidingInterval time.Duration, opts SlidingWindowOpts[T]) *SlidingWindow[T] { if windowSize < slidingInterval { panic("sliding interval is larger than window size") } slidingWindow := &SlidingWindow[T]{ - windowSize: windowSize, - slidingInterval: slidingInterval, - queue: make([]timedElement[T], 0), - in: make(chan any), - out: make(chan any), - done: make(chan struct{}), - timestampExtractor: timestampExtractor, + windowSize: windowSize, + slidingInterval: slidingInterval, + in: make(chan any), + out: make(chan any), + done: make(chan struct{}), + opts: opts, } + + // start buffering incoming stream elements go slidingWindow.receive() return slidingWindow @@ -115,23 +129,25 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) { close(inlet.In()) } -// timestamp extracts the timestamp from an element if the timestampExtractor is set. -// Otherwise, the system time is returned. -func (sw *SlidingWindow[T]) timestamp(element T) int64 { - if sw.timestampExtractor == nil { - return time.Now().UnixNano() +// eventTime extracts the time from an element if the EventTimeExtractor is set. +// Otherwise, the processing time is returned. +func (sw *SlidingWindow[T]) eventTime(element T) time.Time { + if sw.opts.EventTimeExtractor == nil { + return time.Now() } - return sw.timestampExtractor(element) + return sw.opts.EventTimeExtractor(element) } // receive buffers the incoming elements by pushing them into the queue, -// wrapping the original item into a timedElement along with its timestamp. +// wrapping the original item into a timedElement along with its event time. func (sw *SlidingWindow[T]) receive() { for element := range sw.in { + eventTime := sw.eventTime(element.(T)) + sw.mu.Lock() timed := timedElement[T]{ element: element.(T), - timestamp: sw.timestamp(element.(T)), + eventTime: eventTime, } sw.queue = append(sw.queue, timed) sw.mu.Unlock() @@ -141,8 +157,16 @@ func (sw *SlidingWindow[T]) receive() { // emit captures and emits a new window every sw.slidingInterval. func (sw *SlidingWindow[T]) emit() { - // wait for the sliding window to start - time.Sleep(sw.windowSize - sw.slidingInterval) + if !sw.opts.EmitPartialWindow { + timer := time.NewTimer(sw.windowSize - sw.slidingInterval) + select { + case <-timer.C: + case <-sw.done: + timer.Stop() + close(sw.out) + return + } + } lastTick := time.Now() ticker := time.NewTicker(sw.slidingInterval) @@ -165,29 +189,13 @@ func (sw *SlidingWindow[T]) emit() { // window to the output channel and moving the window to the next position. func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { sw.mu.Lock() - - // sort elements in the queue by their timestamp + // sort elements in the queue by their time sort.Slice(sw.queue, func(i, j int) bool { - return sw.queue[i].timestamp < sw.queue[j].timestamp + return sw.queue[i].eventTime.Before(sw.queue[j].eventTime) }) - // calculate the next window start time - nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano() - // initialize the next window queue - var nextWindowQueue []timedElement[T] - for i, element := range sw.queue { - if element.timestamp > nextWindowStartTime { - nextWindowQueue = make([]timedElement[T], len(sw.queue)-i) - _ = copy(nextWindowQueue, sw.queue[i:]) - break - } - } - // extract current window elements - windowElements := extractWindowElements(sw.queue, tick.UnixNano()) - // move the window - sw.queue = nextWindowQueue - + windowElements := sw.extractWindowElements(tick) sw.mu.Unlock() // send elements downstream if the current window is not empty @@ -196,16 +204,31 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) { } } -// extractWindowElements extracts current window elements from the given slice -// of timedElement. Elements newer than now will not be included. -func extractWindowElements[T any](timed []timedElement[T], now int64) []T { - elements := make([]T, 0, len(timed)) - for _, timedElement := range timed { - if timedElement.timestamp < now { - elements = append(elements, timedElement.element) - } else { - break // we can break since the input is an ordered slice +// extractWindowElements extracts and returns elements from the sliding window that +// fall within the current window. Elements newer than tick will not be included. +// The sliding window queue is updated to remove previous interval elements. +func (sw *SlidingWindow[T]) extractWindowElements(tick time.Time) []T { + // calculate the next window start time + nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval) + + elements := make([]T, 0, len(sw.queue)) + var remainingElements []timedElement[T] + for i, element := range sw.queue { + if remainingElements == nil && element.eventTime.After(nextWindowStartTime) { + // copy remaining elements + remainingElements = make([]timedElement[T], len(sw.queue)-i) + _ = copy(remainingElements, sw.queue[i:]) + } + switch { + case element.eventTime.Before(tick): + elements = append(elements, element.element) + default: + break // we can break since the queue is ordered } } + + // move the window + sw.queue = remainingElements + return elements } diff --git a/flow/sliding_window_test.go b/flow/sliding_window_test.go index ce34596..974d1ef 100644 --- a/flow/sliding_window_test.go +++ b/flow/sliding_window_test.go @@ -11,8 +11,8 @@ import ( ) func TestSlidingWindow(t *testing.T) { - in := make(chan any) - out := make(chan any) + in := make(chan any, 7) + out := make(chan any, 7) source := ext.NewChanSource(in) slidingWindow := flow.NewSlidingWindow[string](50*time.Millisecond, 20*time.Millisecond) @@ -27,11 +27,9 @@ func TestSlidingWindow(t *testing.T) { closeDeferred(in, 250*time.Millisecond) }() - go func() { - source. - Via(slidingWindow). - To(sink) - }() + source. + Via(slidingWindow). + To(sink) outputValues := readSlice[[]string](sink.Out) fmt.Println(outputValues) @@ -52,41 +50,42 @@ type element struct { } func TestSlidingWindow_WithExtractor(t *testing.T) { - in := make(chan any) - out := make(chan any) + in := make(chan any, 10) + out := make(chan any, 7) source := ext.NewChanSource(in) - slidingWindow := flow.NewSlidingWindowWithExtractor( + slidingWindow := flow.NewSlidingWindowWithOpts( 50*time.Millisecond, 20*time.Millisecond, - func(e element) int64 { - return e.ts + flow.SlidingWindowOpts[element]{ + EventTimeExtractor: func(e element) time.Time { + return time.UnixMilli(e.ts) + }, }) sink := ext.NewChanSink(out) now := time.Now() inputValues := []element{ - {"c", now.Add(29 * time.Millisecond).UnixNano()}, - {"a", now.Add(2 * time.Millisecond).UnixNano()}, - {"b", now.Add(17 * time.Millisecond).UnixNano()}, - {"d", now.Add(35 * time.Millisecond).UnixNano()}, - {"f", now.Add(93 * time.Millisecond).UnixNano()}, - {"e", now.Add(77 * time.Millisecond).UnixNano()}, - {"g", now.Add(120 * time.Millisecond).UnixNano()}, + {"c", now.Add(29 * time.Millisecond).UnixMilli()}, + {"a", now.Add(2 * time.Millisecond).UnixMilli()}, + {"b", now.Add(17 * time.Millisecond).UnixMilli()}, + {"d", now.Add(35 * time.Millisecond).UnixMilli()}, + {"f", now.Add(93 * time.Millisecond).UnixMilli()}, + {"e", now.Add(77 * time.Millisecond).UnixMilli()}, + {"g", now.Add(119 * time.Millisecond).UnixMilli()}, } go ingestSlice(inputValues, in) go closeDeferred(in, 250*time.Millisecond) + // send some out-of-order events - go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixNano()}, + go ingestDeferred(element{"h", now.Add(5 * time.Millisecond).UnixMilli()}, in, 145*time.Millisecond) - go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixNano()}, + go ingestDeferred(element{"i", now.Add(3 * time.Millisecond).UnixMilli()}, in, 145*time.Millisecond) - go func() { - source. - Via(slidingWindow). - To(sink) - }() + source. + Via(slidingWindow). + To(sink) var outputValues [][]string for e := range sink.Out { @@ -113,42 +112,44 @@ func elementValues(elements []element) []string { } func TestSlidingWindow_WithExtractorPtr(t *testing.T) { - in := make(chan any) - out := make(chan any) + in := make(chan any, 10) + out := make(chan any, 10) source := ext.NewChanSource(in) - slidingWindow := flow.NewSlidingWindowWithExtractor( + slidingWindow := flow.NewSlidingWindowWithOpts( 50*time.Millisecond, 20*time.Millisecond, - func(e *element) int64 { - return e.ts + flow.SlidingWindowOpts[*element]{ + EventTimeExtractor: func(e *element) time.Time { + return time.UnixMilli(e.ts) + }, + EmitPartialWindow: true, }) sink := ext.NewChanSink(out) now := time.Now() inputValues := []*element{ - {"c", now.Add(29 * time.Millisecond).UnixNano()}, - {"a", now.Add(2 * time.Millisecond).UnixNano()}, - {"b", now.Add(17 * time.Millisecond).UnixNano()}, - {"d", now.Add(35 * time.Millisecond).UnixNano()}, - {"f", now.Add(93 * time.Millisecond).UnixNano()}, - {"e", now.Add(77 * time.Millisecond).UnixNano()}, - {"g", now.Add(120 * time.Millisecond).UnixNano()}, + {"c", now.Add(29 * time.Millisecond).UnixMilli()}, + {"a", now.Add(2 * time.Millisecond).UnixMilli()}, + {"b", now.Add(17 * time.Millisecond).UnixMilli()}, + {"d", now.Add(35 * time.Millisecond).UnixMilli()}, + {"f", now.Add(93 * time.Millisecond).UnixMilli()}, + {"e", now.Add(77 * time.Millisecond).UnixMilli()}, + {"g", now.Add(119 * time.Millisecond).UnixMilli()}, } go ingestSlice(inputValues, in) go closeDeferred(in, 250*time.Millisecond) + // send some out-of-order events - go ingestDeferred(&element{"h", now.Add(5 * time.Millisecond).UnixNano()}, + go ingestDeferred(&element{"h", now.Add(5 * time.Millisecond).UnixMilli()}, in, 145*time.Millisecond) - go ingestDeferred(&element{"i", now.Add(3 * time.Millisecond).UnixNano()}, + go ingestDeferred(&element{"i", now.Add(3 * time.Millisecond).UnixMilli()}, in, 145*time.Millisecond) - go func() { - source. - Via(slidingWindow). - Via(flow.NewPassThrough()). // Via coverage - To(sink) - }() + source. + Via(slidingWindow). + Via(flow.NewPassThrough()). // Via coverage + To(sink) var outputValues [][]string for e := range sink.Out { @@ -156,14 +157,16 @@ func TestSlidingWindow_WithExtractorPtr(t *testing.T) { } fmt.Println(outputValues) - assert.Equal(t, 6, len(outputValues)) // [[a b c d] [c d] [e] [e f] [f g] [i h g]] + assert.Equal(t, 8, len(outputValues)) // [[a b] [a b c d] [b c d] [d e] [e f] [e f g] [f g] [i h g]] - assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[0]) - assert.Equal(t, []string{"c", "d"}, outputValues[1]) - assert.Equal(t, []string{"e"}, outputValues[2]) - assert.Equal(t, []string{"e", "f"}, outputValues[3]) - assert.Equal(t, []string{"f", "g"}, outputValues[4]) - assert.Equal(t, []string{"i", "h", "g"}, outputValues[5]) + assert.Equal(t, []string{"a", "b"}, outputValues[0]) + assert.Equal(t, []string{"a", "b", "c", "d"}, outputValues[1]) + assert.Equal(t, []string{"b", "c", "d"}, outputValues[2]) + assert.Equal(t, []string{"d", "e"}, outputValues[3]) + assert.Equal(t, []string{"e", "f"}, outputValues[4]) + assert.Equal(t, []string{"e", "f", "g"}, outputValues[5]) + assert.Equal(t, []string{"f", "g"}, outputValues[6]) + assert.Equal(t, []string{"i", "h", "g"}, outputValues[7]) } func elementValuesPtr(elements []*element) []string { @@ -179,3 +182,29 @@ func TestSlidingWindow_InvalidArguments(t *testing.T) { flow.NewSlidingWindow[string](10*time.Millisecond, 20*time.Millisecond) }) } + +func TestSlidingWindow_EarlyStreamClosure(t *testing.T) { + in := make(chan any, 3) + out := make(chan any, 3) + + source := ext.NewChanSource(in) + slidingWindow := flow.NewSlidingWindow[element](50*time.Millisecond, 20*time.Millisecond) + sink := ext.NewChanSink(out) + + now := time.Now() + inputValues := []element{ + {"a", now.Add(2 * time.Millisecond).UnixMilli()}, + {"b", now.Add(17 * time.Millisecond).UnixMilli()}, + } + go ingestSlice(inputValues, in) + go closeDeferred(in, 10*time.Millisecond) + + source. + Via(slidingWindow). + To(sink) + + outputValues := readSlice[[]any](sink.Out) + fmt.Println(outputValues) + + assert.Equal(t, 0, len(outputValues)) +}