/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.pipelined;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.olacabs.fabric.common.util.metrics.MetricFactory;
import com.olacabs.fabric.compute.builder.Linker;
import com.olacabs.fabric.compute.builder.Loader;
import com.olacabs.fabric.compute.builder.impl.RegisteringLoader;
import com.olacabs.fabric.compute.pipeline.ComputationPipeline;
import com.olacabs.fabric.compute.pipelined.CountingProcessor;
import com.olacabs.fabric.compute.pipelined.MemoryBasedPipelineStreamPipelineSource;
import com.olacabs.fabric.compute.pipelined.PrinterStreamingProcessor;
import com.olacabs.fabric.compute.pipelined.SummingProcessor;
import com.olacabs.fabric.compute.processor.ProcessorBase;
import com.olacabs.fabric.compute.source.PipelineSource;
import com.olacabs.fabric.model.common.ComponentMetadata;
import com.olacabs.fabric.model.common.ComponentType;
import com.olacabs.fabric.model.computation.ComponentInstance;
import com.olacabs.fabric.model.computation.ComputationSpec;
import com.olacabs.fabric.model.computation.Connection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class ComuptationPipelineTest {
    @Test
    public void testCheck() throws Exception {
        Properties properties = new Properties();
        properties.put("processor.counter_1.triggering_frequency", "1000");
        properties.put("processor.summer_1.triggering_frequency", "1000");
        properties.put("computation.shutdown.wait_time_in_seconds", "1");
        properties.put("computation.channel.channel_type", " disruptor");
        properties.put("computation.disruptor.buffer_size", "64");
        properties.put("computation.disruptor.wait_strategy", "Yield ");
        String sourceId = "source_1";
        String pid1 = "summer_1";
        String pid2 = "counter_1";
        String pid3 = "printer_1";
        RegisteringLoader loader = RegisteringLoader.builder().source("memory", (PipelineSource)new MemoryBasedPipelineStreamPipelineSource()).stage("printer", (ProcessorBase)new PrinterStreamingProcessor()).stage("summer", (ProcessorBase)new SummingProcessor()).stage("counter", (ProcessorBase)new CountingProcessor()).build();
        ComputationSpec spec = ComputationSpec.builder().name("test-pipeline").source(ComponentInstance.builder().id("source_1").meta(ComponentMetadata.builder().type(ComponentType.SOURCE).id("source_1").name("memory").build()).build()).processor(ComponentInstance.builder().id("summer_1").meta(ComponentMetadata.builder().type(ComponentType.PROCESSOR).id("summer_1").name("summer").build()).build()).processor(ComponentInstance.builder().id("counter_1").meta(ComponentMetadata.builder().type(ComponentType.PROCESSOR).id("counter_1").name("counter").build()).build()).processor(ComponentInstance.builder().id("printer_1").meta(ComponentMetadata.builder().type(ComponentType.PROCESSOR).id("printer_1").name("printer").build()).build()).connection(Connection.builder().fromType(ComponentType.SOURCE).from("source_1").to("summer_1").build()).connection(Connection.builder().fromType(ComponentType.SOURCE).from("source_1").to("counter_1").build()).connection(Connection.builder().fromType(ComponentType.SOURCE).from("source_1").to("printer_1").build()).connection(Connection.builder().fromType(ComponentType.PROCESSOR).from("summer_1").to("printer_1").build()).connection(Connection.builder().fromType(ComponentType.PROCESSOR).from("counter_1").to("printer_1").build()).properties(properties).build();
        System.out.println(new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString((Object)spec));
        Linker linker = new Linker((Loader)loader);
        ComputationPipeline pipeline = linker.build(spec);
        pipeline.initialize(properties);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ConsoleReporter reporter = ConsoleReporter.forRegistry((MetricRegistry)MetricFactory.getMetricRegistry()).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();
        reporter.start(1L, TimeUnit.SECONDS);
        executor.submit(() -> ((ComputationPipeline)pipeline).start());
        Thread.sleep(2000L);
        pipeline.stop();
        reporter.stop();
        executor.shutdownNow();
    }
}

