Skip to content

Commit 6086881

Browse files
michaely520 authored and chaptersix committed
Fix race by extracting out CloseTransferTask while holding lock (#8548)
## What changed?
Fixed a data race by extracting the close transfer task information before releasing the lock. Added more context to the comment.

## Why?
Bug fix.

## How did you test it?
- [x] built
- [x] run locally and tested manually
- [x] covered by existing tests
- [ ] added new unit test(s)
- [ ] added new functional test(s)
1 parent d7f33e1 commit 6086881

File tree

2 files changed

+64
-45
lines changed

2 files changed

+64
-45
lines changed

service/history/replication/raw_task_converter.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,13 @@ func (c *syncVersionedTransitionTaskConverter) convert(
678678
return nil, err
679679
}
680680
currentHistoryCopy := versionhistory.CopyVersionHistory(currentHistory)
681+
682+
// Extract data from mutable state before releasing the lock
683+
closeTransferTask := &tasks.CloseExecutionTask{
684+
WorkflowKey: mutableState.GetWorkflowKey(),
685+
TaskID: executionInfo.GetCloseTransferTaskId(),
686+
}
687+
681688
var syncStateResult *SyncStateResult
682689
if taskInfo.IsFirstTask {
683690
syncStateResult, err = c.syncStateRetriever.GetSyncWorkflowStateArtifactFromMutableStateForNewWorkflow(
@@ -710,9 +717,13 @@ func (c *syncVersionedTransitionTaskConverter) convert(
710717
return nil, err
711718
}
712719

713-
syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(mutableState)
720+
// WARNING: do not access mutable state after this point. If you are using mutable state in this function, be warned that the
721+
// releaseFunc that is being passed into this function is what is used to release the lock we are holding on mutable state. If
722+
// you use mutable state after the releaseFunc has been called, you will be accessing mutable state without holding the lock.
723+
// Deep copy what you need.
724+
725+
syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(closeTransferTask)
714726
syncStateResult.VersionedTransitionArtifact.IsForceReplication = taskInfo.IsForceReplication
715-
// do not access mutable state after this point
716727

717728
err = c.replicationCache.Update(taskInfo.RunID, targetClusterID, syncStateResult.VersionedTransitionHistory, currentHistoryCopy.Items)
718729
if err != nil {
@@ -852,11 +863,9 @@ func (c *syncVersionedTransitionTaskConverter) generateBackfillHistoryTask(
852863
}
853864

854865
func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked(
855-
mutableState historyi.MutableState,
866+
closeTransferTask *tasks.CloseExecutionTask,
856867
) bool {
857-
closeTransferTaskID := mutableState.GetExecutionInfo().CloseTransferTaskId
858-
859-
if closeTransferTaskID == 0 {
868+
if closeTransferTask.TaskID == 0 {
860869
return false
861870
}
862871

@@ -865,11 +874,6 @@ func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked(
865874
return false
866875
}
867876

868-
closeTransferTask := &tasks.CloseExecutionTask{
869-
WorkflowKey: mutableState.GetWorkflowKey(),
870-
TaskID: closeTransferTaskID,
871-
}
872-
873877
return queues.IsTaskAcked(closeTransferTask, transferQueueState)
874878
}
875879

service/history/replication/raw_task_converter_test.go

Lines changed: 49 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,10 +1455,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_Mutation(
14551455
).Return(s.workflowContext, s.releaseFn, nil)
14561456
s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1)
14571457
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
1458-
VersionHistories: versionHistories,
1459-
TransitionHistory: transitionHistory,
1460-
}).Times(3)
1458+
VersionHistories: versionHistories,
1459+
TransitionHistory: transitionHistory,
1460+
CloseTransferTaskId: 0,
1461+
}).Times(2)
14611462
s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1)
1463+
s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{
1464+
NamespaceID: s.namespaceID,
1465+
WorkflowID: s.workflowID,
1466+
RunID: s.runID,
1467+
}).Times(1)
14621468

14631469
s.progressCache.EXPECT().Get(
14641470
s.runID,
@@ -1582,10 +1588,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_FirstTask
15821588
).Return(s.workflowContext, s.releaseFn, nil)
15831589
s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1)
15841590
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
1585-
VersionHistories: versionHistories,
1586-
TransitionHistory: transitionHistory,
1587-
}).Times(3)
1591+
VersionHistories: versionHistories,
1592+
TransitionHistory: transitionHistory,
1593+
CloseTransferTaskId: 0,
1594+
}).Times(2)
15881595
s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1)
1596+
s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{
1597+
NamespaceID: s.namespaceID,
1598+
WorkflowID: s.workflowID,
1599+
RunID: s.runID,
1600+
}).Times(1)
15891601

15901602
s.progressCache.EXPECT().Get(
15911603
s.runID,
@@ -1704,28 +1716,37 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_HasBuffer
17041716
}
17051717

