Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Sources/Observability/Client/ObservabilityClientFactory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public struct ObservabilityClientFactory {
}

if options.traces == .enabled {
let tracesExporter = SamplingTraceExporterDecorator(
exporter: OtlpHttpTraceExporter(
endpoint: url,
config: .init(headers: options.customHeaders.map({ ($0.key, $0.value) }))
),
sampler: sampler
let traceEventExporter = OtlpTraceEventExporter(
endpoint: url,
config: .init(headers: options.customHeaders.map({ ($0.key, $0.value) }))
)
let decorator = TracerDecorator(options: options, sessionManager: sessionManager, exporter: tracesExporter)
Task {
await batchWorker.addExporter(traceEventExporter)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Asynchronous Exporter Registration Causes Lost Traces Metrics Logs

Race condition: The trace exporter is added to batchWorker in a fire-and-forget Task, but execution continues immediately without waiting. This means traces could be generated and sent to the eventQueue before the exporter is registered with batchWorker, causing those traces to be lost or not exported properly. The same issue exists for metrics (lines 123-125) and logs (lines 54-56). The Task should be awaited or the exporter should be added synchronously before the TracerDecorator is created and used.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All good. It will be just in buffer for this time.

let decorator = TracerDecorator(options: options, sessionManager: sessionManager, sampler: sampler, eventQueue: eventQueue)
/// tracer is enabled
if options.autoInstrumentation.contains(.urlSession) {
autoInstrumentation.append(NetworkInstrumentationManager(options: options, tracer: decorator, session: sessionManager))
Expand Down
42 changes: 0 additions & 42 deletions Sources/Observability/Exporters/LDStdoutExporter.swift

This file was deleted.

39 changes: 0 additions & 39 deletions Sources/Observability/Exporters/SamplingLogExporter.swift

This file was deleted.

36 changes: 0 additions & 36 deletions Sources/Observability/Exporters/SamplingTraceExporter.swift

This file was deleted.

20 changes: 10 additions & 10 deletions Sources/Observability/Metrics/OtlpMetricEventExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,33 @@ import Common
import Foundation
import OpenTelemetryProtocolExporterCommon

public final class OtlpMetricEventExporter: EventExporting {
let otlpHttpClient: OtlpHttpClient
final class OtlpMetricEventExporter: EventExporting {
private let otlpHttpClient: OtlpHttpClient

public init(endpoint: URL,
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
init(endpoint: URL,
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.otlpHttpClient = OtlpHttpClient(endpoint: endpoint,
config: config,
useSession: useSession,
envVarHeaders: envVarHeaders)
}

public func export(items: [EventQueueItem]) async throws {
func export(items: [EventQueueItem]) async throws {
let metricDatas: [OpenTelemetrySdk.MetricData] = items.compactMap { item in
(item.payload as? MetricItem)?.metricData
}
guard metricDatas.isNotEmpty else {
return
}

try await export(metricDatas: metricDatas)
}

private func export(metricDatas: [MetricData],
explicitTimeout: TimeInterval? = nil) async throws {
let body =
Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { request in
let body = Opentelemetry_Proto_Collector_Metrics_V1_ExportMetricsServiceRequest.with { request in
request.resourceMetrics = MetricsAdapter.toProtoResourceMetrics(
metricData: metricDatas)
}
Expand Down
20 changes: 10 additions & 10 deletions Sources/Observability/Metrics/OtlpMetricScheduleExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import OpenTelemetryApi
import OpenTelemetrySdk

final class OtlpMetricScheduleExporter: MetricExporter {
let eventQueue: EventQueue
let aggregationTemporalitySelector: AggregationTemporalitySelector
let defaultAggregationSelector: DefaultAggregationSelector
private let eventQueue: EventQueue
private let aggregationTemporalitySelector: AggregationTemporalitySelector
private let defaultAggregationSelector: DefaultAggregationSelector

init(eventQueue: EventQueue,
aggregationTemporalitySelector: AggregationTemporalitySelector = AggregationTemporality.alwaysCumulative(),
Expand Down Expand Up @@ -33,17 +33,17 @@ final class OtlpMetricScheduleExporter: MetricExporter {
}

public func getAggregationTemporality(
for instrument: OpenTelemetrySdk.InstrumentType
for instrument: OpenTelemetrySdk.InstrumentType
) -> OpenTelemetrySdk.AggregationTemporality {
return aggregationTemporalitySelector.getAggregationTemporality(
for: instrument)
return aggregationTemporalitySelector.getAggregationTemporality(
for: instrument)
}

// MARK: - DefaultAggregationSelector

public func getDefaultAggregation(
for instrument: OpenTelemetrySdk.InstrumentType
for instrument: OpenTelemetrySdk.InstrumentType
) -> OpenTelemetrySdk.Aggregation {
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
return defaultAggregationSelector.getDefaultAggregation(for: instrument)
}
}
45 changes: 45 additions & 0 deletions Sources/Observability/Traces/EventSpanProcessor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import Foundation
import OpenTelemetryApi
import OpenTelemetrySdk

final class EventSpanProcessor: SpanProcessor {
private let eventQueue: EventQueue
private let sampler: ExportSampler
let isStartRequired = false
let isEndRequired = true

init(eventQueue: EventQueue, sampler: ExportSampler) {
self.eventQueue = eventQueue
self.sampler = sampler
}

func onStart(parentContext: OpenTelemetryApi.SpanContext?, span: any OpenTelemetrySdk.ReadableSpan) {
// No-op
}

func onEnd(span: any OpenTelemetrySdk.ReadableSpan) {
if !span.context.traceFlags.sampled {
return
}

let spanData = span.toSpanData()
let sampledItems = sampler.sampleSpans(items: [spanData])
guard !sampledItems.isEmpty else {
return
}

Task {
await eventQueue.send(SpanItem(spanData: spanData))
}
}

func shutdown(explicitTimeout: TimeInterval?) {
// No-op
}

func forceFlush(timeout: TimeInterval?) {
// No-op
}


}
39 changes: 39 additions & 0 deletions Sources/Observability/Traces/OtlpTraceEventExporter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import OpenTelemetrySdk
import Common
import Foundation
import OpenTelemetryProtocolExporterCommon

final class OtlpTraceEventExporter: EventExporting {
private let otlpHttpClient: OtlpHttpClient

init(endpoint: URL,
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes) {
self.otlpHttpClient = OtlpHttpClient(endpoint: endpoint,
config: config,
useSession: useSession,
envVarHeaders: envVarHeaders)
}

func export(items: [EventQueueItem]) async throws {
let spanDatas: [OpenTelemetrySdk.SpanData] = items.compactMap { item in
(item.payload as? SpanItem)?.spanData
}
guard spanDatas.isNotEmpty else {
return
}
try await export(spanDatas: spanDatas)
}

private func export(spanDatas: [OpenTelemetrySdk.SpanData],
explicitTimeout: TimeInterval? = nil) async throws {
let body =
Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(
spanDataList: spanDatas)
}

try await otlpHttpClient.send(body: body, explicitTimeout: explicitTimeout)
}
}
20 changes: 20 additions & 0 deletions Sources/Observability/Traces/SpanItem.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import Foundation
import OpenTelemetrySdk

struct SpanItem: EventQueueItemPayload {
var exporterClass: AnyClass {
Observability.OtlpTraceEventExporter.self
}

let spanData: SpanData
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private ? understanding that all coming from EventQueueItemPayload could be internal, but why spanData is internal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need read spanData in exporter. SpanData is what Protobuf exporter uses

var timestamp: TimeInterval

init(spanData: SpanData) {
self.spanData = spanData
self.timestamp = spanData.endTime.timeIntervalSince1970
}

func cost() -> Int {
300 + spanData.events.count * 100 + spanData.attributes.count * 100
}
}
32 changes: 7 additions & 25 deletions Sources/Observability/Traces/TracerDecorator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,18 @@ final class TracerDecorator: Tracer {
private let options: Options
private let sessionManager: SessionManaging
private let tracerProvider: any TracerProvider
private let spanProcessor: any SpanProcessor
private let tracer: any Tracer

init(options: Options, sessionManager: SessionManaging, exporter: SpanExporter) {
private var activeSpan: Span?

init(options: Options, sessionManager: SessionManaging, sampler: ExportSampler, eventQueue: EventQueue) {
self.options = options
self.sessionManager = sessionManager
/// Using the default values from OpenTelemetry for Swift
/// For reference check:
///https://github.com/open-telemetry/opentelemetry-swift/blob/main/Sources/OpenTelemetrySdk/Trace/SpanProcessors/BatchSpanProcessor.swift
let processor = BatchSpanProcessor(
spanExporter: exporter,
scheduleDelay: 5,
exportTimeout: 30,
maxQueueSize: 2048,
maxExportBatchSize: 512,
)
self.spanProcessor = processor
let processor = EventSpanProcessor(eventQueue: eventQueue, sampler: sampler)
let provider = TracerProviderBuilder()
.add(spanProcessor: processor)
.with(resource: Resource(attributes: options.resourceAttributes))
.build()
self.tracerProvider = provider


self.tracer = tracerProvider.get(
instrumentationName: options.serviceName,
instrumentationVersion: options.serviceVersion,
Expand Down Expand Up @@ -84,20 +72,14 @@ extension TracerDecorator: TracesApi {
}

func flush() -> Bool {
/// span processor flush method differs from metrics and logs, it doesn't return a Result type
/// Processes all span events that have not yet been processed.
/// This method is executed synchronously on the calling thread
/// - Parameter timeout: Maximum time the flush complete or abort. If nil, it will wait indefinitely
/// func forceFlush(timeout: TimeInterval?)
spanProcessor.forceFlush(timeout: 5.0)
return true
}
}

/// Internal method used to set span start date
extension TracerDecorator {
func startSpan(name: String, attributes: [String : AttributeValue], startTime: Date = Date()) -> any Span {
let builder = tracer.spanBuilder(spanName: name)
extension Tracer {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Flush() Fails to Export Pending Spans

The flush() method now returns true without actually flushing any data. The previous implementation called spanProcessor.forceFlush(timeout: 5.0) to ensure spans were exported. With the new event queue architecture, the flush() method should trigger flushing of the eventQueue or batchWorker to ensure pending spans are exported, but it currently does nothing. This could lead to data loss if callers rely on flush() to ensure spans are exported before shutdown or other critical operations.

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just server requirement from protocol and doesn't needed for mobile

func startSpan(name: String, attributes: [String : AttributeValue], startTime: Date) -> any Span {
let builder = spanBuilder(spanName: name)

if let parent = OpenTelemetry.instance.contextProvider.activeSpan {
builder.setParent(parent)
Expand Down
2 changes: 1 addition & 1 deletion TestApp/Sources/AppDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ let config = { () -> LDConfig in
// serviceName: "i-os-sessions",

sessionBackgroundTimeout: 3,
autoInstrumentation: [.memory])),
autoInstrumentation: [.memory, .urlSession])),
SessionReplay(options: .init(
isEnabled: true,
privacy: .init(
Expand Down