diff --git a/changelog/30725.txt b/changelog/30725.txt new file mode 100644 index 00000000000..d393d36e1af --- /dev/null +++ b/changelog/30725.txt @@ -0,0 +1,3 @@ +```release-note:improvement +events: Add `vault_index` to an event's metadata if the metadata contains `modified=true`, to support client consistency controls when reading from Vault in response to an event where storage was modified. +``` diff --git a/vault/core.go b/vault/core.go index 954a1dedaae..17d840d5630 100644 --- a/vault/core.go +++ b/vault/core.go @@ -1365,7 +1365,7 @@ func NewCore(conf *CoreConfig) (*Core, error) { if err != nil { return nil, err } - events, err := eventbus.NewEventBus(nodeID, eventsLogger) + events, err := eventbus.NewEventBus(nodeID, eventsLogger, c) if err != nil { return nil, err } diff --git a/vault/core_stubs_oss.go b/vault/core_stubs_oss.go index ff45cb3769c..e31d59ae55e 100644 --- a/vault/core_stubs_oss.go +++ b/vault/core_stubs_oss.go @@ -103,6 +103,10 @@ func (c *Core) EntWaitUntilWALShipped(ctx context.Context, index uint64) bool { return true } +func (c *Core) GetCurrentWALHeader() string { + return "" +} + func (c *Core) SecretsSyncLicensedActivated() bool { return false } func (c *Core) IsMultisealEnabled() bool { return false } diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index 2eb112e52bf..60eba6c1d77 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/eventlogger/formatter_filters/cloudevents" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-secure-stdlib/parseutil" "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" @@ -30,8 +31,9 @@ const ( // eventTypeAll is purely internal to the event bus. We use it to send all // events down one big firehose, and pipelines define their own filtering // based on what each subscriber is interested in. - eventTypeAll = "*" - defaultTimeout = 60 * time.Second + eventTypeAll = "*" + defaultTimeout = 60 * time.Second + eventMetadataVaultIndex = "vault_index" ) var ( @@ -55,6 +57,13 @@ type EventBus struct { timeout time.Duration filters *Filters cloudEventsFormatterFilter *cloudevents.FormatterFilter + walGetter StorageWALGetter +} + +// StorageWALGetter is an interface used to fetch the current storage index +// from core without importing core +type StorageWALGetter interface { + GetCurrentWALHeader() string } type pluginEventBus struct { @@ -111,6 +120,25 @@ func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo return data } +// getIndexForEvent returns the storage index (wal header) for events with +// metadata.modified=true. +func (bus *EventBus) getIndexForEvent(event *logical.EventReceived) (string, error) { + if event.Event == nil || event.Event.Metadata == nil || bus.walGetter == nil { + return "", nil + } + eventMetadataModified := event.Event.Metadata.GetFields()[logical.EventMetadataModified] + if eventMetadataModified != nil { + isModified, err := parseutil.ParseBool(eventMetadataModified.GetStringValue()) + if err != nil { + return "", fmt.Errorf("failed to parse event metadata modified: %w", err) + } + if isModified { + return bus.walGetter.GetCurrentWALHeader(), nil + } + } + return "", nil +} + // SendEventInternal sends an event to the event bus and routes it to all relevant subscribers. // This function does *not* wait for all subscribers to acknowledge before returning. // This function is meant to be used by trusted internal code, so it can specify details like the namespace @@ -136,6 +164,13 @@ func (bus *EventBus) SendEventInternal(_ context.Context, ns *namespace.Namespac eventReceived.Event = data } else { eventReceived.Event = patchMountPath(data, pluginInfo) + walStr, err := bus.getIndexForEvent(eventReceived) + if err != nil { + bus.logger.Warn("Failed to get index for event", "error", err) + } + if walStr != "" { + eventReceived.Event.Metadata.Fields[eventMetadataVaultIndex] = structpb.NewStringValue(walStr) + } } // We can't easily know when the SendEvent is complete, so we can't call the cancel function. @@ -170,7 +205,7 @@ func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.Even return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, false, data) } -func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) { +func NewEventBus(localNodeID string, logger hclog.Logger, c StorageWALGetter) (*EventBus, error) { broker, err := eventlogger.NewBroker() if err != nil { return nil, err @@ -205,6 +240,7 @@ func NewEventBus(localNodeID string, logger hclog.Logger) (*EventBus, error) { timeout: defaultTimeout, cloudEventsFormatterFilter: cloudEventsFormatterFilter, filters: NewFilters(localNodeID), + walGetter: c, }, nil } diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index 40bbc6618d0..1caec832536 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -18,12 +18,13 @@ import ( "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/structpb" ) // TestBusBasics tests that basic event sending and subscribing function. func TestBusBasics(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -78,7 +79,7 @@ func TestBusBasics(t *testing.T) { // TestBusIgnoresSendContext tests that the context is ignored when sending to an event, // so that we do not give up too quickly. func TestBusIgnoresSendContext(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -119,7 +120,7 @@ func TestBusIgnoresSendContext(t *testing.T) { // TestSubscribeNonRootNamespace verifies that events for non-root namespaces // aren't filtered out by the bus. func TestSubscribeNonRootNamespace(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -162,7 +163,7 @@ func TestSubscribeNonRootNamespace(t *testing.T) { // TestNamespaceFiltering verifies that events for other namespaces are filtered out by the bus. func TestNamespaceFiltering(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -222,7 +223,7 @@ func TestNamespaceFiltering(t *testing.T) { // TestBus2Subscriptions verifies that events of different types are successfully routed to the correct subscribers. func TestBus2Subscriptions(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -293,7 +294,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("cancel=%v", tc.cancel), func(t *testing.T) { subscriptions.Store(0) - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -396,7 +397,7 @@ func waitFor(t *testing.T, maxWait time.Duration, f func() bool) { // TestBusWildcardSubscriptions tests that a single subscription can receive // multiple event types using * for glob patterns. func TestBusWildcardSubscriptions(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -471,7 +472,7 @@ func TestBusWildcardSubscriptions(t *testing.T) { // TestDataPathIsPrependedWithMount tests that "data_path", if present in the // metadata, is prepended with the plugin's mount. func TestDataPathIsPrependedWithMount(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -625,7 +626,7 @@ func TestDataPathIsPrependedWithMount(t *testing.T) { // TestBexpr tests go-bexpr filters are evaluated on an event. func TestBexpr(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -705,7 +706,7 @@ func TestBexpr(t *testing.T) { // TestPipelineCleanedUp ensures pipelines are properly cleaned up after // subscriptions are closed. func TestPipelineCleanedUp(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -740,7 +741,7 @@ func TestPipelineCleanedUp(t *testing.T) { // TestSubscribeGlobal tests that the global filter subscription mechanism works. func TestSubscribeGlobal(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -776,7 +777,7 @@ func TestSubscribeGlobal(t *testing.T) { // TestSubscribeGlobal_WithApply tests that the global filter subscription mechanism works when using ApplyGlobalFilterChanges. func TestSubscribeGlobal_WithApply(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -821,7 +822,7 @@ func TestSubscribeGlobal_WithApply(t *testing.T) { // TestSubscribeClusterNode tests that the cluster node filter subscription // mechanism works. func TestSubscribeClusterNode(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -858,7 +859,7 @@ func TestSubscribeClusterNode(t *testing.T) { // TestSubscribeClusterNode_WithApply tests that the cluster node filter // subscription mechanism works when using ApplyClusterNodeFilterChanges. func TestSubscribeClusterNode_WithApply(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -900,7 +901,7 @@ func TestSubscribeClusterNode_WithApply(t *testing.T) { // TestClearGlobalFilter tests that clearing the global filter means no messages get through. func TestClearGlobalFilter(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -942,7 +943,7 @@ func TestClearGlobalFilter(t *testing.T) { // TestClearClusterNodeFilter tests that clearing a cluster node filter means no // messages get through. func TestClearClusterNodeFilter(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -983,7 +984,7 @@ func TestClearClusterNodeFilter(t *testing.T) { // TestNotifyOnGlobalFilterChanges tests that notifications on global filter changes are sent. func TestNotifyOnGlobalFilterChanges(t *testing.T) { - bus, err := NewEventBus("", nil) + bus, err := NewEventBus("", nil, nil) if err != nil { t.Fatal(err) } @@ -1026,7 +1027,7 @@ func TestNotifyOnGlobalFilterChanges(t *testing.T) { // TestNotifyOnLocalFilterChanges tests that notifications on local cluster node // filter changes are sent. func TestNotifyOnLocalFilterChanges(t *testing.T) { - bus, err := NewEventBus("somecluster", nil) + bus, err := NewEventBus("somecluster", nil, nil) if err != nil { t.Fatal(err) } @@ -1065,3 +1066,121 @@ func TestNotifyOnLocalFilterChanges(t *testing.T) { t.Fatal("We expected to get a global filter notification") } } + +type fakeWALGetter struct { + Header string +} + +func (f *fakeWALGetter) GetCurrentWALHeader() string { + return f.Header +} + +var _ StorageWALGetter = (*fakeWALGetter)(nil) + +// Test_getIndexForEvent tests the retrieval of the Vault storage index for an +// event based on its metadata. +func Test_getIndexForEvent(t *testing.T) { + tests := map[string]struct { + event *logical.EventReceived + walGetter StorageWALGetter + expectErr string + expectIndex string + }{ + "event with modified true": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "modified": {Kind: &structpb.Value_StringValue{StringValue: "true"}}, + }, + }, + }, + }, + walGetter: &fakeWALGetter{"test-wal"}, + expectErr: "", + expectIndex: "test-wal", + }, + "event with modified false": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "modified": {Kind: &structpb.Value_StringValue{StringValue: "false"}}, + }, + }, + }, + }, + walGetter: &fakeWALGetter{"test-wal"}, + expectErr: "", + expectIndex: "", + }, + "event with modified not set": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "path": {Kind: &structpb.Value_StringValue{StringValue: "abc/"}}, + }, + }, + }, + }, + walGetter: &fakeWALGetter{"test-wal"}, + expectErr: "", + expectIndex: "", + }, + "event with modified invalid": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "modified": {Kind: &structpb.Value_StringValue{StringValue: "invalid"}}, + }, + }, + }, + }, + walGetter: &fakeWALGetter{"test-wal"}, + expectErr: "failed to parse event metadata modified", + expectIndex: "", + }, + "event without metadata": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: nil, + }, + }, + expectErr: "", + expectIndex: "", + }, + "nil walGetter": { + event: &logical.EventReceived{ + Event: &logical.EventData{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "modified": {Kind: &structpb.Value_StringValue{StringValue: "true"}}, + }, + }, + }, + }, + walGetter: nil, + expectErr: "", + expectIndex: "", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + bus, err := NewEventBus("", nil, tc.walGetter) + require.NoError(t, err) + bus.Start() + + index, err := bus.getIndexForEvent(tc.event) + if tc.expectErr != "" { + assert.ErrorContains(t, err, tc.expectErr) + assert.Equal(t, tc.expectIndex, index) + } else { + assert.NoError(t, err) + assert.Equal(t, tc.expectIndex, index) + } + }) + } +}