Skip to content

Commit ee4e697

Browse files
OTel CompositeSpanExporter refactoring (#100)
* WIP * Fixed CompositeSpanExporter for OTel
1 parent 42d4a58 commit ee4e697

File tree

3 files changed

+237
-18
lines changed

3 files changed

+237
-18
lines changed

micrometer-tracing-bridges/micrometer-tracing-bridge-otel/src/main/java/io/micrometer/tracing/otel/bridge/CompositeSpanExporter.java

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
*/
1616
package io.micrometer.tracing.otel.bridge;
1717

18-
import java.util.Collection;
19-
import java.util.Collections;
20-
import java.util.List;
21-
import java.util.stream.Collectors;
22-
2318
import io.micrometer.tracing.exporter.FinishedSpan;
2419
import io.micrometer.tracing.exporter.SpanExportingPredicate;
2520
import io.micrometer.tracing.exporter.SpanFilter;
@@ -28,16 +23,22 @@
2823
import io.opentelemetry.sdk.trace.data.SpanData;
2924
import io.opentelemetry.sdk.trace.export.SpanExporter;
3025

26+
import java.util.ArrayList;
27+
import java.util.Collection;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.stream.Collectors;
31+
3132
/**
32-
* Wraps the {@link SpanExporter} delegate with additional predicate, reporting and
33-
* filtering logic.
33+
* Creates a {@link SpanExporter} with additional predicate, reporting and filtering
34+
* logic.
3435
*
3536
* @author Marcin Grzejszczak
3637
* @since 1.0.0
3738
*/
3839
public class CompositeSpanExporter implements io.opentelemetry.sdk.trace.export.SpanExporter {
3940

40-
private final io.opentelemetry.sdk.trace.export.SpanExporter delegate;
41+
private final Collection<io.opentelemetry.sdk.trace.export.SpanExporter> exporters;
4142

4243
private final List<SpanExportingPredicate> predicates;
4344

@@ -47,29 +48,42 @@ public class CompositeSpanExporter implements io.opentelemetry.sdk.trace.export.
4748

4849
/**
4950
* Creates a new instance of {@link CompositeSpanExporter}.
50-
* @param delegate a {@link SpanExporter} delegate
51+
* @param exporters {@link SpanExporter} exporters
5152
* @param predicates predicates that decide which spans should be exported
5253
* @param reporters reporters that export spans
5354
* @param spanFilters filters that mutate spans before reporting them
5455
*/
55-
public CompositeSpanExporter(SpanExporter delegate, List<SpanExportingPredicate> predicates,
56-
List<SpanReporter> reporters, List<SpanFilter> spanFilters) {
57-
this.delegate = delegate;
56+
public CompositeSpanExporter(Collection<io.opentelemetry.sdk.trace.export.SpanExporter> exporters,
57+
List<SpanExportingPredicate> predicates, List<SpanReporter> reporters, List<SpanFilter> spanFilters) {
58+
this.exporters = exporters;
5859
this.predicates = predicates == null ? Collections.emptyList() : predicates;
5960
this.reporters = reporters == null ? Collections.emptyList() : reporters;
6061
this.spanFilters = spanFilters == null ? Collections.emptyList() : spanFilters;
6162
}
6263

6364
@Override
6465
public CompletableResultCode export(Collection<SpanData> spans) {
65-
return this.delegate.export(spans.stream().filter(this::shouldProcess).map(spanData -> {
66+
List<SpanData> changedSpanData = spans.stream().filter(this::shouldProcess).map(spanData -> {
6667
FinishedSpan finishedSpan = OtelFinishedSpan.fromOtel(spanData);
6768
for (SpanFilter spanFilter : spanFilters) {
6869
finishedSpan = spanFilter.map(finishedSpan);
6970
}
7071
return OtelFinishedSpan.toOtel(finishedSpan);
71-
}).peek(spanData -> this.reporters.forEach(reporter -> reporter.report(OtelFinishedSpan.fromOtel(spanData))))
72-
.collect(Collectors.toList()));
72+
}).collect(Collectors.toList());
73+
List<CompletableResultCode> results = new ArrayList<>();
74+
changedSpanData.forEach(spanData -> {
75+
this.reporters.forEach(reporter -> {
76+
try {
77+
reporter.report(OtelFinishedSpan.fromOtel(spanData));
78+
results.add(CompletableResultCode.ofSuccess());
79+
}
80+
catch (Exception ex) {
81+
results.add(CompletableResultCode.ofFailure());
82+
}
83+
});
84+
});
85+
this.exporters.forEach(spanExporter -> results.add(spanExporter.export(changedSpanData)));
86+
return CompletableResultCode.ofAll(results);
7387
}
7488

7589
private boolean shouldProcess(SpanData span) {
@@ -83,12 +97,24 @@ private boolean shouldProcess(SpanData span) {
8397

8498
@Override
8599
public CompletableResultCode flush() {
86-
return this.delegate.flush();
100+
return CompletableResultCode
101+
.ofAll(this.exporters.stream().map(SpanExporter::flush).collect(Collectors.toList()));
87102
}
88103

89104
@Override
90105
public CompletableResultCode shutdown() {
91-
return this.delegate.shutdown();
106+
List<CompletableResultCode> results = new ArrayList<>();
107+
for (SpanReporter reporter : this.reporters) {
108+
try {
109+
reporter.close();
110+
results.add(CompletableResultCode.ofSuccess());
111+
}
112+
catch (Exception ex) {
113+
results.add(CompletableResultCode.ofFailure());
114+
}
115+
}
116+
results.addAll(this.exporters.stream().map(SpanExporter::shutdown).collect(Collectors.toList()));
117+
return CompletableResultCode.ofAll(results);
92118
}
93119

94120
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.tracing.otel.bridge;
17+
18+
import io.micrometer.tracing.exporter.SpanExportingPredicate;
19+
import io.micrometer.tracing.exporter.SpanFilter;
20+
import io.micrometer.tracing.exporter.SpanReporter;
21+
import io.opentelemetry.api.common.Attributes;
22+
import io.opentelemetry.api.trace.SpanContext;
23+
import io.opentelemetry.api.trace.SpanKind;
24+
import io.opentelemetry.sdk.common.CompletableResultCode;
25+
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
26+
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
27+
import io.opentelemetry.sdk.resources.Resource;
28+
import io.opentelemetry.sdk.trace.data.EventData;
29+
import io.opentelemetry.sdk.trace.data.LinkData;
30+
import io.opentelemetry.sdk.trace.data.SpanData;
31+
import io.opentelemetry.sdk.trace.data.StatusData;
32+
import io.opentelemetry.sdk.trace.export.SpanExporter;
33+
import org.assertj.core.api.BDDAssertions;
34+
import org.junit.jupiter.api.Test;
35+
import org.mockito.BDDMockito;
36+
37+
import java.util.Arrays;
38+
import java.util.Collections;
39+
import java.util.List;
40+
41+
import static org.mockito.BDDMockito.given;
42+
import static org.mockito.BDDMockito.then;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.verify;
45+
46+
class CompositeSpanExporterTests {
47+
48+
@Test
49+
void should_call_predicate_filter_reporter_and_then_exporter() {
50+
SpanExporter exporter = mock(SpanExporter.class);
51+
given(exporter.export(BDDMockito.any())).willReturn(CompletableResultCode.ofSuccess());
52+
SpanExportingPredicate predicate = span -> span.getName().equals("foo");
53+
SpanFilter filter = span -> span.setName("baz");
54+
SpanReporter reporter = mock(SpanReporter.class);
55+
56+
SpanData fooSpan = new CustomSpanData("foo");
57+
SpanData barSpan = new CustomSpanData("bar");
58+
59+
CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter),
60+
Collections.singletonList(predicate), Collections.singletonList(reporter),
61+
Collections.singletonList(filter)).export(Arrays.asList(fooSpan, barSpan));
62+
63+
then(reporter).should().report(BDDMockito.argThat(finishedSpan -> "baz".equals(finishedSpan.getName())));
64+
then(exporter).should().export(
65+
BDDMockito.argThat(spans -> spans.size() == 1 && "baz".equals(spans.iterator().next().getName())));
66+
BDDAssertions.then(resultCode.isSuccess()).isTrue();
67+
}
68+
69+
@Test
70+
void should_flush_all_exporters() {
71+
SpanExporter exporter = mock(SpanExporter.class);
72+
given(exporter.flush()).willReturn(CompletableResultCode.ofSuccess());
73+
74+
CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter), null, null, null)
75+
.flush();
76+
77+
then(exporter).should().flush();
78+
BDDAssertions.then(resultCode.isSuccess()).isTrue();
79+
}
80+
81+
@Test
82+
void should_shutdown_all_exporters() {
83+
SpanExporter exporter = mock(SpanExporter.class);
84+
given(exporter.shutdown()).willReturn(CompletableResultCode.ofSuccess());
85+
86+
CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter), null, null, null)
87+
.shutdown();
88+
89+
verify(exporter).shutdown();
90+
BDDAssertions.then(resultCode.isSuccess()).isTrue();
91+
}
92+
93+
static class CustomSpanData implements SpanData {
94+
95+
private String name;
96+
97+
CustomSpanData(String name) {
98+
this.name = name;
99+
}
100+
101+
@Override
102+
public String getName() {
103+
return this.name;
104+
}
105+
106+
@Override
107+
public SpanKind getKind() {
108+
return SpanKind.PRODUCER;
109+
}
110+
111+
@Override
112+
public SpanContext getSpanContext() {
113+
return SpanContext.getInvalid();
114+
}
115+
116+
@Override
117+
public SpanContext getParentSpanContext() {
118+
return SpanContext.getInvalid();
119+
}
120+
121+
@Override
122+
public StatusData getStatus() {
123+
return StatusData.ok();
124+
}
125+
126+
@Override
127+
public long getStartEpochNanos() {
128+
return 0L;
129+
}
130+
131+
@Override
132+
public Attributes getAttributes() {
133+
return Attributes.empty();
134+
}
135+
136+
@Override
137+
public List<EventData> getEvents() {
138+
return Collections.emptyList();
139+
}
140+
141+
@Override
142+
public List<LinkData> getLinks() {
143+
return Collections.emptyList();
144+
}
145+
146+
@Override
147+
public long getEndEpochNanos() {
148+
return 0L;
149+
}
150+
151+
@Override
152+
public boolean hasEnded() {
153+
return true;
154+
}
155+
156+
@Override
157+
public int getTotalRecordedEvents() {
158+
return 10;
159+
}
160+
161+
@Override
162+
public int getTotalRecordedLinks() {
163+
return 20;
164+
}
165+
166+
@Override
167+
public int getTotalAttributeCount() {
168+
return 30;
169+
}
170+
171+
@Override
172+
public InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
173+
return InstrumentationLibraryInfo.empty();
174+
}
175+
176+
@Override
177+
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
178+
return InstrumentationScopeInfo.empty();
179+
}
180+
181+
@Override
182+
public Resource getResource() {
183+
return Resource.empty();
184+
}
185+
186+
}
187+
188+
}

micrometer-tracing/src/main/java/io/micrometer/tracing/exporter/SpanReporter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,17 @@
2121
* @author Marcin Grzejszczak
2222
* @since 1.0.0
2323
*/
24-
public interface SpanReporter {
24+
public interface SpanReporter extends AutoCloseable {
2525

2626
/**
2727
* Reports the finished span.
2828
* @param span a span that was ended and is ready to be reported.
2929
*/
3030
void report(FinishedSpan span);
3131

32+
@Override
33+
default void close() throws Exception {
34+
35+
}
36+
3237
}

0 commit comments

Comments
 (0)