Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/30725.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
events: Add `vault_index` to an event's metadata if the metadata contains `modified=true`, to support client consistency controls when reading from Vault in response to an event where storage was modified.
```
2 changes: 1 addition & 1 deletion vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
if err != nil {
return nil, err
}
events, err := eventbus.NewEventBus(nodeID, eventsLogger)
events, err := eventbus.NewEventBus(nodeID, eventsLogger, c)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions vault/core_stubs_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (c *Core) EntWaitUntilWALShipped(ctx context.Context, index uint64) bool {
return true
}

func (c *Core) GetCurrentWALHeader() string {
return ""
}

func (c *Core) SecretsSyncLicensedActivated() bool { return false }

func (c *Core) IsMultisealEnabled() bool { return false }
Expand Down
42 changes: 39 additions & 3 deletions vault/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/eventlogger/formatter_filters/cloudevents"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/parseutil"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
Expand All @@ -30,8 +31,9 @@ const (
// eventTypeAll is purely internal to the event bus. We use it to send all
// events down one big firehose, and pipelines define their own filtering
// based on what each subscriber is interested in.
eventTypeAll = "*"
defaultTimeout = 60 * time.Second
eventTypeAll = "*"
defaultTimeout = 60 * time.Second
eventMetadataVaultIndex = "vault_index"
)

var (
Expand All @@ -55,6 +57,13 @@ type EventBus struct {
timeout time.Duration
filters *Filters
cloudEventsFormatterFilter *cloudevents.FormatterFilter
walGetter StorageWALGetter
}

// StorageWALGetter is an interface used to fetch the current storage index
// from core without importing core
type StorageWALGetter interface {
GetCurrentWALHeader() string
}

type pluginEventBus struct {
Expand Down Expand Up @@ -111,6 +120,25 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo
return data
}

// getIndexForEvent returns the storage index (wal header) for events with
// metadata.modified=true.
func (bus *EventBus) getIndexForEvent(event *logical.EventReceived) (string, error) {
if event.Event == nil || event.Event.Metadata == nil || bus.walGetter == nil {
return "", nil
}
eventMetadataModified := event.Event.Metadata.GetFields()[logical.EventMetadataModified]
if eventMetadataModified != nil {
isModified, err := parseutil.ParseBool(eventMetadataModified.GetStringValue())
if err != nil {
return "", fmt.Errorf("failed to parse event metadata modified: %w", err)
}
if isModified {
return bus.walGetter.GetCurrentWALHeader(), nil
}
}
return "", nil
}

// SendEventInternal sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
Expand All @@ -136,6 +164,13 @@ func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespac
eventReceived.Event = data
} else {
eventReceived.Event = patchMountPath(data, pluginInfo)
walStr, err := bus.getIndexForEvent(eventReceived)
if err != nil {
bus.logger.Warn("Failed to get index for event", "error", err)
}
if walStr != "" {
eventReceived.Event.Metadata.Fields[eventMetadataVaultIndex] = structpb.NewStringValue(walStr)
}
}

// We can't easily know when the SendEvent is complete, so we can't call the cancel function.
Expand Down Expand Up @@ -170,7 +205,7 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, false, data)
}

func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
func NewEventBus(localNodeID string, logger hclog.Logger, c StorageWALGetter) (*EventBus, error) {
broker, err := eventlogger.NewBroker()
if err != nil {
return nil, err
Expand Down Expand Up @@ -205,6 +240,7 @@ func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) {
timeout: defaultTimeout,
cloudEventsFormatterFilter: cloudEventsFormatterFilter,
filters: NewFilters(localNodeID),
walGetter: c,
}, nil
}

Expand Down
Loading
Loading