Skip to content

Commit 30c6a0d

Browse files
committed
issue #369 auto-commit test
1 parent 0f272ce commit 30c6a0d

1 file changed

Lines changed: 156 additions & 0 deletions

File tree

systemtest/commit_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package systemtest
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/Shopify/sarama"
10+
"github.com/lovoo/goka"
11+
"github.com/lovoo/goka/codec"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
// TestAutoCommit tests/demonstrates the behavior of disabling the auto-commit functionality.
16+
// The autocommiter sends the offsets of the marked messages to the broker regularily. If the processor shuts down
17+
// (or the group rebalances), the offsets are sent one last time, so just turning it of is not enough.
18+
// To get a processor to not commit any offsets, we're using a fault-injecting proxy
19+
// and cut the connections before shutdown, so the last-commit is failing.
20+
func TestAutoCommit(t *testing.T) {
21+
t.Parallel()
22+
var (
23+
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-commit-test", time.Now().Unix()))
24+
inputStream = goka.Stream(group) + "-input"
25+
brokers = initSystemTest(t)
26+
)
27+
28+
// we'll use the proxy for cutting the connection before the final commit.
29+
fi := NewFIProxy()
30+
31+
cfg := goka.DefaultConfig()
32+
// make sure we consume everything
33+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
34+
// disable auto-commit
35+
cfg.Consumer.Offsets.AutoCommit.Enable = false
36+
37+
// use the fault-injecting proxy
38+
cfg.Net.Proxy.Enable = true
39+
cfg.Net.Proxy.Dialer = fi
40+
41+
goka.ReplaceGlobalConfig(cfg)
42+
43+
defer func() {
44+
goka.ReplaceGlobalConfig(goka.DefaultConfig())
45+
}()
46+
47+
var offsets []int64
48+
49+
em, err := goka.NewEmitter(brokers, inputStream, new(codec.Int64))
50+
require.NoError(t, err)
51+
for i := 0; i < 10; i++ {
52+
require.NoError(t, em.EmitSync("key", int64(i)))
53+
}
54+
55+
require.NoError(t, em.Finish())
56+
57+
createProc := func() *goka.Processor {
58+
proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
59+
goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
60+
val := msg.(int64)
61+
62+
// append offset
63+
offsets = append(offsets, val)
64+
}),
65+
))
66+
67+
require.NoError(t, err)
68+
return proc
69+
}
70+
71+
// run the first processor
72+
_, cancel, done := runProc(createProc())
73+
pollTimed(t, "all-received1", 10, func() bool {
74+
return len(offsets) == 10 && offsets[0] == 0
75+
})
76+
77+
// make all connections fail
78+
fi.SetWriteError(errors.New("cutting connecting"))
79+
80+
// cancel processor
81+
cancel()
82+
<-done
83+
84+
// reset errors, reset offsets and restart processor
85+
fi.ResetErrors()
86+
offsets = nil
87+
_, cancel, done = runProc(createProc())
88+
89+
// --> we'll receive all messages again
90+
// --> i.e., no offsets were committed
91+
pollTimed(t, "all-received2", 10, func() bool {
92+
return len(offsets) == 10 && offsets[0] == 0
93+
})
94+
95+
cancel()
96+
<-done
97+
}
98+
99+
// Test a failing processor does not mark the message.
100+
// Two messages (1, 2) are emitted, after consuming (2), it crashes.
101+
// Starting it a second time will reconsume it.
102+
func TestUnmarkedMessages(t *testing.T) {
103+
t.Parallel()
104+
var (
105+
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-mark-test", time.Now().Unix()))
106+
inputStream = goka.Stream(group) + "-input"
107+
brokers = initSystemTest(t)
108+
)
109+
110+
// make sure we consume everything
111+
cfg := goka.DefaultConfig()
112+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
113+
goka.ReplaceGlobalConfig(cfg)
114+
115+
var values []int64
116+
117+
// emit exactly one message
118+
em, err := goka.NewEmitter(brokers, inputStream, new(codec.Int64))
119+
require.NoError(t, err)
120+
require.NoError(t, em.EmitSync("key", int64(1)))
121+
require.NoError(t, em.EmitSync("key", int64(2)))
122+
require.NoError(t, em.Finish())
123+
124+
createProc := func() *goka.Processor {
125+
proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
126+
goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
127+
val := msg.(int64)
128+
values = append(values, val)
129+
130+
// the only way to not commit a message is to fail the processor.
131+
// We'll fail after the second message
132+
if val == 2 {
133+
ctx.Fail(errors.New("test"))
134+
}
135+
}),
136+
))
137+
138+
require.NoError(t, err)
139+
return proc
140+
}
141+
142+
// run the first processor
143+
runProc(createProc())
144+
pollTimed(t, "all-received1", 10, func() bool {
145+
return len(values) == 2 && values[0] == 1
146+
})
147+
148+
// reset values
149+
values = nil
150+
151+
// restart -> we'll only receive the second message
152+
runProc(createProc())
153+
pollTimed(t, "all-received2", 10, func() bool {
154+
return len(values) == 1 && values[0] == 2
155+
})
156+
}

0 commit comments

Comments
 (0)