Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/Runner.Common/BrokerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public interface IBrokerServer : IRunnerService

Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token);

Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken token);

Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials);

Task ForceRefreshConnection(VssCredentials credentials);
Expand Down Expand Up @@ -67,10 +69,17 @@ public Task<TaskAgentMessage> GetRunnerMessageAsync(Guid? sessionId, TaskAgentSt
var brokerSession = RetryRequest<TaskAgentMessage>(
async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken, shouldRetry: ShouldRetryException);


return brokerSession;
}

/// <summary>
/// Forwards an acknowledgement for the given runner request to the broker HTTP client.
/// </summary>
/// <remarks>
/// Unlike <c>GetRunnerMessageAsync</c>, this call is not wrapped in <c>RetryRequest</c>;
/// it is attempted exactly once and any exception propagates to the caller.
/// </remarks>
public async Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken cancellationToken)
{
    CheckConnection();

    // Single attempt by design. Note the client takes the version before the status.
    Task acknowledgement = _brokerHttpClient.AcknowledgeRunnerRequestAsync(
        runnerRequestId,
        sessionId,
        version,
        status,
        os,
        architecture,
        cancellationToken);

    await acknowledgement;
}

public async Task DeleteSessionAsync(CancellationToken cancellationToken)
{
CheckConnection();
Expand Down
14 changes: 7 additions & 7 deletions src/Runner.Common/RunnerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected async Task<VssConnection> EstablishVssConnection(Uri serverUrl, VssCre

protected async Task RetryRequest(Func<Task> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5,
int maxAttempts = 5,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous name was a misnomer. This value indicates the maximum number of attempts (i.e. the initial attempt plus retries).

Func<Exception, bool> shouldRetry = null
)
{
Expand All @@ -79,31 +79,31 @@ async Task<Unit> wrappedFunc()
await func();
return Unit.Value;
}
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxRetryAttemptsCount, shouldRetry);
await RetryRequest<Unit>(wrappedFunc, cancellationToken, maxAttempts, shouldRetry);
}

protected async Task<T> RetryRequest<T>(Func<Task<T>> func,
CancellationToken cancellationToken,
int maxRetryAttemptsCount = 5,
int maxAttempts = 5,
Func<Exception, bool> shouldRetry = null
)
{
var retryCount = 0;
var attempt = 0;
while (true)
{
retryCount++;
attempt++;
cancellationToken.ThrowIfCancellationRequested();
try
{
return await func();
}
// TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122
catch (Exception ex) when (retryCount < maxRetryAttemptsCount && (shouldRetry == null || shouldRetry(ex)))
catch (Exception ex) when (attempt < maxAttempts && (shouldRetry == null || shouldRetry(ex)))
{
Trace.Error("Catch exception during request");
Trace.Error(ex);
var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15));
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left.");
Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxAttempts - attempt} attempt left.");
await Task.Delay(backOff, cancellationToken);
}
}
Expand Down
21 changes: 18 additions & 3 deletions src/Runner.Listener/BrokerMessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener
private RunnerSettings _settings;
private ITerminal _term;
private TimeSpan _getNextMessageRetryInterval;
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;
private VssCredentials _credsV2;
Expand Down Expand Up @@ -258,7 +258,7 @@ public async Task DeleteSessionAsync()
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
_runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
Expand Down Expand Up @@ -291,7 +291,7 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
}

message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
Expand Down Expand Up @@ -417,6 +417,21 @@ public async Task DeleteMessageAsync(TaskAgentMessage message)
await Task.CompletedTask;
}

public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken)
{
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); // Short timeout
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 5 seconds meant to remain the short timeout in the long run as well? Do we do this anywhere else in the code?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can refine this further if needed. I just wanted to keep it short so it doesn't interfere with actually acquiring the job.

This new message ACK is best-effort anyway. It doesn't have to be 100% reliable; it just has to be good enough to help us distinguish the reason for an acquire timeout in most cases.

Trace.Info($"Acknowledging runner request '{runnerRequestId}'.");
await _brokerServer.AcknowledgeRunnerRequestAsync(
runnerRequestId,
_session.SessionId,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
linkedCts.Token);
}

