/*
 * Decompiled with CFR 0.152.
 */
package com.github.haoch.experimental;

import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.apache.flink.streaming.siddhi.control.ControlEventSchema;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import scala.Predef$;

public final class CEPPipeline$ {
    public static final CEPPipeline$ MODULE$;

    static {
        new CEPPipeline$();
    }

    public void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        if (params.getNumberOfParameters() < 4) {
            Predef$.MODULE$.println((Object)"Missing parameters!\nUsage: Kafka --input-topic <topic> --control-topic <topic> --output-topic <topic> --bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id> ");
            return;
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)4, (long)10000L));
        env.enableCheckpointing(5000L);
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)params);
        String[] dataSchemaFields = (String[])((Object[])new String[]{"name", "value", "timestamp", "host"});
        TypeInformation[] dataSchemaTypes = (TypeInformation[])((Object[])new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});
        DataStreamSource dataStream = env.addSource((SourceFunction)new FlinkKafkaConsumer010(params.getRequired("input-topic"), (DeserializationSchema)new JsonRowDeserializationSchema((TypeInformation)new RowTypeInfo(dataSchemaTypes, dataSchemaFields)), params.getProperties()));
        DataStreamSource controlStream = env.addSource((SourceFunction)new FlinkKafkaConsumer010(params.getRequired("control-topic"), (DeserializationSchema)new ControlEventSchema(), params.getProperties()));
        FlinkKafkaProducer010 kafkaProducer = new FlinkKafkaProducer010(params.getRequired("output-topic"), (SerializationSchema)new SimpleStringSchema(), params.getProperties());
        dataStream.print();
        DataStream alertStream = SiddhiCEP.define((String)"MetricStreamKeyedByHost", (DataStream)dataStream.keyBy(new String[]{"host"}), (String[])new String[]{"name", "value", "timestamp", "host"}).union("MetricStreamKeyedByName", (DataStream)dataStream.keyBy(new String[]{"name"}), new String[]{"name", "value", "timestamp", "host"}).cql((DataStream)controlStream).returnAsMap("AlertStream");
        alertStream.map((MapFunction)new MapFunction<Map<String, Object>, String>(){

            public String map(Map<String, Object> value) {
                return value.toString();
            }
        }).addSink((SinkFunction)kafkaProducer);
        env.execute("Kafka 0.10 Example");
    }

    private CEPPipeline$() {
        MODULE$ = this;
    }
}

