Skip to content

Commit 1c544bf

Browse files
authored
[Internal] Client Telemetry: Adds sampling logic for network level telemetry (#3750)
* sampling logic add * throttlinfix * fix tests * fix tests * remove dispose from sampler * add 412 * draft push * refactor code for sampling * clean up * start testing * adding tests * wip * test fix * fix test * fix tests * deleted extra file * remove one toList * add custome logic for sampling * code refactor * replace key * remove commented code from test * added more comments * simpler sampler logic * refactor code
1 parent d822239 commit 1c544bf

File tree

12 files changed

+516
-251
lines changed

12 files changed

+516
-251
lines changed

Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetry.cs

Lines changed: 11 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ internal class ClientTelemetry : IDisposable
3636
private readonly ClientTelemetryProperties clientTelemetryInfo;
3737
private readonly ClientTelemetryProcessor processor;
3838
private readonly DiagnosticsHandlerHelper diagnosticsHelper;
39-
39+
private readonly NetworkDataRecorder networkDataRecorder;
40+
4041
private readonly CancellationTokenSource cancellationTokenSource;
4142

4243
private readonly GlobalEndpointManager globalEndpointManager;
@@ -49,11 +50,8 @@ internal class ClientTelemetry : IDisposable
4950
private ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoMap
5051
= new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>();
5152

52-
private ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoMap
53-
= new ConcurrentDictionary<RequestInfo, LongConcurrentHistogram>();
54-
5553
private int numberOfFailures = 0;
56-
54+
5755
/// <summary>
5856
/// Only for Mocking in tests
5957
/// </summary>
@@ -112,6 +110,8 @@ internal ClientTelemetry(
112110
GlobalEndpointManager globalEndpointManager)
113111
{
114112
this.diagnosticsHelper = diagnosticsHelper ?? throw new ArgumentNullException(nameof(diagnosticsHelper));
113+
this.globalEndpointManager = globalEndpointManager;
114+
115115
this.processor = new ClientTelemetryProcessor(httpClient, authorizationTokenProvider);
116116

117117
this.clientTelemetryInfo = new ClientTelemetryProperties(
@@ -122,8 +122,9 @@ internal ClientTelemetry(
122122
preferredRegions: preferredRegions,
123123
aggregationIntervalInSec: (int)observingWindow.TotalSeconds);
124124

125+
this.networkDataRecorder = new NetworkDataRecorder();
126+
125127
this.cancellationTokenSource = new CancellationTokenSource();
126-
this.globalEndpointManager = globalEndpointManager;
127128
}
128129

129130
/// <summary>
@@ -179,18 +180,15 @@ private async Task EnrichAndSendAsync()
179180

180181
ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot
181182
= Interlocked.Exchange(ref this.cacheRefreshInfoMap, new ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram>());
182-
183-
ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoSnapshot
184-
= Interlocked.Exchange(ref this.requestInfoMap, new ConcurrentDictionary<RequestInfo, LongConcurrentHistogram>());
185-
183+
186184
try
187185
{
188186
await this.processor
189-
.ProcessAndSendAsync(
187+
.ProcessAndSendAsync(
190188
clientTelemetryInfo: this.clientTelemetryInfo,
191189
operationInfoSnapshot: operationInfoSnapshot,
192190
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
193-
requestInfoSnapshot: requestInfoSnapshot,
191+
requestInfoSnapshot: this.networkDataRecorder.GetRequests(),
194192
cancellationToken: this.cancellationTokenSource.Token);
195193

196194
this.numberOfFailures = 0;
@@ -296,7 +294,7 @@ internal void CollectOperationInfo(CosmosDiagnostics cosmosDiagnostics,
296294

297295
// Record Network/Replica Information
298296
SummaryDiagnostics summaryDiagnostics = new SummaryDiagnostics(trace);
299-
this.RecordRntbdResponses(containerId, databaseId, summaryDiagnostics.StoreResponseStatistics.Value);
297+
this.networkDataRecorder.Record(summaryDiagnostics.StoreResponseStatistics.Value, databaseId, containerId);
300298

301299
string regionsContacted = ClientTelemetryHelper.GetContactedRegions(cosmosDiagnostics.GetContactedRegions());
302300

@@ -338,37 +336,6 @@ internal void CollectOperationInfo(CosmosDiagnostics cosmosDiagnostics,
338336
}
339337
}
340338

341-
/// <summary>
342-
/// Records RNTBD calls statistics
343-
/// </summary>
344-
/// <param name="containerId"></param>
345-
/// <param name="databaseId"></param>
346-
/// <param name="storeResponseStatistics"></param>
347-
private void RecordRntbdResponses(string containerId, string databaseId, List<StoreResponseStatistics> storeResponseStatistics)
348-
{
349-
foreach (StoreResponseStatistics storetatistics in storeResponseStatistics)
350-
{
351-
if (ClientTelemetryOptions.IsEligible((int)storetatistics.StoreResult.StatusCode, (int)storetatistics.StoreResult.SubStatusCode, storetatistics.RequestLatency))
352-
{
353-
RequestInfo requestInfo = new RequestInfo()
354-
{
355-
DatabaseName = databaseId,
356-
ContainerName = containerId,
357-
Uri = storetatistics.StoreResult.StorePhysicalAddress.ToString(),
358-
StatusCode = (int)storetatistics.StoreResult.StatusCode,
359-
SubStatusCode = (int)storetatistics.StoreResult.SubStatusCode,
360-
Resource = storetatistics.RequestResourceType.ToString(),
361-
Operation = storetatistics.RequestOperationType.ToString(),
362-
};
363-
364-
LongConcurrentHistogram latencyHist = this.requestInfoMap.GetOrAdd(requestInfo, x => new LongConcurrentHistogram(ClientTelemetryOptions.RequestLatencyMin,
365-
ClientTelemetryOptions.RequestLatencyMax,
366-
ClientTelemetryOptions.RequestLatencyPrecision));
367-
latencyHist.RecordValue(storetatistics.RequestLatency.Ticks);
368-
}
369-
}
370-
}
371-
372339
/// <summary>
373340
/// Dispose of cosmos client.It will get disposed with client so not making it thread safe.
374341
/// </summary>

Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryOptions.cs

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,18 @@ internal static class ClientTelemetryOptions
8888

8989
internal static readonly ResourceType AllowedResourceTypes = ResourceType.Document;
9090
// Why 5 sec? As of now, if any network request is taking more than 5 millisecond sec, we will consider it slow request this value can be revisited in future
91-
private static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5);
91+
internal static readonly TimeSpan NetworkLatencyThreshold = TimeSpan.FromMilliseconds(5);
92+
internal static readonly int NetworkRequestsSampleSizeThreshold = 10;
93+
9294
internal static readonly JsonSerializerSettings JsonSerializerSettings = new JsonSerializerSettings
9395
{
9496
NullValueHandling = NullValueHandling.Ignore,
9597
MaxDepth = 64, // https://github.com/advisories/GHSA-5crp-9r3c-p9vr
9698
};
9799

98-
private static readonly List<int> ExcludedStatusCodes = new List<int> { 404, 409 };
100+
internal static readonly List<int> ExcludedStatusCodes = new List<int> { 404, 409, 412 };
99101

102+
internal static readonly int NetworkTelemetrySampleSize = 200;
100103
internal static int PayloadSizeThreshold = 1024 * 1024 * 2; // 2MB
101104

102105
private static Uri clientTelemetryEndpoint;
@@ -180,32 +183,5 @@ internal static string GetEnvironmentName()
180183
}
181184
return environmentName;
182185
}
183-
184-
/// <summary>
185-
/// This method will return true if the request is failed with User or Server Exception and not excluded from telemetry.
186-
/// This method will return true if the request latency is more than the threshold.
187-
/// otherwise return false
188-
/// </summary>
189-
/// <param name="statusCode"></param>
190-
/// <param name="subStatusCode"></param>
191-
/// <param name="latencyInMs"></param>
192-
/// <returns>true/false</returns>
193-
internal static bool IsEligible(int statusCode, int subStatusCode, TimeSpan latencyInMs)
194-
{
195-
return
196-
ClientTelemetryOptions.IsStatusCodeNotExcluded(statusCode, subStatusCode) &&
197-
(ClientTelemetryOptions.IsUserOrServerError(statusCode) || latencyInMs >= ClientTelemetryOptions.NetworkLatencyThreshold);
198-
}
199-
200-
private static bool IsUserOrServerError(int statusCode)
201-
{
202-
return statusCode >= 400 && statusCode <= 599;
203-
}
204-
205-
private static bool IsStatusCodeNotExcluded(int statusCode, int subStatusCode)
206-
{
207-
return !(ClientTelemetryOptions.ExcludedStatusCodes.Contains(statusCode) && subStatusCode == 0);
208-
}
209-
210186
}
211187
}

Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryPayloadWriter.cs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static async Task SerializedPayloadChunksAsync(
2121
ClientTelemetryProperties properties,
2222
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot,
2323
ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot,
24-
ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoSnapshot,
24+
IReadOnlyList<RequestInfo> sampledRequestInfo,
2525
Func<string, Task> callback)
2626
{
2727
if (properties == null)
@@ -46,11 +46,12 @@ public static async Task SerializedPayloadChunksAsync(
4646
OperationInfo payloadForRequestCharge = payloadForLatency.Copy();
4747
payloadForRequestCharge.MetricInfo = new MetricInfo(ClientTelemetryOptions.RequestChargeName, ClientTelemetryOptions.RequestChargeUnit);
4848
payloadForRequestCharge.SetAggregators(entry.Value.requestcharge, ClientTelemetryOptions.HistogramPrecisionFactor);
49-
49+
5050
string latencyMetrics = JsonConvert.SerializeObject(payloadForLatency);
5151
string requestChargeMetrics = JsonConvert.SerializeObject(payloadForRequestCharge);
52-
53-
if (lengthNow + latencyMetrics.Length + requestChargeMetrics.Length > ClientTelemetryOptions.PayloadSizeThreshold)
52+
53+
int thisSectionLength = latencyMetrics.Length + requestChargeMetrics.Length;
54+
if (lengthNow + thisSectionLength > ClientTelemetryOptions.PayloadSizeThreshold)
5455
{
5556
writer.WriteEndArray();
5657
writer.WriteEndObject();
@@ -98,21 +99,16 @@ public static async Task SerializedPayloadChunksAsync(
9899

99100
}
100101

101-
if (requestInfoSnapshot?.Any() == true)
102+
if (sampledRequestInfo?.Any() == true)
102103
{
103104
writer.WritePropertyName("requestInfo");
104105
writer.WriteStartArray();
105106

106-
foreach (KeyValuePair<RequestInfo, LongConcurrentHistogram> entry in requestInfoSnapshot)
107+
foreach (RequestInfo entry in sampledRequestInfo)
107108
{
108109
long lengthNow = stringBuilder.Length;
109-
110-
MetricInfo metricInfo = new MetricInfo(ClientTelemetryOptions.RequestLatencyName, ClientTelemetryOptions.RequestLatencyUnit);
111-
metricInfo.SetAggregators(entry.Value, ClientTelemetryOptions.TicksToMsFactor);
112-
113-
RequestInfo payloadForLatency = entry.Key;
114-
payloadForLatency.Metrics.Add(metricInfo);
115-
string latencyMetrics = JsonConvert.SerializeObject(payloadForLatency);
110+
111+
string latencyMetrics = JsonConvert.SerializeObject(entry);
116112

117113
if (lengthNow + latencyMetrics.Length > ClientTelemetryOptions.PayloadSizeThreshold)
118114
{

Microsoft.Azure.Cosmos/src/Telemetry/ClientTelemetryProcessor.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry
66
{
77
using System;
88
using System.Collections.Concurrent;
9+
using System.Collections.Generic;
910
using System.Net.Http;
1011
using System.Text;
1112
using System.Threading;
@@ -22,7 +23,7 @@ internal class ClientTelemetryProcessor
2223

2324
private readonly AuthorizationTokenProvider tokenProvider;
2425
private readonly CosmosHttpClient httpClient;
25-
26+
2627
internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationTokenProvider tokenProvider)
2728
{
2829
this.httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
@@ -32,17 +33,12 @@ internal ClientTelemetryProcessor(CosmosHttpClient httpClient, AuthorizationToke
3233
/// <summary>
3334
/// It will create Task to process and send client telemetry payload to Client Telemetry Service.
3435
/// </summary>
35-
/// <param name="clientTelemetryInfo"></param>
36-
/// <param name="operationInfoSnapshot"></param>
37-
/// <param name="cacheRefreshInfoSnapshot"></param>
38-
/// <param name="requestInfoSnapshot"></param>
39-
/// <param name="cancellationToken"></param>
4036
/// <returns>Task</returns>
4137
internal async Task ProcessAndSendAsync(
4238
ClientTelemetryProperties clientTelemetryInfo,
43-
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharge)> operationInfoSnapshot,
39+
ConcurrentDictionary<OperationInfo, (LongConcurrentHistogram latency, LongConcurrentHistogram requestcharget)> operationInfoSnapshot,
4440
ConcurrentDictionary<CacheRefreshInfo, LongConcurrentHistogram> cacheRefreshInfoSnapshot,
45-
ConcurrentDictionary<RequestInfo, LongConcurrentHistogram> requestInfoSnapshot,
41+
IReadOnlyList<RequestInfo> requestInfoSnapshot,
4642
CancellationToken cancellationToken)
4743
{
4844
try
@@ -51,7 +47,7 @@ await ClientTelemetryPayloadWriter.SerializedPayloadChunksAsync(
5147
properties: clientTelemetryInfo,
5248
operationInfoSnapshot: operationInfoSnapshot,
5349
cacheRefreshInfoSnapshot: cacheRefreshInfoSnapshot,
54-
requestInfoSnapshot: requestInfoSnapshot,
50+
sampledRequestInfo: requestInfoSnapshot,
5551
callback: async (payload) => await this.SendAsync(clientTelemetryInfo.GlobalDatabaseAccountName, payload, cancellationToken));
5652
}
5753
catch (Exception ex)

Microsoft.Azure.Cosmos/src/Telemetry/Models/RequestInfo.cs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,14 @@ internal sealed class RequestInfo
3737

3838
public override int GetHashCode()
3939
{
40-
int hash = 3;
40+
int hash = this.GetHashCodeForSampler();
4141
hash = (hash * 7) ^ (this.Uri == null ? 0 : this.Uri.GetHashCode());
42+
return hash;
43+
}
44+
45+
public int GetHashCodeForSampler()
46+
{
47+
int hash = 3;
4248
hash = (hash * 7) ^ (this.DatabaseName == null ? 0 : this.DatabaseName.GetHashCode());
4349
hash = (hash * 7) ^ (this.ContainerName == null ? 0 : this.ContainerName.GetHashCode());
4450
hash = (hash * 7) ^ (this.Operation == null ? 0 : this.Operation.GetHashCode());
@@ -47,7 +53,7 @@ public override int GetHashCode()
4753
hash = (hash * 7) ^ (this.SubStatusCode.GetHashCode());
4854
return hash;
4955
}
50-
56+
5157
public override bool Equals(object obj)
5258
{
5359
bool isequal = obj is RequestInfo payload &&
@@ -62,5 +68,26 @@ public override bool Equals(object obj)
6268
return isequal;
6369
}
6470

71+
public double GetP99Latency()
72+
{
73+
foreach (MetricInfo metric in this.Metrics)
74+
{
75+
if (metric.MetricsName.Equals(ClientTelemetryOptions.RequestLatencyName, StringComparison.OrdinalIgnoreCase))
76+
{
77+
return metric.Percentiles[ClientTelemetryOptions.Percentile99];
78+
}
79+
}
80+
return Double.MinValue; // least prioity for request info w/o latency info
81+
}
82+
83+
public double GetSampleCount()
84+
{
85+
return (double)this.Metrics[0].Count;
86+
}
87+
88+
public override string ToString()
89+
{
90+
return "Latency : " + this.GetP99Latency() + ", SampleCount : " + this.GetSampleCount();
91+
}
6592
}
6693
}

0 commit comments

Comments
 (0)