/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.siddhi;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.contrib.siddhi.SiddhiCEP;
import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory;
import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public abstract class SiddhiStream {
    /** CEP environment this stream is bound to; assigned once by the constructor and never null. */
    private final SiddhiCEP cepEnvironment;

    /**
     * Binds this stream to a CEP environment.
     *
     * @param cepEnvironment the environment to bind to; must not be {@code null}
     * @throws NullPointerException if {@code cepEnvironment} is {@code null}
     */
    public SiddhiStream(SiddhiCEP cepEnvironment) {
        // checkNotNull hands back its argument, so validation and assignment share one statement.
        this.cepEnvironment = Preconditions.checkNotNull(cepEnvironment, "SiddhiCEP cepEnvironment is null");
    }

    /**
     * Exposes the bound CEP environment to subclasses.
     *
     * @return the environment passed to the constructor
     */
    protected SiddhiCEP getCepEnvironment() {
        return cepEnvironment;
    }

    /**
     * Produces the Flink stream that backs this Siddhi stream.
     *
     * @return a stream of {@code (String, Object)} tuples; the subclasses in this file build it
     *         with {@code convertDataStream}, which puts the stream id in {@code f0} and the
     *         original element in {@code f1}
     */
    protected abstract DataStream<Tuple2<String, Object>> toDataStream();

    /**
     * Tags every element of {@code dataStream} with {@code streamId}, yielding
     * {@code (streamId, element)} tuples.
     *
     * <p>When the input is a {@link KeyedStream}, the result is keyed again with a selector that
     * applies the input's own key selector to the tuple payload ({@code f1}), so the tagged
     * stream keeps the keying of its source.
     *
     * @param dataStream the source stream
     * @param streamId   the id stored in field {@code f0} of every produced tuple
     * @param <T>        element type of the source stream
     * @return the tagged stream; a keyed stream if and only if the input was one
     */
    protected <T> DataStream<Tuple2<String, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
        // Final local copy so the anonymous function below captures the value itself.
        final String streamIdInClosure = streamId;
        // NOTE(review): kept as an anonymous class rather than a lambda so the generic signature
        // stays visible on the function class -- confirm before converting to a lambda.
        DataStream<Tuple2<String, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<String, Object>>() {

            @Override
            public Tuple2<String, Object> map(T value) throws Exception {
                // Widen the payload (not the id) to Object so the tuple is Tuple2<String, Object>.
                return Tuple2.of(streamIdInClosure, (Object) value);
            }
        });

        if (!(dataStream instanceof KeyedStream)) {
            return resultStream;
        }

        // The key type of the source is unknown here, so it is treated as Object.
        @SuppressWarnings("unchecked")
        final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
        KeySelector<Tuple2<String, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<String, Object>, Object>() {

            @Override
            @SuppressWarnings("unchecked") // f1 always holds an element of the source stream, i.e. a T
            public Object getKey(Tuple2<String, Object> value) throws Exception {
                return keySelector.getKey((T) value.f1);
            }
        };
        return resultStream.keyBy(keySelectorInClosure);
    }

    /**
     * An execution plan paired with its tagged input stream and CEP environment.
     *
     * <p>Each {@code returns*} method builds a {@link SiddhiOperatorContext} from the environment,
     * selects one output stream id and an output type, and passes the context together with the
     * input stream to {@link SiddhiStreamFactory#createDataStream}.
     */
    public static class ExecutionSiddhiStream {
        private final DataStream<Tuple2<String, Object>> dataStream;
        private final SiddhiCEP environment;
        private final String executionPlan;

        /**
         * @param dataStream    input stream of {@code (streamId, element)} tuples; must not be {@code null}
         * @param executionPlan the plan to execute; must not be {@code null}
         * @param environment   the CEP environment supplying schemas, extensions and configuration;
         *                      must not be {@code null}
         * @throws NullPointerException if any argument is {@code null}
         */
        public ExecutionSiddhiStream(DataStream<Tuple2<String, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
            // Fail at construction time instead of later inside a returns(...) call.
            this.executionPlan = Preconditions.checkNotNull(executionPlan, "executionPlan");
            this.dataStream = Preconditions.checkNotNull(dataStream, "dataStream");
            this.environment = Preconditions.checkNotNull(environment, "environment");
        }

        /**
         * Returns the given output stream as tuples, with the tuple type obtained from
         * {@link SiddhiTypeFactory#getTupleTypeInformation} for the final execution plan.
         *
         * @param outStreamId id of the output stream to return
         * @param <T>         tuple type of the output stream
         * @return the output stream
         */
        public <T extends Tuple> DataStream<T> returns(String outStreamId) {
            SiddhiOperatorContext siddhiContext = this.newContext(outStreamId);
            // The output type is looked up from the final execution plan, so the context has to
            // be fully configured before this call.
            siddhiContext.setOutputStreamType(
                    SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId));
            return this.returnsInternal(siddhiContext);
        }

        /**
         * Returns the given output stream with each event represented as a {@code Map<String, Object>}.
         *
         * @param outStreamId id of the output stream to return
         * @return the output stream
         */
        public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
            return this.returnsInternal(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
        }

        /**
         * Returns the given output stream typed as {@code outType}; the type information is
         * obtained with {@link TypeExtractor#getForClass}.
         *
         * @param outStreamId id of the output stream to return
         * @param outType     class of the output elements
         * @param <T>         element type of the output stream
         * @return the output stream
         */
        public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
            return this.returnsInternal(outStreamId, TypeExtractor.getForClass(outType));
        }

        /** Builds a context for {@code outStreamId} with an explicitly supplied output type. */
        private <T> DataStream<T> returnsInternal(String outStreamId, TypeInformation<T> typeInformation) {
            SiddhiOperatorContext siddhiContext = this.newContext(outStreamId);
            siddhiContext.setOutputStreamType(typeInformation);
            return this.returnsInternal(siddhiContext);
        }

        /** Creates the output stream for a fully configured context. */
        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
        }

        /** Creates a context carrying everything except the output stream type. */
        private SiddhiOperatorContext newContext(String outStreamId) {
            SiddhiOperatorContext siddhiContext = new SiddhiOperatorContext();
            siddhiContext.setExecutionPlan(this.executionPlan);
            siddhiContext.setInputStreamSchemas(this.environment.getDataStreamSchemas());
            siddhiContext.setTimeCharacteristic(this.environment.getExecutionEnvironment().getStreamTimeCharacteristic());
            siddhiContext.setOutputStreamId(outStreamId);
            siddhiContext.setExtensions(this.environment.getExtensions());
            siddhiContext.setExecutionConfig(this.environment.getExecutionEnvironment().getConfig());
            return siddhiContext;
        }
    }

    /**
     * A Siddhi stream made of a first registered stream plus any number of further registered
     * streams, all unioned into one tagged Flink stream by {@link #toDataStream()}.
     *
     * <p>The {@code union} methods never modify the receiver; each returns a new instance.
     *
     * @param <T> element type accepted by {@link #union(String, DataStream, String...)}
     */
    public static class UnionSiddhiStream<T>
    extends ExecutableStream {
        private final String firstStreamId;
        private final List<String> unionStreamIds;

        /**
         * @param firstStreamId  id of the first stream; must not be {@code null}
         * @param unionStreamIds ids of the streams unioned with the first one; must not be
         *                       {@code null}. The list is copied, so later changes to the
         *                       argument do not affect this instance.
         * @param environment    the CEP environment; every id is passed to its
         *                       {@code checkStreamDefined}
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
            super(environment);
            Preconditions.checkNotNull(firstStreamId, "firstStreamId");
            Preconditions.checkNotNull(unionStreamIds, "unionStreamIds");
            // Snapshot before validating: callers may pass a view over an array they still own
            // (e.g. Arrays.asList over varargs), and the validated ids must be the stored ones.
            List<String> idsSnapshot = new ArrayList<String>(unionStreamIds);
            environment.checkStreamDefined(firstStreamId);
            for (String unionStreamId : idsSnapshot) {
                environment.checkStreamDefined(unionStreamId);
            }
            this.firstStreamId = firstStreamId;
            this.unionStreamIds = idsSnapshot;
        }

        /**
         * Registers {@code dataStream} under {@code streamId} in the CEP environment and unions it
         * with this stream.
         *
         * @param streamId   id to register the stream under; must not be {@code null}
         * @param dataStream the stream to register; must not be {@code null}
         * @param fieldNames field names passed to the registration; must not be {@code null}
         * @return a new union that additionally contains {@code streamId}
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String ... fieldNames) {
            Preconditions.checkNotNull(streamId, "streamId");
            Preconditions.checkNotNull(dataStream, "dataStream");
            Preconditions.checkNotNull(fieldNames, "fieldNames");
            this.getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
            return this.union(streamId);
        }

        /**
         * Unions this stream with further streams that are already registered.
         *
         * @param streamId ids to append, in order; the array must not be {@code null}
         * @return a new union containing the current ids followed by {@code streamId}
         * @throws NullPointerException if {@code streamId} is {@code null}
         */
        public UnionSiddhiStream<T> union(String ... streamId) {
            Preconditions.checkNotNull(streamId, "streamId");
            List<String> newUnionStreamIds = new ArrayList<String>(this.unionStreamIds.size() + streamId.length);
            newUnionStreamIds.addAll(this.unionStreamIds);
            newUnionStreamIds.addAll(Arrays.asList(streamId));
            return new UnionSiddhiStream<T>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
        }

        /**
         * Looks up every participating stream in the CEP environment, tags each with its own
         * stream id, and unions them, starting from the first stream.
         *
         * @return the unioned stream of {@code (streamId, element)} tuples
         */
        @Override
        protected DataStream<Tuple2<String, Object>> toDataStream() {
            DataStream<Tuple2<String, Object>> dataStream =
                    this.convertDataStream(this.getCepEnvironment().getDataStream(this.firstStreamId), this.firstStreamId);
            for (String unionStreamId : this.unionStreamIds) {
                dataStream = dataStream.union(
                        this.convertDataStream(this.getCepEnvironment().getDataStream(unionStreamId), unionStreamId));
            }
            return dataStream;
        }
    }

    /**
     * A Siddhi stream backed by exactly one stream registered in the CEP environment.
     *
     * @param <T> element type accepted by {@link #union(String, DataStream, String...)}
     */
    public static class SingleSiddhiStream<T>
    extends ExecutableStream {
        private final String streamId;

        /**
         * @param streamId    id of the registered stream; passed to the environment's
         *                    {@code checkStreamDefined}
         * @param environment the CEP environment; must not be {@code null}
         */
        public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
            super(environment);
            environment.checkStreamDefined(streamId);
            this.streamId = streamId;
        }

        /**
         * Registers {@code dataStream} under {@code streamId} in the CEP environment and unions it
         * with this stream.
         *
         * @param streamId   id to register the stream under; must not be {@code null}
         * @param dataStream the stream to register; must not be {@code null}
         * @param fieldNames field names passed to the registration; must not be {@code null}
         * @return a union of this stream and the newly registered one
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String ... fieldNames) {
            // Same argument checks as UnionSiddhiStream.union(String, DataStream, String...).
            Preconditions.checkNotNull(streamId, "streamId");
            Preconditions.checkNotNull(dataStream, "dataStream");
            Preconditions.checkNotNull(fieldNames, "fieldNames");
            this.getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
            return this.union(streamId);
        }

        /**
         * Unions this stream with further streams that are already registered.
         *
         * @param streamIds ids of the streams to union with; the array must not be {@code null}
         * @return a union whose first stream is this one
         * @throws NullPointerException if {@code streamIds} is {@code null}
         */
        public UnionSiddhiStream<T> union(String ... streamIds) {
            Preconditions.checkNotNull(streamIds, "streamIds");
            return new UnionSiddhiStream<T>(this.streamId, Arrays.asList(streamIds), this.getCepEnvironment());
        }

        /**
         * Looks up the registered stream and tags every element with this stream's id.
         *
         * @return the stream of {@code (streamId, element)} tuples
         */
        @Override
        protected DataStream<Tuple2<String, Object>> toDataStream() {
            return this.convertDataStream(this.getCepEnvironment().getDataStream(this.streamId), this.streamId);
        }
    }

    /**
     * Base class for Siddhi streams on which an execution plan can be run.
     */
    public static abstract class ExecutableStream
    extends SiddhiStream {
        /**
         * @param environment the CEP environment, forwarded to {@link SiddhiStream}
         */
        public ExecutableStream(SiddhiCEP environment) {
            super(environment);
        }

        /**
         * Pairs an execution plan with this stream's tagged data stream.
         *
         * @param executionPlan the plan to run; must not be {@code null}
         * @return an {@link ExecutionSiddhiStream} over {@link #toDataStream()}, the plan, and the
         *         CEP environment
         * @throws NullPointerException if {@code executionPlan} is {@code null}
         */
        public ExecutionSiddhiStream cql(String executionPlan) {
            Preconditions.checkNotNull(executionPlan, "executionPlan");
            // Materialise the input stream first, then look up the environment.
            final DataStream<Tuple2<String, Object>> taggedInput = toDataStream();
            return new ExecutionSiddhiStream(taggedInput, executionPlan, getCepEnvironment());
        }
    }
}

