diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java index 72e8e3f188b26..8a70a6dce945a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java @@ -178,15 +178,17 @@ private void tryFulfillRequirementsForJobWithPendingResources( Collection unfulfilledRequirements, List availableResources, ResourceAllocationResult.Builder resultBuilder) { - final Collection missingResources = - tryFulfillRequirementsForJobWithResources( - jobId, unfulfilledRequirements, availableResources); - for (ResourceRequirement missingResource : missingResources) { + for (ResourceRequirement missingResource : unfulfilledRequirements) { // for this strategy, all pending resources should have the same default slot resource final ResourceProfile effectiveProfile = getEffectiveResourceProfile( missingResource.getResourceProfile(), defaultSlotResourceProfile); - int numUnfulfilled = missingResource.getNumberOfRequiredSlots(); + int numUnfulfilled = + tryFulfilledRequirementWithResource( + availableResources, + missingResource.getNumberOfRequiredSlots(), + missingResource.getResourceProfile(), + jobId); if (!totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) { // Can not fulfill this resource type will the default worker. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java index 17e06c339f0fd..fea456fe704f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java @@ -95,8 +95,8 @@ public void testFulfillRequirementWithPendingResources() { .setPendingTaskManagersSupplier( () -> Collections.singleton(pendingTaskManager)) .build(); - requirements.add(ResourceRequirement.create(largeResource, 1)); - requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 7)); + requirements.add(ResourceRequirement.create(largeResource, 2)); + requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 4)); final ResourceAllocationResult result = STRATEGY.tryFulfillRequirements( @@ -127,8 +127,8 @@ public void testFulfillRequirementWithPendingResources() { resourceWithCount.getKey(), resourceWithCount.getValue()); } - assertThat(allFulfilledRequirements.getResourceCount(DEFAULT_SLOT_RESOURCE), is(7)); - assertThat(allFulfilledRequirements.getResourceCount(largeResource), is(1)); + assertThat(allFulfilledRequirements.getResourceCount(DEFAULT_SLOT_RESOURCE), is(4)); + assertThat(allFulfilledRequirements.getResourceCount(largeResource), is(2)); } @Test