58 changes: 48 additions & 10 deletions README.md
@@ -379,34 +379,71 @@
fmt.Printf("Important tasks: %d\n", metrics.Get("important-tasks"))

## Flow Control

The package provides several methods for flow control and completion:

```go
// Submit adds items to the pool. Not safe for concurrent use.
// Used by the producer (sender) of data.
p.Submit(item)

// Send safely adds items to the pool from multiple goroutines.
// Used when submitting from a worker to another pool, or when multiple goroutines send data.
p.Send(item)

// Close tells workers no more data will be submitted.
// Used by the producer (sender) of data.
p.Close(ctx)

// Wait blocks until all processing is done.
// Used by the consumer (receiver) of results.
p.Wait(ctx)
```

Common usage patterns:

```go
// 1. Single producer submitting items
go func() {
    defer p.Close(ctx) // signal no more data
    for _, task := range tasks {
        p.Submit(task) // Submit is safe here - single goroutine
    }
}()

// 2. Workers submitting to the next stage
p1 := pool.New[int](5, pool.WorkerFunc[int](func(ctx context.Context, v int) error {
    result := process(v)
    p2.Send(result) // Send is safe for concurrent calls from workers
    return nil
}))

// 3. Consumer waiting for completion
if err := p.Wait(ctx); err != nil {
    // handle error
}
```

Pool completion callback allows executing code when all workers are done:
```go
p := pool.New[string](5, worker).
    WithPoolCompleteFn(func(ctx context.Context) error {
        // called once after all workers complete
        log.Println("all workers finished")
        return nil
    })
```

The completion callback executes once all workers have completed processing, including when errors occurred but the pool continued (`WithContinueOnError()`). It does not execute on context cancellation.

Important notes:
- Use `Submit` when sending items from a single goroutine
- Use `Send` when workers need to submit items to another pool
- Pool completion callback helps coordinate multi-stage processing
- Errors in the completion callback are included in the pool's error result

## Optional parameters

Configure pool behavior using With methods:
@@ -425,7 +462,8 @@
Available options:
- `WithWorkerChanSize(size int)` - sets buffer size for worker channels (default: 1)
- `WithChunkFn(fn func(T) string)` - controls work distribution by key (default: none, random distribution)
- `WithContinueOnError()` - continues processing on errors (default: false)
- `WithWorkerCompleteFn(fn func(ctx, id, worker))` - called on worker completion (default: none)
- `WithPoolCompleteFn(fn func(ctx))` - called on pool completion, i.e., when all workers have completed (default: none)

### Alternative pool implementations

@@ -1,4 +1,4 @@
# Pool Chain Processing (with collectors) - Example

