/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.entity.store;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Singleton registry of Falcon entities, held in memory per {@link EntityType} and keyed by
 * entity name.
 *
 * <p>When the start-up property {@code config.store.persist} is {@code true} (the default),
 * entities are also written as {@code <type>/<url-encoded-name>.xml} under the path given by
 * {@code config.store.uri}, and are restored from there by {@link #init()}. Superseded or
 * removed definitions are moved to {@code archive/<type>/} under the same path.
 *
 * <p>Registered {@link ConfigurationChangeListener}s are notified before the in-memory map is
 * changed on add, change and remove, and once per restored entity on reload.
 */
public final class ConfigurationStore implements FalconService {
    /** Order in which entity types are restored from the store by {@link #init()}. */
    private static final EntityType[] ENTITY_LOAD_ORDER = new EntityType[]{
        EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE,
    };

    /**
     * Entity types listed process first, then feed, then cluster. Not referenced inside this
     * class; presumably consumed by callers that delete entities -- verify against callers.
     */
    public static final EntityType[] ENTITY_DELETE_ORDER = new EntityType[]{
        EntityType.PROCESS, EntityType.FEED, EntityType.CLUSTER,
    };

    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
    private static final String UTF_8 = "UTF-8";
    private static final String LOAD_ENTITIES_THREADS = "config.store.num.threads.load.entities";
    private static final String TIMEOUT_MINS_LOAD_ENTITIES = "config.store.start.timeout.minutes";

    /** Size of the thread pool used to restore entities; read in {@link #init()}. */
    private int numThreads;
    /** Minutes to wait for one entity type to be restored; read in {@link #init()}. */
    private int restoreTimeOutInMins;
    private final boolean shouldPersist;

    /** Permission applied to directories created by the store: all for owner, none otherwise. */
    private static final FsPermission STORE_PERMISSION =
        new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

    private Set<ConfigurationChangeListener> listeners = new LinkedHashSet<ConfigurationChangeListener>();
    /** The entity, if any, that the current thread has announced via {@link #initiateUpdate}. */
    private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<Entity>();
    private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary =
        new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();

    /**
     * Sentinel value: when {@link #get} finds this instance in the map and persistence is
     * enabled, it restores the real entity from the store and replaces the sentinel.
     */
    private static final Entity NULL = new Entity() {

        @Override
        public String getName() {
            return "NULL";
        }

        @Override
        public String getTags() {
            return null;
        }

        @Override
        public AccessControlList getACL() {
            return null;
        }
    };

    // Must stay below LOG, STORE_PERMISSION and NULL: the constructor uses them, and static
    // initializers run in textual order.
    private static final ConfigurationStore STORE = new ConfigurationStore();

    private FileSystem fs;
    private Path storePath;

    /** Returns the process-wide store instance. */
    public static ConfigurationStore get() {
        return STORE;
    }

    /** Returns the file system backing the store, or {@code null} when persistence is disabled. */
    public FileSystem getFs() {
        return fs;
    }

    /** Returns the root path of the store, or {@code null} when persistence is disabled. */
    public Path getStorePath() {
        return storePath;
    }

    /**
     * Creates an empty map for every entity type and, when persistence is enabled, resolves the
     * store path and file system.
     */
    private ConfigurationStore() {
        for (EntityType type : EntityType.values()) {
            dictionary.put(type, new ConcurrentHashMap<String, Entity>());
        }
        shouldPersist = Boolean.parseBoolean(
            StartupProperties.get().getProperty("config.store.persist", "true"));
        if (shouldPersist) {
            String uri = StartupProperties.get().getProperty("config.store.uri");
            storePath = new Path(uri);
            fs = initializeFileSystem();
        }
    }

    /**
     * Opens the file system for {@link #storePath} and creates the store directory with
     * {@link #STORE_PERMISSION} if it does not exist yet.
     *
     * @return the file system the store lives on
     * @throws RuntimeException wrapping any failure to reach or create the store
     */
    private FileSystem initializeFileSystem() {
        try {
            FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
            if (!fileSystem.exists(storePath)) {
                LOG.info("Creating configuration store directory: {}", storePath);
                HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
            }
            return fileSystem;
        } catch (Exception e) {
            throw new RuntimeException("Unable to bring up config store for path: " + storePath, e);
        }
    }

    /**
     * Reads the loader settings, instantiates and registers the listeners named in the
     * {@code configstore.listeners} start-up property, and, when persistence is enabled,
     * restores every entity type in {@link #ENTITY_LOAD_ORDER}.
     *
     * @throws FalconException if a numeric property is not an integer or entities cannot be restored
     */
    @Override
    public void init() throws FalconException {
        numThreads = readIntProperty(LOAD_ENTITIES_THREADS, "100");
        LOG.info("Number of threads used to restore entities: {}", numThreads);

        restoreTimeOutInMins = readIntProperty(TIMEOUT_MINS_LOAD_ENTITIES, "30");
        LOG.info("TimeOut to load Entities is taken as {} mins", restoreTimeOutInMins);

        String listenerClassNames = StartupProperties.get().getProperty(
            "configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
        for (String candidate : listenerClassNames.split(",")) {
            String className = candidate.trim();
            if (className.isEmpty()) {
                continue;
            }
            ConfigurationChangeListener listener =
                (ConfigurationChangeListener) ReflectionUtils.getInstanceByClassName(className);
            registerListener(listener);
        }

        if (shouldPersist) {
            for (EntityType entityType : ENTITY_LOAD_ORDER) {
                loadEntity(entityType);
            }
        }
    }

    /**
     * Reads an integer start-up property.
     *
     * @param key          property name
     * @param defaultValue value used when the property is absent
     * @return the parsed value
     * @throws FalconException if the configured value is not a valid integer
     */
    private static int readIntProperty(String key, String defaultValue) throws FalconException {
        String raw = StartupProperties.get().getProperty(key, defaultValue);
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException nfe) {
            throw new FalconException("Invalid value specified for start up property \"" + key
                + "\".Please provide an integer value", nfe);
        }
    }

    /**
     * Restores every file found under {@code <storePath>/<type>/} into the in-memory map using a
     * fixed-size thread pool, then notifies listeners of each reloaded entity.
     *
     * @param type entity type to restore
     * @throws FalconException if listing fails, the thread is interrupted, or the number of
     *                         entities in the map differs from the number of files found
     */
    private void loadEntity(final EntityType type) throws FalconException {
        try {
            final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
            FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + "/" + "*"));
            if (files == null) {
                return;
            }

            ExecutorService service = Executors.newFixedThreadPool(numThreads);
            try {
                for (final FileStatus file : files) {
                    service.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                String fileName = file.getPath().getName();
                                // Drop the last four characters, i.e. the ".xml" suffix that
                                // persist() appends to the encoded entity name.
                                String encodedEntityName = fileName.substring(0, fileName.length() - 4);
                                String entityName = URLDecoder.decode(encodedEntityName, ConfigurationStore.UTF_8);
                                Entity entity = ConfigurationStore.this.restore(type, entityName);
                                LOG.info("Restored configuration {}/{}", type, entityName);
                                entityMap.put(entityName, entity);
                            } catch (IOException | FalconException e) {
                                // A failed file is detected below through the size check.
                                LOG.error("Unable to restore entity of " + file.getPath(), e);
                            }
                        }
                    });
                }
                service.shutdown();
                if (service.awaitTermination(restoreTimeOutInMins, TimeUnit.MINUTES)) {
                    LOG.info("Restored Configurations for entity type: {} ", type.name());
                } else {
                    LOG.warn("Timed out while waiting for all threads to finish while restoring entities for type: {}",
                        type.name());
                }
            } finally {
                // After a timeout or an interruption, stop the remaining workers so they do not
                // keep running (and writing into the map) once this method has failed.
                if (!service.isTerminated()) {
                    service.shutdownNow();
                }
            }

            if (entityMap.size() != files.length) {
                throw new FalconException("Unable to restore configurations for entity type " + type.name());
            }
            for (Entity entity : entityMap.values()) {
                onReload(entity);
            }
        } catch (IOException e) {
            throw new FalconException("Unable to restore configurations", e);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new FalconException(
                "Interrupted while restoring configurations for entity type " + type.name(), e);
        }
    }

    /** Adds a listener to be notified of store changes. */
    public void registerListener(ConfigurationChangeListener listener) {
        listeners.add(listener);
    }

    /** Removes a previously registered listener. */
    public void unregisterListener(ConfigurationChangeListener listener) {
        listeners.remove(listener);
    }

    /**
     * Adds a new entity: persists it, notifies listeners, then records it in memory.
     *
     * @param type   type of the entity
     * @param entity entity to add
     * @throws EntityAlreadyExistsException if an entity of that type and name is already present
     * @throws StoreAccessException         if writing to the store fails
     * @throws FalconException              if a listener rejects the addition
     */
    public synchronized void publish(EntityType type, Entity entity) throws FalconException {
        try {
            if (get(type, entity.getName()) != null) {
                throw new EntityAlreadyExistsException(entity.toShortString()
                    + " already registered with configuration store. "
                    + "Can't be submitted again. Try removing before submitting.");
            }
            persist(type, entity);
            onAdd(entity);
            dictionary.get(type).put(entity.getName(), entity);
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
        AUDIT.info(type + "/" + entity.getName() + " is published into config store");
    }

    /**
     * Replaces an existing entity: adjusts its version, persists it, notifies listeners, then
     * swaps the in-memory value.
     *
     * @throws FalconException      if the entity does not exist or a listener rejects the change
     * @throws StoreAccessException if writing to the store fails
     */
    private synchronized void updateInternal(EntityType type, Entity entity) throws FalconException {
        try {
            if (get(type, entity.getName()) == null) {
                throw new FalconException(entity.toShortString() + " doesn't exist");
            }
            ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
            Entity oldEntity = entityMap.get(entity.getName());
            updateVersion(oldEntity, entity);
            persist(type, entity);
            onChange(oldEntity, entity);
            entityMap.put(entity.getName(), entity);
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
        AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
    }

    /**
     * Sets the version of {@code newEntity} based on {@code oldentity}. A cluster gets the old
     * version plus one when {@link UpdateHelper#isClusterEntityUpdated} reports a change; any
     * other entity that differs from the old one gets the old version as-is.
     */
    private void updateVersion(Entity oldentity, Entity newEntity) throws FalconException {
        if (oldentity.getEntityType().equals(EntityType.CLUSTER)) {
            if (UpdateHelper.isClusterEntityUpdated((Cluster) oldentity, (Cluster) newEntity)) {
                EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity) + 1);
            }
        } else if (!EntityUtil.equals(oldentity, newEntity)) {
            // NOTE(review): unlike the cluster branch this copies the old version without
            // incrementing it -- confirm whether "+ 1" was intended here.
            EntityUtil.setVersion(newEntity, EntityUtil.getVersion(oldentity));
        }
    }

    /**
     * Applies an update previously announced on this thread with {@link #initiateUpdate}: the
     * stored definition is archived and the new one takes its place.
     *
     * @param type   type of the entity
     * @param entity the same instance that was passed to {@link #initiateUpdate}
     * @throws FalconException      if {@code entity} is not the instance announced on this thread
     * @throws StoreAccessException if archiving or writing fails
     */
    public synchronized void update(EntityType type, Entity entity) throws FalconException {
        if (updatesInProgress.get() != entity) {
            throw new FalconException(entity.toShortString() + " is not initialized for update");
        }
        try {
            archive(type, entity.getName());
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
        updateInternal(type, entity);
    }

    private void onAdd(Entity entity) throws FalconException {
        for (ConfigurationChangeListener listener : listeners) {
            listener.onAdd(entity);
        }
    }

    private void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
        for (ConfigurationChangeListener listener : listeners) {
            listener.onChange(oldEntity, newEntity);
        }
    }

    private void onReload(Entity entity) throws FalconException {
        for (ConfigurationChangeListener listener : listeners) {
            listener.onReload(entity);
        }
    }

    /**
     * Marks {@code entity} as the update in progress for the calling thread.
     *
     * @throws FalconException if no entity of that type and name exists, or this thread already
     *                         has an update in progress
     */
    public synchronized void initiateUpdate(Entity entity) throws FalconException {
        if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) {
            throw new FalconException("An update for " + entity.toShortString()
                + " is already in progress or doesn't exist");
        }
        updatesInProgress.set(entity);
    }

    /**
     * Looks up an entity.
     *
     * <p>If the calling thread has an update in progress for the same type and name, that
     * pending instance is returned instead of the stored one. A {@link #NULL} placeholder is
     * replaced by the definition restored from the store when persistence is enabled.
     *
     * @param type type of the entity
     * @param name name of the entity
     * @return the entity, or {@code null} if the name is not registered for that type
     * @throws StoreAccessException if restoring from the store fails
     */
    @SuppressWarnings("unchecked") // Callers choose T; the map only guarantees Entity.
    public <T extends Entity> T get(EntityType type, String name) throws FalconException {
        ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
        if (!entityMap.containsKey(name)) {
            return null;
        }

        Entity pending = updatesInProgress.get();
        if (pending != null && pending.getEntityType() == type && pending.getName().equals(name)) {
            return (T) pending;
        }

        Entity entity = entityMap.get(name);
        if (entity == NULL && shouldPersist) {
            try {
                entity = restore(type, name);
            } catch (IOException e) {
                throw new StoreAccessException(e);
            }
            LOG.info("Restored configuration {}/{}", type, name);
            entityMap.put(name, entity);
        }
        return (T) entity;
    }

    /** Returns an unmodifiable view of the names registered for {@code type}. */
    public Collection<String> getEntities(EntityType type) {
        return Collections.unmodifiableCollection(dictionary.get(type).keySet());
    }

    /**
     * Removes an entity: archives its stored definition, notifies listeners, then drops it from
     * memory.
     *
     * @return {@code true} if the entity existed and was removed, {@code false} if it was unknown
     * @throws StoreAccessException if archiving fails
     * @throws FalconException      if a listener rejects the removal
     */
    public synchronized boolean remove(EntityType type, String name) throws FalconException {
        Map<String, Entity> entityMap = dictionary.get(type);
        if (!entityMap.containsKey(name)) {
            return false;
        }
        try {
            archive(type, name);
            Entity entity = entityMap.get(name);
            onRemove(entity);
            entityMap.remove(name);
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
        AUDIT.info(type + " " + name + " is removed from config store");
        return true;
    }

    private void onRemove(Entity entity) throws FalconException {
        for (ConfigurationChangeListener listener : listeners) {
            listener.onRemove(entity);
        }
    }

    /** Builds {@code <storePath>/<type>/<url-encoded-name>.xml}. */
    private Path entityPath(EntityType type, String name) throws IOException {
        return new Path(storePath, type + "/" + URLEncoder.encode(name, UTF_8) + ".xml");
    }

    /** Writes the entity to its file in the store; does nothing when persistence is disabled. */
    private void persist(EntityType type, Entity entity) throws IOException, FalconException {
        if (!shouldPersist) {
            return;
        }
        try (FSDataOutputStream out = fs.create(entityPath(type, entity.getName()))) {
            type.getMarshaller().marshal(entity, out);
            LOG.info("Persisted configuration {}/{}", type, entity.getName());
        }
    }

    /**
     * Moves the entity's file to {@code archive/<type>/<url-encoded-name>.<current-millis>};
     * does nothing when persistence is disabled.
     */
    private void archive(EntityType type, String name) throws IOException {
        if (!shouldPersist) {
            return;
        }
        Path archivePath = new Path(storePath, "archive/" + type);
        HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION);

        Path source = entityPath(type, name);
        Path target = new Path(archivePath, URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis());
        // rename() reports some failures through its return value rather than an exception;
        // surface that in the log instead of claiming success.
        if (fs.rename(source, target)) {
            LOG.info("Archived configuration {}/{}", type, name);
        } else {
            LOG.warn("Unable to archive configuration {}/{}: rename of {} to {} returned false",
                type, name, source, target);
        }
    }

    /** Reads and unmarshals the entity's file from the store. */
    @SuppressWarnings("unchecked") // Callers choose T; the unmarshaller only guarantees Entity.
    private synchronized <T extends Entity> T restore(EntityType type, String name)
        throws IOException, FalconException {
        try (FSDataInputStream in = fs.open(entityPath(type, name))) {
            Entity entity = (Entity) type.getUnmarshaller().unmarshal(in);
            return (T) entity;
        }
    }

    /** Clears the calling thread's update-in-progress marker. */
    public void cleanupUpdateInit() {
        updatesInProgress.set(null);
    }

    @Override
    public String getName() {
        return getClass().getName();
    }

    @Override
    public void destroy() {
    }
}

