Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@
package org.apache.flink.runtime.resourcemanager.slotmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils.getEffectiveResourceProfile;
Expand All @@ -48,8 +47,9 @@
* exist.
*
* <p>Note: The current implementation of this strategy is non-optimal, in terms of computation
* efficiency. In the worst case, for each requirement it checks all registered and pending
* resources. TODO: This will be optimized in FLINK-21174.
* efficiency. In the worst case, for each distinctly profiled requirement it checks all registered
* and pending resources. Further optimization requires complex data structures for ordering
* multi-dimensional resource profiles. The complexity is not necessary.
*/
public class DefaultResourceAllocationStrategy implements ResourceAllocationStrategy {
private final ResourceProfile defaultSlotResourceProfile;
Expand All @@ -71,24 +71,18 @@ public ResourceAllocationResult tryFulfillRequirements(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();

// Tuples of available and default slot resource for registered task managers, indexed by
// instanceId
final Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources =
getRegisteredResources(taskManagerResourceInfoProvider);
// Available resources of pending task managers, indexed by the pendingTaskManagerId
final Map<PendingTaskManagerId, ResourceProfile> pendingResources =
getPendingResources(taskManagerResourceInfoProvider);
final List<InternalResourceInfo> registeredResources =
getRegisteredResources(taskManagerResourceInfoProvider, resultBuilder);
final List<InternalResourceInfo> pendingResources =
getPendingResources(taskManagerResourceInfoProvider, resultBuilder);

for (Map.Entry<JobID, Collection<ResourceRequirement>> resourceRequirements :
missingResources.entrySet()) {
final JobID jobId = resourceRequirements.getKey();

final ResourceCounter unfulfilledJobRequirements =
tryFulfillRequirementsForJobWithRegisteredResources(
jobId,
resourceRequirements.getValue(),
registeredResources,
resultBuilder);
final Collection<ResourceRequirement> unfulfilledJobRequirements =
tryFulfillRequirementsForJobWithResources(
jobId, resourceRequirements.getValue(), registeredResources);

if (!unfulfilledJobRequirements.isEmpty()) {
tryFulfillRequirementsForJobWithPendingResources(
Expand All @@ -98,160 +92,161 @@ public ResourceAllocationResult tryFulfillRequirements(
return resultBuilder.build();
}

private static Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> getRegisteredResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
private static List<InternalResourceInfo> getRegisteredResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
ResourceAllocationResult.Builder resultBuilder) {
return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
.collect(
Collectors.toMap(
TaskManagerInfo::getInstanceId,
taskManager ->
Tuple2.of(
taskManager.getAvailableResource(),
taskManager.getDefaultSlotResourceProfile())));
.map(
taskManager ->
new InternalResourceInfo(
taskManager.getDefaultSlotResourceProfile(),
taskManager.getAvailableResource(),
(jobId, slotProfile) ->
resultBuilder.addAllocationOnRegisteredResource(
jobId,
taskManager.getInstanceId(),
slotProfile)))
.collect(Collectors.toList());
}

private static Map<PendingTaskManagerId, ResourceProfile> getPendingResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
private static List<InternalResourceInfo> getPendingResources(
TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
ResourceAllocationResult.Builder resultBuilder) {
return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
.collect(
Collectors.toMap(
PendingTaskManager::getPendingTaskManagerId,
PendingTaskManager::getTotalResourceProfile));
.map(
pendingTaskManager ->
new InternalResourceInfo(
pendingTaskManager.getDefaultSlotResourceProfile(),
pendingTaskManager.getTotalResourceProfile(),
(jobId, slotProfile) ->
resultBuilder.addAllocationOnPendingResource(
jobId,
pendingTaskManager
.getPendingTaskManagerId(),
slotProfile)))
.collect(Collectors.toList());
}

private static ResourceCounter tryFulfillRequirementsForJobWithRegisteredResources(
private static int tryFulfilledRequirementWithResource(
List<InternalResourceInfo> internalResource,
int numUnfulfilled,
ResourceProfile requiredResource,
JobID jobId) {
final Iterator<InternalResourceInfo> internalResourceInfoItr = internalResource.iterator();
while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
final InternalResourceInfo currentTaskManager = internalResourceInfoItr.next();
while (numUnfulfilled > 0
&& currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
numUnfulfilled--;
}
if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
internalResourceInfoItr.remove();
}
}
return numUnfulfilled;
}

private static Collection<ResourceRequirement> tryFulfillRequirementsForJobWithResources(
JobID jobId,
Collection<ResourceRequirement> missingResources,
Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
ResourceAllocationResult.Builder resultBuilder) {
ResourceCounter outstandingRequirements = ResourceCounter.empty();
List<InternalResourceInfo> registeredResources) {
Collection<ResourceRequirement> outstandingRequirements = new ArrayList<>();

for (ResourceRequirement resourceRequirement : missingResources) {
int numMissingRequirements =
tryFindSlotsForRequirement(
jobId, resourceRequirement, registeredResources, resultBuilder);
tryFulfilledRequirementWithResource(
registeredResources,
resourceRequirement.getNumberOfRequiredSlots(),
resourceRequirement.getResourceProfile(),
jobId);
if (numMissingRequirements > 0) {
outstandingRequirements =
outstandingRequirements.add(
resourceRequirement.getResourceProfile(), numMissingRequirements);
outstandingRequirements.add(
ResourceRequirement.create(
resourceRequirement.getResourceProfile(), numMissingRequirements));
}
}
return outstandingRequirements;
}

private static int tryFindSlotsForRequirement(
JobID jobId,
ResourceRequirement resourceRequirement,
Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
ResourceAllocationResult.Builder resultBuilder) {
final ResourceProfile requiredResource = resourceRequirement.getResourceProfile();

int numUnfulfilled = resourceRequirement.getNumberOfRequiredSlots();
while (numUnfulfilled > 0) {
final Optional<InstanceID> matchedTaskManager =
findMatchingTaskManager(requiredResource, registeredResources);

if (!matchedTaskManager.isPresent()) {
// exit loop early; we won't find a matching slot for this requirement
break;
}

final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(
requiredResource, registeredResources.get(matchedTaskManager.get()).f1);
resultBuilder.addAllocationOnRegisteredResource(
jobId, matchedTaskManager.get(), effectiveProfile);
deductionRegisteredResource(
registeredResources, matchedTaskManager.get(), effectiveProfile);
numUnfulfilled--;
}
return numUnfulfilled;
}

private static Optional<InstanceID> findMatchingTaskManager(
ResourceProfile requirement,
Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources) {
return registeredResources.entrySet().stream()
.filter(
taskManager ->
canFulfillRequirement(
getEffectiveResourceProfile(
requirement, taskManager.getValue().f1),
taskManager.getValue().f0))
.findAny()
.map(Map.Entry::getKey);
}

private static boolean canFulfillRequirement(
ResourceProfile requirement, ResourceProfile resourceProfile) {
return resourceProfile.allFieldsNoLessThan(requirement);
}

private static void deductionRegisteredResource(
Map<InstanceID, Tuple2<ResourceProfile, ResourceProfile>> registeredResources,
InstanceID instanceId,
ResourceProfile resourceProfile) {
registeredResources.compute(
instanceId,
(id, tuple2) -> {
Preconditions.checkNotNull(tuple2);
if (tuple2.f0.subtract(resourceProfile).equals(ResourceProfile.ZERO)) {
return null;
} else {
return Tuple2.of(tuple2.f0.subtract(resourceProfile), tuple2.f1);
}
});
}

private static Optional<PendingTaskManagerId> findPendingManagerToFulfill(
ResourceProfile resourceProfile,
Map<PendingTaskManagerId, ResourceProfile> availableResources) {
return availableResources.entrySet().stream()
.filter(entry -> entry.getValue().allFieldsNoLessThan(resourceProfile))
.findAny()
.map(Map.Entry::getKey);
}

private void tryFulfillRequirementsForJobWithPendingResources(
JobID jobId,
ResourceCounter unfulfilledRequirements,
Map<PendingTaskManagerId, ResourceProfile> availableResources,
Collection<ResourceRequirement> unfulfilledRequirements,
List<InternalResourceInfo> availableResources,
ResourceAllocationResult.Builder resultBuilder) {
for (Map.Entry<ResourceProfile, Integer> missingResource :
unfulfilledRequirements.getResourcesWithCount()) {
final Collection<ResourceRequirement> missingResources =
tryFulfillRequirementsForJobWithResources(
jobId, unfulfilledRequirements, availableResources);
for (ResourceRequirement missingResource : missingResources) {
// for this strategy, all pending resources should have the same default slot resource
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(
missingResource.getKey(), defaultSlotResourceProfile);
for (int i = 0; i < missingResource.getValue(); i++) {
Optional<PendingTaskManagerId> matchedPendingTaskManager =
findPendingManagerToFulfill(effectiveProfile, availableResources);
if (matchedPendingTaskManager.isPresent()) {
availableResources.compute(
matchedPendingTaskManager.get(),
((pendingTaskManagerId, resourceProfile) ->
Preconditions.checkNotNull(resourceProfile)
.subtract(effectiveProfile)));
missingResource.getResourceProfile(), defaultSlotResourceProfile);
int numUnfulfilled = missingResource.getNumberOfRequiredSlots();

if (!totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
// Can not fulfill this resource type will the default worker.
resultBuilder.addUnfulfillableJob(jobId);
continue;
}

while (numUnfulfilled > 0) {
// Circularly add new pending task manager
final PendingTaskManager newPendingTaskManager =
new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
resultBuilder.addPendingTaskManagerAllocate(newPendingTaskManager);
ResourceProfile remainResource = totalResourceProfile;
while (numUnfulfilled > 0
&& canFulfillRequirement(effectiveProfile, remainResource)) {
numUnfulfilled--;
resultBuilder.addAllocationOnPendingResource(
jobId, matchedPendingTaskManager.get(), effectiveProfile);
} else {
if (totalResourceProfile.allFieldsNoLessThan(effectiveProfile)) {
// Add new pending task manager
final PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile, numSlotsPerWorker);
resultBuilder.addPendingTaskManagerAllocate(pendingTaskManager);
resultBuilder.addAllocationOnPendingResource(
jobId,
pendingTaskManager.getPendingTaskManagerId(),
effectiveProfile);
availableResources.put(
pendingTaskManager.getPendingTaskManagerId(),
totalResourceProfile.subtract(effectiveProfile));
} else {
resultBuilder.addUnfulfillableJob(jobId);
break;
}
jobId,
newPendingTaskManager.getPendingTaskManagerId(),
effectiveProfile);
remainResource = remainResource.subtract(effectiveProfile);
}
if (!remainResource.equals(ResourceProfile.ZERO)) {
availableResources.add(
new InternalResourceInfo(
defaultSlotResourceProfile,
remainResource,
(jobID, slotProfile) ->
resultBuilder.addAllocationOnPendingResource(
jobID,
newPendingTaskManager.getPendingTaskManagerId(),
slotProfile)));
}
}
}
}

private static class InternalResourceInfo {
private final ResourceProfile defaultSlotProfile;
private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
private ResourceProfile availableProfile;

InternalResourceInfo(
ResourceProfile defaultSlotProfile,
ResourceProfile availableProfile,
BiConsumer<JobID, ResourceProfile> allocationConsumer) {
this.defaultSlotProfile = defaultSlotProfile;
this.availableProfile = availableProfile;
this.allocationConsumer = allocationConsumer;
}

boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile requirement) {
final ResourceProfile effectiveProfile =
getEffectiveResourceProfile(requirement, defaultSlotProfile);
if (availableProfile.allFieldsNoLessThan(effectiveProfile)) {
availableProfile = availableProfile.subtract(effectiveProfile);
allocationConsumer.accept(jobId, effectiveProfile);
return true;
} else {
return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public Builder setRegisteredTaskManagersSupplier(
return this;
}

public Builder setgetPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction(
public Builder setGetPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction(
BiFunction<ResourceProfile, ResourceProfile, Collection<PendingTaskManager>>
getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction) {
this.getPendingTaskManagersByTotalAndDefaultSlotResourceProfileFunction =
Expand Down