/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.scheduler;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpportunisticContainerAllocator {
    private static final int NODE_LOCAL_LOOP = 0;
    private static final int RACK_LOCAL_LOOP = 1;
    private static final int OFF_SWITCH_LOOP = 2;
    private static final Logger LOG = LoggerFactory.getLogger(OpportunisticContainerAllocator.class);
    private static final ResourceCalculator RESOURCE_CALCULATOR = new DominantResourceCalculator();
    private final BaseContainerTokenSecretManager tokenSecretManager;

    public OpportunisticContainerAllocator(BaseContainerTokenSecretManager tokenSecretManager) {
        this.tokenSecretManager = tokenSecretManager;
    }

    public List<Container> allocateContainers(ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs, ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException {
        if (blackList != null) {
            opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
            opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
        }
        opportContext.addToOutstandingReqs(oppResourceReqs);
        HashSet<String> nodeBlackList = new HashSet<String>(opportContext.getBlacklist());
        ArrayList<Container> allocatedContainers = new ArrayList<Container>();
        boolean continueLoop = true;
        while (continueLoop) {
            continueLoop = false;
            ArrayList<Map<Resource, List<Allocation>>> allocations = new ArrayList<Map<Resource, List<Allocation>>>();
            for (SchedulerRequestKey schedulerRequestKey : opportContext.getOutstandingOpReqs().descendingKeySet()) {
                Map<Resource, List<Allocation>> allocation = this.allocate(rmIdentifier, opportContext, schedulerRequestKey, applicationAttemptId, appSubmitter, nodeBlackList);
                if (allocation.size() <= 0) continue;
                allocations.add(allocation);
                continueLoop = true;
            }
            for (Map map : allocations) {
                for (Map.Entry e : map.entrySet()) {
                    opportContext.matchAllocationToOutstandingRequest((Resource)e.getKey(), (List)e.getValue());
                    for (Allocation alloc : (List)e.getValue()) {
                        allocatedContainers.add(alloc.getContainer());
                    }
                }
            }
        }
        return allocatedContainers;
    }

    private Map<Resource, List<Allocation>> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, ApplicationAttemptId appAttId, String userName, Set<String> blackList) throws YarnException {
        HashMap<Resource, List<Allocation>> containers = new HashMap<Resource, List<Allocation>>();
        for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) {
            this.allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), blackList, appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
            ResourceRequest anyAsk = enrichedAsk.getRequest();
            if (containers.isEmpty()) continue;
            LOG.info("Opportunistic allocation requested for [priority={}, allocationRequestId={}, num_containers={}, capability={}] allocated = {}", new Object[]{anyAsk.getPriority(), anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), anyAsk.getCapability(), containers.keySet()});
        }
        return containers;
    }

    private void allocateContainersInternal(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, Set<String> blacklist, ApplicationAttemptId id, Map<String, RemoteNode> allNodes, String userName, Map<Resource, List<Allocation>> allocations, EnrichedResourceRequest enrichedAsk) throws YarnException {
        if (allNodes.size() == 0) {
            LOG.info("No nodes currently available to allocate OPPORTUNISTIC containers.");
            return;
        }
        ResourceRequest anyAsk = enrichedAsk.getRequest();
        int toAllocate = anyAsk.getNumContainers() - (allocations.isEmpty() ? 0 : allocations.get(anyAsk.getCapability()).size());
        toAllocate = Math.min(toAllocate, appParams.getMaxAllocationsPerSchedulerKeyPerRound());
        int numAllocated = 0;
        int loopIndex = 2;
        if (enrichedAsk.getNodeLocations().size() > 0) {
            loopIndex = 0;
        }
        while (numAllocated < toAllocate) {
            Collection<RemoteNode> nodeCandidates = this.findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
            for (RemoteNode rNode : nodeCandidates) {
                String rNodeHost = rNode.getNodeId().getHost();
                if (blacklist.contains(rNodeHost)) {
                    LOG.info("Nodes for scheduling has a blacklisted node [" + rNodeHost + "]..");
                    continue;
                }
                String location = "*";
                if (loopIndex == 0) {
                    if (!enrichedAsk.getNodeLocations().contains(rNodeHost)) continue;
                    location = rNodeHost;
                }
                if (loopIndex == 1) {
                    if (!enrichedAsk.getRackLocations().contains(rNode.getRackName())) continue;
                    location = rNode.getRackName();
                }
                Container container = this.createContainer(rmIdentifier, appParams, idCounter, id, userName, allocations, location, anyAsk, rNode);
                ++numAllocated;
                if (loopIndex != 0) {
                    blacklist.add(rNode.getNodeId().getHost());
                }
                LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + "location [" + location + "]");
                if (numAllocated < toAllocate) continue;
                break;
            }
            loopIndex = loopIndex == 0 && enrichedAsk.getRackLocations().size() > 0 ? 1 : ++loopIndex;
            if (loopIndex <= 2 || numAllocated != 0) continue;
            LOG.warn("Unable to allocate any opportunistic containers.");
            break;
        }
    }

    private Collection<RemoteNode> findNodeCandidates(int loopIndex, Map<String, RemoteNode> allNodes, Set<String> blackList, EnrichedResourceRequest enrichedRR) {
        if (loopIndex > 1) {
            return allNodes.values();
        }
        LinkedList<RemoteNode> retList = new LinkedList<RemoteNode>();
        int numContainers = enrichedRR.getRequest().getNumContainers();
        while (numContainers > 0 && (numContainers = loopIndex == 0 ? this.collectNodeLocalCandidates(allNodes, enrichedRR, retList, numContainers) : this.collectRackLocalCandidates(allNodes, enrichedRR, retList, blackList, numContainers)) != enrichedRR.getRequest().getNumContainers()) {
        }
        return retList;
    }

    private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes, EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList, Set<String> blackList, int numContainers) {
        for (RemoteNode rNode : allNodes.values()) {
            if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
                if (blackList.contains(rNode.getNodeId().getHost())) {
                    retList.addLast(rNode);
                } else {
                    retList.addFirst(rNode);
                    --numContainers;
                }
            }
            if (numContainers != 0) continue;
            break;
        }
        return numContainers;
    }

    private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes, EnrichedResourceRequest enrichedRR, List<RemoteNode> retList, int numContainers) {
        for (String nodeName : enrichedRR.getNodeLocations()) {
            RemoteNode remoteNode = allNodes.get(nodeName);
            if (remoteNode != null) {
                retList.add(remoteNode);
                --numContainers;
            }
            if (numContainers != 0) continue;
            break;
        }
        return numContainers;
    }

    private Container createContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ApplicationAttemptId id, String userName, Map<Resource, List<Allocation>> allocations, String location, ResourceRequest anyAsk, RemoteNode rNode) throws YarnException {
        Container container = this.buildContainer(rmIdentifier, appParams, idCounter, anyAsk, id, userName, rNode);
        List<Allocation> allocList = allocations.get(anyAsk.getCapability());
        if (allocList == null) {
            allocList = new ArrayList<Allocation>();
            allocations.put(anyAsk.getCapability(), allocList);
        }
        allocList.add(new Allocation(container, location));
        return container;
    }

    private Container buildContainer(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, ResourceRequest rr, ApplicationAttemptId id, String userName, RemoteNode node) throws YarnException {
        ContainerId cId = ContainerId.newContainerId((ApplicationAttemptId)id, (long)idCounter.generateContainerId());
        Resource capability = this.normalizeCapability(appParams, rr);
        return this.createContainer(rmIdentifier, appParams.getContainerTokenExpiryInterval(), SchedulerRequestKey.create(rr), userName, node, cId, capability);
    }

    private Container createContainer(long rmIdentifier, long tokenExpiry, SchedulerRequestKey schedulerKey, String userName, RemoteNode node, ContainerId cId, Resource capability) {
        long currTime = System.currentTimeMillis();
        ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(cId, 0, node.getNodeId().toString(), userName, capability, currTime + tokenExpiry, this.tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier, schedulerKey.getPriority(), currTime, null, "", ContainerType.TASK, ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
        byte[] pwd = this.tokenSecretManager.createPassword(containerTokenIdentifier);
        Token containerToken = OpportunisticContainerAllocator.newContainerToken(node.getNodeId(), pwd, containerTokenIdentifier);
        Container container = BuilderUtils.newContainer(cId, node.getNodeId(), node.getHttpAddress(), capability, schedulerKey.getPriority(), containerToken, containerTokenIdentifier.getExecutionType(), schedulerKey.getAllocationRequestId());
        return container;
    }

    private Resource normalizeCapability(AllocationParams appParams, ResourceRequest ask) {
        return Resources.normalize((ResourceCalculator)RESOURCE_CALCULATOR, (Resource)ask.getCapability(), (Resource)appParams.minResource, (Resource)appParams.maxResource, (Resource)appParams.incrementResource);
    }

    private static Token newContainerToken(NodeId nodeId, byte[] password, ContainerTokenIdentifier tokenIdentifier) {
        InetSocketAddress addr = NetUtils.createSocketAddrForHost((String)nodeId.getHost(), (int)nodeId.getPort());
        Token containerToken = Token.newInstance((byte[])tokenIdentifier.getBytes(), (String)ContainerTokenIdentifier.KIND.toString(), (byte[])password, (String)SecurityUtil.buildTokenService((InetSocketAddress)addr).toString());
        return containerToken;
    }

    public PartitionedResourceRequests partitionAskList(List<ResourceRequest> askList) {
        PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests();
        for (ResourceRequest rr : askList) {
            if (rr.getExecutionTypeRequest().getExecutionType() == ExecutionType.OPPORTUNISTIC) {
                partitionedRequests.getOpportunistic().add(rr);
                continue;
            }
            partitionedRequests.getGuaranteed().add(rr);
        }
        return partitionedRequests;
    }

    static class EnrichedResourceRequest {
        private final Map<String, AtomicInteger> nodeLocations = new HashMap<String, AtomicInteger>();
        private final Map<String, AtomicInteger> rackLocations = new HashMap<String, AtomicInteger>();
        private final ResourceRequest request;

        EnrichedResourceRequest(ResourceRequest request) {
            this.request = request;
        }

        ResourceRequest getRequest() {
            return this.request;
        }

        void addLocation(String location, int count) {
            Map<String, AtomicInteger> m = this.rackLocations;
            if (!location.startsWith("/")) {
                m = this.nodeLocations;
            }
            if (count == 0) {
                m.remove(location);
            } else {
                m.put(location, new AtomicInteger(count));
            }
        }

        void removeLocation(String location) {
            Map<String, AtomicInteger> m = this.rackLocations;
            AtomicInteger count = m.get(location);
            if (count == null) {
                m = this.nodeLocations;
                count = m.get(location);
            }
            if (count != null && count.decrementAndGet() == 0) {
                m.remove(location);
            }
        }

        Set<String> getNodeLocations() {
            return this.nodeLocations.keySet();
        }

        Set<String> getRackLocations() {
            return this.rackLocations.keySet();
        }
    }

    static class Allocation {
        private final Container container;
        private final String resourceName;

        Allocation(Container container, String resourceName) {
            this.container = container;
            this.resourceName = resourceName;
        }

        Container getContainer() {
            return this.container;
        }

        String getResourceName() {
            return this.resourceName;
        }
    }

    public static class PartitionedResourceRequests {
        private List<ResourceRequest> guaranteed = new ArrayList<ResourceRequest>();
        private List<ResourceRequest> opportunistic = new ArrayList<ResourceRequest>();

        public List<ResourceRequest> getGuaranteed() {
            return this.guaranteed;
        }

        public List<ResourceRequest> getOpportunistic() {
            return this.opportunistic;
        }
    }

    public static class ContainerIdGenerator {
        protected volatile AtomicLong containerIdCounter = new AtomicLong(1L);

        public void resetContainerIdCounter(long containerIdStart) {
            this.containerIdCounter.set(containerIdStart);
        }

        public long generateContainerId() {
            return this.containerIdCounter.incrementAndGet();
        }
    }

    public static class AllocationParams {
        private Resource maxResource;
        private Resource minResource;
        private Resource incrementResource;
        private int containerTokenExpiryInterval;
        private int maxAllocationsPerSchedulerKeyPerRound = 1;

        public Resource getMaxResource() {
            return this.maxResource;
        }

        public void setMaxResource(Resource maxResource) {
            this.maxResource = maxResource;
        }

        public Resource getMinResource() {
            return this.minResource;
        }

        public void setMinResource(Resource minResource) {
            this.minResource = minResource;
        }

        public Resource getIncrementResource() {
            return this.incrementResource;
        }

        public void setIncrementResource(Resource incrementResource) {
            this.incrementResource = incrementResource;
        }

        public int getContainerTokenExpiryInterval() {
            return this.containerTokenExpiryInterval;
        }

        public void setContainerTokenExpiryInterval(int containerTokenExpiryInterval) {
            this.containerTokenExpiryInterval = containerTokenExpiryInterval;
        }

        public int getMaxAllocationsPerSchedulerKeyPerRound() {
            return this.maxAllocationsPerSchedulerKeyPerRound;
        }

        public void setMaxAllocationsPerSchedulerKeyPerRound(int maxAllocationsPerSchedulerKeyPerRound) {
            this.maxAllocationsPerSchedulerKeyPerRound = maxAllocationsPerSchedulerKeyPerRound;
        }
    }
}

