Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions changelog/fragments/1773217838-main.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a 80ish characters long description of the change.
summary: Fix an issue where some components could be missing from the status output

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
description: >
In some cases, a late update from an older component instance could overwrite a newer state.
This could cause components to be missing from the status output.
With this fix, updates from older instances are ignored if a newer update has already been processed.

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,29 @@ func (c *Coordinator) refreshState() {
func (c *Coordinator) applyComponentState(state runtime.ComponentComponentState) {
// check for any component updates to the known PID, so we can update the component monitoring
found := false
ignore := false
for i, other := range c.state.Components {
if other.Component.ID == state.Component.ID {
// We want to update the component state if the incoming update is from the same instance or a newer instance of the component.
// We determine this by comparing start times, since a newer instance would have a later start time.
if other.Component.ID == state.Component.ID && (other.Component.StartTime.UnixNano() <= state.Component.StartTime.UnixNano()) {
if other.State.Pid != state.State.Pid {
c.componentPidRequiresUpdate.Store(true)
}
c.state.Components[i] = state
found = true
break
} else if other.Component.ID == state.Component.ID && other.Component.StartTime.UnixNano() > state.Component.StartTime.UnixNano() {
// This is a case where a component has transitioned to a new state but we receive a late update from the older component.
ignore = true
break
}
}
if ignore {
// we have received a component update from an older instance of a component.
// This can happen when the old instance takes a long time to stop and we have already processed the new instance's STARTING/HEALTHY state.
// In this case, we should ignore this update since it is stale and would incorrectly overwrite the current state of the component.
return
}
if !found {
c.state.Components = append(c.state.Components, state)
if state.State.Pid != 0 {
Expand Down
233 changes: 233 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator_state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package coordinator

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

pkgcomponent "github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/utils/broadcaster"
)

func TestApplyComponentState_LateStoppedFromDifferentRuntimeIgnored(t *testing.T) {
comp1 := pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.OtelRuntimeManager,
StartTime: time.Now(),
}
comp2 := pkgcomponent.Component{
ID: "system/metrics-default",
RuntimeManager: pkgcomponent.OtelRuntimeManager,
StartTime: time.Now(),
}
coord := &Coordinator{
state: State{
CoordinatorState: agentclient.Healthy,
CoordinatorMessage: "Running",
},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
componentModel: []pkgcomponent.Component{
comp1,
comp2,
},
}

filestreamID := "filestream-default"
metricsID := "system/metrics-default"

// Both components start under otel runtime and become healthy.
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp1,
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy",
},
})
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp2,
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy",
},
})
require.Len(t, coord.state.Components, 2)

// Runtime switch: both components start under the process runtime.
// The STARTING state is allowed to replace the existing entry even when
// the RuntimeManager differs.
comp1New := pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.ProcessRuntimeManager,
StartTime: time.Now(),
}
comp2New := pkgcomponent.Component{
ID: "system/metrics-default",
RuntimeManager: pkgcomponent.ProcessRuntimeManager,
StartTime: time.Now(),
}

coord.applyComponentState(runtime.ComponentComponentState{
Component: comp1New,
State: runtime.ComponentState{
State: client.UnitStateStarting,
Message: "Starting",
},
})
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp2New,
State: runtime.ComponentState{
State: client.UnitStateStarting,
Message: "Starting",
},
})
require.Len(t, coord.state.Components, 2, "STARTING from a new runtime should replace, not duplicate")
for _, cs := range coord.state.Components {
assert.Equal(t, pkgcomponent.ProcessRuntimeManager, cs.Component.RuntimeManager,
"component %s should now be under process runtime", cs.Component.ID)
assert.Equal(t, client.UnitStateStarting, cs.State.State)
}

// Process runtime components become healthy.
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp1New,
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy: communicating with pid",
},
})
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp2New,
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy: communicating with pid",
},
})
require.Len(t, coord.state.Components, 2)
for _, cs := range coord.state.Components {
assert.Equal(t, client.UnitStateHealthy, cs.State.State,
"component %s should be healthy", cs.Component.ID)
}

