/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.ShuffleIndexInformation;
import org.apache.spark.network.shuffle.ShuffleIndexRecord;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.com.fasterxml.jackson.annotation.JsonCreator;
import org.spark_project.com.fasterxml.jackson.annotation.JsonProperty;
import org.spark_project.com.fasterxml.jackson.databind.ObjectMapper;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.base.Objects;
import org.spark_project.guava.cache.CacheBuilder;
import org.spark_project.guava.cache.CacheLoader;
import org.spark_project.guava.cache.LoadingCache;
import org.spark_project.guava.cache.Weigher;
import org.spark_project.guava.collect.Maps;

public class ExternalShuffleBlockResolver {
    private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);
    /** JSON mapper used to encode/decode the keys and values stored in {@link #db}. */
    private static final ObjectMapper mapper = new ObjectMapper();
    /** Prefix of every DB key that holds a registered executor's shuffle info. */
    private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
    /** Store version handed to {@code LevelDBProvider.initLevelDB} when the DB is opened. */
    private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
    /**
     * Matches two or more consecutive file separators. The separator is quoted because on
     * platforms where it is a backslash it would otherwise act as a regex escape character
     * and the pattern would no longer match repeated separators.
     */
    private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile("(?:" + Pattern.quote(File.separator) + "){2,}");
    /** In-memory registry of executors, keyed by (application id, executor id). */
    @VisibleForTesting
    final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
    /** Cache of parsed shuffle index files, keyed by the index file. */
    private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;
    /** Executor on which the directory clean-up tasks are run. */
    private final Executor directoryCleaner;
    private final TransportConf conf;
    /** File passed to {@code LevelDBProvider.initLevelDB} to open {@link #db}. */
    @VisibleForTesting
    final File registeredExecutorFile;
    /** Persistent store of registered executors; callers null-check it before every use. */
    @VisibleForTesting
    final DB db;
    /** Shuffle manager class names accepted by {@code registerExecutor}. */
    private final List<String> knownManagers = Arrays.asList("org.apache.spark.shuffle.sort.SortShuffleManager", "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");

    /**
     * Creates a resolver whose directory clean-up tasks run on a single-thread executor built
     * from {@code NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")}.
     *
     * @param conf transport configuration, forwarded to the three-argument constructor
     * @param registeredExecutorFile file forwarded to the three-argument constructor
     * @throws IOException if the delegated constructor fails
     */
    public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile) throws IOException {
        this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
    }

    /**
     * Builds the resolver: sets up the weight-bounded shuffle index cache, opens the LevelDB
     * store and, when a store is available, reloads the executors recorded in it.
     *
     * @param conf transport configuration; also supplies
     *             {@code spark.shuffle.service.index.cache.size} (default {@code "100m"})
     * @param registeredExecutorFile file passed to {@code LevelDBProvider.initLevelDB}
     * @param directoryCleaner executor that will run the directory clean-up tasks
     * @throws IOException if opening the store or reloading the executors fails
     */
    @VisibleForTesting
    ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile, Executor directoryCleaner) throws IOException {
        this.conf = conf;
        this.registeredExecutorFile = registeredExecutorFile;
        this.directoryCleaner = directoryCleaner;

        // Each cache entry weighs as much as the index information reports for its size.
        Weigher<File, ShuffleIndexInformation> sizeWeigher = new Weigher<File, ShuffleIndexInformation>() {
            @Override
            public int weigh(File file, ShuffleIndexInformation indexInfo) {
                return indexInfo.getSize();
            }
        };
        // Cache misses are served by parsing the index file.
        CacheLoader<File, ShuffleIndexInformation> indexLoader = new CacheLoader<File, ShuffleIndexInformation>() {
            @Override
            public ShuffleIndexInformation load(File file) throws IOException {
                return new ShuffleIndexInformation(file);
            }
        };
        String cacheSizeSpec = conf.get("spark.shuffle.service.index.cache.size", "100m");
        this.shuffleIndexCache = CacheBuilder.newBuilder()
            .maximumWeight(JavaUtils.byteStringAsBytes(cacheSizeSpec))
            .weigher(sizeWeigher)
            .build(indexLoader);

        DB store = LevelDBProvider.initLevelDB(registeredExecutorFile, CURRENT_VERSION, mapper);
        this.db = store;
        if (store == null) {
            this.executors = Maps.newConcurrentMap();
        } else {
            this.executors = reloadRegisteredExecutors(store);
        }
    }

    /**
     * Returns the number of entries currently held in the in-memory {@code executors} map.
     */
    public int getRegisteredExecutorsSize() {
        return this.executors.size();
    }

    /**
     * Records the shuffle layout of an executor, both in the persistent store (when one is
     * open) and in the in-memory map.
     *
     * <p>A failure to persist is logged and does not prevent the in-memory registration.
     *
     * @param appId application the executor belongs to
     * @param execId executor identifier
     * @param executorInfo the executor's shuffle manager and local directory layout
     * @throws UnsupportedOperationException if the executor's shuffle manager is not one of
     *         the known managers
     */
    public void registerExecutor(String appId, String execId, ExecutorShuffleInfo executorInfo) {
        final AppExecId id = new AppExecId(appId, execId);
        logger.info("Registered executor {} with {}", id, executorInfo);

        boolean supported = knownManagers.contains(executorInfo.shuffleManager);
        if (!supported) {
            throw new UnsupportedOperationException("Unsupported shuffle manager of executor: " + executorInfo);
        }

        if (db != null) {
            try {
                byte[] dbKey = dbAppExecKey(id);
                String json = mapper.writeValueAsString(executorInfo);
                db.put(dbKey, json.getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                // Best effort: the executor is still registered in memory below.
                logger.error("Error saving registered executors", e);
            }
        }

        executors.put(id, executorInfo);
    }

    /**
     * Looks up the registered executor and returns the buffer for the requested shuffle block.
     *
     * @param appId application that owns the executor
     * @param execId executor that wrote the block
     * @param shuffleId shuffle identifier
     * @param mapId map task identifier
     * @param reduceId reduce partition identifier
     * @return a buffer over the block's segment of the shuffle data file
     * @throws RuntimeException if no executor is registered under (appId, execId)
     */
    public ManagedBuffer getBlockData(String appId, String execId, int shuffleId, int mapId, int reduceId) {
        AppExecId id = new AppExecId(appId, execId);
        ExecutorShuffleInfo info = executors.get(id);
        if (info != null) {
            return getSortBasedShuffleBlockData(info, shuffleId, mapId, reduceId);
        }
        throw new RuntimeException(String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
    }

    /**
     * Forgets every executor registered for the given application, removing each one from the
     * in-memory map and, when a store is open, from the persistent store.
     *
     * <p>A failed store deletion is logged and processing continues with the in-memory removal
     * already done.
     *
     * @param appId application whose executors are removed
     * @param cleanupLocalDirs when true, deletion of each removed executor's local directories
     *        is submitted to the directory cleaner
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
        logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
        // Typed iterator: the raw Iterator/Map.Entry forms do not type-check without casts.
        Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = this.executors.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
            AppExecId fullId = entry.getKey();
            ExecutorShuffleInfo executor = entry.getValue();
            if (!appId.equals(fullId.appId)) {
                continue;
            }
            // Remove through the iterator so the map is not modified behind its back.
            it.remove();
            if (this.db != null) {
                try {
                    this.db.delete(ExternalShuffleBlockResolver.dbAppExecKey(fullId));
                }
                catch (IOException e) {
                    logger.error("Error deleting {} from executor state db", appId, e);
                }
            }
            if (!cleanupLocalDirs) {
                continue;
            }
            logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
            this.directoryCleaner.execute(() -> this.deleteExecutorDirs(executor.localDirs));
        }
    }

    /**
     * Schedules removal of the non-shuffle files in a finished executor's local directories.
     * The executor stays registered; an unknown executor is only logged.
     *
     * @param executorId executor that finished
     * @param appId application the executor belongs to
     */
    public void executorRemoved(String executorId, String appId) {
        logger.info("Clean up non-shuffle files associated with the finished executor {}", executorId);
        AppExecId id = new AppExecId(appId, executorId);
        ExecutorShuffleInfo info = executors.get(id);
        if (info == null) {
            logger.info("Executor is not registered (appId={}, execId={})", appId, executorId);
            return;
        }
        logger.info("Cleaning up non-shuffle files in executor {}'s {} local dirs", id, info.localDirs.length);
        directoryCleaner.execute(() -> deleteNonShuffleFiles(info.localDirs));
    }

    /**
     * Recursively deletes each of the given directories. A failure on one directory is logged
     * and the remaining directories are still attempted.
     *
     * @param dirs2 paths of the directories to delete
     */
    private void deleteExecutorDirs(String[] dirs2) {
        for (int i = 0; i < dirs2.length; i++) {
            String path = dirs2[i];
            try {
                File target = new File(path);
                JavaUtils.deleteRecursively(target);
                logger.debug("Successfully cleaned up directory: {}", path);
            } catch (Exception e) {
                logger.error("Failed to delete directory: " + path, e);
            }
        }
    }

    /**
     * Runs a filtered recursive delete over each of the given directories. The filter accepts
     * only names that end with neither {@code .index} nor {@code .data}. A failure on one
     * directory is logged and the remaining directories are still attempted.
     *
     * @param dirs2 paths of the directories to clean
     */
    private void deleteNonShuffleFiles(String[] dirs2) {
        // Accept everything except shuffle index and data files.
        FilenameFilter nonShuffleOnly = (dir, name) -> !(name.endsWith(".index") || name.endsWith(".data"));
        for (int i = 0; i < dirs2.length; i++) {
            String path = dirs2[i];
            try {
                File target = new File(path);
                JavaUtils.deleteRecursively(target, nonShuffleOnly);
                logger.debug("Successfully cleaned up non-shuffle files in directory: {}", path);
            } catch (Exception e) {
                logger.error("Failed to delete non-shuffle files in directory: " + path, e);
            }
        }
    }

    /**
     * Resolves a shuffle block to a segment of the map output's data file. The segment's
     * offset and length come from the cached index information of the matching
     * {@code shuffle_<shuffleId>_<mapId>_0.index} file.
     *
     * @param executor layout of the executor that wrote the files
     * @param shuffleId shuffle identifier
     * @param mapId map task identifier
     * @param reduceId reduce partition whose index record is looked up
     * @return a buffer over the block's bytes in {@code shuffle_<shuffleId>_<mapId>_0.data}
     * @throws RuntimeException if loading the index file through the cache fails
     */
    private ManagedBuffer getSortBasedShuffleBlockData(ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
        String baseName = "shuffle_" + shuffleId + "_" + mapId + "_0";
        File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, baseName + ".index");

        ShuffleIndexRecord record;
        try {
            record = shuffleIndexCache.get(indexFile).getIndex(reduceId);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to open file: " + indexFile, e);
        }

        File dataFile = getFile(executor.localDirs, executor.subDirsPerLocalDir, baseName + ".data");
        return new FileSegmentManagedBuffer(conf, dataFile, record.getOffset(), record.getLength());
    }

    /**
     * Maps a file name to its location under the executor's local directories: the name's
     * non-negative hash selects the local directory, and the hash divided by the number of
     * local directories selects the two-digit hexadecimal sub-directory.
     *
     * @param localDirs the executor's local directories
     * @param subDirsPerLocalDir number of sub-directories under each local directory
     * @param filename name of the file to locate
     * @return the file at {@code <localDir>/<subDir>/<filename>}
     */
    @VisibleForTesting
    static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
        int hash = JavaUtils.nonNegativeHash(filename);
        int dirCount = localDirs.length;
        String chosenDir = localDirs[hash % dirCount];
        int subDirIndex = (hash / dirCount) % subDirsPerLocalDir;
        String subDirName = String.format("%02x", subDirIndex);
        String fullPath = createNormalizedInternedPathname(chosenDir, subDirName, filename);
        return new File(fullPath);
    }

    /**
     * Closes the persistent store if one is open. A failure to close is logged, not thrown.
     */
    void close() {
        if (db == null) {
            return;
        }
        try {
            db.close();
        } catch (IOException e) {
            logger.error("Exception closing leveldb with registered executors", e);
        }
    }

    /**
     * Joins the three components with the platform file separator, collapses every run of
     * repeated separators into a single one, drops a trailing separator (unless the whole
     * path is a single character) and returns the interned result.
     *
     * <p>The platform separator is used for the replacement and for the trailing check, so
     * the result stays consistent with what {@code MULTIPLE_SEPARATORS} matches on platforms
     * whose separator is not {@code "/"}.
     *
     * @param dir1 first directory component
     * @param dir2 second directory component
     * @param fname file name
     * @return the normalized, interned path name
     */
    @VisibleForTesting
    static String createNormalizedInternedPathname(String dir1, String dir2, String fname) {
        String pathname = dir1 + File.separator + dir2 + File.separator + fname;
        Matcher m = MULTIPLE_SEPARATORS.matcher(pathname);
        // quoteReplacement: a backslash separator is an escape character in replacement strings.
        pathname = m.replaceAll(Matcher.quoteReplacement(File.separator));
        if (pathname.length() > 1 && pathname.endsWith(File.separator)) {
            pathname = pathname.substring(0, pathname.length() - 1);
        }
        return pathname.intern();
    }

    /**
     * Builds the store key for an executor: {@code APP_KEY_PREFIX}, a {@code ';'} separator
     * and the JSON form of the id, encoded as UTF-8.
     *
     * <p>The prefix comes from the shared constant so that the keys written here always match
     * the prefix that the key parser and the reload scan look for.
     *
     * @param appExecId id of the executor
     * @return the UTF-8 bytes of the key
     * @throws IOException if the id cannot be serialized to JSON
     */
    private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
        String appExecJson = mapper.writeValueAsString(appExecId);
        String key = APP_KEY_PREFIX + ";" + appExecJson;
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Decodes a store key back into the executor id it was built from. The JSON payload is
     * taken to start one character after the key prefix.
     *
     * @param s the key as a string
     * @return the id parsed from the JSON payload
     * @throws IllegalArgumentException if the key does not start with the expected prefix
     * @throws IOException if the JSON payload cannot be parsed
     */
    private static AppExecId parseDbAppExecKey(String s) throws IOException {
        boolean hasPrefix = s.startsWith(APP_KEY_PREFIX);
        if (!hasPrefix) {
            throw new IllegalArgumentException("expected a string starting with AppExecShuffleInfo");
        }
        // Skip the prefix plus the single separator character that follows it.
        int jsonStart = APP_KEY_PREFIX.length() + 1;
        return mapper.readValue(s.substring(jsonStart), AppExecId.class);
    }

    /**
     * Rebuilds the executor registry from the persistent store by scanning every key that
     * starts with {@code APP_KEY_PREFIX}. The scan stops at the first key without the prefix.
     *
     * <p>The store iterator is closed when the scan ends, whether it completes or throws.
     *
     * @param db the store to read, or {@code null} for none
     * @return a new concurrent map of the executors found; empty when {@code db} is null
     * @throws IOException if a key or value cannot be parsed, or the iterator fails to close
     */
    @VisibleForTesting
    static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db) throws IOException {
        ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
        if (db == null) {
            return registeredExecutors;
        }
        try (DBIterator itr = db.iterator()) {
            // Position the iterator at the first key that is >= the prefix.
            itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
            while (itr.hasNext()) {
                Map.Entry<byte[], byte[]> entry = itr.next();
                String key = new String(entry.getKey(), StandardCharsets.UTF_8);
                if (!key.startsWith(APP_KEY_PREFIX)) {
                    // Past the last executor entry.
                    break;
                }
                AppExecId id = ExternalShuffleBlockResolver.parseDbAppExecKey(key);
                logger.info("Reloading registered executors: {}", id);
                ExecutorShuffleInfo shuffleInfo = mapper.readValue(entry.getValue(), ExecutorShuffleInfo.class);
                registeredExecutors.put(id, shuffleInfo);
            }
        }
        return registeredExecutors;
    }

    /**
     * Identifies an executor by the pair (application id, executor id). Instances are used as
     * keys of the executor registry and are serialized to and from JSON for the store keys.
     */
    public static class AppExecId {
        public final String appId;
        public final String execId;

        /**
         * @param appId application identifier, bound to the JSON property {@code appId}
         * @param execId executor identifier, bound to the JSON property {@code execId}
         */
        @JsonCreator
        public AppExecId(@JsonProperty(value="appId") String appId, @JsonProperty(value="execId") String execId) {
            this.appId = appId;
            this.execId = execId;
        }

        /** Two ids are equal when they are of the same class and both fields are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != getClass()) {
                return false;
            }
            AppExecId other = (AppExecId) o;
            boolean sameApp = Objects.equal(appId, other.appId);
            return sameApp && Objects.equal(execId, other.execId);
        }

        /** Hash derived from both fields, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hashCode(appId, execId);
        }

        /** Renders both fields through the string helper, for logging. */
        @Override
        public String toString() {
            return Objects.toStringHelper(this)
                .add("appId", appId)
                .add("execId", execId)
                .toString();
        }
    }
}