17061718
func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_ZeroTaskId() {
1707-
mu := historyi.NewMockMutableState(s.controller)
1708-
executionInfo := &persistencespb.WorkflowExecutionInfo{
1709-
CloseTransferTaskId: 0,
1719+
testCloseTaskID := int64(0)
1720+
workflowKey := definition.WorkflowKey{
1721+
NamespaceID: s.namespaceID,
1722+
WorkflowID: s.workflowID,
1723+
RunID: s.runID,
1724+
}
1725+
closeTransferTask := &tasks.CloseExecutionTask{
1726+
WorkflowKey: workflowKey,
1727+
TaskID: testCloseTaskID,
17101728
}
1711-
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
17121729

17131730
converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
1714-
result := converter.isCloseTransferTaskAcked(mu)
1731+
result := converter.isCloseTransferTaskAcked(closeTransferTask)
17151732
s.False(result)
17161733
}
17171734

17181735
func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_QueueStateNotAvailable() {
1719-
mu := historyi.NewMockMutableState(s.controller)
17201736
testCloseTaskID := int64(12345)
1721-
executionInfo := &persistencespb.WorkflowExecutionInfo{
1722-
CloseTransferTaskId: testCloseTaskID,
1737+
workflowKey := definition.WorkflowKey{
1738+
NamespaceID: s.namespaceID,
1739+
WorkflowID: s.workflowID,
1740+
RunID: s.runID,
1741+
}
1742+
closeTransferTask := &tasks.CloseExecutionTask{
1743+
WorkflowKey: workflowKey,
1744+
TaskID: testCloseTaskID,
17231745
}
1724-
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
17251746

17261747
// Queue state not set, so should return false
17271748
converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
1728-
result := converter.isCloseTransferTaskAcked(mu)
1749+
result := converter.isCloseTransferTaskAcked(closeTransferTask)
17291750
s.False(result)
17301751
}
17311752

@@ -1798,19 +1819,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskAcked() {
17981819

17991820
converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
18001821

1801-
mu := historyi.NewMockMutableState(s.controller)
18021822
workflowKey := definition.WorkflowKey{
18031823
NamespaceID: s.namespaceID,
18041824
WorkflowID: s.workflowID,
18051825
RunID: s.runID,
18061826
}
1807-
executionInfo := &persistencespb.WorkflowExecutionInfo{
1808-
CloseTransferTaskId: testCloseTaskID,
1827+
closeTransferTask := &tasks.CloseExecutionTask{
1828+
WorkflowKey: workflowKey,
1829+
TaskID: testCloseTaskID,
18091830
}
1810-
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
1811-
mu.EXPECT().GetWorkflowKey().Return(workflowKey)
18121831

1813-
result := converter.isCloseTransferTaskAcked(mu)
1832+
result := converter.isCloseTransferTaskAcked(closeTransferTask)
18141833
s.True(result)
18151834
}
18161835

@@ -1842,19 +1861,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked() {
18421861

18431862
converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
18441863

1845-
mu := historyi.NewMockMutableState(s.controller)
18461864
workflowKey := definition.WorkflowKey{
18471865
NamespaceID: s.namespaceID,
18481866
WorkflowID: s.workflowID,
18491867
RunID: s.runID,
18501868
}
1851-
executionInfo := &persistencespb.WorkflowExecutionInfo{
1852-
CloseTransferTaskId: testCloseTaskID,
1869+
closeTransferTask := &tasks.CloseExecutionTask{
1870+
WorkflowKey: workflowKey,
1871+
TaskID: testCloseTaskID,
18531872
}
1854-
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
1855-
mu.EXPECT().GetWorkflowKey().Return(workflowKey)
18561873

1857-
result := converter.isCloseTransferTaskAcked(mu)
1874+
result := converter.isCloseTransferTaskAcked(closeTransferTask)
18581875
s.False(result)
18591876
}
18601877

@@ -1911,18 +1928,16 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked_Contai
19111928

19121929
converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
19131930

1914-
mu := historyi.NewMockMutableState(s.controller)
19151931
workflowKey := definition.WorkflowKey{
19161932
NamespaceID: s.namespaceID,
19171933
WorkflowID: s.workflowID,
19181934
RunID: s.runID,
19191935
}
1920-
executionInfo := &persistencespb.WorkflowExecutionInfo{
1921-
CloseTransferTaskId: testCloseTaskID,
1936+
closeTransferTask := &tasks.CloseExecutionTask{
1937+
WorkflowKey: workflowKey,
1938+
TaskID: testCloseTaskID,
19221939
}
1923-
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
1924-
mu.EXPECT().GetWorkflowKey().Return(workflowKey)
19251940

1926-
result := converter.isCloseTransferTaskAcked(mu)
1941+
result := converter.isCloseTransferTaskAcked(closeTransferTask)
19271942
s.False(result)
19281943
}

0 commit comments

Comments
 (0)