Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions serviceregistration/consul/consul_service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ type serviceRegistration struct {
notifyActiveCh chan struct{}
notifySealedCh chan struct{}
notifyPerfStandbyCh chan struct{}
notifyInitializedCh chan struct{}

isActive, isSealed, isPerfStandby *atomicB.Bool
isActive *atomicB.Bool
isSealed *atomicB.Bool
isPerfStandby *atomicB.Bool
isInitialized *atomicB.Bool
}

// NewConsulServiceRegistration constructs a Consul-based ServiceRegistration.
Expand Down Expand Up @@ -210,10 +214,12 @@ func NewServiceRegistration(conf map[string]string, logger log.Logger, state sr.
notifyActiveCh: make(chan struct{}),
notifySealedCh: make(chan struct{}),
notifyPerfStandbyCh: make(chan struct{}),
notifyInitializedCh: make(chan struct{}),

isActive: atomicB.NewBool(state.IsActive),
isSealed: atomicB.NewBool(state.IsSealed),
isPerfStandby: atomicB.NewBool(state.IsPerformanceStandby),
isInitialized: atomicB.NewBool(state.IsInitialized),
}
return c, nil
}
Expand Down Expand Up @@ -269,9 +275,15 @@ func (c *serviceRegistration) NotifySealedStateChange(isSealed bool) error {
}

func (c *serviceRegistration) NotifyInitializedStateChange(isInitialized bool) error {
// This is not implemented because to date, Consul service registration has
// never reported out on whether Vault was initialized. We may someday want to
// do this, but it has not yet been requested.
c.isInitialized.Store(isInitialized)
select {
case c.notifyInitializedCh <- struct{}{}:
default:
// NOTE: If this occurs Vault's initialized status could be out of
// sync with Consul until checkTimer expires.
c.logger.Warn("concurrent initalize state change notify dropped")
}

return nil
}

Expand Down Expand Up @@ -329,7 +341,10 @@ func (c *serviceRegistration) runEventDemuxer(waitGroup *sync.WaitGroup, shutdow
// Run check timer immediately upon a seal state change notification
checkTimer.Reset(0)
case <-c.notifyPerfStandbyCh:
// Run check timer immediately upon a seal state change notification
// Run check timer immediately upon a perfstandby state change notification
checkTimer.Reset(0)
case <-c.notifyInitializedCh:
// Run check timer immediately upon an initialized state change notification
checkTimer.Reset(0)
case <-reconcileTimer.C:
// Unconditionally rearm the reconcileTimer
Expand Down Expand Up @@ -428,7 +443,7 @@ func (c *serviceRegistration) reconcileConsul(registeredServiceID string) (servi
}
}

tags := c.fetchServiceTags(c.isActive.Load(), c.isPerfStandby.Load())
tags := c.fetchServiceTags(c.isActive.Load(), c.isPerfStandby.Load(), c.isInitialized.Load())

var reregister bool

Expand Down Expand Up @@ -505,7 +520,7 @@ func (c *serviceRegistration) runCheck(sealed bool) error {
}