// Late STOPPED events arrive from the old otel runtime. This simulates
// the race where the otel collector takes >3 seconds to stop, so these
// events arrive after the process runtime is already healthy.
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp1,
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
})
coord.applyComponentState(runtime.ComponentComponentState{
Component: comp2,
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
})

// Both process runtime components must still be present and healthy.
require.Len(t, coord.state.Components, 2,
"late STOPPED from otel runtime must not remove process runtime components")

componentsByID := make(map[string]runtime.ComponentComponentState, len(coord.state.Components))
for _, cs := range coord.state.Components {
componentsByID[cs.Component.ID] = cs
}

for _, id := range []string{metricsID, filestreamID} {
cs, ok := componentsByID[id]
require.True(t, ok, "component %s should still be present", id)
assert.Equal(t, client.UnitStateHealthy, cs.State.State,
"component %s should still be healthy", id)
assert.Equal(t, pkgcomponent.ProcessRuntimeManager, cs.Component.RuntimeManager,
"component %s should still be under process runtime", id)
}
}

func TestApplyComponentState_StoppedFromSameRuntimeRemovesComponent(t *testing.T) {
coord := &Coordinator{
logger: logp.NewLogger("testing"),
state: State{
CoordinatorState: agentclient.Healthy,
CoordinatorMessage: "Running",
},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
}

coord.applyComponentState(runtime.ComponentComponentState{
Component: pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.ProcessRuntimeManager,
},
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy",
},
})
require.Len(t, coord.state.Components, 1)

coord.applyComponentState(runtime.ComponentComponentState{
Component: pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.ProcessRuntimeManager,
},
State: runtime.ComponentState{
State: client.UnitStateStopped,
},
})

assert.Empty(t, coord.state.Components,
"STOPPED from the same runtime should remove the component")
}

func TestApplyComponentState_StartingFromNewRuntimeReplacesExisting(t *testing.T) {
coord := &Coordinator{
state: State{
CoordinatorState: agentclient.Healthy,
CoordinatorMessage: "Running",
},
stateBroadcaster: broadcaster.New(State{}, 0, 0),
}

coord.applyComponentState(runtime.ComponentComponentState{
Component: pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.OtelRuntimeManager,
StartTime: time.Now(),
},
State: runtime.ComponentState{
State: client.UnitStateHealthy,
Message: "Healthy",
},
})
require.Len(t, coord.state.Components, 1)
assert.Equal(t, pkgcomponent.OtelRuntimeManager, coord.state.Components[0].Component.RuntimeManager)

coord.applyComponentState(runtime.ComponentComponentState{
Component: pkgcomponent.Component{
ID: "filestream-default",
RuntimeManager: pkgcomponent.ProcessRuntimeManager,
StartTime: time.Now(),
},
State: runtime.ComponentState{
State: client.UnitStateStarting,
Message: "Starting",
},
})

require.Len(t, coord.state.Components, 1,
"STARTING from new runtime should replace, not duplicate")
assert.Equal(t, pkgcomponent.ProcessRuntimeManager, coord.state.Components[0].Component.RuntimeManager,
"component should now be under process runtime")
assert.Equal(t, client.UnitStateStarting, coord.state.Components[0].State.State)
}
6 changes: 6 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"slices"
"sort"
"strings"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -298,6 +299,9 @@ type Component struct {
Component *proto.Component `yaml:"component,omitempty"`

OutputStatusReporting *StatusReporting `yaml:"-"`

// StartTime is the time when the component was first created.
StartTime time.Time `yaml:"-"`
}

type StatusReporting struct {
Expand Down Expand Up @@ -637,6 +641,7 @@ func (r *RuntimeSpecs) componentsForInputType(
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
OutputStatusReporting: extractStatusReporting(output.Config),
StartTime: time.Now(),
})
}
}
Expand Down Expand Up @@ -683,6 +688,7 @@ func (r *RuntimeSpecs) componentsForInputType(
Features: featureFlags.AsProto(),
Component: componentConfig.AsProto(),
OutputStatusReporting: extractStatusReporting(output.Config),
StartTime: time.Now(),
})
}
}
Expand Down
Loading