private bool IsGetNextMessageExceptionRetriable(Exception ex)
{
if (ex is TaskAgentNotFoundException ||
Expand Down
24 changes: 20 additions & 4 deletions src/Runner.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public interface IMessageListener : IRunnerService
Task DeleteSessionAsync();
Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token);
Task DeleteMessageAsync(TaskAgentMessage message);
Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken);

Task RefreshListenerTokenAsync();
void OnJobStatus(object sender, JobStatusEventArgs e);
Expand All @@ -52,7 +53,7 @@ public sealed class MessageListener : RunnerService, IMessageListener
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;
private VssCredentials _creds;
private VssCredentials _credsV2;
Expand Down Expand Up @@ -217,7 +218,7 @@ public async Task DeleteSessionAsync()
public void OnJobStatus(object sender, JobStatusEventArgs e)
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
_runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
Expand Down Expand Up @@ -250,7 +251,7 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
Expand All @@ -274,7 +275,7 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
}

message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId,
runnerStatus,
_runnerStatus,
BuildConstants.RunnerPackage.Version,
VarUtil.OS,
VarUtil.OSArchitecture,
Expand Down Expand Up @@ -437,6 +438,21 @@ public async Task RefreshListenerTokenAsync()
await _brokerServer.ForceRefreshConnection(_credsV2);
}

/// <summary>
/// Acknowledges the given runner request through the broker server, reporting the
/// current session id, runner status, runner version, OS and architecture.
/// </summary>
/// <param name="runnerRequestId">Identifier of the runner request to acknowledge.</param>
/// <param name="cancellationToken">Caller's token; it is linked with an internal 5-second timeout.</param>
public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken)
{
    // Keep the call short: it is cancelled after 5 seconds, or sooner if the caller cancels.
    var acknowledgeTimeout = TimeSpan.FromSeconds(5);

    using (var timeoutSource = new CancellationTokenSource(acknowledgeTimeout))
    using (var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutSource.Token))
    {
        Trace.Info($"Acknowledging runner request '{runnerRequestId}'.");

        await _brokerServer.AcknowledgeRunnerRequestAsync(
            runnerRequestId,
            _session.SessionId,
            _runnerStatus,
            BuildConstants.RunnerPackage.Version,
            VarUtil.OS,
            VarUtil.OSArchitecture,
            combinedSource.Token);
    }
}

private TaskAgentMessage DecryptMessage(TaskAgentMessage message)
{
if (_session.EncryptionKey == null ||
Expand Down
29 changes: 26 additions & 3 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -654,22 +654,42 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
else
{
var messageRef = StringUtil.ConvertFromJson<RunnerJobRequestRef>(message.Body);
Pipelines.AgentJobRequestMessage jobRequestMessage = null;

// Create connection
var credMgr = HostContext.GetService<ICredentialManager>();
// Acknowledge (best-effort)
if (messageRef.ShouldAcknowledge) // Temporary feature flag
{
try
{
await _listener.AcknowledgeMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
catch (Exception ex)
{
Trace.Error($"Best-effort acknowledge failed for request '{messageRef.RunnerRequestId}'");
Trace.Error(ex);
}
}

Pipelines.AgentJobRequestMessage jobRequestMessage = null;
if (string.IsNullOrEmpty(messageRef.RunServiceUrl))
{
// Connect
var credMgr = HostContext.GetService<ICredentialManager>();
var creds = credMgr.LoadCredentials(allowAuthUrlV2: false);
var actionsRunServer = HostContext.CreateService<IActionsRunServer>();
await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds);

// Get job message
jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
}
else
{
// Connect
var credMgr = HostContext.GetService<ICredentialManager>();
var credsV2 = credMgr.LoadCredentials(allowAuthUrlV2: true);
var runServer = HostContext.CreateService<IRunServer>();
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), credsV2);

// Get job message
try
{
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageRef.BillingOwnerId, messageQueueLoopTokenSource.Token);
Expand Down Expand Up @@ -698,7 +718,10 @@ ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409
}
}

// Dispatch
jobDispatcher.Run(jobRequestMessage, runOnce);

