diff --git a/src/Runner.Common/BrokerServer.cs b/src/Runner.Common/BrokerServer.cs index 1f6c01685bd..751ae1eee32 100644 --- a/src/Runner.Common/BrokerServer.cs +++ b/src/Runner.Common/BrokerServer.cs @@ -23,6 +23,8 @@ public interface IBrokerServer : IRunnerService Task GetRunnerMessageAsync(Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, bool disableUpdate, CancellationToken token); + Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken token); + Task UpdateConnectionIfNeeded(Uri serverUri, VssCredentials credentials); Task ForceRefreshConnection(VssCredentials credentials); @@ -67,10 +69,17 @@ public Task GetRunnerMessageAsync(Guid? sessionId, TaskAgentSt var brokerSession = RetryRequest( async () => await _brokerHttpClient.GetRunnerMessageAsync(sessionId, version, status, os, architecture, disableUpdate, cancellationToken), cancellationToken, shouldRetry: ShouldRetryException); - return brokerSession; } + public async Task AcknowledgeRunnerRequestAsync(string runnerRequestId, Guid? sessionId, TaskAgentStatus status, string version, string os, string architecture, CancellationToken cancellationToken) + { + CheckConnection(); + + // No retries + await _brokerHttpClient.AcknowledgeRunnerRequestAsync(runnerRequestId, sessionId, version, status, os, architecture, cancellationToken); + } + public async Task DeleteSessionAsync(CancellationToken cancellationToken) { CheckConnection(); diff --git a/src/Runner.Common/RunnerService.cs b/src/Runner.Common/RunnerService.cs index f266d8a0263..ccaa83f698f 100644 --- a/src/Runner.Common/RunnerService.cs +++ b/src/Runner.Common/RunnerService.cs @@ -70,7 +70,7 @@ protected async Task EstablishVssConnection(Uri serverUrl, VssCre protected async Task RetryRequest(Func func, CancellationToken cancellationToken, - int maxRetryAttemptsCount = 5, + int maxAttempts = 5, Func shouldRetry = null ) { @@ -79,31 +79,31 @@ async Task wrappedFunc() await func(); return Unit.Value; } - await RetryRequest(wrappedFunc, cancellationToken, maxRetryAttemptsCount, shouldRetry); + await RetryRequest(wrappedFunc, cancellationToken, maxAttempts, shouldRetry); } protected async Task RetryRequest(Func> func, CancellationToken cancellationToken, - int maxRetryAttemptsCount = 5, + int maxAttempts = 5, Func shouldRetry = null ) { - var retryCount = 0; + var attempt = 0; while (true) { - retryCount++; + attempt++; cancellationToken.ThrowIfCancellationRequested(); try { return await func(); } // TODO: Add handling of non-retriable exceptions: https://github.com/github/actions-broker/issues/122 - catch (Exception ex) when (retryCount < maxRetryAttemptsCount && (shouldRetry == null || shouldRetry(ex))) + catch (Exception ex) when (attempt < maxAttempts && (shouldRetry == null || shouldRetry(ex))) { Trace.Error("Catch exception during request"); Trace.Error(ex); var backOff = BackoffTimerHelper.GetRandomBackoff(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15)); - Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxRetryAttemptsCount - retryCount} attempt left."); + Trace.Warning($"Back off {backOff.TotalSeconds} seconds before next retry. {maxAttempts - attempt} attempt left."); await Task.Delay(backOff, cancellationToken); } } diff --git a/src/Runner.Listener/BrokerMessageListener.cs b/src/Runner.Listener/BrokerMessageListener.cs index a25670c9852..7c9ca401cba 100644 --- a/src/Runner.Listener/BrokerMessageListener.cs +++ b/src/Runner.Listener/BrokerMessageListener.cs @@ -23,7 +23,7 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener private RunnerSettings _settings; private ITerminal _term; private TimeSpan _getNextMessageRetryInterval; - private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; + private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private VssCredentials _creds; private VssCredentials _credsV2; @@ -258,7 +258,7 @@ public async Task DeleteSessionAsync() public void OnJobStatus(object sender, JobStatusEventArgs e) { Trace.Info("Received job status event. JobState: {0}", e.Status); - runnerStatus = e.Status; + _runnerStatus = e.Status; try { _getMessagesTokenSource?.Cancel(); @@ -291,7 +291,7 @@ public async Task GetNextMessageAsync(CancellationToken token) } message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, - runnerStatus, + _runnerStatus, BuildConstants.RunnerPackage.Version, VarUtil.OS, VarUtil.OSArchitecture, @@ -417,6 +417,21 @@ public async Task DeleteMessageAsync(TaskAgentMessage message) await Task.CompletedTask; } + public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken) + { + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); // Short timeout + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + Trace.Info($"Acknowledging runner request '{runnerRequestId}'."); + await _brokerServer.AcknowledgeRunnerRequestAsync( + runnerRequestId, + _session.SessionId, + _runnerStatus, + BuildConstants.RunnerPackage.Version, + VarUtil.OS, + VarUtil.OSArchitecture, + linkedCts.Token); + } + private bool IsGetNextMessageExceptionRetriable(Exception ex) { if (ex is TaskAgentNotFoundException || diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index d30ed2bf9da..ef06dd1af3c 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -32,6 +32,7 @@ public interface IMessageListener : IRunnerService Task DeleteSessionAsync(); Task GetNextMessageAsync(CancellationToken token); Task DeleteMessageAsync(TaskAgentMessage message); + Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken); Task RefreshListenerTokenAsync(); void OnJobStatus(object sender, JobStatusEventArgs e); @@ -52,7 +53,7 @@ public sealed class MessageListener : RunnerService, IMessageListener private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); private readonly Dictionary _sessionCreationExceptionTracker = new(); - private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; + private TaskAgentStatus _runnerStatus = TaskAgentStatus.Online; private CancellationTokenSource _getMessagesTokenSource; private VssCredentials _creds; private VssCredentials _credsV2; @@ -217,7 +218,7 @@ public async Task DeleteSessionAsync() public void OnJobStatus(object sender, JobStatusEventArgs e) { Trace.Info("Received job status event. JobState: {0}", e.Status); - runnerStatus = e.Status; + _runnerStatus = e.Status; try { _getMessagesTokenSource?.Cancel(); @@ -250,7 +251,7 @@ public async Task GetNextMessageAsync(CancellationToken token) message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, _session.SessionId, _lastMessageId, - runnerStatus, + _runnerStatus, BuildConstants.RunnerPackage.Version, VarUtil.OS, VarUtil.OSArchitecture, @@ -274,7 +275,7 @@ public async Task GetNextMessageAsync(CancellationToken token) } message = await _brokerServer.GetRunnerMessageAsync(_session.SessionId, - runnerStatus, + _runnerStatus, BuildConstants.RunnerPackage.Version, VarUtil.OS, VarUtil.OSArchitecture, @@ -437,6 +438,21 @@ public async Task RefreshListenerTokenAsync() await _brokerServer.ForceRefreshConnection(_credsV2); } + public async Task AcknowledgeMessageAsync(string runnerRequestId, CancellationToken cancellationToken) + { + using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); // Short timeout + using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); + Trace.Info($"Acknowledging runner request '{runnerRequestId}'."); + await _brokerServer.AcknowledgeRunnerRequestAsync( + runnerRequestId, + _session.SessionId, + _runnerStatus, + BuildConstants.RunnerPackage.Version, + VarUtil.OS, + VarUtil.OSArchitecture, + linkedCts.Token); + } + private TaskAgentMessage DecryptMessage(TaskAgentMessage message) { if (_session.EncryptionKey == null || diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 569d5505cad..8262d31f12d 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -654,22 +654,42 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) else { var messageRef = StringUtil.ConvertFromJson(message.Body); - Pipelines.AgentJobRequestMessage jobRequestMessage = null; - // Create connection - var credMgr = HostContext.GetService(); + // Acknowledge (best-effort) + if (messageRef.ShouldAcknowledge) // Temporary feature flag + { + try + { + await _listener.AcknowledgeMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); + } + catch (Exception ex) + { + Trace.Error($"Best-effort acknowledge failed for request '{messageRef.RunnerRequestId}'"); + Trace.Error(ex); + } + } + + Pipelines.AgentJobRequestMessage jobRequestMessage = null; if (string.IsNullOrEmpty(messageRef.RunServiceUrl)) { + // Connect + var credMgr = HostContext.GetService(); var creds = credMgr.LoadCredentials(allowAuthUrlV2: false); var actionsRunServer = HostContext.CreateService(); await actionsRunServer.ConnectAsync(new Uri(settings.ServerUrl), creds); + + // Get job message jobRequestMessage = await actionsRunServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token); } else { + // Connect + var credMgr = HostContext.GetService(); var credsV2 = credMgr.LoadCredentials(allowAuthUrlV2: true); var runServer = HostContext.CreateService(); await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), credsV2); + + // Get job message try { jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageRef.BillingOwnerId, messageQueueLoopTokenSource.Token); @@ -698,7 +718,10 @@ ex is TaskOrchestrationJobAlreadyAcquiredException || // HTTP status 409 } } + // Dispatch jobDispatcher.Run(jobRequestMessage, runOnce); + + // Run once? if (runOnce) { Trace.Info("One time used runner received job message."); diff --git a/src/Runner.Listener/RunnerJobRequestRef.cs b/src/Runner.Listener/RunnerJobRequestRef.cs index a74662dd2d9..331dbb21df5 100644 --- a/src/Runner.Listener/RunnerJobRequestRef.cs +++ b/src/Runner.Listener/RunnerJobRequestRef.cs @@ -10,6 +10,9 @@ public sealed class RunnerJobRequestRef [DataMember(Name = "runner_request_id")] public string RunnerRequestId { get; set; } + + [DataMember(Name = "should_acknowledge")] + public bool ShouldAcknowledge { get; set; } [DataMember(Name = "run_service_url")] public string RunServiceUrl { get; set; } diff --git a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs index cd16c1e7395..d4a05526290 100644 --- a/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/BrokerHttpClient.cs @@ -79,6 +79,7 @@ public async Task GetRunnerMessageAsync( { queryParams.Add("status", status.Value.ToString()); } + if (runnerVersion != null) { queryParams.Add("runnerVersion", runnerVersion); @@ -142,7 +143,6 @@ public async Task GetRunnerMessageAsync( } public async Task CreateSessionAsync( - TaskAgentSession session, CancellationToken cancellationToken = default) { @@ -191,6 +191,76 @@ public async Task DeleteSessionAsync( throw new Exception($"Failed to delete broker session: {result.Error}"); } + public async Task AcknowledgeRunnerRequestAsync( + string runnerRequestId, + Guid? sessionId, + string runnerVersion, + TaskAgentStatus? status, + string os = null, + string architecture = null, + CancellationToken cancellationToken = default) + { + // URL + var requestUri = new Uri(Client.BaseAddress, "acknowledge"); + + // Query parameters + List> queryParams = new List>(); + if (sessionId != null) + { + queryParams.Add("sessionId", sessionId.Value.ToString()); + } + if (status != null) + { + queryParams.Add("status", status.Value.ToString()); + } + if (runnerVersion != null) + { + queryParams.Add("runnerVersion", runnerVersion); + } + if (os != null) + { + queryParams.Add("os", os); + } + if (architecture != null) + { + queryParams.Add("architecture", architecture); + } + + // Body + var payload = new Dictionary + { + ["runnerRequestId"] = runnerRequestId, + }; + var requestContent = new ObjectContent>(payload, new VssJsonMediaTypeFormatter(true)); + + // POST + var result = await SendAsync( + new HttpMethod("POST"), + requestUri: requestUri, + queryParameters: queryParams, + content: requestContent, + readErrorBody: true, + cancellationToken: cancellationToken); + + if (result.IsSuccess) + { + return; + } + + if (TryParseErrorBody(result.ErrorBody, out BrokerError brokerError)) + { + switch (brokerError.ErrorKind) + { + case BrokerErrorKind.RunnerNotFound: + throw new RunnerNotFoundException(brokerError.Message); + default: + break; + } + } + + throw new Exception($"Failed to acknowledge runner request. Request to {requestUri} failed with status: {result.StatusCode}. Error message {result.Error}"); + } + private static bool TryParseErrorBody(string errorBody, out BrokerError error) { if (!string.IsNullOrEmpty(errorBody))