Skip to content

Commit 76f4086

Browse files
authored
Add status reporting for Journald input (#42462)
This commit adds the status reporting for the Journald input. It also adds a debug log to the `UpdateStatus` function from `v2.Context`.
1 parent 04eac62 commit 76f4086

5 files changed

Lines changed: 96 additions & 12 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
409409
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
410410
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
411411
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
412+
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
412413

413414
*Auditbeat*
414415

filebeat/input/journald/environment_test.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3333
"github.com/elastic/beats/v7/libbeat/beat"
3434
"github.com/elastic/beats/v7/libbeat/common/acker"
35+
"github.com/elastic/beats/v7/libbeat/management/status"
3536
"github.com/elastic/beats/v7/libbeat/statestore"
3637
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
3738
conf "github.com/elastic/elastic-agent-libs/config"
@@ -40,10 +41,11 @@ import (
4041
)
4142

4243
type inputTestingEnvironment struct {
43-
t *testing.T
44-
workingDir string
45-
stateStore *testInputStore
46-
pipeline *mockPipelineConnector
44+
t *testing.T
45+
workingDir string
46+
stateStore *testInputStore
47+
pipeline *mockPipelineConnector
48+
statusReporter *mockStatusReporter
4749

4850
pluginInitOnce sync.Once
4951
plugin v2.Plugin
@@ -54,10 +56,11 @@ type inputTestingEnvironment struct {
5456

5557
func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
5658
return &inputTestingEnvironment{
57-
t: t,
58-
workingDir: t.TempDir(),
59-
stateStore: openTestStatestore(),
60-
pipeline: &mockPipelineConnector{},
59+
t: t,
60+
workingDir: t.TempDir(),
61+
stateStore: openTestStatestore(),
62+
pipeline: &mockPipelineConnector{},
63+
statusReporter: &mockStatusReporter{},
6164
}
6265
}
6366

@@ -95,7 +98,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
9598
}
9699
}()
97100

98-
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx}
101+
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, StatusReporter: e.statusReporter}
99102
if err := inp.Run(inputCtx, e.pipeline); err != nil {
100103
e.t.Errorf("input 'Run' method returned an error: %s", err)
101104
}
@@ -125,6 +128,25 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
125128
}, 5*time.Second, 10*time.Millisecond, &msg)
126129
}
127130

131+
func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {
132+
t := e.t
133+
t.Helper()
134+
got := e.statusReporter.GetUpdates()
135+
if len(got) != len(expected) {
136+
t.Fatalf("expecting %d updates, got %d", len(expected), len(got))
137+
}
138+
139+
for i := range expected {
140+
g, e := got[i], expected[i]
141+
if g != e {
142+
t.Errorf(
143+
"expecting [%d] status update to be {state:%s, msg:%s}, got {state:%s, msg:%s}",
144+
i, e.state.String(), e.msg, g.state.String(), g.msg,
145+
)
146+
}
147+
}
148+
}
149+
128150
type testInputStore struct {
129151
registry *statestore.Registry
130152
}
@@ -251,3 +273,25 @@ func blockingACKer(starter context.Context) beat.EventListener {
251273
}
252274
})
253275
}
276+
277+
type statusUpdate struct {
278+
state status.Status
279+
msg string
280+
}
281+
282+
type mockStatusReporter struct {
283+
mutex sync.RWMutex
284+
updates []statusUpdate
285+
}
286+
287+
func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) {
288+
m.mutex.Lock()
289+
m.updates = append(m.updates, statusUpdate{status, msg})
290+
m.mutex.Unlock()
291+
}
292+
293+
func (m *mockStatusReporter) GetUpdates() []statusUpdate {
294+
m.mutex.RLock()
295+
defer m.mutex.RUnlock()
296+
return append([]statusUpdate{}, m.updates...)
297+
}

