/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.nodelabels;

import com.google.common.collect.Sets;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;

/**
 * {@link NodeLabelsStore} implementation that persists node-label state as
 * files on a Hadoop {@link FileSystem}.
 *
 * <p>Two files live under {@link #fsWorkingPath}:
 * <ul>
 *   <li>a <em>mirror</em> ({@value #MIRROR_FILENAME}) holding one delimited
 *       add-labels message followed by one delimited node-to-labels
 *       message;</li>
 *   <li>an <em>edit log</em> ({@value #EDITLOG_FILENAME}) to which every
 *       mutation is appended as an {@code int} record type (the ordinal of a
 *       {@link SerializedLogType}) followed by a delimited protobuf
 *       message.</li>
 * </ul>
 *
 * <p>{@link #recover()} loads the mirror, replays the edit log, writes a fresh
 * mirror and then truncates the edit log. The mutation methods append to the
 * edit log path that {@link #recover()} sets up, so {@code recover()} has to
 * run before any of them.
 */
public class FileSystemNodeLabelsStore
extends NodeLabelsStore {
    protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
    protected static final String DEFAULT_DIR_NAME = "node-labels";
    protected static final String MIRROR_FILENAME = "nodelabel.mirror";
    protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
    /** Directory that holds the mirror and edit-log files. */
    Path fsWorkingPath;
    /** File system on which {@link #fsWorkingPath} lives. */
    FileSystem fs;
    /** Stream of the most recent edit-log append; reopened for every write. */
    private FSDataOutputStream editlogOs;
    /** Location of the edit log; assigned in {@link #recover()}. */
    private Path editLogPath;

    /**
     * Builds the default store location, a per-user directory under the local
     * {@code /tmp}.
     *
     * @return the default root directory as a {@code file://} URI string
     * @throws IOException if the current user cannot be determined
     */
    private String getDefaultFSNodeLabelsRootDir() throws IOException {
        return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + DEFAULT_DIR_NAME;
    }

    /**
     * Resolves the working directory from
     * {@code yarn.node-labels.fs-store.root-dir} (falling back to the per-user
     * default), binds the file system and creates the directory.
     *
     * @param conf configuration to read the root directory from
     * @throws Exception if the file system cannot be obtained or the directory
     *         cannot be created
     */
    @Override
    public void init(Configuration conf) throws Exception {
        this.fsWorkingPath = new Path(conf.get("yarn.node-labels.fs-store.root-dir", this.getDefaultFSNodeLabelsRootDir()));
        this.setFileSystem(conf);
        this.fs.mkdirs(this.fsWorkingPath);
    }

    /**
     * Closes the edit-log stream and the file system. Failures are logged by
     * {@link IOUtils#cleanup} rather than thrown.
     */
    @Override
    public void close() throws IOException {
        // Close the stream before the file system it was opened on.
        IOUtils.cleanup(LOG, this.editlogOs, this.fs);
    }

    /**
     * Binds {@link #fs} to the file system that owns {@link #fsWorkingPath}.
     *
     * @param conf configuration; a private copy is handed to the file system
     * @throws IOException if the file system cannot be obtained
     */
    void setFileSystem(Configuration conf) throws IOException {
        Configuration confCopy = new Configuration(conf);
        this.fs = this.fsWorkingPath.getFileSystem(confCopy);
        // For the local scheme, unwrap to the raw file system.
        // NOTE(review): presumably because the checksummed LocalFileSystem
        // does not support append() - confirm.
        if ("file".equals(this.fs.getScheme())) {
            this.fs = ((LocalFileSystem)this.fs).getRaw();
        }
    }

    /** Opens {@link #editLogPath} for append and stores the stream in {@link #editlogOs}. */
    private void ensureAppendEditlogFile() throws IOException {
        this.editlogOs = this.fs.append(this.editLogPath);
    }

    /** Closes the current edit-log stream, if one has been opened. */
    private void ensureCloseEditlogFile() throws IOException {
        if (this.editlogOs != null) {
            this.editlogOs.close();
        }
    }

    /**
     * Appends a {@link SerializedLogType#NODE_TO_LABELS} record to the edit log.
     *
     * @param nodeToLabels the node-to-labels mapping to record
     * @throws IOException if the edit log cannot be opened, written or closed
     */
    @Override
    public void updateNodeToLabelsMappings(Map<NodeId, Set<String>> nodeToLabels) throws IOException {
        // Open outside the try block: if the append fails there is no freshly
        // opened stream to close, and the original failure must not be masked.
        this.ensureAppendEditlogFile();
        try {
            this.editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
            ((ReplaceLabelsOnNodeRequestPBImpl)ReplaceLabelsOnNodeRequest.newInstance(nodeToLabels)).getProto().writeDelimitedTo(this.editlogOs);
        }
        finally {
            this.ensureCloseEditlogFile();
        }
    }

    /**
     * Appends a {@link SerializedLogType#ADD_LABELS} record to the edit log.
     *
     * @param labels the labels being added to the cluster
     * @throws IOException if the edit log cannot be opened, written or closed
     */
    @Override
    public void storeNewClusterNodeLabels(List<NodeLabel> labels) throws IOException {
        this.ensureAppendEditlogFile();
        try {
            this.editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
            ((AddToClusterNodeLabelsRequestPBImpl)AddToClusterNodeLabelsRequest.newInstance(labels)).getProto().writeDelimitedTo(this.editlogOs);
        }
        finally {
            this.ensureCloseEditlogFile();
        }
    }

    /**
     * Appends a {@link SerializedLogType#REMOVE_LABELS} record to the edit log.
     *
     * @param labels the label names being removed from the cluster
     * @throws IOException if the edit log cannot be opened, written or closed
     */
    @Override
    public void removeClusterNodeLabels(Collection<String> labels) throws IOException {
        this.ensureAppendEditlogFile();
        try {
            this.editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
            ((RemoveFromClusterNodeLabelsRequestPBImpl)RemoveFromClusterNodeLabelsRequest.newInstance(Sets.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(this.editlogOs);
        }
        finally {
            this.ensureCloseEditlogFile();
        }
    }

    /**
     * Loads the mirror into the label manager. {@code newMirrorPath} is tried
     * first and {@code oldMirrorPath} is the fallback; when neither exists
     * nothing is loaded. Node-to-label mappings are applied only when the
     * manager reports a centralized configuration.
     *
     * @param newMirrorPath path of the current mirror file
     * @param oldMirrorPath path of the backup mirror file
     * @throws IOException if a mirror exists but cannot be read or applied
     */
    protected void loadFromMirror(Path newMirrorPath, Path oldMirrorPath) throws IOException {
        FSDataInputStream is = null;
        try {
            is = this.fs.open(newMirrorPath);
        }
        catch (FileNotFoundException e) {
            try {
                is = this.fs.open(oldMirrorPath);
            }
            catch (FileNotFoundException ignored) {
                // Neither mirror exists: there is nothing to load.
            }
        }
        if (null == is) {
            return;
        }
        try {
            List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
            this.mgr.addToCluserNodeLabels(labels);
            if (this.mgr.isCentralizedConfiguration()) {
                Map<NodeId, Set<String>> nodeToLabels = new ReplaceLabelsOnNodeRequestPBImpl(YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)).getNodeToLabels();
                this.mgr.replaceLabelsOnNode(nodeToLabels);
            }
        }
        finally {
            // Release the stream even when parsing or applying fails.
            is.close();
        }
    }

    /**
     * Rebuilds the in-memory label state and compacts the on-disk state:
     * loads the mirror, replays the edit log, writes a new mirror, swaps it
     * into place and finally truncates the edit log.
     *
     * @throws YarnException declared by the overridden method
     * @throws IOException if any file operation fails, including a failed
     *         rename of the new mirror into place
     */
    @Override
    public void recover() throws YarnException, IOException {
        Path mirrorPath = new Path(this.fsWorkingPath, MIRROR_FILENAME);
        Path oldMirrorPath = new Path(this.fsWorkingPath, "nodelabel.mirror.old");
        this.loadFromMirror(mirrorPath, oldMirrorPath);
        this.editLogPath = new Path(this.fsWorkingPath, EDITLOG_FILENAME);
        this.replayEditLog();

        Path writingMirrorPath = new Path(this.fsWorkingPath, "nodelabel.mirror.writing");
        this.writeMirror(writingMirrorPath);

        // Keep the previous mirror as a backup while the new one is swapped in.
        if (this.fs.exists(mirrorPath)) {
            this.fs.delete(oldMirrorPath, false);
            if (!this.fs.rename(mirrorPath, oldMirrorPath)) {
                LOG.warn("Failed to rename " + mirrorPath + " to " + oldMirrorPath);
            }
        }
        // Abort before any cleanup if the new mirror did not land: deleting the
        // backup and truncating the edit log at that point would lose data.
        if (!this.fs.rename(writingMirrorPath, mirrorPath)) {
            throw new IOException("Failed to rename " + writingMirrorPath + " to " + mirrorPath);
        }
        this.fs.delete(writingMirrorPath, false);
        this.fs.delete(oldMirrorPath, false);

        // The new mirror now contains every replayed edit; start an empty log.
        this.editlogOs = this.fs.create(this.editLogPath, true);
        this.editlogOs.close();
        LOG.info("Finished write mirror at:" + mirrorPath.toString());
        LOG.info("Finished create editlog file at:" + this.editLogPath.toString());
    }

    /** Replays every record in the edit log into the label manager; a missing log is a no-op. */
    private void replayEditLog() throws YarnException, IOException {
        FSDataInputStream is;
        try {
            is = this.fs.open(this.editLogPath);
        }
        catch (FileNotFoundException e) {
            // No edit log yet: nothing to replay.
            return;
        }
        SerializedLogType[] types = SerializedLogType.values();
        try {
            while (true) {
                int typeOrdinal = is.readInt();
                if (typeOrdinal < 0 || typeOrdinal >= types.length) {
                    throw new IOException("Unknown record type " + typeOrdinal + " in edit log " + this.editLogPath);
                }
                switch (types[typeOrdinal]) {
                    case ADD_LABELS: {
                        List<NodeLabel> labels = new AddToClusterNodeLabelsRequestPBImpl(YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
                        this.mgr.addToCluserNodeLabels(labels);
                        break;
                    }
                    case REMOVE_LABELS: {
                        Collection<String> labels = YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is).getNodeLabelsList();
                        this.mgr.removeFromClusterNodeLabels(labels);
                        break;
                    }
                    case NODE_TO_LABELS: {
                        // The record is always consumed so the stream stays aligned,
                        // but it is applied only under centralized configuration.
                        Map<NodeId, Set<String>> map = new ReplaceLabelsOnNodeRequestPBImpl(YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is)).getNodeToLabels();
                        if (!this.mgr.isCentralizedConfiguration()) break;
                        this.mgr.replaceLabelsOnNode(map);
                        break;
                    }
                }
            }
        }
        catch (EOFException e) {
            // Reaching the end of the stream is how the replay loop terminates.
        }
        finally {
            is.close();
        }
    }

    /** Writes the manager's current cluster labels and node-to-labels mapping to {@code target}, overwriting it. */
    private void writeMirror(Path target) throws IOException {
        FSDataOutputStream os = this.fs.create(target, true);
        try {
            ((AddToClusterNodeLabelsRequestPBImpl)AddToClusterNodeLabelsRequestPBImpl.newInstance(this.mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
            ((ReplaceLabelsOnNodeRequestPBImpl)ReplaceLabelsOnNodeRequest.newInstance(this.mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
        }
        finally {
            os.close();
        }
    }

    /**
     * Record types stored in the edit log. The {@code ordinal()} of each
     * constant is what is written to and read back from disk, so constants
     * must not be reordered or removed; new ones may only be appended.
     */
    protected static enum SerializedLogType {
        ADD_LABELS,
        NODE_TO_LABELS,
        REMOVE_LABELS;

    }
}

