-
-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathConsoleCoordinator.cs
More file actions
439 lines (373 loc) · 14.3 KB
/
ConsoleCoordinator.cs
File metadata and controls
439 lines (373 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ModularPipelines.Engine;
using ModularPipelines.Helpers;
using ModularPipelines.Models;
using ModularPipelines.Modules;
using ModularPipelines.Options;
using Spectre.Console;
namespace ModularPipelines.Console;
/// <summary>
/// Central implementation that owns all console output.
/// </summary>
/// <remarks>
/// <para>
/// <b>Architecture:</b> This coordinator is the single point of control for all console output.
/// It intercepts Console.Out/Error to catch rogue writes and routes them to appropriate buffers.
/// Spectre.Console is configured to write directly to the real console for progress rendering.
/// </para>
/// <para>
/// <b>Thread Safety:</b> This class is thread-safe. All methods can be called
/// concurrently from multiple threads.
/// </para>
/// <para>
/// <b>IProgressDisplay:</b> Implements IProgressDisplay to integrate with existing notification
/// system. The ProgressPrinter forwards notifications here, which are delegated to the active session.
/// </para>
/// </remarks>
[ExcludeFromCodeCoverage]
internal class ConsoleCoordinator : IConsoleCoordinator, IProgressDisplay
{
private const int DefaultCiConsoleWidth = 160;
private readonly IBuildSystemFormatterProvider _formatterProvider;
private readonly IResultsPrinter _resultsPrinter;
private readonly ISecretObfuscator _secretObfuscator;
private readonly IOptions<PipelineOptions> _options;
private readonly ILoggerFactory _loggerFactory;
private readonly IBuildSystemDetector _buildSystemDetector;
private readonly IServiceProvider _serviceProvider;
// Console state
private TextWriter? _originalConsoleOut;
private TextWriter? _originalConsoleError;
private IAnsiConsole? _originalAnsiConsole;
private CoordinatedTextWriter? _coordinatedOut;
private CoordinatedTextWriter? _coordinatedError;
// Phase management
private volatile bool _isProgressActive;
private readonly object _phaseLock = new();
private bool _isInstalled;
private IProgressSession? _activeSession;
// Module buffers
private readonly ConcurrentDictionary<Type, ModuleOutputBuffer> _moduleBuffers = new();
private readonly ModuleOutputBuffer _unattributedBuffer;
// Deferred exceptions
private readonly ConcurrentQueue<string> _deferredExceptions = new();
// Logger for output
private ILogger? _outputLogger;
private readonly IOutputCoordinator _outputCoordinator;
/// <summary>
/// Creates the coordinator and stores the supplied collaborators.
/// No console stream is touched here; interception only begins in <see cref="Install"/>.
/// </summary>
public ConsoleCoordinator(
    IBuildSystemFormatterProvider formatterProvider,
    IResultsPrinter resultsPrinter,
    ISecretObfuscator secretObfuscator,
    IOptions<PipelineOptions> options,
    ILoggerFactory loggerFactory,
    IBuildSystemDetector buildSystemDetector,
    IServiceProvider serviceProvider,
    IOutputCoordinator outputCoordinator)
{
    // Output formatting, reporting and coordination
    _formatterProvider = formatterProvider;
    _resultsPrinter = resultsPrinter;
    _outputCoordinator = outputCoordinator;
    _secretObfuscator = secretObfuscator;

    // Configuration and environment
    _options = options;
    _buildSystemDetector = buildSystemDetector;

    // Infrastructure
    _loggerFactory = loggerFactory;
    _serviceProvider = serviceProvider;

    // Bucket for output that is not attributed to a module type
    _unattributedBuffer = new("Pipeline", typeof(void));
}
/// <inheritdoc />
/// <remarks>
/// Captures the current <c>Console.Out</c>, <c>Console.Error</c> and <c>AnsiConsole.Console</c>,
/// points Spectre.Console at the captured stdout, applies the console width, and then replaces
/// <c>Console.Out</c>/<c>Console.Error</c> with intercepting writers. If any step throws, the
/// captured streams are put back, the saved state is cleared, and the exception is rethrown.
/// </remarks>
/// <exception cref="InvalidOperationException">The coordinator is already installed.</exception>
public void Install()
{
    lock (_phaseLock)
    {
        if (_isInstalled)
        {
            throw new InvalidOperationException("ConsoleCoordinator is already installed.");
        }

        // Snapshot the live streams up front so the rollback path always has something to restore.
        var realOut = System.Console.Out;
        var realError = System.Console.Error;
        var realAnsi = AnsiConsole.Console;

        try
        {
            _originalConsoleOut = realOut;
            _originalConsoleError = realError;
            _originalAnsiConsole = realAnsi;

            // Spectre.Console writes straight to the real stdout, so progress rendering
            // does not pass through the intercepting writers installed further down.
            var directSettings = new AnsiConsoleSettings
            {
                Out = new AnsiConsoleOutput(realOut)
            };
            AnsiConsole.Console = AnsiConsole.Create(directSettings);

            // Width must be applied to the console instance that was just created.
            ConfigureConsoleWidth();

            // Logger used for structured output during flush.
            _outputLogger = _loggerFactory.CreateLogger("ModularPipelines.Output");

            _coordinatedOut = new CoordinatedTextWriter(this, realOut, ShouldBuffer, _secretObfuscator);
            _coordinatedError = new CoordinatedTextWriter(this, realError, ShouldBuffer, _secretObfuscator);

            System.Console.SetOut(_coordinatedOut);
            System.Console.SetError(_coordinatedError);

            _isInstalled = true;
        }
        catch
        {
            // Roll back to the pre-install console state and forget everything captured.
            System.Console.SetOut(realOut);
            System.Console.SetError(realError);
            AnsiConsole.Console = realAnsi;

            _originalConsoleOut = null;
            _originalConsoleError = null;
            _originalAnsiConsole = null;
            _coordinatedOut = null;
            _coordinatedError = null;
            throw;
        }
    }

    // Buffer intercepted writes only while progress is active and no flush is in flight.
    bool ShouldBuffer() => _isProgressActive && !_outputCoordinator.IsFlushing;
}
/// <inheritdoc />
/// <remarks>
/// Does nothing when the coordinator is not installed. Otherwise flushes the intercepting
/// writers, restores whichever original streams were captured, and clears the installed state.
/// </remarks>
public void Uninstall()
{
    lock (_phaseLock)
    {
        if (!_isInstalled)
        {
            return;
        }

        // Push out anything still sitting in the intercepting writers before they are detached.
        _coordinatedOut?.Flush();
        _coordinatedError?.Flush();

        // Put back each stream that was captured during Install.
        if (_originalConsoleOut is { } savedOut)
        {
            System.Console.SetOut(savedOut);
        }

        if (_originalConsoleError is { } savedError)
        {
            System.Console.SetError(savedError);
        }

        if (_originalAnsiConsole is { } savedAnsi)
        {
            AnsiConsole.Console = savedAnsi;
        }

        _originalConsoleOut = null;
        _originalConsoleError = null;
        _originalAnsiConsole = null;
        _coordinatedOut = null;
        _coordinatedError = null;
        _isInstalled = false;
    }
}
/// <inheritdoc />
/// <remarks>
/// <para>
/// When <c>ShowProgressInConsole</c> is disabled, the output coordinator is told that progress is
/// inactive and a <see cref="NoOpProgressSession"/> is returned.
/// </para>
/// <para>
/// Otherwise the progress phase is entered under the phase lock, a <see cref="ProgressSession"/> is
/// created and registered as the output coordinator's progress controller, and the session is started.
/// If starting the session throws, the progress phase is rolled back before the exception propagates.
/// </para>
/// </remarks>
/// <param name="modules">The modules handed to the progress session.</param>
/// <param name="cancellationToken">Token handed to the progress session.</param>
/// <returns>The session that is now active.</returns>
/// <exception cref="InvalidOperationException">A progress session is already active.</exception>
public async Task<IProgressSession> BeginProgressAsync(
    OrganizedModules modules,
    CancellationToken cancellationToken)
{
    if (!_options.Value.ShowProgressInConsole)
    {
        // Progress is disabled: hand back a no-op session and explicitly make sure
        // deferred output is switched off for consistency.
        _outputCoordinator.SetProgressActive(false);
        IProgressSession noOpSession = new NoOpProgressSession();
        _activeSession = noOpSession;
        return noOpSession;
    }

    ProgressSession session;

    lock (_phaseLock)
    {
        if (_isProgressActive)
        {
            throw new InvalidOperationException("Progress session is already active.");
        }

        _isProgressActive = true;

        // CRITICAL: notify the output coordinator inside the lock so that a module cannot
        // complete between _isProgressActive being set and the coordinator learning about it.
        _outputCoordinator.SetProgressActive(true);

        session = new ProgressSession(
            this,
            modules,
            _options,
            _loggerFactory,
            cancellationToken);

        // Wire up the progress controller for output coordination.
        _outputCoordinator.SetProgressController(session);
        _activeSession = session;
    }

    try
    {
        // Start the progress display.
        session.Start();
    }
    catch
    {
        // The caller never receives the session when Start fails, so it cannot dispose it.
        // Roll the phase back here; otherwise _isProgressActive stays true, intercepted
        // output keeps being buffered, and every later call fails with "already active".
        // NOTE(review): the half-started session itself is not disposed here — confirm
        // whether ProgressSession needs additional cleanup in this case.
        EndProgressPhase();
        throw;
    }

    return session;
}
/// <summary>
/// Ends the progress phase. Called by ProgressSession.DisposeAsync().
/// </summary>
/// <remarks>
/// Runs under the phase lock. Resets the output coordinator to the no-op progress controller,
/// marks progress as inactive on both the output coordinator and this coordinator, and clears
/// the active session reference.
/// </remarks>
internal void EndProgressPhase()
{
lock (_phaseLock)
{
// Detach the session from the output coordinator before reporting progress as inactive.
_outputCoordinator.SetProgressController(NoOpProgressController.Instance);
_outputCoordinator.SetProgressActive(false);
// With this flag cleared, the buffering predicate given to the intercepting writers in Install() evaluates to false.
_isProgressActive = false;
// With no active session, the IProgressDisplay.On* callbacks become no-ops (they use a null-conditional call).
_activeSession = null;
}
}
/// <inheritdoc />
/// <remarks>
/// Returns the buffer already registered for <paramref name="moduleType"/>, creating and
/// registering a new one the first time a given type is requested.
/// </remarks>
public IModuleOutputBuffer GetModuleBuffer(Type moduleType)
    => _moduleBuffers.GetOrAdd(moduleType, static type => new ModuleOutputBuffer(type));
/// <inheritdoc />
/// <remarks>Always returns the single buffer created in the constructor.</remarks>
public IModuleOutputBuffer GetUnattributedBuffer()
{
    return _unattributedBuffer;
}
/// <inheritdoc />
/// <remarks>
/// Module output is flushed as modules complete, so this method only flushes the unattributed
/// (pipeline-level) buffer. It remains for API compatibility. It returns without doing anything
/// when the coordinator is not installed or when the unattributed buffer has no output.
/// </remarks>
public void FlushModuleOutput()
{
    // Read the field once: Uninstall() can null it on another thread, and re-reading it
    // after the null check could hand a null writer to FlushTo.
    var realOut = _originalConsoleOut;
    if (realOut is null)
    {
        return; // Not installed, nothing to flush
    }

    if (!_unattributedBuffer.HasOutput)
    {
        return; // Nothing buffered, so there is no need to resolve a formatter or logger
    }

    var formatter = _formatterProvider.GetFormatter();

    // Fall back to a fresh logger if Install() has not populated the cached one.
    var unattributedLogger = _outputLogger ?? _loggerFactory.CreateLogger("ModularPipelines.Output");
    _unattributedBuffer.FlushTo(realOut, formatter, unattributedLogger);
}
/// <inheritdoc />
/// <remarks>
/// Delegates to the results printer, which uses AnsiConsole; <see cref="Install"/> configures
/// AnsiConsole to write directly to the real console.
/// </remarks>
public void WriteResults(PipelineSummary summary) => _resultsPrinter.PrintResults(summary);
/// <inheritdoc />
/// <remarks>Queues the message; it is written out later by <see cref="WriteExceptions"/>.</remarks>
public void AddDeferredException(string message) => _deferredExceptions.Enqueue(message);
/// <inheritdoc />
/// <remarks>
/// Drains the deferred-exception queue and, if anything was queued, writes a header followed
/// by each message through AnsiConsole. Writes nothing when the queue is empty.
/// </remarks>
public void WriteExceptions()
{
    // Drain first; an empty queue simply yields an empty list.
    var drained = new List<string>();
    while (_deferredExceptions.TryDequeue(out var pending))
    {
        drained.Add(pending);
    }

    if (drained.Count == 0)
    {
        return;
    }

    // AnsiConsole output goes to the real console.
    AnsiConsole.WriteLine();
    AnsiConsole.MarkupLine("[bold yellow]\u26a0[/] [bold red]Deferred Exceptions[/]");
    AnsiConsole.WriteLine();

    foreach (var text in drained)
    {
        AnsiConsole.WriteLine(text);
    }

    AnsiConsole.WriteLine();
}
/// <inheritdoc />
/// <remarks>
/// Flushes buffered output on a best-effort basis while installed, then uninstalls the
/// coordinator. The flush makes sure logs are written even when the pipeline was never
/// executed (for example in tests).
/// </remarks>
public async ValueTask DisposeAsync()
{
    FlushIfInstalled();
    Uninstall();

    // No real asynchronous work happens here; a completed task is awaited to satisfy the async signature.
    await Task.CompletedTask;

    void FlushIfInstalled()
    {
        if (!_isInstalled)
        {
            return;
        }

        try
        {
            FlushModuleOutput();
        }
        catch (InvalidOperationException)
        {
            // Deliberately ignored so that disposal still proceeds to Uninstall().
        }
    }
}
/// <summary>
/// Applies a console width to the current AnsiConsole profile: the explicitly configured width
/// when one is set, otherwise <see cref="DefaultCiConsoleWidth"/> on a known build agent,
/// otherwise the width is left untouched.
/// </summary>
private void ConfigureConsoleWidth()
{
    // An explicit user setting always wins.
    if (_options.Value.ConsoleWidth is { } explicitWidth)
    {
        AnsiConsole.Console.Profile.Width = explicitWidth;
        return;
    }

    // Known CI environments typically have no TTY, which makes Spectre.Console fall back
    // to 80 characters; widen the output there.
    if (_buildSystemDetector.IsKnownBuildAgent)
    {
        AnsiConsole.Console.Profile.Width = DefaultCiConsoleWidth;
    }

    // In every other case Spectre.Console's auto-detected width is kept (works well for local terminals).
}
#region IProgressDisplay Implementation
/// <summary>
/// <see cref="IProgressDisplay"/> entry point used by the existing notification system
/// (called by ProgressPrinter on the old code path). Installs the coordinator if needed,
/// begins a progress session, and keeps it alive until <paramref name="cancellationToken"/>
/// is cancelled, at which point the session is disposed.
/// </summary>
async Task IProgressDisplay.RunAsync(OrganizedModules organizedModules, CancellationToken cancellationToken)
{
    // Backward compatibility: callers on this path may not have installed the coordinator yet.
    if (!_isInstalled)
    {
        Install();
    }

    await using (var session = await BeginProgressAsync(organizedModules, cancellationToken).ConfigureAwait(false))
    {
        try
        {
            // The session runs until it is disposed, so simply park here until cancellation.
            await Task.Delay(Timeout.Infinite, cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected: cancellation is the signal that progress should end.
        }
    }
}
/// <inheritdoc />
/// <remarks>Forwarded to the active session; does nothing when no session is active.</remarks>
void IProgressDisplay.OnModuleStarted(ModuleState moduleState, TimeSpan estimatedDuration)
    => _activeSession?.OnModuleStarted(moduleState, estimatedDuration);
/// <inheritdoc />
/// <remarks>Forwarded to the active session; does nothing when no session is active.</remarks>
void IProgressDisplay.OnModuleCompleted(ModuleState moduleState, bool isSuccessful)
    => _activeSession?.OnModuleCompleted(moduleState, isSuccessful);
/// <inheritdoc />
/// <remarks>Forwarded to the active session; does nothing when no session is active.</remarks>
void IProgressDisplay.OnModuleSkipped(ModuleState moduleState)
    => _activeSession?.OnModuleSkipped(moduleState);
/// <inheritdoc />
/// <remarks>Forwarded to the active session; does nothing when no session is active.</remarks>
void IProgressDisplay.OnSubModuleCreated(IModule parentModule, SubModuleBase subModule, TimeSpan estimatedDuration)
    => _activeSession?.OnSubModuleCreated(parentModule, subModule, estimatedDuration);
/// <inheritdoc />
/// <remarks>Forwarded to the active session; does nothing when no session is active.</remarks>
void IProgressDisplay.OnSubModuleCompleted(SubModuleBase subModule, bool isSuccessful)
    => _activeSession?.OnSubModuleCompleted(subModule, isSuccessful);
#endregion
}