Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/docs/metrics-in-logs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ endif::[]
| `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
| `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
| `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
| `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs.
|===

ifeval::["{beatname_lc}"=="filebeat"]
Expand Down
8 changes: 6 additions & 2 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error
func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) {
span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
defer span.End()
begin := time.Now()

st := client.observer

if st != nil {
Expand All @@ -246,8 +246,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
return nil, nil
}

begin := time.Now()
params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"}
status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems)
timeSinceSend := time.Since(begin)

if sendErr != nil {
if status == http.StatusRequestEntityTooLarge {
Expand All @@ -265,7 +267,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)

client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
pubCount,
time.Since(begin))
timeSinceSend)

// check response for transient errors
var failedEvents []publisher.Event
Expand All @@ -289,6 +291,8 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
st.Dropped(dropped)
st.Duplicate(duplicates)
st.ErrTooMany(stats.tooMany)
st.ReportLatency(timeSinceSend)

}

if failed > 0 {
Expand Down
6 changes: 6 additions & 0 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"os"
"path/filepath"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
st.NewBatch(len(events))

dropped := 0

for i := range events {
event := &events[i]

Expand All @@ -135,6 +137,7 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
continue
}

begin := time.Now()
if _, err = out.rotator.Write(append(serializedEvent, '\n')); err != nil {
st.WriteError(err)

Expand All @@ -149,9 +152,12 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error {
}

st.WriteBytes(len(serializedEvent) + 1)
took := time.Since(begin)
st.ReportLatency(took)
}

st.Dropped(dropped)

st.Acked(len(events) - dropped)

return nil
Expand Down
6 changes: 5 additions & 1 deletion libbeat/outputs/logstash/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
}

for len(events) > 0 {

// check if we need to reconnect
if c.ticker != nil {
select {
Expand All @@ -136,12 +137,14 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {
err error
)

begin := time.Now()
if c.win == nil {
n, err = c.sendEvents(events)
} else {
n, err = c.publishWindowed(events)
}

took := time.Since(begin)
st.ReportLatency(took)
c.log.Debugf("%v events out of %v events sent to logstash host %s. Continue sending",
n, len(events), c.Host())

Expand All @@ -163,6 +166,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error {

return err
}

}

batch.ACK()
Expand Down
21 changes: 19 additions & 2 deletions libbeat/outputs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package outputs

import "github.com/elastic/elastic-agent-libs/monitoring"
import (
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// Stats implements the Observer interface, for collecting metrics on common
// outputs events.
Expand Down Expand Up @@ -46,13 +53,15 @@ type Stats struct {

readBytes *monitoring.Uint // total amount of bytes read
readErrors *monitoring.Uint // total number of errors while waiting for response on output

sendLatencyMillis metrics.Sample
}

// NewStats creates a new Stats instance using a backing monitoring registry.
// This function will create and register a number of metrics with the registry passed.
// The registry must not be null.
func NewStats(reg *monitoring.Registry) *Stats {
return &Stats{
obj := &Stats{
batches: monitoring.NewUint(reg, "events.batches"),
events: monitoring.NewUint(reg, "events.total"),
acked: monitoring.NewUint(reg, "events.acked"),
Expand All @@ -69,7 +78,11 @@ func NewStats(reg *monitoring.Registry) *Stats {

readBytes: monitoring.NewUint(reg, "read.bytes"),
readErrors: monitoring.NewUint(reg, "read.errors"),

sendLatencyMillis: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "write.latency", adapter.Accept).Register("histogram", metrics.NewHistogram(obj.sendLatencyMillis))
return obj
}

// NewBatch updates active batch and event metrics.
Expand All @@ -81,6 +94,10 @@ func (s *Stats) NewBatch(n int) {
}
}

func (s *Stats) ReportLatency(time time.Duration) {
s.sendLatencyMillis.Update(time.Milliseconds())
}

// Acked updates active and acked event metrics.
func (s *Stats) Acked(n int) {
if s != nil {
Expand Down
52 changes: 28 additions & 24 deletions libbeat/outputs/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

package outputs

import "time"

// Observer provides an interface used by outputs to report common events on
// documents/events being published and I/O workload.
type Observer interface {
NewBatch(int) // report new batch being processed with number of events
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
NewBatch(int) // report new batch being processed with number of events
ReportLatency(time.Duration) // report the duration a send to the output takes
Acked(int) // report number of acked events
Failed(int) // report number of failed events
Dropped(int) // report number of dropped events
Duplicate(int) // report number of events detected as duplicates (e.g. on resends)
Cancelled(int) // report number of cancelled events
Split() // report a batch was split for being too large to ingest
WriteError(error) // report an I/O error on write
WriteBytes(int) // report number of bytes being written
ReadError(error) // report an I/O error on read
ReadBytes(int) // report number of bytes being read
ErrTooMany(int) // report too many requests response
}

type emptyObserver struct{}
Expand All @@ -43,15 +46,16 @@ func NewNilObserver() Observer {
return nilObserver
}

func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
func (*emptyObserver) NewBatch(int) {}
func (*emptyObserver) ReportLatency(_ time.Duration) {}
func (*emptyObserver) Acked(int) {}
func (*emptyObserver) Duplicate(int) {}
func (*emptyObserver) Failed(int) {}
func (*emptyObserver) Dropped(int) {}
func (*emptyObserver) Cancelled(int) {}
func (*emptyObserver) Split() {}
func (*emptyObserver) WriteError(error) {}
func (*emptyObserver) WriteBytes(int) {}
func (*emptyObserver) ReadError(error) {}
func (*emptyObserver) ReadBytes(int) {}
func (*emptyObserver) ErrTooMany(int) {}
5 changes: 4 additions & 1 deletion libbeat/outputs/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,11 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn {
return nil, nil
}

start := time.Now()
// RPUSH returns total length of list -> fail and retry all on error
_, err := conn.Do(command, args...)
took := time.Since(start)
c.observer.ReportLatency(took)
if err != nil {
c.log.Errorf("Failed to %v to redis list with: %+v", command, err)
return okEvents, err
Expand Down Expand Up @@ -283,7 +286,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF
for i := range serialized {
_, err := conn.Receive()
if err != nil {
if _, ok := err.(redis.Error); ok {
if _, ok := err.(redis.Error); ok { //nolint:errorlint //this line checks against a type, not an instance of an error
c.log.Errorf("Failed to %v event to list with %+v",
command, err)
failed = append(failed, data[i])
Expand Down
25 changes: 13 additions & 12 deletions libbeat/outputs/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,16 @@ type TestObserver struct {
errTooMany int
}

func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }
func (to *TestObserver) NewBatch(batch int) { to.batch += batch }
func (to *TestObserver) Acked(acked int) { to.acked += acked }
func (to *TestObserver) ReportLatency(_ time.Duration) {}
func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate }
func (to *TestObserver) Failed(failed int) { to.failed += failed }
func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped }
func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled }
func (to *TestObserver) Split() { to.split++ }
func (to *TestObserver) WriteError(we error) { to.writeError = we }
func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb }
func (to *TestObserver) ReadError(re error) { to.readError = re }
func (to *TestObserver) ReadBytes(rb int) { to.readBytes += rb }
func (to *TestObserver) ErrTooMany(err int) { to.errTooMany = +err }