/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyApplicationContext;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.utils.AMRMClientUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FederationInterceptor
extends AbstractRequestInterceptor {
    private static final Logger LOG = LoggerFactory.getLogger(FederationInterceptor.class);
    public static final String NMSS_CLASS_PREFIX = "FederationInterceptor/";
    public static final String NMSS_REG_REQUEST_KEY = "FederationInterceptor/registerRequest";
    public static final String NMSS_REG_RESPONSE_KEY = "FederationInterceptor/registerResponse";
    public static final String NMSS_SECONDARY_SC_PREFIX = "FederationInterceptor/secondarySC/";
    public static final String STRING_TO_BYTE_FORMAT = "UTF-8";
    private ApplicationMasterProtocol homeRM;
    private SubClusterId homeSubClusterId;
    private UnmanagedAMPoolManager uamPool;
    private ExecutorService threadpool;
    private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
    private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap = new ConcurrentHashMap<ContainerId, SubClusterId>();
    private RegisterApplicationMasterRequest amRegistrationRequest = null;
    private RegisterApplicationMasterResponse amRegistrationResponse = null;
    private FederationStateStoreFacade federationFacade;
    private SubClusterResolver subClusterResolver;
    private FederationAMRMProxyPolicy policyInterpreter;
    private UserGroupInformation appOwner;
    private FederationRegistryClient registryClient;

    public FederationInterceptor() {
        this.asyncResponseSink = new ConcurrentHashMap<SubClusterId, List<AllocateResponse>>();
        this.threadpool = Executors.newCachedThreadPool();
        this.uamPool = this.createUnmanagedAMPoolManager(this.threadpool);
    }

    @Override
    public void init(AMRMProxyApplicationContext appContext) {
        super.init(appContext);
        LOG.info("Initializing Federation Interceptor");
        Configuration conf = appContext.getConf();
        if (conf == null) {
            conf = this.getConf();
        } else {
            this.setConf(conf);
        }
        try {
            this.appOwner = UserGroupInformation.createProxyUser((String)appContext.getUser(), (UserGroupInformation)UserGroupInformation.getCurrentUser());
        }
        catch (Exception ex) {
            throw new YarnRuntimeException((Throwable)ex);
        }
        if (appContext.getRegistryClient() != null) {
            this.registryClient = new FederationRegistryClient(conf, appContext.getRegistryClient(), this.appOwner);
            if (appContext.getCredentials() != null) {
                this.appOwner.addCredentials(appContext.getCredentials());
            }
        }
        this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId((Configuration)conf));
        this.homeRM = this.createHomeRMProxy(appContext, ApplicationMasterProtocol.class, this.appOwner);
        this.federationFacade = FederationStateStoreFacade.getInstance();
        this.subClusterResolver = this.federationFacade.getSubClusterResolver();
        this.policyInterpreter = null;
        this.uamPool.init(conf);
        this.uamPool.start();
    }

    @Override
    public void recover(Map<String, byte[]> recoveredDataMap) {
        super.recover(recoveredDataMap);
        LOG.info("Recovering data for FederationInterceptor");
        if (recoveredDataMap == null) {
            return;
        }
        ApplicationAttemptId attemptId = this.getApplicationContext().getApplicationAttemptId();
        try {
            Map<Object, Object> uamMap;
            YarnServiceProtos.RegisterApplicationMasterRequestProto pb;
            if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
                pb = YarnServiceProtos.RegisterApplicationMasterRequestProto.parseFrom((byte[])recoveredDataMap.get(NMSS_REG_REQUEST_KEY));
                this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb);
                LOG.info("amRegistrationRequest recovered for {}", (Object)attemptId);
            }
            if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
                pb = YarnServiceProtos.RegisterApplicationMasterResponseProto.parseFrom((byte[])recoveredDataMap.get(NMSS_REG_RESPONSE_KEY));
                this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl((YarnServiceProtos.RegisterApplicationMasterResponseProto)pb);
                LOG.info("amRegistrationResponse recovered for {}", (Object)attemptId);
            }
            if (this.registryClient != null) {
                uamMap = this.registryClient.loadStateFromRegistry(attemptId.getApplicationId());
                LOG.info("Found {} existing UAMs for application {} in Yarn Registry", (Object)uamMap.size(), (Object)attemptId.getApplicationId());
            } else {
                uamMap = new HashMap();
                for (Map.Entry<String, byte[]> entry : recoveredDataMap.entrySet()) {
                    if (!entry.getKey().startsWith(NMSS_SECONDARY_SC_PREFIX)) continue;
                    String string = ((String)entry.getKey()).substring(NMSS_SECONDARY_SC_PREFIX.length());
                    Token amrmToken = new Token();
                    amrmToken.decodeFromUrlString(new String((byte[])entry.getValue(), STRING_TO_BYTE_FORMAT));
                    uamMap.put(string, amrmToken);
                    LOG.debug("Recovered UAM in " + string + " from NMSS");
                }
                LOG.info("Found {} existing UAMs for application {} in NMStateStore", (Object)uamMap.size(), (Object)attemptId.getApplicationId());
            }
            int containers = 0;
            for (Map.Entry entry : uamMap.entrySet()) {
                SubClusterId subClusterId = SubClusterId.newInstance((String)entry.getKey());
                YarnConfiguration config = new YarnConfiguration(this.getConf());
                FederationProxyProviderUtil.updateConfForFederation((Configuration)config, subClusterId.getId());
                try {
                    this.uamPool.reAttachUAM(subClusterId.getId(), (Configuration)config, attemptId.getApplicationId(), this.amRegistrationResponse.getQueue(), this.getApplicationContext().getUser(), this.homeSubClusterId.getId(), (Token<AMRMTokenIdentifier>)((Token)entry.getValue()));
                    RegisterApplicationMasterResponse response = this.uamPool.registerApplicationMaster(subClusterId.getId(), this.amRegistrationRequest);
                    for (Container container : response.getContainersFromPreviousAttempts()) {
                        this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
                        ++containers;
                    }
                    LOG.info("Recovered {} running containers from UAM in {}", (Object)response.getContainersFromPreviousAttempts().size(), (Object)subClusterId);
                }
                catch (Exception e) {
                    LOG.error("Error reattaching UAM to " + subClusterId + " for " + attemptId, (Throwable)e);
                }
            }
            UserGroupInformation appSubmitter = UserGroupInformation.createRemoteUser((String)this.getApplicationContext().getUser());
            ApplicationClientProtocol applicationClientProtocol = this.createHomeRMProxy(this.getApplicationContext(), ApplicationClientProtocol.class, appSubmitter);
            GetContainersResponse response = applicationClientProtocol.getContainers(GetContainersRequest.newInstance((ApplicationAttemptId)attemptId));
            for (ContainerReport container : response.getContainerList()) {
                this.containerIdToSubClusterIdMap.put(container.getContainerId(), this.homeSubClusterId);
                ++containers;
                LOG.debug("  From home RM " + this.homeSubClusterId + " running container " + container.getContainerId());
            }
            LOG.info("{} running containers including AM recovered from home RM ", (Object)response.getContainerList().size(), (Object)this.homeSubClusterId);
            LOG.info("In all {} UAMs {} running containers including AM recovered for {}", new Object[]{uamMap.size(), containers, attemptId});
            if (this.amRegistrationResponse != null) {
                String queue = this.amRegistrationResponse.getQueue();
                this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, this.getConf(), this.federationFacade, this.homeSubClusterId);
            }
        }
        catch (IOException | YarnException e) {
            throw new YarnRuntimeException(e);
        }
    }

    public synchronized RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnException, IOException {
        String queue;
        if (this.amRegistrationRequest != null) {
            if (!this.amRegistrationRequest.equals(request)) {
                throw new YarnException("AM should not call registerApplicationMaster with a different request body");
            }
        } else {
            this.amRegistrationRequest = request;
            if (this.getNMStateStore() != null) {
                try {
                    RegisterApplicationMasterRequestPBImpl pb = (RegisterApplicationMasterRequestPBImpl)this.amRegistrationRequest;
                    this.getNMStateStore().storeAMRMProxyAppContextEntry(this.getApplicationContext().getApplicationAttemptId(), NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
                }
                catch (Exception e) {
                    LOG.error("Error storing AMRMProxy application context entry for " + this.getApplicationContext().getApplicationAttemptId(), (Throwable)e);
                }
            }
        }
        if (this.amRegistrationResponse != null) {
            return this.amRegistrationResponse;
        }
        this.amRegistrationResponse = this.homeRM.registerApplicationMaster(request);
        if (this.amRegistrationResponse.getContainersFromPreviousAttempts() != null) {
            this.cacheAllocatedContainers(this.amRegistrationResponse.getContainersFromPreviousAttempts(), this.homeSubClusterId);
        }
        ApplicationId appId = this.getApplicationContext().getApplicationAttemptId().getApplicationId();
        this.reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
        if (this.getNMStateStore() != null) {
            try {
                RegisterApplicationMasterResponsePBImpl pb = (RegisterApplicationMasterResponsePBImpl)this.amRegistrationResponse;
                this.getNMStateStore().storeAMRMProxyAppContextEntry(this.getApplicationContext().getApplicationAttemptId(), NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
            }
            catch (Exception e) {
                LOG.error("Error storing AMRMProxy application context entry for " + this.getApplicationContext().getApplicationAttemptId(), (Throwable)e);
            }
        }
        if ((queue = this.amRegistrationResponse.getQueue()) == null) {
            LOG.warn("Received null queue for application " + appId + " from home subcluster. Will use default queue name " + "default" + " for getting AMRMProxyPolicy");
        } else {
            LOG.info("Application " + appId + " belongs to queue " + queue);
        }
        try {
            this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter, this.getConf(), this.federationFacade, this.homeSubClusterId);
        }
        catch (FederationPolicyInitializationException e) {
            throw new YarnRuntimeException((Throwable)((Object)e));
        }
        return this.amRegistrationResponse;
    }

    public AllocateResponse allocate(AllocateRequest request) throws YarnException {
        Preconditions.checkArgument((this.policyInterpreter != null ? 1 : 0) != 0, (Object)"Allocate should be called after registerApplicationMaster");
        try {
            Map<SubClusterId, AllocateRequest> requests = this.splitAllocateRequest(request);
            Registrations newRegistrations = this.sendRequestsToSecondaryResourceManagers(requests);
            AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(requests.get(this.homeSubClusterId), this.homeRM, this.amRegistrationRequest, this.getApplicationContext().getApplicationAttemptId().getApplicationId());
            try {
                this.policyInterpreter.notifyOfResponse(this.homeSubClusterId, homeResponse);
            }
            catch (YarnException e) {
                LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + this.homeSubClusterId, (Throwable)e);
            }
            if (homeResponse.getAMRMToken() != null) {
                LOG.debug("Received new AMRMToken");
                YarnServerSecurityUtils.updateAMRMToken(homeResponse.getAMRMToken(), this.appOwner, this.getConf());
            }
            homeResponse = this.mergeAllocateResponses(homeResponse);
            if (!FederationInterceptor.isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
                homeResponse = this.mergeRegistrationResponses(homeResponse, newRegistrations.getSuccessfulRegistrations());
            }
            return homeResponse;
        }
        catch (IOException ex) {
            LOG.error("Exception encountered while processing heart beat", (Throwable)ex);
            throw new YarnException((Throwable)ex);
        }
    }

    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnException, IOException {
        boolean failedToUnRegister = false;
        ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc = null;
        Set<String> subClusterIds = this.uamPool.getAllUAMIds();
        if (subClusterIds.size() > 0) {
            final FinishApplicationMasterRequest finishRequest = request;
            compSvc = new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(this.threadpool);
            LOG.info("Sending finish application request to {} sub-cluster RMs", (Object)subClusterIds.size());
            for (final String subClusterId : subClusterIds) {
                compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>(){

                    @Override
                    public FinishApplicationMasterResponseInfo call() throws Exception {
                        LOG.info("Sending finish application request to RM {}", (Object)subClusterId);
                        FinishApplicationMasterResponse uamResponse = null;
                        try {
                            uamResponse = FederationInterceptor.this.uamPool.finishApplicationMaster(subClusterId, finishRequest);
                        }
                        catch (Throwable e) {
                            LOG.warn("Failed to finish unmanaged application master: RM address: " + subClusterId + " ApplicationId: " + FederationInterceptor.this.getApplicationContext().getApplicationAttemptId(), e);
                        }
                        return new FinishApplicationMasterResponseInfo(uamResponse, subClusterId);
                    }
                });
            }
        }
        FinishApplicationMasterResponse homeResponse = AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, this.amRegistrationRequest, this.getApplicationContext().getApplicationAttemptId().getApplicationId());
        if (subClusterIds.size() > 0) {
            LOG.info("Waiting for finish application response from {} sub-cluster RMs", (Object)subClusterIds.size());
            for (int i = 0; i < subClusterIds.size(); ++i) {
                try {
                    Future future = compSvc.take();
                    FinishApplicationMasterResponseInfo uamResponse = (FinishApplicationMasterResponseInfo)future.get();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received finish application response from RM: " + uamResponse.getSubClusterId());
                    }
                    if (uamResponse.getResponse() == null || !uamResponse.getResponse().getIsUnregistered()) {
                        failedToUnRegister = true;
                        continue;
                    }
                    if (this.getNMStateStore() == null) continue;
                    this.getNMStateStore().removeAMRMProxyAppContextEntry(this.getApplicationContext().getApplicationAttemptId(), NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
                    continue;
                }
                catch (Throwable e) {
                    failedToUnRegister = true;
                    LOG.warn("Failed to finish unmanaged application master:  ApplicationId: " + this.getApplicationContext().getApplicationAttemptId(), e);
                }
            }
        }
        if (failedToUnRegister) {
            homeResponse.setIsUnregistered(false);
        } else {
            this.uamPool.stop();
            if (this.registryClient != null) {
                this.registryClient.removeAppFromRegistry(this.getApplicationContext().getApplicationAttemptId().getApplicationId());
            }
        }
        return homeResponse;
    }

    @Override
    public void setNextInterceptor(RequestInterceptor next) {
        throw new YarnRuntimeException("setNextInterceptor is being called on FederationInterceptor. It should always be used as the last interceptor in the chain");
    }

    @Override
    public void shutdown() {
        if (this.threadpool != null) {
            try {
                this.threadpool.shutdown();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            this.threadpool = null;
        }
        super.shutdown();
    }

    @VisibleForTesting
    protected void cleanupRegistry() {
        if (this.registryClient != null) {
            this.registryClient.cleanAllApplications();
        }
    }

    @VisibleForTesting
    protected FederationRegistryClient getRegistryClient() {
        return this.registryClient;
    }

    @VisibleForTesting
    protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(ExecutorService threadPool) {
        return new UnmanagedAMPoolManager(threadPool);
    }

    protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext, Class<T> protocol, UserGroupInformation user) {
        try {
            return FederationProxyProviderUtil.createRMProxy(appContext.getConf(), protocol, this.homeSubClusterId, user, appContext.getAMRMToken());
        }
        catch (Exception ex) {
            throw new YarnRuntimeException((Throwable)ex);
        }
    }

    private void mergeRegisterResponse(RegisterApplicationMasterResponse homeResponse, RegisterApplicationMasterResponse otherResponse) {
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getContainersFromPreviousAttempts())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getContainersFromPreviousAttempts())) {
                homeResponse.getContainersFromPreviousAttempts().addAll(otherResponse.getContainersFromPreviousAttempts());
            } else {
                homeResponse.setContainersFromPreviousAttempts(otherResponse.getContainersFromPreviousAttempts());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getNMTokensFromPreviousAttempts())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getNMTokensFromPreviousAttempts())) {
                homeResponse.getNMTokensFromPreviousAttempts().addAll(otherResponse.getNMTokensFromPreviousAttempts());
            } else {
                homeResponse.setNMTokensFromPreviousAttempts(otherResponse.getNMTokensFromPreviousAttempts());
            }
        }
    }

    protected void reAttachUAMAndMergeRegisterResponse(RegisterApplicationMasterResponse homeResponse, final ApplicationId appId) {
        if (this.registryClient == null) {
            LOG.warn("registryClient is null, skip attaching existing UAM if any");
            return;
        }
        Map<String, Token<AMRMTokenIdentifier>> uamMap = this.registryClient.loadStateFromRegistry(appId);
        if (uamMap.size() == 0) {
            LOG.info("No existing UAM for application {} found in Yarn Registry", (Object)appId);
            return;
        }
        LOG.info("Found {} existing UAMs for application {} in Yarn Registry. Reattaching in parallel", (Object)uamMap.size(), (Object)appId);
        ExecutorCompletionService<RegisterApplicationMasterResponse> completionService = new ExecutorCompletionService<RegisterApplicationMasterResponse>(this.threadpool);
        for (Map.Entry<String, Token<AMRMTokenIdentifier>> entry : uamMap.entrySet()) {
            final SubClusterId subClusterId = SubClusterId.newInstance(entry.getKey());
            final Token<AMRMTokenIdentifier> amrmToken = entry.getValue();
            completionService.submit(new Callable<RegisterApplicationMasterResponse>(){

                @Override
                public RegisterApplicationMasterResponse call() throws Exception {
                    RegisterApplicationMasterResponse response = null;
                    try {
                        YarnConfiguration config = new YarnConfiguration(FederationInterceptor.this.getConf());
                        FederationProxyProviderUtil.updateConfForFederation((Configuration)config, subClusterId.getId());
                        FederationInterceptor.this.uamPool.reAttachUAM(subClusterId.getId(), (Configuration)config, appId, FederationInterceptor.this.amRegistrationResponse.getQueue(), FederationInterceptor.this.getApplicationContext().getUser(), FederationInterceptor.this.homeSubClusterId.getId(), (Token<AMRMTokenIdentifier>)amrmToken);
                        response = FederationInterceptor.this.uamPool.registerApplicationMaster(subClusterId.getId(), FederationInterceptor.this.amRegistrationRequest);
                        if (response != null && response.getContainersFromPreviousAttempts() != null) {
                            FederationInterceptor.this.cacheAllocatedContainers(response.getContainersFromPreviousAttempts(), subClusterId);
                        }
                        LOG.info("UAM {} reattached for {}", (Object)subClusterId, (Object)appId);
                    }
                    catch (Throwable e) {
                        LOG.error("Reattaching UAM " + subClusterId + " failed for " + appId, e);
                    }
                    return response;
                }
            });
        }
        for (int i = 0; i < uamMap.size(); ++i) {
            try {
                Future future = completionService.take();
                RegisterApplicationMasterResponse registerResponse = (RegisterApplicationMasterResponse)future.get();
                if (registerResponse == null) continue;
                LOG.info("Merging register response for {}", (Object)appId);
                this.mergeRegisterResponse(homeResponse, registerResponse);
                continue;
            }
            catch (Exception e) {
                LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, (Throwable)e);
            }
        }
    }

    private SubClusterId getSubClusterForNode(String nodeName) {
        SubClusterId subClusterId = null;
        try {
            subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
        }
        catch (YarnException e) {
            LOG.error("Failed to resolve sub-cluster for node " + nodeName + ", skipping this node", (Throwable)e);
            return null;
        }
        if (subClusterId == null) {
            LOG.error("Failed to resolve sub-cluster for node {}, skipping this node", (Object)nodeName);
            return null;
        }
        return subClusterId;
    }

    private Map<SubClusterId, AllocateRequest> splitAllocateRequest(AllocateRequest request) throws YarnException {
        AllocateRequest newRequest;
        HashMap<SubClusterId, AllocateRequest> requestMap = new HashMap<SubClusterId, AllocateRequest>();
        FederationInterceptor.findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request, requestMap);
        Set<String> subClusterIds = this.uamPool.getAllUAMIds();
        for (String string : subClusterIds) {
            FederationInterceptor.findOrCreateAllocateRequestForSubCluster(SubClusterId.newInstance(string), request, requestMap);
        }
        if (!FederationInterceptor.isNullOrEmpty(request.getAskList())) {
            Map<SubClusterId, List<ResourceRequest>> asks = this.splitResourceRequests(request.getAskList());
            for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : asks.entrySet()) {
                newRequest = FederationInterceptor.findOrCreateAllocateRequestForSubCluster(entry.getKey(), request, requestMap);
                newRequest.getAskList().addAll((Collection)entry.getValue());
            }
        }
        if (request.getResourceBlacklistRequest() != null) {
            if (!FederationInterceptor.isNullOrEmpty(request.getResourceBlacklistRequest().getBlacklistAdditions())) {
                for (String string : request.getResourceBlacklistRequest().getBlacklistAdditions()) {
                    SubClusterId subClusterId = this.getSubClusterForNode(string);
                    if (subClusterId == null) continue;
                    newRequest = FederationInterceptor.findOrCreateAllocateRequestForSubCluster(subClusterId, request, requestMap);
                    newRequest.getResourceBlacklistRequest().getBlacklistAdditions().add(string);
                }
            }
            if (!FederationInterceptor.isNullOrEmpty(request.getResourceBlacklistRequest().getBlacklistRemovals())) {
                for (String string : request.getResourceBlacklistRequest().getBlacklistRemovals()) {
                    SubClusterId subClusterId = this.getSubClusterForNode(string);
                    if (subClusterId == null) continue;
                    newRequest = FederationInterceptor.findOrCreateAllocateRequestForSubCluster(subClusterId, request, requestMap);
                    newRequest.getResourceBlacklistRequest().getBlacklistRemovals().add(string);
                }
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(request.getReleaseList())) {
            for (ContainerId containerId : request.getReleaseList()) {
                if (!this.warnIfNotExists(containerId, "release")) continue;
                SubClusterId subClusterId = this.containerIdToSubClusterIdMap.get(containerId);
                newRequest = (AllocateRequest)requestMap.get(subClusterId);
                newRequest.getReleaseList().add(containerId);
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(request.getUpdateRequests())) {
            for (UpdateContainerRequest updateContainerRequest : request.getUpdateRequests()) {
                if (!this.warnIfNotExists(updateContainerRequest.getContainerId(), "update")) continue;
                SubClusterId subClusterId = this.containerIdToSubClusterIdMap.get(updateContainerRequest.getContainerId());
                newRequest = (AllocateRequest)requestMap.get(subClusterId);
                newRequest.getUpdateRequests().add(updateContainerRequest);
            }
        }
        return requestMap;
    }

    private Registrations sendRequestsToSecondaryResourceManagers(Map<SubClusterId, AllocateRequest> requests) throws YarnException, IOException {
        Registrations registrations = this.registerWithNewSubClusters(requests.keySet());
        for (Map.Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
            final SubClusterId subClusterId = entry.getKey();
            if (subClusterId.equals(this.homeSubClusterId)) continue;
            if (!this.uamPool.hasUAMId(subClusterId.getId())) {
                LOG.warn("Unmanaged AM registration not found for sub-cluster {}", (Object)subClusterId);
                continue;
            }
            this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(), new AsyncCallback<AllocateResponse>(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void callback(AllocateResponse response) {
                    Map map = FederationInterceptor.this.asyncResponseSink;
                    synchronized (map) {
                        List<AllocateResponse> responses = null;
                        if (FederationInterceptor.this.asyncResponseSink.containsKey(subClusterId)) {
                            responses = (List)FederationInterceptor.this.asyncResponseSink.get(subClusterId);
                        } else {
                            responses = new ArrayList();
                            FederationInterceptor.this.asyncResponseSink.put(subClusterId, responses);
                        }
                        responses.add(response);
                    }
                    if (response.getAMRMToken() != null) {
                        Token newToken = ConverterUtils.convertFromYarn((org.apache.hadoop.yarn.api.records.Token)response.getAMRMToken(), (Text)null);
                        if (FederationInterceptor.this.registryClient != null) {
                            FederationInterceptor.this.registryClient.writeAMRMTokenForUAM(FederationInterceptor.this.getApplicationContext().getApplicationAttemptId().getApplicationId(), subClusterId.getId(), (Token<AMRMTokenIdentifier>)newToken);
                        } else if (FederationInterceptor.this.getNMStateStore() != null) {
                            try {
                                FederationInterceptor.this.getNMStateStore().storeAMRMProxyAppContextEntry(FederationInterceptor.this.getApplicationContext().getApplicationAttemptId(), FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + subClusterId.getId(), newToken.encodeToUrlString().getBytes(FederationInterceptor.STRING_TO_BYTE_FORMAT));
                            }
                            catch (IOException e) {
                                LOG.error("Error storing UAM token as AMRMProxy context entry in NMSS for " + FederationInterceptor.this.getApplicationContext().getApplicationAttemptId(), (Throwable)e);
                            }
                        }
                    }
                    try {
                        FederationInterceptor.this.policyInterpreter.notifyOfResponse(subClusterId, response);
                    }
                    catch (YarnException e) {
                        LOG.warn("notifyOfResponse for policy failed for home sub-cluster " + subClusterId, (Throwable)e);
                    }
                }
            });
        }
        return registrations;
    }

    private Registrations registerWithNewSubClusters(Set<SubClusterId> subClusterSet) throws IOException {
        ArrayList<SubClusterId> failedRegistrations = new ArrayList<SubClusterId>();
        HashMap<SubClusterId, RegisterApplicationMasterResponse> successfulRegistrations = new HashMap<SubClusterId, RegisterApplicationMasterResponse>();
        ArrayList<String> newSubClusters = new ArrayList<String>();
        for (SubClusterId subClusterId : subClusterSet) {
            if (subClusterId.equals(this.homeSubClusterId) || this.uamPool.hasUAMId(subClusterId.getId())) continue;
            newSubClusters.add(subClusterId.getId());
        }
        if (newSubClusters.size() > 0) {
            final RegisterApplicationMasterRequest registerRequest = this.amRegistrationRequest;
            final AMRMProxyApplicationContext appContext = this.getApplicationContext();
            ExecutorCompletionService<RegisterApplicationMasterResponseInfo> completionService = new ExecutorCompletionService<RegisterApplicationMasterResponseInfo>(this.threadpool);
            for (final String subClusterId : newSubClusters) {
                completionService.submit(new Callable<RegisterApplicationMasterResponseInfo>(){

                    @Override
                    public RegisterApplicationMasterResponseInfo call() throws Exception {
                        YarnConfiguration config = new YarnConfiguration(FederationInterceptor.this.getConf());
                        FederationProxyProviderUtil.updateConfForFederation((Configuration)config, subClusterId);
                        RegisterApplicationMasterResponse uamResponse = null;
                        Token<AMRMTokenIdentifier> token = null;
                        try {
                            token = FederationInterceptor.this.uamPool.launchUAM(subClusterId, (Configuration)config, appContext.getApplicationAttemptId().getApplicationId(), FederationInterceptor.this.amRegistrationResponse.getQueue(), appContext.getUser(), FederationInterceptor.this.homeSubClusterId.toString(), FederationInterceptor.this.registryClient != null);
                            uamResponse = FederationInterceptor.this.uamPool.registerApplicationMaster(subClusterId, registerRequest);
                        }
                        catch (Throwable e) {
                            LOG.error("Failed to register application master: " + subClusterId + " Application: " + appContext.getApplicationAttemptId(), e);
                        }
                        return new RegisterApplicationMasterResponseInfo(uamResponse, SubClusterId.newInstance(subClusterId), token);
                    }
                });
            }
            for (int i = 0; i < newSubClusters.size(); ++i) {
                try {
                    Future future = completionService.take();
                    RegisterApplicationMasterResponseInfo uamResponse = (RegisterApplicationMasterResponseInfo)future.get();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received register application response from RM: " + uamResponse.getSubClusterId());
                    }
                    if (uamResponse.getResponse() == null) {
                        failedRegistrations.add(uamResponse.getSubClusterId());
                        continue;
                    }
                    LOG.info("Successfully registered unmanaged application master: " + uamResponse.getSubClusterId() + " ApplicationId: " + this.getApplicationContext().getApplicationAttemptId());
                    successfulRegistrations.put(uamResponse.getSubClusterId(), uamResponse.getResponse());
                    if (this.registryClient != null) {
                        this.registryClient.writeAMRMTokenForUAM(this.getApplicationContext().getApplicationAttemptId().getApplicationId(), uamResponse.getSubClusterId().getId(), uamResponse.getUamToken());
                        continue;
                    }
                    if (this.getNMStateStore() == null) continue;
                    this.getNMStateStore().storeAMRMProxyAppContextEntry(this.getApplicationContext().getApplicationAttemptId(), NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId().getId(), uamResponse.getUamToken().encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
                    continue;
                }
                catch (Exception e) {
                    LOG.warn("Failed to register unmanaged application master:  ApplicationId: " + this.getApplicationContext().getApplicationAttemptId(), (Throwable)e);
                }
            }
        }
        return new Registrations(successfulRegistrations, failedRegistrations);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AllocateResponse mergeAllocateResponses(AllocateResponse homeResponse) {
        this.removeFinishedContainersFromCache(homeResponse.getCompletedContainersStatuses());
        this.cacheAllocatedContainers(homeResponse.getAllocatedContainers(), this.homeSubClusterId);
        Map<SubClusterId, List<AllocateResponse>> map = this.asyncResponseSink;
        synchronized (map) {
            for (Map.Entry<SubClusterId, List<AllocateResponse>> entry : this.asyncResponseSink.entrySet()) {
                SubClusterId subClusterId = entry.getKey();
                List<AllocateResponse> responses = entry.getValue();
                if (responses.size() <= 0) continue;
                for (AllocateResponse response : responses) {
                    this.removeFinishedContainersFromCache(response.getCompletedContainersStatuses());
                    this.cacheAllocatedContainers(response.getAllocatedContainers(), subClusterId);
                    this.mergeAllocateResponse(homeResponse, response, subClusterId);
                }
                responses.clear();
            }
        }
        return homeResponse;
    }

    private void removeFinishedContainersFromCache(List<ContainerStatus> finishedContainers) {
        for (ContainerStatus container : finishedContainers) {
            LOG.debug("Completed container {}", (Object)container);
            if (!this.containerIdToSubClusterIdMap.containsKey(container.getContainerId())) continue;
            this.containerIdToSubClusterIdMap.remove(container.getContainerId());
        }
    }

    private AllocateResponse mergeRegistrationResponses(AllocateResponse homeResponse, Map<SubClusterId, RegisterApplicationMasterResponse> registrations) {
        for (Map.Entry<SubClusterId, RegisterApplicationMasterResponse> entry : registrations.entrySet()) {
            RegisterApplicationMasterResponse registration = entry.getValue();
            if (!FederationInterceptor.isNullOrEmpty(registration.getContainersFromPreviousAttempts())) {
                List tempContainers = homeResponse.getAllocatedContainers();
                if (!FederationInterceptor.isNullOrEmpty(tempContainers)) {
                    tempContainers.addAll(registration.getContainersFromPreviousAttempts());
                    homeResponse.setAllocatedContainers(tempContainers);
                } else {
                    homeResponse.setAllocatedContainers(registration.getContainersFromPreviousAttempts());
                }
                this.cacheAllocatedContainers(registration.getContainersFromPreviousAttempts(), entry.getKey());
            }
            if (FederationInterceptor.isNullOrEmpty(registration.getNMTokensFromPreviousAttempts())) continue;
            List tempTokens = homeResponse.getNMTokens();
            if (!FederationInterceptor.isNullOrEmpty(tempTokens)) {
                tempTokens.addAll(registration.getNMTokensFromPreviousAttempts());
                homeResponse.setNMTokens(tempTokens);
                continue;
            }
            homeResponse.setNMTokens(registration.getNMTokensFromPreviousAttempts());
        }
        return homeResponse;
    }

    private void mergeAllocateResponse(AllocateResponse homeResponse, AllocateResponse otherResponse, SubClusterId otherRMAddress) {
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getAllocatedContainers())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getAllocatedContainers())) {
                homeResponse.getAllocatedContainers().addAll(otherResponse.getAllocatedContainers());
            } else {
                homeResponse.setAllocatedContainers(otherResponse.getAllocatedContainers());
            }
        }
        if (otherResponse.getAvailableResources() != null) {
            if (homeResponse.getAvailableResources() != null) {
                homeResponse.setAvailableResources(Resources.add((Resource)homeResponse.getAvailableResources(), (Resource)otherResponse.getAvailableResources()));
            } else {
                homeResponse.setAvailableResources(otherResponse.getAvailableResources());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
                homeResponse.getCompletedContainersStatuses().addAll(otherResponse.getCompletedContainersStatuses());
            } else {
                homeResponse.setCompletedContainersStatuses(otherResponse.getCompletedContainersStatuses());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getUpdatedNodes())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getUpdatedNodes())) {
                homeResponse.getUpdatedNodes().addAll(otherResponse.getUpdatedNodes());
            } else {
                homeResponse.setUpdatedNodes(otherResponse.getUpdatedNodes());
            }
        }
        homeResponse.setNumClusterNodes(homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
        PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
        PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
        if (homePreempMessage == null && otherPreempMessage != null) {
            homeResponse.setPreemptionMessage(otherPreempMessage);
        }
        if (homePreempMessage != null && otherPreempMessage != null) {
            PreemptionContract par1 = homePreempMessage.getContract();
            PreemptionContract par2 = otherPreempMessage.getContract();
            if (par1 == null && par2 != null) {
                homePreempMessage.setContract(par2);
            }
            if (par1 != null && par2 != null) {
                par1.getResourceRequest().addAll(par2.getResourceRequest());
                par2.getContainers().addAll(par2.getContainers());
            }
            StrictPreemptionContract spar1 = homePreempMessage.getStrictContract();
            StrictPreemptionContract spar2 = otherPreempMessage.getStrictContract();
            if (spar1 == null && spar2 != null) {
                homePreempMessage.setStrictContract(spar2);
            }
            if (spar1 != null && spar2 != null) {
                spar1.getContainers().addAll(spar2.getContainers());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getNMTokens())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getNMTokens())) {
                homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
            } else {
                homeResponse.setNMTokens(otherResponse.getNMTokens());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getUpdatedContainers())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getUpdatedContainers())) {
                homeResponse.getUpdatedContainers().addAll(otherResponse.getUpdatedContainers());
            } else {
                homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
            }
        }
        if (!FederationInterceptor.isNullOrEmpty(otherResponse.getUpdateErrors())) {
            if (!FederationInterceptor.isNullOrEmpty(homeResponse.getUpdateErrors())) {
                homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
            } else {
                homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
            }
        }
    }

    private void cacheAllocatedContainers(List<Container> containers, SubClusterId subClusterId) {
        for (Container container : containers) {
            LOG.debug("Adding container {}", (Object)container);
            if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
                SubClusterId existingSubClusterId = this.containerIdToSubClusterIdMap.get(container.getId());
                if (existingSubClusterId.equals(subClusterId)) {
                    LOG.warn("Duplicate containerID: {} found in the allocated containers from same sub-cluster: {}, so ignoring.", (Object)container.getId(), (Object)subClusterId);
                } else {
                    throw new YarnRuntimeException("Duplicate containerID found in the allocated containers. This can happen if the RM epoch is not configured properly. ContainerId: " + container.getId().toString() + " ApplicationId: " + this.getApplicationContext().getApplicationAttemptId() + " From RM: " + subClusterId + " . Previous container was from sub-cluster: " + existingSubClusterId);
                }
            }
            this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
        }
    }

    private static AllocateRequest findOrCreateAllocateRequestForSubCluster(SubClusterId subClusterId, AllocateRequest originalAMRequest, Map<SubClusterId, AllocateRequest> requestMap) {
        AllocateRequest newRequest = null;
        if (requestMap.containsKey(subClusterId)) {
            newRequest = requestMap.get(subClusterId);
        } else {
            newRequest = FederationInterceptor.createAllocateRequest();
            newRequest.setResponseId(originalAMRequest.getResponseId());
            newRequest.setProgress(originalAMRequest.getProgress());
            requestMap.put(subClusterId, newRequest);
        }
        return newRequest;
    }

    private static AllocateRequest createAllocateRequest() {
        AllocateRequest request = AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null);
        request.setAskList(new ArrayList());
        request.setReleaseList(new ArrayList());
        ResourceBlacklistRequest blackList = ResourceBlacklistRequest.newInstance(null, null);
        blackList.setBlacklistAdditions(new ArrayList());
        blackList.setBlacklistRemovals(new ArrayList());
        request.setResourceBlacklistRequest(blackList);
        request.setUpdateRequests(new ArrayList());
        return request;
    }

    private boolean warnIfNotExists(ContainerId containerId, String actionName) {
        if (!this.containerIdToSubClusterIdMap.containsKey(containerId)) {
            LOG.error("AM is trying to {} a container {} that does not exist. Might happen shortly after NM restart when NM recovery is enabled", (Object)actionName, (Object)containerId.toString());
            return false;
        }
        return true;
    }

    protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(List<ResourceRequest> askList) throws YarnException {
        return this.policyInterpreter.splitResourceRequests(askList);
    }

    @VisibleForTesting
    public int getUnmanagedAMPoolSize() {
        return this.uamPool.getAllUAMIds().size();
    }

    @VisibleForTesting
    public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
        return this.asyncResponseSink;
    }

    public static <T> boolean isNullOrEmpty(Collection<T> c) {
        return c == null || c.size() == 0;
    }

    public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
        return c == null || c.size() == 0;
    }

    private static class Registrations {
        private Map<SubClusterId, RegisterApplicationMasterResponse> successfulRegistrations;
        private List<SubClusterId> failedRegistrations;

        Registrations(Map<SubClusterId, RegisterApplicationMasterResponse> successfulRegistrations, List<SubClusterId> failedRegistrations) {
            this.successfulRegistrations = successfulRegistrations;
            this.failedRegistrations = failedRegistrations;
        }

        public Map<SubClusterId, RegisterApplicationMasterResponse> getSuccessfulRegistrations() {
            return this.successfulRegistrations;
        }

        public List<SubClusterId> getFailedRegistrations() {
            return this.failedRegistrations;
        }
    }

    private static class FinishApplicationMasterResponseInfo {
        private FinishApplicationMasterResponse response;
        private String subClusterId;

        FinishApplicationMasterResponseInfo(FinishApplicationMasterResponse response, String subClusterId) {
            this.response = response;
            this.subClusterId = subClusterId;
        }

        public FinishApplicationMasterResponse getResponse() {
            return this.response;
        }

        public String getSubClusterId() {
            return this.subClusterId;
        }
    }

    private static class RegisterApplicationMasterResponseInfo {
        private RegisterApplicationMasterResponse response;
        private SubClusterId subClusterId;
        private Token<AMRMTokenIdentifier> uamToken;

        RegisterApplicationMasterResponseInfo(RegisterApplicationMasterResponse response, SubClusterId subClusterId, Token<AMRMTokenIdentifier> uamToken) {
            this.response = response;
            this.subClusterId = subClusterId;
            this.uamToken = uamToken;
        }

        public RegisterApplicationMasterResponse getResponse() {
            return this.response;
        }

        public SubClusterId getSubClusterId() {
            return this.subClusterId;
        }

        public Token<AMRMTokenIdentifier> getUamToken() {
            return this.uamToken;
        }
    }
}

