@@ -9,25 +9,26 @@ import (
99 "bytes"
1010 "context"
1111 "crypto/tls"
12- "fmt"
1312 "io"
1413 "net/http"
14+ "sync/atomic"
1515 "time"
1616
1717 "github.com/cenkalti/backoff/v4"
18+ "github.com/elastic/go-docappender"
1819 elasticsearch7 "github.com/elastic/go-elasticsearch/v7"
19- esutil7 "github.com/elastic/go-elasticsearch/v7/esutil"
2020 "go.uber.org/zap"
21+ "golang.org/x/sync/errgroup"
2122
2223 "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
2324)
2425
2526type esClientCurrent = elasticsearch7.Client
2627type esConfigCurrent = elasticsearch7.Config
27- type esBulkIndexerCurrent = esutil7.BulkIndexer
2828
29- type esBulkIndexerItem = esutil7.BulkIndexerItem
30- type esBulkIndexerResponseItem = esutil7.BulkIndexerResponseItem
29+ type esBulkIndexerCurrent = BulkIndexerPool
30+
31+ type esBulkIndexerItem = docappender.BulkIndexerItem
3132
3233// clientLogger implements the estransport.Logger interface
3334// that is required by the Elasticsearch client for logging.
@@ -136,22 +137,6 @@ func newTransport(config *Config, tlsCfg *tls.Config) *http.Transport {
136137 return transport
137138}
138139
139- func newBulkIndexer (logger * zap.Logger , client * elasticsearch7.Client , config * Config ) (esBulkIndexerCurrent , error ) {
140- // TODO: add debug logger
141- return esutil7 .NewBulkIndexer (esutil7.BulkIndexerConfig {
142- NumWorkers : config .NumWorkers ,
143- FlushBytes : config .Flush .Bytes ,
144- FlushInterval : config .Flush .Interval ,
145- Client : client ,
146- Pipeline : config .Pipeline ,
147- Timeout : config .Timeout ,
148-
149- OnError : func (_ context.Context , err error ) {
150- logger .Error (fmt .Sprintf ("Bulk indexer error: %v" , err ))
151- },
152- })
153- }
154-
155140func createElasticsearchBackoffFunc (config * RetrySettings ) func (int ) time.Duration {
156141 if ! config .Enabled {
157142 return nil
@@ -175,52 +160,138 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati
175160 }
176161}
177162
178- func shouldRetryEvent (status int ) bool {
179- for _ , retryable := range retryOnStatus {
180- if status == retryable {
181- return true
163+ func pushDocuments (ctx context.Context , index string , document []byte , bulkIndexer * esBulkIndexerCurrent ) error {
164+ return bulkIndexer .Add (ctx , index , bytes .NewReader (document ))
165+ }
166+
167+ func newBulkIndexer (logger * zap.Logger , client * elasticsearch7.Client , config * Config ) (* esBulkIndexerCurrent , error ) {
168+ numWorkers := config .NumWorkers
169+ if numWorkers == 0 {
170+ numWorkers = 1
171+ }
172+
173+ flushInterval := config .Flush .Interval
174+ if flushInterval == 0 {
175+ flushInterval = 30 * time .Second
176+ }
177+
178+ flushBytes := config .Flush .Bytes
179+ if flushBytes == 0 {
180+ flushBytes = 5e+6
181+ }
182+
183+ var maxDocRetry int
184+ if config .Retry .Enabled {
185+ // max_requests includes initial attempt
186+ // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32344
187+ maxDocRetry = config .Retry .MaxRequests - 1
188+ }
189+ group := & errgroup.Group {}
190+ items := make (chan esBulkIndexerItem , config .NumWorkers )
191+ stats := bulkIndexerStats {}
192+
193+ for i := 0 ; i < numWorkers ; i ++ {
194+ w := worker {
195+ indexer : docappender .NewBulkIndexer (client , 0 , maxDocRetry ),
196+ items : items ,
197+ flushInterval : flushInterval ,
198+ flushTimeout : config .Timeout ,
199+ flushBytes : flushBytes ,
200+ logger : logger ,
201+ stats : & stats ,
182202 }
203+ group .Go (w .run )
183204 }
184- return false
205+ return & BulkIndexerPool {
206+ items : items ,
207+ errgroup : group ,
208+ stats : & stats ,
209+ }, nil
210+ }
211+
212+ type bulkIndexerStats struct {
213+ docsIndexed atomic.Int64
214+ }
215+
216+ type BulkIndexerPool struct {
217+ items chan esBulkIndexerItem
218+ errgroup * errgroup.Group
219+ stats * bulkIndexerStats
185220}
186221
187- func pushDocuments (ctx context.Context , logger * zap.Logger , index string , document []byte , bulkIndexer esBulkIndexerCurrent , maxAttempts int ) error {
188- attempts := 1
189- body := bytes .NewReader (document )
190- item := esBulkIndexerItem {Action : createAction , Index : index , Body : body }
191- // Setup error handler. The handler handles the per item response status based on the
192- // selective ACKing in the bulk response.
193- item .OnFailure = func (ctx context.Context , item esBulkIndexerItem , resp esBulkIndexerResponseItem , err error ) {
194- switch {
195- case attempts < maxAttempts && shouldRetryEvent (resp .Status ):
196- logger .Debug ("Retrying to index" ,
197- zap .String ("name" , index ),
198- zap .Int ("attempt" , attempts ),
199- zap .Int ("status" , resp .Status ),
200- zap .NamedError ("reason" , err ))
201-
202- attempts ++
203- _ , _ = body .Seek (0 , io .SeekStart )
204- _ = bulkIndexer .Add (ctx , item )
205-
206- case resp .Status == 0 && err != nil :
207- // Encoding error. We didn't even attempt to send the event
208- logger .Error ("Drop docs: failed to add docs to the bulk request buffer." ,
209- zap .NamedError ("reason" , err ))
210-
211- case err != nil :
212- logger .Error ("Drop docs: failed to index" ,
213- zap .String ("name" , index ),
214- zap .Int ("attempt" , attempts ),
215- zap .Int ("status" , resp .Status ),
216- zap .NamedError ("reason" , err ))
217-
218- default :
219- logger .Error (fmt .Sprintf ("Drop docs: failed to index: %#v" , resp .Error ),
220- zap .Int ("attempt" , attempts ),
221- zap .Int ("status" , resp .Status ))
222+ func (p * BulkIndexerPool ) Add (ctx context.Context , index string , document io.WriterTo ) error {
223+ item := esBulkIndexerItem {
224+ Index : index ,
225+ Body : document ,
226+ }
227+ select {
228+ case <- ctx .Done ():
229+ return ctx .Err ()
230+ case p .items <- item :
231+ return nil
232+ }
233+ }
234+
235+ func (p * BulkIndexerPool ) Close (ctx context.Context ) error {
236+ close (p .items )
237+ doneCh := make (chan struct {})
238+ go func () {
239+ p .errgroup .Wait ()
240+ close (doneCh )
241+ }()
242+ select {
243+ case <- ctx .Done ():
244+ return ctx .Err ()
245+ case <- doneCh :
246+ return nil
247+ }
248+ }
249+
250+ type worker struct {
251+ indexer * docappender.BulkIndexer
252+ items chan esBulkIndexerItem
253+ flushInterval time.Duration
254+ flushTimeout time.Duration
255+ flushBytes int
256+
257+ stats * bulkIndexerStats
258+
259+ logger * zap.Logger
260+ }
261+
262+ func (w * worker ) run () error {
263+ flushTick := time .NewTicker (w .flushInterval )
264+ for {
265+ select {
266+ case item := <- w .items :
267+ // check if BulkIndexer is closing
268+ zero := esBulkIndexerItem {}
269+ if item == zero {
270+ w .flush ()
271+ return nil
272+ }
273+
274+ w .indexer .Add (item )
275+ // w.indexer.Len() can be either compressed or uncompressed bytes
276+ if w .indexer .Len () >= w .flushBytes {
277+ w .flush ()
278+ flushTick .Reset (w .flushInterval )
279+ }
280+ case <- flushTick .C :
281+ // bulk indexer needs to be flushed every flush interval because
282+ // there may be pending bytes in bulk indexer buffer due to e.g. document level 429
283+ w .flush ()
222284 }
223285 }
286+ }
224287
225- return bulkIndexer .Add (ctx , item )
288+ func (w * worker ) flush () error {
289+ ctx , cancel := context .WithTimeout (context .Background (), w .flushTimeout )
290+ defer cancel ()
291+ stat , err := w .indexer .Flush (ctx )
292+ w .stats .docsIndexed .Add (stat .Indexed )
293+ if err != nil {
294+ w .logger .Error ("bulk indexer flush error" , zap .Error (err ))
295+ }
296+ return err
226297}
0 commit comments