@@ -5,7 +5,7 @@ package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperh
55
66import (
77 "context"
8- "errors "
8+ "sync "
99 "time"
1010
1111 "go.uber.org/multierr"
@@ -63,12 +63,10 @@ type controller struct {
6363
6464 tickerCh <- chan time.Time
6565
66- initialized bool
67- done chan struct {}
68- terminated chan struct {}
66+ done chan struct {}
67+ wg sync.WaitGroup
6968
70- obsrecv * receiverhelper.ObsReport
71- recvSettings receiver.Settings
69+ obsrecv * receiverhelper.ObsReport
7270}
7371
7472// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
@@ -78,10 +76,6 @@ func NewScraperControllerReceiver(
7876 nextConsumer consumer.Metrics ,
7977 options ... ScraperControllerOption ,
8078) (component.Component , error ) {
81- if cfg .CollectionInterval <= 0 {
82- return nil , errors .New ("collection_interval must be a positive duration" )
83- }
84-
8579 obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
8680 ReceiverID : set .ID ,
8781 Transport : "" ,
@@ -99,9 +93,7 @@ func NewScraperControllerReceiver(
9993 timeout : cfg .Timeout ,
10094 nextConsumer : nextConsumer ,
10195 done : make (chan struct {}),
102- terminated : make (chan struct {}),
10396 obsrecv : obsrecv ,
104- recvSettings : set ,
10597 }
10698
10799 for _ , op := range options {
@@ -110,14 +102,11 @@ func NewScraperControllerReceiver(
110102
111103 sc .obsScrapers = make ([]* obsReport , len (sc .scrapers ))
112104 for i , scraper := range sc .scrapers {
113- scrp , err : = newScraper (obsReportSettings {
105+ sc . obsScrapers [ i ] , err = newScraper (obsReportSettings {
114106 ReceiverID : sc .id ,
115107 Scraper : scraper .ID (),
116- ReceiverCreateSettings : sc . recvSettings ,
108+ ReceiverCreateSettings : set ,
117109 })
118-
119- sc .obsScrapers [i ] = scrp
120-
121110 if err != nil {
122111 return nil , err
123112 }
@@ -134,20 +123,15 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error {
134123 }
135124 }
136125
137- sc .initialized = true
138126 sc .startScraping ()
139127 return nil
140128}
141129
142130// Shutdown the receiver, invoked during service shutdown.
143131func (sc * controller ) Shutdown (ctx context.Context ) error {
144- sc .stopScraping ()
145-
146- // wait until scraping ticker has terminated
147- if sc .initialized {
148- <- sc .terminated
149- }
150-
132+ // Signal the goroutine to stop.
133+ close (sc .done )
134+ sc .wg .Wait ()
151135 var errs error
152136 for _ , scraper := range sc .scrapers {
153137 errs = multierr .Append (errs , scraper .Shutdown (ctx ))
@@ -159,12 +143,13 @@ func (sc *controller) Shutdown(ctx context.Context) error {
159143// startScraping initiates a ticker that calls Scrape based on the configured
160144// collection interval.
161145func (sc * controller ) startScraping () {
146+ sc .wg .Add (1 )
162147 go func () {
148+ defer sc .wg .Done ()
163149 if sc .initialDelay > 0 {
164150 select {
165151 case <- time .After (sc .initialDelay ):
166152 case <- sc .done :
167- sc .terminated <- struct {}{}
168153 return
169154 }
170155 }
@@ -184,7 +169,6 @@ func (sc *controller) startScraping() {
184169 case <- sc .tickerCh :
185170 sc .scrapeMetricsAndReport ()
186171 case <- sc .done :
187- sc .terminated <- struct {}{}
188172 return
189173 }
190174 }
@@ -222,11 +206,6 @@ func (sc *controller) scrapeMetricsAndReport() {
222206 sc .obsrecv .EndMetricsOp (ctx , "" , dataPointCount , err )
223207}
224208
225- // stopScraping stops the ticker
226- func (sc * controller ) stopScraping () {
227- close (sc .done )
228- }
229-
230209// withScrapeContext will return a context that has no deadline if timeout is 0
231210// which implies no explicit timeout had occurred, otherwise, a context
232211// with a deadline of the provided timeout is returned.
0 commit comments