Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions agent/engine/dependencygraph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ const (
)

var (
// CredentialsNotResolvedErr is the error where a container needs to wait for
// credentials before it can process by agent
CredentialsNotResolvedErr = &dependencyError{err: errors.New("dependency graph: container execution credentials not available")}
// CredentialsNotResolvedErr is the error when a task needs to wait for credentials before it can be progressed to its desired status by the agent
CredentialsNotResolvedErr = &dependencyError{err: errors.New("dependency graph: execution role credentials not available")}
// DependentContainerNotResolvedErr is the error where a dependent container isn't in expected state
DependentContainerNotResolvedErr = &dependencyError{err: errors.New("dependency graph: dependent container not in expected state")}
// ContainerPastDesiredStatusErr is the error where the container status is bigger than desired status
Expand Down
46 changes: 34 additions & 12 deletions agent/engine/engine_sudo_linux_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,9 +868,19 @@ func TestGMSATaskFileS3Err(t *testing.T) {
cfg.GMSACapable = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
cfg.AWSRegion = "us-west-2"

taskEngine, done, _ := setupGMSALinux(cfg, nil, t)
taskEngine, done, credentialsManager := setupGMSALinux(cfg, nil, t)
defer done()

// mock execution role credentials
mockCreds := &credentials.TaskIAMRoleCredentials{
ARN: "testGMSAFileTaskARN",
IAMRoleCredentials: credentials.IAMRoleCredentials{
RoleArn: "arn:aws:iam::123456789012:role/execution-role",
CredentialsID: "exec-creds-id",
},
}
credentialsManager.SetTaskCredentials(mockCreds)

stateChangeEvents := taskEngine.StateChangeEvents()

testContainer := CreateTestContainer()
Expand All @@ -880,11 +890,12 @@ func TestGMSATaskFileS3Err(t *testing.T) {
testContainer.DockerConfig.HostConfig = &hostConfig

testTask := &apitask.Task{
Arn: "testGMSAFileTaskARN",
Family: "family",
Version: "1",
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Containers: []*apicontainer.Container{testContainer},
Arn: "testGMSAFileTaskARN",
Family: "family",
Version: "1",
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Containers: []*apicontainer.Container{testContainer},
ExecutionCredentialsID: "exec-creds-id",
}
testTask.Containers[0].TransitionDependenciesMap = make(map[apicontainerstatus.ContainerStatus]apicontainer.TransitionDependencySet)
testTask.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
Expand All @@ -908,9 +919,19 @@ func TestGMSATaskFileSSMErr(t *testing.T) {
cfg.GMSACapable = config.BooleanDefaultFalse{Value: config.ExplicitlyEnabled}
cfg.AWSRegion = "us-west-2"

taskEngine, done, _ := setupGMSALinux(cfg, nil, t)
taskEngine, done, credentialsManager := setupGMSALinux(cfg, nil, t)
defer done()

// mock execution role credentials
mockCreds := &credentials.TaskIAMRoleCredentials{
ARN: "testGMSAFileTaskARN",
IAMRoleCredentials: credentials.IAMRoleCredentials{
RoleArn: "arn:aws:iam::123456789012:role/execution-role",
CredentialsID: "exec-creds-id",
},
}
credentialsManager.SetTaskCredentials(mockCreds)

stateChangeEvents := taskEngine.StateChangeEvents()

testContainer := CreateTestContainer()
Expand All @@ -920,11 +941,12 @@ func TestGMSATaskFileSSMErr(t *testing.T) {
testContainer.DockerConfig.HostConfig = &hostConfig

testTask := &apitask.Task{
Arn: "testGMSAFileTaskARN",
Family: "family",
Version: "1",
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Containers: []*apicontainer.Container{testContainer},
Arn: "testGMSAFileTaskARN",
Family: "family",
Version: "1",
DesiredStatusUnsafe: apitaskstatus.TaskRunning,
Containers: []*apicontainer.Container{testContainer},
ExecutionCredentialsID: "exec-creds-id",
}
testTask.Containers[0].TransitionDependenciesMap = make(map[apicontainerstatus.ContainerStatus]apicontainer.TransitionDependencySet)
testTask.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
Expand Down
111 changes: 111 additions & 0 deletions agent/engine/engine_unix_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ import (
"github.com/aws/amazon-ecs-agent/agent/dockerclient"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/dockerapi"
"github.com/aws/amazon-ecs-agent/agent/dockerclient/sdkclientfactory"
mockssmfactory "github.com/aws/amazon-ecs-agent/agent/ssm/factory/mocks"
mockssm "github.com/aws/amazon-ecs-agent/agent/ssm/mocks"
"github.com/aws/amazon-ecs-agent/agent/statechange"
"github.com/aws/amazon-ecs-agent/agent/taskresource"
taskresourcevolume "github.com/aws/amazon-ecs-agent/agent/taskresource/volume"
"github.com/aws/amazon-ecs-agent/agent/utils"
apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status"
apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/ipcompatibility"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/ttime"
"github.com/golang/mock/gomock"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ssm"
ssmtypes "github.com/aws/aws-sdk-go-v2/service/ssm/types"
"github.com/cihub/seelog"
"github.com/containerd/cgroups/v3"
"github.com/docker/docker/api/types"
Expand Down Expand Up @@ -1908,3 +1915,107 @@ func TestHostResourceManagerLaunchTypeBehavior(t *testing.T) {
})
}
}

// TestTaskResourceDependencyOnRestart verifies that a task with resources properly waits for execution role credentials after agent restart, blocking
// until credentials arrive from ACS, then progressing to RUNNING.
// Note: although this test asserts functionality that is platform-agnostic, the test setup is such on Windows that we'd run into a timing issue.
// This is because the VerifyManifestPulledStateChange helper methods wait for events on the task engine's state channels,
// but on Windows, these are skipped due to no local registry setup, so the test progresses to its assertions faster than intended.
func TestTaskResourceDependencyOnRestart(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

taskArn := "arn:aws:ecs:us-east-1:123456789012:task/testTaskResourceDependencyOnRestart"
testCredentialsID := "test-execution-credentials-id"
secretValueFrom := "arn:aws:ssm:us-east-1:123456789012:parameter/test-secret"
secretValue := "mock-secret-value"

// Create a task with a SSM secret resource dependency
testTask := CreateTestTask(taskArn)
testTask.Containers[0].Image = "busybox:latest" // Override container image
testTask.Containers[0].Secrets = []apicontainer.Secret{
{
Name: "test-secret",
ValueFrom: secretValueFrom,
Provider: "ssm",
Region: "us-east-1",
},
}
testTask.Containers[0].TransitionDependenciesMap = make(map[apicontainerstatus.ContainerStatus]apicontainer.TransitionDependencySet)
testTask.ResourcesMapUnsafe = make(map[string][]taskresource.TaskResource)
testTask.SetExecutionRoleCredentialsID(testCredentialsID)

// Start engine with default config
taskEngine, done, _, credentialsManager := setupWithDefaultConfig(t)
defer done()

// Set up SSM mocks
ssmClientCreator := mockssmfactory.NewMockSSMClientCreator(ctrl)
mockSSMClient := mockssm.NewMockSSMClient(ctrl)
mockCreds := &credentials.TaskIAMRoleCredentials{
ARN: taskArn,
IAMRoleCredentials: credentials.IAMRoleCredentials{
RoleArn: "arn:aws:iam::123456789012:role/execution-role",
CredentialsID: testCredentialsID,
},
}
ssmOutput := &ssm.GetParametersOutput{
Parameters: []ssmtypes.Parameter{
{
Name: aws.String(secretValueFrom),
Value: aws.String(secretValue),
},
},
}
ssmClientCreator.EXPECT().NewSSMClient("us-east-1", mockCreds.IAMRoleCredentials, ipcompatibility.NewIPv4OnlyCompatibility()).Return(mockSSMClient, nil).AnyTimes()
mockSSMClient.EXPECT().GetParameters(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, in *ssm.GetParametersInput, optFns ...func(*ssm.Options)) {
require.Equal(t, []string{secretValueFrom}, in.Names)
}).Return(ssmOutput, nil).AnyTimes()

// Configure engine with SSM client creator
taskEngine.(*DockerTaskEngine).resourceFields = &taskresource.ResourceFields{
ResourceFieldsCommon: &taskresource.ResourceFieldsCommon{
SSMClientCreator: ssmClientCreator,
CredentialsManager: credentialsManager,
},
}

// Add task to engine without credentials
go taskEngine.AddTask(testTask)

// Verify task progresses but gets blocked waiting for execution credentials
VerifyContainerManifestPulledStateChange(t, taskEngine)
VerifyTaskManifestPulledStateChange(t, taskEngine)
Comment thread
singholt marked this conversation as resolved.

// Verify the task has SSM resource that requires execution credentials
resources := testTask.GetResources()
var ssmResource taskresource.TaskResource
for _, resource := range resources {
if resource.GetName() == "ssmsecret" {
ssmResource = resource
break
}
}
require.NotNil(t, ssmResource, "SSM resource should exist")
require.True(t, ssmResource.RequiresExecutionRoleCredentials(), "SSM resource should require execution credentials")

// Mock credentials arrival from ACS
credentialsManager.SetTaskCredentials(mockCreds)

// Mimic ACS payload responder: after setting credentials, call AddTask to trigger task re-evaluation
// This is how blocked tasks get unblocked when credentials arrive from ACS
go taskEngine.AddTask(testTask)

// Verify task progresses through expected states after credentials are available
VerifyContainerRunningStateChange(t, taskEngine)
VerifyTaskRunningStateChange(t, taskEngine)

// Stop the task
taskUpdate := CreateTestTask("testTaskResourceDependencyOnRestart")
taskUpdate.Arn = taskArn
taskUpdate.SetDesiredStatus(apitaskstatus.TaskStopped)
go taskEngine.AddTask(taskUpdate)

VerifyContainerStoppedStateChange(t, taskEngine)
VerifyTaskStoppedStateChange(t, taskEngine)
}
53 changes: 42 additions & 11 deletions agent/engine/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ import (
)

const (
// waitForPullCredentialsTimeout is the timeout agent trying to wait for pull
// credentials from acs, after the timeout it will check the credentials manager
// and start processing the task or start another round of waiting
waitForPullCredentialsTimeout = 1 * time.Minute
// waitForExecutionRoleCredentialsTimeout is the timeout when waiting for execution role credentials to arrive from ACS
waitForExecutionRoleCredentialsTimeout = 1 * time.Minute
systemPingTimeout = 5 * time.Second
defaultTaskSteadyStatePollInterval = 5 * time.Minute
defaultTaskSteadyStatePollIntervalJitter = 30 * time.Second
Expand Down Expand Up @@ -1070,7 +1068,7 @@ func (mtask *managedTask) progressTask() {
// task may be moved to stopped.
// anyResourceTransition is set to true when transition function needs to be called or
// known status can be changed
anyResourceTransition, resTransitions := mtask.startResourceTransitions(
anyResourceTransition, resTransitions, resourceReasons := mtask.startResourceTransitions(
func(resource taskresource.TaskResource, nextStatus resourcestatus.ResourceStatus) {
mtask.transitionResource(resource, nextStatus)
transitionChange <- struct{}{}
Expand All @@ -1092,7 +1090,9 @@ func (mtask *managedTask) progressTask() {
// its impossible for containers to move forward. We will do an additional check to see if we are waiting for ACS
// execution credentials. If not, then we will abort the task progression.
if !atLeastOneTransitionStarted && !blockedByOrderingDependencies {
if !mtask.isWaitingForACSExecutionCredentials(reasons) {
// Combine reasons from both container and resource transitions
allReasons := append(reasons, resourceReasons...)
if !mtask.isWaitingForACSExecutionCredentials(allReasons) {
mtask.handleContainersUnableToTransitionState()
}
return
Expand Down Expand Up @@ -1143,16 +1143,16 @@ func (mtask *managedTask) progressTask() {
func (mtask *managedTask) isWaitingForACSExecutionCredentials(reasons []error) bool {
for _, reason := range reasons {
if reason == dependencygraph.CredentialsNotResolvedErr {
logger.Info("Waiting for credentials to pull from ECR", logger.Fields{
logger.Info("Waiting for execution role credentials to arrive from ACS", logger.Fields{
field.TaskID: mtask.GetID(),
})

timeoutCtx, timeoutCancel := context.WithTimeout(mtask.ctx, waitForPullCredentialsTimeout)
timeoutCtx, timeoutCancel := context.WithTimeout(mtask.ctx, waitForExecutionRoleCredentialsTimeout)
defer timeoutCancel()

timedOut := mtask.waitEvent(timeoutCtx.Done())
if timedOut {
logger.Info("Timed out waiting for acs credentials message", logger.Fields{
logger.Info("Timed out waiting for execution role credentials to arrive from ACS", logger.Fields{
field.TaskID: mtask.GetID(),
})
}
Expand Down Expand Up @@ -1244,8 +1244,9 @@ func (mtask *managedTask) handleTerminalDependencyError(container *apicontainer.

// startResourceTransitions steps through each resource in the task and calls
// the passed transition function when a transition should occur
func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransitionFunc) (bool, map[string]string) {
func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransitionFunc) (bool, map[string]string, []error) {
anyCanTransition := false
var reasons []error
transitions := make(map[string]string)
for _, res := range mtask.GetResources() {
knownStatus := res.GetKnownStatus()
Expand All @@ -1260,6 +1261,19 @@ func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransi
})
continue
}

// Check if resource requires execution role credentials and whether they're available
if !mtask.taskExecutionRoleCredentialsResolved(res) {
logger.Info("Can't transition resource due to missing execution role credentials", logger.Fields{
field.TaskID: mtask.GetID(),
field.Resource: res.GetName(),
field.KnownStatus: res.StatusString(knownStatus),
field.DesiredStatus: res.StatusString(desiredStatus),
})
reasons = append(reasons, dependencygraph.CredentialsNotResolvedErr)
continue
}

anyCanTransition = true
transition := mtask.resourceNextState(res)
// If the resource is already in a transition, skip
Expand All @@ -1281,7 +1295,24 @@ func (mtask *managedTask) startResourceTransitions(transitionFunc resourceTransi
transitions[res.GetName()] = transition.status
go transitionFunc(res, transition.nextState)
}
return anyCanTransition, transitions
return anyCanTransition, transitions, reasons
}

// taskExecutionRoleCredentialsResolved checks if execution credentials are available for a task resource
func (mtask *managedTask) taskExecutionRoleCredentialsResolved(resource taskresource.TaskResource) bool {
// If resource doesn't need execution role credentials, it's always resolved
if !resource.RequiresExecutionRoleCredentials() {
return true
}

// If resource is already created, credentials were available when it was created
if resource.GetKnownStatus() >= resourcestatus.ResourceCreated {
return true
}

// Check if credentials are available
_, ok := mtask.credentialsManager.GetTaskCredentials(mtask.Task.GetExecutionCredentialsID())
return ok
}

// transitionResource calls applyResourceState, and then notifies the managed
Expand Down
Loading
Loading