-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathNetheriteOrchestrationServiceSettings.cs
More file actions
519 lines (437 loc) · 23.8 KB
/
NetheriteOrchestrationServiceSettings.cs
File metadata and controls
519 lines (437 loc) · 23.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
namespace DurableTask.Netherite
{
using System;
using System.Runtime;
using DurableTask.Core;
using FASTER.core;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
/// <summary>
/// Settings for the <see cref="NetheriteOrchestrationService"/> class.
/// </summary>
[JsonObject]
public class NetheriteOrchestrationServiceSettings
{
/// <summary>
/// The name of the taskhub.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public string HubName { get; set; }
/// <summary>
/// Gets or sets the name for resolving the Azure storage connection string.
/// </summary>
public string StorageConnectionName { get; set; } = "AzureWebJobsStorage";
/// <summary>
/// Gets or sets the name for resolving the Eventhubs namespace connection string.
/// Pseudo-connection-strings "Memory" or "SingleHost" can be used to configure
/// in-memory emulation, or single-host configuration, respectively.
/// </summary>
public string EventHubsConnectionName { get; set; } = "EventHubsConnection";
/// <summary>
/// Gets or sets the identifier for the current worker.
/// </summary>
public string WorkerId { get; set; } = Environment.MachineName;
/// <summary>
/// Gets or sets the number of partitions to use when creating a new taskhub.
/// If a taskhub already exists, this number is irrelevant.
/// </summary>
public int PartitionCount { get; set; } = 12;
/// <summary>
/// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
/// then Azure blobs are used instead. The use of Azure blobs is currently not supported on consumption plans, or on elastic premium plans without runtime scaling.
/// </summary>
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";
/// <summary>
/// Tuning parameters for the FASTER logs
/// </summary>
public Faster.BlobManager.FasterTuningParameters FasterTuningParameters { get; set; } = null;
/// <summary>
/// Gets or sets the maximum number of activity work items that can be processed concurrently on a single node.
/// The default value is 100.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxConcurrentActivityFunctions { get; set; } = 100;
/// <summary>
/// Gets or sets the maximum number of orchestration work items that can be processed concurrently on a single node.
/// The default value is 100.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxConcurrentOrchestratorFunctions { get; set; } = 100;
/// <summary>
/// Gets or sets the maximum number of entity work items that can be processed concurrently on a single node.
/// The default value is 100.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxConcurrentEntityFunctions { get; set; } = 100;
/// <summary>
/// Whether to use separate work item queues for entities and orchestrators.
/// This defaults to false, to maintain compatility with legacy front ends.
/// Newer front ends explicitly set this to true.
/// </summary>
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;
/// <summary>
/// Gets or sets the maximum number of entity operations that are processed as a single batch.
/// The default value is 1000.
/// Matches corresponding property in Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskOptions.
/// </summary>
public int MaxEntityOperationBatchSize { get; set; } = 1000;
/// <summary>
/// Gets or sets the number of dispatchers used to dispatch orchestrations.
/// </summary>
public int OrchestrationDispatcherCount { get; set; } = 1;
/// <summary>
/// Gets or sets the number of dispatchers used to dispatch activities.
/// </summary>
public int ActivityDispatcherCount { get; set; } = 1;
/// <summary>
/// Limit for how much memory on each node should be used for caching instance states and histories
/// </summary>
public int? InstanceCacheSizeMB { get; set; } = null;
/// <summary>
/// Gets or sets the partition management option
/// </summary>
[JsonConverter(typeof(StringEnumConverter))]
public PartitionManagementOptions PartitionManagement { get; set; } = PartitionManagementOptions.EventProcessorHost;
/// <summary>
/// Additional parameters for the partition management, if necessary
/// </summary>
public string PartitionManagementParameters { get; set; } = null;
/// <summary>
/// The path to the file containing the taskhub parameters.
/// </summary>
public string TaskhubParametersFilePath { get; set; } = "taskhubparameters.json";
/// <summary>
/// Gets or sets the activity scheduler option
/// </summary>
[JsonConverter(typeof(StringEnumConverter))]
public ActivitySchedulerOptions ActivityScheduler { get; set; } = ActivitySchedulerOptions.Locavore;
/// <summary>
/// Gets or sets a flag indicating whether to enable caching of execution cursors to avoid replay.
/// </summary>
public bool CacheOrchestrationCursors { get; set; } = true;
/// <summary>
/// Whether we should carry over unexecuted raised events to the next iteration of an orchestration on ContinueAsNew.
/// </summary>
[JsonConverter(typeof(StringEnumConverter))]
public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew { get; set; } = BehaviorOnContinueAsNew.Carryover;
/// <summary>
/// When true, will throw an exception when attempting to create an orchestration with an existing dedupe status.
/// </summary>
public bool ThrowExceptionOnInvalidDedupeStatus { get; set; } = false;
/// <summary>
/// Whether to checkpoint the current state of a partition when it is stopped. This improves recovery time but
/// lengthens shutdown time and can cause memory pressure if many partitions are stopped at the same time,
/// for example if a host is shutting down.
/// </summary>
public bool TakeStateCheckpointWhenStoppingPartition { get; set; } = false;
/// <summary>
/// A limit on how many bytes to append to the log before initiating a state checkpoint. The default is 20MB.
/// </summary>
public long MaxNumberBytesBetweenCheckpoints { get; set; } = 20 * 1024 * 1024;
/// <summary>
/// A limit on how many events to append to the log before initiating a state checkpoint. The default is 10000.
/// </summary>
public long MaxNumberEventsBetweenCheckpoints { get; set; } = 10 * 1000;
/// <summary>
/// A limit on how long to wait between state checkpoints, in milliseconds. The default is 60s.
/// </summary>
public long IdleCheckpointFrequencyMs { get; set; } = 60 * 1000;
/// <summary>
/// Set this to a local file path to make FASTER use local files instead of blobs. Currently,
/// this makes sense only for local testing and debugging.
/// </summary>
public string UseLocalDirectoryForPartitionStorage { get; set; } = null;
/// <summary>
/// Whether to keep an in-memory set of all instance ids in memory. This is required for supporting paged queries.
/// </summary>
public bool KeepInstanceIdsInMemory = true;
/// <summary>
/// Whether to immediately shut down the transport layer and terminate the process when a fatal exception is observed.
/// This is true by default, to enable failing hosts to leave quickly which allows other hosts to recover the partitions more quickly.
/// </summary>
public bool EmergencyShutdownOnFatalExceptions = true;
/// <summary>
/// Forces steps to pe persisted before applying their effects, disabling all pipelining.
/// </summary>
public bool PersistStepsFirst { get; set; } = false;
/// <summary>
/// If true, the start of work items is delayed until the dequeue count
/// is persisted. Defaults to false, which improves latency but means the
/// reported dequeue count may be lower than the actual dequeue count in some cases.
/// </summary>
public bool PersistDequeueCountBeforeStartingWorkItem { get; set; } = false;
/// <summary>
/// Pack TaskMessages generated by a single work item for the same destination into a single event.
/// </summary>
public int PackPartitionTaskMessages { get; set; } = 100;
/// <summary>
/// Time limit for partition startup, in minutes.
/// </summary>
public int PartitionStartupTimeoutMinutes { get; set; } = 15;
/// <summary>
/// If true, disables the prefetching during replay.
/// </summary>
public bool DisablePrefetchDuringReplay { get; set; } = false;
/// <summary>
/// Allows attaching additional checkers and debuggers during testing.
/// </summary>
[JsonIgnore]
public TestHooks TestHooks { get; set; } = null;
/// <summary>
/// A lower limit on the severity level of trace events emitted by the transport layer.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel TransportLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of trace events emitted by the storage layer.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel StorageLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of event processor trace events emitted.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel EventLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of work item trace events emitted.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel WorkItemLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of client trace events emitted.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel ClientLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of load monitor trace events emitted.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel LoadMonitorLogLevelLimit { get; set; } = LogLevel.Debug;
/// <summary>
/// A lower limit on the severity level of all other trace events emitted.
/// </summary>
/// <remarks>This level applies to both ETW events and ILogger events.</remarks>
[JsonConverter(typeof(StringEnumConverter))]
public LogLevel LogLevelLimit { get; set; } = LogLevel.Debug;
#region Compatibility Shim
/// <summary>
/// The resolved storage connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedStorageConnectionString { get; set; }
/// <summary>
/// The resolved event hubs connection string. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedTransportConnectionString { get; set; }
/// <summary>
/// A name for resolving a storage connection string to be used specifically for the page blobs, or null if page blobs are to be stored in the default account.
/// </summary>
[JsonProperty(DefaultValueHandling = DefaultValueHandling.Ignore)]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string PageBlobStorageConnectionName { get; set; } = null;
/// <summary>
/// The resolved page blob storage connection string, or null if page blobs are to be stored in the default account. Is never serialized or deserialized.
/// </summary>
[JsonIgnore]
[Obsolete("connections should be resolved by calling settings.Validate(ConnectionResolver resolver)")]
public string ResolvedPageBlobStorageConnectionString { get; set; }
class CompatibilityResolver : ConnectionResolver
{
readonly Func<string, string> nameResolver;
CompatibilityResolver(Func<string, string> nameResolver)
{
this.nameResolver = nameResolver;
}
public override ConnectionInfo ResolveConnectionInfo(string taskHub, string connectionName, ResourceType recourceType)
{
throw new NotImplementedException();
}
public override void ResolveLayerConfiguration(string connectionName, out StorageChoices storageChoice, out TransportChoices transportChoice)
{
throw new NotImplementedException();
}
}
#endregion
#region Parameters that are set during resolution
/// <summary>
/// The type of storage layer to be used
/// </summary>
[JsonIgnore]
public StorageChoices StorageChoice { get; protected set; }
/// <summary>
/// The type of transport layer to be used
/// </summary>
[JsonIgnore]
public TransportChoices TransportChoice { get; protected set; }
/// <summary>
/// The connection information for Azure Storage blobs.
/// If not explicitly set, this is populated during validation by resolving <see cref="StorageConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo BlobStorageConnection { get; protected set; }
/// <summary>
/// The connection information for Azure Storage tables.
/// If not explicitly set, this is populated during validation by resolving <see cref="StorageConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo TableStorageConnection { get; protected set; }
/// <summary>
/// The connection information for the event hubs namespace.
/// If not explicitly set, this is populated during validation by resolving <see cref="EventHubsConnectionName"/>.
/// </summary>
[JsonIgnore]
public ConnectionInfo EventHubsConnection { get; protected set; }
/// <summary>
/// The connection information for Azure Storage page blobs.
///This is usually null, which means the same <see cref="BlobStorageConnection"/> should be used for page blobs also.
/// </summary>
[JsonIgnore]
public ConnectionInfo PageBlobStorageConnection { get; protected set; }
/// <summary>
/// Whether the storage layer was configured to use a different connection for page blobs than for other blobs
/// </summary>
[JsonIgnore]
internal bool UseSeparatePageBlobStorage => this.PageBlobStorageConnection != null;
[JsonIgnore]
public string StorageAccountName
=> this.StorageChoice == StorageChoices.Memory ? "Memory" : this.BlobStorageConnection.ResourceName;
/// <summary>
/// Whether the settings have been validated and resolved
/// </summary>
[JsonIgnore]
public bool ResolutionComplete { get; protected set; }
#endregion
/// <summary>
/// Validates the settings, throwing exceptions if there are issues.
/// </summary>
/// <param name="nameResolver">Optionally, a resolver for connection names.</param>
public void Validate(Func<string, string> connectionNameToConnectionString = null)
{
this.Validate(new CompatibilityConnectionResolver(this, connectionNameToConnectionString));
}
/// <summary>
/// Validates the settings and resolves the connections, throwing exceptions if there are issues.
/// </summary>
/// <param name="resolver">A connection resolver.</param>
public void Validate(ConnectionResolver resolver)
{
if (string.IsNullOrEmpty(this.HubName))
{
throw new NetheriteConfigurationException($"Must specify {nameof(this.HubName)} for Netherite storage provider.");
}
ValidateTaskhubName(this.HubName);
if (this.PartitionCount < 1 || this.PartitionCount > 32)
{
throw new ArgumentOutOfRangeException(nameof(this.PartitionCount));
}
if (this.MaxConcurrentOrchestratorFunctions <= 0)
{
throw new ArgumentOutOfRangeException(nameof(this.MaxConcurrentOrchestratorFunctions));
}
if (this.MaxConcurrentActivityFunctions <= 0)
{
throw new ArgumentOutOfRangeException(nameof(this.MaxConcurrentActivityFunctions));
}
resolver.ResolveLayerConfiguration(this.EventHubsConnectionName, out var storage, out var transport);
this.StorageChoice = storage;
this.TransportChoice = transport;
if (this.TransportChoice == TransportChoices.EventHubs)
{
// we need a valid event hubs connection
if (string.IsNullOrEmpty(this.EventHubsConnectionName) && resolver is ConnectionNameToConnectionStringResolver)
{
throw new NetheriteConfigurationException($"Must specify {nameof(this.EventHubsConnectionName)} for Netherite storage provider.");
}
try
{
this.EventHubsConnection = resolver.ResolveConnectionInfo(this.HubName, this.EventHubsConnectionName, ConnectionResolver.ResourceType.EventHubsNamespace);
}
catch (Exception e)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.EventHubsConnectionName)}={this.EventHubsConnectionName} to create an eventhubs connection for Netherite storage provider: {e.Message}", e);
}
if (this.EventHubsConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.EventHubsConnectionName)}={this.EventHubsConnectionName} to create an eventhubs connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster || this.TransportChoice == TransportChoices.EventHubs)
{
// we need a valid blob storage connection
if (string.IsNullOrEmpty(this.StorageConnectionName) && resolver is ConnectionNameToConnectionStringResolver)
{
throw new NetheriteConfigurationException($"Must specify {nameof(this.StorageConnectionName)} for Netherite storage provider.");
}
try
{
this.BlobStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.BlobStorage);
}
catch (Exception e)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a blob storage connection for Netherite storage provider: {e.Message}", e);
}
if (this.BlobStorageConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a blob storage connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster && !string.IsNullOrEmpty(this.LoadInformationAzureTableName))
{
// we need a valid table storage connection
try
{
this.TableStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.TableStorage);
}
catch (Exception e)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a table storage connection for Netherite storage provider: {e.Message}", e);
}
if (this.TableStorageConnection == null)
{
throw new NetheriteConfigurationException($"Could not resolve {nameof(this.StorageConnectionName)}={this.StorageConnectionName} to create a table storage connection for Netherite storage provider.");
}
}
if (this.StorageChoice == StorageChoices.Faster)
{
// some custom resolvers may specify a separate page blob connection, but usually this will be null
this.PageBlobStorageConnection = resolver.ResolveConnectionInfo(this.HubName, this.StorageConnectionName, ConnectionResolver.ResourceType.PageBlobStorage);
}
// we have completed validation and resolution
this.ResolutionComplete = true;
}
const int MinTaskHubNameSize = 3;
const int MaxTaskHubNameSize = 45;
public static void ValidateTaskhubName(string taskhubName)
{
if (taskhubName.Length < MinTaskHubNameSize || taskhubName.Length > MaxTaskHubNameSize)
{
throw new NetheriteConfigurationException(GetTaskHubErrorString(taskhubName));
}
try
{
Microsoft.Azure.Storage.NameValidator.ValidateContainerName(taskhubName.ToLowerInvariant());
Microsoft.Azure.Storage.NameValidator.ValidateBlobName(taskhubName);
}
catch (ArgumentException e)
{
throw new NetheriteConfigurationException(GetTaskHubErrorString(taskhubName), e);
}
}
static string GetTaskHubErrorString(string hubName)
{
return $"Task hub name '{hubName}' should contain only alphanumeric characters, start with a letter, and have length between {MinTaskHubNameSize} and {MaxTaskHubNameSize}.";
}
}
}