diff --git a/service/history/replication/raw_task_converter.go b/service/history/replication/raw_task_converter.go index 07e72cb8485..61a657a4f49 100644 --- a/service/history/replication/raw_task_converter.go +++ b/service/history/replication/raw_task_converter.go @@ -678,6 +678,13 @@ func (c *syncVersionedTransitionTaskConverter) convert( return nil, err } currentHistoryCopy := versionhistory.CopyVersionHistory(currentHistory) + + // Extract data from mutable state before releasing the lock + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: mutableState.GetWorkflowKey(), + TaskID: executionInfo.GetCloseTransferTaskId(), + } + var syncStateResult *SyncStateResult if taskInfo.IsFirstTask { syncStateResult, err = c.syncStateRetriever.GetSyncWorkflowStateArtifactFromMutableStateForNewWorkflow( @@ -710,9 +717,13 @@ func (c *syncVersionedTransitionTaskConverter) convert( return nil, err } - syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(mutableState) + // WARNING: do not access mutable state after this point. If you are using mutable state in this function, be warned that the + // releaseFunc that is being passed into this function is what is used to release the lock we are holding on mutable state. If + // you use mutable state after the releaseFunc has been called, you will be accessing mutable state without holding the lock. + // Deep copy what you need. + + syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(closeTransferTask) syncStateResult.VersionedTransitionArtifact.IsForceReplication = taskInfo.IsForceReplication - // do not access mutable state after this point err = c.replicationCache.Update(taskInfo.RunID, targetClusterID, syncStateResult.VersionedTransitionHistory, currentHistoryCopy.Items) if err != nil { @@ -852,11 +863,9 @@ func (c *syncVersionedTransitionTaskConverter) generateBackfillHistoryTask( } func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked( - mutableState historyi.MutableState, + closeTransferTask *tasks.CloseExecutionTask, ) bool { - closeTransferTaskID := mutableState.GetExecutionInfo().CloseTransferTaskId - - if closeTransferTaskID == 0 { + if closeTransferTask.TaskID == 0 { return false } @@ -865,11 +874,6 @@ func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked( return false } - closeTransferTask := &tasks.CloseExecutionTask{ - WorkflowKey: mutableState.GetWorkflowKey(), - TaskID: closeTransferTaskID, - } - return queues.IsTaskAcked(closeTransferTask, transferQueueState) } diff --git a/service/history/replication/raw_task_converter_test.go b/service/history/replication/raw_task_converter_test.go index 0da636b3bc5..10702be87ed 100644 --- a/service/history/replication/raw_task_converter_test.go +++ b/service/history/replication/raw_task_converter_test.go @@ -1455,10 +1455,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_Mutation( ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1) s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - VersionHistories: versionHistories, - TransitionHistory: transitionHistory, - }).Times(3) + VersionHistories: versionHistories, + TransitionHistory: transitionHistory, + CloseTransferTaskId: 0, + }).Times(2) s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1) + s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{ + NamespaceID: s.namespaceID, + WorkflowID: s.workflowID, + RunID: s.runID, + }).Times(1) s.progressCache.EXPECT().Get( s.runID, @@ -1582,10 +1588,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_FirstTask ).Return(s.workflowContext, s.releaseFn, nil) s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1) s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - VersionHistories: versionHistories, - TransitionHistory: transitionHistory, - }).Times(3) + VersionHistories: versionHistories, + TransitionHistory: transitionHistory, + CloseTransferTaskId: 0, + }).Times(2) s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1) + s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{ + NamespaceID: s.namespaceID, + WorkflowID: s.workflowID, + RunID: s.runID, + }).Times(1) s.progressCache.EXPECT().Get( s.runID, @@ -1704,28 +1716,37 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_HasBuffer } func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_ZeroTaskId() { - mu := historyi.NewMockMutableState(s.controller) - executionInfo := &persistencespb.WorkflowExecutionInfo{ - CloseTransferTaskId: 0, + testCloseTaskID := int64(0) + workflowKey := definition.WorkflowKey{ + NamespaceID: s.namespaceID, + WorkflowID: s.workflowID, + RunID: s.runID, + } + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + TaskID: testCloseTaskID, } - mu.EXPECT().GetExecutionInfo().Return(executionInfo) converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger) - result := converter.isCloseTransferTaskAcked(mu) + result := converter.isCloseTransferTaskAcked(closeTransferTask) s.False(result) } func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_QueueStateNotAvailable() { - mu := historyi.NewMockMutableState(s.controller) testCloseTaskID := int64(12345) - executionInfo := &persistencespb.WorkflowExecutionInfo{ - CloseTransferTaskId: testCloseTaskID, + workflowKey := definition.WorkflowKey{ + NamespaceID: s.namespaceID, + WorkflowID: s.workflowID, + RunID: s.runID, + } + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + TaskID: testCloseTaskID, } - mu.EXPECT().GetExecutionInfo().Return(executionInfo) // Queue state not set, so should return false converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger) - result := converter.isCloseTransferTaskAcked(mu) + result := converter.isCloseTransferTaskAcked(closeTransferTask) s.False(result) } @@ -1798,19 +1819,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskAcked() { converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger) - mu := historyi.NewMockMutableState(s.controller) workflowKey := definition.WorkflowKey{ NamespaceID: s.namespaceID, WorkflowID: s.workflowID, RunID: s.runID, } - executionInfo := &persistencespb.WorkflowExecutionInfo{ - CloseTransferTaskId: testCloseTaskID, + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + TaskID: testCloseTaskID, } - mu.EXPECT().GetExecutionInfo().Return(executionInfo) - mu.EXPECT().GetWorkflowKey().Return(workflowKey) - result := converter.isCloseTransferTaskAcked(mu) + result := converter.isCloseTransferTaskAcked(closeTransferTask) s.True(result) } @@ -1842,19 +1861,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked() { converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger) - mu := historyi.NewMockMutableState(s.controller) workflowKey := definition.WorkflowKey{ NamespaceID: s.namespaceID, WorkflowID: s.workflowID, RunID: s.runID, } - executionInfo := &persistencespb.WorkflowExecutionInfo{ - CloseTransferTaskId: testCloseTaskID, + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + TaskID: testCloseTaskID, } - mu.EXPECT().GetExecutionInfo().Return(executionInfo) - mu.EXPECT().GetWorkflowKey().Return(workflowKey) - result := converter.isCloseTransferTaskAcked(mu) + result := converter.isCloseTransferTaskAcked(closeTransferTask) s.False(result) } @@ -1911,18 +1928,16 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked_Contai converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger) - mu := historyi.NewMockMutableState(s.controller) workflowKey := definition.WorkflowKey{ NamespaceID: s.namespaceID, WorkflowID: s.workflowID, RunID: s.runID, } - executionInfo := &persistencespb.WorkflowExecutionInfo{ - CloseTransferTaskId: testCloseTaskID, + closeTransferTask := &tasks.CloseExecutionTask{ + WorkflowKey: workflowKey, + TaskID: testCloseTaskID, } - mu.EXPECT().GetExecutionInfo().Return(executionInfo) - mu.EXPECT().GetWorkflowKey().Return(workflowKey) - result := converter.isCloseTransferTaskAcked(mu) + result := converter.isCloseTransferTaskAcked(closeTransferTask) s.False(result) }