This example demonstrates how to chain multiple worker pools using [go-pkgz/pool](https://github.com/go-pkgz/pool) package to create a concurrent processing pipeline. It shows how to transform data through multiple processing stages while maintaining type safety and proper coordination between pools.

9 changes: 9 additions & 0 deletions exmples/collectors_chain/go.mod
@@ -0,0 +1,9 @@
module examples/collectors_chain

go 1.24

require github.com/go-pkgz/pool v0.5.0

require golang.org/x/sync v0.11.0 // indirect

replace github.com/go-pkgz/pool => ../..
12 changes: 12 additions & 0 deletions exmples/collectors_chain/go.sum
@@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-pkgz/pool v0.5.0 h1:fP0WpEGMAcFEBQ7l7aAZsh7RBkzx34FVgufJoVvDTYY=
github.com/go-pkgz/pool v0.5.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
14 changes: 9 additions & 5 deletions exmples/chain/main.go → exmples/collectors_chain/main.go
@@ -46,8 +46,8 @@
type counterPool struct {
func newCounterPool(ctx context.Context, workers int) *counterPool {
collector := pool.NewCollector[countData](ctx, workers) // collector to gather results, buffer size == workers
p := pool.New[stringData](workers, pool.WorkerFunc[stringData](func(_ context.Context, n stringData) error {
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond) // simulate heavy work
count := strings.Count(inputStrings[n.idx], "a") // use global var for logging only
if count > 2 {
// demonstrates filtering: only strings with >2 'a's passed to the next stage
collector.Submit(countData{idx: n.idx, count: count, ts: n.ts})
@@ -66,7 +66,7 @@
type multiplierPool struct {
func newMultiplierPool(ctx context.Context, workers int) *multiplierPool {
collector := pool.NewCollector[multipliedData](ctx, workers)
p := pool.New[countData](workers, pool.WorkerFunc[countData](func(_ context.Context, n countData) error {
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
multiplied := n.count * 10 // transform data: multiply by 10
fmt.Printf("multiplied: %d -> %d (src: %q, processing time: %v)\n",
n.count, multiplied, inputStrings[n.idx], time.Since(n.ts))
@@ -87,7 +87,7 @@
func newSquarePool(ctx context.Context, workers int) *squarePool {
squared := n.value * n.value
fmt.Printf("squared: %d -> %d (src: %q, processing time: %v)\n",
n.value, squared, inputStrings[n.idx], time.Since(n.ts))
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
collector.Submit(finalData{idx: n.idx, result: squared})
return nil
}))
@@ -114,7 +114,7 @@
func ProcessStrings(ctx context.Context, strings []string) ([]finalData, error)
for i := range strings {
fmt.Printf("submitting: %q\n", strings[i])
counter.WorkerGroup.Submit(stringData{idx: i, ts: time.Now()})
time.Sleep(time.Duration(rand.Intn(3)) * time.Millisecond)
}
// close pool and collector when all inputs are submitted
counter.WorkerGroup.Close(ctx)
@@ -172,6 +172,9 @@
func main() {
"abracadabra",
"bandanna",
"barbarian",
"antarctica",
"arctic",
"baccarat",
}

res, err := ProcessStrings(context.Background(), inputStrings)
@@ -182,4 +185,5 @@
for _, v := range res {
fmt.Printf("src: %q, squared a-count: %d\n", inputStrings[v.idx], v.result)
}
fmt.Printf("\nTotal: %d", len(res))
}
104 changes: 104 additions & 0 deletions exmples/direct_chain/README.md
@@ -0,0 +1,104 @@
# Pool Chain Processing (direct) - Example

This example demonstrates how to chain multiple worker pools using [go-pkgz/pool](https://github.com/go-pkgz/pool) package to create a concurrent processing pipeline. Pools directly submit data to the next stage, with a collector only at the final stage to gather results.

## Key Concepts

1. Pool Chaining:
- Pools directly reference and send to the next pool
- Single collector at the end of chain
- Each stage processes independently
- Type-safe data transformation between stages

2. Data Flow:
- Input strings -> count 'a's -> multiply by 10 -> square
- Each stage has its own worker pool
- Final collector gathers results
- Processing time tracked at each stage

## Implementation Details

The example shows three key patterns:

1. Pool Declaration and Cross-References:
```go
var pCounter *pool.WorkerGroup[stringData]
var pMulti *pool.WorkerGroup[countData]
var pSquares *pool.WorkerGroup[multipliedData]
collector := pool.NewCollector[finalData](ctx, 10)
```

2. Direct Pool Submission:
```go
pCounter = pool.New[stringData](2, pool.WorkerFunc[stringData](
    func(_ context.Context, d stringData) error {
        count := strings.Count(d.data, "a")
        if count > 2 {
            pMulti.Send(countData{...}) // direct submission to the next pool; thread-safe version of Submit
        }
        return nil
    }))
```

3. Pipeline Coordination:
```go
go func() {
    pCounter.Wait(ctx)  // wait for first pool
    pMulti.Close(ctx)   // close second pool
    pSquares.Close(ctx) // close final pool
    collector.Close()   // close collector
}()
```

## Data Flow Types

```go
stringData {          countData {          multipliedData {      finalData {
    idx  int              idx   int            idx   int             idx    int
    data string           count int            value int             result int
    ts   time.Time        ts    time.Time      ts    time.Time
}                     }                    }                     }
```

## Features

- Batch processing (size=3) in each pool
- Filtering capabilities (count > 2)
- Processing time tracking
- Independent worker counts per stage
- Built-in metrics collection
- Simulated processing delays

## Example Output

```
submitting: "alabama"
counted 'a' in "alabama" -> 4, duration: 123ms
multiplied: 4 -> 40 (src: "alabama", processing time: 234ms)
squared: 40 -> 1600 (src: "alabama", processing time: 345ms)

metrics:
counter: processed:11, errors:0, workers:2
multiplier: processed:6, errors:0, workers:4
squares: processed:6, errors:0, workers:4
```

## Usage

```go
res, err := ProcessStrings(context.Background(), []string{
"alabama", "california", "canada", "australia",
})
```

## Important Notes

- Pools must be declared before creation to allow cross-references
- Each stage can filter data (skip items)
- Send can be done directly from workers
- Close() propagates through the chain
- Single collector simplifies result gathering
- Batch size optimizes throughput
- Processing time tracked through pipeline

This simplified version demonstrates essential patterns for building concurrent processing pipelines while maintaining clean and efficient code structure.
2 changes: 1 addition & 1 deletion exmples/chain/go.mod → exmples/direct_chain/go.mod
@@ -1,4 +1,4 @@
module examples/direct_chain

go 1.24.0

12 changes: 12 additions & 0 deletions exmples/direct_chain/go.sum
@@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-pkgz/pool v0.5.0 h1:fP0WpEGMAcFEBQ7l7aAZsh7RBkzx34FVgufJoVvDTYY=
github.com/go-pkgz/pool v0.5.0/go.mod h1:e1qn5EYmXshPcOk2buL2ZC20w7RTAWUgbug+L2SyH7I=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=