/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.siddhi.operator;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.contrib.siddhi.operator.StreamOutputHandler;
import org.apache.flink.contrib.siddhi.operator.StreamRecordComparator;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;

/**
 * Base one-input stream operator that forwards incoming records to an embedded Siddhi
 * execution plan and emits the plan's output stream through a {@link StreamOutputHandler}.
 *
 * <p>When the configured time characteristic is processing time, each record is handed to
 * {@link #processEvent} immediately, stamped with the current wall-clock time. Otherwise
 * records are buffered in a priority queue and released by {@link #processWatermark} once a
 * watermark with a timestamp at or beyond the record's timestamp arrives.
 *
 * <p>Checkpoint layout written by {@link #snapshotState} (all inside one
 * {@link ObjectOutputStream}): an {@code int} length, the Siddhi runtime snapshot bytes, then
 * whatever {@link #snapshotQueuerState} writes for the buffered records.
 *
 * <p>NOTE(review): the constructor invokes the overridable
 * {@link #createStreamRecordSerializer}; subclasses must not rely on their own fields being
 * initialized inside that method.
 *
 * @param <IN>  type of the input records
 * @param <OUT> type of the emitted records
 */
public abstract class AbstractSiddhiOperator<IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    /** Initial capacity of the buffer that holds records waiting for a watermark. */
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private final SiddhiOperatorContext siddhiPlan;
    private final String executionExpression;
    private final boolean isProcessingTime;
    /** Serializer per input stream id, built once in the constructor. */
    private final Map<String, MultiplexingStreamRecordSerializer<IN>> streamRecordSerializers;
    private transient SiddhiManager siddhiManager;
    private transient ExecutionPlanRuntime siddhiRuntime;
    private transient Map<String, InputHandler> inputStreamHandlers;
    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;

    /**
     * Validates the execution plan held by {@code siddhiPlan} and prepares one record
     * serializer for every input stream the plan declares.
     *
     * @param siddhiPlan operator context describing streams, extensions and the execution plan
     */
    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
        AbstractSiddhiOperator.validate(siddhiPlan);
        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
        this.siddhiPlan = siddhiPlan;
        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        this.streamRecordSerializers = new HashMap<String, MultiplexingStreamRecordSerializer<IN>>();
        for (String streamId : this.siddhiPlan.getInputStreams()) {
            this.streamRecordSerializers.put(streamId, this.createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
        }
    }

    /**
     * Creates the serializer used for records of the stream described by the given schema.
     *
     * @param var1 schema of the input stream
     * @param var2 execution config taken from the operator context
     * @return serializer for records of that stream
     */
    protected abstract MultiplexingStreamRecordSerializer<IN> createStreamRecordSerializer(StreamSchema var1, ExecutionConfig var2);

    /**
     * Returns the serializer registered for {@code streamId}.
     *
     * @param streamId id of an input stream
     * @return the serializer created for that stream in the constructor
     * @throws UndefinedStreamException if no serializer is registered under {@code streamId}
     */
    protected MultiplexingStreamRecordSerializer<IN> getStreamRecordSerializer(String streamId) {
        // Single lookup: the constructor only ever stores serializers returned by
        // createStreamRecordSerializer, so a null result is treated as "not defined".
        MultiplexingStreamRecordSerializer<IN> serializer = this.streamRecordSerializers.get(streamId);
        if (serializer == null) {
            throw new UndefinedStreamException("Stream " + streamId + " not defined");
        }
        return serializer;
    }

    /**
     * Handles one incoming record: processes it right away in processing-time mode, otherwise
     * buffers it until a covering watermark arrives.
     *
     * @param element the incoming record
     * @throws Exception propagated from {@link #processEvent}
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        String streamId = this.getStreamId(element.getValue());
        StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
        if (this.isProcessingTime) {
            this.processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
            return;
        }
        PriorityQueue<StreamRecord<IN>> queue = this.getPriorityQueue();
        if (this.getExecutionConfig().isObjectReuseEnabled()) {
            // With object reuse enabled the buffered record must not alias the incoming one,
            // so a copy of the value is queued under the original timestamp.
            queue.offer(new StreamRecord(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
        } else {
            queue.offer(element);
        }
    }

    /**
     * Processes a single event belonging to the given stream.
     *
     * @param var1 id of the stream the event belongs to
     * @param var2 schema of that stream
     * @param var3 the event value
     * @param var4 timestamp to associate with the event
     * @throws Exception if the event cannot be processed
     */
    protected abstract void processEvent(String var1, StreamSchema<IN> var2, IN var3, long var4) throws Exception;

    /**
     * Drains every buffered record whose timestamp is not later than the watermark, in queue
     * order, and then forwards the watermark downstream.
     *
     * @param mark the received watermark
     * @throws Exception propagated from {@link #processEvent}
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        long watermarkTimestamp = mark.getTimestamp();
        while (!this.priorityQueue.isEmpty() && this.priorityQueue.peek().getTimestamp() <= watermarkTimestamp) {
            StreamRecord<IN> streamRecord = this.priorityQueue.poll();
            String streamId = this.getStreamId(streamRecord.getValue());
            StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
            this.processEvent(streamId, schema, streamRecord.getValue(), streamRecord.getTimestamp());
        }
        this.output.emitWatermark(mark);
    }

    /**
     * Determines which input stream a record belongs to.
     *
     * @param var1 the record value
     * @return id of the stream the record belongs to
     */
    public abstract String getStreamId(IN var1);

    /**
     * @return the buffer of records waiting for a watermark; {@code null} until {@link #open()}
     *         or {@link #restoreState} has assigned it
     */
    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
        return this.priorityQueue;
    }

    /**
     * @return the running Siddhi runtime, or {@code null} if it has not been started or has
     *         been shut down
     */
    protected ExecutionPlanRuntime getSiddhiRuntime() {
        return this.siddhiRuntime;
    }

    /**
     * @param streamId id of an input stream
     * @return the Siddhi input handler registered for {@code streamId}, or {@code null} if none
     *         is registered under that id
     */
    public InputHandler getSiddhiInputHandler(String streamId) {
        return this.inputStreamHandlers.get(streamId);
    }

    /**
     * @return the operator context this operator was constructed with
     */
    protected SiddhiOperatorContext getSiddhiPlan() {
        return this.siddhiPlan;
    }

    /**
     * Delegates to the superclass setup and then starts the Siddhi runtime.
     */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        this.startSiddhiRuntime();
    }

    /**
     * Creates the record buffer unless one is already present (for example one assigned by
     * {@link #restoreState}), then delegates to the superclass.
     *
     * @throws Exception propagated from the superclass
     */
    @Override
    public void open() throws Exception {
        if (this.priorityQueue == null) {
            this.priorityQueue = new PriorityQueue(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator());
        }
        super.open();
    }

    /**
     * Sends one event to the Siddhi input handler of the given stream.
     *
     * @param streamId  id of the target input stream
     * @param data      event attributes
     * @param timestamp event timestamp
     * @throws InterruptedException propagated from the input handler
     */
    protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
        this.getSiddhiInputHandler(streamId).send(timestamp, data);
    }

    /**
     * Validates the final execution plan with a throw-away Siddhi manager that is always shut
     * down afterwards.
     */
    private static void validate(SiddhiOperatorContext siddhiPlan) {
        SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
        try {
            siddhiManager.validateExecutionPlan(siddhiPlan.getFinalExecutionPlan());
        }
        finally {
            siddhiManager.shutdown();
        }
    }

    /**
     * Creates the Siddhi manager, registers the configured extensions, then creates and starts
     * the execution plan runtime and wires its input handlers and output callback.
     *
     * @throws IllegalStateException if a runtime is already present
     */
    private void startSiddhiRuntime() {
        if (this.siddhiRuntime != null) {
            throw new IllegalStateException("Siddhi runtime has already been initialized");
        }
        this.siddhiManager = this.siddhiPlan.createSiddhiManager();
        for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
            this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
        }
        this.siddhiRuntime = this.siddhiManager.createExecutionPlanRuntime(this.executionExpression);
        this.siddhiRuntime.start();
        this.registerInputAndOutput(this.siddhiRuntime);
        LOGGER.info("Siddhi runtime {} started", (Object)this.siddhiRuntime.getName());
    }

    /**
     * Shuts down the runtime and the manager and clears every reference to them.
     *
     * @throws IllegalStateException if no runtime is present
     */
    private void shutdownSiddhiRuntime() {
        if (this.siddhiRuntime == null) {
            throw new IllegalStateException("Siddhi runtime has already shutdown");
        }
        this.siddhiRuntime.shutdown();
        LOGGER.info("Siddhi runtime {} shutdown", (Object)this.siddhiRuntime.getName());
        this.siddhiRuntime = null;
        this.siddhiManager.shutdown();
        this.siddhiManager = null;
        this.inputStreamHandlers = null;
    }

    /**
     * Registers the output callback for the plan's output stream and caches one input handler
     * per declared input stream.
     *
     * @param runtime the runtime to wire up
     */
    private void registerInputAndOutput(ExecutionPlanRuntime runtime) {
        String outputStreamId = this.siddhiPlan.getOutputStreamId();
        // Use the runtime that was passed in throughout instead of mixing it with the field.
        AbstractDefinition definition = (AbstractDefinition)runtime.getStreamDefinitionMap().get(outputStreamId);
        runtime.addCallback(outputStreamId, new StreamOutputHandler(this.siddhiPlan.getOutputStreamType(), definition, this.output));
        this.inputStreamHandlers = new HashMap<String, InputHandler>();
        for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
            this.inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
        }
    }

    /**
     * Disposes the superclass resources, shuts down the Siddhi runtime and closes the output.
     *
     * @throws Exception propagated from the superclass
     */
    @Override
    public void dispose() throws Exception {
        LOGGER.info("Disposing");
        super.dispose();
        this.shutdownSiddhiRuntime();
        this.output.close();
    }

    /**
     * Writes the Siddhi runtime snapshot followed by the buffered records.
     *
     * <p>Everything is written through the same {@link ObjectOutputStream}. Writing the
     * snapshot bytes directly to {@code out} while the length prefix sits in the object
     * stream's buffer would put the bytes ahead of the length and outside the object stream's
     * framing, which the reader in {@link #restoreState} could not consume.
     *
     * @param out          checkpoint output stream
     * @param checkpointId id of the checkpoint
     * @param timestamp    timestamp of the checkpoint
     * @throws Exception if the state cannot be written
     */
    @Override
    public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
        super.snapshotState(out, checkpointId, timestamp);
        ObjectOutputStream oos = new ObjectOutputStream(out);
        byte[] siddhiRuntimeSnapshot = this.siddhiRuntime.snapshot();
        oos.writeInt(siddhiRuntimeSnapshot.length);
        oos.write(siddhiRuntimeSnapshot, 0, siddhiRuntimeSnapshot.length);
        this.snapshotQueuerState(this.priorityQueue, new DataOutputViewStreamWrapper(oos));
        // Flush only: the underlying checkpoint stream is not closed here.
        oos.flush();
    }

    /**
     * Restores the Siddhi runtime snapshot and the buffered records written by
     * {@link #snapshotState}.
     *
     * @param state checkpoint input stream
     * @throws IOException if the recorded snapshot length is negative or the stream ends early
     * @throws Exception   if the state cannot otherwise be restored
     */
    @Override
    public void restoreState(FSDataInputStream state) throws Exception {
        super.restoreState(state);
        ObjectInputStream ois = new ObjectInputStream(state);
        // setup() already starts the runtime; starting it again unconditionally would throw
        // IllegalStateException, so only start it when it is missing.
        if (this.siddhiRuntime == null) {
            this.startSiddhiRuntime();
        }
        int siddhiRuntimeSnapshotLength = ois.readInt();
        if (siddhiRuntimeSnapshotLength < 0) {
            throw new IOException("Invalid Siddhi runtime snapshot length: " + siddhiRuntimeSnapshotLength);
        }
        byte[] siddhiRuntimeSnapshot = new byte[siddhiRuntimeSnapshotLength];
        // read() may return fewer bytes than requested; readFully either fills the buffer or
        // fails with EOFException.
        ois.readFully(siddhiRuntimeSnapshot, 0, siddhiRuntimeSnapshotLength);
        this.siddhiRuntime.restore(siddhiRuntimeSnapshot);
        this.priorityQueue = this.restoreQueuerState(new DataInputViewStreamWrapper(ois));
    }

    /**
     * Writes the buffered records to the given output view.
     *
     * @param var1 the record buffer to persist
     * @param var2 destination view
     * @throws IOException if writing fails
     */
    protected abstract void snapshotQueuerState(PriorityQueue<StreamRecord<IN>> var1, DataOutputView var2) throws IOException;

    /**
     * Reads back a record buffer previously written by {@link #snapshotQueuerState}.
     *
     * @param var1 source view
     * @return the restored record buffer
     * @throws IOException if reading fails
     */
    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView var1) throws IOException;
}

