Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,13 @@ func (c *syncVersionedTransitionTaskConverter) convert(
return nil, err
}
currentHistoryCopy := versionhistory.CopyVersionHistory(currentHistory)

// Extract data from mutable state before releasing the lock
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: mutableState.GetWorkflowKey(),
TaskID: executionInfo.GetCloseTransferTaskId(),
}

var syncStateResult *SyncStateResult
if taskInfo.IsFirstTask {
syncStateResult, err = c.syncStateRetriever.GetSyncWorkflowStateArtifactFromMutableStateForNewWorkflow(
Expand Down Expand Up @@ -710,9 +717,13 @@ func (c *syncVersionedTransitionTaskConverter) convert(
return nil, err
}

syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(mutableState)
// WARNING: do not access mutable state after this point. If you are using mutable state in this function, be warned that the
// releaseFunc that is being passed into this function is what is used to release the lock we are holding on mutable state. If
// you use mutable state after the releaseFunc has been called, you will be accessing mutable state without holding the lock.
// Deep copy what you need.

syncStateResult.VersionedTransitionArtifact.IsCloseTransferTaskAcked = c.isCloseTransferTaskAcked(closeTransferTask)
syncStateResult.VersionedTransitionArtifact.IsForceReplication = taskInfo.IsForceReplication
// do not access mutable state after this point

err = c.replicationCache.Update(taskInfo.RunID, targetClusterID, syncStateResult.VersionedTransitionHistory, currentHistoryCopy.Items)
if err != nil {
Expand Down Expand Up @@ -852,11 +863,9 @@ func (c *syncVersionedTransitionTaskConverter) generateBackfillHistoryTask(
}

func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked(
mutableState historyi.MutableState,
closeTransferTask *tasks.CloseExecutionTask,
) bool {
closeTransferTaskID := mutableState.GetExecutionInfo().CloseTransferTaskId

if closeTransferTaskID == 0 {
if closeTransferTask.TaskID == 0 {
return false
}

Expand All @@ -865,11 +874,6 @@ func (c *syncVersionedTransitionTaskConverter) isCloseTransferTaskAcked(
return false
}

closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: mutableState.GetWorkflowKey(),
TaskID: closeTransferTaskID,
}

return queues.IsTaskAcked(closeTransferTask, transferQueueState)
}

Expand Down
83 changes: 49 additions & 34 deletions service/history/replication/raw_task_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,10 +1455,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_Mutation(
).Return(s.workflowContext, s.releaseFn, nil)
s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1)
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
VersionHistories: versionHistories,
TransitionHistory: transitionHistory,
}).Times(3)
VersionHistories: versionHistories,
TransitionHistory: transitionHistory,
CloseTransferTaskId: 0,
}).Times(2)
s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1)
s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}).Times(1)

s.progressCache.EXPECT().Get(
s.runID,
Expand Down Expand Up @@ -1582,10 +1588,16 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_FirstTask
).Return(s.workflowContext, s.releaseFn, nil)
s.workflowContext.EXPECT().LoadMutableState(gomock.Any(), s.shardContext).Return(s.mutableState, nil).Times(1)
s.mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{
VersionHistories: versionHistories,
TransitionHistory: transitionHistory,
}).Times(3)
VersionHistories: versionHistories,
TransitionHistory: transitionHistory,
CloseTransferTaskId: 0,
}).Times(2)
s.mutableState.EXPECT().HasBufferedEvents().Return(false).Times(1)
s.mutableState.EXPECT().GetWorkflowKey().Return(definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}).Times(1)

s.progressCache.EXPECT().Get(
s.runID,
Expand Down Expand Up @@ -1704,28 +1716,37 @@ func (s *rawTaskConverterSuite) TestConvertSyncVersionedTransitionTask_HasBuffer
}

func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_ZeroTaskId() {
mu := historyi.NewMockMutableState(s.controller)
executionInfo := &persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: 0,
testCloseTaskID := int64(0)
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: workflowKey,
TaskID: testCloseTaskID,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo)

converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
result := converter.isCloseTransferTaskAcked(mu)
result := converter.isCloseTransferTaskAcked(closeTransferTask)
s.False(result)
}

func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_QueueStateNotAvailable() {
mu := historyi.NewMockMutableState(s.controller)
testCloseTaskID := int64(12345)
executionInfo := &persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: testCloseTaskID,
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: workflowKey,
TaskID: testCloseTaskID,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo)

// Queue state not set, so should return false
converter := newSyncVersionedTransitionTaskConverter(s.shardContext, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)
result := converter.isCloseTransferTaskAcked(mu)
result := converter.isCloseTransferTaskAcked(closeTransferTask)
s.False(result)
}

Expand Down Expand Up @@ -1798,19 +1819,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskAcked() {

converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)

mu := historyi.NewMockMutableState(s.controller)
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}
executionInfo := &persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: testCloseTaskID,
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: workflowKey,
TaskID: testCloseTaskID,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
mu.EXPECT().GetWorkflowKey().Return(workflowKey)

result := converter.isCloseTransferTaskAcked(mu)
result := converter.isCloseTransferTaskAcked(closeTransferTask)
s.True(result)
}

Expand Down Expand Up @@ -1842,19 +1861,17 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked() {

converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)

mu := historyi.NewMockMutableState(s.controller)
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}
executionInfo := &persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: testCloseTaskID,
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: workflowKey,
TaskID: testCloseTaskID,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
mu.EXPECT().GetWorkflowKey().Return(workflowKey)

result := converter.isCloseTransferTaskAcked(mu)
result := converter.isCloseTransferTaskAcked(closeTransferTask)
s.False(result)
}

Expand Down Expand Up @@ -1911,18 +1928,16 @@ func (s *rawTaskConverterSuite) TestIsCloseTransferTaskAcked_TaskNotAcked_Contai

converter := newSyncVersionedTransitionTaskConverter(mockShard, s.workflowCache, nil, s.progressCache, s.executionManager, s.syncStateRetriever, s.logger)

mu := historyi.NewMockMutableState(s.controller)
workflowKey := definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.workflowID,
RunID: s.runID,
}
executionInfo := &persistencespb.WorkflowExecutionInfo{
CloseTransferTaskId: testCloseTaskID,
closeTransferTask := &tasks.CloseExecutionTask{
WorkflowKey: workflowKey,
TaskID: testCloseTaskID,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo)
mu.EXPECT().GetWorkflowKey().Return(workflowKey)

result := converter.isCloseTransferTaskAcked(mu)
result := converter.isCloseTransferTaskAcked(closeTransferTask)
s.False(result)
}
Loading