Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (c *Persistence) IsCustomVisibilityStore() bool {
c.GetSecondaryVisibilityStoreConfig().CustomDataStoreConfig != nil
}

func (c *Persistence) IsElasticsearchVisibilityStore() bool {
return (c.VisibilityConfigExist() && c.DataStores[c.VisibilityStore].Elasticsearch != nil) ||
(c.SecondaryVisibilityConfigExist() && c.DataStores[c.SecondaryVisibilityStore].Elasticsearch != nil)
}

func (c *Persistence) GetVisibilityStoreConfig() DataStore {
return c.DataStores[c.VisibilityStore]
}
Expand Down
37 changes: 37 additions & 0 deletions common/namespace/nsregistry/fx.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package nsregistry

import (
"context"

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.uber.org/fx"
)

var RegistryLifetimeHooksModule = fx.Options(
fx.Invoke(RegistryLifetimeHooks),
fx.Invoke(InitializeSearchAttributeMappingsHook),
)

func RegistryLifetimeHooks(
Expand All @@ -15,3 +22,33 @@ func RegistryLifetimeHooks(
) {
lc.Append(fx.StartStopHook(registry.Start, registry.Stop))
}

func InitializeSearchAttributeMappingsHook(
lc fx.Lifecycle,
cfg *config.Config,
clusterMetadata cluster.Metadata,
metadataManager persistence.MetadataManager,
clusterMetadataManager persistence.ClusterMetadataManager,
logger log.Logger,
) {
visDataStore := cfg.Persistence.GetVisibilityStoreConfig()
if visDataStore.Elasticsearch == nil {
return
}

visibilityIndexName := visDataStore.GetIndexName()
currentClusterName := clusterMetadata.GetCurrentClusterName()

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return InitializeSearchAttributeMappings(
ctx,
metadataManager,
clusterMetadataManager,
currentClusterName,
visibilityIndexName,
logger,
)
},
})
}
114 changes: 114 additions & 0 deletions common/namespace/nsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package nsregistry

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/clock"
Expand All @@ -20,6 +22,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/pingable"
"go.temporal.io/server/common/searchattribute"
expmaps "golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -588,3 +591,114 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b
old.ActiveClusterName() != new.ActiveClusterName() ||
old.ReplicationState() != new.ReplicationState()
}

// InitializeSearchAttributeMappings initializes namespace search attribute mappings
// for Elasticsearch visibility stores. This creates identity mappings for existing
// custom search attributes in the cluster metadata.
func InitializeSearchAttributeMappings(
ctx context.Context,
metadataManager persistence.MetadataManager,
clusterMetadataManager persistence.ClusterMetadataManager,
currentClusterName string,
visibilityIndexName string,
logger log.Logger,
) error {
// Fetch the latest cluster metadata with updated index search attributes
clusterMetadataResponse, err := clusterMetadataManager.GetClusterMetadata(
ctx,
&persistence.GetClusterMetadataRequest{ClusterName: currentClusterName},
)
if err != nil {
return fmt.Errorf("failed to fetch cluster metadata: %w", err)
}

indexSearchAttrs := clusterMetadataResponse.IndexSearchAttributes[visibilityIndexName]
if indexSearchAttrs == nil || len(indexSearchAttrs.CustomSearchAttributes) == 0 {
return nil
}

// List all namespaces
request := &persistence.ListNamespacesRequest{
PageSize: CacheRefreshPageSize,
IncludeDeleted: false,
}

for {
response, err := metadataManager.ListNamespaces(ctx, request)
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}

for _, nsDetail := range response.Namespaces {
if err := updateSingleNamespaceSearchAttributeMappings(
ctx,
metadataManager,
nsDetail,
indexSearchAttrs.CustomSearchAttributes,
logger,
); err != nil {
logger.Warn("Failed to update namespace search attribute mappings",
tag.WorkflowNamespace(nsDetail.Namespace.Info.Name),
tag.Error(err))
// Continue with other namespaces
}
}

if len(response.NextPageToken) == 0 {
break
}
request.NextPageToken = response.NextPageToken
}

return nil
}

func updateSingleNamespaceSearchAttributeMappings(
ctx context.Context,
metadataManager persistence.MetadataManager,
nsDetail *persistence.GetNamespaceResponse,
clusterCustomSearchAttributes map[string]enumspb.IndexedValueType,
logger log.Logger,
) error {
fieldToAliasMap := nsDetail.Namespace.Config.CustomSearchAttributeAliases
if fieldToAliasMap == nil {
fieldToAliasMap = make(map[string]string)
}

needsUpdate := false
upsertFieldToAliasMap := make(map[string]string)
for fieldName, fieldType := range clusterCustomSearchAttributes {
if searchattribute.IsPreallocatedCSAFieldName(fieldName, fieldType) {
continue
}
if _, ok := fieldToAliasMap[fieldName]; ok {
continue
}
if _, ok := upsertFieldToAliasMap[fieldName]; ok {
continue
}
upsertFieldToAliasMap[fieldName] = fieldName
needsUpdate = true
}

if !needsUpdate {
return nil
}

// Update the namespace with new mappings
nsDetail.Namespace.Config.CustomSearchAttributeAliases = upsertFieldToAliasMap
err := metadataManager.UpdateNamespace(ctx, &persistence.UpdateNamespaceRequest{
Namespace: nsDetail.Namespace,
IsGlobalNamespace: nsDetail.IsGlobalNamespace,
NotificationVersion: nsDetail.NotificationVersion,
})
if err != nil {
return fmt.Errorf("failed to update namespace: %w", err)
}

logger.Info("Created identity mappings for namespace search attributes",
tag.WorkflowNamespace(nsDetail.Namespace.Info.Name),
tag.Number(int64(len(upsertFieldToAliasMap))))

return nil
}
Loading
Loading