Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions src/ModularPipelines/Console/ConsoleCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ public async Task<IProgressSession> BeginProgressAsync(
if (!_options.Value.ShowProgressInConsole)
{
// Return a no-op session if progress is disabled
// Explicitly ensure deferred output is disabled for consistency
_outputCoordinator.SetProgressActive(false);
_activeSession = new NoOpProgressSession();
return _activeSession;
}

ProgressSession session;

lock (_phaseLock)
{
if (_isProgressActive)
Expand All @@ -219,19 +223,24 @@ public async Task<IProgressSession> BeginProgressAsync(
}

_isProgressActive = true;
}

var session = new ProgressSession(
this,
modules,
_options,
_loggerFactory,
cancellationToken);
// CRITICAL: Set OutputCoordinator's progress state inside the lock
// to prevent race conditions where a module completes between
// _isProgressActive = true and OutputCoordinator being notified
_outputCoordinator.SetProgressActive(true);

// Wire up the progress controller for output coordination
_outputCoordinator.SetProgressController(session);
session = new ProgressSession(
this,
modules,
_options,
_loggerFactory,
cancellationToken);

_activeSession = session;
// Wire up the progress controller for output coordination
_outputCoordinator.SetProgressController(session);

_activeSession = session;
}

// Start the progress display
session.Start();
Expand All @@ -247,6 +256,7 @@ internal void EndProgressPhase()
lock (_phaseLock)
{
_outputCoordinator.SetProgressController(NoOpProgressController.Instance);
_outputCoordinator.SetProgressActive(false);
_isProgressActive = false;
_activeSession = null;
}
Expand Down
22 changes: 22 additions & 0 deletions src/ModularPipelines/Console/IOutputCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,26 @@ internal interface IOutputCoordinator
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task that completes when the buffer has been flushed.</returns>
Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default);

/// <summary>
/// Sets whether live progress is currently active.
/// When active, module output is deferred until pipeline end.
/// </summary>
/// <param name="isActive">True when progress display is running.</param>
void SetProgressActive(bool isActive);

/// <summary>
/// Called when a module completes. Decides whether to flush immediately or defer.
/// </summary>
/// <param name="buffer">The module's output buffer.</param>
/// <param name="moduleType">The type of the completed module.</param>
/// <param name="cancellationToken">Cancellation token.</param>
Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default);

/// <summary>
/// Flushes all deferred output in module completion order.
/// Called after progress ends, before results are printed.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
Task FlushDeferredAsync(CancellationToken cancellationToken = default);
}
125 changes: 125 additions & 0 deletions src/ModularPipelines/Console/OutputCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,21 @@ internal sealed class OutputCoordinator : IOutputCoordinator
private readonly Queue<PendingFlush> _pendingQueue = new();
private readonly SemaphoreSlim _writeLock = new(1, 1);

// Separate lock for deferred output operations to reduce contention
// with immediate flush operations that use _queueLock
private readonly object _deferredLock = new();

private IProgressController _progressController = NoOpProgressController.Instance;
private bool _isProcessingQueue;
private volatile bool _isFlushingOutput;
private volatile bool _isProgressActive;
private readonly List<DeferredModuleOutput> _deferredOutputs = new();

private readonly record struct DeferredModuleOutput(
IModuleOutputBuffer Buffer,
Type ModuleType,
DateTimeOffset CompletedAt
);

public OutputCoordinator(
IBuildSystemFormatterProvider formatterProvider,
Expand All @@ -45,6 +57,119 @@ public void SetProgressController(IProgressController controller)
_progressController = controller;
}

/// <inheritdoc />
public void SetProgressActive(bool isActive)
{
lock (_deferredLock)
{
if (isActive)
{
// Starting a new progress session - check for stale deferred outputs
// from a previous run that crashed before FlushDeferredAsync was called
if (_deferredOutputs.Count > 0)
{
_logger.LogWarning(
"Found {Count} stale deferred outputs from a previous pipeline run. " +
"This indicates FlushDeferredAsync was not called. Clearing to prevent memory leak.",
_deferredOutputs.Count);
_deferredOutputs.Clear();
}
}
else
{
// Progress ending - clean up any remaining deferred outputs that weren't flushed
// This handles cancellation scenarios where FlushDeferredAsync wasn't called
if (_deferredOutputs.Count > 0)
{
_logger.LogWarning(
"Progress ended with {Count} unflushed deferred outputs. " +
"This indicates FlushDeferredAsync was not called (possibly due to cancellation). " +
"Clearing to prevent memory leak.",
_deferredOutputs.Count);
_deferredOutputs.Clear();
}
}
}

_isProgressActive = isActive;
}

