Skip to content

Commit 5cc3dc2

Browse files
authored
fix(otlp-exporter-base)!: ensure we do not retry after the timeout has elapsed (#4889)
1 parent 01cea7c commit 5cc3dc2

File tree

10 files changed

+166
-65
lines changed

10 files changed

+166
-65
lines changed

experimental/packages/otlp-exporter-base/src/exporter-transport.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717
import { ExportResponse } from './export-response';
1818

1919
export interface IExporterTransport {
20-
send(data: Uint8Array): Promise<ExportResponse>;
20+
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse>;
2121
shutdown(): void;
2222
}

experimental/packages/otlp-exporter-base/src/platform/node/OTLPExporterNodeBase.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import { ISerializer } from '@opentelemetry/otlp-transformer';
2323
import { IExporterTransport } from '../../exporter-transport';
2424
import { createHttpExporterTransport } from './http-exporter-transport';
2525
import { OTLPExporterError } from '../../types';
26-
import { createRetryingTransport } from '../../retryable-transport';
26+
import { createRetryingTransport } from '../../retrying-transport';
2727

2828
/**
2929
* Collector Metric Exporter abstract base class
@@ -76,7 +76,6 @@ export abstract class OTLPExporterNodeBase<
7676
signalSpecificHeaders
7777
),
7878
url: this.url,
79-
timeoutMillis: this.timeoutMillis,
8079
}),
8180
});
8281
}
@@ -100,16 +99,19 @@ export abstract class OTLPExporterNodeBase<
10099
return;
101100
}
102101

103-
const promise = this._transport.send(data).then(response => {
104-
if (response.status === 'success') {
105-
onSuccess();
106-
return;
107-
}
108-
if (response.status === 'failure' && response.error) {
109-
onError(response.error);
110-
}
111-
onError(new OTLPExporterError('Export failed with unknown error'));
112-
}, onError);
102+
const promise = this._transport
103+
.send(data, this.timeoutMillis)
104+
.then(response => {
105+
if (response.status === 'success') {
106+
onSuccess();
107+
} else if (response.status === 'failure' && response.error) {
108+
onError(response.error);
109+
} else if (response.status === 'retryable') {
110+
onError(new OTLPExporterError('Export failed with retryable status'));
111+
} else {
112+
onError(new OTLPExporterError('Export failed with unknown error'));
113+
}
114+
}, onError);
113115

114116
this._sendingPromises.push(promise);
115117
const popPromise = () => {

experimental/packages/otlp-exporter-base/src/platform/node/http-exporter-transport.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class HttpExporterTransport implements IExporterTransport {
3232

3333
constructor(private _parameters: HttpRequestParameters) {}
3434

35-
async send(data: Uint8Array): Promise<ExportResponse> {
35+
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
3636
if (this._send == null) {
3737
// Lazy require to ensure that http/https is not required before instrumentations can wrap it.
3838
const {
@@ -50,9 +50,15 @@ class HttpExporterTransport implements IExporterTransport {
5050
return new Promise<ExportResponse>(resolve => {
5151
// this will always be defined
5252
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
53-
this._send?.(this._parameters, this._agent!, data, result => {
54-
resolve(result);
55-
});
53+
this._send?.(
54+
this._parameters,
55+
this._agent!,
56+
data,
57+
result => {
58+
resolve(result);
59+
},
60+
timeoutMillis
61+
);
5662
});
5763
}
5864
shutdown() {

experimental/packages/otlp-exporter-base/src/platform/node/http-transport-types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ export type sendWithHttp = (
2222
params: HttpRequestParameters,
2323
agent: http.Agent | https.Agent,
2424
data: Uint8Array,
25-
onDone: (response: ExportResponse) => void
25+
onDone: (response: ExportResponse) => void,
26+
timeoutMillis: number
2627
) => void;
2728

2829
export interface HttpRequestParameters {
29-
timeoutMillis: number;
3030
url: string;
3131
headers: Record<string, string>;
3232
compression: 'gzip' | 'none';

experimental/packages/otlp-exporter-base/src/platform/node/http-transport-utils.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ export function sendWithHttp(
4040
params: HttpRequestParameters,
4141
agent: http.Agent | https.Agent,
4242
data: Uint8Array,
43-
onDone: (response: ExportResponse) => void
43+
onDone: (response: ExportResponse) => void,
44+
timeoutMillis: number
4445
): void {
4546
const parsedUrl = new URL(params.url);
4647
const nodeVersion = Number(process.versions.node.split('.')[0]);
@@ -86,7 +87,7 @@ export function sendWithHttp(
8687
});
8788
});
8889

89-
req.setTimeout(params.timeoutMillis, () => {
90+
req.setTimeout(timeoutMillis, () => {
9091
req.destroy();
9192
onDone({
9293
status: 'failure',

experimental/packages/otlp-exporter-base/src/retryable-transport.ts renamed to experimental/packages/otlp-exporter-base/src/retrying-transport.ts

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,42 @@ function getJitter() {
3333
class RetryingTransport implements IExporterTransport {
3434
constructor(private _transport: IExporterTransport) {}
3535

36-
private retry(data: Uint8Array, inMillis: number): Promise<ExportResponse> {
36+
private retry(
37+
data: Uint8Array,
38+
timeoutMillis: number,
39+
inMillis: number
40+
): Promise<ExportResponse> {
3741
return new Promise((resolve, reject) => {
3842
setTimeout(() => {
39-
this._transport.send(data).then(resolve, reject);
43+
this._transport.send(data, timeoutMillis).then(resolve, reject);
4044
}, inMillis);
4145
});
4246
}
4347

44-
async send(data: Uint8Array): Promise<ExportResponse> {
45-
let result = await this._transport.send(data);
48+
async send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
49+
const deadline = Date.now() + timeoutMillis;
50+
let result = await this._transport.send(data, timeoutMillis);
4651
let attempts = MAX_ATTEMPTS;
4752
let nextBackoff = INITIAL_BACKOFF;
4853

4954
while (result.status === 'retryable' && attempts > 0) {
5055
attempts--;
51-
const backoff = Math.min(nextBackoff, MAX_BACKOFF) + getJitter();
56+
57+
// use maximum of computed backoff and 0 to avoid negative timeouts
58+
const backoff = Math.max(
59+
Math.min(nextBackoff, MAX_BACKOFF) + getJitter(),
60+
0
61+
);
5262
nextBackoff = nextBackoff * BACKOFF_MULTIPLIER;
53-
result = await this.retry(data, result.retryInMillis ?? backoff);
63+
const retryInMillis = result.retryInMillis ?? backoff;
64+
65+
// return when expected retry time is after the export deadline.
66+
const remainingTimeoutMillis = deadline - Date.now();
67+
if (retryInMillis > remainingTimeoutMillis) {
68+
return result;
69+
}
70+
71+
result = await this.retry(data, remainingTimeoutMillis, retryInMillis);
5472
}
5573

5674
return result;

experimental/packages/otlp-exporter-base/test/common/retrying-transport.test.ts

Lines changed: 80 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
import * as sinon from 'sinon';
1818
import * as assert from 'assert';
1919
import { IExporterTransport } from '../../src';
20-
import { createRetryingTransport } from '../../src/retryable-transport';
20+
import { createRetryingTransport } from '../../src/retrying-transport';
2121
import { ExportResponse } from '../../src';
2222
import { assertRejects } from '../testHelper';
2323

24+
const timeoutMillis = 1000000;
25+
2426
describe('RetryingTransport', function () {
2527
describe('send', function () {
2628
it('does not retry when underlying transport succeeds', async function () {
@@ -39,10 +41,14 @@ describe('RetryingTransport', function () {
3941
const transport = createRetryingTransport({ transport: mockTransport });
4042

4143
// act
42-
const actualResponse = await transport.send(mockData);
44+
const actualResponse = await transport.send(mockData, timeoutMillis);
4345

4446
// assert
45-
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
47+
sinon.assert.calledOnceWithExactly(
48+
transportStubs.send,
49+
mockData,
50+
timeoutMillis
51+
);
4652
assert.deepEqual(actualResponse, expectedResponse);
4753
});
4854

@@ -63,10 +69,14 @@ describe('RetryingTransport', function () {
6369
const transport = createRetryingTransport({ transport: mockTransport });
6470

6571
// act
66-
const actualResponse = await transport.send(mockData);
72+
const actualResponse = await transport.send(mockData, timeoutMillis);
6773

6874
// assert
69-
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
75+
sinon.assert.calledOnceWithExactly(
76+
transportStubs.send,
77+
mockData,
78+
timeoutMillis
79+
);
7080
assert.deepEqual(actualResponse, expectedResponse);
7181
});
7282

@@ -84,10 +94,14 @@ describe('RetryingTransport', function () {
8494
const transport = createRetryingTransport({ transport: mockTransport });
8595

8696
// act
87-
await assertRejects(() => transport.send(mockData));
97+
await assertRejects(() => transport.send(mockData, timeoutMillis));
8898

8999
// assert
90-
sinon.assert.calledOnceWithExactly(transportStubs.send, mockData);
100+
sinon.assert.calledOnceWithExactly(
101+
transportStubs.send,
102+
mockData,
103+
timeoutMillis
104+
);
91105
});
92106

93107
it('does retry when the underlying transport returns retryable', async function () {
@@ -113,11 +127,19 @@ describe('RetryingTransport', function () {
113127
const transport = createRetryingTransport({ transport: mockTransport });
114128

115129
// act
116-
const actualResponse = await transport.send(mockData);
130+
const actualResponse = await transport.send(mockData, timeoutMillis);
117131

118132
// assert
119133
sinon.assert.calledTwice(transportStubs.send);
120-
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
134+
sinon.assert.alwaysCalledWithMatch(
135+
transportStubs.send,
136+
mockData,
137+
sinon.match.number.and(
138+
sinon.match(value => {
139+
return value <= timeoutMillis;
140+
})
141+
)
142+
);
121143
assert.deepEqual(actualResponse, successResponse);
122144
});
123145

@@ -143,11 +165,19 @@ describe('RetryingTransport', function () {
143165
const transport = createRetryingTransport({ transport: mockTransport });
144166

145167
// act
146-
await assertRejects(() => transport.send(mockData));
168+
await assertRejects(() => transport.send(mockData, timeoutMillis));
147169

148170
// assert
149171
sinon.assert.calledTwice(transportStubs.send);
150-
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
172+
sinon.assert.alwaysCalledWithMatch(
173+
transportStubs.send,
174+
mockData,
175+
sinon.match.number.and(
176+
sinon.match(value => {
177+
return value <= timeoutMillis;
178+
})
179+
)
180+
);
151181
});
152182

153183
it('does retry 5 times, then resolves as retryable', async function () {
@@ -169,11 +199,48 @@ describe('RetryingTransport', function () {
169199
const transport = createRetryingTransport({ transport: mockTransport });
170200

171201
// act
172-
const result = await transport.send(mockData);
202+
const result = await transport.send(mockData, timeoutMillis);
173203

174204
// assert
175205
sinon.assert.callCount(transportStubs.send, 6); // 1 initial try and 5 retries
176-
sinon.assert.alwaysCalledWithExactly(transportStubs.send, mockData);
206+
sinon.assert.alwaysCalledWithMatch(
207+
transportStubs.send,
208+
mockData,
209+
sinon.match.number.and(
210+
sinon.match(value => {
211+
return value <= timeoutMillis;
212+
})
213+
)
214+
);
215+
assert.strictEqual(result, retryResponse);
216+
});
217+
218+
it('does not retry when retryInMillis takes place after timeoutMillis', async function () {
219+
// arrange
220+
const retryResponse: ExportResponse = {
221+
status: 'retryable',
222+
retryInMillis: timeoutMillis + 100,
223+
};
224+
225+
const mockData = Uint8Array.from([1, 2, 3]);
226+
227+
const transportStubs = {
228+
send: sinon.stub().resolves(retryResponse),
229+
shutdown: sinon.stub(),
230+
};
231+
const mockTransport = <IExporterTransport>transportStubs;
232+
const transport = createRetryingTransport({ transport: mockTransport });
233+
234+
// act
235+
const result = await transport.send(mockData, timeoutMillis);
236+
237+
// assert
238+
// initial try, no retries.
239+
sinon.assert.calledOnceWithExactly(
240+
transportStubs.send,
241+
mockData,
242+
timeoutMillis
243+
);
177244
assert.strictEqual(result, retryResponse);
178245
});
179246
});

experimental/packages/otlp-grpc-exporter-base/src/OTLPGRPCExporterNodeBase.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ export abstract class OTLPGRPCExporterNodeBase<
9797
grpcName: grpcName,
9898
grpcPath: grpcPath,
9999
metadata: metadataProvider,
100-
timeoutMillis: this.timeoutMillis,
101100
});
102101
}
103102

@@ -126,16 +125,19 @@ export abstract class OTLPGRPCExporterNodeBase<
126125
return;
127126
}
128127

129-
const promise = this._transport.send(data).then(response => {
130-
if (response.status === 'success') {
131-
onSuccess();
132-
return;
133-
}
134-
if (response.status === 'failure' && response.error) {
135-
onError(response.error);
136-
}
137-
onError(new OTLPExporterError('Export failed with unknown error'));
138-
}, onError);
128+
const promise = this._transport
129+
.send(data, this.timeoutMillis)
130+
.then(response => {
131+
if (response.status === 'success') {
132+
onSuccess();
133+
} else if (response.status === 'failure' && response.error) {
134+
onError(response.error);
135+
} else if (response.status === 'retryable') {
136+
onError(new OTLPExporterError('Export failed with retryable status'));
137+
} else {
138+
onError(new OTLPExporterError('Export failed with unknown error'));
139+
}
140+
}, onError);
139141

140142
this._sendingPromises.push(promise);
141143
const popPromise = () => {

experimental/packages/otlp-grpc-exporter-base/src/grpc-exporter-transport.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ export interface GrpcExporterTransportParameters {
8888
*/
8989
metadata: () => Metadata;
9090
compression: 'gzip' | 'none';
91-
timeoutMillis: number;
9291
}
9392

9493
export class GrpcExporterTransport implements IExporterTransport {
@@ -101,7 +100,7 @@ export class GrpcExporterTransport implements IExporterTransport {
101100
this._client?.close();
102101
}
103102

104-
send(data: Uint8Array): Promise<ExportResponse> {
103+
send(data: Uint8Array, timeoutMillis: number): Promise<ExportResponse> {
105104
// We need to make a for gRPC
106105
const buffer = Buffer.from(data);
107106

@@ -145,9 +144,7 @@ export class GrpcExporterTransport implements IExporterTransport {
145144
}
146145

147146
return new Promise<ExportResponse>(resolve => {
148-
// this will always be defined
149-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
150-
const deadline = Date.now() + this._parameters.timeoutMillis;
147+
const deadline = Date.now() + timeoutMillis;
151148

152149
// this should never happen
153150
if (this._metadata == null) {

0 commit comments

Comments
 (0)