Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/how-to/parallelization.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class BuildProjectModule : Module

public record MyParallelLimit : IParallelLimit
{
public int Limit => 2;
public static int Limit => 2;
}
```

Expand Down
39 changes: 34 additions & 5 deletions src/ModularPipelines/Attributes/ParallelLimiterAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,56 @@
using ModularPipelines.Interfaces;
using ModularPipelines.Helpers;
using ModularPipelines.Interfaces;
using Semaphores;

namespace ModularPipelines.Attributes;

/// <summary>
/// Specifies a parallel execution limit for a module using a strongly-typed limit class.
/// </summary>
/// <typeparam name="TParallelLimit">The type implementing <see cref="IParallelLimit"/>.</typeparam>
[AttributeUsage(AttributeTargets.Assembly | AttributeTargets.Class | AttributeTargets.Method)]
public sealed class ParallelLimiterAttribute<TParallelLimit> : ParallelLimiterAttribute
where TParallelLimit : IParallelLimit, new()
where TParallelLimit : IParallelLimit
{
public ParallelLimiterAttribute() : base(typeof(TParallelLimit))
{
}

/// <inheritdoc />
internal override AsyncSemaphore GetLock(IParallelLimitProvider provider)
{
return provider.GetLock<TParallelLimit>();
}
}

public class ParallelLimiterAttribute : Attribute
/// <summary>
/// Base attribute for specifying parallel execution limits.
/// </summary>
public abstract class ParallelLimiterAttribute : Attribute
{
/// <summary>
/// Gets the type implementing <see cref="IParallelLimit"/>.
/// </summary>
public Type Type { get; }

public ParallelLimiterAttribute(Type type)
/// <summary>
/// Initializes a new instance of the <see cref="ParallelLimiterAttribute"/> class.
/// </summary>
/// <param name="type">The type implementing <see cref="IParallelLimit"/>.</param>
protected ParallelLimiterAttribute(Type type)
{
if (!type.IsAssignableTo(typeof(IParallelLimit)))
{
throw new Exception("Type must be of IParallelLimit");
throw new ArgumentException("Type must implement IParallelLimit", nameof(type));
}

Type = type;
}

/// <summary>
/// Gets the semaphore lock from the provider without reflection.
/// </summary>
/// <param name="provider">The parallel limit provider.</param>
/// <returns>The semaphore for this limit type.</returns>
internal abstract AsyncSemaphore GetLock(IParallelLimitProvider provider);
}
68 changes: 68 additions & 0 deletions src/ModularPipelines/Engine/Execution/ExecutionContextFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Collections.Concurrent;
using System.Linq.Expressions;
using ModularPipelines.Models;
using ModularPipelines.Modules;

namespace ModularPipelines.Engine.Execution;

/// <summary>
/// Factory for creating ModuleExecutionContext instances without reflection.
/// </summary>
/// <remarks>
/// Replaces Activator.CreateInstance with compiled expression trees for better performance.
/// </remarks>
internal static class ExecutionContextFactory
{
/// <summary>
/// Delegate signature for creating a ModuleExecutionContext.
/// </summary>
internal delegate ModuleExecutionContext CreateContextDelegate(IModule module, Type moduleType);

private static readonly ConcurrentDictionary<Type, CreateContextDelegate> ContextFactoryCache = new();

/// <summary>
/// Creates a ModuleExecutionContext for the specified module.
/// </summary>
/// <param name="module">The module instance.</param>
/// <param name="moduleType">The type of the module.</param>
/// <returns>A typed ModuleExecutionContext.</returns>
public static ModuleExecutionContext Create(IModule module, Type moduleType)
{
var resultType = module.ResultType;
var factory = ContextFactoryCache.GetOrAdd(resultType, CreateFactory);
return factory(module, moduleType);
}

private static CreateContextDelegate CreateFactory(Type resultType)
{
// Parameters
var moduleParam = Expression.Parameter(typeof(IModule), "module");
var moduleTypeParam = Expression.Parameter(typeof(Type), "moduleType");

// Get the generic types
var contextType = typeof(ModuleExecutionContext<>).MakeGenericType(resultType);
var typedModuleType = typeof(Module<>).MakeGenericType(resultType);

// Find the constructor: ModuleExecutionContext<T>(Module<T> module, Type moduleType)
var constructor = contextType.GetConstructor(new[] { typedModuleType, typeof(Type) })
?? throw new InvalidOperationException(
$"Could not find constructor for {contextType.Name} with (Module<{resultType.Name}>, Type) parameters.");

// Cast module to Module<T>
var castModule = Expression.Convert(moduleParam, typedModuleType);

// Create new ModuleExecutionContext<T>((Module<T>)module, moduleType)
var newContext = Expression.New(constructor, castModule, moduleTypeParam);

// Cast to base type
var castToBase = Expression.Convert(newContext, typeof(ModuleExecutionContext));

// Create and compile the lambda
var lambda = Expression.Lambda<CreateContextDelegate>(
castToBase,
moduleParam,
moduleTypeParam);

return lambda.Compile();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
using ModularPipelines.Context;
using ModularPipelines.Models;
using ModularPipelines.Modules;

namespace ModularPipelines.Engine.Execution;

/// <summary>
/// Factory for creating cached delegates to execute modules without runtime reflection.
/// </summary>
/// <remarks>
/// This class replaces the reflection-heavy pattern of using MakeGenericMethod and GetProperty("Result")
/// with compiled expression trees that are cached per result type.
/// </remarks>
internal static class ModuleExecutionDelegateFactory
{
/// <summary>
/// Delegate signature for executing a module and returning its result.
/// </summary>
internal delegate Task<IModuleResult> ExecuteModuleDelegate(
IModuleExecutionPipeline pipeline,
IModule module,
ModuleExecutionContext executionContext,
IModuleContext moduleContext,
CancellationToken cancellationToken);

private static readonly ConcurrentDictionary<Type, ExecuteModuleDelegate> ExecutorCache = new();

/// <summary>
/// Gets a cached delegate for executing a module with the specified result type.
/// </summary>
/// <param name="resultType">The result type of the module (T in Module&lt;T&gt;).</param>
/// <returns>A delegate that executes the module and returns its result.</returns>
public static ExecuteModuleDelegate GetExecutor(Type resultType)
{
return ExecutorCache.GetOrAdd(resultType, CreateExecutor);
}

private static ExecuteModuleDelegate CreateExecutor(Type resultType)
{
// Parameters for the delegate
var pipelineParam = Expression.Parameter(typeof(IModuleExecutionPipeline), "pipeline");
var moduleParam = Expression.Parameter(typeof(IModule), "module");
var contextParam = Expression.Parameter(typeof(ModuleExecutionContext), "executionContext");
var moduleContextParam = Expression.Parameter(typeof(IModuleContext), "moduleContext");
var cancellationTokenParam = Expression.Parameter(typeof(CancellationToken), "cancellationToken");

// Get the generic types
var moduleType = typeof(Module<>).MakeGenericType(resultType);
var executionContextType = typeof(ModuleExecutionContext<>).MakeGenericType(resultType);
var moduleResultType = typeof(ModuleResult<>).MakeGenericType(resultType);
var taskType = typeof(Task<>).MakeGenericType(moduleResultType);

// Cast module to Module<T>
var castModule = Expression.Convert(moduleParam, moduleType);

// Cast executionContext to ModuleExecutionContext<T>
var castContext = Expression.Convert(contextParam, executionContextType);

// Get the ExecuteAsync method
var executeMethod = typeof(IModuleExecutionPipeline)
.GetMethod(nameof(IModuleExecutionPipeline.ExecuteAsync))!
.MakeGenericMethod(resultType);

// Call pipeline.ExecuteAsync<T>(module, executionContext, moduleContext, cancellationToken)
var callExecute = Expression.Call(
pipelineParam,
executeMethod,
castModule,
castContext,
moduleContextParam,
cancellationTokenParam);

// We need to create an async wrapper that awaits the task and casts the result to IModuleResult
// Since Expression trees can't directly represent async/await, we'll use a helper method
var helperMethod = typeof(ModuleExecutionDelegateFactory)
.GetMethod(nameof(ExecuteAndCastAsync), BindingFlags.NonPublic | BindingFlags.Static)!
.MakeGenericMethod(resultType);

var callHelper = Expression.Call(
helperMethod,
pipelineParam,
castModule,
castContext,
moduleContextParam,
cancellationTokenParam);

// Create and compile the lambda
var lambda = Expression.Lambda<ExecuteModuleDelegate>(
callHelper,
pipelineParam,
moduleParam,
contextParam,
moduleContextParam,
cancellationTokenParam);

return lambda.Compile();
}

private static async Task<IModuleResult> ExecuteAndCastAsync<T>(
IModuleExecutionPipeline pipeline,
Module<T> module,
ModuleExecutionContext<T> executionContext,
IModuleContext moduleContext,
CancellationToken cancellationToken)
{
var result = await pipeline.ExecuteAsync(module, executionContext, moduleContext, cancellationToken)
.ConfigureAwait(false);
return result;
}
}
110 changes: 110 additions & 0 deletions src/ModularPipelines/Engine/Execution/ModuleResultFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
using ModularPipelines.Models;
using ModularPipelines.Modules;

namespace ModularPipelines.Engine.Execution;

/// <summary>
/// Factory for creating ModuleResult instances without reflection.
/// </summary>
/// <remarks>
/// Replaces reflection-based constructor invocation with compiled expression trees.
/// </remarks>
internal static class ModuleResultFactory
{
/// <summary>
/// Delegate for creating a ModuleResult with a null value (for skipped modules).
/// </summary>
internal delegate IModuleResult CreateSkippedResultDelegate(ModuleExecutionContext executionContext);

/// <summary>
/// Delegate for creating a ModuleResult with an exception.
/// </summary>
internal delegate IModuleResult CreateExceptionResultDelegate(Exception exception, ModuleExecutionContext executionContext);

private static readonly ConcurrentDictionary<Type, CreateSkippedResultDelegate> SkippedResultCache = new();
private static readonly ConcurrentDictionary<Type, CreateExceptionResultDelegate> ExceptionResultCache = new();

/// <summary>
/// Creates a skipped ModuleResult for the specified result type.
/// </summary>
public static IModuleResult CreateSkipped(Type resultType, ModuleExecutionContext executionContext)
{
var factory = SkippedResultCache.GetOrAdd(resultType, CreateSkippedFactory);
return factory(executionContext);
}

/// <summary>
/// Creates an exception ModuleResult for the specified result type.
/// </summary>
public static IModuleResult CreateException(Type resultType, Exception exception, ModuleExecutionContext executionContext)
{
var factory = ExceptionResultCache.GetOrAdd(resultType, CreateExceptionFactory);
return factory(exception, executionContext);
}

private static CreateSkippedResultDelegate CreateSkippedFactory(Type resultType)
{
var contextParam = Expression.Parameter(typeof(ModuleExecutionContext), "executionContext");
var resultGenericType = typeof(ModuleResult<>).MakeGenericType(resultType);
var typedContextType = typeof(ModuleExecutionContext<>).MakeGenericType(resultType);

// Cast to typed context
var castContext = Expression.Convert(contextParam, typedContextType);

// Find the internal constructor: ModuleResult<T>(T? value, ModuleExecutionContext context)
var constructor = resultGenericType.GetConstructor(
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new[] { resultType, typeof(ModuleExecutionContext) },
null);

if (constructor == null)
{
throw new InvalidOperationException(
$"Could not find internal constructor for ModuleResult<{resultType.Name}>(T?, ModuleExecutionContext)");
}

// Create: new ModuleResult<T>(default(T), executionContext)
var defaultValue = Expression.Default(resultType);
var newResult = Expression.New(constructor, defaultValue, contextParam);

// Cast to IModuleResult
var castToInterface = Expression.Convert(newResult, typeof(IModuleResult));

var lambda = Expression.Lambda<CreateSkippedResultDelegate>(castToInterface, contextParam);
return lambda.Compile();
}

private static CreateExceptionResultDelegate CreateExceptionFactory(Type resultType)
{
var exceptionParam = Expression.Parameter(typeof(Exception), "exception");
var contextParam = Expression.Parameter(typeof(ModuleExecutionContext), "executionContext");
var resultGenericType = typeof(ModuleResult<>).MakeGenericType(resultType);

// Find the internal constructor: ModuleResult<T>(Exception exception, ModuleExecutionContext context)
// Note: The constructor takes the base class ModuleExecutionContext, not ModuleExecutionContext<T>
var constructor = resultGenericType.GetConstructor(
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new[] { typeof(Exception), typeof(ModuleExecutionContext) },
null);

if (constructor == null)
{
throw new InvalidOperationException(
$"Could not find internal constructor for ModuleResult<{resultType.Name}>(Exception, ModuleExecutionContext)");
}

// Create: new ModuleResult<T>(exception, executionContext)
var newResult = Expression.New(constructor, exceptionParam, contextParam);

// Cast to IModuleResult
var castToInterface = Expression.Convert(newResult, typeof(IModuleResult));

var lambda = Expression.Lambda<CreateExceptionResultDelegate>(castToInterface, exceptionParam, contextParam);
return lambda.Compile();
}
}
15 changes: 4 additions & 11 deletions src/ModularPipelines/Engine/Execution/ModuleResultRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@ public void RegisterTerminatedResult(IModule module, Type moduleType, Exception
{
var resultType = module.ResultType;

// Create execution context with PipelineTerminated status
var contextType = typeof(ModuleExecutionContext<>).MakeGenericType(resultType);
var executionContext = (ModuleExecutionContext)Activator.CreateInstance(contextType, module, moduleType)!;
// Create execution context with PipelineTerminated status using compiled delegate factory
var executionContext = ExecutionContextFactory.Create(module, moduleType);
executionContext.Status = Enums.Status.PipelineTerminated;
executionContext.Exception = exception;

// Create ModuleResult<T> with the exception
var resultGenericType = typeof(ModuleResult<>).MakeGenericType(resultType);
var result = (IModuleResult)Activator.CreateInstance(
resultGenericType,
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance,
null,
new object[] { exception, executionContext },
null)!;
// Create ModuleResult<T> with the exception using compiled delegate factory
var result = ModuleResultFactory.CreateException(resultType, exception, executionContext);

_resultRegistry.RegisterResult(moduleType, result);
}
Expand Down
Loading
Loading