// Run once?
if (runOnce)
{
Trace.Info("One time used runner received job message.");
Expand Down
3 changes: 3 additions & 0 deletions src/Runner.Listener/RunnerJobRequestRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ public sealed class RunnerJobRequestRef

[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }

/// <summary>
/// Flag carried in the job request reference under the <c>should_acknowledge</c> member.
/// The listener checks it to decide whether to send a best-effort acknowledgement
/// for the request before fetching the job message.
/// </summary>
[DataMember(Name = "should_acknowledge")]
public bool ShouldAcknowledge { get; set; }

[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }
Expand Down
72 changes: 71 additions & 1 deletion src/Sdk/WebApi/WebApi/BrokerHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public async Task<TaskAgentMessage> GetRunnerMessageAsync(
{
queryParams.Add("status", status.Value.ToString());
}

if (runnerVersion != null)
{
queryParams.Add("runnerVersion", runnerVersion);
Expand Down Expand Up @@ -142,7 +143,6 @@ public async Task<TaskAgentMessage> GetRunnerMessageAsync(
}

public async Task<TaskAgentSession> CreateSessionAsync(

TaskAgentSession session,
CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -191,6 +191,76 @@ public async Task DeleteSessionAsync(
throw new Exception($"Failed to delete broker session: {result.Error}");
}

/// <summary>
/// POSTs an acknowledgement for a runner request to the <c>acknowledge</c> endpoint,
/// resolved relative to the client's base address.
/// </summary>
/// <param name="runnerRequestId">Sent in the JSON body under the <c>runnerRequestId</c> key.</param>
/// <param name="sessionId">Optional; sent as the <c>sessionId</c> query parameter when not null.</param>
/// <param name="runnerVersion">Optional; sent as the <c>runnerVersion</c> query parameter when not null.</param>
/// <param name="status">Optional; sent as the <c>status</c> query parameter when not null.</param>
/// <param name="os">Optional; sent as the <c>os</c> query parameter when not null.</param>
/// <param name="architecture">Optional; sent as the <c>architecture</c> query parameter when not null.</param>
/// <param name="cancellationToken">Token passed through to the HTTP send.</param>
/// <exception cref="RunnerNotFoundException">
/// The request failed and the error body parsed to a broker error of kind <c>RunnerNotFound</c>.
/// </exception>
/// <exception cref="Exception">The request failed for any other reason.</exception>
public async Task AcknowledgeRunnerRequestAsync(
    string runnerRequestId,
    Guid? sessionId,
    string runnerVersion,
    TaskAgentStatus? status,
    string os = null,
    string architecture = null,
    CancellationToken cancellationToken = default)
{
    var requestUri = new Uri(Client.BaseAddress, "acknowledge");

    // Query string: only values that were actually supplied are included.
    var query = new List<KeyValuePair<string, string>>();

    void AddIfSupplied(string key, string value)
    {
        if (value != null)
        {
            query.Add(key, value);
        }
    }

    AddIfSupplied("sessionId", sessionId?.ToString());
    AddIfSupplied("status", status?.ToString());
    AddIfSupplied("runnerVersion", runnerVersion);
    AddIfSupplied("os", os);
    AddIfSupplied("architecture", architecture);

    // JSON body carrying the request id.
    var body = new Dictionary<string, string>
    {
        { "runnerRequestId", runnerRequestId },
    };
    var jsonFormatter = new VssJsonMediaTypeFormatter(true);
    var requestContent = new ObjectContent<Dictionary<string, string>>(body, jsonFormatter);

    var result = await SendAsync<object>(
        new HttpMethod("POST"),
        requestUri: requestUri,
        queryParameters: query,
        content: requestContent,
        readErrorBody: true,
        cancellationToken: cancellationToken);

    if (result.IsSuccess)
    {
        return;
    }

    // Surface the one broker error kind that has a dedicated exception type.
    bool runnerNotFound =
        TryParseErrorBody(result.ErrorBody, out BrokerError brokerError)
        && brokerError.ErrorKind == BrokerErrorKind.RunnerNotFound;
    if (runnerNotFound)
    {
        throw new RunnerNotFoundException(brokerError.Message);
    }

    throw new Exception($"Failed to acknowledge runner request. Request to {requestUri} failed with status: {result.StatusCode}. Error message {result.Error}");
}

private static bool TryParseErrorBody(string errorBody, out BrokerError error)
{
if (!string.IsNullOrEmpty(errorBody))
Expand Down
Loading