-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add implementation of CHASM List/Count Runs #8662
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
common/persistence/visibility/store/elasticsearch/visibility_store.go
Outdated
Show resolved
Hide resolved
common/persistence/visibility/store/elasticsearch/visibility_store.go
Outdated
Show resolved
Hide resolved
457d6b9 to
e63973b
Compare
1b7b1d1 to
db42bca
Compare
c71cd57 to
6d0b6fa
Compare
| UserMemoPrefix = "__user__" | ||
| ChasmMemoPrefix = "__chasm__" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| UserMemoPrefix = "__user__" | |
| ChasmMemoPrefix = "__chasm__" | |
| UserMemoKey = "__user__" | |
| ChasmMemoKey = "__chasm__" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change the var names.
| fieldName, err := searchAttributesMapper.GetFieldName(alias, namespaceEntry.Name().String()) | ||
| if err != nil { | ||
| return err | ||
| t.logger.Warn("Failed to get field name for alias, ignoring search attribute", tag.NewStringTag("alias", alias), tag.Error(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you are ignore, you need to continue
| t.logger.Warn("Failed to get field name for alias, ignoring search attribute", tag.NewStringTag("alias", alias), tag.Error(err)) | |
| t.logger.Warn("Failed to get field name for alias, ignoring search attribute", tag.NewStringTag("alias", alias), tag.Error(err)) | |
| continue |
256aa5d to
74fae89
Compare
74fae89 to
23c1374
Compare
bergundy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will let others review the implementation but the API LGTM.
Just please document all of the exported functions. CHASM will be used by a lot of developers and we want to help people out as much as possible.
| NextPageToken []byte | ||
| } | ||
|
|
||
| type ListExecutionsOptions struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed now.
| NamespaceID string | ||
| NamespaceName string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we need to take in both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NamespaceID is what we store in persistence, and NamespaceName is needed for dynamic config and custom search attributes mapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting that matching needs visibility manager as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scheduler has some constraints in their query, so it needs the visibility manager to build the query converter IIRC.
| } | ||
| } | ||
|
|
||
| combinedMemo := make(map[string]*commonpb.Payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| combinedMemo := make(map[string]*commonpb.Payload) | |
| combinedMemo := make(map[string]*commonpb.Payload, 2) |
| return fqn, ok | ||
| } | ||
|
|
||
| // ComponentByID returns the registrable component for a given archetype ID. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
plz also comment that those methods should not be used by chasm library developer. and also for RegistrableComponent.SearchAttributesMapper()
we probably need to create a separate interface for developer facing registry later.
| s.Len(root.currentSA, 3) | ||
| s.Contains(root.currentSA, "TemporalDatetime01") | ||
| s.True(root.currentSA["TemporalDatetime01"].(VisibilityValueTime).Equal(VisibilityValueTime(now))) | ||
| s.True(root.currentMemo[TestComponentStartTimeMemoKey].(VisibilityValueTime).Equal(VisibilityValueTime(now))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should be able to assert the memo value? here and below?
| if ok { | ||
| newMemo := memoProvider.Memo(immutableContext) | ||
| if !maps.EqualFunc(n.currentMemo, newMemo, isVisibilityValueEqual) { | ||
| if !proto.Equal(n.currentMemo, newMemo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I think now that we are using proto.Message we no longer have guarantee that there's no error upon encoding in task executor, but guess that should never happen.
Something for later, since the currentMemo/SA here are always consistent with CHASM tree state, in visibility task executor we actually don't have to invoke SA/Memo provider again, we can just serialize & buffer the value here (and now we can also return error upon closeTransaction if it can't be encoded), and use it in the vis task executor.
| Count int64 | ||
| } | ||
|
|
||
| ChasmExecutionInfo struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that persistence can depend on chasm package, for those new struct definitions, is it possible to reuse the corresponding definitions in chasm package and thus avoiding the conversion between the different struct definition of essentially the same thing in chasm_engine.go?
| InitialFailoverVersion: resp.GetInitialFailoverVersion(), | ||
| IsGlobalNamespaceEnabled: resp.GetIsGlobalNamespaceEnabled(), | ||
| IsConnectionEnabled: request.GetEnableRemoteClusterConnection(), | ||
| IsReplicationEnabled: request.GetIsReplicationEnabled(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this intentional?
| var userMemo *commonpb.Memo | ||
| var chasmMemoPayload *commonpb.Payload | ||
|
|
||
| // Check if archetype matches to decide how to split memo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm why not just check if __chasm__ is a key in the combinedMemo?
if we are worrying about potential conflict (though I feel that chance is very low...), I'd probably just check if TemporalNamespaceDivision string is a number or not (just the first char should be enough), since we are going to use this method to query across archetypes, e.g. in namespace migration case, I just want all visibility records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'm concerned about potential conflict (although very low).
I'm ok with checking if TemporalNamespaceDivision is a number.
|
|
||
| // GetType returns the type of a CHASM search attribute field. | ||
| // Returns an error if the field is not found in the type map. | ||
| func (v *VisibilitySearchAttributesMapper) GetType(fieldName string) (enumspb.IndexedValueType, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename function: ValueType or ValueTypeOf
| UserMemoPrefix = "__user__" | ||
| ChasmMemoPrefix = "__chasm__" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change the var names.
| // Get field type, checking CHASM mapper first if applicable | ||
| var tp enumspb.IndexedValueType | ||
| if sadefs.IsChasmSearchAttribute(colName) { | ||
| var err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err is already defined in the scope of this function, no need to create a new var.
| tp, err = c.chasmMapper.GetType(colName) | ||
| if err != nil { | ||
| return nil, NewConverterError("invalid search attribute: %s", colName) | ||
| } | ||
| } else { | ||
| var err error | ||
| tp, err = c.saNameType.GetType(colName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you can actually combine these maps and do the check together. These are field names, so there shouldn't be any conflicting keys.
| enableUnifiedQueryConverter dynamicconfig.BoolPropertyFn | ||
| } | ||
|
|
||
| listWorkflowExecutionsRequestInternal struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| listWorkflowExecutionsRequestInternal struct { | |
| listExecutionsInternalRequest struct { |
What changed?
Add implementation of CHASM List/Count Runs. Given a CHASM archetype and supported MemoType, a Visibility Query can be ran against both custom and CHASM search attributes. Both operations wrap around the Visibility Manager methods to List/Count Workflows.
In the initial handler of the query in visibility_store.go, the query converter is passed the chasm VisibilitySearchAttributesMapper and the archetypeID. When resolving query parameters, the order of resolving aliases to field names is Custom -> CHASM -> System/Predefined attribute.
Removes chasm transient dependency on persistence package
Fixes visibility task executor to emit Warning Log instead of returning error when custom search attribute is not found. (Edge case when search attributes get deregistered between task creation and execution)
Why?
Required to support CHASM Visibility and Component authors.
How did you test it?