Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
Expand Down Expand Up @@ -66,13 +66,13 @@ public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implem
private static final class FulfilledAllocation {
final AllocationID allocationID;
final ResourceID taskExecutorID;
final LoadingWeight loadingWeight;
final ResourceUnitCount resourceUnitCount;

FulfilledAllocation(PhysicalSlot slot, LoadingWeight loadingWeight) {
FulfilledAllocation(PhysicalSlot slot, ResourceUnitCount resourceUnitCount) {
this.allocationID = Preconditions.checkNotNull(slot.getAllocationId());
this.taskExecutorID =
Preconditions.checkNotNull(slot.getTaskManagerLocation().getResourceID());
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
this.resourceUnitCount = Preconditions.checkNotNull(resourceUnitCount);
}

@Override
Expand All @@ -83,7 +83,7 @@ public boolean equals(Object o) {
FulfilledAllocation that = (FulfilledAllocation) o;
return Objects.equals(allocationID, that.allocationID)
&& Objects.equals(taskExecutorID, that.taskExecutorID)
&& Objects.equals(loadingWeight, that.loadingWeight);
&& Objects.equals(resourceUnitCount, that.resourceUnitCount);
}
}

Expand Down Expand Up @@ -261,16 +261,16 @@ void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
}
}

private Map<ResourceID, LoadingWeight> getTaskExecutorsLoadingView() {
final Map<ResourceID, LoadingWeight> result = new HashMap<>();
private Map<ResourceID, ResourceUnitCount> getTaskExecutorsLoadingView() {
final Map<ResourceID, ResourceUnitCount> result = new HashMap<>();
Collection<FulfilledAllocation> fulfilledAllocations = fulfilledRequests.values();
for (FulfilledAllocation allocation : fulfilledAllocations) {
result.compute(
allocation.taskExecutorID,
(ignoredID, oldLoading) ->
Objects.isNull(oldLoading)
? allocation.loadingWeight
: oldLoading.merge(allocation.loadingWeight));
? allocation.resourceUnitCount
: oldLoading.merge(allocation.resourceUnitCount));
}
return result;
}
Expand Down Expand Up @@ -353,7 +353,8 @@ private PhysicalSlot reserveFreeSlot(AllocationID allocationId, PendingRequest p
getDeclarativeSlotPool()
.reserveFreeSlot(allocationId, pendingRequest.getResourceProfile());
fulfilledRequests.put(
slotRequestId, new FulfilledAllocation(slot, pendingRequest.getLoading()));
slotRequestId,
new FulfilledAllocation(slot, pendingRequest.getResourceUnitCount()));
return slot;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand All @@ -32,13 +32,13 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public final class PendingRequest implements WeightLoadable {
public final class PendingRequest implements HasResourceUnit {

private final SlotRequestId slotRequestId;

private final ResourceProfile resourceProfile;

private final LoadingWeight loadingWeight;
private final ResourceUnitCount resourceUnitCount;

private final HashSet<AllocationID> preferredAllocations;

Expand All @@ -51,12 +51,12 @@ public final class PendingRequest implements WeightLoadable {
private PendingRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
LoadingWeight loadingWeight,
ResourceUnitCount resourceUnitCount,
Collection<AllocationID> preferredAllocations,
boolean isBatchRequest) {
this.slotRequestId = slotRequestId;
this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
this.loadingWeight = Preconditions.checkNotNull(loadingWeight);
this.resourceUnitCount = Preconditions.checkNotNull(resourceUnitCount);
this.preferredAllocations = new HashSet<>(preferredAllocations);
this.isBatchRequest = isBatchRequest;
this.slotFuture = new CompletableFuture<>();
Expand All @@ -66,19 +66,19 @@ private PendingRequest(
static PendingRequest createBatchRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
LoadingWeight loadingWeight,
ResourceUnitCount resourceUnitCount,
Collection<AllocationID> preferredAllocations) {
return new PendingRequest(
slotRequestId, resourceProfile, loadingWeight, preferredAllocations, true);
slotRequestId, resourceProfile, resourceUnitCount, preferredAllocations, true);
}

public static PendingRequest createNormalRequest(
SlotRequestId slotRequestId,
ResourceProfile resourceProfile,
LoadingWeight loadingWeight,
ResourceUnitCount resourceUnitCount,
Collection<AllocationID> preferredAllocations) {
return new PendingRequest(
slotRequestId, resourceProfile, loadingWeight, preferredAllocations, false);
slotRequestId, resourceProfile, resourceUnitCount, preferredAllocations, false);
}

SlotRequestId getSlotRequestId() {
Expand Down Expand Up @@ -135,7 +135,7 @@ public String toString() {
+ ", resourceProfile="
+ resourceProfile
+ ", loadingWeight="
+ loadingWeight
+ resourceUnitCount
+ ", preferredAllocations="
+ preferredAllocations
+ ", isBatchRequest="
Expand All @@ -146,7 +146,7 @@ public String toString() {
}

@Override
public @Nonnull LoadingWeight getLoading() {
return loadingWeight;
public @Nonnull ResourceUnitCount getResourceUnitCount() {
return resourceUnitCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,31 @@
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
import org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;

import javax.annotation.Nonnull;

/** Represents a request for a physical slot. */
public class PhysicalSlotRequest implements WeightLoadable {
public class PhysicalSlotRequest implements HasResourceUnit {

private final SlotRequestId slotRequestId;

private final SlotProfile slotProfile;

private final LoadingWeight loadingWeight;
private final ResourceUnitCount resourceUnitCount;

private final boolean slotWillBeOccupiedIndefinitely;

public PhysicalSlotRequest(
final SlotRequestId slotRequestId,
final SlotProfile slotProfile,
final LoadingWeight loadingWeight,
final ResourceUnitCount resourceUnitCount,
final boolean slotWillBeOccupiedIndefinitely) {

this.slotRequestId = slotRequestId;
this.slotProfile = slotProfile;
this.loadingWeight = loadingWeight;
this.resourceUnitCount = resourceUnitCount;
this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
}

Expand All @@ -70,18 +70,18 @@ public PendingRequest toPendingRequest() {
? PendingRequest.createNormalRequest(
slotRequestId,
slotProfile.getPhysicalSlotResourceProfile(),
loadingWeight,
resourceUnitCount,
slotProfile.getPreferredAllocations())
: PendingRequest.createBatchRequest(
slotRequestId,
slotProfile.getPhysicalSlotResourceProfile(),
loadingWeight,
resourceUnitCount,
slotProfile.getPreferredAllocations());
}

@Override
public @Nonnull LoadingWeight getLoading() {
return loadingWeight;
public @Nonnull ResourceUnitCount getResourceUnitCount() {
return resourceUnitCount;
}

/** Result of a {@link PhysicalSlotRequest}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -58,7 +58,7 @@ public static RequestSlotMatchingStrategy create(RequestSlotMatchingStrategy rol
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
Map<ResourceID, ResourceUnitCount> taskExecutorsLoadingWeight) {
final Collection<RequestSlotMatch> requestSlotMatches = new ArrayList<>();

final Map<AllocationID, PhysicalSlot> freeSlots =
Expand Down Expand Up @@ -97,7 +97,7 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
.getPreferredAllocations()
.contains(freeSlot.getAllocationId())) {
requestSlotMatches.add(RequestSlotMatch.createFor(pendingRequest, freeSlot));
LoadingWeight deltaLoading = pendingRequest.getLoading();
ResourceUnitCount deltaLoading = pendingRequest.getResourceUnitCount();
taskExecutorsLoadingWeight.compute(
freeSlot.getTaskManagerLocation().getResourceID(),
(ignoredId, oldLoad) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;

import java.util.Collection;
import java.util.Map;
Expand All @@ -38,7 +38,7 @@ public interface RequestSlotMatchingStrategy {
Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight);
Map<ResourceID, ResourceUnitCount> taskExecutorsLoadingWeight);

/** Result class representing matches. */
final class RequestSlotMatch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobmaster.slotpool;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -38,7 +38,7 @@ public enum SimpleRequestSlotMatchingStrategy implements RequestSlotMatchingStra
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
Map<ResourceID, LoadingWeight> taskExecutorsLoadingWeight) {
Map<ResourceID, ResourceUnitCount> taskExecutorsLoadingWeight) {
final Collection<RequestSlotMatch> resultingMatches = new ArrayList<>();

// if pendingRequests has a special order, then let's preserve it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
import org.apache.flink.runtime.scheduler.resourceunit.DefaultResourceUnitCount;
import org.apache.flink.runtime.scheduler.resourceunit.ResourceUnitCount;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.heap.AbstractHeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueue;
Expand All @@ -42,7 +42,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.runtime.scheduler.loading.WeightLoadable.sortByLoadingDescend;
import static org.apache.flink.runtime.scheduler.resourceunit.HasResourceUnit.sortDesc;

/**
* The tasks balanced based implementation of {@link RequestSlotMatchingStrategy} that matches the
Expand All @@ -57,20 +57,20 @@ public enum TasksBalancedRequestSlotMatchingStrategy implements RequestSlotMatch
/** The {@link PhysicalSlotElement} comparator to compare loading. */
static final class PhysicalSlotElementComparator implements Comparator<PhysicalSlotElement> {

private final Map<ResourceID, LoadingWeight> taskExecutorsLoading;
private final Map<ResourceID, ResourceUnitCount> taskExecutorsLoading;

PhysicalSlotElementComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
PhysicalSlotElementComparator(Map<ResourceID, ResourceUnitCount> taskExecutorsLoading) {
this.taskExecutorsLoading = Preconditions.checkNotNull(taskExecutorsLoading);
}

@Override
public int compare(PhysicalSlotElement left, PhysicalSlotElement right) {
final LoadingWeight leftLoad =
final ResourceUnitCount leftLoad =
taskExecutorsLoading.getOrDefault(
left.getResourceID(), DefaultLoadingWeight.EMPTY);
final LoadingWeight rightLoad =
left.getResourceID(), DefaultResourceUnitCount.EMPTY);
final ResourceUnitCount rightLoad =
taskExecutorsLoading.getOrDefault(
right.getResourceID(), DefaultLoadingWeight.EMPTY);
right.getResourceID(), DefaultResourceUnitCount.EMPTY);
return leftLoad.compareTo(rightLoad);
}
}
Expand Down Expand Up @@ -115,7 +115,8 @@ static final class PhysicalSlotElementPriorityComparator

private final PhysicalSlotElementComparator physicalSlotElementComparator;

PhysicalSlotElementPriorityComparator(Map<ResourceID, LoadingWeight> taskExecutorsLoading) {
PhysicalSlotElementPriorityComparator(
Map<ResourceID, ResourceUnitCount> taskExecutorsLoading) {
this.physicalSlotElementComparator =
new PhysicalSlotElementComparator(taskExecutorsLoading);
}
Expand All @@ -130,15 +131,15 @@ public int comparePriority(PhysicalSlotElement left, PhysicalSlotElement right)
public Collection<RequestSlotMatch> matchRequestsAndSlots(
Collection<? extends PhysicalSlot> slots,
Collection<PendingRequest> pendingRequests,
Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
Map<ResourceID, ResourceUnitCount> taskExecutorsLoad) {
ResourceRequestPreMappings resourceRequestPreMappings =
ResourceRequestPreMappings.createFrom(pendingRequests, slots);
if (!resourceRequestPreMappings.isMatchingFulfilled()) {
return Collections.emptyList();
}

final Collection<RequestSlotMatch> resultingMatches = new ArrayList<>();
final List<PendingRequest> sortedRequests = sortByLoadingDescend(pendingRequests);
final List<PendingRequest> sortedRequests = sortDesc(pendingRequests);

logDebugInfo(slots, taskExecutorsLoad, sortedRequests);

Expand Down Expand Up @@ -166,20 +167,20 @@ public Collection<RequestSlotMatch> matchRequestsAndSlots(
}

private static void updateTaskExecutorsLoad(
Map<ResourceID, LoadingWeight> taskExecutorsLoad,
Map<ResourceID, ResourceUnitCount> taskExecutorsLoad,
PendingRequest request,
PhysicalSlotElement slotElement) {
taskExecutorsLoad.compute(
slotElement.getResourceID(),
(ignoredId, oldLoading) ->
Objects.isNull(oldLoading)
? request.getLoading()
: oldLoading.merge(request.getLoading()));
? request.getResourceUnitCount()
: oldLoading.merge(request.getResourceUnitCount()));
}

private static void logDebugInfo(
Collection<? extends PhysicalSlot> slots,
Map<ResourceID, LoadingWeight> taskExecutorsLoad,
Map<ResourceID, ResourceUnitCount> taskExecutorsLoad,
List<PendingRequest> sortedRequests) {
LOG.debug(
"Available slots: {}, sortedRequests: {}, taskExecutorsLoad: {}",
Expand All @@ -198,7 +199,7 @@ private Map<ResourceID, Set<PhysicalSlotElement>> groupSlotsByTaskExecutor(

private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> getSlotCandidatesByProfile(
Collection<PhysicalSlotElement> slotElements,
Map<ResourceID, LoadingWeight> taskExecutorsLoad) {
Map<ResourceID, ResourceUnitCount> taskExecutorsLoad) {
final Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> result = new HashMap<>();
final PhysicalSlotElementPriorityComparator physicalSlotElementPriorityComparator =
new PhysicalSlotElementPriorityComparator(taskExecutorsLoad);
Expand All @@ -221,7 +222,7 @@ private Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> getSlotCand
private Optional<PhysicalSlotElement> tryMatchPhysicalSlot(
PendingRequest request,
Map<ResourceProfile, HeapPriorityQueue<PhysicalSlotElement>> profileToSlotMap,
Map<ResourceID, LoadingWeight> taskExecutorsLoad,
Map<ResourceID, ResourceUnitCount> taskExecutorsLoad,
ResourceRequestPreMappings resourceRequestPreMappings) {
final ResourceProfile requestProfile = request.getResourceProfile();

Expand Down
Loading