Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
*/
package io.micrometer.tracing.otel.bridge;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import io.micrometer.tracing.exporter.FinishedSpan;
import io.micrometer.tracing.exporter.SpanExportingPredicate;
import io.micrometer.tracing.exporter.SpanFilter;
Expand All @@ -28,16 +23,22 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* Wraps the {@link SpanExporter} delegate with additional predicate, reporting and
* filtering logic.
* Creates a {@link SpanExporter} with additional predicate, reporting and filtering
* logic.
*
* @author Marcin Grzejszczak
* @since 1.0.0
*/
public class CompositeSpanExporter implements io.opentelemetry.sdk.trace.export.SpanExporter {

private final io.opentelemetry.sdk.trace.export.SpanExporter delegate;
private final Collection<io.opentelemetry.sdk.trace.export.SpanExporter> exporters;

private final List<SpanExportingPredicate> predicates;

Expand All @@ -47,29 +48,42 @@ public class CompositeSpanExporter implements io.opentelemetry.sdk.trace.export.

/**
* Creates a new instance of {@link CompositeSpanExporter}.
* @param delegate a {@link SpanExporter} delegate
* @param exporters {@link SpanExporter} exporters
* @param predicates predicates that decide which spans should be exported
* @param reporters reporters that export spans
* @param spanFilters filters that mutate spans before reporting them
*/
public CompositeSpanExporter(SpanExporter delegate, List<SpanExportingPredicate> predicates,
List<SpanReporter> reporters, List<SpanFilter> spanFilters) {
this.delegate = delegate;
public CompositeSpanExporter(Collection<io.opentelemetry.sdk.trace.export.SpanExporter> exporters,
List<SpanExportingPredicate> predicates, List<SpanReporter> reporters, List<SpanFilter> spanFilters) {
this.exporters = exporters;
this.predicates = predicates == null ? Collections.emptyList() : predicates;
this.reporters = reporters == null ? Collections.emptyList() : reporters;
this.spanFilters = spanFilters == null ? Collections.emptyList() : spanFilters;
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return this.delegate.export(spans.stream().filter(this::shouldProcess).map(spanData -> {
List<SpanData> changedSpanData = spans.stream().filter(this::shouldProcess).map(spanData -> {
FinishedSpan finishedSpan = OtelFinishedSpan.fromOtel(spanData);
for (SpanFilter spanFilter : spanFilters) {
finishedSpan = spanFilter.map(finishedSpan);
}
return OtelFinishedSpan.toOtel(finishedSpan);
}).peek(spanData -> this.reporters.forEach(reporter -> reporter.report(OtelFinishedSpan.fromOtel(spanData))))
.collect(Collectors.toList()));
}).collect(Collectors.toList());
List<CompletableResultCode> results = new ArrayList<>();
changedSpanData.forEach(spanData -> {
this.reporters.forEach(reporter -> {
try {
reporter.report(OtelFinishedSpan.fromOtel(spanData));
results.add(CompletableResultCode.ofSuccess());
}
catch (Exception ex) {
results.add(CompletableResultCode.ofFailure());
}
});
});
this.exporters.forEach(spanExporter -> results.add(spanExporter.export(changedSpanData)));
return CompletableResultCode.ofAll(results);
}

private boolean shouldProcess(SpanData span) {
Expand All @@ -83,12 +97,24 @@ private boolean shouldProcess(SpanData span) {

@Override
public CompletableResultCode flush() {
return this.delegate.flush();
return CompletableResultCode
.ofAll(this.exporters.stream().map(SpanExporter::flush).collect(Collectors.toList()));
}

@Override
public CompletableResultCode shutdown() {
return this.delegate.shutdown();
List<CompletableResultCode> results = new ArrayList<>();
for (SpanReporter reporter : this.reporters) {
try {
reporter.close();
results.add(CompletableResultCode.ofSuccess());
}
catch (Exception ex) {
results.add(CompletableResultCode.ofFailure());
}
}
results.addAll(this.exporters.stream().map(SpanExporter::shutdown).collect(Collectors.toList()));
return CompletableResultCode.ofAll(results);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/**
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.tracing.otel.bridge;

import io.micrometer.tracing.exporter.SpanExportingPredicate;
import io.micrometer.tracing.exporter.SpanFilter;
import io.micrometer.tracing.exporter.SpanReporter;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.EventData;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import org.assertj.core.api.BDDAssertions;
import org.junit.jupiter.api.Test;
import org.mockito.BDDMockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

class CompositeSpanExporterTests {

@Test
void should_call_predicate_filter_reporter_and_then_exporter() {
SpanExporter exporter = mock(SpanExporter.class);
given(exporter.export(BDDMockito.any())).willReturn(CompletableResultCode.ofSuccess());
SpanExportingPredicate predicate = span -> span.getName().equals("foo");
SpanFilter filter = span -> span.setName("baz");
SpanReporter reporter = mock(SpanReporter.class);

SpanData fooSpan = new CustomSpanData("foo");
SpanData barSpan = new CustomSpanData("bar");

CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter),
Collections.singletonList(predicate), Collections.singletonList(reporter),
Collections.singletonList(filter)).export(Arrays.asList(fooSpan, barSpan));

then(reporter).should().report(BDDMockito.argThat(finishedSpan -> "baz".equals(finishedSpan.getName())));
then(exporter).should().export(
BDDMockito.argThat(spans -> spans.size() == 1 && "baz".equals(spans.iterator().next().getName())));
BDDAssertions.then(resultCode.isSuccess()).isTrue();
}

@Test
void should_flush_all_exporters() {
SpanExporter exporter = mock(SpanExporter.class);
given(exporter.flush()).willReturn(CompletableResultCode.ofSuccess());

CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter), null, null, null)
.flush();

then(exporter).should().flush();
BDDAssertions.then(resultCode.isSuccess()).isTrue();
}

@Test
void should_shutdown_all_exporters() {
SpanExporter exporter = mock(SpanExporter.class);
given(exporter.shutdown()).willReturn(CompletableResultCode.ofSuccess());

CompletableResultCode resultCode = new CompositeSpanExporter(Collections.singleton(exporter), null, null, null)
.shutdown();

verify(exporter).shutdown();
BDDAssertions.then(resultCode.isSuccess()).isTrue();
}

static class CustomSpanData implements SpanData {

private String name;

CustomSpanData(String name) {
this.name = name;
}

@Override
public String getName() {
return this.name;
}

@Override
public SpanKind getKind() {
return SpanKind.PRODUCER;
}

@Override
public SpanContext getSpanContext() {
return SpanContext.getInvalid();
}

@Override
public SpanContext getParentSpanContext() {
return SpanContext.getInvalid();
}

@Override
public StatusData getStatus() {
return StatusData.ok();
}

@Override
public long getStartEpochNanos() {
return 0L;
}

@Override
public Attributes getAttributes() {
return Attributes.empty();
}

@Override
public List<EventData> getEvents() {
return Collections.emptyList();
}

@Override
public List<LinkData> getLinks() {
return Collections.emptyList();
}

@Override
public long getEndEpochNanos() {
return 0L;
}

@Override
public boolean hasEnded() {
return true;
}

@Override
public int getTotalRecordedEvents() {
return 10;
}

@Override
public int getTotalRecordedLinks() {
return 20;
}

@Override
public int getTotalAttributeCount() {
return 30;
}

@Override
public InstrumentationLibraryInfo getInstrumentationLibraryInfo() {
return InstrumentationLibraryInfo.empty();
}

@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return InstrumentationScopeInfo.empty();
}

@Override
public Resource getResource() {
return Resource.empty();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
* @author Marcin Grzejszczak
* @since 1.0.0
*/
public interface SpanReporter {
public interface SpanReporter extends AutoCloseable {

/**
* Reports the finished span.
* @param span a span that was ended and is ready to be reported.
*/
void report(FinishedSpan span);

@Override
default void close() throws Exception {

}

}