Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ ts-jest,dev,MIT,Copyright (c) 2016-2018 Kulshekhar Kabra
tslint,dev,Apache-2.0,"Copyright 2013-2019 Palantir Technologies, Inc."
typescript,dev,Apache-2.0,Copyright (c) Microsoft Corporation.
dc-polyfill,import,MIT,"Copyright (c) 2023 Datadog, Inc."
hot-shots,import,MIT,Copyright 2011 Steve Ivy. All rights reserved.
promise-retry,import,MIT,Copyright (c) 2014 IndigoUnited
serialize-error,import,MIT,Copyright (c) Sindre Sorhus <[email protected]> (https://sindresorhus.com)
shimmer,import,BSD-2-Clause,"Copyright (c) 2013-2019, Forrest L Norvell"
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"dependencies": {
"@aws-crypto/sha256-js": "5.2.0",
"dc-polyfill": "^0.1.3",
"hot-shots": "8.5.0",
"promise-retry": "^2.0.1",
"serialize-error": "^8.1.0",
"shimmer": "1.2.1"
Expand Down
2 changes: 1 addition & 1 deletion src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ describe("datadog", () => {

expect(mockedIncrementInvocations).toBeCalledTimes(1);
expect(mockedIncrementInvocations).toBeCalledWith(expect.anything(), mockContext);
expect(logger.debug).toHaveBeenCalledTimes(8);
expect(logger.debug).toHaveBeenCalledTimes(9);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we get another debug log for increasing buffer send size

expect(logger.debug).toHaveBeenLastCalledWith('{"status":"debug","message":"datadog:Unpatching HTTP libraries"}');
});

Expand Down
87 changes: 87 additions & 0 deletions src/metrics/dogstatsd.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import * as dgram from "node:dgram";
import { LambdaDogStatsD } from "./dogstatsd";

jest.mock("node:dgram", () => ({
createSocket: jest.fn(),
}));

describe("LambdaDogStatsD", () => {
let mockSend: jest.Mock;

beforeEach(() => {
// A send() that immediately calls its callback
mockSend = jest.fn((msg, port, host, cb) => cb());
(dgram.createSocket as jest.Mock).mockReturnValue({
send: mockSend,
getSendBufferSize: jest.fn().mockReturnValue(64 * 1024),
setSendBufferSize: jest.fn(),
bind: jest.fn(),
});
});

afterEach(() => {
jest.clearAllMocks();
});

it("sends a distribution metric without tags or timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric", 1);
await client.flush();

expect(mockSend).toHaveBeenCalledWith(Buffer.from("metric:1|d", "utf8"), 8125, "127.0.0.1", expect.any(Function));
});

it("sends with tags (sanitized) and timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric2", 2, 12345, ["tag1", "bad?tag"]);
await client.flush();

// "bad?tag" becomes "bad_tag"
expect(mockSend).toHaveBeenCalledWith(
Buffer.from("metric2:2|d|#tag1,bad_tag|T12345", "utf8"),
8125,
"127.0.0.1",
expect.any(Function),
);
});

it("rounds timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric2", 2, 12345.678);
await client.flush();

expect(mockSend).toHaveBeenCalledWith(
Buffer.from("metric2:2|d|T12345", "utf8"),
8125,
"127.0.0.1",
expect.any(Function),
);
});

it("flush() resolves immediately when there are no sends", async () => {
const client = new LambdaDogStatsD();
await expect(client.flush()).resolves.toBeUndefined();
});

it("flush() times out if a send never invokes its callback", async () => {
// replace socket.send with a never‐calling callback
(dgram.createSocket as jest.Mock).mockReturnValue({
send: jest.fn(), // never calls callback
getSendBufferSize: jest.fn(),
setSendBufferSize: jest.fn(),
bind: jest.fn(),
});

const client = new LambdaDogStatsD();
client.distribution("will", 9);

jest.useFakeTimers();
const p = client.flush();
// advance past the 1000ms MAX_FLUSH_TIMEOUT
jest.advanceTimersByTime(1100);

// expect the Promise returned by flush() to resolve successfully
await expect(p).resolves.toBeUndefined();
jest.useRealTimers();
});
});
100 changes: 100 additions & 0 deletions src/metrics/dogstatsd.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import * as dgram from "node:dgram";
import { SocketType } from "node:dgram";
import { logDebug } from "../utils";

