Skip to content

Commit 327482a

Browse files
committed
[FLINK-22624][runtime] Utilize the remain resource of new pending task managers to fulfill requirement in DefaultResourceAllocationStrategy
1 parent 3d20144 commit 327482a

2 files changed

Lines changed: 11 additions & 9 deletions

File tree

flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,17 @@ private void tryFulfillRequirementsForJobWithPendingResources(
178178
Collection<ResourceRequirement> unfulfilledRequirements,
179179
List<InternalResourceInfo> availableResources,
180180
ResourceAllocationResult.Builder resultBuilder) {
181-
final Collection<ResourceRequirement> missingResources =
182-
tryFulfillRequirementsForJobWithResources(
183-
jobId, unfulfilledRequirements, availableResources);
184-
for (ResourceRequirement missingResource : missingResources) {
181+
for (ResourceRequirement missingResource : unfulfilledRequirements) {
185182
// for this strategy, all pending resources should have the same default slot resource
186183
final ResourceProfile effectiveProfile =
187184
getEffectiveResourceProfile(
188185
missingResource.getResourceProfile(), defaultSlotResourceProfile);
189-
int numUnfulfilled = missingResource.getNumberOfRequiredSlots();
186+
int numUnfulfilled =
187+
tryFulfilledRequirementWithResource(
188+
availableResources,
189+
missingResource.getNumberOfRequiredSlots(),
190+
missingResource.getResourceProfile(),
191+
jobId);
190192

191193
if (!totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
192194
// Can not fulfill this resource type will the default worker.

flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategyTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public void testFulfillRequirementWithPendingResources() {
9595
.setPendingTaskManagersSupplier(
9696
() -> Collections.singleton(pendingTaskManager))
9797
.build();
98-
requirements.add(ResourceRequirement.create(largeResource, 1));
99-
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 7));
98+
requirements.add(ResourceRequirement.create(largeResource, 2));
99+
requirements.add(ResourceRequirement.create(ResourceProfile.UNKNOWN, 4));
100100

101101
final ResourceAllocationResult result =
102102
STRATEGY.tryFulfillRequirements(
@@ -127,8 +127,8 @@ public void testFulfillRequirementWithPendingResources() {
127127
resourceWithCount.getKey(), resourceWithCount.getValue());
128128
}
129129

130-
assertThat(allFulfilledRequirements.getResourceCount(DEFAULT_SLOT_RESOURCE), is(7));
131-
assertThat(allFulfilledRequirements.getResourceCount(largeResource), is(1));
130+
assertThat(allFulfilledRequirements.getResourceCount(DEFAULT_SLOT_RESOURCE), is(4));
131+
assertThat(allFulfilledRequirements.getResourceCount(largeResource), is(2));
132132
}
133133

134134
@Test

0 commit comments

Comments
 (0)