/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.siddhi;

import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.siddhi.SiddhiCEP;
import org.apache.flink.streaming.siddhi.control.ControlEvent;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.streaming.siddhi.router.AddRouteOperator;
import org.apache.flink.streaming.siddhi.router.DynamicPartitioner;
import org.apache.flink.streaming.siddhi.router.StreamRoute;
import org.apache.flink.streaming.siddhi.utils.GenericRecord;
import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public abstract class SiddhiStream {
    private final SiddhiCEP cepEnvironment;

    /**
     * Creates a stream bound to the given CEP environment.
     *
     * @param cepEnvironment the environment this stream belongs to; must not be {@code null}
     * @throws NullPointerException if {@code cepEnvironment} is {@code null}
     */
    public SiddhiStream(SiddhiCEP cepEnvironment) {
        // checkNotNull hands back its argument, so validation and assignment fit in one statement.
        this.cepEnvironment = Preconditions.checkNotNull(cepEnvironment, "SiddhiCEP cepEnvironment is null");
    }

    /**
     * Returns the CEP environment that was supplied when this stream was constructed.
     *
     * @return the owning {@link SiddhiCEP}; never {@code null}
     */
    protected SiddhiCEP getCepEnvironment() {
        return cepEnvironment;
    }

    /**
     * Builds the stream of {@code (StreamRoute, event)} tuples represented by this Siddhi stream.
     *
     * <p>The implementations in this file obtain the registered data stream(s) from the CEP
     * environment and wrap them through {@link #convertDataStream(DataStream, String)}.
     *
     * @return the routed tuple stream
     */
    protected abstract DataStream<Tuple2<StreamRoute, Object>> toDataStream();

    /**
     * Wraps every element of {@code dataStream} in a {@code (StreamRoute, event)} tuple whose
     * route is created from {@code streamId}.
     *
     * <p>When the input is a {@link KeyedStream}, the wrapped stream is keyed again by applying the
     * input's own key selector to the wrapped event (tuple field {@code f1}), so the keying of the
     * input carries over to the result.
     *
     * @param dataStream the stream to wrap
     * @param streamId   the stream id used to build the {@link StreamRoute} of every tuple
     * @param <T>        element type of the input stream
     * @return the wrapped stream; a {@link KeyedStream} if and only if the input was one
     */
    @SuppressWarnings("unchecked")
    protected <T> DataStream<Tuple2<StreamRoute, Object>> convertDataStream(DataStream<T> dataStream, String streamId) {
        final String streamIdInClosure = streamId;
        // Anonymous classes (not lambdas) are kept so the generic signature stays available to
        // Flink's type extraction.
        DataStream<Tuple2<StreamRoute, Object>> resultStream = dataStream.map(new MapFunction<T, Tuple2<StreamRoute, Object>>() {

            @Override
            public Tuple2<StreamRoute, Object> map(T value) throws Exception {
                // Explicit type arguments: the event is widened to Object, the route keeps its type.
                return Tuple2.<StreamRoute, Object>of(StreamRoute.of(streamIdInClosure), value);
            }
        });
        if (!(dataStream instanceof KeyedStream)) {
            return resultStream;
        }
        // The key type is not known statically here, so it is treated as Object.
        final KeySelector<T, Object> keySelector = ((KeyedStream<T, Object>) dataStream).getKeySelector();
        KeySelector<Tuple2<StreamRoute, Object>, Object> keySelectorInClosure = new KeySelector<Tuple2<StreamRoute, Object>, Object>() {

            @Override
            public Object getKey(Tuple2<StreamRoute, Object> value) throws Exception {
                // f1 always holds an element of the original stream (see the map above).
                return keySelector.getKey((T) value.f1);
            }
        };
        return resultStream.keyBy(keySelectorInClosure);
    }

    /**
     * Couples a routed input stream with a {@link SiddhiOperatorContext} and exposes the
     * {@code returns*} methods that turn the output of the Siddhi execution into a typed Flink
     * stream.
     *
     * <p>The Siddhi data stream is created on the first {@code returns*} call and then cached:
     * later calls on the same instance still register their output types on the context but
     * reuse the already created stream.
     */
    public static class ExecutionSiddhiStream {
        private final DataStream<Tuple2<StreamRoute, Object>> dataStream;
        private final SiddhiOperatorContext siddhiContext;
        private final String executionPlanId;
        /** Lazily created by {@link #returnsInternal()}; element type is fixed by the caller. */
        private DataStream<?> createdDataStream;

        /**
         * @param dataStream    routed input events; must not be {@code null}
         * @param executionPlan execution plan to register on the context, or {@code null} to
         *                      register none
         * @param environment   source of stream schemas, extensions, time characteristic and
         *                      execution config; must not be {@code null}
         * @throws NullPointerException if {@code dataStream} or {@code environment} is {@code null}
         */
        public ExecutionSiddhiStream(DataStream<Tuple2<StreamRoute, Object>> dataStream, String executionPlan, SiddhiCEP environment) {
            // Fail fast instead of deferring the NullPointerException to the first returns* call.
            Preconditions.checkNotNull(environment, "environment");
            this.dataStream = Preconditions.checkNotNull(dataStream, "dataStream");
            this.siddhiContext = new SiddhiOperatorContext();
            this.executionPlanId = executionPlan == null ? null : this.siddhiContext.addExecutionPlan(executionPlan);
            this.siddhiContext.setInputStreamSchemas(environment.getDataStreamSchemas());
            this.siddhiContext.setTimeCharacteristic(environment.getExecutionEnvironment().getStreamTimeCharacteristic());
            this.siddhiContext.setExtensions(environment.getExtensions());
            this.siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
        }

        /**
         * Returns a single output stream as Flink tuples.
         *
         * @param outStreamId id of the output stream
         * @param <T>         tuple type of the output stream
         * @return the output events, typed with the tuple type information derived for the stream
         */
        public <T extends Tuple> DataStream<T> returns(String outStreamId) {
            final TypeInformation<T> typeInformation = SiddhiTypeFactory.getTupleTypeInformation(
                    this.siddhiContext.getAllEnrichedExecutionPlan(), outStreamId);
            return this.<T>returns(Collections.singletonList(outStreamId))
                    .map(value -> typeInformation.getTypeClass().cast(value.f1))
                    // The lambda's return type is erased, so the type is declared explicitly.
                    .returns(typeInformation);
        }

        /**
         * Returns several output streams as {@code (streamId, tuple)} pairs.
         *
         * @param outStreamIds ids of the output streams
         * @param <T>          tuple type of the events
         * @return the combined output stream
         */
        public <T extends Tuple> DataStream<Tuple2<String, T>> returns(List<String> outStreamIds) {
            for (String outStreamId : outStreamIds) {
                TypeInformation<?> typeInformation = SiddhiTypeFactory.getTupleTypeInformation(
                        this.siddhiContext.getAllEnrichedExecutionPlan(), outStreamId);
                this.siddhiContext.setOutputStreamType(outStreamId, typeInformation);
            }
            return this.returnsInternal();
        }

        /**
         * Returns a single output stream as {@link Row}s whose row type is built from the
         * attribute list of the stream definition.
         *
         * @param outStreamId id of the output stream
         * @return the output events typed as rows with one field per stream attribute
         */
        public DataStream<Row> returnsTransformRow(String outStreamId) {
            AbstractDefinition definition = SiddhiTypeFactory.getStreamDefinition(
                    this.siddhiContext.getAllEnrichedExecutionPlan(), outStreamId, this.siddhiContext);
            List<TypeInformation<?>> types = new ArrayList<>();
            for (Attribute attribute : definition.getAttributeList()) {
                types.add(TypeInformation.of(SiddhiTypeFactory.getJavaType(attribute.getType())));
            }
            final TypeInformation<Row> typeInformation = Types.ROW(types.toArray(new TypeInformation<?>[0]));
            return this.returnAsRow(Collections.singletonList(outStreamId))
                    .map(x -> typeInformation.getTypeClass().cast(x.f1))
                    .returns(typeInformation);
        }

        /**
         * Returns a single output stream as maps.
         *
         * @param outStreamId id of the output stream
         * @return the output events as maps
         */
        public DataStream<Map<String, Object>> returnAsMap(String outStreamId) {
            return this.returnAsMap(Collections.singletonList(outStreamId))
                    .map(new MapFunction<Tuple2<String, Map<String, Object>>, Map<String, Object>>() {

                        @Override
                        public Map<String, Object> map(Tuple2<String, Map<String, Object>> value) throws Exception {
                            return value.f1;
                        }
                    });
        }

        /**
         * Returns several output streams as {@code (streamId, map)} pairs. Each event is read as a
         * {@link GenericRecord} and replaced by the result of its {@code getMap()}.
         *
         * @param outStreamIds ids of the output streams
         * @return the combined output stream
         */
        public DataStream<Tuple2<String, Map<String, Object>>> returnAsMap(List<String> outStreamIds) {
            for (String outStreamId : outStreamIds) {
                this.siddhiContext.setOutputStreamType(outStreamId, SiddhiTypeFactory.getMapTypeInformation());
            }
            return this.<Object>returnsInternal()
                    .map(new MapFunction<Tuple2<String, Object>, Tuple2<String, Map<String, Object>>>() {

                        @Override
                        public Tuple2<String, Map<String, Object>> map(Tuple2<String, Object> value) throws Exception {
                            return Tuple2.of(value.f0, ((GenericRecord) value.f1).getMap());
                        }
                    });
        }

        /**
         * Returns a single output stream as {@link Row}s.
         *
         * @param outStreamId id of the output stream
         * @return the output events as rows
         */
        public DataStream<Row> returnAsRow(String outStreamId) {
            return this.returnAsRow(Collections.singletonList(outStreamId)).map(x -> x.f1);
        }

        /**
         * Returns several output streams as {@code (streamId, row)} pairs.
         *
         * @param outStreamIds ids of the output streams
         * @return the combined output stream
         */
        public DataStream<Tuple2<String, Row>> returnAsRow(List<String> outStreamIds) {
            for (String outStreamId : outStreamIds) {
                this.siddhiContext.setOutputStreamType(outStreamId, TypeExtractor.createTypeInfo(Row.class));
            }
            return this.returnsInternal();
        }

        /**
         * Returns a single output stream typed by class.
         *
         * @param outStreamId id of the output stream
         * @param outType     class of the output events
         * @param <T>         type of the output events
         * @return the output events, declared with the type information extracted for {@code outType}
         */
        public <T> DataStream<T> returns(String outStreamId, Class<T> outType) {
            return this.returns(Collections.singletonList(outStreamId), outType)
                    .map(x -> x.f1)
                    // Without this the erased lambda return type would hide the requested type.
                    .returns(TypeExtractor.getForClass(outType));
        }

        /**
         * Returns several output streams as {@code (streamId, event)} pairs typed by class.
         *
         * @param outStreamIds ids of the output streams
         * @param outType      class of the output events
         * @param <T>          type of the output events
         * @return the combined output stream
         */
        public <T> DataStream<Tuple2<String, T>> returns(List<String> outStreamIds, Class<T> outType) {
            for (String outStreamId : outStreamIds) {
                TypeInformation<T> typeInformation = TypeExtractor.getForClass(outType);
                this.siddhiContext.setOutputStreamType(outStreamId, typeInformation);
            }
            return this.returnsInternal();
        }

        /**
         * Returns a single output stream typed by explicit type information.
         *
         * @param outStreamId     id of the output stream
         * @param typeInformation type information of the output events
         * @param <T>             type of the output events
         * @return the output events, declared with {@code typeInformation}
         */
        public <T> DataStream<T> returns(String outStreamId, TypeInformation<T> typeInformation) {
            return this.returns(Collections.singletonList(outStreamId), typeInformation)
                    .map(x -> x.f1)
                    // Without this the erased lambda return type would hide the requested type.
                    .returns(typeInformation);
        }

        /**
         * Returns several output streams as {@code (streamId, event)} pairs typed by explicit type
         * information.
         *
         * @param outStreamIds    ids of the output streams
         * @param typeInformation type information of the output events
         * @param <T>             type of the output events
         * @return the combined output stream
         */
        public <T> DataStream<Tuple2<String, T>> returns(List<String> outStreamIds, TypeInformation<T> typeInformation) {
            for (String outStreamId : outStreamIds) {
                this.siddhiContext.setOutputStreamType(outStreamId, typeInformation);
            }
            return this.returnsInternal();
        }

        /**
         * Returns the Siddhi output stream for this instance's context and execution plan id,
         * creating it on first use.
         */
        @VisibleForTesting
        <T> DataStream<Tuple2<String, T>> returnsInternal() {
            return this.returnsInternal(this.siddhiContext, this.executionPlanId);
        }

        /**
         * Creates the Siddhi stream on first call and caches it. Before the input is handed to
         * {@link SiddhiStreamFactory}, every route gets {@code executionPlanId} added when that id
         * is not {@code null}.
         */
        @SuppressWarnings("unchecked")
        private <T> DataStream<Tuple2<String, T>> returnsInternal(SiddhiOperatorContext siddhiContext, final String executionPlanId) {
            if (this.createdDataStream == null) {
                DataStream<Tuple2<StreamRoute, Object>> mapped = this.dataStream.map(
                        new MapFunction<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>>() {

                            @Override
                            public Tuple2<StreamRoute, Object> map(Tuple2<StreamRoute, Object> value) throws Exception {
                                if (executionPlanId != null) {
                                    value.f0.addExecutionPlanId(executionPlanId);
                                }
                                return value;
                            }
                        });
                this.createdDataStream = SiddhiStreamFactory.createDataStream(siddhiContext, mapped);
            }
            // The element type is determined by the output types registered on the context, which
            // the compiler cannot see; hence the unchecked cast.
            return (DataStream<Tuple2<String, T>>) this.createdDataStream;
        }
    }

    /**
     * An executable stream made of one first stream plus any number of further registered
     * streams, all merged with {@link DataStream#union}.
     *
     * <p>Instances do not change after construction: {@code union(...)} returns a new instance
     * with the extended id list.
     *
     * @param <T> element type accepted by {@link #union(String, DataStream, String...)}
     */
    public static class UnionSiddhiStream<T>
    extends ExecutableStream {
        private final String firstStreamId;
        private final List<String> unionStreamIds;

        /**
         * @param firstStreamId  id of the first stream; must be defined in {@code environment}
         * @param unionStreamIds ids of the streams to union with the first one; each must be
         *                       defined in {@code environment}
         * @param environment    the CEP environment holding the registered streams
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream(String firstStreamId, List<String> unionStreamIds, SiddhiCEP environment) {
            super(environment);
            Preconditions.checkNotNull(firstStreamId, "firstStreamId");
            Preconditions.checkNotNull(unionStreamIds, "unionStreamIds");
            // Snapshot the ids so later changes to the caller's list cannot bypass the checks below.
            List<String> idsSnapshot = Collections.unmodifiableList(new ArrayList<>(unionStreamIds));
            environment.checkStreamDefined(firstStreamId);
            for (String unionStreamId : idsSnapshot) {
                environment.checkStreamDefined(unionStreamId);
            }
            this.firstStreamId = firstStreamId;
            this.unionStreamIds = idsSnapshot;
        }

        /**
         * Registers {@code dataStream} under {@code streamId} in the CEP environment and unions it
         * with the streams already in this instance.
         *
         * @param streamId   id to register the stream under
         * @param dataStream the stream to register
         * @param fieldNames field names passed to the registration
         * @return a new union stream that additionally contains {@code streamId}
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String ... fieldNames) {
            Preconditions.checkNotNull(streamId, "streamId");
            Preconditions.checkNotNull(dataStream, "dataStream");
            Preconditions.checkNotNull(fieldNames, "fieldNames");
            this.getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
            return this.union(streamId);
        }

        /**
         * Unions further streams with the ones in this instance.
         *
         * @param streamId ids of the streams to add
         * @return a new union stream containing the current ids followed by {@code streamId}
         * @throws NullPointerException if {@code streamId} is {@code null}
         */
        public UnionSiddhiStream<T> union(String ... streamId) {
            Preconditions.checkNotNull(streamId, "streamId");
            List<String> newUnionStreamIds = new ArrayList<>(this.unionStreamIds.size() + streamId.length);
            newUnionStreamIds.addAll(this.unionStreamIds);
            newUnionStreamIds.addAll(Arrays.asList(streamId));
            return new UnionSiddhiStream<>(this.firstStreamId, newUnionStreamIds, this.getCepEnvironment());
        }

        /**
         * Converts the first stream and every unioned stream into routed tuples and merges them,
         * in id order, into one stream.
         */
        @Override
        protected DataStream<Tuple2<StreamRoute, Object>> toDataStream() {
            SiddhiCEP environment = this.getCepEnvironment();
            DataStream<Tuple2<StreamRoute, Object>> merged =
                    this.convertDataStream(environment.getDataStream(this.firstStreamId), this.firstStreamId);
            for (String unionStreamId : this.unionStreamIds) {
                merged = merged.union(this.convertDataStream(environment.getDataStream(unionStreamId), unionStreamId));
            }
            return merged;
        }
    }

    /**
     * An executable stream backed by exactly one stream registered in the CEP environment.
     *
     * @param <T> element type accepted by {@link #union(String, DataStream, String...)}
     */
    public static class SingleSiddhiStream<T>
    extends ExecutableStream {
        private final String streamId;

        /**
         * @param streamId    id of the stream; checked with
         *                    {@code environment.checkStreamDefined(streamId)}
         * @param environment the CEP environment holding the registered stream
         */
        public SingleSiddhiStream(String streamId, SiddhiCEP environment) {
            super(environment);
            environment.checkStreamDefined(streamId);
            this.streamId = streamId;
        }

        /**
         * Registers {@code dataStream} under {@code streamId} in the CEP environment and unions it
         * with this stream.
         *
         * @param streamId   id to register the stream under
         * @param dataStream the stream to register
         * @param fieldNames field names passed to the registration
         * @return a union stream of this stream followed by {@code streamId}
         * @throws NullPointerException if any argument is {@code null}
         */
        public UnionSiddhiStream<T> union(String streamId, DataStream<T> dataStream, String ... fieldNames) {
            // Same argument checks as UnionSiddhiStream#union(String, DataStream, String...).
            Preconditions.checkNotNull(streamId, "streamId");
            Preconditions.checkNotNull(dataStream, "dataStream");
            Preconditions.checkNotNull(fieldNames, "fieldNames");
            this.getCepEnvironment().registerStream(streamId, dataStream, fieldNames);
            return this.union(streamId);
        }

        /**
         * Unions this stream with further registered streams.
         *
         * @param streamIds ids of the streams to add
         * @return a union stream of this stream followed by {@code streamIds}
         * @throws NullPointerException if {@code streamIds} is {@code null}
         */
        public UnionSiddhiStream<T> union(String ... streamIds) {
            Preconditions.checkNotNull(streamIds, "streamIds");
            // Copy the ids: Arrays.asList alone is only a view on the caller's array.
            return new UnionSiddhiStream<>(this.streamId, new ArrayList<>(Arrays.asList(streamIds)), this.getCepEnvironment());
        }

        /** Converts the registered stream into routed tuples tagged with this stream's id. */
        @Override
        protected DataStream<Tuple2<StreamRoute, Object>> toDataStream() {
            return this.convertDataStream(this.getCepEnvironment().getDataStream(this.streamId), this.streamId);
        }
    }

    /**
     * A {@link SiddhiStream} on which a Siddhi execution can be started, either from a fixed
     * execution plan or from a stream of {@link ControlEvent}s.
     */
    public static abstract class ExecutableStream
    extends SiddhiStream {
        /**
         * @param environment the CEP environment this stream belongs to
         */
        public ExecutableStream(SiddhiCEP environment) {
            super(environment);
        }

        /**
         * Starts an execution of the given plan on this stream.
         *
         * @param executionPlan the execution plan; must not be {@code null}
         * @return the execution stream wrapping {@link #toDataStream()} and the plan
         * @throws NullPointerException if {@code executionPlan} is {@code null}
         */
        public ExecutionSiddhiStream cql(String executionPlan) {
            Preconditions.checkNotNull(executionPlan, "executionPlan");
            return new ExecutionSiddhiStream(this.toDataStream(), executionPlan, this.getCepEnvironment());
        }

        /**
         * Starts an execution without a fixed plan, driven by a control stream.
         *
         * <p>The control events are tagged with the route {@code "_internal_control_stream"},
         * broadcast, unioned with {@link #toDataStream()}, passed through an
         * {@link AddRouteOperator} and finally partitioned with a {@link DynamicPartitioner}.
         *
         * @param controlStream the stream of control events; must not be {@code null}
         * @return the execution stream, created with a {@code null} execution plan
         * @throws NullPointerException if {@code controlStream} is {@code null}
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public ExecutionSiddhiStream cql(DataStream<ControlEvent> controlStream) {
            // Same fail-fast contract as cql(String), with a descriptive message.
            Preconditions.checkNotNull(controlStream, "controlStream");
            // Raw types are kept here on purpose: the generic signatures of AddRouteOperator and
            // DynamicPartitioner are declared outside this file.
            SingleOutputStreamOperator unionStream = controlStream.map((MapFunction)new NamedControlStream("_internal_control_stream")).broadcast().union(new DataStream[]{this.toDataStream()}).transform("add route transform", SiddhiTypeFactory.getStreamTupleTypeInformation(TypeInformation.of(Object.class)), (OneInputStreamOperator)new AddRouteOperator(this.getCepEnvironment().getDataStreamSchemas()));
            DataStream partitionedStream = new DataStream(unionStream.getExecutionEnvironment(), (StreamTransformation)new PartitionTransformation(unionStream.getTransformation(), (StreamPartitioner)new DynamicPartitioner()));
            return new ExecutionSiddhiStream((DataStream<Tuple2<StreamRoute, Object>>)partitionedStream, null, this.getCepEnvironment());
        }

        /**
         * Tags every {@link ControlEvent} with a route built from a fixed stream id and reports its
         * output type explicitly through {@link ResultTypeQueryable}.
         */
        private static class NamedControlStream
        implements MapFunction<ControlEvent, Tuple2<StreamRoute, Object>>,
        ResultTypeQueryable<Tuple2<StreamRoute, Object>> {
            private static final TypeInformation<Tuple2<StreamRoute, Object>> TYPE_INFO =
                    TypeInformation.of(new TypeHint<Tuple2<StreamRoute, Object>>(){});
            private final String streamId;

            NamedControlStream(String streamId) {
                this.streamId = streamId;
            }

            @Override
            public Tuple2<StreamRoute, Object> map(ControlEvent value) throws Exception {
                // Explicit type arguments: the event is widened to Object, the route keeps its type.
                return Tuple2.<StreamRoute, Object>of(StreamRoute.of(this.streamId), value);
            }

            @Override
            public TypeInformation<Tuple2<StreamRoute, Object>> getProducedType() {
                return TYPE_INFO;
            }
        }
    }
}

