Skip to content

Commit c6f15da

Browse files
authored
Merge branch '8.19' into mergify/bp/8.19/pr-8592
2 parents 3fa8247 + 0b2194d commit c6f15da

File tree

4 files changed

+152
-67
lines changed

4 files changed

+152
-67
lines changed

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -654,49 +654,47 @@ func (c *Coordinator) SetLogLevel(ctx context.Context, lvl *logp.Level) error {
654654
// watchRuntimeComponents listens for state updates from the runtime
655655
// manager, logs them, and forwards them to CoordinatorState.
656656
// Runs in its own goroutine created in Coordinator.Run.
657-
func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
658-
state := make(map[string]runtime.ComponentState)
657+
func (c *Coordinator) watchRuntimeComponents(
658+
ctx context.Context,
659+
runtimeComponentStates <-chan runtime.ComponentComponentState,
660+
otelStatuses <-chan *status.AggregateStatus,
661+
) {
662+
// We need to track otel component state separately because otel components may not always get a STOPPED status
663+
// If we receive an otel status without the state of a component we're tracking, we need to emit a fake STOPPED
664+
// status for it. Process component states should not be affected by this logic.
665+
processState := make(map[string]runtime.ComponentState)
666+
otelState := make(map[string]runtime.ComponentState)
659667

660-
var subChan <-chan runtime.ComponentComponentState
661-
var otelChan <-chan *status.AggregateStatus
662-
// A real Coordinator will always have a runtime manager, but unit tests
663-
// may not initialize all managers -- in that case we leave subChan nil,
664-
// and just idle until Coordinator shuts down.
665-
if c.runtimeMgr != nil {
666-
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
667-
}
668-
if c.otelMgr != nil {
669-
otelChan = c.otelMgr.Watch()
670-
}
671668
for {
672669
select {
673670
case <-ctx.Done():
674671
return
675-
case componentState := <-subChan:
676-
logComponentStateChange(c.logger, state, &componentState)
672+
case componentState := <-runtimeComponentStates:
673+
logComponentStateChange(c.logger, processState, &componentState)
677674
// Forward the final changes back to Coordinator, unless our context
678675
// has ended.
679676
select {
680677
case c.managerChans.runtimeManagerUpdate <- componentState:
681678
case <-ctx.Done():
682679
return
683680
}
684-
case otelStatus := <-otelChan:
681+
case otelStatus := <-otelStatuses:
685682
// We don't break on errors here, because we want to forward the status
686683
// even if there was an error, and the rest of the code gracefully handles componentStates being nil
687684
componentStates, err := translate.GetAllComponentStates(otelStatus, c.componentModel)
688685
if err != nil {
689686
c.setOTelError(err)
690687
}
691-
err = translate.DropComponentStateFromOtelStatus(otelStatus)
688+
finalOtelStatus, err := translate.DropComponentStateFromOtelStatus(otelStatus)
692689
if err != nil {
693690
c.setOTelError(err)
691+
finalOtelStatus = otelStatus
694692
}
695693

696694
// forward the remaining otel status
697695
// TODO: Implement subscriptions for otel manager status to avoid the need for this
698696
select {
699-
case c.managerChans.otelManagerUpdate <- otelStatus:
697+
case c.managerChans.otelManagerUpdate <- finalOtelStatus:
700698
case <-ctx.Done():
701699
return
702700
}
@@ -707,7 +705,7 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
707705
for _, componentState := range componentStates {
708706
componentIds[componentState.Component.ID] = true
709707
}
710-
for id := range state {
708+
for id := range otelState {
711709
if _, ok := componentIds[id]; !ok {
712710
// this component is not in the configuration anymore, emit a fake STOPPED state
713711
componentStates = append(componentStates, runtime.ComponentComponentState{
@@ -722,7 +720,7 @@ func (c *Coordinator) watchRuntimeComponents(ctx context.Context) {
722720
}
723721
// now handle the component states
724722
for _, componentState := range componentStates {
725-
logComponentStateChange(c.logger, state, &componentState)
723+
logComponentStateChange(c.logger, otelState, &componentState)
726724
// Forward the final changes back to Coordinator, unless our context
727725
// has ended.
728726
select {
@@ -813,7 +811,19 @@ func (c *Coordinator) Run(ctx context.Context) error {
813811
// log all changes in the state of the runtime and update the coordinator state
814812
watchCtx, watchCanceller := context.WithCancel(ctx)
815813
defer watchCanceller()
816-
go c.watchRuntimeComponents(watchCtx)
814+
815+
var subChan <-chan runtime.ComponentComponentState
816+
var otelChan <-chan *status.AggregateStatus
817+
// A real Coordinator will always have a runtime manager, but unit tests
818+
// may not initialize all managers -- in that case we leave subChan nil,
819+
// and just idle until Coordinator shuts down.
820+
if c.runtimeMgr != nil {
821+
subChan = c.runtimeMgr.SubscribeAll(ctx).Ch()
822+
}
823+
if c.otelMgr != nil {
824+
otelChan = c.otelMgr.Watch()
825+
}
826+
go c.watchRuntimeComponents(watchCtx, subChan, otelChan)
817827

818828
// Close the state broadcaster on finish, but leave it running in the
819829
// background until all subscribers have read the final values or their

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,42 +1341,43 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
13411341
logger := logp.NewLogger("testing")
13421342

13431343
statusChan := make(chan *status.AggregateStatus)
1344-
otelManager := &fakeOTelManager{
1345-
statusChan: statusChan,
1346-
}
1347-
1348-
componentModel := []component.Component{
1349-
{
1350-
ID: "filestream-default",
1351-
InputType: "filestream",
1352-
OutputType: "elasticsearch",
1353-
RuntimeManager: component.OtelRuntimeManager,
1354-
InputSpec: &component.InputRuntimeSpec{
1355-
BinaryName: "agentbeat",
1356-
Spec: component.InputSpec{
1357-
Command: &component.CommandSpec{
1358-
Args: []string{"filebeat"},
1359-
},
1344+
1345+
runtimeStateChan := make(chan runtime.ComponentComponentState)
1346+
1347+
otelComponent := component.Component{
1348+
ID: "filestream-default",
1349+
InputType: "filestream",
1350+
OutputType: "elasticsearch",
1351+
RuntimeManager: component.OtelRuntimeManager,
1352+
InputSpec: &component.InputRuntimeSpec{
1353+
BinaryName: "agentbeat",
1354+
Spec: component.InputSpec{
1355+
Command: &component.CommandSpec{
1356+
Args: []string{"filebeat"},
13601357
},
13611358
},
1362-
Units: []component.Unit{
1363-
{
1364-
ID: "filestream-unit",
1365-
Type: client.UnitTypeInput,
1366-
Config: &proto.UnitExpectedConfig{
1367-
Streams: []*proto.Stream{
1368-
{Id: "test-1"},
1369-
{Id: "test-2"},
1370-
},
1359+
},
1360+
Units: []component.Unit{
1361+
{
1362+
ID: "filestream-unit",
1363+
Type: client.UnitTypeInput,
1364+
Config: &proto.UnitExpectedConfig{
1365+
Streams: []*proto.Stream{
1366+
{Id: "test-1"},
1367+
{Id: "test-2"},
13711368
},
13721369
},
1373-
{
1374-
ID: "filestream-default",
1375-
Type: client.UnitTypeOutput,
1376-
},
1370+
},
1371+
{
1372+
ID: "filestream-default",
1373+
Type: client.UnitTypeOutput,
13771374
},
13781375
},
13791376
}
1377+
processComponent := otelComponent
1378+
processComponent.RuntimeManager = component.ProcessRuntimeManager
1379+
processComponent.ID = "filestream-process"
1380+
13801381
otelStatus := &status.AggregateStatus{
13811382
Event: componentstatus.NewEvent(componentstatus.StatusOK),
13821383
ComponentStatusMap: map[string]*status.AggregateStatus{
@@ -1411,17 +1412,17 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14111412
otelManagerUpdate: make(chan *status.AggregateStatus),
14121413
runtimeManagerUpdate: make(chan runtime.ComponentComponentState),
14131414
},
1414-
otelMgr: otelManager,
1415-
state: State{},
1416-
componentModel: componentModel,
1415+
state: State{},
14171416
}
14181417

14191418
// start runtime status watching
1420-
go coord.watchRuntimeComponents(ctx)
1419+
go coord.watchRuntimeComponents(ctx, runtimeStateChan, statusChan)
14211420

14221421
// no component status
14231422
assert.Empty(t, coord.state.Components)
14241423

1424+
coord.componentModel = []component.Component{otelComponent}
1425+
14251426
// push the status into the coordinator
14261427
select {
14271428
case statusChan <- otelStatus:
@@ -1446,8 +1447,57 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14461447

14471448
assert.Len(t, coord.state.Components, 1)
14481449

1450+
// Add both a process component and an otel component, in that order. Both should appear in the state.
1451+
coord.componentModel = []component.Component{otelComponent, processComponent}
1452+
1453+
// push the process component state into the coordinator
1454+
select {
1455+
case runtimeStateChan <- runtime.ComponentComponentState{
1456+
Component: processComponent,
1457+
State: runtime.ComponentState{
1458+
State: client.UnitStateHealthy,
1459+
},
1460+
}:
1461+
case <-ctx.Done():
1462+
t.Fatal("timeout waiting for coordinator to receive status")
1463+
}
1464+
1465+
select {
1466+
case componentState := <-coord.managerChans.runtimeManagerUpdate:
1467+
coord.applyComponentState(componentState)
1468+
case <-ctx.Done():
1469+
t.Fatal("timeout waiting for coordinator to receive status")
1470+
}
1471+
1472+
// push the otel status into the coordinator
1473+
select {
1474+
case statusChan <- otelStatus:
1475+
case <-ctx.Done():
1476+
t.Fatal("timeout waiting for coordinator to receive status")
1477+
}
1478+
1479+
select {
1480+
case finalOtelStatus := <-coord.managerChans.otelManagerUpdate:
1481+
// we shouldn't have any status remaining for the otel collector, as the status we've pushed earlier only
1482+
// contains beats receiver status for the "filestream-default" component
1483+
// this status is removed from the otel collector status, because it's reported as component state instead
1484+
assert.Empty(t, finalOtelStatus.ComponentStatusMap)
1485+
case <-ctx.Done():
1486+
t.Fatal("timeout waiting for coordinator to receive status")
1487+
}
1488+
1489+
select {
1490+
case componentState := <-coord.managerChans.runtimeManagerUpdate:
1491+
coord.applyComponentState(componentState)
1492+
case <-ctx.Done():
1493+
t.Fatal("timeout waiting for coordinator to receive status")
1494+
}
1495+
1496+
assert.Len(t, coord.state.Components, 2)
1497+
14491498
// Now, we remove the component and resend the same status. The component state should be deleted.
14501499
coord.componentModel = []component.Component{}
1500+
coord.state = State{}
14511501
select {
14521502
case statusChan <- otelStatus:
14531503
case <-ctx.Done():
@@ -1471,7 +1521,7 @@ func TestCoordinatorTranslatesOtelStatusToComponentState(t *testing.T) {
14711521

14721522
assert.Empty(t, coord.state.Components)
14731523

1474-
// Push an invalid status, there should be no component state, but there should be an otel status
1524+
// Push an invalid status, there should be no otel component state, but there should be an otel status
14751525
select {
14761526
case statusChan <- invalidOtelStatus:
14771527
case <-ctx.Done():

internal/pkg/otel/translate/status.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,28 @@ func GetAllComponentStates(otelStatus *status.AggregateStatus, components []comp
5656

5757
// DropComponentStateFromOtelStatus removes the statuses of otel pipelines representing runtime components from the
5858
// given status.
59-
func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) error {
59+
func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) (*status.AggregateStatus, error) {
6060
if otelStatus == nil {
61-
return nil
61+
return nil, nil
6262
}
63-
for pipelineStatusId := range otelStatus.ComponentStatusMap {
63+
64+
newStatus := deepCopyStatus(otelStatus)
65+
for pipelineStatusId := range newStatus.ComponentStatusMap {
6466
pipelineId := &pipeline.ID{}
6567
componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId)
6668
if componentKind != "pipeline" {
67-
return fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
69+
return nil, fmt.Errorf("pipeline status id %s is not a pipeline", pipelineStatusId)
6870
}
6971
err := pipelineId.UnmarshalText([]byte(pipelineIdStr)) // there's no ergonomic way to do this conversion
7072
if err != nil {
71-
return err
73+
return nil, err
7274
}
7375
if strings.HasPrefix(pipelineId.Name(), OtelNamePrefix) {
74-
delete(otelStatus.ComponentStatusMap, pipelineStatusId)
76+
delete(newStatus.ComponentStatusMap, pipelineStatusId)
7577
}
7678
}
7779

78-
return nil
80+
return newStatus, nil
7981
}
8082

8183
// getOtelRuntimePipelineStatuses finds otel pipeline statuses belonging to runtime components and returns them as a map
@@ -273,3 +275,24 @@ func parseEntityStatusId(id string) (kind string, entityId string) {
273275
}
274276
return parts[0], parts[1]
275277
}
278+
279+
// deepCopyStatus makes a deep copy of the status.
280+
func deepCopyStatus(otelStatus *status.AggregateStatus) *status.AggregateStatus {
281+
if otelStatus == nil {
282+
return nil
283+
}
284+
285+
newStatus := &status.AggregateStatus{
286+
Event: otelStatus.Event,
287+
}
288+
if otelStatus.ComponentStatusMap == nil {
289+
return newStatus
290+
}
291+
292+
newStatus.ComponentStatusMap = make(map[string]*status.AggregateStatus, len(otelStatus.ComponentStatusMap))
293+
for k, v := range otelStatus.ComponentStatusMap {
294+
newStatus.ComponentStatusMap[k] = deepCopyStatus(v)
295+
}
296+
297+
return newStatus
298+
}

internal/pkg/otel/translate/status_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ func TestGetAllComponentState(t *testing.T) {
180180

181181
func TestDropComponentStateFromOtelStatus(t *testing.T) {
182182
t.Run("empty", func(t *testing.T) {
183-
err := DropComponentStateFromOtelStatus(nil)
183+
s, err := DropComponentStateFromOtelStatus(nil)
184184
require.NoError(t, err)
185+
require.Nil(t, s)
185186
})
186187

187188
t.Run("drop non otel", func(t *testing.T) {
@@ -195,10 +196,10 @@ func TestDropComponentStateFromOtelStatus(t *testing.T) {
195196
},
196197
},
197198
}
198-
err := DropComponentStateFromOtelStatus(otelStatus)
199+
s, err := DropComponentStateFromOtelStatus(otelStatus)
199200
require.NoError(t, err)
200-
assert.Len(t, otelStatus.ComponentStatusMap, 1)
201-
assert.Contains(t, otelStatus.ComponentStatusMap, "pipeline:logs")
201+
assert.Len(t, s.ComponentStatusMap, 1)
202+
assert.Contains(t, s.ComponentStatusMap, "pipeline:logs")
202203
})
203204

204205
t.Run("invalid status", func(t *testing.T) {
@@ -209,8 +210,9 @@ func TestDropComponentStateFromOtelStatus(t *testing.T) {
209210
},
210211
},
211212
}
212-
err := DropComponentStateFromOtelStatus(otelStatus)
213+
s, err := DropComponentStateFromOtelStatus(otelStatus)
213214
require.Error(t, err)
215+
require.Nil(t, s)
214216
assert.Equal(t, "pipeline status id logs is not a pipeline", err.Error())
215217
})
216218
}

0 commit comments

Comments
 (0)