filebeat/input/journald/input.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
input "github.com/elastic/beats/v7/filebeat/input/v2"
3131
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
3232
"github.com/elastic/beats/v7/libbeat/feature"
33+
"github.com/elastic/beats/v7/libbeat/management/status"
3334
"github.com/elastic/beats/v7/libbeat/reader"
3435
"github.com/elastic/beats/v7/libbeat/reader/parser"
3536
conf "github.com/elastic/elastic-agent-libs/config"
@@ -155,6 +156,8 @@ func (inp *journald) Run(
155156
logger := ctx.Logger.
156157
With("path", src.Name()).
157158
With("input_id", inp.ID)
159+
160+
ctx.UpdateStatus(status.Starting, "Starting")
158161
currentCheckpoint := initCheckpoint(logger, cursor)
159162

160163
mode := inp.Seek
@@ -174,7 +177,9 @@ func (inp *journald) Run(
174177
journalctl.Factory,
175178
)
176179
if err != nil {
177-
return fmt.Errorf("could not start journal reader: %w", err)
180+
wrappedErr := fmt.Errorf("could not start journal reader: %w", err)
181+
ctx.UpdateStatus(status.Failed, wrappedErr.Error())
182+
return wrappedErr
178183
}
179184

180185
defer reader.Close()
@@ -187,6 +192,7 @@ func (inp *journald) Run(
187192
saveRemoteHostname: inp.SaveRemoteHostname,
188193
})
189194

195+
ctx.UpdateStatus(status.Running, "Running")
190196
for {
191197
entry, err := parser.Next()
192198
if err != nil {
@@ -198,14 +204,18 @@ func (inp *journald) Run(
198204
case errors.Is(err, journalctl.ErrRestarting):
199205
continue
200206
default:
201-
logger.Errorf("could not read event: %s", err)
207+
msg := fmt.Sprintf("could not read event: %s", err)
208+
ctx.UpdateStatus(status.Failed, msg)
209+
logger.Error(msg)
202210
return err
203211
}
204212
}
205213

206214
event := entry.ToEvent()
207215
if err := publisher.Publish(event, event.Private); err != nil {
208-
logger.Errorf("could not publish event: %s", err)
216+
msg := fmt.Sprintf("could not publish event: %s", err)
217+
ctx.UpdateStatus(status.Failed, msg)
218+
logger.Errorf(msg)
209219
return err
210220
}
211221
}

filebeat/input/journald/input_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
3838
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
3939
"github.com/elastic/beats/v7/libbeat/beat"
40+
"github.com/elastic/beats/v7/libbeat/management/status"
4041
"github.com/elastic/elastic-agent-libs/logp"
4142
"github.com/elastic/elastic-agent-libs/mapstr"
4243
)
@@ -336,6 +337,33 @@ func TestReaderAdapterCanHandleNonStringFields(t *testing.T) {
336337
}
337338
}
338339

340+
func TestInputCanReportStatus(t *testing.T) {
341+
out := decompress(t, filepath.Join("testdata", "multiple-boots.journal.gz"))
342+
343+
env := newInputTestingEnvironment(t)
344+
cfg := mapstr.M{
345+
"paths": []string{out},
346+
}
347+
inp := env.mustCreateInput(cfg)
348+
349+
ctx, cancelInput := context.WithCancel(context.Background())
350+
t.Cleanup(cancelInput)
351+
352+
env.startInput(ctx, inp)
353+
env.waitUntilEventCount(6)
354+
355+
env.RequireStatuses([]statusUpdate{
356+
{
357+
state: status.Starting,
358+
msg: "Starting",
359+
},
360+
{
361+
state: status.Running,
362+
msg: "Running",
363+
},
364+
})
365+
}
366+
339367
func decompress(t *testing.T, namegz string) string {
340368
t.Helper()
341369

filebeat/input/v2/input.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type Context struct {
9797

9898
func (c Context) UpdateStatus(status status.Status, msg string) {
9999
if c.StatusReporter != nil {
100+
c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg)
100101
c.StatusReporter.UpdateStatus(status, msg)
101102
}
102103
}

0 commit comments

Comments
 (0)