Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ processing operations. These building blocks can be used to transform and manipu
- **Map:** Transforms each element in the stream.
- **FlatMap:** Transforms each element into a stream of slices of zero or more elements.
- **Filter:** Selects elements from the stream based on a condition.
- **Fold:** Combines elements of the stream with the last folded value and emits the new value.
Requires an initial value.
- **Reduce:** Combines elements of the stream with the last reduced value and emits the new value.
Does not require an initial value.
- **PassThrough:** Passes elements through unchanged.
- **Split<sup>1</sup>:** Divides the stream into two streams based on a boolean predicate.
- **FanOut<sup>1</sup>:** Duplicates the stream to multiple outputs for parallel processing.
Expand Down
3 changes: 1 addition & 2 deletions examples/maze/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
file
out.txt
maze
89 changes: 53 additions & 36 deletions examples/maze/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"math/rand"
"strings"
"time"

ext "github.com/reugn/go-streams/extension"
Expand All @@ -10,61 +12,76 @@ import (
type Move int

const (
LEFT Move = iota
RIGHT
left Move = iota
right
)

// Simulate waling through a maze
var MAZE = "------"
var MOVES = []Move{RIGHT, RIGHT, LEFT, RIGHT, LEFT, LEFT, RIGHT, RIGHT, RIGHT, LEFT}
const (
mazeLength = 10
movesNumber = 20
)

var maze = strings.Repeat("-", mazeLength)

// Simulate walking through a maze.
func main() {
source := ext.NewChanSource(moveChan(500*time.Millisecond, movesNumber))
positionFlow := flow.NewFold(mazeLength/2, move)
formatFlow := flow.NewMap(format, 1)
sink := ext.NewStdoutSink()

source.
Via(positionFlow).
Via(formatFlow).
To(sink)
}

// move calculates the next position given the current position and a Move.
// If the move is invalid (out of bounds), the original position is returned.
func move(m Move, pos int) int {
if m == RIGHT {
switch {
case m == left && pos > 0:
return pos - 1
case m == right && pos < len(maze)-1:
return pos + 1
default:
return pos
}

return pos - 1
}

// format marks the position with an X.
func format(pos int) string {
// Mark the position with an X
positionInMaze := MAZE[:pos] + "X" + MAZE[pos+1:]
return positionInMaze
return maze[:pos] + "X" + maze[pos+1:]
}

// Make a move every interval
func moveChan(interval time.Duration) chan any {
// Create a sequence of moves
// moveChan creates a channel that emits n random moves at the specified interval.
func moveChan(interval time.Duration, n int) chan any {
outChan := make(chan any)

go func() {
ticker := time.NewTicker(interval)
// Start at position 0
pos := 0
for _ = range ticker.C {
// Send the next move
outChan <- MOVES[pos]
// Move to the next position
pos = (pos + 1)
if pos >= len(MOVES) {
// Stop
close(outChan)
break
}
defer close(outChan)
// generate a sequence of moves
moves := generateRandomMoves(n)
for i := 0; i < n; i++ {
time.Sleep(interval)
// send the next move
outChan <- moves[i]
}
}()

return outChan
}

func main() {
source := ext.NewChanSource(moveChan(time.Second))
positionFlow := flow.NewFold(0, move)
formatFlow := flow.NewMap(format, 1)
sink := ext.NewStdoutSink()
// generateRandomMoves creates a random sequence of moves with length n.
func generateRandomMoves(n int) []Move {
if n <= 0 {
return []Move{}
}

source.
Via(positionFlow).
Via(formatFlow).
To(sink)
moves := make([]Move, n)
for i := 0; i < n; i++ {
moves[i] = Move(rand.Intn(2)) //nolint:gosec
}

return moves
}
9 changes: 6 additions & 3 deletions flow/fold.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
// FoldFunction represents a Fold transformation function.
type FoldFunction[T, R any] func(T, R) R

// Fold takes one element and produces one element.
// Fold implements a "rolling" fold transformation on a data stream with an
// initial value. Combines the current element with the last folded value and
// emits the new value.
//
// in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
//
Expand All @@ -27,7 +29,8 @@ var _ streams.Flow = (*Fold[any, any])(nil)
// NewFold returns a new Fold operator.
// T specifies the incoming element type, and the outgoing element type is R.
//
// FoldFunction is the Fold transformation function.
// init is the initial value for the folding process.
// foldFunction is the function that performs the fold transformation.
func NewFold[T, R any](init R, foldFunction FoldFunction[T, R]) *Fold[T, R] {
foldFlow := &Fold[T, R]{
init: init,
Expand Down Expand Up @@ -71,7 +74,7 @@ func (m *Fold[T, R]) transmit(inlet streams.Inlet) {
}

func (m *Fold[T, R]) doStream() {
var lastFolded = m.init
lastFolded := m.init
for element := range m.in {
lastFolded = m.foldFunction(element.(T), lastFolded)
m.out <- lastFolded
Expand Down
34 changes: 27 additions & 7 deletions flow/fold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,25 @@ func TestFold(t *testing.T) {
tests := []struct {
name string
foldFlow streams.Flow
ptr bool
}{
{
name: "strings",
name: "values",
foldFlow: flow.NewFold(
"",
func(a int, b string) string {
return b + strconv.Itoa(a)
}),
ptr: false,
},
{
name: "pointers",
foldFlow: flow.NewFold(
"",
func(a *int, b string) string {
return b + strconv.Itoa(*a)
}),
ptr: true,
},
}
input := []int{1, 2, 3, 4, 5}
Expand All @@ -34,13 +45,22 @@ func TestFold(t *testing.T) {
source := ext.NewChanSource(in)
sink := ext.NewChanSink(out)

ingestSlice(input, in)
close(in)
if tt.ptr {
ingestSlice(ptrSlice(input), in)
close(in)

source.
Via(tt.foldFlow).
To(sink)
} else {
ingestSlice(input, in)
close(in)

source.
Via(tt.foldFlow).
Via(flow.NewPassThrough()). // Via coverage
To(sink)
source.
Via(tt.foldFlow).
Via(flow.NewPassThrough()). // Via coverage
To(sink)
}

output := readSlice[string](out)
assert.Equal(t, expected, output)
Expand Down
14 changes: 6 additions & 8 deletions flow/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// ReduceFunction combines the current element with the last reduced value.
type ReduceFunction[T any] func(T, T) T

// Reduce represents a “rolling” reduce on a data stream.
// Reduce implements a “rolling” reduce transformation on a data stream.
// Combines the current element with the last reduced value and emits the new value.
//
// in -- 1 -- 2 ---- 3 -- 4 ------ 5 --
Expand All @@ -19,7 +19,6 @@ type Reduce[T any] struct {
reduceFunction ReduceFunction[T]
in chan any
out chan any
lastReduced any
}

// Verify Reduce satisfies the Flow interface.
Expand Down Expand Up @@ -71,15 +70,14 @@ func (r *Reduce[T]) transmit(inlet streams.Inlet) {
}

func (r *Reduce[T]) doStream() {
var lastReduced any
for element := range r.in {
if r.lastReduced == nil {
r.lastReduced = element
if lastReduced == nil {
lastReduced = element
} else {
r.lastReduced = r.reduceFunction(
r.lastReduced.(T),
element.(T))
lastReduced = r.reduceFunction(lastReduced.(T), element.(T))
}
r.out <- r.lastReduced
r.out <- lastReduced
}
close(r.out)
}
Loading