Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ jobs:
- name: Install packages
run: go mod tidy
- name: Run Test
run: make test
run: make test
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ test:
test-coverage: test
go tool cover -html=coverage.out

test-e2e:
go test ./test/e2e -tags=integration -count=1

generate-proto: ## regenerate protos
@echo " > cloning protobuf from odpf/proton"
@echo " > generating protobuf"
Expand Down
6 changes: 6 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *str
return err
}, defaultBatchSize)

stream.onClose(func() {
if err = sink.Close(); err != nil {
r.logger.Warn("error closing sink", "sink", sr.Name, "error", err)
}
})

return
}

Expand Down
51 changes: 30 additions & 21 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@ package agent_test
import (
"context"
"errors"
"github.com/odpf/meteor/test"
"testing"
"time"

"github.com/odpf/meteor/agent"
"github.com/odpf/meteor/models"
"github.com/odpf/meteor/models/odpf/assets"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/recipe"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/test/mocks"
"github.com/odpf/meteor/test/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
"time"
)

var mockCtx = mock.AnythingOfType("*context.emptyCtx")
Expand Down Expand Up @@ -43,7 +42,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.IsType(t, agent.Run{}, run)
Expand All @@ -67,7 +66,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: registry.NewExtractorFactory(),
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -92,7 +91,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: registry.NewProcessorFactory(),
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -119,7 +118,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: registry.NewSinkFactory(),
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -150,7 +149,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -183,7 +182,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -218,7 +217,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -243,6 +242,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -253,7 +253,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand All @@ -277,6 +277,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -287,7 +288,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -318,6 +319,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -328,7 +330,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -358,6 +360,7 @@ func TestRunnerRun(t *testing.T) {

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -368,7 +371,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.Error(t, run.Error)
Expand Down Expand Up @@ -400,6 +403,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(errors.New("some error"))
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -410,7 +414,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -442,6 +446,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(errors.New("some error"))
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -452,7 +457,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
StopOnSinkError: true,
})
run := r.Run(validRecipe)
Expand Down Expand Up @@ -485,6 +490,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -495,7 +501,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -528,6 +534,7 @@ func TestRunnerRun(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -544,7 +551,7 @@ func TestRunnerRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Monitor: monitor,
Logger: test.Logger,
Logger: utils.Logger,
})
run := r.Run(validRecipe)
assert.NoError(t, run.Error)
Expand Down Expand Up @@ -579,6 +586,7 @@ func TestRunnerRun(t *testing.T) {
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -589,7 +597,7 @@ func TestRunnerRun(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
Expand Down Expand Up @@ -628,6 +636,7 @@ func TestRunnerRunMultiple(t *testing.T) {
sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil)
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
Expand All @@ -638,7 +647,7 @@ func TestRunnerRunMultiple(t *testing.T) {
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: test.Logger,
Logger: utils.Logger,
})
runs := r.RunMultiple(recipeList)

Expand Down
14 changes: 13 additions & 1 deletion agent/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type subscriber struct {
type stream struct {
middlewares []streamMiddleware
subscribers []*subscriber
onCloses []func()
closed bool
err error
}
Expand All @@ -38,6 +39,13 @@ func (s *stream) subscribe(callback func(batchedData []models.Record) error, bat
return s
}

// onClose() is used to register callback for after stream is closed.
func (s *stream) onClose(callback func()) *stream {
s.onCloses = append(s.onCloses, callback)

return s
}

// broadcast() will start listening to emitter for any pushed data.
// This process is blocking, so most times you would want to call this inside a goroutine.
func (s *stream) broadcast() error {
Expand All @@ -55,7 +63,7 @@ func (s *stream) broadcast() error {
batch := newBatch(l.batchSize)
// listen to channel and emit data to subscriber callback if batch is full
for d := range l.channel {
if err := batch.add(d); err != nil {
if err := batch.add(d); err != nil {
s.closeWithError(err)
}
if batch.isFull() {
Expand Down Expand Up @@ -116,6 +124,10 @@ func (s *stream) Close() {
close(l.channel)
}
s.closed = true

for _, onClose := range s.onCloses {
onClose()
}
}

func (s *stream) runMiddlewares(d models.Record) (res models.Record, err error) {
Expand Down
6 changes: 3 additions & 3 deletions plugins/extractors/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package bigquery_test

import (
"context"
"github.com/odpf/meteor/test/utils"
"testing"

"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/extractors/bigquery"
"github.com/odpf/meteor/test"
"github.com/stretchr/testify/assert"
)

func TestInit(t *testing.T) {
t.Run("should return error if config is invalid", func(t *testing.T) {
extr := bigquery.New(test.Logger)
extr := bigquery.New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, map[string]interface{}{
Expand All @@ -24,7 +24,7 @@ func TestInit(t *testing.T) {
assert.Equal(t, plugins.InvalidConfigError{}, err)
})
t.Run("should not return invalid config error if config is valid", func(t *testing.T) {
extr := bigquery.New(test.Logger)
extr := bigquery.New(utils.Logger)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, map[string]interface{}{
Expand Down
Loading