auto enabling priority and fairness #8650
base: main
Conversation
Force-pushed from c778149 to 87ae80e
method. Even with the channel providing guardrails on initialization, the previous attempt would still cause spurious panics, with newPM being nil by the time we attempted to call the method on it. I didn't look at the generated assembly, but I assume that even with the channel block there was still a race as to when the address of newPM was copied from the outer scope's stack into the closure's stack. This could likely be avoided by taking the address of the pointer and passing that instead, but that started to feel too clever to me.
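For illustration, a minimal sketch of the capture hazard being described here, using hypothetical names rather than the actual partition-manager code:

```go
package main

import (
	"fmt"
	"sync"
)

type partitionManager struct{ name string }

func (p *partitionManager) Name() string { return p.name }

func main() {
	var newPM *partitionManager
	ready := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ready // guardrail: wait for initialization to signal completion
		// Go closures capture variables by reference, so this reads whatever
		// newPM holds *now*. If any code path can still write newPM (or reset
		// it to nil) after close(ready), this read races and can observe nil.
		fmt.Println(newPM.Name())
	}()

	newPM = &partitionManager{name: "q1"}
	close(ready) // establishes happens-before only for writes above this line

	wg.Wait()
}
```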
Force-pushed from 2bf07fe to b96901b
The test code here needs to do only half of the initialization and then set the defaultQueue itself. With the setting of config moved into the actual initialization, we need to replicate some of that code in the test to keep the same behavior as before.
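Roughly, that test shim looks like this (all types here are simplified stand-ins for illustration, not the real manager types):

```go
package main

type physicalQueue struct{ name string }

type partitionManager struct {
	config       map[string]bool
	defaultQueue *physicalQueue
}

// newPartitionManagerForTest mirrors only the first half of the production
// initialization (the config wiring), deliberately stopping before the
// defaultQueue is built.
func newPartitionManagerForTest() *partitionManager {
	return &partitionManager{config: map[string]bool{}}
}

func main() {
	pm := newPartitionManagerForTest()
	// The test replicates the tail of initialization itself so it can
	// substitute its own queue and keep the pre-change behavior.
	pm.defaultQueue = &physicalQueue{name: "test-queue"}
	_ = pm
}
```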
dnr left a comment:
for the PR title: it should just talk about auto-enabling priority/fairness. the fact that there's a dynamic config to enable the auto-enabling is the least interesting part.
```proto
enum FairnessState {
  FAIRNESS_STATE_UNSPECIFIED = 0;
  FAIRNESS_STATE_V1 = 1;
  FAIRNESS_STATE_V2 = 2;
};
```
hmm, I think we usually put enums at the top-level. but I guess this is fine, we don't expect to use it anywhere else.
```go
if s.fairness {
	prtnMgr.config.NewMatcher = true
	prtnMgr.config.EnableFairness = true
} else if s.newMatcher {
	prtnMgr.config.NewMatcher = true
}
```
hmm, the stuff on line 70-74 doesn't take care of this? if we do need this, maybe we don't need that?
reading this whole function again... it duplicates a lot of real code to set things up. I wish we could consolidate. but probably not now.
```go
	perType.FairnessState = persistencespb.TaskQueueTypeUserData_FAIRNESS_STATE_V2
	return data, true, nil
}
_, err := pm.userDataManager.UpdateUserData(ctx, UserDataUpdateOptions{Source: "Matching auto enable"}, updateFn)
```
unfortunately, we decided to store user data for all task queue types on the root of the workflow task queue. so this local call can only be done on the workflow tq; for activity, we need to do an rpc to the workflow tq.
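A rough sketch of the dispatch that implies, with hypothetical helper names (updateUserDataLocally / updateUserDataViaRPC are stand-ins, not real matching APIs):

```go
package main

import "context"

type TaskQueueType int

const (
	TaskQueueTypeWorkflow TaskQueueType = iota
	TaskQueueTypeActivity
)

type partitionManager struct {
	taskType TaskQueueType
}

// setFairnessStateV2 applies the update locally only when this partition's
// type owns the user data (the workflow task queue root); any other type
// must ask the workflow task queue to apply it on its behalf.
func (pm *partitionManager) setFairnessStateV2(ctx context.Context) error {
	if pm.taskType == TaskQueueTypeWorkflow {
		return pm.updateUserDataLocally(ctx)
	}
	return pm.updateUserDataViaRPC(ctx)
}

// Stubs standing in for the real local update and the cross-queue RPC.
func (pm *partitionManager) updateUserDataLocally(ctx context.Context) error { return nil }
func (pm *partitionManager) updateUserDataViaRPC(ctx context.Context) error  { return nil }

func main() {
	pm := &partitionManager{taskType: TaskQueueTypeActivity}
	_ = pm.setFairnessStateV2(context.Background())
}
```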
```go
if params.taskInfo.Priority != nil && (params.taskInfo.Priority.FairnessKey != "" || params.taskInfo.Priority.PriorityKey != int32(0)) {
	updateFn := func(old *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
		data := common.CloneProto(old)
		perType := data.GetPerType()[int32(pm.Partition().TaskType())]
```
you might need to set up the per type map if it's nil here
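Something like the following guard would cover it (simplified stand-in types, not the actual persistencespb definitions):

```go
package main

// Simplified stand-ins for the persistencespb types.
type TaskQueueTypeUserData struct {
	FairnessState int32
}

type TaskQueueUserData struct {
	PerType map[int32]*TaskQueueTypeUserData
}

// ensurePerType allocates the per-type map and entry on first use; writing
// through a nil map (or a nil entry) would otherwise panic.
func ensurePerType(data *TaskQueueUserData, taskType int32) *TaskQueueTypeUserData {
	if data.PerType == nil {
		data.PerType = make(map[int32]*TaskQueueTypeUserData)
	}
	if data.PerType[taskType] == nil {
		data.PerType[taskType] = &TaskQueueTypeUserData{}
	}
	return data.PerType[taskType]
}

func main() {
	data := &TaskQueueUserData{} // fresh task queue: PerType is nil
	perType := ensurePerType(data, 1)
	perType.FairnessState = 2 // e.g. FAIRNESS_STATE_V2
}
```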
for the rpc, and even for local UpdateUserData, we need some kind of rate limit or something so we don't do it again immediately on the next task, because updating user data is async.
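One possible shape for that guard, sketched with stand-in names (the real version might instead key off the pending user-data update):

```go
package main

import (
	"sync/atomic"
	"time"
)

// autoEnableGuard debounces the auto-enable write: user data updates are
// async, so without it every task arriving before the new state becomes
// visible would re-trigger the update.
type autoEnableGuard struct {
	lastAttempt atomic.Int64 // unix nanos of the last attempt
	minInterval time.Duration
}

// tryAcquire reports whether enough time has passed to attempt another
// update; CompareAndSwap ensures at most one caller wins per interval.
func (g *autoEnableGuard) tryAcquire() bool {
	now := time.Now().UnixNano()
	last := g.lastAttempt.Load()
	if now-last < int64(g.minInterval) {
		return false
	}
	return g.lastAttempt.CompareAndSwap(last, now)
}

func main() {
	guard := &autoEnableGuard{minInterval: time.Minute}
	if guard.tryAcquire() {
		// first task with fairness fields: issue UpdateUserData / the RPC
	}
	if guard.tryAcquire() {
		// suppressed: still within the same interval
	}
}
```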
also, I'd move this whole thing into another function so there's just one line here, to not break the flow of reading the main path of AddTask
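A sketch of that extraction, with simplified stand-in types for illustration:

```go
package main

import "context"

// Simplified stand-ins for the real task and partition manager types.
type Priority struct {
	FairnessKey string
	PriorityKey int32
}

type taskInfo struct {
	Priority *Priority
}

type partitionManager struct{}

// AddTask keeps its main path readable: one line for the auto-enable hook.
func (pm *partitionManager) AddTask(ctx context.Context, info *taskInfo) error {
	pm.maybeAutoEnableFairness(ctx, info)
	// ... the rest of the normal AddTask path ...
	return nil
}

// maybeAutoEnableFairness holds the detection and user-data update logic
// that would otherwise interrupt the flow of AddTask.
func (pm *partitionManager) maybeAutoEnableFairness(ctx context.Context, info *taskInfo) {
	p := info.Priority
	if p == nil || (p.FairnessKey == "" && p.PriorityKey == 0) {
		return
	}
	// rate-limit guard + UpdateUserData (or the cross-queue RPC) go here
}

func main() {
	pm := &partitionManager{}
	_ = pm.AddTask(context.Background(), &taskInfo{Priority: &Priority{FairnessKey: "k"}})
}
```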
And some other misc PR comments
What changed?
Add a new dynamic config that auto-enables fairness and priority when we see tasks using the relevant fields coming in.
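As a hedged sketch of the overall gate (stand-in names, not the real Temporal dynamic config API):

```go
package main

// config stands in for matching's per-task-queue config; in the real code
// the flag would be backed by a dynamic config value so operators can turn
// the behavior on or off without a redeploy.
type config struct {
	EnableFairnessAutoEnable func() bool
}

type Priority struct {
	FairnessKey string
	PriorityKey int32
}

// shouldAutoEnable gates on the dynamic config first, then on whether the
// incoming task actually uses the new fields.
func shouldAutoEnable(cfg *config, p *Priority) bool {
	if !cfg.EnableFairnessAutoEnable() {
		return false
	}
	return p != nil && (p.FairnessKey != "" || p.PriorityKey != 0)
}

func main() {
	cfg := &config{EnableFairnessAutoEnable: func() bool { return true }}
	_ = shouldAutoEnable(cfg, &Priority{FairnessKey: "tenant-a"})
}
```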
Why?
Seamlessly transition users who start using these fields over to the new code path.
How did you test it?
Potential risks
Because this is stored in the user data, we use that interface a bit more. We also need to change the initialization so that user data starts before we can instantiate the defaultQueue; this change in initialization order might have unintended side effects that I'm not currently seeing.
Notes
I've left comments on pieces of code that I'm less sure about, or where things differ slightly from our original plan. Leaving this as a draft until I can add new tests to exercise it more thoroughly.