Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ ts-jest,dev,MIT,Copyright (c) 2016-2018 Kulshekhar Kabra
tslint,dev,Apache-2.0,"Copyright 2013-2019 Palantir Technologies, Inc."
typescript,dev,Apache-2.0,Copyright (c) Microsoft Corporation.
dc-polyfill,import,MIT,"Copyright (c) 2023 Datadog, Inc."
hot-shots,import,MIT,Copyright 2011 Steve Ivy. All rights reserved.
promise-retry,import,MIT,Copyright (c) 2014 IndigoUnited
serialize-error,import,MIT,Copyright (c) Sindre Sorhus <[email protected]> (https://sindresorhus.com)
shimmer,import,BSD-2-Clause,"Copyright (c) 2013-2019, Forrest L Norvell"
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
"dependencies": {
"@aws-crypto/sha256-js": "5.2.0",
"dc-polyfill": "^0.1.3",
"hot-shots": "8.5.0",
"promise-retry": "^2.0.1",
"serialize-error": "^8.1.0",
"shimmer": "1.2.1"
Expand Down
85 changes: 85 additions & 0 deletions src/metrics/dogstatsd.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import * as dgram from "node:dgram";
import { LambdaDogStatsD } from "./dogstatsd";

jest.mock("node:dgram", () => ({
createSocket: jest.fn(),
}));

describe("LambdaDogStatsD", () => {
let mockSend: jest.Mock;

beforeEach(() => {
// A send() that immediately calls its callback
mockSend = jest.fn((msg, port, host, cb) => cb());
(dgram.createSocket as jest.Mock).mockReturnValue({
send: mockSend,
getSendBufferSize: jest.fn().mockReturnValue(64 * 1024),
setSendBufferSize: jest.fn(),
});
});

afterEach(() => {
jest.clearAllMocks();
});

it("sends a distribution metric without tags or timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric", 1);
await client.flush();

expect(mockSend).toHaveBeenCalledWith(Buffer.from("metric:1|d", "utf8"), 8125, "127.0.0.1", expect.any(Function));
});

it("sends with tags (sanitized) and timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric2", 2, 12345, ["tag1", "bad?tag"]);
await client.flush();

// "bad?tag" becomes "bad_tag"
expect(mockSend).toHaveBeenCalledWith(
Buffer.from("metric2:2|d|#tag1,bad_tag|T12345", "utf8"),
8125,
"127.0.0.1",
expect.any(Function),
);
});

it("rounds timestamp", async () => {
const client = new LambdaDogStatsD();
client.distribution("metric2", 2, 12345.678);
await client.flush();

expect(mockSend).toHaveBeenCalledWith(
Buffer.from("metric2:2|d|T12345", "utf8"),
8125,
"127.0.0.1",
expect.any(Function),
);
});

it("flush() resolves immediately when there are no sends", async () => {
const client = new LambdaDogStatsD();
await expect(client.flush()).resolves.toBeUndefined();
});

it("flush() times out if a send never invokes its callback", async () => {
// replace socket.send with a never‐calling callback
(dgram.createSocket as jest.Mock).mockReturnValue({
send: jest.fn(), // never calls callback
getSendBufferSize: jest.fn(),
setSendBufferSize: jest.fn(),
});

const client = new LambdaDogStatsD();
client.distribution("will", 9);

jest.useFakeTimers();
const p = client.flush();
// advance past the 1000ms MAX_FLUSH_TIMEOUT
jest.advanceTimersByTime(1100);

// expect the Promise returned by flush() to resolve successfully
await expect(p).resolves.toBeUndefined();
jest.useRealTimers();
});
});
97 changes: 97 additions & 0 deletions src/metrics/dogstatsd.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import * as dgram from "node:dgram";
import { SocketType } from "node:dgram";
import { logDebug } from "../utils";

export class LambdaDogStatsD {
private static readonly HOST = "127.0.0.1";
private static readonly PORT = 8125;
private static readonly MIN_SEND_BUFFER_SIZE = 32 * 1024;
private static readonly ENCODING: BufferEncoding = "utf8";
private static readonly SOCKET_TYPE: SocketType = "udp4";
private static readonly TAG_RE = /[^\w\d_\-:\/\.]/gu;
private static readonly TAG_SUB = "_";
// The maximum amount to wait while flushing pending sends, so we don't block forever.
private static readonly MAX_FLUSH_TIMEOUT = 1000;

private readonly socket: dgram.Socket;
private readonly pendingSends = new Set<Promise<void>>();

constructor() {
this.socket = dgram.createSocket(LambdaDogStatsD.SOCKET_TYPE);
LambdaDogStatsD.ensureMinSendBufferSize(this.socket);
}

private static ensureMinSendBufferSize(sock: dgram.Socket): void {
if (process.platform === "win32") {
return;
}

try {
const currentSize = sock.getSendBufferSize();
if (currentSize <= LambdaDogStatsD.MIN_SEND_BUFFER_SIZE) {
sock.setSendBufferSize(LambdaDogStatsD.MIN_SEND_BUFFER_SIZE);
logDebug(`Socket send buffer increased to ${LambdaDogStatsD.MIN_SEND_BUFFER_SIZE / 1024}kb`);
}
} catch {
// ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a debug log

}
}

/**
* Send a distribution value, optionally setting tags and timestamp.
* Timestamp is seconds since epoch.
*/
public distribution(metric: string, value: number, timestamp?: number, tags?: string[]): void {
this.report(metric, "d", value, tags, timestamp);
}

private normalizeTags(tags: string[]): string[] {
return tags.map((t) => t.replace(LambdaDogStatsD.TAG_RE, LambdaDogStatsD.TAG_SUB));
}

private report(metric: string, metricType: string, value: number | null, tags?: string[], timestamp?: number): void {
if (value == null) {
return;
}

if (timestamp) {
timestamp = Math.floor(timestamp);
}

const serializedTags = tags && tags.length ? `|#${this.normalizeTags(tags).join(",")}` : "";
const timestampPart = timestamp != null ? `|T${timestamp}` : "";
const payload = `${metric}:${value}|${metricType}${serializedTags}${timestampPart}`;
this.send(payload);
}

private send(packet: string) {
const msg = Buffer.from(packet, LambdaDogStatsD.ENCODING);
const promise = new Promise<void>((resolve) => {
this.socket.send(msg, LambdaDogStatsD.PORT, LambdaDogStatsD.HOST, (err) => {
if (err) {
logDebug(`Unable to send metric packet: ${err.message}`);
}

resolve();
});
});

this.pendingSends.add(promise);
void promise.finally(() => this.pendingSends.delete(promise));
}

/** Block until all in-flight sends have settled */
public async flush(): Promise<void> {
const allSettled = Promise.allSettled(this.pendingSends);
const maxTimeout = new Promise<"timeout">((resolve) => {
setTimeout(() => resolve("timeout"), LambdaDogStatsD.MAX_FLUSH_TIMEOUT);
});

const winner = await Promise.race([allSettled, maxTimeout]);
if (winner === "timeout") {
logDebug("Timed out before sending all metric payloads");
}

this.pendingSends.clear();
}
}
38 changes: 23 additions & 15 deletions src/metrics/listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { LogLevel, setLogLevel } from "../utils";
import { EXTENSION_URL } from "./extension";