export class LambdaDogStatsD {
private static readonly HOST = "127.0.0.1";
private static readonly PORT = 8125;
private static readonly MIN_SEND_BUFFER_SIZE = 32 * 1024;
private static readonly ENCODING: BufferEncoding = "utf8";
private static readonly SOCKET_TYPE: SocketType = "udp4";
private static readonly TAG_RE = /[^\w\d_\-:\/\.]/gu;
private static readonly TAG_SUB = "_";
// The maximum amount to wait while flushing pending sends, so we don't block forever.
private static readonly MAX_FLUSH_TIMEOUT = 1000;

private readonly socket: dgram.Socket;
private readonly pendingSends = new Set<Promise<void>>();

constructor() {
this.socket = dgram.createSocket(LambdaDogStatsD.SOCKET_TYPE);
// Bind to a local port so we can set the socket’s send buffer size
this.socket.bind(0, () => {
LambdaDogStatsD.ensureMinSendBufferSize(this.socket);
});
}

private static ensureMinSendBufferSize(sock: dgram.Socket): void {
if (process.platform === "win32") {
return;
}

try {
const currentSize = sock.getSendBufferSize();
if (currentSize <= LambdaDogStatsD.MIN_SEND_BUFFER_SIZE) {
sock.setSendBufferSize(LambdaDogStatsD.MIN_SEND_BUFFER_SIZE);
logDebug(`Socket send buffer increased to ${LambdaDogStatsD.MIN_SEND_BUFFER_SIZE / 1024}kb`);
}
} catch {
logDebug("Unable to set socket's send buffer size")
}
}

/**
* Send a distribution value, optionally setting tags and timestamp.
* Timestamp is seconds since epoch.
*/
public distribution(metric: string, value: number, timestamp?: number, tags?: string[]): void {
this.report(metric, "d", value, tags, timestamp);
}

private normalizeTags(tags: string[]): string[] {
return tags.map((t) => t.replace(LambdaDogStatsD.TAG_RE, LambdaDogStatsD.TAG_SUB));
}

private report(metric: string, metricType: string, value: number | null, tags?: string[], timestamp?: number): void {
if (value == null) {
return;
}

if (timestamp) {
timestamp = Math.floor(timestamp);
}

const serializedTags = tags && tags.length ? `|#${this.normalizeTags(tags).join(",")}` : "";
const timestampPart = timestamp != null ? `|T${timestamp}` : "";
const payload = `${metric}:${value}|${metricType}${serializedTags}${timestampPart}`;
this.send(payload);
}

private send(packet: string) {
const msg = Buffer.from(packet, LambdaDogStatsD.ENCODING);
const promise = new Promise<void>((resolve) => {
this.socket.send(msg, LambdaDogStatsD.PORT, LambdaDogStatsD.HOST, (err) => {
if (err) {
logDebug(`Unable to send metric packet: ${err.message}`);
}

resolve();
});
});

this.pendingSends.add(promise);
void promise.finally(() => this.pendingSends.delete(promise));
}

/** Block until all in-flight sends have settled */
public async flush(): Promise<void> {
const allSettled = Promise.allSettled(this.pendingSends);
const maxTimeout = new Promise<"timeout">((resolve) => {
setTimeout(() => resolve("timeout"), LambdaDogStatsD.MAX_FLUSH_TIMEOUT);
});

const winner = await Promise.race([allSettled, maxTimeout]);
if (winner === "timeout") {
logDebug("Timed out before sending all metric payloads");
}

this.pendingSends.clear();
}
}
38 changes: 23 additions & 15 deletions src/metrics/listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { LogLevel, setLogLevel } from "../utils";
import { EXTENSION_URL } from "./extension";

import { MetricsListener } from "./listener";
import StatsDClient from "hot-shots";
import { LambdaDogStatsD } from "./dogstatsd";
import { Context } from "aws-lambda";
jest.mock("hot-shots");

jest.mock("./dogstatsd");

jest.mock("@aws-sdk/client-secrets-manager", () => {
return {
Expand All @@ -17,6 +18,9 @@ jest.mock("@aws-sdk/client-secrets-manager", () => {
};
});

const MOCK_TIME_SECONDS = 1487076708;
const MOCK_TIME_MS = 1487076708000;

const siteURL = "example.com";

class MockKMS {
Expand Down Expand Up @@ -56,6 +60,7 @@ describe("MetricsListener", () => {

expect(nock.isDone()).toBeTruthy();
});

it("uses encrypted kms key if it's the only value available", async () => {
nock("https://api.example.com").post("/api/v1/distribution_points?api_key=kms-api-key-decrypted").reply(200, {});

Expand Down Expand Up @@ -184,7 +189,7 @@ describe("MetricsListener", () => {

it("logs metrics when logForwarding is enabled", async () => {
const spy = jest.spyOn(process.stdout, "write");
jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -202,22 +207,23 @@ describe("MetricsListener", () => {
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(spy).toHaveBeenCalledWith(`{"e":1487076708,"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
expect(spy).toHaveBeenCalledWith(`{"e":${MOCK_TIME_SECONDS},"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
});

it("always sends metrics to statsD when extension is enabled, ignoring logForwarding=true", async () => {
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
(LambdaDogStatsD as any).mockImplementation(() => {
return {
distribution: distributionMock,
close: (callback: any) => callback(undefined),
};
});

jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);

const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
Expand All @@ -236,25 +242,26 @@ describe("MetricsListener", () => {
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();
expect(flushScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, undefined, ["tag:a", "tag:b"]);
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, MOCK_TIME_SECONDS, ["tag:a", "tag:b"]);
});

it("only sends metrics with timestamps to the API when the extension is enabled", async () => {
it("sends metrics with timestamps to statsD (not API!) when the extension is enabled", async () => {
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
(LambdaDogStatsD as any).mockImplementation(() => {
return {
distribution: distributionMock,
close: (callback: any) => callback(undefined),
};
});

const metricTimeOneMinuteAgo = new Date(Date.now() - 60000);
const metricTimeOneMinuteAgo = new Date(MOCK_TIME_MS - 60000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -280,12 +287,15 @@ describe("MetricsListener", () => {
"tag:a",
"tag:b",
);
listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b");
listener.sendDistributionMetric("my-metric-with-a-timestamp", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(flushScope.isDone()).toBeTruthy();
expect(apiScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]);
expect(apiScope.isDone()).toBeFalsy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-with-a-timestamp", 10, MOCK_TIME_SECONDS, [
"tag:a",
"tag:b",
]);
});

it("does not send historical metrics from over 4 hours ago to the API", async () => {
Expand Down Expand Up @@ -316,7 +326,6 @@ describe("MetricsListener", () => {

it("logs metrics when logForwarding is enabled with custom timestamp", async () => {
const spy = jest.spyOn(process.stdout, "write");
// jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -328,7 +337,6 @@ describe("MetricsListener", () => {
localTesting: false,
siteURL,
});
// jest.useFakeTimers();

await listener.onStartInvocation({});
listener.sendDistributionMetricWithDate("my-metric", 10, new Date(1584983836 * 1000), false, "tag:a", "tag:b");
Expand Down
Loading
Loading