2 changes: 2 additions & 0 deletions options.go
@@ -50,6 +50,8 @@ const (
defaultBackoffStep = 10 * time.Second
// maximum duration to wait for the backoff
defaultBackoffMax = 120 * time.Second

defaultPPVisitChannelSize = 100
)

// DefaultProcessorStoragePath is the default path where processor state
30 changes: 22 additions & 8 deletions partition_processor.go
@@ -156,7 +156,7 @@ func newPartitionProcessor(partition int32,
joins: make(map[string]*PartitionTable),
input: make(chan *message, opts.partitionChannelSize),
inputTopics: topicList,
visitInput: make(chan *visit, 100),
visitInput: make(chan *visit, defaultPPVisitChannelSize),
visitCallbacks: visitCallbacks,
graph: graph,
stats: newPartitionProcStats(topicList, outputList),
@@ -693,12 +693,25 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}

var wg sync.WaitGroup
// drain the channel and mark all items we have added as done.
// Otherwise the caller will wait forever on the waitgroup
drainVisitInput := func() {

// drains the channel and drops out once it is closed.
// This is used when the processor shuts down during a visit
// and makes sure the waitgroup is fully counted down.
drainUntilClose := func() {
for range pp.visitInput {
wg.Done()
}
}

// drains the input channel until there are no more items.
// It does not wait for close, because the channel stays open for the next visit.
drainUntilEmpty := func() {
for {
select {
case <-pp.visitInput:
case _, ok := <-pp.visitInput:
if !ok {
return
}
wg.Done()
default:
return
@@ -717,11 +730,11 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
wg.Add(1)
select {
case <-stopping:
drainVisitInput()
drainUntilClose()
wg.Done()
return ErrVisitAborted
case <-ctx.Done():
drainVisitInput()
drainUntilEmpty()
wg.Done()
return ctx.Err()
// enqueue the visit
@@ -747,9 +760,10 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}()
select {
case <-stopping:
drainVisitInput()
drainUntilClose()
return ErrVisitAborted
case <-ctx.Done():
drainUntilEmpty()
return ctx.Err()
case <-wgDone:
}
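
For context, here is a minimal, self-contained sketch of the two drain strategies added above. The work channel, item type, and counts are illustrative and not goka's internals; the real code operates on pp.visitInput and the per-call sync.WaitGroup.

package main

import (
	"fmt"
	"sync"
)

func main() {
	work := make(chan int, 4)
	var wg sync.WaitGroup

	// drainUntilClose: range until the channel is closed, marking every
	// drained item as done. This is the shutdown path: the owner of the
	// channel closes it, so the range loop terminates.
	drainUntilClose := func() {
		for range work {
			wg.Done()
		}
	}

	// drainUntilEmpty: read until the channel is momentarily empty, but do
	// not wait for close, because the channel stays open for the next round.
	drainUntilEmpty := func() {
		for {
			select {
			case _, ok := <-work:
				if !ok {
					return
				}
				wg.Done()
			default:
				return
			}
		}
	}

	// enqueue a few items, counting each one
	for i := 0; i < 3; i++ {
		wg.Add(1)
		work <- i
	}

	// caller cancels mid-run: the channel stays open, so only drop what is queued
	drainUntilEmpty()

	// owner shuts down: it closes the channel, so drain until close
	close(work)
	drainUntilClose()

	wg.Wait() // does not block: every Add(1) was matched by a Done()
	fmt.Println("drained cleanly")
}
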
108 changes: 99 additions & 9 deletions systemtest/processor_visit_test.go
@@ -13,6 +13,12 @@ import (
"github.com/stretchr/testify/require"
)

// size of the channel used for the visitor, defined as defaultPPVisitChannelSize in goka/options.go
var (
visitChannelSize = 100
numPartitions = 10
)

// TestProcessorVisit tests the visiting functionality.
func TestProcessorVisit(t *testing.T) {
brokers := initSystemTest(t)
@@ -34,7 +40,7 @@ func TestProcessorVisit(t *testing.T) {
}

createEmitter := func(topic goka.Stream) (*goka.Emitter, func()) {
err = tm.EnsureStreamExists(string(topic), 10)
err = tm.EnsureStreamExists(string(topic), numPartitions)
require.NoError(t, err)

em, err := goka.NewEmitter(brokers, topic, new(codec.Int64),
@@ -90,7 +96,7 @@ func TestProcessorVisit(t *testing.T) {

pollTimed(t, "recovered", proc.Recovered)

em.EmitSync("value1", int64(1))
_ = em.EmitSync("value1", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("value1")
@@ -114,7 +120,7 @@ func TestProcessorVisit(t *testing.T) {

pollTimed(t, "recovered", proc.Recovered)

em.EmitSync("value1", int64(1))
_ = em.EmitSync("value1", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("value1")
@@ -128,6 +134,90 @@ func TestProcessorVisit(t *testing.T) {
require.Error(t, <-done)
})

// Tests the case where a panic occurs during a visit while the iterator is still pushing
// messages into the partition processor's visit-channel.
// Regression test for https://github.com/lovoo/goka/issues/433
t.Run("visit-panic-slow", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
defer finish()
proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))

pollTimed(t, "recovered", proc.Recovered)

// create twice as many items in the table as all visit channels can buffer in total.
// This way we can make sure that the visitor will have to block on
// pushing items into the partition processors' visit channels.
numMsgs := visitChannelSize * numPartitions * 2
for i := 0; i < numMsgs; i++ {
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// wait for all messages to have propagated
pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
return val1 != nil && val1.(int64) == 1
})

// pass a value of the wrong type to the visitor -> it is forwarded to the visit callback -> will panic
require.Error(t, proc.VisitAll(context.Background(), "visitor", "asdf"))

// no need to cancel; VisitAll will kill the processor.
_ = cancel
require.Error(t, <-done)
})
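
To illustrate why the regression test above over-fills the visit channel, here is a minimal standalone sketch (an assumed simplification, not goka code) of the hang behind issue #433: a producer blocked on a full buffered channel never unblocks if the consumer dies and nothing drains the channel.

package main

import (
	"fmt"
	"time"
)

func main() {
	visits := make(chan int, 2) // small buffer, standing in for the visit channel

	// producer: tries to enqueue more items than the buffer holds,
	// like the iterator pushing table entries into visitInput
	sent := make(chan struct{})
	go func() {
		for i := 0; i < 5; i++ {
			visits <- i // blocks once the buffer is full
		}
		close(sent)
	}()

	// the consumer "dies" immediately (standing in for a panicking visit
	// callback) and nobody drains the channel

	select {
	case <-sent:
		fmt.Println("all visits enqueued") // never reached
	case <-time.After(200 * time.Millisecond):
		fmt.Println("producer is stuck: nobody drains the visit channel")
	}

	// the fix in this PR: on shutdown or cancellation the partition
	// processor drains the channel (drainUntilClose / drainUntilEmpty),
	// so the producing side unblocks and the WaitGroup counts down
	for len(visits) > 0 {
		<-visits
	}
}
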

// Verifies that a visit is gracefully shut down when the processor is canceled while
// the visit is running.
t.Run("visit-shutdown-slow", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
defer finish()
proc, cancel, done := runProc(createProc(group, input, 1*time.Second))

pollTimed(t, "recovered", proc.Recovered)

// create twice as many items in the table as all visit channels can buffer in total.
// This way we can make sure that the visitor will have to block on
// pushing items into the partition processors' visit channels.
numMsgs := visitChannelSize * numPartitions * 2
for i := 0; i < numMsgs; i++ {
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// wait for all messages to have propagated
pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
return val1 != nil && val1.(int64) == 1
})

visitCtx, visitCancel := context.WithCancel(context.Background())
defer visitCancel()

var (
visitErr error
visitDone = make(chan struct{})
)

// start the visitor
go func() {
defer close(visitDone)
visitErr = proc.VisitAll(visitCtx, "visitor", int64(25))
}()

// wait for half the time the processor takes to process a message, so we can stop it in the middle
time.Sleep(500 * time.Millisecond)
// stop the visit
visitCancel()

// wait for visiting done
<-visitDone
require.ErrorContains(t, visitErr, "canceled")

cancel()
require.NoError(t, <-done)
})

t.Run("visit-shutdown", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
@@ -138,8 +228,8 @@ func TestProcessorVisit(t *testing.T) {

// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
em.EmitSync("0", int64(1))
em.EmitSync("02", int64(1))
_ = em.EmitSync("0", int64(1))
_ = em.EmitSync("02", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("02")
@@ -196,7 +286,7 @@ func TestProcessorVisit(t *testing.T) {
defer emFinish()
// create the group table manually, otherwise the proc and the view are racing

tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
_ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
// scenario: sleep in visit, processor shuts down--> visit should cancel too
proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))
view, viewCancel, viewDone := runView(createView(group))
@@ -207,7 +297,7 @@ func TestProcessorVisit(t *testing.T) {
// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
for i := 0; i < 100; i++ {
em.Emit(fmt.Sprintf("value-%d", i), int64(1))
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}
// emFinish()

@@ -251,7 +341,7 @@ func TestProcessorVisit(t *testing.T) {
em, finish := createEmitter(input)
defer finish()
// create the group table manually, otherwise the proc and the view are racing
tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
_ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
// scenario: sleep in visit, processor shuts down--> visit should cancel too
proc1, cancel1, done1 := runProc(createProc(group, input, 500*time.Millisecond))

@@ -260,7 +350,7 @@ func TestProcessorVisit(t *testing.T) {
// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
for i := 0; i < 100; i++ {
em.Emit(fmt.Sprintf("value-%d", i), int64(1))
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// poll until all values are there