/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.siddhi.operator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.contrib.siddhi.operator.StreamOutputHandler;
import org.apache.flink.contrib.siddhi.operator.StreamRecordComparator;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;

public abstract class AbstractSiddhiOperator<IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
    private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
    private final SiddhiOperatorContext siddhiPlan;
    private final String executionExpression;
    private final boolean isProcessingTime;
    private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
    private transient SiddhiManager siddhiManager;
    private transient SiddhiAppRuntime siddhiRuntime;
    private transient Map<String, InputHandler> inputStreamHandlers;
    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    private transient ListState<byte[]> siddhiRuntimeState;
    private transient ListState<byte[]> queuedRecordsState;

    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
        AbstractSiddhiOperator.validate(siddhiPlan);
        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
        this.siddhiPlan = siddhiPlan;
        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        this.streamRecordSerializers = new HashMap<String, StreamElementSerializer<IN>>();
        this.registerStreamRecordSerializers();
    }

    private void registerStreamRecordSerializers() {
        for (String streamId : this.siddhiPlan.getInputStreams()) {
            this.streamRecordSerializers.put(streamId, this.createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
        }
    }

    protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema var1, ExecutionConfig var2);

    protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
        if (this.streamRecordSerializers.containsKey(streamId)) {
            return this.streamRecordSerializers.get(streamId);
        }
        throw new UndefinedStreamException("Stream " + streamId + " not defined");
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        String streamId = this.getStreamId(element.getValue());
        StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
        if (this.isProcessingTime) {
            this.processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
            this.checkpointSiddhiRuntimeState();
        } else {
            PriorityQueue<StreamRecord<IN>> priorityQueue = this.getPriorityQueue();
            if (this.getExecutionConfig().isObjectReuseEnabled()) {
                priorityQueue.offer(new StreamRecord(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
            } else {
                priorityQueue.offer(element);
            }
            this.checkpointRecordQueueState();
        }
    }

    protected abstract void processEvent(String var1, StreamSchema<IN> var2, IN var3, long var4) throws Exception;

    public void processWatermark(Watermark mark) throws Exception {
        while (!this.priorityQueue.isEmpty() && this.priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
            StreamRecord<IN> streamRecord = this.priorityQueue.poll();
            String streamId = this.getStreamId(streamRecord.getValue());
            long timestamp = streamRecord.getTimestamp();
            StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
            this.processEvent(streamId, schema, streamRecord.getValue(), timestamp);
        }
        this.output.emitWatermark(mark);
    }

    public abstract String getStreamId(IN var1);

    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
        return this.priorityQueue;
    }

    protected SiddhiAppRuntime getSiddhiRuntime() {
        return this.siddhiRuntime;
    }

    public InputHandler getSiddhiInputHandler(String streamId) {
        return this.inputStreamHandlers.get(streamId);
    }

    protected SiddhiOperatorContext getSiddhiPlan() {
        return this.siddhiPlan;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        if (this.priorityQueue == null) {
            this.priorityQueue = new PriorityQueue(11, new StreamRecordComparator());
        }
        this.startSiddhiRuntime();
    }

    protected void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
        this.getSiddhiInputHandler(streamId).send(timestamp, data);
    }

    private static void validate(SiddhiOperatorContext siddhiPlan) {
        SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
        try {
            siddhiManager.validateSiddhiApp(siddhiPlan.getFinalExecutionPlan());
        }
        finally {
            siddhiManager.shutdown();
        }
    }

    private void startSiddhiRuntime() {
        if (this.siddhiRuntime == null) {
            this.siddhiManager = this.siddhiPlan.createSiddhiManager();
            for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
                this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
            }
        } else {
            throw new IllegalStateException("Siddhi has already been initialized");
        }
        this.siddhiRuntime = this.siddhiManager.createSiddhiAppRuntime(this.executionExpression);
        this.siddhiRuntime.start();
        this.registerInputAndOutput(this.siddhiRuntime);
        LOGGER.info("Siddhi {} started", (Object)this.siddhiRuntime.getName());
    }

    private void shutdownSiddhiRuntime() {
        if (this.siddhiRuntime == null) {
            throw new IllegalStateException("Siddhi has already shutdown");
        }
        this.siddhiRuntime.shutdown();
        LOGGER.info("Siddhi {} shutdown", (Object)this.siddhiRuntime.getName());
        this.siddhiRuntime = null;
        this.siddhiManager.shutdown();
        this.siddhiManager = null;
        this.inputStreamHandlers = null;
    }

    private void registerInputAndOutput(SiddhiAppRuntime runtime) {
        AbstractDefinition definition = (AbstractDefinition)this.siddhiRuntime.getStreamDefinitionMap().get(this.siddhiPlan.getOutputStreamId());
        runtime.addCallback(this.siddhiPlan.getOutputStreamId(), new StreamOutputHandler(this.siddhiPlan.getOutputStreamType(), definition, this.output));
        this.inputStreamHandlers = new HashMap<String, InputHandler>();
        for (String inputStreamId : this.siddhiPlan.getInputStreams()) {
            this.inputStreamHandlers.put(inputStreamId, runtime.getInputHandler(inputStreamId));
        }
    }

    public void dispose() throws Exception {
        this.shutdownSiddhiRuntime();
        this.siddhiRuntimeState.clear();
        super.dispose();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.checkpointSiddhiRuntimeState();
        this.checkpointRecordQueueState();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restoreState() throws Exception {
        LOGGER.info("Restore siddhi state");
        Iterator siddhiState = ((Iterable)this.siddhiRuntimeState.get()).iterator();
        if (siddhiState.hasNext()) {
            this.siddhiRuntime.restore((byte[])siddhiState.next());
        }
        LOGGER.info("Restore queued records state");
        Iterator queueState = ((Iterable)this.queuedRecordsState.get()).iterator();
        if (queueState.hasNext()) {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream((byte[])queueState.next());
            DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper((InputStream)byteArrayInputStream);
            try {
                this.priorityQueue = this.restoreQueuerState((DataInputView)dataInputView);
            }
            finally {
                dataInputView.close();
                byteArrayInputStream.close();
            }
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.siddhiRuntimeState == null) {
            this.siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor(SIDDHI_RUNTIME_STATE_NAME, (TypeSerializer)new BytePrimitiveArraySerializer()));
        }
        if (this.queuedRecordsState == null) {
            this.queuedRecordsState = context.getOperatorStateStore().getListState(new ListStateDescriptor(QUEUED_RECORDS_STATE_NAME, (TypeSerializer)new BytePrimitiveArraySerializer()));
        }
        if (context.isRestored()) {
            this.restoreState();
        }
    }

    private void checkpointSiddhiRuntimeState() throws Exception {
        this.siddhiRuntimeState.clear();
        this.siddhiRuntimeState.add((Object)this.siddhiRuntime.snapshot());
        this.queuedRecordsState.clear();
    }

    private void checkpointRecordQueueState() throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper((OutputStream)byteArrayOutputStream);
        try {
            this.snapshotQueueState(this.priorityQueue, (DataOutputView)dataOutputView);
            this.queuedRecordsState.clear();
            this.queuedRecordsState.add((Object)byteArrayOutputStream.toByteArray());
        }
        finally {
            dataOutputView.close();
            byteArrayOutputStream.close();
        }
    }

    protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> var1, DataOutputView var2) throws IOException;

    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueuerState(DataInputView var1) throws IOException;
}

