diff --git a/src/ModularPipelines/Console/ConsoleCoordinator.cs b/src/ModularPipelines/Console/ConsoleCoordinator.cs index 8aa28e9693..51d26a1ca0 100644 --- a/src/ModularPipelines/Console/ConsoleCoordinator.cs +++ b/src/ModularPipelines/Console/ConsoleCoordinator.cs @@ -207,10 +207,14 @@ public async Task BeginProgressAsync( if (!_options.Value.ShowProgressInConsole) { // Return a no-op session if progress is disabled + // Explicitly ensure deferred output is disabled for consistency + _outputCoordinator.SetProgressActive(false); _activeSession = new NoOpProgressSession(); return _activeSession; } + ProgressSession session; + lock (_phaseLock) { if (_isProgressActive) @@ -219,19 +223,24 @@ public async Task BeginProgressAsync( } _isProgressActive = true; - } - var session = new ProgressSession( - this, - modules, - _options, - _loggerFactory, - cancellationToken); + // CRITICAL: Set OutputCoordinator's progress state inside the lock + // to prevent race conditions where a module completes between + // _isProgressActive = true and OutputCoordinator being notified + _outputCoordinator.SetProgressActive(true); - // Wire up the progress controller for output coordination - _outputCoordinator.SetProgressController(session); + session = new ProgressSession( + this, + modules, + _options, + _loggerFactory, + cancellationToken); - _activeSession = session; + // Wire up the progress controller for output coordination + _outputCoordinator.SetProgressController(session); + + _activeSession = session; + } // Start the progress display session.Start(); @@ -247,6 +256,7 @@ internal void EndProgressPhase() lock (_phaseLock) { _outputCoordinator.SetProgressController(NoOpProgressController.Instance); + _outputCoordinator.SetProgressActive(false); _isProgressActive = false; _activeSession = null; } diff --git a/src/ModularPipelines/Console/IOutputCoordinator.cs b/src/ModularPipelines/Console/IOutputCoordinator.cs index 116b6a61e2..0c5911678c 100644 --- a/src/ModularPipelines/Console/IOutputCoordinator.cs +++ b/src/ModularPipelines/Console/IOutputCoordinator.cs @@ -27,4 +27,26 @@ internal interface IOutputCoordinator /// Cancellation token. /// A task that completes when the buffer has been flushed. Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default); + + /// + /// Sets whether live progress is currently active. + /// When active, module output is deferred until pipeline end. + /// + /// True when progress display is running. + void SetProgressActive(bool isActive); + + /// + /// Called when a module completes. Decides whether to flush immediately or defer. + /// + /// The module's output buffer. + /// The type of the completed module. + /// Cancellation token. + Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default); + + /// + /// Flushes all deferred output in module completion order. + /// Called after progress ends, before results are printed. + /// + /// Cancellation token. + Task FlushDeferredAsync(CancellationToken cancellationToken = default); } diff --git a/src/ModularPipelines/Console/OutputCoordinator.cs b/src/ModularPipelines/Console/OutputCoordinator.cs index 78860832eb..2700ecc49d 100644 --- a/src/ModularPipelines/Console/OutputCoordinator.cs +++ b/src/ModularPipelines/Console/OutputCoordinator.cs @@ -20,9 +20,21 @@ internal sealed class OutputCoordinator : IOutputCoordinator private readonly Queue _pendingQueue = new(); private readonly SemaphoreSlim _writeLock = new(1, 1); + // Separate lock for deferred output operations to reduce contention + // with immediate flush operations that use _queueLock + private readonly object _deferredLock = new(); + private IProgressController _progressController = NoOpProgressController.Instance; private bool _isProcessingQueue; private volatile bool _isFlushingOutput; + private volatile bool _isProgressActive; + private readonly List _deferredOutputs = new(); + + private readonly record struct DeferredModuleOutput( + IModuleOutputBuffer Buffer, + Type ModuleType, + DateTimeOffset CompletedAt + ); public OutputCoordinator( IBuildSystemFormatterProvider formatterProvider, @@ -45,6 +57,119 @@ public void SetProgressController(IProgressController controller) _progressController = controller; } + /// + public void SetProgressActive(bool isActive) + { + lock (_deferredLock) + { + if (isActive) + { + // Starting a new progress session - check for stale deferred outputs + // from a previous run that crashed before FlushDeferredAsync was called + if (_deferredOutputs.Count > 0) + { + _logger.LogWarning( + "Found {Count} stale deferred outputs from a previous pipeline run. " + + "This indicates FlushDeferredAsync was not called. Clearing to prevent memory leak.", + _deferredOutputs.Count); + _deferredOutputs.Clear(); + } + } + else + { + // Progress ending - clean up any remaining deferred outputs that weren't flushed + // This handles cancellation scenarios where FlushDeferredAsync wasn't called + if (_deferredOutputs.Count > 0) + { + _logger.LogWarning( + "Progress ended with {Count} unflushed deferred outputs. " + + "This indicates FlushDeferredAsync was not called (possibly due to cancellation). " + + "Clearing to prevent memory leak.", + _deferredOutputs.Count); + _deferredOutputs.Clear(); + } + } + } + + _isProgressActive = isActive; + } + + /// + public async Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default) + { + if (!buffer.HasOutput) + { + return; + } + + if (_isProgressActive) + { + // Progress is active - defer output until pipeline end + // Uses separate lock to avoid contention with immediate flush operations + lock (_deferredLock) + { + _deferredOutputs.Add(new DeferredModuleOutput( + buffer, + moduleType, + DateTimeOffset.UtcNow + )); + } + } + else + { + // No progress - flush immediately (existing behavior) + await EnqueueAndFlushAsync(buffer, cancellationToken).ConfigureAwait(false); + } + } + + /// + public async Task FlushDeferredAsync(CancellationToken cancellationToken = default) + { + List toFlush; + + lock (_deferredLock) + { + if (_deferredOutputs.Count == 0) + { + return; + } + + // Order by completion time to maintain consistent output ordering + toFlush = _deferredOutputs + .OrderBy(x => x.CompletedAt) + .ToList(); + _deferredOutputs.Clear(); + } + + var formatter = _formatterProvider.GetFormatter(); + + await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + _isFlushingOutput = true; + try + { + foreach (var output in toFlush) + { + if (cancellationToken.IsCancellationRequested) + { + break; + } + + FlushBuffer(output.Buffer, formatter); + } + } + finally + { + _isFlushingOutput = false; + } + } + finally + { + _writeLock.Release(); + } + } + /// public async Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default) { diff --git a/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs b/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs index 3ce55fdd74..0cd665d06e 100644 --- a/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs +++ b/src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs @@ -26,19 +26,22 @@ internal class PipelineOutputCoordinator : IPipelineOutputCoordinator private readonly IInternalSummaryLogger _summaryLogger; private readonly IExceptionBuffer _exceptionBuffer; private readonly IConsoleCoordinator _consoleCoordinator; + private readonly IOutputCoordinator _outputCoordinator; public PipelineOutputCoordinator( IPrintProgressExecutor printProgressExecutor, IConsolePrinter consolePrinter, IInternalSummaryLogger summaryLogger, IExceptionBuffer exceptionBuffer, - IConsoleCoordinator consoleCoordinator) + IConsoleCoordinator consoleCoordinator, + IOutputCoordinator outputCoordinator) { _printProgressExecutor = printProgressExecutor; _consolePrinter = consolePrinter; _summaryLogger = summaryLogger; _exceptionBuffer = exceptionBuffer; _consoleCoordinator = consoleCoordinator; + _outputCoordinator = outputCoordinator; } /// @@ -48,7 +51,7 @@ public async Task InitializeAsync() _consoleCoordinator.Install(); var printProgressExecutor = await _printProgressExecutor.InitializeAsync().ConfigureAwait(false); - return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator); + return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator, _outputCoordinator); } /// @@ -77,13 +80,16 @@ private sealed class PipelineOutputScope : IPipelineOutputScope { private readonly IPrintProgressExecutor _printProgressExecutor; private readonly IConsoleCoordinator _consoleCoordinator; + private readonly IOutputCoordinator _outputCoordinator; public PipelineOutputScope( IPrintProgressExecutor printProgressExecutor, - IConsoleCoordinator consoleCoordinator) + IConsoleCoordinator consoleCoordinator, + IOutputCoordinator outputCoordinator) { _printProgressExecutor = printProgressExecutor; _consoleCoordinator = consoleCoordinator; + _outputCoordinator = outputCoordinator; } public async ValueTask DisposeAsync() @@ -92,7 +98,10 @@ public async ValueTask DisposeAsync() // 1. Stop progress display FIRST (ends buffering phase) await _printProgressExecutor.DisposeAsync().ConfigureAwait(false); - // 2. Flush buffered module output from coordinator + // 2. Flush deferred module output (in completion order) + await _outputCoordinator.FlushDeferredAsync().ConfigureAwait(false); + + // 3. Flush any unattributed output from coordinator _consoleCoordinator.FlushModuleOutput(); } } diff --git a/src/ModularPipelines/Logging/ModuleLogger.cs b/src/ModularPipelines/Logging/ModuleLogger.cs index f9eadfa7ba..419eab50f0 100644 --- a/src/ModularPipelines/Logging/ModuleLogger.cs +++ b/src/ModularPipelines/Logging/ModuleLogger.cs @@ -173,13 +173,13 @@ public override async ValueTask DisposeAsync() try { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - await _outputCoordinator.EnqueueAndFlushAsync(_buffer, cts.Token).ConfigureAwait(false); + await _outputCoordinator.OnModuleCompletedAsync(_buffer, typeof(T), cts.Token).ConfigureAwait(false); } catch (OperationCanceledException) { // Timeout occurred - log warning, output may be lost _defaultLogger.LogWarning( - "Module output flush timed out after 30 seconds for {ModuleType}. Some output may be lost.", + "Module output handling timed out after 30 seconds for {ModuleType}. Some output may be lost.", typeof(T).Name); } catch (Exception ex) diff --git a/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs b/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs new file mode 100644 index 0000000000..39b3c96df2 --- /dev/null +++ b/test/ModularPipelines.UnitTests/Console/OutputCoordinatorDeferredFlushTests.cs @@ -0,0 +1,69 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using ModularPipelines.Context; +using ModularPipelines.Extensions; +using ModularPipelines.Modules; +using ModularPipelines.Options; +using ModularPipelines.TestHelpers; + +namespace ModularPipelines.UnitTests.Console; + +public class OutputCoordinatorDeferredFlushTests : TestBase +{ + private class Module1 : Module + { + protected internal override async Task ExecuteAsync(IModuleContext context, CancellationToken cancellationToken) + { + context.Logger.LogInformation("Module1 output"); + await Task.Delay(50, cancellationToken); + return true; + } + } + + private class Module2 : Module + { + protected internal override async Task ExecuteAsync(IModuleContext context, CancellationToken cancellationToken) + { + context.Logger.LogInformation("Module2 output"); + await Task.Yield(); + return true; + } + } + + [Test] + public async Task Pipeline_Completes_When_Progress_Disabled() + { + var host = await TestPipelineHostBuilder.Create() + .ConfigureServices((_, services) => + { + services.Configure(opt => opt.ShowProgressInConsole = false); + }) + .AddModule() + .BuildHostAsync(); + + await using (host) + { + var result = await host.ExecutePipelineAsync(); + await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful); + } + } + + [Test] + public async Task Pipeline_With_Multiple_Modules_Completes_Successfully() + { + var host = await TestPipelineHostBuilder.Create() + .ConfigureServices((_, services) => + { + services.Configure(opt => opt.ShowProgressInConsole = false); + }) + .AddModule() + .AddModule() + .BuildHostAsync(); + + await using (host) + { + var result = await host.ExecutePipelineAsync(); + await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful); + } + } +}