/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.builder;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.olacabs.fabric.common.util.metrics.MetricFactory;
import com.olacabs.fabric.compute.ProcessingContext;
import com.olacabs.fabric.compute.builder.Loader;
import com.olacabs.fabric.compute.pipeline.ComputationPipeline;
import com.olacabs.fabric.compute.pipeline.MessageSource;
import com.olacabs.fabric.compute.pipeline.NotificationBus;
import com.olacabs.fabric.compute.pipeline.PipelineStage;
import com.olacabs.fabric.compute.processor.ProcessorBase;
import com.olacabs.fabric.compute.source.PipelineSource;
import com.olacabs.fabric.compute.source.PipelineStreamSource;
import com.olacabs.fabric.model.common.ComponentMetadata;
import com.olacabs.fabric.model.computation.ComputationSpec;
import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Linker {
    private static final Logger LOGGER = LoggerFactory.getLogger(Linker.class);
    private static final String DEFAULT_REGISTRY_NAME = "metrics-registry";
    private final Loader loader;
    private MetricRegistry metricRegistry;

    public Linker(Loader loader) {
        this(loader, MetricFactory.getMetricRegistry());
    }

    public Linker(Loader loader, MetricRegistry metricRegistry) {
        this.loader = loader;
        this.metricRegistry = metricRegistry;
    }

    public ComputationPipeline build(ComputationSpec spec) {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        NotificationBus notificationBus = new NotificationBus(spec.getProperties());
        ProcessingContext processingContext = new ProcessingContext(spec.getName(), objectMapper);
        ComputationPipeline pipeline = ComputationPipeline.builder();
        pipeline.notificationBus(notificationBus);
        pipeline.computationName(spec.getName());
        HashMap sources = Maps.newHashMap();
        HashMap stages = Maps.newHashMap();
        spec.getSources().forEach(sourceMetadata -> {
            ComponentMetadata meta = sourceMetadata.getMeta();
            PipelineSource source = null;
            String errorMessage = String.format("Source object not loaded properly [%s:%s:%s]", meta.getNamespace(), meta.getName(), meta.getVersion());
            try {
                PipelineSource sourceCopy = this.loader.loadSource(meta);
                if (sourceCopy != null) {
                    source = (PipelineSource)sourceCopy.getClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    source.setMetricRegistry(this.metricRegistry);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(errorMessage, e);
            }
            Preconditions.checkNotNull(source, (Object)errorMessage);
            LOGGER.info("Loaded source: {}:{}:{}", new Object[]{meta.getNamespace(), meta.getName(), meta.getVersion()});
            PipelineStreamSource sourceStage = PipelineStreamSource.builder().instanceId(sourceMetadata.getId()).properties(sourceMetadata.getProperties()).notificationBus(notificationBus).sourceMetadata(sourceMetadata.getMeta()).topologyName(processingContext.getTopologyName()).source(source).processingContext(processingContext).objectMapper(objectMapper).registry(this.metricRegistry).build();
            pipeline.addSource(sourceStage);
            sources.put(sourceMetadata.getId(), sourceStage);
        });
        spec.getProcessors().forEach(processorMetadata -> {
            ComponentMetadata meta = processorMetadata.getMeta();
            ProcessorBase processorBase = null;
            String errorMessage = String.format("Processor object not loaded properly [%s:%s:%s]", meta.getNamespace(), meta.getName(), meta.getVersion());
            try {
                ProcessorBase processorBaseCopy = this.loader.loadProcessor(meta);
                if (processorBaseCopy != null) {
                    processorBase = (ProcessorBase)processorBaseCopy.getClass().getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
                    processorBase.setMetricRegistry(this.metricRegistry);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(errorMessage, e);
            }
            Preconditions.checkNotNull(processorBase, (Object)errorMessage);
            LOGGER.info("Loaded processor: {}:{}:{}", new Object[]{meta.getNamespace(), meta.getName(), meta.getVersion()});
            PipelineStage stage = PipelineStage.builder().instanceId(processorMetadata.getId()).properties(processorMetadata.getProperties()).notificationBus(notificationBus).processorMetadata(processorMetadata.getMeta()).processor(processorBase).context(processingContext).build();
            processorBase.setId(processorMetadata.getId());
            pipeline.addPipelineStage(stage);
            stages.put(processorMetadata.getId(), stage);
        });
        spec.getConnections().forEach(connection -> {
            switch (connection.getFromType()) {
                case SOURCE: {
                    pipeline.connect((MessageSource)sources.get(connection.getFrom()), (PipelineStage)stages.get(connection.getTo()));
                    break;
                }
                case PROCESSOR: {
                    pipeline.connect((MessageSource)stages.get(connection.getFrom()), (PipelineStage)stages.get(connection.getTo()));
                    break;
                }
            }
        });
        return pipeline;
    }
}

