diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java index f89641f7742b0..50035e0b74a69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultSlotStatusSyncer.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -106,6 +107,14 @@ public CompletableFuture allocateSlot( "Could not find a registered task manager for instance id " + instanceId + '.'); final TaskExecutorGateway gateway = taskManager.get().getTaskExecutorConnection().getTaskExecutorGateway(); + final ResourceID resourceId = taskManager.get().getTaskExecutorConnection().getResourceID(); + + LOG.debug( + "Starting allocation of slot {} from {} for job {} with resource profile {}.", + allocationId, + resourceId, + jobId, + resourceProfile); taskManagerTracker.notifySlotStatus( allocationId, jobId, instanceId, resourceProfile, SlotState.PENDING); @@ -115,8 +124,7 @@ public CompletableFuture allocateSlot( // RPC call to the task manager CompletableFuture requestFuture = gateway.requestSlot( - SlotID.getDynamicSlotID( - taskManager.get().getTaskExecutorConnection().getResourceID()), + SlotID.getDynamicSlotID(resourceId), jobId, allocationId, resourceProfile,