@@ -2,10 +2,10 @@ package agent
22
33import (
44 "context"
5- "fmt"
65 "sync"
76 "time"
87
8+ "github.com/odpf/meteor/models"
99 "github.com/odpf/meteor/plugins"
1010 "github.com/odpf/meteor/recipe"
1111 "github.com/odpf/meteor/registry"
@@ -94,67 +94,52 @@ func (r *Agent) RunMultiple(recipes []recipe.Recipe) []Run {
9494
9595// Run executes the specified recipe and returns the resulting Run; run.Error is set when any step fails.
9696func (r * Agent ) Run (recipe recipe.Recipe ) (run Run ) {
97- r .logger .Info ("running recipe" , "recipe" , recipe .Name )
98- var wg sync.WaitGroup
97+ run .Recipe = recipe
98+ r .logger .Info ("running recipe" , "recipe" , run .Recipe .Name )
99+
99100 var (
101+ ctx = context .Background ()
100102 getDuration = r .startDuration ()
101- channel = make ( chan interface {} )
103+ stream = newStream ( )
102104 )
103- run .Recipe = recipe
104105
105- ctx , cancel := context .WithCancel (context .Background ())
106- defer cancel ()
106+ runExtractor , err := r .setupExtractor (ctx , recipe .Source , stream )
107+ if err != nil {
108+ run .Error = err
109+ return
110+ }
107111
108- // run extractors
109- extrChannel := channel
110- go func () {
111- if err := r .runExtractor (ctx , recipe .Source , extrChannel ); err != nil {
112- run .Error = r .buildTaskError (TaskTypeExtract , recipe .Source .Type , err )
112+ for _ , pr := range recipe .Processors {
113+ if err := r .setupProcessor (ctx , pr , stream ); err != nil {
114+ run .Error = err
115+ return
113116 }
114-
115- close (extrChannel )
116- }()
117-
118- // run processors
119- for _ , processorRecipe := range recipe .Processors {
120- inChannel := channel
121- outChannel := make (chan interface {})
122-
123- // need to store the recipe since it would be needed inside a goroutine
124- // not storing it inside the loop scope would cause
125- // processorRecipe to always be the last recipe in the loop
126- tempRecipe := processorRecipe
127- go func () {
128- if err := r .runProcessor (ctx , tempRecipe , inChannel , outChannel ); err != nil {
129- run .Error = r .buildTaskError (TaskTypeProcess , tempRecipe .Name , err )
130- }
131-
132- close (outChannel )
133- }()
134-
135- // replace the channel with the new out channel
136- // this would allow the next processor or sink to
137- // receive the processed data instead of data directly from extractor
138- channel = outChannel
139117 }
140118
141- // run sinks
142- for _ , sinkRecipe := range recipe .Sinks {
143- // need to store the recipe since it would be needed inside a goroutine
144- // not storing it inside the loop scope would cause
145- // sinkRecipe to always be the last recipe in the loop
146- tempRecipe := sinkRecipe
147- wg .Add (1 )
148- go func () {
149- if err := r .runSink (ctx , tempRecipe , channel ); err != nil {
150- run .Error = r .buildTaskError (TaskTypeSink , tempRecipe .Name , err )
151- }
152- wg .Done ()
153- }()
119+ for _ , sr := range recipe .Sinks {
120+ if err := r .setupSink (ctx , sr , stream ); err != nil {
121+ run .Error = err
122+ return
123+ }
154124 }
155125
156- wg .Wait ()
126+ // create a goroutine to let extractor concurrently emit data
127+ // while the stream is being driven by stream.broadcast() below.
// NOTE(review): this goroutine assigns the outer err and run.Error, and the
// calling goroutine may also write run.Error after broadcast returns; whether
// these accesses are ordered depends on stream.Close/broadcast, which are not
// visible here — confirm there is no data race.
128+ go func () {
129+ err = runExtractor ()
130+ if err != nil {
131+ run .Error = err
132+ }
133+ stream .Close ()
134+ }()
135+
136+ // start the stream via stream.broadcast().
137+ // this call is blocking (presumably until stream.Close() is called — TODO confirm)
138+ if err := stream .broadcast (); err != nil {
139+ run .Error = err
140+ }
157141
142+ // code reaches here once stream.broadcast() has returned.
158143 success := run .Error == nil
159144 durationInMs := getDuration ()
160145 r .monitor .RecordRun (recipe , durationInMs , success )
@@ -168,52 +153,79 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
168153 return
169154}
170155
171- // runExtractor runs an extractor.
172- func (r * Agent ) runExtractor (ctx context.Context , sourceRecipe recipe.SourceRecipe , in chan <- interface {}) (err error ) {
173- extractor , err := r .extractorFactory .Get (sourceRecipe .Type )
// setupExtractor looks up the extractor registered under sr.Type and
// initialises it with sr.Config. On success it returns runFn, which runs
// extractor.Extract with str.push as the emit callback. Lookup, Init and
// Extract errors are wrapped with the extractor type before being returned.
// NOTE(review): errors.Wrapf is not part of the standard library errors
// package; presumably github.com/pkg/errors is imported outside the visible
// import hunk — confirm.
156+ func (r * Agent ) setupExtractor (ctx context.Context , sr recipe.SourceRecipe , str * stream ) (runFn func () error , err error ) {
157+ extractor , err := r .extractorFactory .Get (sr .Type )
174158 if err != nil {
159+ err = errors .Wrapf (err , "could not find extractor \" %s\" " , sr .Type )
175160 return
176161 }
177- if err = extractor .Extract (ctx , sourceRecipe .Config , in ); err != nil {
162+ err = extractor .Init (ctx , sr .Config )
163+ if err != nil {
164+ err = errors .Wrapf (err , "could not initiate extractor \" %s\" " , sr .Type )
178165 return
179166 }
180167
// extraction itself is deferred: the caller decides when to invoke runFn.
168+ runFn = func () (err error ) {
169+ err = extractor .Extract (ctx , str .push )
170+ if err != nil {
171+ err = errors .Wrapf (err , "error running extractor \" %s\" " , sr .Type )
172+ }
173+
174+ return
175+ }
181176 return
182177}
183178
184- // runProcessor runs a processor.
185- func ( r * Agent ) runProcessor ( ctx context. Context , processorRecipe recipe. ProcessorRecipe , in <- chan interface {}, out chan <- interface {}) ( err error ) {
186- processor , err : = r .processorFactory .Get (processorRecipe .Name )
// setupProcessor looks up the processor registered under pr.Name,
// initialises it with pr.Config, and registers a middleware on str that
// passes the record it receives through proc.Process. Lookup, Init and
// Process errors are wrapped with the processor name.
179+ func ( r * Agent ) setupProcessor ( ctx context. Context , pr recipe. ProcessorRecipe , str * stream ) ( err error ) {
180+ var proc plugins. Processor
181+ proc , err = r .processorFactory .Get (pr .Name )
187182 if err != nil {
183+ err = errors .Wrapf (err , "could not find processor \" %s\" " , pr .Name )
188184 return
189185 }
190- if err = processor .Process (ctx , processorRecipe .Config , in , out ); err != nil {
186+ err = proc .Init (ctx , pr .Config )
187+ if err != nil {
188+ err = errors .Wrapf (err , "could not initiate processor \" %s\" " , pr .Name )
191189 return
192190 }
193191
// the middleware is only registered here; when it is invoked is decided by the stream.
192+ str .setMiddleware (func (src models.Record ) (dst models.Record , err error ) {
193+ dst , err = proc .Process (ctx , src )
194+ if err != nil {
195+ err = errors .Wrapf (err , "error running processor \" %s\" " , pr .Name )
196+ return
197+ }
198+
199+ return
200+ })
201+
194202 return
195203}
196204
197- // runSink runs a sink.
198- func ( r * Agent ) runSink ( ctx context. Context , sinkRecipe recipe. SinkRecipe , in <- chan interface {}) ( err error ) {
199- sink , err : = r .sinkFactory .Get (sinkRecipe .Name )
// setupSink looks up the sink registered under sr.Name, initialises it with
// sr.Config, and subscribes a callback on the stream that hands the records
// it receives to sink.Sink. Lookup, Init and Sink errors are wrapped with
// the sink name.
// Note: the parameter is named stream, so it shadows the stream type inside
// this function.
205+ func ( r * Agent ) setupSink ( ctx context. Context , sr recipe. SinkRecipe , stream * stream ) ( err error ) {
206+ var sink plugins. Syncer
207+ sink , err = r .sinkFactory .Get (sr .Name )
200208 if err != nil {
209+ err = errors .Wrapf (err , "could not find sink \" %s\" " , sr .Name )
201210 return
202211 }
203- if err = sink .Sink (ctx , sinkRecipe .Config , in ); err != nil {
212+ err = sink .Init (ctx , sr .Config )
213+ if err != nil {
214+ err = errors .Wrapf (err , "could not initiate sink \" %s\" " , sr .Name )
204215 return
205216 }
206217
207- return
208- }
218+ stream .subscribe (func (records []models.Record ) (err error ) {
219+ err = sink .Sink (ctx , records )
220+ if err != nil {
221+ err = errors .Wrapf (err , "error running sink \" %s\" " , sr .Name )
222+ return
223+ }
209224
210- // buildTaskError builds a task error.
211- func (r * Agent ) buildTaskError (taskType TaskType , name string , err error ) error {
212- return fmt .Errorf (
213- "error running %s task \" %s\" : %s" ,
214- taskType ,
215- name ,
216- err )
225+ return
226+ }, 0 ) // NOTE(review): meaning of the second argument (0) is defined by stream.subscribe, not visible here — presumably a batch size; confirm
227+
228+ return
217229}
218230
219231// startDuration starts a timer.
0 commit comments