import { MetricsListener } from "./listener";
import StatsDClient from "hot-shots";
import { LambdaDogStatsD } from "./dogstatsd";
import { Context } from "aws-lambda";
jest.mock("hot-shots");

jest.mock("./dogstatsd");

jest.mock("@aws-sdk/client-secrets-manager", () => {
return {
Expand All @@ -17,6 +18,9 @@ jest.mock("@aws-sdk/client-secrets-manager", () => {
};
});

const MOCK_TIME_SECONDS = 1487076708;
const MOCK_TIME_MS = 1487076708000;

const siteURL = "example.com";

class MockKMS {
Expand Down Expand Up @@ -56,6 +60,7 @@ describe("MetricsListener", () => {

expect(nock.isDone()).toBeTruthy();
});

it("uses encrypted kms key if it's the only value available", async () => {
nock("https://api.example.com").post("/api/v1/distribution_points?api_key=kms-api-key-decrypted").reply(200, {});

Expand Down Expand Up @@ -184,7 +189,7 @@ describe("MetricsListener", () => {

it("logs metrics when logForwarding is enabled", async () => {
const spy = jest.spyOn(process.stdout, "write");
jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -202,22 +207,23 @@ describe("MetricsListener", () => {
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(spy).toHaveBeenCalledWith(`{"e":1487076708,"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
expect(spy).toHaveBeenCalledWith(`{"e":${MOCK_TIME_SECONDS},"m":"my-metric","t":["tag:a","tag:b"],"v":10}\n`);
});

it("always sends metrics to statsD when extension is enabled, ignoring logForwarding=true", async () => {
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
(LambdaDogStatsD as any).mockImplementation(() => {
return {
distribution: distributionMock,
close: (callback: any) => callback(undefined),
};
});

jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);

const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
Expand All @@ -236,25 +242,26 @@ describe("MetricsListener", () => {
listener.sendDistributionMetric("my-metric", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();
expect(flushScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, undefined, ["tag:a", "tag:b"]);
expect(distributionMock).toHaveBeenCalledWith("my-metric", 10, MOCK_TIME_SECONDS, ["tag:a", "tag:b"]);
});

it("only sends metrics with timestamps to the API when the extension is enabled", async () => {
it("sends metrics with timestamps to statsD (not API!) when the extension is enabled", async () => {
jest.spyOn(Date.prototype, "getTime").mockReturnValue(MOCK_TIME_MS);
const flushScope = nock(EXTENSION_URL).post("/lambda/flush", JSON.stringify({})).reply(200);
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
(LambdaDogStatsD as any).mockImplementation(() => {
return {
distribution: distributionMock,
close: (callback: any) => callback(undefined),
};
});

const metricTimeOneMinuteAgo = new Date(Date.now() - 60000);
const metricTimeOneMinuteAgo = new Date(MOCK_TIME_MS - 60000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -280,12 +287,15 @@ describe("MetricsListener", () => {
"tag:a",
"tag:b",
);
listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b");
listener.sendDistributionMetric("my-metric-with-a-timestamp", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(flushScope.isDone()).toBeTruthy();
expect(apiScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]);
expect(apiScope.isDone()).toBeFalsy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-with-a-timestamp", 10, MOCK_TIME_SECONDS, [
"tag:a",
"tag:b",
]);
});

it("does not send historical metrics from over 4 hours ago to the API", async () => {
Expand Down Expand Up @@ -316,7 +326,6 @@ describe("MetricsListener", () => {

it("logs metrics when logForwarding is enabled with custom timestamp", async () => {
const spy = jest.spyOn(process.stdout, "write");
// jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
Expand All @@ -328,7 +337,6 @@ describe("MetricsListener", () => {
localTesting: false,
siteURL,
});
// jest.useFakeTimers();

await listener.onStartInvocation({});
listener.sendDistributionMetricWithDate("my-metric", 10, new Date(1584983836 * 1000), false, "tag:a", "tag:b");
Expand Down
Loading