Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/early-rice-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@vercel/otel": minor
---

Ignore auto-configuration based on the OTEL*EXPORTER_OTLP* env vars when trace drains are used. This avoids duplicate trace export.
3 changes: 3 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"trailingComma": "all"
}
26 changes: 24 additions & 2 deletions packages/bridge-emulator/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import formidable from "formidable";
export interface Bridge {
port: number;
fetches: Request[];
reportedSpans: object[];
fetch: (input: string, init?: RequestInit) => Promise<Response>;
reset: () => void;
close: () => void;
}

interface BridgeOptions {
serverPort: number;
traceDrains?: string[];
}

export async function start(opts: BridgeOptions): Promise<Bridge> {
Expand All @@ -34,6 +36,13 @@ interface StatusRequest {
data: { status: string; [key: string]: unknown };
}

interface ReportSpansRequest {
cmd: "reportSpans";
testId: string;
runtime?: string;
data: object;
}

interface UnknownRequest {
cmd: "unknown";
}
Expand All @@ -42,17 +51,21 @@ type BridgeEmulatorRequest =
| UnknownRequest
| AckRequest
| EchoRequest
| StatusRequest;
| StatusRequest
| ReportSpansRequest;

class BridgeEmulatorServer implements Bridge {
public port = -1;
private serverPort: number;
private server: Server | undefined;
private waitingAck = new Map<string, Promise<unknown>>();
private traceDrains: string[] | undefined;
public fetches: Request[] = [];
public reportedSpans: object[] = [];

constructor({ serverPort }: BridgeOptions) {
constructor({ serverPort, traceDrains }: BridgeOptions) {
this.serverPort = serverPort;
this.traceDrains = traceDrains;
}

async connect(): Promise<void> {
Expand Down Expand Up @@ -144,6 +157,12 @@ class BridgeEmulatorServer implements Bridge {
res.end();
return;
}
if (json.cmd === "reportSpans") {
res.writeHead(204, "OK", { "X-Server": "bridge" });
res.end();
this.reportedSpans.push(json.data);
return;
}

res.writeHead(400, "Bad request", { "X-Server": "bridge" });
res.end();
Expand Down Expand Up @@ -204,6 +223,9 @@ class BridgeEmulatorServer implements Bridge {
"x-otel-test-id": testId,
"x-otel-test-url": input,
"x-otel-test-bridge-port": String(this.port),
...(this.traceDrains
? { "x-otel-test-trace-drains": this.traceDrains.join(",") }
: undefined),
},
});
const resClone = res.clone();
Expand Down
16 changes: 16 additions & 0 deletions packages/bridge-emulator/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class BridgeEmulatorContextReader implements TextMapPropagator {
"x-otel-test-id": testId,
"x-otel-test-url": url,
"x-otel-test-bridge-port": bridgePort,
"x-otel-test-trace-drains": traceDrainsCommaDelimited,
...headers
} = allHeaders;
if (testId && bridgePort) {
Expand Down Expand Up @@ -109,7 +110,22 @@ export class BridgeEmulatorContextReader implements TextMapPropagator {
reportSpans: (data): void => {
// eslint-disable-next-line no-console
console.log("[BridgeEmulatorServer] reportSpans", data);
void fetch(`http://127.0.0.1:${bridgePort}`, {
method: "POST",
body: JSON.stringify({
cmd: "reportSpans",
testId,
runtime: process.env.NEXT_RUNTIME,
data: data ?? {},
}),
headers: { "content-type": "application/json" },
// @ts-expect-error - internal Next request.
next: { internal: true },
});
},
...(traceDrainsCommaDelimited
? { traceDrains: traceDrainsCommaDelimited.split(",") }
: undefined),
},
};
}
Expand Down
2 changes: 1 addition & 1 deletion packages/otel/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async function buildAll(): Promise<void> {
const s = await stat(file);
if (s.size > maxSize) {
errors.push(
`${file}: the size of ${s.size} is over the maximum allowed size of ${maxSize}`
`${file}: the size of ${s.size} is over the maximum allowed size of ${maxSize}`,
);
}
}
Expand Down
43 changes: 43 additions & 0 deletions packages/otel/src/processor/filter-when-drained-span-processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { Context } from "@opentelemetry/api";
import type {
Span,
ReadableSpan,
SpanProcessor,
} from "@opentelemetry/sdk-trace-base";
import { diag } from "@opentelemetry/api";
import { isDraining } from "../vercel-request-context/is-draining";

let reported = false;

/** @internal */
export class FilterWhenDrainedSpanProcessor implements SpanProcessor {
constructor(private processor: SpanProcessor) {}

forceFlush(): Promise<void> {
return this.processor.forceFlush();
}

shutdown(): Promise<void> {
return this.processor.shutdown();
}

onStart(span: Span, parentContext: Context): void {
if (isDraining()) {
if (!reported) {
reported = true;
diag.debug(
"@vercel/otel: skipping automatic exporter due to configured trace drains",
);
}
return;
}
this.processor.onStart(span, parentContext);
}

onEnd(span: ReadableSpan): void {
if (isDraining()) {
return;
}
this.processor.onEnd(span);
}
}
61 changes: 35 additions & 26 deletions packages/otel/src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { FetchInstrumentation } from "./instrumentations/fetch";
import { W3CTraceContextPropagator } from "./propagators/w3c-tracecontext-propagator";
import { VercelRuntimePropagator } from "./vercel-request-context/propagator";
import { VercelRuntimeSpanExporter } from "./vercel-request-context/exporter";
import { FilterWhenDrainedSpanProcessor } from "./processor/filter-when-drained-span-processor";

type Env = ReturnType<typeof parseEnvironment>;

Expand Down Expand Up @@ -138,7 +139,7 @@ export class Sdk {
"vercel.project_id": process.env.VERCEL_PROJECT_ID || undefined,

...configuration.attributes,
})
}),
);
const resourceDetectors = configuration.resourceDetectors ?? [
envDetectorSync,
Expand All @@ -154,17 +155,17 @@ export class Sdk {
const propagators = parsePropagators(
configuration.propagators,
configuration,
env
env,
);
const traceSampler = parseSampler(configuration.traceSampler, env);
const spanProcessors = parseSpanProcessor(
configuration.spanProcessors,
configuration,
env
env,
);
if (spanProcessors.length === 0) {
diag.warn(
"@vercel/otel: No span processors configured. No spans will be exported."
"@vercel/otel: No span processors configured. No spans will be exported.",
);
}
const spanLimits = configuration.spanLimits;
Expand All @@ -177,8 +178,8 @@ export class Sdk {
tracerProvider.addSpanProcessor(
new CompositeSpanProcessor(
spanProcessors,
configuration.attributesFromHeaders
)
configuration.attributesFromHeaders,
),
);
tracerProvider.register({
contextManager,
Expand Down Expand Up @@ -207,7 +208,7 @@ export class Sdk {

const instrumentations = parseInstrumentations(
configuration.instrumentations,
configuration.instrumentationConfig
configuration.instrumentationConfig,
);
this.disableInstrumentations = registerInstrumentations({
instrumentations,
Expand All @@ -232,7 +233,7 @@ export class Sdk {
diag.info(
"@vercel/otel: shutting down",
promises.length,
process.env.NEXT_RUNTIME
process.env.NEXT_RUNTIME,
);

await Promise.all(promises);
Expand All @@ -254,21 +255,21 @@ function getEnv(): Env {

function parseInstrumentations(
arg: InstrumentationOptionOrName[] | undefined,
instrumentationConfig: InstrumentationConfiguration | undefined
instrumentationConfig: InstrumentationConfiguration | undefined,
): InstrumentationOption[] {
return (arg ?? ["auto"])
.map((instrumentationOrName) => {
if (instrumentationOrName === "auto") {
diag.debug(
"@vercel/otel: Configure instrumentations: fetch",
instrumentationConfig?.fetch
instrumentationConfig?.fetch,
);
return [new FetchInstrumentation(instrumentationConfig?.fetch)];
}
if (instrumentationOrName === "fetch") {
diag.debug(
"@vercel/otel: Configure instrumentations: fetch",
instrumentationConfig?.fetch
instrumentationConfig?.fetch,
);
return new FetchInstrumentation(instrumentationConfig?.fetch);
}
Expand All @@ -280,7 +281,7 @@ function parseInstrumentations(
function parsePropagators(
arg: PropagatorOrName[] | undefined,
configuration: Configuration,
env: Env
env: Env,
): TextMapPropagator[] {
const envPropagators =
process.env.OTEL_PROPAGATORS &&
Expand Down Expand Up @@ -311,7 +312,7 @@ function parsePropagators(
diag.debug(
`@vercel/otel: Configure propagators: ${autoList
.map((i) => i.name)
.join(", ")}`
.join(", ")}`,
);
return autoList.map((i) => i.propagator);
}
Expand Down Expand Up @@ -370,8 +371,8 @@ function parseSampler(arg: SampleOrName | undefined, env: Env): Sampler {
default:
diag.error(
`@vercel/otel: OTEL_TRACES_SAMPLER value "${String(
env.OTEL_TRACES_SAMPLER
)} invalid, defaulting to ${FALLBACK_OTEL_TRACES_SAMPLER}".`
env.OTEL_TRACES_SAMPLER,
)} invalid, defaulting to ${FALLBACK_OTEL_TRACES_SAMPLER}".`,
);
return new AlwaysOnSampler();
}
Expand All @@ -383,27 +384,27 @@ function getSamplerProbabilityFromEnv(env: Env): number {
env.OTEL_TRACES_SAMPLER_ARG === ""
) {
diag.error(
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG is blank, defaulting to ${DEFAULT_RATIO}.`
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG is blank, defaulting to ${DEFAULT_RATIO}.`,
);
return DEFAULT_RATIO;
}

diag.debug(
"@vercel/otel: Configure sampler probability: ",
env.OTEL_TRACES_SAMPLER_ARG
env.OTEL_TRACES_SAMPLER_ARG,
);
const probability = Number(env.OTEL_TRACES_SAMPLER_ARG);

if (isNaN(probability)) {
diag.error(
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG=${env.OTEL_TRACES_SAMPLER_ARG} was given, but it is invalid, defaulting to ${DEFAULT_RATIO}.`
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG=${env.OTEL_TRACES_SAMPLER_ARG} was given, but it is invalid, defaulting to ${DEFAULT_RATIO}.`,
);
return DEFAULT_RATIO;
}

if (probability < 0 || probability > 1) {
diag.error(
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG=${env.OTEL_TRACES_SAMPLER_ARG} was given, but it is out of range ([0..1]), defaulting to ${DEFAULT_RATIO}.`
`@vercel/otel: OTEL_TRACES_SAMPLER_ARG=${env.OTEL_TRACES_SAMPLER_ARG} was given, but it is out of range ([0..1]), defaulting to ${DEFAULT_RATIO}.`,
);
return DEFAULT_RATIO;
}
Expand All @@ -414,7 +415,7 @@ function getSamplerProbabilityFromEnv(env: Env): number {
function parseSpanProcessor(
arg: SpanProcessorOrName[] | undefined,
configuration: Configuration,
env: Env
env: Env,
): SpanProcessor[] {
return [
...(arg ?? ["auto"])
Expand All @@ -434,7 +435,7 @@ function parseSpanProcessor(
diag.debug(
"@vercel/otel: Configure vercel otel collector on port: ",
port,
protocol
protocol,
);
const config = {
url: `http://localhost:${port}/v1/traces`,
Expand All @@ -445,7 +446,11 @@ function parseSpanProcessor(
? new OTLPHttpProtoTraceExporter(config)
: new OTLPHttpJsonTraceExporter(config);

processors.push(new BatchSpanProcessor(exporter));
processors.push(
new FilterWhenDrainedSpanProcessor(
new BatchSpanProcessor(exporter),
),
);
}

// Consider going throw `VERCEL_OTEL_ENDPOINTS` (otel collector) for OTLP.
Expand All @@ -455,7 +460,11 @@ function parseSpanProcessor(
env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT ||
env.OTEL_EXPORTER_OTLP_ENDPOINT
) {
processors.push(new BatchSpanProcessor(parseTraceExporter(env)));
processors.push(
new FilterWhenDrainedSpanProcessor(
new BatchSpanProcessor(parseTraceExporter(env)),
),
);
}

return processors;
Expand All @@ -482,14 +491,14 @@ function parseTraceExporter(env: Env): SpanExporter {
const headers = {
...baggageUtils.parseKeyPairsIntoRecord(env.OTEL_EXPORTER_OTLP_HEADERS),
...baggageUtils.parseKeyPairsIntoRecord(
env.OTEL_EXPORTER_OTLP_TRACES_HEADERS
env.OTEL_EXPORTER_OTLP_TRACES_HEADERS,
),
};
diag.debug(
"@vercel/otel: Configure trace exporter: ",
protocol,
url,
`headers: ${Object.keys(headers).join(",") || "<none>"}`
`headers: ${Object.keys(headers).join(",") || "<none>"}`,
);
switch (protocol) {
case "http/json":
Expand All @@ -499,7 +508,7 @@ function parseTraceExporter(env: Env): SpanExporter {
default:
// "grpc" protocol is not supported in Edge.
diag.warn(
`@vercel/otel: Unsupported OTLP traces protocol: ${protocol}. Using http/protobuf.`
`@vercel/otel: Unsupported OTLP traces protocol: ${protocol}. Using http/protobuf.`,
);
return new OTLPHttpProtoTraceExporter();
}
Expand Down
Loading