Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/docs/metrics-in-logs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ endif::[]
| `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
| `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
| `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs.
|===

ifeval::["{beatname_lc}"=="filebeat"]
Expand Down
8 changes: 6 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) {
span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
defer span.End()
begin := time.Now()

st := client.observer

if st != nil {
Expand All @@ -246,6 +246,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
return nil, nil
}

begin := time.Now()
params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"}
status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems)

Expand All @@ -263,9 +264,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
pubCount := len(data)
span.Context.SetLabel("events_published", pubCount)

timeSinceSend := time.Since(begin)
client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
pubCount,
time.Since(begin))
timeSinceSend)

// check response for transient errors
var failedEvents []publisher.Event
Expand All @@ -289,6 +291,8 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
st.ReportLatency(timeSinceSend)

}

if failed > 0 {
Expand Down
6 changes: 6 additions & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
st.NewBatch(len(events))

dropped := 0

for i := range events {
event := &events[i]

Expand All @@ -135,6 +137,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
continue
}

begin := time.Now()
if _, err = out.rotator.Write(append(serializedEvent, '\n')); err != nil {
st.WriteError(err)

Expand All @@ -149,9 +152,12 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
}

st.WriteBytes(len(serializedEvent) + 1)
took := time.Since(begin)
st.ReportLatency(took)
}

st.Dropped(dropped)

st.Acked(len(events) - dropped)

return nil
Expand Down
3 changes: 3 additions & 0 deletions libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
}

for len(events) > 0 {
begin := time.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is too large of a span, we have reconnect and windowed events in here. I think we need to re-think this part. We really just want the time span to start right before the data is sent over the network, and end right when we know if data is written (ACK), even if there is an error we want to "bill" the time spent communicating with logstash.

FYI it's OK if we want to pull out logstash from this PR and make a new issue to track that going forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah. I actually kinda chose that span deliberately, having reconnect be part of the count actually seemed fairly accurate? It's a blocking operation, so a reconnect would would influence the latency?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely increases the overall latency for sending the event, but that isn't what this metric is focused on. This is just trying to get a sense of how long does it take for the output to receive the data and then get back to us with acknowledgment that the data is written.

An example might help, if the events/sec drop, but this metric goes up, we have a good indicator that we should be looking at the output (logstash/elasticsearch/kafka) for issues, not the beat.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that makes sense. Tinkered with it a bit.

// check if we need to reconnect
if c.ticker != nil {
select {
Expand Down Expand Up @@ -163,6 +164,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {

return err
}
took := time.Since(begin)
st.ReportLatency(took)
}

batch.ACK()
Expand Down
21 changes: 19 additions & 2 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package outputs

import "github.com/elastic/elastic-agent-libs/monitoring"
import (
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// Stats implements the Observer interface, for collecting metrics on common
// outputs events.
Expand Down Expand Up @@ -46,13 +53,15 @@ type Stats struct {

readBytes *monitoring.Uint // total amount of bytes read
readErrors *monitoring.Uint // total number of errors while waiting for response on output

sendLatencyMillis metrics.Sample
}

// NewStats creates a new Stats instance using a backing monitoring registry.
// This function will create and register a number of metrics with the registry passed.
// The registry must not be null.
func NewStats(reg *monitoring.Registry) *Stats {
return &Stats{
obj := &Stats{
batches: monitoring.NewUint(reg, "events.batches"),
events: monitoring.NewUint(reg, "events.total"),
acked: monitoring.NewUint(reg, "events.acked"),
Expand All @@ -69,7 +78,11 @@ func NewStats(reg *monitoring.Registry) *Stats {

readBytes: monitoring.NewUint(reg, "read.bytes"),
readErrors: monitoring.NewUint(reg, "read.errors"),

sendLatencyMillis: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "write.latency", adapter.Accept).Register("histogram", metrics.NewHistogram(obj.sendLatencyMillis))
return obj
}

// NewBatch updates active batch and event metrics.
Expand All @@ -81,6 +94,10 @@ func (s *Stats) NewBatch(n int) {
}
}

func (s *Stats) ReportLatency(time time.Duration) {
s.sendLatencyMillis.Update(time.Milliseconds())
}

// Acked updates active and acked event metrics.
func (s *Stats) Acked(n int) {
if s != nil {
Expand Down
52 changes: 28 additions & 24 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

package outputs

import "time"

// Observer provides an interface used by outputs to report common events on
// documents/events being published and I/O workload.
type Observer interface {
NewBatch(int) // report new batch being processed with number of events
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
NewBatch(int) // report new batch being processed with number of events
ReportLatency(time.Duration) // report the duration a send to the output takes
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -43,15 +46,16 @@ func NewNilObserver() Observer {
return nilObserver
}

func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) ReportLatency(_ time.Duration) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
5 changes: 4 additions & 1 deletion libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {
return nil, nil
}

start := time.Now()
// RPUSH returns total length of list -> fail and retry all on error
_, err := conn.Do(command, args...)
if err != nil {
Expand All @@ -241,6 +242,8 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {

}

took := time.Since(start)
c.observer.ReportLatency(took)
c.observer.Acked(len(okEvents))
return nil, nil
}
Expand Down Expand Up @@ -283,7 +286,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF
for i := range serialized {
_, err := conn.Receive()
if err != nil {
if _, ok := err.(redis.Error); ok {
if _, ok := err.(redis.Error); ok { //nolint:errorlint //this line checks against a type, not an instance of an error
c.log.Errorf("Failed to %v event to list with %+v",
command, err)
failed = append(failed, data[i])
Expand Down
25 changes: 13 additions & 12 deletions libbeat/outputs/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,16 @@ type TestObserver struct {
errTooMany int
}

func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }
func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) ReportLatency(_ time.Duration) {}
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }