diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java index a55e8c80b2f5a..05fa0319f331b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -66,13 +66,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem private static final class FulfilledAllocation { final AllocationID allocationID; final ResourceID taskExecutorID; - final LoadingWeight loadingWeight; + final ResourceUnitCount resourceUnitCount; - FulfilledAllocation(PhysicalSlot slot, LoadingWeight loadingWeight) { + FulfilledAllocation(PhysicalSlot slot, ResourceUnitCount resourceUnitCount) { this.allocationID = Preconditions.checkNotNull(slot.getAllocationId()); this.taskExecutorID = Preconditions.checkNotNull(slot.getTaskManagerLocation().getResourceID()); - this.loadingWeight = Preconditions.checkNotNull(loadingWeight); + this.resourceUnitCount = Preconditions.checkNotNull(resourceUnitCount); } @Override @@ -83,7 +83,7 @@ public boolean equals(Object o) { FulfilledAllocation that = (FulfilledAllocation) o; return Objects.equals(allocationID, that.allocationID) && Objects.equals(taskExecutorID, that.taskExecutorID) - && Objects.equals(loadingWeight, that.loadingWeight); + && Objects.equals(resourceUnitCount, that.resourceUnitCount); } } @@ -261,16 +261,16 @@ void newSlotsAreAvailable(Collection newSlots) { } } - private Map getTaskExecutorsLoadingView() { - final Map result = new HashMap<>(); + private Map getTaskExecutorsLoadingView() { + final Map result = new HashMap<>(); Collection fulfilledAllocations = fulfilledRequests.values(); for (FulfilledAllocation allocation : fulfilledAllocations) { result.compute( allocation.taskExecutorID, (ignoredID, oldLoading) -> Objects.isNull(oldLoading) - ? allocation.loadingWeight - : oldLoading.merge(allocation.loadingWeight)); + ? allocation.resourceUnitCount + : oldLoading.merge(allocation.resourceUnitCount)); } return result; } @@ -353,7 +353,8 @@ private PhysicalSlot reserveFreeSlot(AllocationID allocationId, PendingRequest p getDeclarativeSlotPool() .reserveFreeSlot(allocationId, pendingRequest.getResourceProfile()); fulfilledRequests.put( - slotRequestId, new FulfilledAllocation(slot, pendingRequest.getLoading())); + slotRequestId, + new FulfilledAllocation(slot, pendingRequest.getResourceUnitCount())); return slot; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java index 339db7f575f0a..b677ddb1a87c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PendingRequest.java @@ -21,8 +21,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; -import org.apache.flink.runtime.scheduler.loading.WeightLoadable; +import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -32,13 +32,13 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -public final class PendingRequest implements WeightLoadable { +public final class PendingRequest implements HasResourceUnit { private final SlotRequestId slotRequestId; private final ResourceProfile resourceProfile; - private final LoadingWeight loadingWeight; + private final ResourceUnitCount resourceUnitCount; private final HashSet preferredAllocations; @@ -51,12 +51,12 @@ public final class PendingRequest implements WeightLoadable { private PendingRequest( SlotRequestId slotRequestId, ResourceProfile resourceProfile, - LoadingWeight loadingWeight, + ResourceUnitCount resourceUnitCount, Collection preferredAllocations, boolean isBatchRequest) { this.slotRequestId = slotRequestId; this.resourceProfile = Preconditions.checkNotNull(resourceProfile); - this.loadingWeight = Preconditions.checkNotNull(loadingWeight); + this.resourceUnitCount = Preconditions.checkNotNull(resourceUnitCount); this.preferredAllocations = new HashSet<>(preferredAllocations); this.isBatchRequest = isBatchRequest; this.slotFuture = new CompletableFuture<>(); @@ -66,19 +66,19 @@ private PendingRequest( static PendingRequest createBatchRequest( SlotRequestId slotRequestId, ResourceProfile resourceProfile, - LoadingWeight loadingWeight, + ResourceUnitCount resourceUnitCount, Collection preferredAllocations) { return new PendingRequest( - slotRequestId, resourceProfile, loadingWeight, preferredAllocations, true); + slotRequestId, resourceProfile, resourceUnitCount, preferredAllocations, true); } public static PendingRequest createNormalRequest( SlotRequestId slotRequestId, ResourceProfile resourceProfile, - LoadingWeight loadingWeight, + ResourceUnitCount resourceUnitCount, Collection preferredAllocations) { return new PendingRequest( - slotRequestId, resourceProfile, loadingWeight, preferredAllocations, false); + slotRequestId, resourceProfile, resourceUnitCount, preferredAllocations, false); } SlotRequestId getSlotRequestId() { @@ -135,7 +135,7 @@ public String toString() { + ", resourceProfile=" + resourceProfile + ", loadingWeight=" - + loadingWeight + + resourceUnitCount + ", preferredAllocations=" + preferredAllocations + ", isBatchRequest=" @@ -146,7 +146,7 @@ public String toString() { } @Override - public @Nonnull LoadingWeight getLoading() { - return loadingWeight; + public @Nonnull ResourceUnitCount getResourceUnitCount() { + return resourceUnitCount; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java index 08a9bfe5dbf0c..cffaaaa512652 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequest.java @@ -21,31 +21,31 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; -import org.apache.flink.runtime.scheduler.loading.WeightLoadable; +import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import javax.annotation.Nonnull; /** Represents a request for a physical slot. */ -public class PhysicalSlotRequest implements WeightLoadable { +public class PhysicalSlotRequest implements HasResourceUnit { private final SlotRequestId slotRequestId; private final SlotProfile slotProfile; - private final LoadingWeight loadingWeight; + private final ResourceUnitCount resourceUnitCount; private final boolean slotWillBeOccupiedIndefinitely; public PhysicalSlotRequest( final SlotRequestId slotRequestId, final SlotProfile slotProfile, - final LoadingWeight loadingWeight, + final ResourceUnitCount resourceUnitCount, final boolean slotWillBeOccupiedIndefinitely) { this.slotRequestId = slotRequestId; this.slotProfile = slotProfile; - this.loadingWeight = loadingWeight; + this.resourceUnitCount = resourceUnitCount; this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; } @@ -70,18 +70,18 @@ public PendingRequest toPendingRequest() { ? PendingRequest.createNormalRequest( slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), - loadingWeight, + resourceUnitCount, slotProfile.getPreferredAllocations()) : PendingRequest.createBatchRequest( slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), - loadingWeight, + resourceUnitCount, slotProfile.getPreferredAllocations()); } @Override - public @Nonnull LoadingWeight getLoading() { - return loadingWeight; + public @Nonnull ResourceUnitCount getResourceUnitCount() { + return resourceUnitCount; } /** Result of a {@link PhysicalSlotRequest}. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java index d6654b7a25468..3fb03dba7cc75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategy.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -58,7 +58,7 @@ public static RequestSlotMatchingStrategy create(RequestSlotMatchingStrategy rol public Collection matchRequestsAndSlots( Collection slots, Collection pendingRequests, - Map taskExecutorsLoadingWeight) { + Map taskExecutorsLoadingWeight) { final Collection requestSlotMatches = new ArrayList<>(); final Map freeSlots = @@ -97,7 +97,7 @@ public Collection matchRequestsAndSlots( .getPreferredAllocations() .contains(freeSlot.getAllocationId())) { requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot)); - LoadingWeight deltaLoading = pendingRequest.getLoading(); + ResourceUnitCount deltaLoading = pendingRequest.getResourceUnitCount(); taskExecutorsLoadingWeight.compute( freeSlot.getTaskManagerLocation().getResourceID(), (ignoredId, oldLoad) -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java index d4774d72f5d8f..7278d22007e51 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/RequestSlotMatchingStrategy.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import java.util.Collection; import java.util.Map; @@ -38,7 +38,7 @@ public interface RequestSlotMatchingStrategy { Collection matchRequestsAndSlots( Collection slots, Collection pendingRequests, - Map taskExecutorsLoadingWeight); + Map taskExecutorsLoadingWeight); /** Result class representing matches. */ final class RequestSlotMatch { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java index cf354afbcad66..857784fb949aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategy.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import java.util.ArrayList; import java.util.Collection; @@ -38,7 +38,7 @@ public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStra public Collection matchRequestsAndSlots( Collection slots, Collection pendingRequests, - Map taskExecutorsLoadingWeight) { + Map taskExecutorsLoadingWeight) { final Collection resultingMatches = new ArrayList<>(); // if pendingRequests has a special order, then let's preserve it diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java index c70f15ad17e7e..aac65cc5ce5b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategy.java @@ -20,8 +20,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.state.PriorityComparator; import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.HeapPriorityQueue; @@ -42,7 +42,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend; +import static org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit.sortDesc; /** * The tasks balanced based implementation of {@link RequestSlotMatchingStrategy} that matches the @@ -57,20 +57,20 @@ public enum TasksBalancedRequestSlotMatchingStrategy implements RequestSlotMatch /** The {@link PhysicalSlotElement} comparator to compare loading. */ static final class PhysicalSlotElementComparator implements Comparator { - private final Map taskExecutorsLoading; + private final Map taskExecutorsLoading; - PhysicalSlotElementComparator(Map taskExecutorsLoading) { + PhysicalSlotElementComparator(Map taskExecutorsLoading) { this.taskExecutorsLoading = Preconditions.checkNotNull(taskExecutorsLoading); } @Override public int compare(PhysicalSlotElement left, PhysicalSlotElement right) { - final LoadingWeight leftLoad = + final ResourceUnitCount leftLoad = taskExecutorsLoading.getOrDefault( - left.getResourceID(), DefaultLoadingWeight.EMPTY); - final LoadingWeight rightLoad = + left.getResourceID(), DefaultResourceUnitCount.EMPTY); + final ResourceUnitCount rightLoad = taskExecutorsLoading.getOrDefault( - right.getResourceID(), DefaultLoadingWeight.EMPTY); + right.getResourceID(), DefaultResourceUnitCount.EMPTY); return leftLoad.compareTo(rightLoad); } } @@ -115,7 +115,8 @@ static final class PhysicalSlotElementPriorityComparator private final PhysicalSlotElementComparator physicalSlotElementComparator; - PhysicalSlotElementPriorityComparator(Map taskExecutorsLoading) { + PhysicalSlotElementPriorityComparator( + Map taskExecutorsLoading) { this.physicalSlotElementComparator = new PhysicalSlotElementComparator(taskExecutorsLoading); } @@ -130,7 +131,7 @@ public int comparePriority(PhysicalSlotElement left, PhysicalSlotElement right) public Collection matchRequestsAndSlots( Collection slots, Collection pendingRequests, - Map taskExecutorsLoad) { + Map taskExecutorsLoad) { ResourceRequestPreMappings resourceRequestPreMappings = ResourceRequestPreMappings.createFrom(pendingRequests, slots); if (!resourceRequestPreMappings.isMatchingFulfilled()) { @@ -138,7 +139,7 @@ public Collection matchRequestsAndSlots( } final Collection resultingMatches = new ArrayList<>(); - final List sortedRequests = sortByLoadingDescend(pendingRequests); + final List sortedRequests = sortDesc(pendingRequests); logDebugInfo(slots, taskExecutorsLoad, sortedRequests); @@ -166,20 +167,20 @@ public Collection matchRequestsAndSlots( } private static void updateTaskExecutorsLoad( - Map taskExecutorsLoad, + Map taskExecutorsLoad, PendingRequest request, PhysicalSlotElement slotElement) { taskExecutorsLoad.compute( slotElement.getResourceID(), (ignoredId, oldLoading) -> Objects.isNull(oldLoading) - ? request.getLoading() - : oldLoading.merge(request.getLoading())); + ? request.getResourceUnitCount() + : oldLoading.merge(request.getResourceUnitCount())); } private static void logDebugInfo( Collection slots, - Map taskExecutorsLoad, + Map taskExecutorsLoad, List sortedRequests) { LOG.debug( "Available slots: {}, sortedRequests: {}, taskExecutorsLoad: {}", @@ -198,7 +199,7 @@ private Map> groupSlotsByTaskExecutor( private Map> getSlotCandidatesByProfile( Collection slotElements, - Map taskExecutorsLoad) { + Map taskExecutorsLoad) { final Map> result = new HashMap<>(); final PhysicalSlotElementPriorityComparator physicalSlotElementPriorityComparator = new PhysicalSlotElementPriorityComparator(taskExecutorsLoad); @@ -221,7 +222,7 @@ private Map> getSlotCand private Optional tryMatchPhysicalSlot( PendingRequest request, Map> profileToSlotMap, - Map taskExecutorsLoad, + Map taskExecutorsLoad, ResourceRequestPreMappings resourceRequestPreMappings) { final ResourceProfile requestProfile = request.getResourceProfile(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java index 7f3dc35d5ed50..d1437c13869a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java @@ -20,9 +20,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; -import org.apache.flink.runtime.scheduler.loading.WeightLoadable; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; +import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.util.Preconditions; @@ -33,7 +33,7 @@ import java.util.Set; /** Represents execution vertices that will run the same shared slot. */ -public class ExecutionSlotSharingGroup implements WeightLoadable { +public class ExecutionSlotSharingGroup implements HasResourceUnit { private final Set executionVertexIds; @@ -70,13 +70,13 @@ public String toString() { + ", slotSharingGroup=" + slotSharingGroup + ", loadingWeight=" - + getLoading() + + getResourceUnitCount() + '}'; } @Nonnull @Override - public LoadingWeight getLoading() { - return new DefaultLoadingWeight(executionVertexIds.size()); + public ResourceUnitCount getResourceUnitCount() { + return new DefaultResourceUnitCount(executionVertexIds.size()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java index 0a5e79777b2d0..c9894239b8d22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.DualKeyLinkedMap; import org.apache.flink.util.FlinkException; @@ -109,7 +109,7 @@ public Map allocateSlotsFor( new PhysicalSlotRequest( slotRequestId, slotProfile, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, slotWillBeOccupiedIndefinitely); physicalSlotRequests.add(request); remainingExecutionsToSlotRequest.put(slotRequestId, executionAttemptId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java index 00a8ebd020c72..ce046de212242 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java @@ -272,7 +272,7 @@ private Map allocateSharedSlots( new PhysicalSlotRequest( physicalSlotRequestId, slotProfile, - group.getLoading(), + group.getResourceUnitCount(), slotWillBeOccupiedIndefinitely); slotRequests.add(request); requestToGroup.put(physicalSlotRequestId, group); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 39bfa7902a3d4..66381bbeb54be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -28,9 +28,9 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; -import org.apache.flink.runtime.scheduler.loading.WeightLoadable; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; +import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.Preconditions; @@ -344,7 +344,7 @@ private SharedSlot reserveSharedSlot(SlotInfo slotInfo) { } /** The execution slot sharing group for adaptive scheduler. */ - public static class ExecutionSlotSharingGroup implements WeightLoadable { + public static class ExecutionSlotSharingGroup implements HasResourceUnit { private final String id; private final SlotSharingGroup slotSharingGroup; private final Set containedExecutionVertices; @@ -382,8 +382,8 @@ public Collection getContainedExecutionVertices() { @Nonnull @Override - public LoadingWeight getLoading() { - return new DefaultLoadingWeight(containedExecutionVertices.size()); + public ResourceUnitCount getResourceUnitCount() { + return new DefaultResourceUnitCount(containedExecutionVertices.size()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java index d4ee374227ec1..43b01b0e54229 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TasksBalancedSlotMatchingResolver.java @@ -20,8 +20,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.util.CollectionUtil; import java.util.ArrayList; @@ -35,7 +35,7 @@ import static org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import static org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; -import static org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend; +import static org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit.sortDesc; /** The tasks balanced request slot matching resolver implementation. */ public enum TasksBalancedSlotMatchingResolver implements SlotMatchingResolver { @@ -49,17 +49,17 @@ public Collection matchSlotSharingGroupWithSlo new ArrayList<>(requestGroups.size()); final Map> slotsPerTaskExecutor = AllocatorUtil.getSlotsPerTaskExecutor(freeSlots); - final TreeMap> loadingSlotsMap = + final TreeMap> loadingSlotsMap = getLoadingSlotsMap(freeSlots); - SlotTaskExecutorWeight best; - for (ExecutionSlotSharingGroup requestGroup : sortByLoadingDescend(requestGroups)) { + SlotTaskExecutorWeight best; + for (ExecutionSlotSharingGroup requestGroup : sortDesc(requestGroups)) { best = getTheBestSlotTaskExecutorLoading(loadingSlotsMap); slotAssignments.add(new SlotAssignment(best.physicalSlot, requestGroup)); // Update the references - final LoadingWeight newLoading = - best.taskExecutorWeight.merge(requestGroup.getLoading()); + final ResourceUnitCount newLoading = + best.taskExecutorWeight.merge(requestGroup.getResourceUnitCount()); updateSlotsPerTaskExecutor(slotsPerTaskExecutor, best); Set physicalSlots = slotsPerTaskExecutor.get(best.getResourceID()); updateLoadingSlotsMap(loadingSlotsMap, best, physicalSlots, newLoading); @@ -68,10 +68,10 @@ public Collection matchSlotSharingGroupWithSlo } private static void updateLoadingSlotsMap( - Map> loadingSlotsMap, - SlotTaskExecutorWeight best, + Map> loadingSlotsMap, + SlotTaskExecutorWeight best, Set slotsToAdjust, - LoadingWeight newLoading) { + ResourceUnitCount newLoading) { Set physicalSlots = loadingSlotsMap.get(best.taskExecutorWeight); if (!CollectionUtil.isNullOrEmpty(physicalSlots)) { physicalSlots.remove(best.physicalSlot); @@ -96,7 +96,7 @@ private static void updateLoadingSlotsMap( private static void updateSlotsPerTaskExecutor( Map> slotsPerTaskExecutor, - SlotTaskExecutorWeight best) { + SlotTaskExecutorWeight best) { Set slots = slotsPerTaskExecutor.get(best.getResourceID()); if (Objects.nonNull(slots)) { slots.remove(best.physicalSlot); @@ -106,21 +106,22 @@ private static void updateSlotsPerTaskExecutor( } } - private static TreeMap> getLoadingSlotsMap( + private static TreeMap> getLoadingSlotsMap( Collection slots) { return new TreeMap<>() { { HashSet slotsValue = CollectionUtil.newHashSetWithExpectedSize(slots.size()); slotsValue.addAll(slots); - put(DefaultLoadingWeight.EMPTY, slotsValue); + put(DefaultResourceUnitCount.EMPTY, slotsValue); } }; } - private static SlotTaskExecutorWeight getTheBestSlotTaskExecutorLoading( - TreeMap> slotsByLoading) { - final Map.Entry> firstEntry = slotsByLoading.firstEntry(); + private static SlotTaskExecutorWeight getTheBestSlotTaskExecutorLoading( + TreeMap> slotsByLoading) { + final Map.Entry> firstEntry = + slotsByLoading.firstEntry(); if (firstEntry == null || firstEntry.getKey() == null || CollectionUtil.isNullOrEmpty(firstEntry.getValue())) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCount.java similarity index 56% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCount.java index c7c41505ae286..1e3b4f41f854b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCount.java @@ -16,44 +16,44 @@ * limitations under the License. */ -package org.apache.flink.runtime.scheduler.loading; +package org.apache.flink.runtime.scheduler.resourceunit; import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; -import javax.annotation.Nonnull; - import java.util.Objects; -/** The default implementation of {@link LoadingWeight}. */ +/** The default implementation of {@link ResourceUnitCount}. */ @Internal -public class DefaultLoadingWeight implements LoadingWeight { +public class DefaultResourceUnitCount implements ResourceUnitCount { - public static final LoadingWeight EMPTY = new DefaultLoadingWeight(0f); + public static final ResourceUnitCount EMPTY = new DefaultResourceUnitCount(0f); - private final float loading; + private final float count; - public DefaultLoadingWeight(float loading) { - Preconditions.checkArgument(loading >= 0.0f); - this.loading = loading; + public DefaultResourceUnitCount(float count) { + Preconditions.checkArgument(count >= 0.0f); + this.count = count; } @Override - public float getLoading() { - return loading; + public float getCount() { + return count; } @Override - public LoadingWeight merge(LoadingWeight other) { - if (other == null) { - return this; - } - return new DefaultLoadingWeight(loading + other.getLoading()); + public int getCountAsInt() { + return (int) count; + } + + @Override + public ResourceUnitCount merge(ResourceUnitCount other) { + return other == null ? this : new DefaultResourceUnitCount(count + other.getCount()); } @Override - public int compareTo(@Nonnull LoadingWeight o) { - return Float.compare(loading, o.getLoading()); + public int compareTo(ResourceUnitCount o) { + return Float.compare(count, o.getCount()); } @Override @@ -64,17 +64,17 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - DefaultLoadingWeight that = (DefaultLoadingWeight) o; - return Float.compare(loading, that.loading) == 0f; + DefaultResourceUnitCount that = (DefaultResourceUnitCount) o; + return compareTo(that) == 0f; } @Override public int hashCode() { - return Objects.hash(loading); + return Objects.hash(count); } @Override public String toString() { - return "DefaultLoadingWeight{loading=" + loading + '}'; + return "DefaultResourceUnitCount{count=" + count + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/HasResourceUnit.java similarity index 65% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/HasResourceUnit.java index 72f8b994ab2fb..a31ff2da41d97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/HasResourceUnit.java @@ -16,36 +16,34 @@ * limitations under the License. */ -package org.apache.flink.runtime.scheduler.loading; +package org.apache.flink.runtime.scheduler.resourceunit; import org.apache.flink.annotation.Internal; -import javax.annotation.Nonnull; - import java.util.Collection; import java.util.List; import java.util.stream.Collectors; /** - * The interface that holds the {@link LoadingWeight} getter is required for corresponding + * The interface that holds the {@link ResourceUnitCount} getter is required for corresponding * abstractions. */ @Internal -public interface WeightLoadable { +public interface HasResourceUnit { /** - * Get the loading weight. + * Get the resource unit count. * - * @return An implementation object of {@link LoadingWeight}. + * @return An implementation object of {@link ResourceUnitCount}. */ - @Nonnull - LoadingWeight getLoading(); + ResourceUnitCount getResourceUnitCount(); - static List sortByLoadingDescend(Collection weightLoadables) { - return weightLoadables.stream() + static List sortDesc(Collection col) { + return col.stream() .sorted( (leftReq, rightReq) -> - rightReq.getLoading().compareTo(leftReq.getLoading())) + rightReq.getResourceUnitCount() + .compareTo(leftReq.getResourceUnitCount())) .collect(Collectors.toList()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/ResourceUnitCount.java similarity index 59% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/ResourceUnitCount.java index 80ec766cc6a9b..42d560e3b39b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/resourceunit/ResourceUnitCount.java @@ -16,28 +16,35 @@ * limitations under the License. */ -package org.apache.flink.runtime.scheduler.loading; +package org.apache.flink.runtime.scheduler.resourceunit; import org.apache.flink.annotation.Internal; import java.io.Serializable; -/** The class is used to represent the loading weight abstraction. */ +/** The class is used to represent the load weight abstraction. */ @Internal -public interface LoadingWeight extends Comparable, Serializable { +public interface ResourceUnitCount extends Comparable, Serializable { /** - * Get the loading value. + * Returns the resource unit count. * - * @return A float represented the loading. + * @return the current resource unit count */ - float getLoading(); + float getCount(); /** - * Merge the other loading weight and this one into a new object. + * Returns the resource unit count as an integer, truncated if necessary. + * + * @return the current resource unit count + */ + int getCountAsInt(); + + /** + * Merge the other resource unit count and this one into a new object. * * @param other A loading weight object. - * @return The new merged {@link LoadingWeight}. + * @return the new merged {@link ResourceUnitCount}. */ - LoadingWeight merge(LoadingWeight other); + ResourceUnitCount merge(ResourceUnitCount other); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java index 3e1e3dfa2cb38..4a4fef9ee2655 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderExtension.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -102,7 +102,7 @@ public PhysicalSlotRequest createSimpleRequest() { return new PhysicalSlotRequest( new SlotRequestId(), SlotProfileTestingUtils.noLocality(ResourceProfile.UNKNOWN), - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, false); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java index 00d42d9884887..61a0190675a82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotProviderImplWithSpreadOutStrategyTest.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.junit.jupiter.api.Test; @@ -85,7 +85,7 @@ void testSlotAllocationFulfilledWithPreferredInputOverwrittingSpreadOut() SlotProfileTestingUtils.preferredLocality( ResourceProfile.ANY, Collections.singleton(preferredTaskManagerLocation)), - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, false); PhysicalSlotRequest.Result result1 = physicalSlotProviderExtension.allocateSlot(request1).get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java index 7462f07c679d4..0682e9bbafbdc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestUtils.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import java.util.Collection; import java.util.Collections; @@ -44,7 +44,7 @@ private static PhysicalSlotRequest normalRequest( Collections.emptyList(), preferredAllocations, Collections.emptySet()), - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, true); } @@ -71,7 +71,7 @@ public static PhysicalSlotRequest batchRequest(final ResourceProfile requiredRes Collections.emptyList(), Collections.emptyList(), Collections.emptySet()), - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, false); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java index 3a047d8b45bbf..9808918ffbcda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/PreferredAllocationRequestSlotMatchingStrategyTest.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLoggerExtension; @@ -103,12 +103,12 @@ void testNewSlotsAreMatchedAgainstPreferredAllocationIDs() { PendingRequest.createNormalRequest( new SlotRequestId(), ResourceProfile.UNKNOWN, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.singleton(allocationId2)), PendingRequest.createNormalRequest( new SlotRequestId(), ResourceProfile.UNKNOWN, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.singleton(allocationId1))); final Collection requestSlotMatches = @@ -204,7 +204,7 @@ private static PendingRequest createRequest( return PendingRequest.createNormalRequest( new SlotRequestId(), requestProfile, - new DefaultLoadingWeight(loading), + new DefaultResourceUnitCount(loading), preferAllocationIds); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java index c2bf3fd691b5f..f6b0bf431881a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/ResourceRequestPreMappingsTest.java @@ -20,7 +20,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.util.Preconditions; import org.junit.jupiter.api.Test; @@ -357,7 +357,7 @@ private List newPendingRequests(ResourceProfile... requiredProfi PendingRequest.createNormalRequest( new SlotRequestId(), Preconditions.checkNotNull(requiredProfile), - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.emptyList())); } return pendingRequests; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java index 32f834e6fcf1d..da3ad608c3915 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SimpleRequestSlotMatchingStrategyTest.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.shaded.guava33.com.google.common.collect.Iterators; @@ -50,13 +50,13 @@ public void testSlotRequestsAreMatchedInOrder() { PendingRequest.createNormalRequest( new SlotRequestId(), ResourceProfile.UNKNOWN, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.emptyList()); final PendingRequest pendingRequest2 = PendingRequest.createNormalRequest( new SlotRequestId(), ResourceProfile.UNKNOWN, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.emptyList()); final Collection pendingRequests = Arrays.asList(pendingRequest1, pendingRequest2); @@ -90,13 +90,13 @@ public void testSlotRequestsThatCanBeFulfilledAreMatched() { PendingRequest.createNormalRequest( new SlotRequestId(), large, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.emptyList()); final PendingRequest pendingRequest2 = PendingRequest.createNormalRequest( new SlotRequestId(), small, - DefaultLoadingWeight.EMPTY, + DefaultResourceUnitCount.EMPTY, Collections.emptyList()); final Collection pendingRequests = Arrays.asList(pendingRequest1, pendingRequest2); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java index 706c3c8fefe42..7c133da30f359 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TasksBalancedRequestSlotMatchingStrategyTest.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -67,8 +67,8 @@ void testMatchRequestsAndSlotsRiskOfFineGrainedResourcesMatchedToUnknownProfile( pendingRequests, new HashMap<>() { { - put(tmLocation1.getResourceID(), DefaultLoadingWeight.EMPTY); - put(tmLocation2.getResourceID(), new DefaultLoadingWeight(9)); + put(tmLocation1.getResourceID(), DefaultResourceUnitCount.EMPTY); + put(tmLocation2.getResourceID(), new DefaultResourceUnitCount(9)); } }); assertThat(requestSlotMatches).hasSize(2); @@ -108,7 +108,7 @@ private static PendingRequest createRequest(ResourceProfile requestProfile, floa return PendingRequest.createNormalRequest( new SlotRequestId(), requestProfile, - new DefaultLoadingWeight(loading), + new DefaultResourceUnitCount(loading), Collections.emptyList()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java index 6a1028fb26a00..769be2a9ceace 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPool.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -89,7 +89,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { private final Consumer setResourceRequirementsConsumer; - private final Supplier> taskExecutorsLoadingWeightSupplier; + private final Supplier> taskExecutorsLoadingWeightSupplier; TestingDeclarativeSlotPool( Consumer increaseResourceRequirementsByConsumer, @@ -120,7 +120,7 @@ final class TestingDeclarativeSlotPool implements DeclarativeSlotPool { Function containsFreeSlotFunction, LongConsumer releaseIdleSlotsConsumer, Consumer setResourceRequirementsConsumer, - Supplier> taskExecutorsLoadingWeightSupplier) { + Supplier> taskExecutorsLoadingWeightSupplier) { this.increaseResourceRequirementsByConsumer = increaseResourceRequirementsByConsumer; this.decreaseResourceRequirementsByConsumer = decreaseResourceRequirementsByConsumer; this.getResourceRequirementsSupplier = getResourceRequirementsSupplier; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java index 6b5960ee0674e..3148499df13c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingDeclarativeSlotPoolBuilder.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotInfo; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -83,7 +83,7 @@ public class TestingDeclarativeSlotPoolBuilder { Collection> registerSlotsFunction = (slotOffers, ignoredB, ignoredC, ignoredD) -> new ArrayList<>(slotOffers); - private Supplier> taskExecutorsLoadingWeightSupplier = + private Supplier> taskExecutorsLoadingWeightSupplier = HashMap::new; public TestingDeclarativeSlotPoolBuilder setIncreaseResourceRequirementsByConsumer( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java index e2be038d345b6..90f4bfb7156c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/AbstractSlotMatchingResolverTest.java @@ -22,8 +22,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; -import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; -import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount; +import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -158,11 +158,11 @@ protected void assertAssignments(Collection assignments) { s.getTargetAs( ExecutionSlotSharingGroup .class) - .getLoading()) + .getResourceUnitCount()) .reduce( - DefaultLoadingWeight.EMPTY, - LoadingWeight::merge) - .getLoading()) + DefaultResourceUnitCount.EMPTY, + ResourceUnitCount::merge) + .getCount()) .isGreaterThanOrEqualTo(9f); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCountTest.java similarity index 60% rename from flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCountTest.java index 4d4203dc668ea..57d4fb6df3fed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeightTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/resourceunit/DefaultResourceUnitCountTest.java @@ -16,26 +16,36 @@ * limitations under the License. */ -package org.apache.flink.runtime.scheduler.loading; +package org.apache.flink.runtime.scheduler.resourceunit; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link DefaultLoadingWeight}. */ -class DefaultLoadingWeightTest { +/** Test for {@link DefaultResourceUnitCount}. */ +class DefaultResourceUnitCountTest { @Test void testInvalidLoading() { - assertThatThrownBy(() -> new DefaultLoadingWeight(-1f)) + assertThatThrownBy(() -> new DefaultResourceUnitCount(-1f)) .isInstanceOf(IllegalArgumentException.class); } @Test void testMerge() { - assertThat(new DefaultLoadingWeight(0).merge(null).getLoading()).isZero(); - assertThat(new DefaultLoadingWeight(0).merge(new DefaultLoadingWeight(1.2f)).getLoading()) + assertThat(new DefaultResourceUnitCount(0).merge(null).getCount()).isZero(); + assertThat( + new DefaultResourceUnitCount(0) + .merge(new DefaultResourceUnitCount(1.2f)) + .getCount()) .isEqualTo(1.2f); } + + @Test + void testGetCountAsInt() { + assertThat(new DefaultResourceUnitCount(2.9f).getCountAsInt()).isEqualTo(2); + assertThat(new DefaultResourceUnitCount(3e10f).getCountAsInt()) + .isEqualTo(Integer.MAX_VALUE); + } }