From bd7882dca7d91fcc83290cbd640fe33d2016c5ea Mon Sep 17 00:00:00 2001 From: Matt Straathof Date: Tue, 19 Sep 2023 14:30:50 -0700 Subject: [PATCH] feat: add new metrics setup for goodmetrics client --- README.md | 21 ++-- package.json | 2 +- src/dev-driver.ts | 53 --------- src/goodmetrics/_Metrics.ts | 23 +++- src/goodmetrics/data/StatisticSet.ts | 9 ++ .../downstream/goodmetricsClient.ts | 108 ++++++++++++++++++ src/goodmetrics/metricsFactory.ts | 29 +++-- src/goodmetrics/metricsSetups.ts | 101 ++++++++++++++++ src/goodmetrics/pipeline/aggregator.ts | 68 +++++++++++ src/index.ts | 6 +- test/dev-driver.ts | 37 ++++++ tsconfig.json | 3 +- 12 files changed, 380 insertions(+), 80 deletions(-) delete mode 100644 src/dev-driver.ts create mode 100644 src/goodmetrics/downstream/goodmetricsClient.ts create mode 100644 test/dev-driver.ts diff --git a/README.md b/README.md index 772d039..bd62d5c 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,7 @@ example usage ```javascript -import {MetricsSetups} from './goodmetrics/metricsSetups'; -import {Dimension} from './goodmetrics/_Metrics'; -import {TimestampAt} from './goodmetrics/metricsFactory'; +import {Dimension, MetricsSetups} from 'goodmetrics-nodejs'; const delay = async (ms: number): Promise => { return await new Promise(resolve => { @@ -22,13 +20,16 @@ const main = async () => { prescientDimensions: new Map(), }); - await metrics.record('my_metric', TimestampAt.Start, async metrics => { - console.info('inside metrics block'); - metrics.measure('runs', 1); - await delay(100); - metrics.dimension('result', 'success'); - }); + await metrics.unaryMetricsFactory.record( + {name: 'test'}, + async metrics => { + console.info('inside metrics block'); + metrics.measure('runs', 1); + await delay(100); + metrics.dimension('result', 'success'); + } + ); }; main().finally(); -``` \ No newline at end of file +``` diff --git a/package.json b/package.json index f653989..8ceb3d2 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "build": "tsc", "lint": "eslint . --ext .ts", "format": "eslint . --ext .ts --fix", - "dev-driver": "ts-node --prefer-ts-exts src/dev-driver.ts" + "dev-driver": "ts-node --prefer-ts-exts test/dev-driver.ts" }, "main": "dist/index.js", "files": [ diff --git a/src/dev-driver.ts b/src/dev-driver.ts deleted file mode 100644 index d30851d..0000000 --- a/src/dev-driver.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { - Header, - HeaderInterceptorProvider, -} from './goodmetrics/downstream/grpc/headerInterceptor'; -import {OpenTelemetryClient} from './goodmetrics/downstream/openTelemetryClient'; -import {MetricsSetups} from './goodmetrics/metricsSetups'; -import {_Metrics, Metrics} from './goodmetrics/_Metrics'; -import {TimestampAt} from './goodmetrics/metricsFactory'; - -const delay = async (ms: number) => { - return await new Promise(resolve => { - setTimeout(() => { - resolve(); - }, ms); - }); -}; - -const main = async () => { - const headers = [new Header('lightstep-access-token', '')]; - const client = OpenTelemetryClient.connect({ - sillyOtlpHostname: 'ingest.lightstep.com', - port: 443, - metricDimensions: new Map(), - resourceDimensions: new Map(), - interceptors: [ - new HeaderInterceptorProvider(headers).createHeadersInterceptor(), - ], - }); - - const metrics = MetricsSetups.lightstepNativeOtlp({ - aggregationWidthMillis: 10 * 1000, - lightstepAccessToken: '', - logError(message: string, error: unknown): void { - console.error(message, error); - }, - onSendUnary(metrics: Metrics[]): void { - console.log('sending unary', metrics); - }, - }); - - await delay(8000); - await metrics.unaryMetricsFactory.record( - 'test', - TimestampAt.Start, - metrics => { - metrics.measure('runs', 1); - } - ); - - await delay(15000); -}; - -main().finally(); diff --git a/src/goodmetrics/_Metrics.ts b/src/goodmetrics/_Metrics.ts index 4b6d740..cb3d3e4 100644 --- a/src/goodmetrics/_Metrics.ts +++ b/src/goodmetrics/_Metrics.ts @@ -8,6 +8,7 @@ import Histogram = otlp_metrics.opentelemetry.proto.metrics.v1.Histogram; import HistogramDataPoint = otlp_metrics.opentelemetry.proto.metrics.v1.HistogramDataPoint; import AggregationTemporality = otlp_metrics.opentelemetry.proto.metrics.v1.AggregationTemporality; import {newNumberDataPoint} from './data/otlp/numbersDataPoint'; +import {goodmetrics} from 'goodmetrics-generated'; export enum MetricsBehavior { DEFAULT = 'default', @@ -21,6 +22,8 @@ export abstract class Dimension { } abstract asOtlpKeyValue(): KeyValue; + + abstract asGoodmetricsDimension(): goodmetrics.Dimension; } export class StringDimension extends Dimension { @@ -34,6 +37,12 @@ export class StringDimension extends Dimension { const value = new AnyValue({string_value: this.value}); return new KeyValue({key: this.name, value}); } + + asGoodmetricsDimension(): goodmetrics.Dimension { + return new goodmetrics.Dimension({ + string: this.value, + }); + } } export class NumberDimension extends Dimension { @@ -47,6 +56,12 @@ export class NumberDimension extends Dimension { const value = new AnyValue({int_value: this.value}); return new KeyValue({key: this.name, value}); } + + asGoodmetricsDimension(): goodmetrics.Dimension { + return new goodmetrics.Dimension({ + number: this.value, + }); + } } export class BooleanDimension extends Dimension { @@ -60,6 +75,12 @@ export class BooleanDimension extends Dimension { const value = new AnyValue({bool_value: this.value}); return new KeyValue({key: this.name, value}); } + + asGoodmetricsDimension(): goodmetrics.Dimension { + return new goodmetrics.Dimension({ + boolean: this.value, + }); + } } interface ViewProps { @@ -109,7 +130,7 @@ export class _Metrics implements Metrics { readonly metricsBehavior: MetricsBehavior; readonly metricMeasurements: Map = new Map(); readonly metricDistributions: Map = new Map(); - private readonly metricDimensions: Map = new Map(); + readonly metricDimensions: Map = new Map(); constructor(props: MetricsProps) { this.name = props.name; this.timestampMillis = props.timestampMillis; diff --git a/src/goodmetrics/data/StatisticSet.ts b/src/goodmetrics/data/StatisticSet.ts index bb0a216..18a1392 100644 --- a/src/goodmetrics/data/StatisticSet.ts +++ b/src/goodmetrics/data/StatisticSet.ts @@ -62,6 +62,15 @@ export class StatisticSet extends Aggregation { }); } + values() { + return { + min: this.min, + max: this.max, + count: this.count, + sum: this.sum, + }; + } + private statisticSetDataPoint(props: StatisticSetDataPointProps): Metric { return new Metric({ name: `${props.metricName}_${props.measurementName}_${props.statisticSetComponent}`, diff --git a/src/goodmetrics/downstream/goodmetricsClient.ts b/src/goodmetrics/downstream/goodmetricsClient.ts new file mode 100644 index 0000000..20894a1 --- /dev/null +++ b/src/goodmetrics/downstream/goodmetricsClient.ts @@ -0,0 +1,108 @@ +import {_Metrics, Dimension} from '../_Metrics'; +import {goodmetrics} from 'goodmetrics-generated'; +import MetricsClient = goodmetrics.MetricsClient; +import {ChannelCredentials, Interceptor} from '@grpc/grpc-js'; +import MetricsRequest = goodmetrics.MetricsRequest; +import {AggregatedBatch} from '../pipeline/aggregator'; + +interface GoodmetricsClientProps { + address: string; +} +interface GoodmetricsConnectProps { + hostname: string; + port: number; +} + +export class GoodmetricsClient { + private readonly prescientDimensions: Map; + private readonly client: MetricsClient; + private readonly interceptors: Interceptor[]; + private constructor(props: GoodmetricsClientProps) { + this.client = new MetricsClient( + props.address, + ChannelCredentials.createSsl() + ); + this.prescientDimensions = new Map(); + this.interceptors = []; + } + + static connect(props: GoodmetricsConnectProps): GoodmetricsClient { + return new GoodmetricsClient({ + address: `${props.hostname}:${props.port}`, + }); + } + + async sendMetricsBatch(metrics: _Metrics[]): Promise { + const request = new MetricsRequest({ + shared_dimensions: this.prescientDimensionsToProto(), + metrics: this.metricsToGoodmetrics(metrics), + }); + + return await new Promise((resolve, reject) => { + this.client.SendMetrics(request, {interceptors: this.interceptors}, e => { + if (!e) { + resolve(); + } else { + reject(e); + } + }); + }); + } + + async sendPreaggregatedMetrics( + aggregatedBatch: AggregatedBatch[] + ): Promise { + const datums: goodmetrics.Datum[] = []; + for (const batch of aggregatedBatch) { + datums.push(...batch.asGoodmetrics()); + } + const request = new MetricsRequest({ + shared_dimensions: this.prescientDimensionsToProto(), + metrics: datums, + }); + + return await new Promise((resolve, reject) => { + this.client.SendMetrics(request, {interceptors: this.interceptors}, e => { + if (!e) { + resolve(); + } else { + reject(e); + } + }); + }); + } + + private prescientDimensionsToProto(): Map { + const dimensionsMap = new Map(); + for (const [key, dimen] of this.prescientDimensions) { + dimensionsMap.set(key, dimen.asGoodmetricsDimension()); + } + return new Map(); + } + + private metricsToGoodmetrics(metrics: _Metrics[]): goodmetrics.Datum[] { + return metrics.map(metric => { + const dimensionsMap = new Map(); + const measurementsMap = new Map(); + for (const [key, dimen] of metric.metricDimensions) { + dimensionsMap.set(key, dimen.asGoodmetricsDimension()); + } + for (const [key, value] of metric.metricMeasurements) { + const _int = Number.isInteger(value) ? value : undefined; + if (_int) { + measurementsMap.set(key, new goodmetrics.Measurement({i64: _int})); + } else { + measurementsMap.set(key, new goodmetrics.Measurement({f64: value})); + } + } + for (const [key, value] of metric.metricDistributions) { + measurementsMap.set(key, new goodmetrics.Measurement({i64: value})); + } + return new goodmetrics.Datum({ + metric: metric.name, + dimensions: dimensionsMap, + measurements: measurementsMap, + }); + }); + } +} diff --git a/src/goodmetrics/metricsFactory.ts b/src/goodmetrics/metricsFactory.ts index fd58078..aebfd63 100644 --- a/src/goodmetrics/metricsFactory.ts +++ b/src/goodmetrics/metricsFactory.ts @@ -31,6 +31,17 @@ interface Props { totalTimeType: TotaltimeType; } +interface RecordOptions { + name: string; + stampAt?: TimestampAt; +} + +interface RecordWithBehaviorOptions { + name: string; + stampAt?: TimestampAt; + behavior: MetricsBehavior; +} + export class MetricsFactory { protected readonly metricsSink: MetricsSink; private readonly totalTimeType: TotaltimeType; @@ -66,25 +77,25 @@ export class MetricsFactory { } async record( - name: string, - stampAt: TimestampAt, + options: RecordOptions, block: (metrics: Metrics) => Promise | T ): Promise { return await this.recordWithBehavior( - name, - stampAt, - MetricsBehavior.DEFAULT, + { + name: options.name, + stampAt: options.stampAt, + behavior: MetricsBehavior.DEFAULT, + }, block ); } async recordWithBehavior( - name: string, - stampAt: TimestampAt, - metricsBehavior: MetricsBehavior, + options: RecordWithBehaviorOptions, block: (metrics: Metrics) => Promise | T ): Promise { - const metrics = this.getMetrics(name, stampAt, metricsBehavior); + const stampAt = options.stampAt ?? TimestampAt.Start; + const metrics = this.getMetrics(options.name, stampAt, options.behavior); try { const res = await block(metrics); return res; diff --git a/src/goodmetrics/metricsSetups.ts b/src/goodmetrics/metricsSetups.ts index 115312b..7d24cc8 100644 --- a/src/goodmetrics/metricsSetups.ts +++ b/src/goodmetrics/metricsSetups.ts @@ -15,6 +15,7 @@ import {Batcher} from './pipeline/batcher'; // @ts-ignore import * as csp from 'js-csp'; import {AggregatedBatch, Aggregator} from './pipeline/aggregator'; +import {GoodmetricsClient} from './downstream/goodmetricsClient'; export interface ConfiguredMetrics { unaryMetricsFactory: MetricsFactory; @@ -76,7 +77,32 @@ interface LightstepNativeOtlpProps { onSendPreaggregated?: (aggregatedBatch: AggregatedBatch[]) => void; } +interface GoodmetricsSetupProps { + host?: string; + port?: number; + aggregationWidthMillis?: number; +} + export class MetricsSetups { + static goodMetrics(props: GoodmetricsSetupProps): ConfiguredMetrics { + const host = props.host ?? 'localhost'; + const port = props.port ?? 9573; + const aggregationWidthMillis = props.aggregationWidthMillis ?? 10 * 1000; + const unaryFactory = this.configureGoodmetricsUnaryFactory({ + host: host, + port: port, + }); + const preaggregatedFactory = this.configureGoodmetricsPreaggregatedFactory({ + host, + port, + aggregationWidthMillis, + }); + + return { + unaryMetricsFactory: unaryFactory, + preaggregatedMetricsFactory: preaggregatedFactory, + }; + } static lightstepNativeOtlp( props: LightstepNativeOtlpProps ): ConfiguredMetrics { @@ -236,4 +262,79 @@ export class MetricsSetups { ], }); } + + private static configureGoodmetricsUnaryFactory(props: { + host: string; + port: number; + }): MetricsFactory { + const unaryClient = GoodmetricsClient.connect({ + hostname: props.host, + port: props.port, + }); + const unarySink = new SynchronizingBuffer(); + const unaryFactory = new MetricsFactory({ + metricsSink: unarySink, + totalTimeType: TotaltimeType.DistributionMilliseconds, + }); + const unaryBatcher = new Batcher({upstream: unarySink}); + + // launching coroutine to process unary metric batches + // eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-member-access + csp.go(async function* () { + for await (const batch of unaryBatcher.consume()) { + if (batch.length === 0) { + console.log('batch array has no length, no metrics to send'); + yield; + continue; + } + try { + await unaryClient.sendMetricsBatch(batch); + } catch (e) { + console.error('failed to send unary batch', e); + } + yield; + } + }); + + return unaryFactory; + } + + private static configureGoodmetricsPreaggregatedFactory(props: { + host: string; + port: number; + aggregationWidthMillis: number; + }): MetricsFactory { + const preaggregatedClient = GoodmetricsClient.connect({ + hostname: props.host ?? 'localhost', + port: props.port ?? 9573, + }); + const preaggregatedSink = new Aggregator({ + aggregationWidthMillis: props.aggregationWidthMillis, + }); + const preaggregatedFactory = new MetricsFactory({ + metricsSink: preaggregatedSink, + totalTimeType: TotaltimeType.DistributionMilliseconds, + }); + const preaggregatedBatcher = new Batcher({upstream: preaggregatedSink}); + + // launching coroutine to process unary metric batches + // eslint-disable-next-line @typescript-eslint/no-unsafe-call,@typescript-eslint/no-unsafe-member-access + csp.go(async function* () { + for await (const batch of preaggregatedBatcher.consume()) { + if (batch.length === 0) { + console.log('batch array has no length, no metrics to send'); + yield; + continue; + } + try { + await preaggregatedClient.sendPreaggregatedMetrics(batch); + } catch (e) { + console.error('failed to send preaggregated batch', e); + } + yield; + } + }); + + return preaggregatedFactory; + } } diff --git a/src/goodmetrics/pipeline/aggregator.ts b/src/goodmetrics/pipeline/aggregator.ts index 2bf150d..bb12449 100644 --- a/src/goodmetrics/pipeline/aggregator.ts +++ b/src/goodmetrics/pipeline/aggregator.ts @@ -9,6 +9,7 @@ import {_Metrics, Dimension} from '../_Metrics'; import {MetricsPipeline} from './metricsPipeline'; import {MetricsSink} from './metricsSink'; import {CancellationToken} from './cancellationToken'; +import {goodmetrics} from 'goodmetrics-generated'; type DimensionPosition = Set; @@ -86,6 +87,43 @@ export class AggregatedBatch { }); } + asGoodmetrics(): goodmetrics.Datum[] { + const datums: goodmetrics.Datum[] = []; + for (const [dimensionPosition, measurementMap] of this.positions) { + const templateDatum = this.initializeGoodMetricPositionDatum( + dimensionPosition, + this.timestampMillis, + this.metric + ); + for (const [measurement, aggregation] of measurementMap) { + templateDatum.measurements.set( + measurement, + this.aggregationAsGoodmetricsProto(aggregation) + ); + } + datums.push(templateDatum); + } + + return datums; + } + + private initializeGoodMetricPositionDatum( + dimensionPosition: MetricPosition, + timestampMillis: number, + name: string + ): goodmetrics.Datum { + const dimensionsMap = new Map(); + for (const position of dimensionPosition) { + dimensionsMap.set(position.name, position.asGoodmetricsDimension()); + } + + return new goodmetrics.Datum({ + unix_nanos: Math.floor(timestampMillis * 1000 * 1000), + metric: name, + dimensions: dimensionsMap, + }); + } + private asGoofyOtlpMetricSequence(): Metric[] { const metricsWeCareAbout: Metric[] = []; this.positions.forEach((measurements, metricPositions) => { @@ -121,6 +159,36 @@ export class AggregatedBatch { return metricsWeCareAbout; } + + private aggregationAsGoodmetricsProto( + aggregation: Aggregation + ): goodmetrics.Measurement { + if (aggregation instanceof Histogram) { + const buckets = new Map(); + for (const [bucket, count] of aggregation.bucketCounts) { + buckets.set(bucket, count); + } + return new goodmetrics.Measurement({ + histogram: new goodmetrics.Histogram({ + buckets, + }), + }); + } else if (aggregation instanceof StatisticSet) { + const aggreValues = aggregation.values(); + return new goodmetrics.Measurement({ + statistic_set: new goodmetrics.StatisticSet({ + samplecount: aggreValues.count, + samplesum: aggreValues.sum, + minimum: aggreValues.min, + maximum: aggreValues.max, + }), + }); + } else { + throw new Error( + 'cannot convert aggregation into a goodmetrics proto, unknown type' + ); + } + } } type AggregatorProps = { diff --git a/src/index.ts b/src/index.ts index 4e186af..ceb9ce8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,4 @@ -import { - OpenTelemetryClient, - SecurityMode, -} from './goodmetrics/downstream/openTelemetryClient'; +import {SecurityMode} from './goodmetrics/downstream/openTelemetryClient'; import {Dimension} from './goodmetrics/_Metrics'; import {MetricsSink} from './goodmetrics/pipeline/metricsSink'; import { @@ -11,7 +8,6 @@ import { } from './goodmetrics/metricsFactory'; import {MetricsSetups} from './goodmetrics/metricsSetups'; export { - OpenTelemetryClient, Dimension, SecurityMode, MetricsSink, diff --git a/test/dev-driver.ts b/test/dev-driver.ts new file mode 100644 index 0000000..98ce1c4 --- /dev/null +++ b/test/dev-driver.ts @@ -0,0 +1,37 @@ +import {MetricsSetups} from '../src/goodmetrics/metricsSetups'; +import {Metrics} from '../src/goodmetrics/_Metrics'; + +const delay = async (ms: number) => { + return await new Promise(resolve => { + setTimeout(() => { + resolve(); + }, ms); + }); +}; + +const main = async () => { + const metrics = MetricsSetups.lightstepNativeOtlp({ + aggregationWidthMillis: 10 * 1000, + lightstepAccessToken: '', + logError(message: string, error: unknown): void { + console.error(message, error); + }, + onSendUnary(metrics: Metrics[]): void { + console.log('sending unary', metrics); + }, + }); + + await delay(8000); + await metrics.unaryMetricsFactory.record( + { + name: 'test', + }, + metrics => { + metrics.measure('runs', 1); + } + ); + + await delay(15000); +}; + +main().finally(); diff --git a/tsconfig.json b/tsconfig.json index dc7b97b..c1ff38a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -21,7 +21,8 @@ "outDir": "dist" }, "include": [ - "src" + "src", + "test" ], "exclude": [ "dist"