Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 91 additions & 68 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,26 @@ import (
"github.com/reugn/go-streams"
)

// timedElement stores an incoming element along with its timestamp.
// SlidingWindowOpts holds the optional settings accepted by a SlidingWindow.
// The zero value selects processing time and disables partial windows.
type SlidingWindowOpts[T any] struct {
	// EventTimeExtractor returns the event time of an element, that is, the
	// moment the event happened on the device that produced it. Windowing on
	// event time stays correct when elements are delayed or arrive out of
	// order.
	//
	// When EventTimeExtractor is nil, processing time is used instead: the
	// system clock of the machine that runs the window operation.
	EventTimeExtractor func(T) time.Time

	// EmitPartialWindow controls whether window elements may be emitted
	// before the first full window duration has passed. When false, the
	// first window is emitted only after the full window duration.
	EmitPartialWindow bool
}

// timedElement stores an incoming element along with its event time.
type timedElement[T any] struct {
element T
timestamp int64
eventTime time.Time
}

// SlidingWindow assigns elements to windows of fixed length configured by the window
Expand All @@ -21,14 +37,16 @@ type timedElement[T any] struct {
// In this case elements are assigned to multiple windows.
// T indicates the incoming element type, and the outgoing element type is []T.
type SlidingWindow[T any] struct {
mu sync.Mutex
windowSize time.Duration
slidingInterval time.Duration
queue []timedElement[T]
in chan any
out chan any
done chan struct{}
timestampExtractor func(T) int64
mu sync.Mutex
windowSize time.Duration
slidingInterval time.Duration
queue []timedElement[T]

in chan any
out chan any
done chan struct{}

opts SlidingWindowOpts[T]
}

// Verify SlidingWindow satisfies the Flow interface.
Expand All @@ -43,40 +61,36 @@ var _ streams.Flow = (*SlidingWindow[any])(nil)
// slidingInterval is the sliding interval of generated windows.
//
// NewSlidingWindow panics if slidingInterval is larger than windowSize.
func NewSlidingWindow[T any](
windowSize time.Duration,
slidingInterval time.Duration) *SlidingWindow[T] {
return NewSlidingWindowWithExtractor[T](windowSize, slidingInterval, nil)
func NewSlidingWindow[T any](windowSize, slidingInterval time.Duration) *SlidingWindow[T] {
return NewSlidingWindowWithOpts[T](windowSize, slidingInterval, SlidingWindowOpts[T]{})
}

// NewSlidingWindowWithExtractor returns a new SlidingWindow operator based on event time.
// Event time is the time that each individual event occurred on its producing device.
// Gives correct results on out-of-order events, late events, or on replays of data.
// NewSlidingWindowWithOpts returns a new SlidingWindow operator configured with the
// provided configuration options.
// T specifies the incoming element type, and the outgoing element type is []T.
//
// windowSize is the Duration of generated windows.
// slidingInterval is the sliding interval of generated windows.
// timestampExtractor is the record timestamp (in nanoseconds) extractor.
// opts are the sliding window configuration options.
//
// NewSlidingWindowWithExtractor panics if slidingInterval is larger than windowSize.
func NewSlidingWindowWithExtractor[T any](
windowSize time.Duration,
slidingInterval time.Duration,
timestampExtractor func(T) int64) *SlidingWindow[T] {
// NewSlidingWindowWithOpts panics if slidingInterval is larger than windowSize.
func NewSlidingWindowWithOpts[T any](
windowSize, slidingInterval time.Duration, opts SlidingWindowOpts[T]) *SlidingWindow[T] {

if windowSize < slidingInterval {
panic("sliding interval is larger than window size")
}

slidingWindow := &SlidingWindow[T]{
windowSize: windowSize,
slidingInterval: slidingInterval,
queue: make([]timedElement[T], 0),
in: make(chan any),
out: make(chan any),
done: make(chan struct{}),
timestampExtractor: timestampExtractor,
windowSize: windowSize,
slidingInterval: slidingInterval,
in: make(chan any),
out: make(chan any),
done: make(chan struct{}),
opts: opts,
}

// start buffering incoming stream elements
go slidingWindow.receive()

return slidingWindow
Expand Down Expand Up @@ -115,23 +129,25 @@ func (sw *SlidingWindow[T]) transmit(inlet streams.Inlet) {
close(inlet.In())
}

// timestamp extracts the timestamp from an element if the timestampExtractor is set.
// Otherwise, the system time is returned.
func (sw *SlidingWindow[T]) timestamp(element T) int64 {
if sw.timestampExtractor == nil {
return time.Now().UnixNano()
// eventTime extracts the time from an element if the EventTimeExtractor is set.
// Otherwise, the processing time is returned.
func (sw *SlidingWindow[T]) eventTime(element T) time.Time {
if sw.opts.EventTimeExtractor == nil {
return time.Now()
}
return sw.timestampExtractor(element)
return sw.opts.EventTimeExtractor(element)
}

// receive buffers the incoming elements by pushing them into the queue,
// wrapping the original item into a timedElement along with its timestamp.
// wrapping the original item into a timedElement along with its event time.
func (sw *SlidingWindow[T]) receive() {
for element := range sw.in {
eventTime := sw.eventTime(element.(T))

sw.mu.Lock()
timed := timedElement[T]{
element: element.(T),
timestamp: sw.timestamp(element.(T)),
eventTime: eventTime,
}
sw.queue = append(sw.queue, timed)
sw.mu.Unlock()
Expand All @@ -141,8 +157,16 @@ func (sw *SlidingWindow[T]) receive() {

// emit captures and emits a new window every sw.slidingInterval.
func (sw *SlidingWindow[T]) emit() {
// wait for the sliding window to start
time.Sleep(sw.windowSize - sw.slidingInterval)
if !sw.opts.EmitPartialWindow {
timer := time.NewTimer(sw.windowSize - sw.slidingInterval)
select {
case <-timer.C:
case <-sw.done:
timer.Stop()
close(sw.out)
return
}
}

lastTick := time.Now()
ticker := time.NewTicker(sw.slidingInterval)
Expand All @@ -165,29 +189,13 @@ func (sw *SlidingWindow[T]) emit() {
// window to the output channel and moving the window to the next position.
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.mu.Lock()

// sort elements in the queue by their timestamp
// sort elements in the queue by their time
sort.Slice(sw.queue, func(i, j int) bool {
return sw.queue[i].timestamp < sw.queue[j].timestamp
return sw.queue[i].eventTime.Before(sw.queue[j].eventTime)
})

// calculate the next window start time
nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval).UnixNano()
// initialize the next window queue
var nextWindowQueue []timedElement[T]
for i, element := range sw.queue {
if element.timestamp > nextWindowStartTime {
nextWindowQueue = make([]timedElement[T], len(sw.queue)-i)
_ = copy(nextWindowQueue, sw.queue[i:])
break
}
}

// extract current window elements
windowElements := extractWindowElements(sw.queue, tick.UnixNano())
// move the window
sw.queue = nextWindowQueue

windowElements := sw.extractWindowElements(tick)
sw.mu.Unlock()

// send elements downstream if the current window is not empty
Expand All @@ -196,16 +204,31 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
}
}

// extractWindowElements extracts current window elements from the given slice
// of timedElement. Elements newer than now will not be included.
func extractWindowElements[T any](timed []timedElement[T], now int64) []T {
elements := make([]T, 0, len(timed))
for _, timedElement := range timed {
if timedElement.timestamp < now {
elements = append(elements, timedElement.element)
} else {
break // we can break since the input is an ordered slice
// extractWindowElements returns the elements that belong to the window ending
// at tick and trims the queue so that it begins at the next window start.
//
// Windows are treated as half-open intervals [start, end): an element whose
// event time equals tick is left for the following window, and an element
// whose event time equals the next window start time is retained in the queue
// instead of being discarded.
//
// The queue must be sorted by event time in ascending order. dispatchWindow
// sorts it and holds sw.mu before calling this method.
func (sw *SlidingWindow[T]) extractWindowElements(tick time.Time) []T {
	// The next window starts one sliding interval after the current window start.
	nextWindowStartTime := tick.Add(-sw.windowSize).Add(sw.slidingInterval)

	elements := make([]T, 0, len(sw.queue))
	var remainingElements []timedElement[T]
	for i, element := range sw.queue {
		// The first element at or after the next window start marks the tail
		// that must survive this dispatch; copy it into a fresh slice.
		if remainingElements == nil && !element.eventTime.Before(nextWindowStartTime) {
			remainingElements = make([]timedElement[T], len(sw.queue)-i)
			copy(remainingElements, sw.queue[i:])
		}
		if !element.eventTime.Before(tick) {
			// The queue is ordered, so no later element can belong to this
			// window. Because the constructor rejects windowSize smaller than
			// slidingInterval, nextWindowStartTime <= tick and the tail has
			// already been captured above, which makes stopping here safe.
			break
		}
		elements = append(elements, element.element)
	}

	// Move the window.
	sw.queue = remainingElements

	return elements
}
Loading
Loading