Skip to content

Commit cb83d30

Browse files
nordfjordrauno56vmarchaud
authored
feat(propagation-utils): end pub-sub process span on promise settled (#1055)
Co-authored-by: Rauno Viskus <[email protected]> Co-authored-by: Rauno Viskus <[email protected]> Co-authored-by: Valentin Marchaud <[email protected]>
1 parent 227ae20 commit cb83d30

File tree

3 files changed

+156
-2
lines changed

3 files changed

+156
-2
lines changed

packages/opentelemetry-propagation-utils/package.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
"precompile": "tsc --version && lerna run version:update --scope @opentelemetry/propagation-utils --include-dependencies",
1616
"prepare": "npm run compile",
1717
"prewatch": "npm run precompile",
18+
"tdd": "npm run test -- --watch-extensions ts --watch",
19+
"test": "nyc ts-mocha -p tsconfig.json 'test/**/*.test.ts'",
1820
"watch": "tsc --build --watch tsconfig.json tsconfig.esm.json"
1921
},
2022
"repository": "open-telemetry/opentelemetry-js-contrib",
@@ -43,8 +45,16 @@
4345
},
4446
"devDependencies": {
4547
"@opentelemetry/api": "^1.0.0",
48+
"@opentelemetry/contrib-test-utils": "^0.31.0",
49+
"@types/mocha": "^9.1.1",
4650
"@types/node": "16.11.21",
51+
"@types/sinon": "^10.0.11",
52+
"expect": "27.4.2",
4753
"gts": "3.1.0",
54+
"mocha": "7.2.0",
55+
"nyc": "15.1.0",
56+
"sinon": "13.0.1",
57+
"ts-mocha": "10.0.0",
4858
"typescript": "4.3.5"
4959
}
5060
}

packages/opentelemetry-propagation-utils/src/pubsub-propagation.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ const patchArrayFilter = (
5656
});
5757
};
5858

59+
function isPromise(value: unknown): value is Promise<unknown> {
60+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
61+
return typeof (value as any)?.then === 'function';
62+
}
63+
5964
const patchArrayFunction = (
6065
messages: OtelProcessedMessage[],
6166
functionName: 'forEach' | 'map',
@@ -77,10 +82,18 @@ const patchArrayFunction = (
7782
if (!messageSpan) return callback.apply(this, callbackArgs);
7883

7984
const res = context.with(trace.setSpan(loopContext, messageSpan), () => {
85+
let result: Promise<unknown> | unknown;
8086
try {
81-
return callback.apply(this, callbackArgs);
87+
result = callback.apply(this, callbackArgs);
88+
if (isPromise(result)) {
89+
const endSpan = () => message[END_SPAN_FUNCTION]?.();
90+
result.then(endSpan, endSpan);
91+
}
92+
return result;
8293
} finally {
83-
message[END_SPAN_FUNCTION]?.();
94+
if (!isPromise(result)) {
95+
message[END_SPAN_FUNCTION]?.();
96+
}
8497
}
8598
});
8699

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import utils from '../src/pubsub-propagation';
17+
import {
18+
getTestSpans,
19+
registerInstrumentationTestingProvider,
20+
resetMemoryExporter,
21+
} from '@opentelemetry/contrib-test-utils';
22+
import { ROOT_CONTEXT, trace } from '@opentelemetry/api';
23+
import * as expect from 'expect';
24+
25+
registerInstrumentationTestingProvider();
26+
27+
const tracer = trace.getTracer('test');
28+
29+
afterEach(() => {
30+
resetMemoryExporter();
31+
});
32+
33+
describe('Pubsub propagation', () => {
34+
it('Span ends immediately when the function returns a non-promise', () => {
35+
const messages = [{}];
36+
utils.patchMessagesArrayToStartProcessSpans({
37+
messages,
38+
tracer,
39+
parentContext: ROOT_CONTEXT,
40+
messageToSpanDetails: () => ({
41+
name: 'test',
42+
parentContext: ROOT_CONTEXT,
43+
attributes: {},
44+
}),
45+
});
46+
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);
47+
48+
expect(getTestSpans().length).toBe(0);
49+
50+
messages.map(x => x);
51+
52+
expect(getTestSpans().length).toBe(1);
53+
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
54+
});
55+
56+
it('Span ends on promise-resolution', () => {
57+
const messages = [{}];
58+
utils.patchMessagesArrayToStartProcessSpans({
59+
messages,
60+
tracer,
61+
parentContext: ROOT_CONTEXT,
62+
messageToSpanDetails: () => ({
63+
name: 'test',
64+
parentContext: ROOT_CONTEXT,
65+
attributes: {},
66+
}),
67+
});
68+
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);
69+
70+
expect(getTestSpans().length).toBe(0);
71+
72+
let resolve: (value: unknown) => void;
73+
74+
messages.map(
75+
() =>
76+
new Promise(res => {
77+
resolve = res;
78+
})
79+
);
80+
81+
expect(getTestSpans().length).toBe(0);
82+
83+
// @ts-expect-error Typescript thinks this value is used before assignment
84+
resolve(undefined);
85+
86+
// We use setTimeout here to make sure our assertations run
87+
// after the promise resolves
88+
return new Promise(res => setTimeout(res, 0)).then(() => {
89+
expect(getTestSpans().length).toBe(1);
90+
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
91+
});
92+
});
93+
94+
it('Span ends on promise-rejection', () => {
95+
const messages = [{}];
96+
utils.patchMessagesArrayToStartProcessSpans({
97+
messages,
98+
tracer,
99+
parentContext: ROOT_CONTEXT,
100+
messageToSpanDetails: () => ({
101+
name: 'test',
102+
parentContext: ROOT_CONTEXT,
103+
attributes: {},
104+
}),
105+
});
106+
utils.patchArrayForProcessSpans(messages, tracer, ROOT_CONTEXT);
107+
108+
expect(getTestSpans().length).toBe(0);
109+
110+
let reject: (value: unknown) => void;
111+
112+
messages.map(
113+
() =>
114+
new Promise((_, rej) => {
115+
reject = rej;
116+
})
117+
);
118+
119+
expect(getTestSpans().length).toBe(0);
120+
121+
// @ts-expect-error Typescript thinks this value is used before assignment
122+
reject(new Error('Failed'));
123+
124+
// We use setTimeout here to make sure our assertations run
125+
// after the promise resolves
126+
return new Promise(res => setTimeout(res, 0)).then(() => {
127+
expect(getTestSpans().length).toBe(1);
128+
expect(getTestSpans()[0]).toMatchObject({ name: 'test process' });
129+
});
130+
});
131+
});

0 commit comments

Comments
 (0)