/*
 * Decompiled with CFR 0.152.
 */
package org.apache.falcon.metadata;

import com.thinkaurelius.titan.core.EdgeLabel;
import com.thinkaurelius.titan.core.Order;
import com.thinkaurelius.titan.core.PropertyKey;
import com.thinkaurelius.titan.core.RelationType;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph;
import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphFactory;
import com.tinkerpop.blueprints.KeyIndexableGraph;
import com.tinkerpop.blueprints.TransactionalGraph;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.util.TransactionRetryHelper;
import com.tinkerpop.blueprints.util.TransactionWork;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.metadata.EntityRelationshipGraphBuilder;
import org.apache.falcon.metadata.InstanceRelationshipGraphBuilder;
import org.apache.falcon.metadata.RelationshipLabel;
import org.apache.falcon.metadata.RelationshipProperty;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataMappingService
implements FalconService,
ConfigurationChangeListener,
WorkflowExecutionListener {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(MetadataMappingService.class);
    /** Name returned by {@link #getName()}; the simple name of this class. */
    public static final String SERVICE_NAME = MetadataMappingService.class.getSimpleName();
    /** Startup properties starting with this prefix are copied (prefix removed) into the graph configuration. */
    private static final String FALCON_PREFIX = "falcon.graph.";
    /** Graph configuration key that selects the storage backend. */
    public static final String PROPERTY_KEY_STORAGE_BACKEND = "storage.backend";
    /** Storage backend value accepted by configuration validation: HBase. */
    public static final String STORAGE_BACKEND_HBASE = "hbase";
    /** Storage backend value accepted by configuration validation: Berkeley DB. */
    public static final String STORAGE_BACKEND_BDB = "berkeleyje";
    /** Graph configuration key required for the HBase backend. */
    public static final String PROPERTY_KEY_STORAGE_HOSTNAME = "storage.hostname";
    /** Graph configuration key required for the HBase backend. */
    public static final String PROPERTY_KEY_STORAGE_TABLE = "storage.hbase.table";
    /** Unmodifiable set of the configuration keys that must be present when the backend is HBase. */
    public static final Set<String> PROPERTY_KEYS_HBASE = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("storage.hostname", "storage.hbase.table")));
    /** Graph configuration key required for the Berkeley DB backend. */
    public static final String PROPERTY_KEY_STORAGE_DIRECTORY = "storage.directory";
    /** Graph configuration key required for the Berkeley DB backend. */
    public static final String PROPERTY_KEY_SERIALIZE_PATH = "serialize.path";
    /** Unmodifiable set of the configuration keys that must be present when the backend is Berkeley DB. */
    public static final Set<String> PROPERTY_KEYS_BDB = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList("storage.directory", "serialize.path")));
    /** Graph opened in {@link #init()} and shut down in {@link #destroy()}. */
    private Graph graph;
    /** Vertex keys reported as indexed by the graph at the time of {@link #init()}. */
    private Set<String> vertexIndexedKeys;
    /** Edge keys reported as indexed by the graph at the time of {@link #init()}. */
    private Set<String> edgeIndexedKeys;
    /** Builder used for entity add/update lineage. */
    private EntityRelationshipGraphBuilder entityGraphBuilder;
    /** Builder used for instance (workflow execution) lineage. */
    private InstanceRelationshipGraphBuilder instanceGraphBuilder;
    /** Retry count handed to the transaction retry helper; read from "falcon.graph.transaction.retry.count". */
    private int transactionRetries;
    /** Delay handed to the transaction retry helper; read from "falcon.graph.transaction.retry.delay". */
    private long transactionRetryDelayInMillis;

    /**
     * Returns the name of this service.
     *
     * @return {@link #SERVICE_NAME}
     */
    @Override
    public String getName() {
        return SERVICE_NAME;
    }

    /**
     * Initializes the service: reads the transaction retry settings, opens the
     * graph database, creates the key indexes when none exist, caches the
     * indexed vertex/edge keys, creates the entity and instance graph builders,
     * and finally registers this service as a listener with the configuration
     * store and the workflow job-end notification service.
     *
     * @throws FalconException if the transaction retry count or delay startup
     *         property is not a valid number
     */
    @Override
    public void init() throws FalconException {
        // Parse the retry settings before touching the graph or registering any
        // listener, so an invalid value fails fast without leaving an open graph
        // or a registered, half-initialized listener behind.
        final int retries;
        final long retryDelayInMillis;
        try {
            retries = Integer.parseInt(StartupProperties.get().getProperty("falcon.graph.transaction.retry.count", "3"));
            retryDelayInMillis = Long.parseLong(StartupProperties.get().getProperty("falcon.graph.transaction.retry.delay", "5"));
        }
        catch (NumberFormatException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new FalconException("Invalid values for graph transaction retry delay/count " + e, e);
        }
        this.transactionRetries = retries;
        this.transactionRetryDelayInMillis = retryDelayInMillis;

        this.graph = MetadataMappingService.initializeGraphDB();
        this.createIndicesForVertexKeys();
        LOG.info("Initialized graph db: {}", (Object)this.graph);
        this.vertexIndexedKeys = this.getIndexableGraph().getIndexedKeys(Vertex.class);
        LOG.info("Init vertex property keys: {}", this.vertexIndexedKeys);
        this.edgeIndexedKeys = this.getIndexableGraph().getIndexedKeys(Edge.class);
        LOG.info("Init edge property keys: {}", this.edgeIndexedKeys);

        boolean preserveHistory = Boolean.parseBoolean(StartupProperties.get().getProperty("falcon.graph.preserve.history", "false"));
        this.entityGraphBuilder = new EntityRelationshipGraphBuilder(this.graph, preserveHistory);
        this.instanceGraphBuilder = new InstanceRelationshipGraphBuilder(this.graph, preserveHistory);

        // Register last: from here on callbacks may arrive, and every field they use is set.
        ConfigurationStore.get().registerListener(this);
        ((WorkflowJobEndNotificationService)Services.get().getService(WorkflowJobEndNotificationService.SERVICE_NAME)).registerListener(this);
    }

    /**
     * Builds the graph configuration from the startup properties, validates it,
     * and opens the graph through {@link GraphFactory}.
     *
     * @return the opened graph
     */
    public static Graph initializeGraphDB() {
        LOG.info("Initializing graph db");
        final Configuration config = getConfiguration();
        validateConfiguration(config);
        return GraphFactory.open(config);
    }

    /**
     * Checks that the graph configuration names a supported storage backend
     * ({@link #STORAGE_BACKEND_HBASE} or {@link #STORAGE_BACKEND_BDB}) and
     * contains every property that backend requires.
     *
     * @param graphConfig graph configuration to validate
     * @throws FalconRuntimException if the backend key is absent, its value is
     *         not a supported backend, or a required backend property is missing
     */
    private static void validateConfiguration(Configuration graphConfig) {
        if (!graphConfig.containsKey(PROPERTY_KEY_STORAGE_BACKEND)) {
            // Name the literal values the switch below accepts; the previous text said
            // "berkeleydb", which is itself rejected as an invalid backend.
            throw new FalconRuntimException("Titan GraphDB storage backend is not configured. "
                    + "You need to choose either " + STORAGE_BACKEND_HBASE + " or " + STORAGE_BACKEND_BDB + ". "
                    + "Please check Configuration twiki or the section Graph Database Properties in "
                    + "startup.properties on how to configure Titan GraphDB backend.");
        }
        final String backend = graphConfig.getString(PROPERTY_KEY_STORAGE_BACKEND);
        switch (backend) {
            case STORAGE_BACKEND_BDB: {
                requireBackendKeys(graphConfig, PROPERTY_KEYS_BDB, "Berkeley DB");
                break;
            }
            case STORAGE_BACKEND_HBASE: {
                requireBackendKeys(graphConfig, PROPERTY_KEYS_HBASE, "HBase");
                break;
            }
            default: {
                throw new FalconRuntimException("Invalid graph storage backend: " + backend + ". "
                        + "You need to choose either " + STORAGE_BACKEND_HBASE + " or " + STORAGE_BACKEND_BDB + ". "
                        + "Please check Configuration twiki or the section Graph Database Properties in "
                        + "startup.properties on how to configure Titan GraphDB backend.");
            }
        }
    }

    /**
     * Throws if any of the given keys is absent from the graph configuration.
     *
     * @param graphConfig  graph configuration to inspect
     * @param requiredKeys keys that must all be present
     * @param backendLabel human-readable backend name used in the error message
     * @throws FalconRuntimException naming the first missing key encountered
     */
    private static void requireBackendKeys(Configuration graphConfig, Set<String> requiredKeys, String backendLabel) {
        for (String key : requiredKeys) {
            if (!graphConfig.containsKey(key)) {
                throw new FalconRuntimException("Required parameter " + FALCON_PREFIX + key
                        + " not found in startup.properties. "
                        + "Please check Configuration twiki or the section Graph Database Properties in "
                        + "startup.properties on how to configure " + backendLabel + " storage backend.");
            }
        }
    }

    /**
     * Builds the graph configuration from the startup properties: every
     * property whose name starts with {@code falcon.graph.} is copied with
     * that prefix removed; all other properties are ignored.
     *
     * @return a new configuration holding the prefix-stripped graph properties
     */
    public static Configuration getConfiguration() {
        final BaseConfiguration result = new BaseConfiguration();
        final Properties startupProperties = StartupProperties.get();
        final int prefixLength = FALCON_PREFIX.length();
        for (Map.Entry<Object, Object> property : startupProperties.entrySet()) {
            final String fullName = (String) property.getKey();
            if (fullName.startsWith(FALCON_PREFIX)) {
                final String value = (String) property.getValue();
                result.setProperty(fullName.substring(prefixLength), value);
            }
        }
        return result;
    }

    /**
     * Creates the graph indexes when the graph reports no indexed vertex keys;
     * otherwise only logs that indexes already exist.
     * <p>
     * Created, in order: the name index, key indexes for type, timestamp and
     * version, and the instance edge indexes.
     */
    protected void createIndicesForVertexKeys() {
        final Set<String> indexedVertexKeys = ((KeyIndexableGraph) this.graph).getIndexedKeys(Vertex.class);
        if (indexedVertexKeys.isEmpty()) {
            LOG.info("Indexes does not exist, Creating indexes for graph");
            makeNameKeyIndex();
            makeKeyIndex(RelationshipProperty.TYPE.getName());
            makeKeyIndex(RelationshipProperty.TIMESTAMP.getName());
            makeKeyIndex(RelationshipProperty.VERSION.getName());
            makeInstanceIndex();
        } else {
            LOG.info("Indexes already exist for graph");
        }
    }

    /**
     * Creates the instance-entity edge label and two outgoing, descending edge
     * indexes on it: {@code indexInstanceN} keyed by nominal time and
     * {@code indexInstanceSN} keyed by status then nominal time. The changes
     * are committed through the management system.
     */
    private void makeInstanceIndex() {
        final TitanManagement mgmt = getTitanGraph().getManagementSystem();
        final PropertyKey status = makePropertyKey(mgmt, RelationshipProperty.STATUS.getName());
        final PropertyKey nominalTime = makePropertyKey(mgmt, RelationshipProperty.NOMINAL_TIME.getName());
        final EdgeLabel instanceEdge = mgmt
                .makeEdgeLabel(RelationshipLabel.INSTANCE_ENTITY_EDGE.getName())
                .make();
        mgmt.buildEdgeIndex(instanceEdge, "indexInstanceN", Direction.OUT, Order.DESC, nominalTime);
        mgmt.buildEdgeIndex(instanceEdge, "indexInstanceSN", Direction.OUT, Order.DESC, status, nominalTime);
        mgmt.commit();
    }

    /**
     * Creates composite indexes on the name property for both vertices
     * ({@code indexByVertexName}) and edges ({@code indexByEdgeName}) and
     * commits them through the management system.
     */
    private void makeNameKeyIndex() {
        final TitanManagement mgmt = getTitanGraph().getManagementSystem();
        final PropertyKey name = makePropertyKey(mgmt, RelationshipProperty.NAME.getName());
        mgmt.buildIndex("indexByVertexName", Vertex.class)
                .addKey(name)
                .buildCompositeIndex();
        mgmt.buildIndex("indexByEdgeName", Edge.class)
                .addKey(name)
                .buildCompositeIndex();
        mgmt.commit();
    }

    /**
     * Creates and commits a composite vertex index named
     * {@code "indexBy" + key} on the given property key.
     *
     * @param key name of the property to index
     */
    private void makeKeyIndex(String key) {
        final TitanManagement mgmt = getTitanGraph().getManagementSystem();
        final PropertyKey indexedKey = makePropertyKey(mgmt, key);
        final String indexName = "indexBy" + key;
        mgmt.buildIndex(indexName, Vertex.class)
                .addKey(indexedKey)
                .buildCompositeIndex();
        mgmt.commit();
    }

    /**
     * Returns the property key with the given name, creating it with a
     * {@code String} data type when the management system does not have it yet.
     *
     * @param titanManagement management system to look the key up in / create it with
     * @param key             property key name
     * @return the existing or newly made property key
     */
    private PropertyKey makePropertyKey(TitanManagement titanManagement, String key) {
        return titanManagement.containsPropertyKey(key)
                ? titanManagement.getPropertyKey(key)
                : titanManagement.makePropertyKey(key).dataType(String.class).make();
    }

    /**
     * @return the graph held by this service ({@code null} until {@link #init()} assigns it)
     */
    public Graph getGraph() {
        return graph;
    }

    /**
     * @return the service's graph cast to {@link KeyIndexableGraph}
     */
    public KeyIndexableGraph getIndexableGraph() {
        return (KeyIndexableGraph) graph;
    }

    /**
     * @return the service's graph cast to {@link TransactionalGraph}
     */
    public TransactionalGraph getTransactionalGraph() {
        return (TransactionalGraph) graph;
    }

    /**
     * @return the service's graph cast to {@link TitanBlueprintsGraph}
     */
    public TitanBlueprintsGraph getTitanGraph() {
        return (TitanBlueprintsGraph) graph;
    }

    /**
     * @return the indexed vertex keys captured during {@link #init()}
     */
    public Set<String> getVertexIndexedKeys() {
        return vertexIndexedKeys;
    }

    /**
     * @return the indexed edge keys captured during {@link #init()}
     */
    public Set<String> getEdgeIndexedKeys() {
        return edgeIndexedKeys;
    }

    /**
     * Unregisters this service from the workflow job-end notification service
     * and shuts the graph down.
     * <p>
     * The graph is shut down even when unregistering throws, and the shutdown
     * is skipped when no graph was ever opened (for example when
     * {@link #init()} failed before assigning it).
     *
     * @throws FalconException declared by the service contract
     */
    @Override
    public void destroy() throws FalconException {
        // NOTE(review): init() also registers this service with ConfigurationStore;
        // that registration is not undone here - confirm whether it should be.
        try {
            ((WorkflowJobEndNotificationService)Services.get().getService(WorkflowJobEndNotificationService.SERVICE_NAME)).unregisterListener(this);
        }
        finally {
            // Always release the graph, regardless of how unregistration went.
            if (this.graph != null) {
                LOG.info("Shutting down graph db");
                this.graph.shutdown();
            }
        }
    }

    /**
     * Adds lineage for a newly added entity.
     * <p>
     * The entity is added through the entity graph builder and committed inside
     * a transaction that is retried with exponential backoff, using the
     * configured retry count and delay. If that ultimately fails, the
     * transaction is rolled back and the failure is rethrown wrapped.
     *
     * @param entity the entity that was added
     * @throws FalconException wrapping any exception raised by the retried work
     */
    @Override
    public void onAdd(final Entity entity) throws FalconException {
        final EntityType type = entity.getEntityType();
        LOG.info("Adding lineage for entity: {}, type: {}", entity.getName(), type);

        final TransactionWork<Void> addEntityWork = new TransactionWork<Void>() {
            @Override
            public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                entityGraphBuilder.addEntity(entity);
                transactionalGraph.commit();
                return null;
            }
        };

        try {
            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                    .perform(addEntityWork)
                    .build()
                    .exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
        } catch (Exception failure) {
            getTransactionalGraph().rollback();
            throw new FalconException(failure);
        }
    }

    /**
     * Entity-removed callback; this implementation does nothing.
     *
     * @param entity the entity that was removed (unused)
     * @throws FalconException never thrown by this implementation
     */
    @Override
    public void onRemove(Entity entity) throws FalconException {
    }

    /**
     * Updates lineage after an entity definition changed.
     * <p>
     * The update is applied through the entity graph builder and committed
     * inside a transaction that is retried with exponential backoff, using the
     * configured retry count and delay. If that ultimately fails, the
     * transaction is rolled back and the failure is rethrown wrapped.
     *
     * @param oldEntity the entity definition before the change
     * @param newEntity the entity definition after the change
     * @throws FalconException wrapping any exception raised by the retried work
     */
    @Override
    public void onChange(final Entity oldEntity, final Entity newEntity) throws FalconException {
        final EntityType type = newEntity.getEntityType();
        LOG.info("Updating lineage for entity: {}, type: {}", newEntity.getName(), type);

        final TransactionWork<Void> updateEntityWork = new TransactionWork<Void>() {
            @Override
            public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                entityGraphBuilder.updateEntity(oldEntity, newEntity);
                transactionalGraph.commit();
                return null;
            }
        };

        try {
            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                    .perform(updateEntityWork)
                    .build()
                    .exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
        } catch (Exception failure) {
            getTransactionalGraph().rollback();
            throw new FalconException(failure);
        }
    }

    /**
     * Handles an entity reload exactly like an entity add.
     *
     * @param entity the reloaded entity
     * @throws FalconException if adding the entity's lineage fails
     */
    @Override
    public void onReload(Entity entity) throws FalconException {
        onAdd(entity);
    }

    /**
     * Logs the workflow start notification and records the instance update.
     *
     * @param context workflow execution context of the notification
     * @throws FalconException if the instance update fails
     */
    @Override
    public void onStart(WorkflowExecutionContext context) throws FalconException {
        LOG.info("onStart {}", context);
        onInstanceExecutionUpdate(context);
    }

    /**
     * Logs the workflow success notification and records the instance update.
     *
     * @param context workflow execution context of the notification
     * @throws FalconException if the instance update fails
     */
    @Override
    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
        LOG.info("onSuccess {}", context);
        onInstanceExecutionUpdate(context);
    }

    /**
     * Logs the workflow failure notification and records the instance update.
     *
     * @param context workflow execution context of the notification
     * @throws FalconException if the instance update fails
     */
    @Override
    public void onFailure(WorkflowExecutionContext context) throws FalconException {
        LOG.info("onFailure {}", context);
        onInstanceExecutionUpdate(context);
    }

    /**
     * Logs the workflow suspend notification and records the instance update.
     *
     * @param context workflow execution context of the notification
     * @throws FalconException if the instance update fails
     */
    @Override
    public void onSuspend(WorkflowExecutionContext context) throws FalconException {
        LOG.info("onSuspend {}", context);
        onInstanceExecutionUpdate(context);
    }

    /**
     * Logs the workflow wait notification and records the instance update.
     *
     * @param context workflow execution context of the notification
     * @throws FalconException if the instance update fails
     */
    @Override
    public void onWait(WorkflowExecutionContext context) throws FalconException {
        LOG.info("onWait {}", context);
        onInstanceExecutionUpdate(context);
    }

    /**
     * Applies an instance status update for the given workflow context inside a
     * transaction that is retried with exponential backoff, using the
     * configured retry count and delay. If that ultimately fails, the
     * transaction is rolled back and the failure is rethrown wrapped.
     *
     * @param context workflow execution context describing the instance
     * @throws FalconException wrapping any exception raised by the retried work
     */
    private void onInstanceExecutionUpdate(final WorkflowExecutionContext context) throws FalconException {
        final TransactionWork<Void> statusUpdateWork = new TransactionWork<Void>() {
            @Override
            public Void execute(TransactionalGraph transactionalGraph) throws Exception {
                updateInstanceStatus(context);
                transactionalGraph.commit();
                return null;
            }
        };

        try {
            new TransactionRetryHelper.Builder<Void>(getTransactionalGraph())
                    .perform(statusUpdateWork)
                    .build()
                    .exponentialBackoff(transactionRetries, transactionRetryDelayInMillis);
        } catch (Exception failure) {
            getTransactionalGraph().rollback();
            throw new FalconException(failure);
        }
    }

    /**
     * Dispatches an instance update to the handler matching the context's
     * entity operation. Contexts of type {@code COORDINATOR_ACTION} are
     * ignored.
     *
     * @param context workflow execution context describing the instance
     * @throws FalconException          if the selected handler fails
     * @throws IllegalArgumentException if the operation is not one of
     *         GENERATE, REPLICATE, DELETE, IMPORT or EXPORT
     */
    private void updateInstanceStatus(WorkflowExecutionContext context) throws FalconException {
        if (WorkflowExecutionContext.Type.COORDINATOR_ACTION == context.getContextType()) {
            return;
        }

        final WorkflowExecutionContext.EntityOperations operation = context.getOperation();
        switch (operation) {
            case GENERATE:
                updateProcessInstance(context);
                return;
            case REPLICATE:
                updateReplicatedFeedInstance(context);
                return;
            case DELETE:
                updateEvictedFeedInstance(context);
                return;
            case IMPORT:
                updateImportedFeedInstance(context);
                return;
            case EXPORT:
                updateExportedFeedInstance(context);
                return;
            default:
                throw new IllegalArgumentException("Invalid EntityOperation - " + operation);
        }
    }

    /**
     * Adds the process instance for the context to the graph, then its output
     * feed instances, then its input feed instances.
     *
     * @param context workflow execution context describing the process instance
     * @throws FalconException if the instance graph builder fails
     */
    private void updateProcessInstance(WorkflowExecutionContext context) throws FalconException {
        LOG.info("Updating process instance: {}", context.getNominalTimeAsISO8601());
        final Vertex instanceVertex = instanceGraphBuilder.addProcessInstance(context);
        instanceGraphBuilder.addOutputFeedInstances(context, instanceVertex);
        instanceGraphBuilder.addInputFeedInstances(context, instanceVertex);
    }

    /**
     * Records a replicated feed instance through the instance graph builder.
     *
     * @param context workflow execution context describing the replication
     * @throws FalconException if the instance graph builder fails
     */
    private void updateReplicatedFeedInstance(WorkflowExecutionContext context) throws FalconException {
        LOG.info("Updating replicated feed instance: {}", context.getNominalTimeAsISO8601());
        instanceGraphBuilder.addReplicatedInstance(context);
    }

    /**
     * Records an evicted feed instance through the instance graph builder.
     *
     * @param context workflow execution context describing the eviction
     * @throws FalconException if the instance graph builder fails
     */
    private void updateEvictedFeedInstance(WorkflowExecutionContext context) throws FalconException {
        LOG.info("Updating evicted feed instance: {}", context.getNominalTimeAsISO8601());
        instanceGraphBuilder.addEvictedInstance(context);
    }

    /**
     * Records an imported feed instance through the instance graph builder.
     *
     * @param context workflow execution context describing the import
     * @throws FalconException if the instance graph builder fails
     */
    private void updateImportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
        LOG.info("Updating imported feed instance: {}", context.getNominalTimeAsISO8601());
        instanceGraphBuilder.addImportedInstance(context);
    }

    /**
     * Records an exported feed instance through the instance graph builder.
     *
     * @param context workflow execution context describing the export
     * @throws FalconException if the instance graph builder fails
     */
    private void updateExportedFeedInstance(WorkflowExecutionContext context) throws FalconException {
        LOG.info("Updating export feed instance: {}", context.getNominalTimeAsISO8601());
        instanceGraphBuilder.addExportedInstance(context);
    }
}