/// <inheritdoc />
public async Task OnModuleCompletedAsync(IModuleOutputBuffer buffer, Type moduleType, CancellationToken cancellationToken = default)
{
if (!buffer.HasOutput)
{
return;
}

if (_isProgressActive)
{
// Progress is active - defer output until pipeline end
// Uses separate lock to avoid contention with immediate flush operations
lock (_deferredLock)
{
_deferredOutputs.Add(new DeferredModuleOutput(
buffer,
moduleType,
DateTimeOffset.UtcNow
));
}
}
else
{
// No progress - flush immediately (existing behavior)
await EnqueueAndFlushAsync(buffer, cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc />
public async Task FlushDeferredAsync(CancellationToken cancellationToken = default)
{
List<DeferredModuleOutput> toFlush;

lock (_deferredLock)
{
if (_deferredOutputs.Count == 0)
{
return;
}

// Order by completion time to maintain consistent output ordering
toFlush = _deferredOutputs
.OrderBy(x => x.CompletedAt)
.ToList();
_deferredOutputs.Clear();
}

var formatter = _formatterProvider.GetFormatter();

await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
_isFlushingOutput = true;
try
{
foreach (var output in toFlush)
{
if (cancellationToken.IsCancellationRequested)
{
break;
}

FlushBuffer(output.Buffer, formatter);
}
}
finally
{
_isFlushingOutput = false;
}
}
finally
{
_writeLock.Release();
}
}

/// <inheritdoc />
public async Task EnqueueAndFlushAsync(IModuleOutputBuffer buffer, CancellationToken cancellationToken = default)
{
Expand Down
17 changes: 13 additions & 4 deletions src/ModularPipelines/Engine/Executors/PipelineOutputCoordinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ internal class PipelineOutputCoordinator : IPipelineOutputCoordinator
private readonly IInternalSummaryLogger _summaryLogger;
private readonly IExceptionBuffer _exceptionBuffer;
private readonly IConsoleCoordinator _consoleCoordinator;
private readonly IOutputCoordinator _outputCoordinator;

public PipelineOutputCoordinator(
IPrintProgressExecutor printProgressExecutor,
IConsolePrinter consolePrinter,
IInternalSummaryLogger summaryLogger,
IExceptionBuffer exceptionBuffer,
IConsoleCoordinator consoleCoordinator)
IConsoleCoordinator consoleCoordinator,
IOutputCoordinator outputCoordinator)
{
_printProgressExecutor = printProgressExecutor;
_consolePrinter = consolePrinter;
_summaryLogger = summaryLogger;
_exceptionBuffer = exceptionBuffer;
_consoleCoordinator = consoleCoordinator;
_outputCoordinator = outputCoordinator;
}

/// <inheritdoc />
Expand All @@ -48,7 +51,7 @@ public async Task<IPipelineOutputScope> InitializeAsync()
_consoleCoordinator.Install();

var printProgressExecutor = await _printProgressExecutor.InitializeAsync().ConfigureAwait(false);
return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator);
return new PipelineOutputScope(printProgressExecutor, _consoleCoordinator, _outputCoordinator);
}

/// <inheritdoc />
Expand Down Expand Up @@ -77,13 +80,16 @@ private sealed class PipelineOutputScope : IPipelineOutputScope
{
private readonly IPrintProgressExecutor _printProgressExecutor;
private readonly IConsoleCoordinator _consoleCoordinator;
private readonly IOutputCoordinator _outputCoordinator;

public PipelineOutputScope(
IPrintProgressExecutor printProgressExecutor,
IConsoleCoordinator consoleCoordinator)
IConsoleCoordinator consoleCoordinator,
IOutputCoordinator outputCoordinator)
{
_printProgressExecutor = printProgressExecutor;
_consoleCoordinator = consoleCoordinator;
_outputCoordinator = outputCoordinator;
}

public async ValueTask DisposeAsync()
Expand All @@ -92,7 +98,10 @@ public async ValueTask DisposeAsync()
// 1. Stop progress display FIRST (ends buffering phase)
await _printProgressExecutor.DisposeAsync().ConfigureAwait(false);

// 2. Flush buffered module output from coordinator
// 2. Flush deferred module output (in completion order)
await _outputCoordinator.FlushDeferredAsync().ConfigureAwait(false);

// 3. Flush any unattributed output from coordinator
_consoleCoordinator.FlushModuleOutput();
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/ModularPipelines/Logging/ModuleLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ public override async ValueTask DisposeAsync()
try
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await _outputCoordinator.EnqueueAndFlushAsync(_buffer, cts.Token).ConfigureAwait(false);
await _outputCoordinator.OnModuleCompletedAsync(_buffer, typeof(T), cts.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Timeout occurred - log warning, output may be lost
_defaultLogger.LogWarning(
"Module output flush timed out after 30 seconds for {ModuleType}. Some output may be lost.",
"Module output handling timed out after 30 seconds for {ModuleType}. Some output may be lost.",
typeof(T).Name);
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ModularPipelines.Context;
using ModularPipelines.Extensions;
using ModularPipelines.Modules;
using ModularPipelines.Options;
using ModularPipelines.TestHelpers;

namespace ModularPipelines.UnitTests.Console;

public class OutputCoordinatorDeferredFlushTests : TestBase
{
private class Module1 : Module<bool>
{
protected internal override async Task<bool> ExecuteAsync(IModuleContext context, CancellationToken cancellationToken)
{
context.Logger.LogInformation("Module1 output");
await Task.Delay(50, cancellationToken);
return true;
}
}

private class Module2 : Module<bool>
{
protected internal override async Task<bool> ExecuteAsync(IModuleContext context, CancellationToken cancellationToken)
{
context.Logger.LogInformation("Module2 output");
await Task.Yield();
return true;
}
}

[Test]
public async Task Pipeline_Completes_When_Progress_Disabled()
{
var host = await TestPipelineHostBuilder.Create()
.ConfigureServices((_, services) =>
{
services.Configure<PipelineOptions>(opt => opt.ShowProgressInConsole = false);
})
.AddModule<Module1>()
.BuildHostAsync();

await using (host)
{
var result = await host.ExecutePipelineAsync();
await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful);
}
}

[Test]
public async Task Pipeline_With_Multiple_Modules_Completes_Successfully()
{
var host = await TestPipelineHostBuilder.Create()
.ConfigureServices((_, services) =>
{
services.Configure<PipelineOptions>(opt => opt.ShowProgressInConsole = false);
})
.AddModule<Module1>()
.AddModule<Module2>()
.BuildHostAsync();

await using (host)
{
var result = await host.ExecutePipelineAsync();
await Assert.That(result.Status).IsEqualTo(ModularPipelines.Enums.Status.Successful);
}
}
}
Loading