/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import java.io.DataInputStream;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
import org.apache.hadoop.yarn.util.FSDownload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContainerLocalizer {
    /** Logger shared by this class and its inner download wrapper. */
    static final Logger LOG = LoggerFactory.getLogger(ContainerLocalizer.class);
    /** Directory name of a file cache; appended under both the user and the application base in initDirs. */
    public static final String FILECACHE = "filecache";
    /** Directory name under which per-application directories are placed (see initDirs). */
    public static final String APPCACHE = "appcache";
    /** Directory name placed directly under each local dir, above the per-user directory (see initDirs). */
    public static final String USERCACHE = "usercache";
    /** Not referenced elsewhere in this file. */
    public static final String OUTPUTDIR = "output";
    /** Format of the credentials file name; runLocalization formats it with the localizer id. */
    public static final String TOKEN_FILE_NAME_FMT = "%s.tokens";
    /** Not referenced elsewhere in this file. */
    public static final String WORKDIR = "work";
    /** Formatted with the application id; used as a configuration key and as a LocalDirAllocator context name. */
    private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
    /** Formatted with the user name; used as a configuration key in initDirs. */
    private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
    /** Permissions applied to the file cache directories; decimal 456 is octal 0710. */
    private static final FsPermission FILECACHE_PERMS = new FsPermission(456);
    /** Permissions applied to directories made by createParentDirs; decimal 493 is octal 0755. */
    private static final FsPermission USERCACHE_FOLDER_PERMS = new FsPermission(493);
    /** User the localization is performed for; never null (checked in the constructor). */
    private final String user;
    /** Application id used to build the application cache paths and context name. */
    private final String appId;
    /** Local directories under which the cache directories are created. */
    private final List<Path> localDirs;
    /** Identifier reported in every heartbeat status; never null (checked in the constructor). */
    private final String localizerId;
    /** File context used for all local file operations performed by this class. */
    private final FileContext lfs;
    /** Configuration created in the constructor; initDirs writes the cache directory lists into it. */
    private final Configuration conf;
    /** Factory used to instantiate the status records sent in heartbeats. */
    private final RecordFactory recordFactory;
    /** Resources that were submitted for download and have not yet been reported as finished. */
    private final Map<LocalResource, Future<Path>> pendingResources;
    /** APPCACHE_CTXT_FMT formatted with the application id; removed from LocalDirAllocator when localization ends. */
    private final String appCacheDirContextName;
    /** Validator run against the destination directory before each download is created. */
    private final DiskValidator diskValidator;
    /** Threads currently inside FSDownloadWrapper.call(); consulted by destroyShellProcesses. */
    private Set<Thread> localizingThreads = Collections.synchronizedSet(new HashSet());

    public ContainerLocalizer(FileContext lfs, String user, String appId, String localizerId, List<Path> localDirs, RecordFactory recordFactory) throws IOException {
        if (null == user) {
            throw new IOException("Cannot initialize for null user");
        }
        if (null == localizerId) {
            throw new IOException("Cannot initialize for null containerId");
        }
        this.lfs = lfs;
        this.user = user;
        this.appId = appId;
        this.localDirs = localDirs;
        this.localizerId = localizerId;
        this.recordFactory = recordFactory;
        this.conf = new YarnConfiguration();
        this.diskValidator = DiskValidatorFactory.getInstance((String)this.conf.get("yarn.nodemanager.disk-validator", "basic"));
        LOG.info("Disk Validator: yarn.nodemanager.disk-validator is loaded.");
        this.appCacheDirContextName = String.format(APPCACHE_CTXT_FMT, appId);
        this.pendingResources = new HashMap<LocalResource, Future<Path>>();
    }

    /**
     * Builds an RPC proxy for the localization protocol at the given address, using this
     * localizer's configuration.
     *
     * @param nmAddr address to connect the proxy to
     * @return the proxy, typed as {@link LocalizationProtocol}
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public LocalizationProtocol getProxy(InetSocketAddress nmAddr) {
        Object proxy = YarnRPC.create(this.conf).getProxy(LocalizationProtocol.class, nmAddr, this.conf);
        return (LocalizationProtocol) proxy;
    }

    /**
     * Runs the whole localization session against the node manager at {@code nmAddr}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create the user and application file cache directories ({@code initDirs});</li>
     *   <li>read the credentials from the token file named after the localizer id and delete
     *       that file;</li>
     *   <li>create the node manager proxy while acting as a remote user that carries only the
     *       localizer token;</li>
     *   <li>drive {@link #localizeFiles} with a download pool and a user that carries all
     *       tokens;</li>
     *   <li>on the way out: shut the pool down, destroy shell processes owned by localizing
     *       threads, wait up to 10 seconds for the pool, remove the application cache context
     *       from {@link LocalDirAllocator} and close the file systems of the download user.</li>
     * </ol>
     *
     * @param nmAddr address of the node manager localization service
     * @throws IOException if directory setup or credential loading fails, or wrapping any
     *         {@link Throwable} raised while localizing
     * @throws InterruptedException if interrupted while waiting for the download pool to stop
     */
    public void runLocalization(final InetSocketAddress nmAddr) throws IOException, InterruptedException {
        ContainerLocalizer.initDirs(this.conf, this.user, this.appId, this.lfs, this.localDirs);

        Credentials creds = new Credentials();
        Path tokenPath = new Path(String.format(TOKEN_FILE_NAME_FMT, this.localizerId));
        // The resource is opened in the header so it is closed on every path; the token file
        // is deleted while the stream is still open, exactly as before.
        try (DataInputStream credFile = this.lfs.open(tokenPath)) {
            creds.readTokenStorageStream(credFile);
            this.lfs.delete(tokenPath, false);
        }

        // The proxy is created by a user holding only the localizer token.
        UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(this.user);
        remoteUser.addToken(creds.getToken(LocalizerTokenIdentifier.KIND));
        LocalizationProtocol nodeManager = remoteUser.doAs(new PrivilegedAction<LocalizationProtocol>() {

            @Override
            public LocalizationProtocol run() {
                return ContainerLocalizer.this.getProxy(nmAddr);
            }
        });

        // Downloads run as a user holding every token found in the credentials file.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.user);
        for (Token<?> token : creds.getAllTokens()) {
            ugi.addToken(token);
        }

        ExecutorService exec = null;
        try {
            exec = this.createDownloadThreadPool();
            CompletionService<Path> ecs = this.createCompletionService(exec);
            this.localizeFiles(nodeManager, ecs, ugi);
        }
        catch (Throwable e) {
            throw new IOException(e);
        }
        finally {
            try {
                if (exec != null) {
                    exec.shutdown();
                    this.destroyShellProcesses(Shell.getAllShells());
                    exec.awaitTermination(10L, TimeUnit.SECONDS);
                }
                LocalDirAllocator.removeContext(this.appCacheDirContextName);
            }
            finally {
                // Runs even when the shutdown sequence above throws.
                this.closeFileSystems(ugi);
            }
        }
    }

    /**
     * Creates the executor that runs downloads: a single-thread executor whose threads are
     * named {@code ContainerLocalizer Downloader}.
     *
     * @return the new executor
     */
    ExecutorService createDownloadThreadPool() {
        ThreadFactory downloaderThreads = new ThreadFactoryBuilder()
                .setNameFormat("ContainerLocalizer Downloader")
                .build();
        return HadoopExecutors.newSingleThreadExecutor(downloaderThreads);
    }

    /**
     * Wraps the given executor in a completion service for download results.
     *
     * @param exec executor that will run the submitted downloads
     * @return a new {@link ExecutorCompletionService} backed by {@code exec}
     */
    CompletionService<Path> createCompletionService(ExecutorService exec) {
        CompletionService<Path> completionService = new ExecutorCompletionService<>(exec);
        return completionService;
    }

    /**
     * Prepares a download task for one resource.
     *
     * <p>For {@code PRIVATE} resources the parent directories of the destination are created
     * first. The destination directory is then checked with the disk validator before the
     * task is built.
     *
     * @param destDirPath directory the resource is downloaded into
     * @param rsrc resource to download
     * @param ugi user the download runs as
     * @return a callable that performs the download when invoked
     * @throws IOException if creating the parent directories or the disk check fails
     */
    Callable<Path> download(Path destDirPath, LocalResource rsrc, UserGroupInformation ugi) throws IOException {
        boolean isPrivate = LocalResourceVisibility.PRIVATE == rsrc.getVisibility();
        if (isPrivate) {
            this.createParentDirs(destDirPath);
        }
        File destDir = new File(destDirPath.toUri().getRawPath());
        this.diskValidator.checkStatus(destDir);
        return new FSDownloadWrapper(this.lfs, ugi, this.conf, destDirPath, rsrc);
    }

    /**
     * Creates every directory between the cache root and the parent of {@code destDirPath}
     * (the parent itself included, the cache root excluded), outermost first, each with
     * {@code USERCACHE_FOLDER_PERMS}.
     *
     * @param destDirPath destination directory whose ancestors are created
     * @throws IOException if creating one of the directories fails
     */
    private void createParentDirs(Path destDirPath) throws IOException {
        Path cursor = destDirPath.getParent();
        final Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(cursor);

        // Collected from the deepest level upwards until the cache root is reached.
        List<Path> levels = new ArrayList<>();
        for (; !cursor.equals(cacheRoot); cursor = cursor.getParent()) {
            levels.add(cursor);
        }

        // Walk the list backwards so the outermost directory is created first.
        for (int idx = levels.size() - 1; idx >= 0; --idx) {
            ContainerLocalizer.createDir(this.lfs, levels.get(idx), USERCACHE_FOLDER_PERMS);
        }
    }

    /**
     * Estimates the size of a resource from its declared size.
     *
     * @param rsrc resource to estimate
     * @return {@code -1} when the declared size is negative; five times the declared size for
     *         {@code ARCHIVE} and {@code PATTERN} resources; the declared size otherwise
     */
    static long getEstimatedSize(LocalResource rsrc) {
        final long declaredSize = rsrc.getSize();
        if (declaredSize < 0L) {
            return -1L;
        }
        long factor;
        switch (rsrc.getType()) {
            case ARCHIVE:
            case PATTERN:
                factor = 5L;
                break;
            default:
                factor = 1L;
                break;
        }
        return factor * declaredSize;
    }

    /**
     * Pauses the calling thread.
     *
     * @param duration time to sleep, in seconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    void sleep(int duration) throws InterruptedException {
        final long seconds = duration;
        TimeUnit.SECONDS.sleep(seconds);
    }

    /**
     * Closes all file systems held for the given user. A failure is logged as a warning and
     * not propagated.
     *
     * @param ugi user whose file systems are closed
     */
    protected void closeFileSystems(UserGroupInformation ugi) {
        try {
            FileSystem.closeAllForUGI(ugi);
        } catch (IOException closeFailure) {
            LOG.warn("Failed to close filesystems: ", closeFailure);
        }
    }

    /**
     * Heartbeat loop of the localizer.
     *
     * <p>Each round sends the current status to the node manager and acts on the answer:
     * <ul>
     *   <li>{@code LIVE}: every resource spec not already pending is submitted for download;</li>
     *   <li>{@code DIE}: all pending downloads are cancelled, one last status is sent (a
     *       {@link YarnException} from that heartbeat is printed and logged, not thrown) and
     *       the method returns.</li>
     * </ul>
     * After handling the response the loop polls the completion service for up to 1000 ms
     * before the next round. An interrupt ends the loop silently.
     *
     * @param nodemanager protocol used for heartbeats
     * @param cs completion service downloads are submitted to
     * @param ugi user the downloads run as
     * @throws IOException if a heartbeat or the preparation of a download fails
     * @throws YarnException if a regular (non-final) heartbeat fails
     */
    protected void localizeFiles(LocalizationProtocol nodemanager, CompletionService<Path> cs, UserGroupInformation ugi) throws IOException, YarnException {
        try {
            for (;;) {
                LocalizerHeartbeatResponse response = nodemanager.heartbeat(this.createStatus());
                switch (response.getLocalizerAction()) {
                    case LIVE: {
                        for (ResourceLocalizationSpec spec : response.getResourceSpecs()) {
                            LocalResource resource = spec.getResource();
                            if (!this.pendingResources.containsKey(resource)) {
                                Path destDir = new Path(spec.getDestinationDirectory().getFile());
                                Callable<Path> fetch = this.download(destDir, resource, ugi);
                                this.pendingResources.put(resource, cs.submit(fetch));
                            }
                        }
                        break;
                    }
                    case DIE: {
                        for (Future<Path> inFlight : this.pendingResources.values()) {
                            inFlight.cancel(true);
                        }
                        LocalizerStatus finalStatus = this.createStatus();
                        try {
                            nodemanager.heartbeat(finalStatus);
                        } catch (YarnException e) {
                            e.printStackTrace(System.out);
                            LOG.error("Heartbeat failed while dying: ", e);
                        }
                        return;
                    }
                    default:
                        break;
                }
                // The polled result is not used; this call only paces the loop.
                cs.poll(1000L, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException ignored) {
            // An interrupt simply ends the heartbeat loop.
            return;
        }
    }

    /**
     * Builds the status record for the next heartbeat from the pending resources.
     *
     * <p>Unfinished downloads are reported as {@code FETCH_PENDING}. Finished downloads are
     * reported once and then removed from the pending map: {@code FETCH_SUCCESS} with local
     * path and the disk usage of the path's parent directory, or {@code FETCH_FAILURE} with
     * the cause of the failure (or the cancellation itself).
     *
     * @return status carrying the localizer id and one entry per pending resource
     * @throws InterruptedException if interrupted while reading a download result
     */
    private LocalizerStatus createStatus() throws InterruptedException {
        final List<LocalResourceStatus> report = new ArrayList<>();
        for (Iterator<Map.Entry<LocalResource, Future<Path>>> it = this.pendingResources.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<LocalResource, Future<Path>> pending = it.next();
            LocalResourceStatus entryStatus = (LocalResourceStatus) this.recordFactory.newRecordInstance(LocalResourceStatus.class);
            entryStatus.setResource(pending.getKey());

            Future<Path> result = pending.getValue();
            if (!result.isDone()) {
                entryStatus.setStatus(ResourceStatusType.FETCH_PENDING);
                report.add(entryStatus);
                continue;
            }

            try {
                Path localPath = result.get();
                File containingDir = new File(localPath.getParent().toUri());
                entryStatus.setLocalPath(URL.fromPath(localPath));
                entryStatus.setLocalSize(FileUtil.getDU(containingDir));
                entryStatus.setStatus(ResourceStatusType.FETCH_SUCCESS);
            } catch (ExecutionException failed) {
                entryStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
                entryStatus.setException(SerializedException.newInstance(failed.getCause()));
            } catch (CancellationException cancelled) {
                entryStatus.setStatus(ResourceStatusType.FETCH_FAILURE);
                entryStatus.setException(SerializedException.newInstance(cancelled));
            }
            // A finished download is reported exactly once.
            it.remove();
            report.add(entryStatus);
        }

        LocalizerStatus status = (LocalizerStatus) this.recordFactory.newRecordInstance(LocalizerStatus.class);
        status.setLocalizerId(this.localizerId);
        status.addAllResources(report);
        return status;
    }

    /**
     * Reads the JVM options for the localizer process from
     * {@code yarn.nodemanager.container-localizer.java.opts} (default {@code -Xmx256m}).
     *
     * <p>The value is split on runs of whitespace and empty tokens are dropped, so a blank
     * value, leading/trailing whitespace or repeated separators never produce an empty
     * command-line argument.
     *
     * @param conf configuration to read the option string from
     * @return the individual options, in order; empty when the configured value is blank
     */
    public static List<String> getJavaOpts(Configuration conf) {
        String opts = conf.get("yarn.nodemanager.container-localizer.java.opts", "-Xmx256m");
        List<String> javaOpts = new ArrayList<String>();
        for (String opt : opts.trim().split("\\s+")) {
            // "".split(...) yields a single empty token; skip it.
            if (!opt.isEmpty()) {
                javaOpts.add(opt);
            }
        }
        return javaOpts;
    }

    /**
     * Appends the arguments that launch {@link #main} to {@code command}: the log4j system
     * properties, this class's name, then user, application id, localizer id, node manager
     * host name and port, followed by every local directory.
     *
     * @param command list the arguments are appended to
     * @param user user argument
     * @param appId application id argument
     * @param locId localizer id argument
     * @param nmAddr address whose host name and port are appended
     * @param localDirs local directories appended last, in order
     * @param conf configuration providing the log level
     *        ({@code yarn.nodemanager.container-localizer.log.level}, default {@code INFO})
     */
    public static void buildMainArgs(List<String> command, String user, String appId, String locId, InetSocketAddress nmAddr, List<String> localDirs, Configuration conf) {
        ContainerLocalizer.addLog4jSystemProperties(
                conf.get("yarn.nodemanager.container-localizer.log.level", "INFO"), command);
        command.add(ContainerLocalizer.class.getName());
        Collections.addAll(command, user, appId, locId);
        command.add(nmAddr.getHostName());
        command.add(Integer.toString(nmAddr.getPort()));
        command.addAll(localDirs);
    }

    /**
     * Appends the five log4j-related {@code -D} system properties to {@code command}.
     *
     * @param logLevel level placed in the {@code hadoop.root.logger} property
     * @param command list the properties are appended to, in a fixed order
     */
    private static void addLog4jSystemProperties(String logLevel, List<String> command) {
        String[] systemProperties = {
            "-Dlog4j.configuration=container-log4j.properties",
            "-Dyarn.app.container.log.dir=<LOG_DIR>",
            "-Dyarn.app.container.log.filesize=0",
            "-Dhadoop.root.logger=" + logLevel + ",CLA",
            "-Dhadoop.root.logfile=container-localizer-syslog",
        };
        for (String property : systemProperties) {
            command.add(property);
        }
    }

    /**
     * Entry point of the localizer process.
     *
     * <p>Expected arguments: {@code user appId locId host port [localDir ...]}. A warning is
     * logged when the current user's short name differs from {@code user}. Any
     * {@link Throwable} raised while setting up or localizing is printed to standard output,
     * logged, and turned into exit code {@code -1}; otherwise the exit code is {@code 0}.
     * The JVM is always terminated through {@link System#exit(int)}.
     *
     * @param argv command-line arguments as described above
     * @throws Throwable declared for compatibility; failures are handled as described above
     */
    public static void main(String[] argv) throws Throwable {
        Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
        int exitCode = 0;
        try {
            final String user = argv[0];
            final String appId = argv[1];
            final String locId = argv[2];
            final InetSocketAddress nmAddr = new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));

            // Everything after the port is a local directory.
            ArrayList<Path> localDirs = new ArrayList<>(argv.length - 5);
            for (int idx = 5; idx < argv.length; ++idx) {
                localDirs.add(new Path(argv[idx]));
            }

            String uid = UserGroupInformation.getCurrentUser().getShortUserName();
            if (!user.equals(uid)) {
                LOG.warn("Localization running as " + uid + " not " + user);
            }

            ContainerLocalizer localizer = new ContainerLocalizer(
                    FileContext.getLocalFSFileContext(), user, appId, locId, localDirs,
                    RecordFactoryProvider.getRecordFactory(null));
            localizer.runLocalization(nmAddr);
        } catch (Throwable e) {
            e.printStackTrace(System.out);
            LOG.error("Exception in main:", e);
            exitCode = -1;
        } finally {
            System.exit(exitCode);
        }
    }

    /**
     * Creates, under every local directory, the user file cache
     * ({@code usercache/<user>/filecache}) and the application file cache
     * ({@code usercache/<user>/appcache/<appId>/filecache}), then records both directory
     * lists in {@code conf} under the keys derived from {@code APPCACHE_CTXT_FMT} and
     * {@code USERCACHE_CTXT_FMT}.
     *
     * @param conf configuration that receives the two directory lists
     * @param user user name used in the paths and in the user cache key
     * @param appId application id used in the paths and in the application cache key
     * @param lfs file context used to qualify paths and create directories
     * @param localDirs base directories; must be non-null and non-empty
     * @throws IOException if {@code localDirs} is null or empty, or a directory cannot be created
     */
    private static void initDirs(Configuration conf, String user, String appId, FileContext lfs, List<Path> localDirs) throws IOException {
        if (localDirs == null || localDirs.isEmpty()) {
            throw new IOException("Cannot initialize without local dirs");
        }
        final int dirCount = localDirs.size();
        String[] userCaches = new String[dirCount];
        String[] appCaches = new String[dirCount];

        int slot = 0;
        for (Path localDir : localDirs) {
            Path userBase = lfs.makeQualified(new Path(new Path(localDir, USERCACHE), user));

            Path userFileCache = new Path(userBase, FILECACHE);
            userCaches[slot] = userFileCache.toString();
            ContainerLocalizer.createDir(lfs, userFileCache, FILECACHE_PERMS);

            Path appFileCache = new Path(new Path(userBase, new Path(APPCACHE, appId)), FILECACHE);
            appCaches[slot] = appFileCache.toString();
            ContainerLocalizer.createDir(lfs, appFileCache, FILECACHE_PERMS);

            ++slot;
        }

        conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appCaches);
        conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), userCaches);
    }

    /**
     * Creates a single directory (parents are not created) and makes sure it ends up with
     * exactly {@code perms}: when applying the file context's umask to {@code perms} would
     * change them, the permissions are set explicitly after creation.
     *
     * @param lfs file context used for the operations
     * @param dirPath directory to create
     * @param perms permissions the directory must have
     * @throws IOException if creating the directory or setting its permissions fails
     */
    private static void createDir(FileContext lfs, Path dirPath, FsPermission perms) throws IOException {
        lfs.mkdir(dirPath, perms, false);
        FsPermission afterUmask = perms.applyUMask(lfs.getUMask());
        if (perms.equals(afterUmask)) {
            // The umask leaves the requested permissions untouched; nothing to correct.
            return;
        }
        lfs.setPermission(dirPath, perms);
    }

    /**
     * Destroys the process of every shell whose waiting thread is currently registered as a
     * localizing thread; other shells are left alone.
     *
     * @param shells shells to inspect
     */
    private void destroyShellProcesses(Set<Shell> shells) {
        for (Shell shell : shells) {
            boolean startedByDownload = this.localizingThreads.contains(shell.getWaitingThread());
            if (startedByDownload) {
                shell.getProcess().destroy();
            }
        }
    }

    class FSDownloadWrapper
    extends FSDownload {
        FSDownloadWrapper(FileContext files, UserGroupInformation ugi, Configuration conf, Path destDirPath, LocalResource resource) {
            super(files, ugi, conf, destDirPath, resource);
        }

        public Path call() throws Exception {
            Thread currentThread = Thread.currentThread();
            ContainerLocalizer.this.localizingThreads.add(currentThread);
            try {
                Path path = this.doDownloadCall();
                return path;
            }
            finally {
                ContainerLocalizer.this.localizingThreads.remove(currentThread);
            }
        }

        Path doDownloadCall() throws Exception {
            return super.call();
        }
    }
}

