package org.apache.flink.runtime.instance;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool.class */
public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
    static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
    private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
    private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
    private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
    private final JobID jobId;
    private final ProviderAndOwner providerAndOwner;
    private final HashSet<ResourceID> registeredTaskManagers;
    private final AllocatedSlots allocatedSlots;
    private final AvailableSlots availableSlots;
    private final HashMap<AllocationID, PendingRequest> pendingRequests;
    private final HashMap<AllocationID, PendingRequest> waitingForResourceManager;
    private final Time resourceManagerRequestsTimeout;
    private final Time resourceManagerAllocationTimeout;
    private final Clock clock;
    private JobMasterId jobMasterId;
    private ResourceManagerGateway resourceManagerGateway;
    private String jobManagerAddress;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool$AllocatedSlots.class */
    /**
     * Bookkeeping for slots that are currently handed out.
     *
     * <p>Every slot is indexed twice: by its allocation ID and by the ID of the TaskManager it
     * belongs to. Both indexes are updated together by every mutating method of this class.
     */
    public static class AllocatedSlots {
        /** Handed-out slots grouped by the TaskManager they belong to. */
        private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager = new HashMap<>();
        /** Handed-out slots keyed by their allocation ID. */
        private final Map<AllocationID, Slot> allocatedSlotsById = new HashMap<>();

        AllocatedSlots() {
        }

        /**
         * Registers a slot in both indexes.
         *
         * @param slot the slot to track
         */
        void add(Slot slot) {
            this.allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot);
            this.allocatedSlotsByTaskManager
                .computeIfAbsent(slot.getTaskManagerID(), ignored -> new HashSet<>())
                .add(slot);
        }

        /**
         * Looks up a slot by allocation ID.
         *
         * @param allocationID the allocation ID to look up
         * @return the tracked slot, or {@code null} if none is registered under that ID
         */
        Slot get(AllocationID allocationID) {
            return this.allocatedSlotsById.get(allocationID);
        }

        /**
         * @param allocationID the allocation ID to check
         * @return {@code true} if a slot is registered under the given allocation ID
         */
        boolean contains(AllocationID allocationID) {
            return this.allocatedSlotsById.containsKey(allocationID);
        }

        /**
         * Removes the entry registered under the given slot's allocation ID.
         *
         * @param slot the slot whose allocation ID identifies the entry to remove
         * @return {@code true} if an entry was removed
         */
        boolean remove(Slot slot) {
            return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
        }

        /**
         * Removes the slot registered under the given allocation ID from both indexes.
         *
         * @param allocationID the allocation ID of the slot to remove
         * @return the removed slot, or {@code null} if none was registered
         */
        Slot remove(AllocationID allocationID) {
            Slot removed = this.allocatedSlotsById.remove(allocationID);
            if (removed == null) {
                return null;
            }
            ResourceID taskManagerID = removed.getTaskManagerID();
            Set<Slot> slotsOnTaskManager = this.allocatedSlotsByTaskManager.get(taskManagerID);
            slotsOnTaskManager.remove(removed);
            // do not keep empty per-TaskManager sets around
            if (slotsOnTaskManager.isEmpty()) {
                this.allocatedSlotsByTaskManager.remove(taskManagerID);
            }
            return removed;
        }

        /**
         * Removes all slots of one TaskManager from both indexes.
         *
         * @param resourceID the ID of the TaskManager
         * @return the removed slots, or an empty set if the TaskManager had none
         */
        Set<Slot> removeSlotsForTaskManager(ResourceID resourceID) {
            Set<Slot> removed = this.allocatedSlotsByTaskManager.remove(resourceID);
            if (removed == null) {
                return Collections.emptySet();
            }
            for (Slot slot : removed) {
                this.allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId());
            }
            return removed;
        }

        /** Drops all entries from both indexes. */
        void clear() {
            this.allocatedSlotsById.clear();
            this.allocatedSlotsByTaskManager.clear();
        }

        /**
         * @param resourceID the ID of the TaskManager
         * @return {@code true} if at least one slot is tracked for the given TaskManager
         */
        @VisibleForTesting
        boolean containResource(ResourceID resourceID) {
            return this.allocatedSlotsByTaskManager.containsKey(resourceID);
        }

        /** @return the number of tracked slots */
        @VisibleForTesting
        int size() {
            return this.allocatedSlotsById.size();
        }

        /**
         * @param resourceID the ID of the TaskManager
         * @return the internal set of slots tracked for the TaskManager (not a copy), or an empty
         *     set if there is no entry for it
         */
        @VisibleForTesting
        Set<Slot> getSlotsForTaskManager(ResourceID resourceID) {
            return this.allocatedSlotsByTaskManager.getOrDefault(resourceID, Collections.emptySet());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool$AvailableSlots.class */
    public static class AvailableSlots {
        private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = new HashMap<>();
        private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost = new HashMap<>();
        private final HashMap<AllocationID, SlotAndTimestamp> availableSlots = new HashMap<>();

        AvailableSlots() {
        }

        void add(AllocatedSlot allocatedSlot, long j) {
            Preconditions.checkNotNull(allocatedSlot);
            if (this.availableSlots.put(allocatedSlot.getSlotAllocationId(), new SlotAndTimestamp(allocatedSlot, j)) != null) {
                throw new IllegalStateException("slot already contained");
            }
            ResourceID resourceID = allocatedSlot.getTaskManagerLocation().getResourceID();
            String fQDNHostname = allocatedSlot.getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> set = this.availableSlotsByTaskManager.get(resourceID);
            if (set == null) {
                set = new HashSet();
                this.availableSlotsByTaskManager.put(resourceID, set);
            }
            set.add(allocatedSlot);
            Set<AllocatedSlot> set2 = this.availableSlotsByHost.get(fQDNHostname);
            if (set2 == null) {
                set2 = new HashSet();
                this.availableSlotsByHost.put(fQDNHostname, set2);
            }
            set2.add(allocatedSlot);
        }

        boolean contains(AllocationID allocationID) {
            return this.availableSlots.containsKey(allocationID);
        }

        SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> iterable) {
            if (this.availableSlots.isEmpty()) {
                return null;
            }
            boolean z = false;
            if (iterable != null) {
                Iterator<TaskManagerLocation> it = iterable.iterator();
                while (it.hasNext()) {
                    z = true;
                    Set<AllocatedSlot> set = this.availableSlotsByTaskManager.get(it.next().getResourceID());
                    if (set != null) {
                        for (AllocatedSlot allocatedSlot : set) {
                            if (allocatedSlot.getResourceProfile().isMatching(resourceProfile)) {
                                remove(allocatedSlot.getSlotAllocationId());
                                return new SlotAndLocality(allocatedSlot, Locality.LOCAL);
                            }
                        }
                    }
                }
                Iterator<TaskManagerLocation> it2 = iterable.iterator();
                while (it2.hasNext()) {
                    Set<AllocatedSlot> set2 = this.availableSlotsByHost.get(it2.next().getFQDNHostname());
                    if (set2 != null) {
                        for (AllocatedSlot allocatedSlot2 : set2) {
                            if (allocatedSlot2.getResourceProfile().isMatching(resourceProfile)) {
                                remove(allocatedSlot2.getSlotAllocationId());
                                return new SlotAndLocality(allocatedSlot2, Locality.HOST_LOCAL);
                            }
                        }
                    }
                }
            }
            Iterator<SlotAndTimestamp> it3 = this.availableSlots.values().iterator();
            while (it3.hasNext()) {
                AllocatedSlot slot = it3.next().slot();
                if (slot.getResourceProfile().isMatching(resourceProfile)) {
                    remove(slot.getSlotAllocationId());
                    return new SlotAndLocality(slot, z ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
                }
            }
            return null;
        }

        void removeAllForTaskManager(ResourceID resourceID) {
            Set<AllocatedSlot> remove = this.availableSlotsByTaskManager.remove(resourceID);
            if (remove == null || remove.size() <= 0) {
                return;
            }
            String fQDNHostname = remove.iterator().next().getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> set = this.availableSlotsByHost.get(fQDNHostname);
            for (AllocatedSlot allocatedSlot : remove) {
                this.availableSlots.remove(allocatedSlot.getSlotAllocationId());
                set.remove(allocatedSlot);
            }
            if (set.isEmpty()) {
                this.availableSlotsByHost.remove(fQDNHostname);
            }
        }

        boolean tryRemove(AllocationID allocationID) {
            SlotAndTimestamp remove = this.availableSlots.remove(allocationID);
            if (remove == null) {
                return false;
            }
            AllocatedSlot slot = remove.slot();
            ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
            String fQDNHostname = slot.getTaskManagerLocation().getFQDNHostname();
            Set<AllocatedSlot> set = this.availableSlotsByTaskManager.get(resourceID);
            Set<AllocatedSlot> set2 = this.availableSlotsByHost.get(fQDNHostname);
            set.remove(slot);
            set2.remove(slot);
            if (set.isEmpty()) {
                this.availableSlotsByTaskManager.remove(resourceID);
            }
            if (!set2.isEmpty()) {
                return true;
            }
            this.availableSlotsByHost.remove(fQDNHostname);
            return true;
        }

        private void remove(AllocationID allocationID) throws IllegalStateException {
            if (!tryRemove(allocationID)) {
                throw new IllegalStateException("slot not contained");
            }
        }

        @VisibleForTesting
        boolean containsTaskManager(ResourceID resourceID) {
            return this.availableSlotsByTaskManager.containsKey(resourceID);
        }

        @VisibleForTesting
        int size() {
            return this.availableSlots.size();
        }

        @VisibleForTesting
        void clear() {
            this.availableSlots.clear();
            this.availableSlotsByTaskManager.clear();
            this.availableSlotsByHost.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool$PendingRequest.class */
    /** Immutable triple describing an outstanding slot request. */
    public static class PendingRequest {
        /** ID under which the request is tracked. */
        private final AllocationID allocationID;
        /** Future handed to the requester. */
        private final CompletableFuture<SimpleSlot> future;
        /** Resource profile that was requested. */
        private final ResourceProfile resourceProfile;

        PendingRequest(AllocationID allocationID, CompletableFuture<SimpleSlot> completableFuture, ResourceProfile resourceProfile) {
            this.resourceProfile = resourceProfile;
            this.future = completableFuture;
            this.allocationID = allocationID;
        }

        /** @return the ID of this request */
        public AllocationID allocationID() {
            return allocationID;
        }

        /** @return the future associated with this request */
        public CompletableFuture<SimpleSlot> getFuture() {
            return future;
        }

        /** @return the requested resource profile */
        public ResourceProfile resourceProfile() {
            return resourceProfile;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool$ProviderAndOwner.class */
    /**
     * {@link SlotOwner} and {@link SlotProvider} facade that forwards every call to a
     * {@link SlotPoolGateway}.
     */
    public static class ProviderAndOwner implements SlotOwner, SlotProvider {
        /** Gateway that receives all forwarded calls. */
        private final SlotPoolGateway gateway;
        /** Timeout passed along with every allocation request. */
        private final Time timeout;

        ProviderAndOwner(SlotPoolGateway slotPoolGateway, Time time) {
            this.timeout = time;
            this.gateway = slotPoolGateway;
        }

        /**
         * Forwards the slot to the gateway.
         *
         * @param slot the slot to return
         * @return always {@code true}
         */
        @Override
        public boolean returnAllocatedSlot(Slot slot) {
            gateway.returnAllocatedSlot(slot);
            return true;
        }

        /**
         * Requests a slot from the gateway with {@code ResourceProfile.UNKNOWN} and the configured
         * timeout. The flag {@code z} is not used by this implementation.
         *
         * @param scheduledUnit the unit to allocate a slot for
         * @param z unused
         * @param collection the preferred locations
         * @return the future returned by the gateway
         */
        @Override
        public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit scheduledUnit, boolean z, Collection<TaskManagerLocation> collection) {
            final ResourceProfile requestedProfile = ResourceProfile.UNKNOWN;
            return gateway.allocateSlot(scheduledUnit, requestedProfile, collection, timeout);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/instance/SlotPool$SlotAndTimestamp.class */
    /** Immutable pair of a slot and a timestamp. */
    public static class SlotAndTimestamp {
        private final AllocatedSlot slot;
        private final long timestamp;

        SlotAndTimestamp(AllocatedSlot allocatedSlot, long j) {
            this.timestamp = j;
            this.slot = allocatedSlot;
        }

        /** @return the slot */
        public AllocatedSlot slot() {
            return slot;
        }

        /** @return the timestamp stored with the slot */
        public long timestamp() {
            return timestamp;
        }

        /** @return the slot's string form, followed by {@code " @ "} and the timestamp */
        @Override
        public String toString() {
            return String.valueOf(slot) + " @ " + timestamp;
        }
    }

    /**
     * Creates a slot pool that uses the system clock and the default timeouts declared in this
     * class.
     *
     * @param rpcService the RPC service passed on to the main constructor
     * @param jobID the ID of the job this pool serves
     */
    public SlotPool(RpcService rpcService, JobID jobID) {
        this(
            rpcService,
            jobID,
            SystemClock.getInstance(),
            DEFAULT_SLOT_REQUEST_TIMEOUT,
            DEFAULT_RM_ALLOCATION_TIMEOUT,
            DEFAULT_RM_REQUEST_TIMEOUT);
    }

    public SlotPool(RpcService rpcService, JobID jobID, Clock clock, Time time, Time time2, Time time3) {
        super(rpcService);
        this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        this.clock = (Clock) Preconditions.checkNotNull(clock);
        this.resourceManagerRequestsTimeout = (Time) Preconditions.checkNotNull(time3);
        this.resourceManagerAllocationTimeout = (Time) Preconditions.checkNotNull(time2);
        this.registeredTaskManagers = new HashSet<>();
        this.allocatedSlots = new AllocatedSlots();
        this.availableSlots = new AvailableSlots();
        this.pendingRequests = new HashMap<>();
        this.waitingForResourceManager = new HashMap<>();
        this.providerAndOwner = new ProviderAndOwner((SlotPoolGateway) getSelfGateway(SlotPoolGateway.class), time);
        this.jobMasterId = null;
        this.resourceManagerGateway = null;
        this.jobManagerAddress = null;
    }

    /**
     * Not supported; use {@link #start(JobMasterId, String)} instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void start() {
        final String reason = "Should never call start() without leader ID";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Stores the JobMaster id and the JobManager address, then starts the RPC endpoint.
     *
     * @param jobMasterId the JobMaster id to use for ResourceManager requests, must not be null
     * @param str the JobManager address put into slot requests, must not be null
     * @throws Exception declared by the signature; a failure of the underlying start is rethrown
     *     wrapped in a {@link RuntimeException}
     */
    public void start(JobMasterId jobMasterId, String str) throws Exception {
        this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
        this.jobManagerAddress = Preconditions.checkNotNull(str);

        try {
            // bypass the overridden no-arg start(), which always throws
            super.start();
        } catch (Exception startFailure) {
            throw new RuntimeException("This should never happen", startFailure);
        }
    }

    /**
     * Suspends this pool: stops the RPC endpoint, forgets the JobMaster id and the
     * ResourceManager gateway, and drops all slot and request bookkeeping.
     *
     * <p>The bookkeeping is only cleared; slots are not released and request futures are not
     * completed by this method.
     */
    @Override
    public void suspend() {
        validateRunsInMainThread();
        stop();

        // no longer associated with a JobMaster or ResourceManager
        this.jobMasterId = null;
        this.resourceManagerGateway = null;

        this.availableSlots.clear();
        this.allocatedSlots.clear();
        this.pendingRequests.clear();
        // Requests stashed while no ResourceManager was connected must be dropped as well;
        // otherwise connectToResourceManager() would re-send these stale requests after the
        // pool is started again.
        this.waitingForResourceManager.clear();
    }

    /**
     * @return the pool's facade object, typed as {@link SlotOwner}; the same instance is returned
     *     by {@link #getSlotProvider()}
     */
    public SlotOwner getSlotOwner() {
        final SlotOwner owner = providerAndOwner;
        return owner;
    }

    /**
     * @return the pool's facade object, typed as {@link SlotProvider}; the same instance is
     *     returned by {@link #getSlotOwner()}
     */
    public SlotProvider getSlotProvider() {
        final SlotProvider provider = providerAndOwner;
        return provider;
    }

    /**
     * Stores the ResourceManager gateway and forwards every request that was stashed while no
     * ResourceManager was connected.
     *
     * @param resourceManagerGateway the gateway to use from now on, must not be null
     */
    @Override
    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
        this.resourceManagerGateway = Preconditions.checkNotNull(resourceManagerGateway);

        // hand all stashed requests to the ResourceManager, then empty the stash
        waitingForResourceManager.values().forEach(stashed ->
            requestSlotFromResourceManager(stashed.allocationID(), stashed.getFuture(), stashed.resourceProfile()));
        waitingForResourceManager.clear();
    }

    /**
     * Forgets the ResourceManager gateway. New slot requests that cannot be served from the pool
     * are stashed until a gateway is connected again.
     */
    @Override
    public void disconnectResourceManager() {
        resourceManagerGateway = null;
    }

    /**
     * Gateway entry point for slot allocation; delegates to {@code internalAllocateSlot}.
     *
     * @param scheduledUnit the unit to allocate a slot for
     * @param resourceProfile the requested resource profile
     * @param iterable the preferred locations
     * @param time not used by this method
     * @return the future produced by {@code internalAllocateSlot}
     */
    @Override
    public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit scheduledUnit, ResourceProfile resourceProfile, Iterable<TaskManagerLocation> iterable, Time time) {
        final CompletableFuture<SimpleSlot> slotFuture =
            internalAllocateSlot(scheduledUnit, resourceProfile, iterable);
        return slotFuture;
    }

    /**
     * Gateway entry point for returning a slot; delegates to {@code internalReturnAllocatedSlot}.
     *
     * @param slot the slot being returned
     */
    @Override
    public void returnAllocatedSlot(Slot slot) {
        this.internalReturnAllocatedSlot(slot);
    }

    /**
     * Serves a slot request from the available slots if possible; otherwise creates a new request
     * that is either sent to the ResourceManager or stashed until one is connected.
     *
     * @param scheduledUnit not used by this method
     * @param resourceProfile the requested resource profile
     * @param iterable the preferred locations
     * @return an already completed future if a pooled slot matched, otherwise a pending future
     */
    CompletableFuture<SimpleSlot> internalAllocateSlot(ScheduledUnit scheduledUnit, ResourceProfile resourceProfile, Iterable<TaskManagerLocation> iterable) {
        final SlotAndLocality pooled = availableSlots.poll(resourceProfile, iterable);

        if (pooled == null) {
            // nothing suitable in the pool: a fresh request is needed
            final AllocationID requestId = new AllocationID();
            final CompletableFuture<SimpleSlot> pendingSlot = new CompletableFuture<>();
            if (resourceManagerGateway != null) {
                requestSlotFromResourceManager(requestId, pendingSlot, resourceProfile);
            } else {
                stashRequestWaitingForResourceManager(requestId, resourceProfile, pendingSlot);
            }
            return pendingSlot;
        }

        // a pooled slot matched: hand it out immediately
        final SimpleSlot handedOut = createSimpleSlot(pooled.slot(), pooled.locality());
        allocatedSlots.add(handedOut);
        return CompletableFuture.completedFuture(handedOut);
    }

    /**
     * Registers the request as pending and sends it to the ResourceManager.
     *
     * <p>When the ResourceManager acknowledges the request, the allocation timeout check is
     * scheduled; when the request (or that follow-up step) fails, the pending request is failed.
     * Both callbacks run on the main thread executor.
     *
     * @param allocationID the ID of the request
     * @param completableFuture the future handed to the requester
     * @param resourceProfile the requested resource profile
     */
    private void requestSlotFromResourceManager(AllocationID allocationID, CompletableFuture<SimpleSlot> completableFuture, ResourceProfile resourceProfile) {
        LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resourceProfile, allocationID);

        pendingRequests.put(allocationID, new PendingRequest(allocationID, completableFuture, resourceProfile));

        final SlotRequest slotRequest = new SlotRequest(jobId, allocationID, resourceProfile, jobManagerAddress);
        final CompletableFuture<Acknowledge> rmResponse =
            resourceManagerGateway.requestSlot(jobMasterId, slotRequest, resourceManagerRequestsTimeout);

        final CompletableFuture<Void> acknowledged = rmResponse.thenAcceptAsync(
            ack -> slotRequestToResourceManagerSuccess(allocationID),
            getMainThreadExecutor());

        acknowledged.whenCompleteAsync(
            (ignored, failure) -> {
                if (failure != null) {
                    slotRequestToResourceManagerFailed(allocationID, failure);
                }
            },
            getMainThreadExecutor());
    }

    /**
     * Schedules the allocation timeout check for an acknowledged request; the check runs after
     * the ResourceManager allocation timeout.
     *
     * @param allocationID the ID of the acknowledged request
     */
    private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
        final Runnable timeoutCheck = () -> checkTimeoutSlotAllocation(allocationID);
        scheduleRunAsync(timeoutCheck, resourceManagerAllocationTimeout);
    }

    /**
     * Fails the pending request whose ResourceManager call did not succeed. If no request is
     * pending under the given ID, the failure is only logged at debug level.
     *
     * @param allocationID the ID of the failed request
     * @param th the cause of the failure
     */
    private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable th) {
        final PendingRequest failedRequest = pendingRequests.remove(allocationID);

        if (failedRequest == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Unregistered slot request {} failed.", allocationID, th);
            }
            return;
        }

        final NoResourceAvailableException cause = new NoResourceAvailableException(
            "No pooled slot available and request to ResourceManager for new slot failed", th);
        failedRequest.getFuture().completeExceptionally(cause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the pending request with the given ID, if any, and fails its future with a
     * {@link TimeoutException} unless the future is already done.
     *
     * @param allocationID the ID of the request to time out
     */
    public void checkTimeoutSlotAllocation(AllocationID allocationID) {
        final PendingRequest timedOut = pendingRequests.remove(allocationID);
        if (timedOut != null && !timedOut.getFuture().isDone()) {
            timedOut.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
        }
    }

    /**
     * Parks a request until a ResourceManager is connected and schedules a check that fails the
     * request if it is still parked after the ResourceManager request timeout.
     *
     * @param allocationID the ID of the request
     * @param resourceProfile the requested resource profile
     * @param completableFuture the future handed to the requester
     */
    private void stashRequestWaitingForResourceManager(final AllocationID allocationID, ResourceProfile resourceProfile, CompletableFuture<SimpleSlot> completableFuture) {
        LOG.info("Cannot serve slot request, no ResourceManager connected. Adding as pending request {}", allocationID);

        final PendingRequest stashed = new PendingRequest(allocationID, completableFuture, resourceProfile);
        waitingForResourceManager.put(allocationID, stashed);

        final Runnable timeoutCheck = () -> checkTimeoutRequestWaitingForResourceManager(allocationID);
        scheduleRunAsync(timeoutCheck, resourceManagerRequestsTimeout);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the stashed request with the given ID, if any, and fails its future with a
     * {@link NoResourceAvailableException} unless the future is already done.
     *
     * @param allocationID the ID of the stashed request to time out
     */
    public void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) {
        final PendingRequest stashed = waitingForResourceManager.remove(allocationID);
        if (stashed != null && !stashed.getFuture().isDone()) {
            stashed.getFuture().completeExceptionally(
                new NoResourceAvailableException("No slot available and no connection to Resource Manager established."));
        }
    }

    /**
     * Takes back a slot that was handed out by this pool. The underlying allocated slot is either
     * given to a matching outstanding request or put back into the available slots.
     *
     * <p>Nothing happens if {@code markReleased()} returns false. If the slot is no longer tracked
     * as allocated, it is dropped.
     *
     * @param slot the returned slot; must not be null, must not be alive, and must be owned by
     *     this pool's provider/owner facade
     */
    private void internalReturnAllocatedSlot(Slot slot) {
        Preconditions.checkNotNull(slot);
        Preconditions.checkArgument(!slot.isAlive(), "slot is still alive");
        Preconditions.checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");

        if (!slot.markReleased()) {
            return;
        }

        if (!allocatedSlots.remove(slot)) {
            LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
            return;
        }

        final AllocatedSlot returned = slot.getAllocatedSlot();
        final PendingRequest waiting = pollMatchingPendingRequest(returned);

        if (waiting != null) {
            LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", waiting.allocationID(), returned.getSlotAllocationId());
            final SimpleSlot reissued = createSimpleSlot(returned, Locality.UNKNOWN);
            allocatedSlots.add(reissued);
            // NOTE(review): the result of complete() is ignored; if the future is already done the
            // slot stays registered as allocated - confirm whether that can happen.
            waiting.getFuture().complete(reissued);
        } else {
            LOG.debug("Adding returned slot [{}] to available slots", returned.getSlotAllocationId());
            availableSlots.add(returned, clock.relativeTimeMillis());
        }
    }

    /**
     * Finds and removes an outstanding request that the given slot can serve. Requests already
     * sent to the ResourceManager are searched before requests still waiting for one.
     *
     * @param allocatedSlot the slot to find a request for
     * @return the removed request, or {@code null} if no outstanding request matches
     */
    private PendingRequest pollMatchingPendingRequest(AllocatedSlot allocatedSlot) {
        final ResourceProfile slotProfile = allocatedSlot.getResourceProfile();

        final PendingRequest sentToResourceManager = pollFirstMatching(pendingRequests.values(), slotProfile);
        if (sentToResourceManager != null) {
            return sentToResourceManager;
        }
        return pollFirstMatching(waitingForResourceManager.values(), slotProfile);
    }

    /**
     * Removes and returns the first request in {@code requests} whose profile the slot profile
     * matches.
     *
     * @param requests the value view of one of the request maps; removal goes through its iterator
     *     and therefore removes the map entry
     * @param slotProfile the resource profile of the slot
     * @return the removed request, or {@code null} if none matches
     */
    private static PendingRequest pollFirstMatching(Collection<PendingRequest> requests, ResourceProfile slotProfile) {
        final Iterator<PendingRequest> candidates = requests.iterator();
        while (candidates.hasNext()) {
            final PendingRequest candidate = candidates.next();
            if (slotProfile.isMatching(candidate.resourceProfile())) {
                candidates.remove();
                return candidate;
            }
        }
        return null;
    }

    /**
     * Offers several slots at once. Each slot is offered individually via
     * {@link #offerSlot(AllocatedSlot)}; the result contains the {@link SlotOffer} of every slot
     * whose individual offer completed with {@code true}.
     *
     * @param collection pairs of a slot and the offer it belongs to
     * @return a future with the accepted offers
     */
    @Override
    public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> collection) {
        validateRunsInMainThread();

        // one future per offer: present if accepted, empty if rejected
        final List<CompletableFuture<Optional<SlotOffer>>> perOfferResults = collection.stream()
            .map(offer -> {
                final CompletableFuture<Optional<SlotOffer>> result = offerSlot(offer.f0)
                    .thenApply(accepted -> accepted ? Optional.of(offer.f1) : Optional.<SlotOffer>empty());
                return result;
            })
            .collect(Collectors.toList());

        final CompletableFuture<Collection<Optional<SlotOffer>>> combined = FutureUtils.combineAll(perOfferResults);

        // keep only the accepted offers
        return combined.thenApply(results -> {
            final Collection<SlotOffer> acceptedOffers = results.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
            return acceptedOffers;
        });
    }

    /**
     * Offers a single slot to the pool.
     *
     * <p>The offer is rejected if the slot's TaskManager is not registered. A slot that is already
     * known (allocated or available) is accepted without further action. Otherwise the slot
     * completes the pending request with the same allocation ID, or becomes available if there is
     * no such request.
     *
     * @param allocatedSlot the offered slot
     * @return a completed future holding {@code true} if the offer was accepted, {@code false} if
     *     it was rejected
     */
    @Override
    public CompletableFuture<Boolean> offerSlot(AllocatedSlot allocatedSlot) {
        validateRunsInMainThread();

        final ResourceID offeringTaskManager = allocatedSlot.getTaskManagerId();
        final AllocationID offeredId = allocatedSlot.getSlotAllocationId();

        if (!registeredTaskManagers.contains(offeringTaskManager)) {
            LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}", allocatedSlot.getSlotAllocationId(), allocatedSlot);
            return CompletableFuture.completedFuture(Boolean.FALSE);
        }

        final boolean alreadyKnown = allocatedSlots.contains(offeredId) || availableSlots.contains(offeredId);
        if (alreadyKnown) {
            LOG.debug("Received repeated offer for slot [{}]. Ignoring.", offeredId);
            return CompletableFuture.completedFuture(Boolean.TRUE);
        }

        final PendingRequest awaited = pendingRequests.remove(offeredId);
        if (awaited == null) {
            // nobody asked for this slot: keep it for later requests
            availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
        } else {
            final SimpleSlot fulfilled = createSimpleSlot(allocatedSlot, Locality.UNKNOWN);
            // NOTE(review): the result of complete() is ignored; if the future is already done the
            // slot is still registered as allocated - confirm whether that can happen.
            awaited.getFuture().complete(fulfilled);
            allocatedSlots.add(fulfilled);
        }
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    /**
     * Fails whatever is registered under the given allocation ID. A pending request is failed
     * with the given exception, an available slot is removed, and an allocated slot is removed
     * and released. If nothing is registered, the call is only logged.
     *
     * @param allocationID the ID of the allocation to fail
     * @param exc the cause of the failure
     */
    @Override
    public void failAllocation(AllocationID allocationID, Exception exc) {
        final PendingRequest pending = pendingRequests.remove(allocationID);
        if (pending != null) {
            LOG.debug("Failed pending request [{}] with ", allocationID, exc);
            pending.getFuture().completeExceptionally(exc);
            return;
        }

        if (availableSlots.tryRemove(allocationID)) {
            LOG.debug("Failed available slot [{}] with ", allocationID, exc);
            return;
        }

        final Slot allocated = allocatedSlots.remove(allocationID);
        if (allocated == null) {
            LOG.debug("Outdated request to fail slot [{}] with ", allocationID, exc);
            return;
        }
        allocated.releaseSlot();
    }

    /**
     * Marks a TaskManager as registered; slot offers are only accepted from registered
     * TaskManagers.
     *
     * @param resourceID the ID of the TaskManager
     */
    @Override
    public void registerTaskManager(ResourceID resourceID) {
        registeredTaskManagers.add(resourceID);
    }

    /**
     * Unregisters a TaskManager. If it was registered, its available slots are dropped and its
     * allocated slots are removed and released.
     *
     * @param resourceID the ID of the TaskManager
     * @return a completed future holding an {@link Acknowledge}
     */
    @Override
    public CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID) {
        final boolean wasRegistered = registeredTaskManagers.remove(resourceID);
        if (wasRegistered) {
            availableSlots.removeAllForTaskManager(resourceID);
            for (Slot orphaned : allocatedSlots.removeSlotsForTaskManager(resourceID)) {
                orphaned.releaseSlot();
            }
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    /**
     * Wraps an allocated slot into a {@link SimpleSlot} owned by this pool's provider/owner
     * facade.
     *
     * @param allocatedSlot the slot to wrap
     * @param locality the locality to set on the new slot; nothing is set if null
     * @return the new simple slot
     */
    private SimpleSlot createSimpleSlot(AllocatedSlot allocatedSlot, Locality locality) {
        final SimpleSlot wrapped = new SimpleSlot(allocatedSlot, providerAndOwner, allocatedSlot.getSlotNumber());
        if (locality == null) {
            return wrapped;
        }
        wrapped.setLocality(locality);
        return wrapped;
    }

    /** @return the pool's internal allocated-slot bookkeeping (not a copy) */
    @VisibleForTesting
    AllocatedSlots getAllocatedSlots() {
        return allocatedSlots;
    }
}