// fetchServiceTags returns all of the relevant tags for Consul.
func (c *serviceRegistration) fetchServiceTags(active bool, perfStandby bool) []string {
func (c *serviceRegistration) fetchServiceTags(active, perfStandby, initialized bool) []string {
activeTag := "standby"
if active {
activeTag = "active"
Expand All @@ -517,6 +532,10 @@ func (c *serviceRegistration) fetchServiceTags(active bool, perfStandby bool) []
result = append(c.serviceTags, "performance-standby")
}

if initialized {
result = append(result, "initialized")
}

return result
}

Expand Down
89 changes: 31 additions & 58 deletions serviceregistration/consul/consul_service_registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/testhelpers/consul"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/strutil"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/physical/inmem"
sr "github.com/hashicorp/vault/serviceregistration"
Expand Down Expand Up @@ -145,65 +144,10 @@ func TestConsul_ServiceRegistration(t *testing.T) {

waitForServices(t, map[string][]string{
"consul": []string{},
"vault": []string{"active"},
"vault": []string{"active", "initialized"},
})
}

func TestConsul_ServiceTags(t *testing.T) {
consulConfig := map[string]string{
"path": "seaTech/",
"service": "astronomy",
"service_tags": "deadbeef, cafeefac, deadc0de, feedface",
"redirect_addr": "http://127.0.0.2:8200",
"check_timeout": "6s",
"address": "127.0.0.2",
"scheme": "https",
"token": "deadbeef-cafeefac-deadc0de-feedface",
"max_parallel": "4",
"disable_registration": "false",
}
logger := logging.NewVaultLogger(log.Debug)

shutdownCh := make(chan struct{})
defer func() {
close(shutdownCh)
}()

be, err := NewServiceRegistration(consulConfig, logger, sr.State{})
if err != nil {
t.Fatal(err)
}
if err := be.Run(shutdownCh, &sync.WaitGroup{}, ""); err != nil {
t.Fatal(err)
}

c, ok := be.(*serviceRegistration)
if !ok {
t.Fatalf("failed to create physical Consul backend")
}

expected := []string{"deadbeef", "cafeefac", "deadc0de", "feedface"}
actual := c.fetchServiceTags(false, false)
if !strutil.EquivalentSlices(actual, append(expected, "standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "standby"), actual)
}

actual = c.fetchServiceTags(true, false)
if !strutil.EquivalentSlices(actual, append(expected, "active")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "active"), actual)
}

actual = c.fetchServiceTags(false, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}

actual = c.fetchServiceTags(true, true)
if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) {
t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual)
}
}

func TestConsul_ServiceAddress(t *testing.T) {
tests := []struct {
consulConfig map[string]string
Expand Down Expand Up @@ -418,34 +362,63 @@ func TestConsul_serviceTags(t *testing.T) {
tests := []struct {
active bool
perfStandby bool
initialized bool
tags []string
}{
{
active: true,
perfStandby: false,
initialized: false,
tags: []string{"active"},
},
{
active: false,
perfStandby: false,
initialized: false,
tags: []string{"standby"},
},
{
active: false,
perfStandby: true,
initialized: false,
tags: []string{"performance-standby"},
},
{
active: true,
perfStandby: true,
initialized: false,
tags: []string{"performance-standby"},
},
{
active: true,
perfStandby: false,
initialized: true,
tags: []string{"active", "initialized"},
},
{
active: false,
perfStandby: false,
initialized: true,
tags: []string{"standby", "initialized"},
},
{
active: false,
perfStandby: true,
initialized: true,
tags: []string{"performance-standby", "initialized"},
},
{
active: true,
perfStandby: true,
initialized: true,
tags: []string{"performance-standby", "initialized"},
},
}

c := testConsulServiceRegistration(t)

for _, test := range tests {
tags := c.fetchServiceTags(test.active, test.perfStandby)
tags := c.fetchServiceTags(test.active, test.perfStandby, test.initialized)
if !reflect.DeepEqual(tags[:], test.tags[:]) {
t.Errorf("Bad %v: %v %v", test.active, tags, test.tags)
}
Expand Down
4 changes: 2 additions & 2 deletions serviceregistration/service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ type ServiceRegistration interface {
// in the face of errors.
NotifyPerformanceStandbyStateChange(isStandby bool) error

// NotifyInitializedStateChange is used by Core to notify that the core is
// initialized.
// NotifyInitializedStateChange is used by Core to notify that storage
// has been initialized. An unsealed core will always also be initialized.
// If errors are returned, Vault only logs a warning, so it is
// the implementation's responsibility to retry updating state
// in the face of errors.
Expand Down
5 changes: 5 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,11 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
c.logger.Warn("failed to notify unsealed status", "error", err)
}
}
if err := c.serviceRegistration.NotifyInitializedStateChange(true); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("failed to notify initialized status", "error", err)
}
}
}
return true, nil
}
Expand Down