Skip to content

Commit d9749d9

Browse files
committed
Make partial error generic and move it to the consumer package
Signed-off-by: Pavol Loffay <[email protected]>
1 parent a4a3665 commit d9749d9

File tree

6 files changed

+23
-16
lines changed

6 files changed

+23
-16
lines changed

processor/queuedprocessor/partial_error.go renamed to consumer/consumererror/partialerror.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,27 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package queuedprocessor
15+
package consumererror
1616

1717
import "go.opentelemetry.io/collector/consumer/pdata"
1818

19-
type partialRetryError struct {
19+
// PartialError can be used to signalize that a subset of received data failed to be processed or send.
20+
// The preceding components in the pipeline can use this information for partial retries.
21+
type PartialError struct {
2022
error
21-
toRetry pdata.Traces
23+
failed pdata.Traces
2224
}
2325

24-
// NewPartialRetryError creates an error that encapsulates data that should be retried.
25-
// Use this error type only when a subset of received data set should bre retried.
26-
func NewPartialRetryError(err error, toRetry pdata.Traces) error {
27-
return partialRetryError{
28-
error: err,
29-
toRetry: toRetry,
26+
// PartialTracesError creates PartialError for failed traces.
27+
// Use this error type only when a subset of received data set failed to be processed or sent.
28+
func PartialTracesError(err error, failed pdata.Traces) error {
29+
return PartialError{
30+
error: err,
31+
failed: failed,
3032
}
3133
}
34+
35+
// GetTraces returns failed traces.
36+
func (err PartialError) GetTraces() pdata.Traces {
37+
return err.failed
38+
}

processor/queuedprocessor/partial_error_test.go renamed to consumer/consumererror/partialerror_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package queuedprocessor
15+
package consumererror
1616

1717
import (
1818
"fmt"
@@ -26,7 +26,7 @@ import (
2626
func TestPartialError(t *testing.T) {
2727
td := testdata.GenerateTraceDataOneSpan()
2828
err := fmt.Errorf("some error")
29-
partialErr := NewPartialRetryError(err, td)
29+
partialErr := PartialTracesError(err, td)
3030
assert.Equal(t, err.Error(), partialErr.Error())
31-
assert.Equal(t, td, partialErr.(partialRetryError).toRetry)
31+
assert.Equal(t, td, partialErr.(PartialError).failed)
3232
}
File renamed without changes.

processor/queuedprocessor/queued_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
193193
sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", allSpansCount))
194194
sp.onItemDropped(item, spanCountStats)
195195
} else {
196-
if partialErr, isPartial := err.(partialRetryError); isPartial {
197-
item.td = partialErr.toRetry
196+
if partialErr, isPartial := err.(consumererror.PartialError); isPartial {
197+
item.td = partialErr.GetTraces()
198198
spanCountStats = processor.NewSpanCountStats(item.td, sp.name)
199199
allSpansCount = spanCountStats.GetAllSpansCount()
200200
}

processor/queuedprocessor/queued_processor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestQueuedProcessor_noEnqueueOnPermanentError(t *testing.T) {
7373
}
7474

7575
func TestQueueProcessorPartialError(t *testing.T) {
76-
partialErr := partialRetryError{toRetry: testdata.GenerateTraceDataTwoSpansSameResource()}
76+
partialErr := consumererror.PartialTracesError(fmt.Errorf("some error"), testdata.GenerateTraceDataTwoSpansSameResource())
7777
td := testdata.GenerateTraceDataTwoSpansSameResource()
7878
c := &waitGroupTraceConsumer{
7979
consumeTraceDataError: partialErr,
@@ -93,7 +93,7 @@ func TestQueueProcessorPartialError(t *testing.T) {
9393
exporterData := c.getData()
9494
assert.Equal(t, 2, len(exporterData))
9595
assert.Equal(t, td, exporterData[0])
96-
assert.Equal(t, partialErr.toRetry, exporterData[1])
96+
assert.Equal(t, partialErr.(consumererror.PartialError).GetTraces(), exporterData[1])
9797
}
9898

9999
type waitGroupTraceConsumer struct {

0 commit comments

Comments
 (0)