diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
index ee2c24bc5139b..72e8e3f188b26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java
@@ -19,16 +19,15 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.runtime.util.ResourceCounter;
-import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.Optional;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils.getEffectiveResourceProfile;
@@ -48,8 +47,9 @@
* exist.
*
*
Note: The current implementation of this strategy is non-optimal, in terms of computation
- * efficiency. In the worst case, for each requirement it checks all registered and pending
- * resources. TODO: This will be optimized in FLINK-21174.
+ * efficiency. In the worst case, for each distinctly profiled requirement it checks all registered
+ * and pending resources. Further optimization requires complex data structures for ordering
+ * multi-dimensional resource profiles. The complexity is not necessary.
*/
public class DefaultResourceAllocationStrategy implements ResourceAllocationStrategy {
private final ResourceProfile defaultSlotResourceProfile;
@@ -71,24 +71,18 @@ public ResourceAllocationResult tryFulfillRequirements(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
- // Tuples of available and default slot resource for registered task managers, indexed by
- // instanceId
- final Map> registeredResources =
- getRegisteredResources(taskManagerResourceInfoProvider);
- // Available resources of pending task managers, indexed by the pendingTaskManagerId
- final Map pendingResources =
- getPendingResources(taskManagerResourceInfoProvider);
+ final List registeredResources =
+ getRegisteredResources(taskManagerResourceInfoProvider, resultBuilder);
+ final List pendingResources =
+ getPendingResources(taskManagerResourceInfoProvider, resultBuilder);
for (Map.Entry> resourceRequirements :
missingResources.entrySet()) {
final JobID jobId = resourceRequirements.getKey();
- final ResourceCounter unfulfilledJobRequirements =
- tryFulfillRequirementsForJobWithRegisteredResources(
- jobId,
- resourceRequirements.getValue(),
- registeredResources,
- resultBuilder);
+ final Collection unfulfilledJobRequirements =
+ tryFulfillRequirementsForJobWithResources(
+ jobId, resourceRequirements.getValue(), registeredResources);
if (!unfulfilledJobRequirements.isEmpty()) {
tryFulfillRequirementsForJobWithPendingResources(
@@ -98,160 +92,161 @@ public ResourceAllocationResult tryFulfillRequirements(
return resultBuilder.build();
}
- private static Map> getRegisteredResources(
- TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+ private static List getRegisteredResources(
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
+ ResourceAllocationResult.Builder resultBuilder) {
return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
- .collect(
- Collectors.toMap(
- TaskManagerInfo::getInstanceId,
- taskManager ->
- Tuple2.of(
- taskManager.getAvailableResource(),
- taskManager.getDefaultSlotResourceProfile())));
+ .map(
+ taskManager ->
+ new InternalResourceInfo(
+ taskManager.getDefaultSlotResourceProfile(),
+ taskManager.getAvailableResource(),
+ (jobId, slotProfile) ->
+ resultBuilder.addAllocationOnRegisteredResource(
+ jobId,
+ taskManager.getInstanceId(),
+ slotProfile)))
+ .collect(Collectors.toList());
}
- private static Map getPendingResources(
- TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
+ private static List getPendingResources(
+ TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
+ ResourceAllocationResult.Builder resultBuilder) {
return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
- .collect(
- Collectors.toMap(
- PendingTaskManager::getPendingTaskManagerId,
- PendingTaskManager::getTotalResourceProfile));
+ .map(
+ pendingTaskManager ->
+ new InternalResourceInfo(
+ pendingTaskManager.getDefaultSlotResourceProfile(),
+ pendingTaskManager.getTotalResourceProfile(),
+ (jobId, slotProfile) ->
+ resultBuilder.addAllocationOnPendingResource(
+ jobId,
+ pendingTaskManager
+ .getPendingTaskManagerId(),
+ slotProfile)))
+ .collect(Collectors.toList());
}
- private static ResourceCounter tryFulfillRequirementsForJobWithRegisteredResources(
+ private static int tryFulfilledRequirementWithResource(
+ List internalResource,
+ int numUnfulfilled,
+ ResourceProfile requiredResource,
+ JobID jobId) {
+ final Iterator internalResourceInfoItr = internalResource.iterator();
+ while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
+ final InternalResourceInfo currentTaskManager = internalResourceInfoItr.next();
+ while (numUnfulfilled > 0
+ && currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
+ numUnfulfilled--;
+ }
+ if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+ internalResourceInfoItr.remove();
+ }
+ }
+ return numUnfulfilled;
+ }
+
+ private static Collection tryFulfillRequirementsForJobWithResources(
JobID jobId,
Collection missingResources,
- Map> registeredResources,
- ResourceAllocationResult.Builder resultBuilder) {
- ResourceCounter outstandingRequirements = ResourceCounter.empty();
+ List registeredResources) {
+ Collection outstandingRequirements = new ArrayList<>();
for (ResourceRequirement resourceRequirement : missingResources) {
int numMissingRequirements =
- tryFindSlotsForRequirement(
- jobId, resourceRequirement, registeredResources, resultBuilder);
+ tryFulfilledRequirementWithResource(
+ registeredResources,
+ resourceRequirement.getNumberOfRequiredSlots(),
+ resourceRequirement.getResourceProfile(),
+ jobId);
if (numMissingRequirements > 0) {
- outstandingRequirements =
- outstandingRequirements.add(
- resourceRequirement.getResourceProfile(), numMissingRequirements);
+ outstandingRequirements.add(
+ ResourceRequirement.create(
+ resourceRequirement.getResourceProfile(), numMissingRequirements));
}
}
return outstandingRequirements;
}
- private static int tryFindSlotsForRequirement(
- JobID jobId,
- ResourceRequirement resourceRequirement,
- Map> registeredResources,
- ResourceAllocationResult.Builder resultBuilder) {
- final ResourceProfile requiredResource = resourceRequirement.getResourceProfile();
-
- int numUnfulfilled = resourceRequirement.getNumberOfRequiredSlots();
- while (numUnfulfilled > 0) {
- final Optional matchedTaskManager =
- findMatchingTaskManager(requiredResource, registeredResources);
-
- if (!matchedTaskManager.isPresent()) {
- // exit loop early; we won't find a matching slot for this requirement
- break;
- }
-
- final ResourceProfile effectiveProfile =
- getEffectiveResourceProfile(
- requiredResource, registeredResources.get(matchedTaskManager.get()).f1);
- resultBuilder.addAllocationOnRegisteredResource(
- jobId, matchedTaskManager.get(), effectiveProfile);
- deductionRegisteredResource(
- registeredResources, matchedTaskManager.get(), effectiveProfile);
- numUnfulfilled--;
- }
- return numUnfulfilled;
- }
-
- private static Optional findMatchingTaskManager(
- ResourceProfile requirement,
- Map> registeredResources) {
- return registeredResources.entrySet().stream()
- .filter(
- taskManager ->
- canFulfillRequirement(
- getEffectiveResourceProfile(
- requirement, taskManager.getValue().f1),
- taskManager.getValue().f0))
- .findAny()
- .map(Map.Entry::getKey);
- }
-
private static boolean canFulfillRequirement(
ResourceProfile requirement, ResourceProfile resourceProfile) {
return resourceProfile.allFieldsNoLessThan(requirement);
}
- private static void deductionRegisteredResource(
- Map> registeredResources,
- InstanceID instanceId,
- ResourceProfile resourceProfile) {
- registeredResources.compute(
- instanceId,
- (id, tuple2) -> {
- Preconditions.checkNotNull(tuple2);
- if (tuple2.f0.subtract(resourceProfile).equals(ResourceProfile.ZERO)) {
- return null;
- } else {
- return Tuple2.of(tuple2.f0.subtract(resourceProfile), tuple2.f1);
- }
- });
- }
-
- private static Optional findPendingManagerToFulfill(
- ResourceProfile resourceProfile,
- Map availableResources) {
- return availableResources.entrySet().stream()
- .filter(entry -> entry.getValue().allFieldsNoLessThan(resourceProfile))
- .findAny()
- .map(Map.Entry::getKey);
- }
-
private void tryFulfillRequirementsForJobWithPendingResources(
JobID jobId,
- ResourceCounter unfulfilledRequirements,
- Map availableResources,
+ Collection unfulfilledRequirements,
+ List availableResources,
ResourceAllocationResult.Builder resultBuilder) {
- for (Map.Entry missingResource :
- unfulfilledRequirements.getResourcesWithCount()) {
+ final Collection missingResources =
+ tryFulfillRequirementsForJobWithResources(
+ jobId, unfulfilledRequirements, availableResources);
+ for (ResourceRequirement missingResource : missingResources) {
// for this strategy, all pending resources should have the same default slot resource
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(
- missingResource.getKey(), defaultSlotResourceProfile);
- for (int i = 0; i < missingResource.getValue(); i++) {
- Optional matchedPendingTaskManager =
- findPendingManagerToFulfill(effectiveProfile, availableResources);
- if (matchedPendingTaskManager.isPresent()) {
- availableResources.compute(
- matchedPendingTaskManager.get(),
- ((pendingTaskManagerId, resourceProfile) ->
- Preconditions.checkNotNull(resourceProfile)
- .subtract(effectiveProfile)));
+ missingResource.getResourceProfile(), defaultSlotResourceProfile);
+ int numUnfulfilled = missingResource.getNumberOfRequiredSlots();
+
+ if (!totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
+ // Can not fulfill this resource type will the default worker.
+ resultBuilder.addUnfulfillableJob(jobId);
+ continue;
+ }
+
+ while (numUnfulfilled > 0) {
+ // Circularly add new pending task manager
+ final PendingTaskManager newPendingTaskManager =
+ new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
+ resultBuilder.addPendingTaskManagerAllocate(newPendingTaskManager);
+ ResourceProfile remainResource = totalResourceProfile;
+ while (numUnfulfilled > 0
+ && canFulfillRequirement(effectiveProfile, remainResource)) {
+ numUnfulfilled--;
resultBuilder.addAllocationOnPendingResource(
- jobId, matchedPendingTaskManager.get(), effectiveProfile);
- } else {
- if (totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
- // Add new pending task manager
- final PendingTaskManager pendingTaskManager =
- new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
- resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
- resultBuilder.addAllocationOnPendingResource(
- jobId,
- pendingTaskManager.getPendingTaskManagerId(),
- effectiveProfile);
- availableResources.put(
- pendingTaskManager.getPendingTaskManagerId(),
- totalResourceProfile.subtract(effectiveProfile));
- } else {
- resultBuilder.addUnfulfillableJob(jobId);
- break;
- }
+ jobId,
+ newPendingTaskManager.getPendingTaskManagerId(),
+ effectiveProfile);
+ remainResource = remainResource.subtract(effectiveProfile);
}
+ if (!remainResource.equals(ResourceProfile.ZERO)) {
+ availableResources.add(
+ new InternalResourceInfo(
+ defaultSlotResourceProfile,
+ remainResource,
+ (jobID, slotProfile) ->
+ resultBuilder.addAllocationOnPendingResource(
+ jobID,
+ newPendingTaskManager.getPendingTaskManagerId(),
+ slotProfile)));
+ }
+ }
+ }
+ }
+
+ private static class InternalResourceInfo {
+ private final ResourceProfile defaultSlotProfile;
+ private final BiConsumer allocationConsumer;
+ private ResourceProfile availableProfile;
+
+ InternalResourceInfo(
+ ResourceProfile defaultSlotProfile,
+ ResourceProfile availableProfile,
+ BiConsumer allocationConsumer) {
+ this.defaultSlotProfile = defaultSlotProfile;
+ this.availableProfile = availableProfile;
+ this.allocationConsumer = allocationConsumer;
+ }
+
+ boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile requirement) {
+ final ResourceProfile effectiveProfile =
+ getEffectiveResourceProfile(requirement, defaultSlotProfile);
+ if (availableProfile.allFieldsNoLessThan(effectiveProfile)) {
+ availableProfile = availableProfile.subtract(effectiveProfile);
+ allocationConsumer.accept(jobId, effectiveProfile);
+ return true;
+ } else {
+ return false;
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
index 641f527108544..01f3dc0c6c93b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/TestingTaskManagerResourceInfoProvider.java
@@ -158,7 +158,7 @@ public Builder setRegisteredTaskManagersSupplier(
return this;
}
- public Builder setgetPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction(
+ public Builder setGetPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction(
BiFunction>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction) {
this.getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction =