Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class DistributedJobSettings
{
internal const string StaticPeriod = "00:00:05";
internal const string StaticDelay = "00:01:00";
internal const string StaticMaxExecutionTime = "00:05:00";

/// <summary>
/// Gets or sets a value for the period of checking if there are any runnable distributed jobs.
Expand All @@ -22,4 +23,11 @@ public class DistributedJobSettings
/// </summary>
[DefaultValue(StaticDelay)]
public TimeSpan Delay { get; set; } = TimeSpan.Parse(StaticDelay);

/// <summary>
/// Gets or sets the maximum execution time for a distributed job before it is considered timed out.
/// When a job exceeds this time, it is considered stale and can be picked up by another server for recovery and restarted.
/// </summary>
[DefaultValue(StaticMaxExecutionTime)]
public TimeSpan MaximumExecutionTime { get; set; } = TimeSpan.Parse(StaticMaxExecutionTime);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core;
Expand All @@ -22,10 +21,6 @@ public class DistributedBackgroundJobHostedService : BackgroundService
/// <summary>
/// Initializes a new instance of the <see cref="DistributedBackgroundJobHostedService"/> class.
/// </summary>
/// <param name="logger"></param>
/// <param name="runtimeState"></param>
/// <param name="distributedJobService"></param>
/// <param name="distributedJobSettings"></param>
public DistributedBackgroundJobHostedService(
ILogger<DistributedBackgroundJobHostedService> logger,
IRuntimeState runtimeState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,30 @@ public interface IDistributedJobRepository
/// Deletes a job.
/// </summary>
void Delete(DistributedBackgroundJobModel distributedBackgroundJob);

/// <summary>
/// Adds multiple jobs in a single batch operation.
/// </summary>
/// <param name="jobs">The jobs to add.</param>
void Add(IEnumerable<DistributedBackgroundJobModel> jobs)
{
// TODO: Delete default implementation in V18
foreach (DistributedBackgroundJobModel job in jobs)
{
Add(job);
}
}

/// <summary>
/// Deletes multiple jobs in a single batch operation.
/// </summary>
/// <param name="jobs">The jobs to delete.</param>
void Delete(IEnumerable<DistributedBackgroundJobModel> jobs)
{
// TODO: Delete default implementation in V18
foreach (DistributedBackgroundJobModel job in jobs)
{
Delete(job);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,40 @@ public void Delete(DistributedBackgroundJobModel distributedBackgroundJob)
}
}

/// <inheritdoc/>
public void Add(IEnumerable<DistributedBackgroundJobModel> jobs)
{
if (scopeAccessor.AmbientScope is null)
{
throw new InvalidOperationException("No scope, could not add distributed jobs");
}

IEnumerable<DistributedJobDto> dtos = jobs.Select(MapToDto);
scopeAccessor.AmbientScope.Database.InsertBulk(dtos);
}

/// <inheritdoc/>
public void Delete(IEnumerable<DistributedBackgroundJobModel> jobs)
{
if (scopeAccessor.AmbientScope is null)
{
throw new InvalidOperationException("No scope, could not delete distributed jobs");
}

var jobIds = jobs.Select(x => x.Id).ToArray();
if (jobIds.Length is 0)
{
return;
}

Sql<ISqlContext> sql = scopeAccessor.AmbientScope.SqlContext.Sql()
.Delete()
.From<DistributedJobDto>()
.WhereIn<DistributedJobDto>(x => x.Id, jobIds);

scopeAccessor.AmbientScope.Database.Execute(sql);
}

private DistributedJobDto MapToDto(DistributedBackgroundJobModel model) =>
new()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Umbraco.Cms.Core;
using Umbraco.Cms.Core.Configuration.Models;
using Umbraco.Cms.Core.DependencyInjection;
using Umbraco.Cms.Core.Scoping;
using Umbraco.Cms.Infrastructure.BackgroundJobs;
using Umbraco.Cms.Infrastructure.Models;
Expand All @@ -14,24 +18,41 @@ public class DistributedJobService : IDistributedJobService
private readonly IDistributedJobRepository _distributedJobRepository;
private readonly IEnumerable<IDistributedBackgroundJob> _distributedBackgroundJobs;
private readonly ILogger<DistributedJobService> _logger;
private readonly DistributedJobSettings _settings;

/// <summary>
/// Initializes a new instance of the <see cref="DistributedJobService"/> class.
/// </summary>
/// <param name="coreScopeProvider"></param>
/// <param name="distributedJobRepository"></param>
/// <param name="distributedBackgroundJobs"></param>
/// <param name="logger"></param>
[Obsolete("Use the constructor that accepts IOptions<DistributedJobSettings>. Scheduled for removal in V18.")]
public DistributedJobService(
ICoreScopeProvider coreScopeProvider,
IDistributedJobRepository distributedJobRepository,
IEnumerable<IDistributedBackgroundJob> distributedBackgroundJobs,
ILogger<DistributedJobService> logger)
: this(
coreScopeProvider,
distributedJobRepository,
distributedBackgroundJobs,
logger,
StaticServiceProvider.Instance.GetRequiredService<IOptions<DistributedJobSettings>>())
{
}

/// <summary>
/// Initializes a new instance of the <see cref="DistributedJobService"/> class.
/// </summary>
public DistributedJobService(
ICoreScopeProvider coreScopeProvider,
IDistributedJobRepository distributedJobRepository,
IEnumerable<IDistributedBackgroundJob> distributedBackgroundJobs,
ILogger<DistributedJobService> logger,
IOptions<DistributedJobSettings> settings)
{
_coreScopeProvider = coreScopeProvider;
_distributedJobRepository = distributedJobRepository;
_distributedBackgroundJobs = distributedBackgroundJobs;
_logger = logger;
_settings = settings.Value;
}

/// <inheritdoc />
Expand All @@ -42,7 +63,8 @@ public DistributedJobService(
scope.EagerWriteLock(Constants.Locks.DistributedJobs);

IEnumerable<DistributedBackgroundJobModel> jobs = _distributedJobRepository.GetAll();
DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period);
DistributedBackgroundJobModel? job = jobs.FirstOrDefault(x => x.LastRun < DateTime.UtcNow - x.Period
&& (x.IsRunning is false || x.LastAttemptedRun < DateTime.UtcNow - x.Period - _settings.MaximumExecutionTime));

if (job is null)
{
Expand Down Expand Up @@ -93,45 +115,64 @@ public async Task FinishAsync(string jobName)
/// <inheritdoc />
public async Task EnsureJobsAsync()
{
// Pre-compute registered job data outside the lock to minimize lock hold time
var registeredJobsByName = _distributedBackgroundJobs.ToDictionary(x => x.Name, x => x.Period);

// Early exit if no registered jobs
if (registeredJobsByName.Count is 0)
{
return;
}

using ICoreScope scope = _coreScopeProvider.CreateCoreScope();
scope.WriteLock(Constants.Locks.DistributedJobs);

DistributedBackgroundJobModel[] existingJobs = _distributedJobRepository.GetAll().ToArray();
var existingJobsByName = existingJobs.ToDictionary(x => x.Name);

foreach (IDistributedBackgroundJob registeredJob in _distributedBackgroundJobs)
// Collect all changes first, then execute - minimizes time spent in the critical section
var jobsToAdd = new List<DistributedBackgroundJobModel>();
DateTime utcNow = DateTime.UtcNow;

foreach (KeyValuePair<string, TimeSpan> registeredJob in registeredJobsByName)
{
if (existingJobsByName.TryGetValue(registeredJob.Name, out DistributedBackgroundJobModel? existingJob))
if (existingJobsByName.TryGetValue(registeredJob.Key, out DistributedBackgroundJobModel? existingJob))
{
// Update if period has changed
if (existingJob.Period != registeredJob.Period)
// Update only if period has actually changed
if (existingJob.Period != registeredJob.Value)
{
existingJob.Period = registeredJob.Period;
existingJob.Period = registeredJob.Value;
_distributedJobRepository.Update(existingJob);
}
}
else
{
// Add new job (fresh install or newly registered job)
var newJob = new DistributedBackgroundJobModel
// Collect new jobs for batch insert
jobsToAdd.Add(new DistributedBackgroundJobModel
{
Name = registeredJob.Name,
Period = registeredJob.Period,
LastRun = DateTime.UtcNow,
Name = registeredJob.Key,
Period = registeredJob.Value,
LastRun = utcNow,
IsRunning = false,
LastAttemptedRun = DateTime.UtcNow,
};
_distributedJobRepository.Add(newJob);
LastAttemptedRun = utcNow,
});
}
}

// Remove jobs that are no longer registered in code
var registeredJobNames = _distributedBackgroundJobs.Select(x => x.Name).ToHashSet();
IEnumerable<DistributedBackgroundJobModel> jobsToRemove = existingJobs.Where(x => registeredJobNames.Contains(x.Name) is false);
// Batch insert new jobs
if (jobsToAdd.Count > 0)
{
_distributedJobRepository.Add(jobsToAdd);
}

// Batch delete jobs that are no longer registered
var jobsToRemove = existingJobs
.Where(x => registeredJobsByName.ContainsKey(x.Name) is false)
.ToList();

foreach (DistributedBackgroundJobModel jobToRemove in jobsToRemove)
if (jobsToRemove.Count > 0)
{
_distributedJobRepository.Delete(jobToRemove);
_distributedJobRepository.Delete(jobsToRemove);
}

scope.Complete();
Expand Down
Loading
Loading