/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.siddhi.operator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.siddhi.control.ControlEvent;
import org.apache.flink.streaming.siddhi.control.ControlEventListener;
import org.apache.flink.streaming.siddhi.control.MetadataControlEvent;
import org.apache.flink.streaming.siddhi.control.OperationControlEvent;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.streaming.siddhi.operator.StreamOutputHandler;
import org.apache.flink.streaming.siddhi.operator.StreamRecordComparator;
import org.apache.flink.streaming.siddhi.router.StreamRoute;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;

public abstract class AbstractSiddhiOperator<IN, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
ControlEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
    private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
    private final SiddhiOperatorContext siddhiPlan;
    private final boolean isProcessingTime;
    private final Map<String, StreamElementSerializer<IN>> streamRecordSerializers;
    private transient SiddhiManager siddhiManager;
    private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
    private transient ListState<byte[]> siddhiRuntimeState;
    private transient ListState<byte[]> queuedRecordsState;
    private transient ConcurrentHashMap<String, QueryRuntimeHandler> siddhiRuntimeHandlers;

    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
        AbstractSiddhiOperator.validate(siddhiPlan);
        this.siddhiPlan = siddhiPlan;
        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
        this.streamRecordSerializers = new HashMap<String, StreamElementSerializer<IN>>();
        this.registerStreamRecordSerializers();
    }

    private void registerStreamRecordSerializers() {
        for (String streamId : this.siddhiPlan.getInputStreams()) {
            this.streamRecordSerializers.put(streamId, this.createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
        }
    }

    protected abstract StreamElementSerializer<IN> createStreamRecordSerializer(StreamSchema var1, ExecutionConfig var2);

    protected StreamElementSerializer<IN> getStreamRecordSerializer(String streamId) {
        if (this.streamRecordSerializers.containsKey(streamId)) {
            return this.streamRecordSerializers.get(streamId);
        }
        throw new UndefinedStreamException("Stream " + streamId + " not defined");
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (this.isControlStream(element.getValue())) {
            this.onEventReceived(this.getControlEvent(element.getValue()));
            return;
        }
        String streamId = this.getStreamId(element.getValue());
        StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
        if (this.isProcessingTime) {
            this.processEvent(streamId, schema, element.getValue(), System.currentTimeMillis());
            this.checkpointSiddhiRuntimeState();
        } else {
            PriorityQueue<StreamRecord<IN>> priorityQueue = this.getPriorityQueue();
            if (this.getExecutionConfig().isObjectReuseEnabled()) {
                priorityQueue.offer(new StreamRecord(schema.getTypeSerializer().copy(element.getValue()), element.getTimestamp()));
            } else {
                priorityQueue.offer(element);
            }
            this.checkpointRecordQueueState();
        }
    }

    protected abstract void processEvent(String var1, StreamSchema<IN> var2, IN var3, long var4) throws Exception;

    public void processWatermark(Watermark mark) throws Exception {
        while (!this.priorityQueue.isEmpty() && this.priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
            StreamRecord<IN> streamRecord = this.priorityQueue.poll();
            String streamId = this.getStreamId(streamRecord.getValue());
            long timestamp = streamRecord.getTimestamp();
            StreamSchema schema = this.siddhiPlan.getInputStreamSchema(streamId);
            this.processEvent(streamId, schema, streamRecord.getValue(), timestamp);
        }
        this.output.emitWatermark(mark);
    }

    public abstract String getStreamId(IN var1);

    public abstract boolean isControlStream(IN var1);

    public abstract ControlEvent getControlEvent(IN var1);

    public PriorityQueue<StreamRecord<IN>> getPriorityQueue() {
        return this.priorityQueue;
    }

    protected SiddhiOperatorContext getSiddhiPlan() {
        return this.siddhiPlan;
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        if (this.priorityQueue == null) {
            this.priorityQueue = new PriorityQueue(11, new StreamRecordComparator());
        }
        if (this.siddhiRuntimeHandlers == null) {
            this.siddhiRuntimeHandlers = new ConcurrentHashMap();
        }
    }

    public void open() throws Exception {
        super.open();
        this.startSiddhiManager();
    }

    void send(StreamRoute streamRoute, Object[] data, long timestamp) throws InterruptedException {
        for (String executionPlanId : streamRoute.getExecutionPlanIds()) {
            this.siddhiRuntimeHandlers.get(executionPlanId).send(streamRoute.getInputStreamId(), data, timestamp);
        }
    }

    private static void validate(SiddhiOperatorContext siddhiPlan) {
        SiddhiManager siddhiManager = siddhiPlan.createSiddhiManager();
        try {
            siddhiManager.validateSiddhiApp(siddhiPlan.getAllEnrichedExecutionPlan());
        }
        finally {
            siddhiManager.shutdown();
        }
    }

    private void startSiddhiManager() {
        this.siddhiManager = this.siddhiPlan.createSiddhiManager();
        for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
            this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
        }
        for (String id : this.siddhiPlan.getExecutionPlanMap().keySet()) {
            QueryRuntimeHandler handler = new QueryRuntimeHandler(this.siddhiPlan.getEnrichedExecutionPlan(id));
            handler.start();
            this.siddhiRuntimeHandlers.put(id, handler);
        }
    }

    public void close() throws Exception {
        for (QueryRuntimeHandler executor : this.siddhiRuntimeHandlers.values()) {
            executor.shutdown();
        }
        this.siddhiRuntimeHandlers.clear();
        super.close();
    }

    public void dispose() throws Exception {
        this.siddhiRuntimeState.clear();
        super.dispose();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.checkpointSiddhiRuntimeState();
        this.checkpointRecordQueueState();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restoreState() throws Exception {
        LOGGER.info("Restore siddhi state");
        Iterator siddhiState = ((Iterable)this.siddhiRuntimeState.get()).iterator();
        if (siddhiState.hasNext()) {
            // empty if block
        }
        LOGGER.info("Restore queued records state");
        Iterator queueState = ((Iterable)this.queuedRecordsState.get()).iterator();
        if (queueState.hasNext()) {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream((byte[])queueState.next());
            DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper((InputStream)byteArrayInputStream);
            try {
                this.priorityQueue = this.restoreQueueState((DataInputView)dataInputView);
            }
            finally {
                dataInputView.close();
                byteArrayInputStream.close();
            }
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.siddhiRuntimeState == null) {
            this.siddhiRuntimeState = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor(SIDDHI_RUNTIME_STATE_NAME, (TypeSerializer)new BytePrimitiveArraySerializer()));
        }
        if (this.queuedRecordsState == null) {
            this.queuedRecordsState = context.getOperatorStateStore().getListState(new ListStateDescriptor(QUEUED_RECORDS_STATE_NAME, (TypeSerializer)new BytePrimitiveArraySerializer()));
        }
        if (context.isRestored()) {
            this.restoreState();
        }
    }

    private void checkpointSiddhiRuntimeState() throws Exception {
        this.siddhiRuntimeState.clear();
        for (Map.Entry<String, QueryRuntimeHandler> entry : this.siddhiRuntimeHandlers.entrySet()) {
            this.siddhiRuntimeState.add((Object)entry.getValue().siddhiRuntime.snapshot());
        }
        this.queuedRecordsState.clear();
    }

    private void checkpointRecordQueueState() throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper((OutputStream)byteArrayOutputStream);
        try {
            this.snapshotQueueState(this.priorityQueue, (DataOutputView)dataOutputView);
            this.queuedRecordsState.clear();
            this.queuedRecordsState.add((Object)byteArrayOutputStream.toByteArray());
        }
        finally {
            dataOutputView.close();
            byteArrayOutputStream.close();
        }
    }

    protected abstract void snapshotQueueState(PriorityQueue<StreamRecord<IN>> var1, DataOutputView var2) throws IOException;

    protected abstract PriorityQueue<StreamRecord<IN>> restoreQueueState(DataInputView var1) throws IOException;

    @Override
    public void onEventReceived(ControlEvent event) {
        block15: {
            block16: {
                block14: {
                    QueryRuntimeHandler handler;
                    if (event == null) {
                        LOGGER.warn("Null control event received and ignored");
                    }
                    if (!(event instanceof MetadataControlEvent)) break block14;
                    MetadataControlEvent metadataControlEvent = (MetadataControlEvent)event;
                    if (metadataControlEvent.getDeletedExecutionPlanId() != null) {
                        for (String string : metadataControlEvent.getDeletedExecutionPlanId()) {
                            this.siddhiPlan.removeExecutionPlan(string);
                            handler = this.siddhiRuntimeHandlers.remove(string);
                            if (handler == null) continue;
                            handler.shutdown();
                        }
                    }
                    if (metadataControlEvent.getAddedExecutionPlanMap() != null) {
                        for (Map.Entry entry : metadataControlEvent.getAddedExecutionPlanMap().entrySet()) {
                            this.siddhiPlan.addExecutionPlan((String)entry.getKey(), (String)entry.getValue());
                            handler = new QueryRuntimeHandler(this.siddhiPlan.getEnrichedExecutionPlan((String)entry.getKey()));
                            handler.start();
                            this.siddhiRuntimeHandlers.put((String)entry.getKey(), handler);
                        }
                    }
                    if (metadataControlEvent.getUpdatedExecutionPlanMap() != null) {
                        for (Map.Entry entry : metadataControlEvent.getUpdatedExecutionPlanMap().entrySet()) {
                            this.siddhiPlan.updateExecutionPlan((String)entry.getKey(), (String)entry.getValue());
                            QueryRuntimeHandler oldHandler = this.siddhiRuntimeHandlers.get(entry.getKey());
                            QueryRuntimeHandler handler2 = new QueryRuntimeHandler(this.siddhiPlan.getEnrichedExecutionPlan((String)entry.getKey()));
                            handler2.start();
                            this.siddhiRuntimeHandlers.put((String)entry.getKey(), handler2);
                            if (oldHandler == null) continue;
                            oldHandler.shutdown();
                        }
                    }
                    break block15;
                }
                if (!(event instanceof OperationControlEvent)) break block16;
                OperationControlEvent.Action action = ((OperationControlEvent)event).getAction();
                if (action == null) {
                    LOGGER.warn("OperationControlEvent.Action is null");
                    return;
                }
                switch (action) {
                    case ENABLE_QUERY: {
                        QueryRuntimeHandler handler = this.siddhiRuntimeHandlers.get(((OperationControlEvent)event).getQueryId());
                        if (handler != null) {
                            handler.enable();
                            break;
                        }
                        break block15;
                    }
                    case DISABLE_QUERY: {
                        QueryRuntimeHandler handler = this.siddhiRuntimeHandlers.get(((OperationControlEvent)event).getQueryId());
                        if (handler != null) {
                            handler.disable();
                            break;
                        }
                        break block15;
                    }
                    default: {
                        throw new IllegalStateException("Illegal action type " + (Object)((Object)action) + ": " + event);
                    }
                }
                break block15;
            }
            throw new IllegalStateException("Illegal event type " + event);
        }
    }

    private class QueryRuntimeHandler {
        private final SiddhiAppRuntime siddhiRuntime;
        private final Map<String, InputHandler> inputStreamHandlers = new HashMap<String, InputHandler>();
        private AtomicLong count = new AtomicLong(0L);
        private AtomicBoolean enabled = new AtomicBoolean(false);

        QueryRuntimeHandler(String executionPlan) {
            this.siddhiRuntime = AbstractSiddhiOperator.this.siddhiManager.createSiddhiAppRuntime(executionPlan);
        }

        void send(String streamId, Object[] data, long timestamp) throws InterruptedException {
            if (this.enabled.get()) {
                this.count.incrementAndGet();
                this.inputStreamHandlers.get(streamId).send(timestamp, data);
            }
        }

        private void start() {
            this.enable();
            this.siddhiRuntime.start();
            this.registerInputAndOutput();
            LOGGER.info("Siddhi {} started", (Object)this.siddhiRuntime.getName());
        }

        private void shutdown() {
            this.siddhiRuntime.shutdown();
            this.disable();
            LOGGER.info("Siddhi {} shutdown, processed {} events", (Object)this.siddhiRuntime.getName(), (Object)this.count.get());
        }

        public void enable() {
            this.enabled.set(true);
        }

        public void disable() {
            this.enabled.set(false);
        }

        private void registerInputAndOutput() {
            Map streamDefinitionMap = this.siddhiRuntime.getStreamDefinitionMap();
            for (String outputStreamId : AbstractSiddhiOperator.this.siddhiPlan.getOutputStreamTypes().keySet()) {
                AbstractDefinition definition = (AbstractDefinition)this.siddhiRuntime.getStreamDefinitionMap().get(outputStreamId);
                if (!streamDefinitionMap.containsKey(outputStreamId)) continue;
                this.siddhiRuntime.addCallback(outputStreamId, new StreamOutputHandler(outputStreamId, AbstractSiddhiOperator.this.siddhiPlan.getOutputStreamType(outputStreamId), definition, AbstractSiddhiOperator.this.output));
            }
            for (String inputStreamId : AbstractSiddhiOperator.this.siddhiPlan.getInputStreams()) {
                if (!streamDefinitionMap.containsKey(inputStreamId)) continue;
                this.inputStreamHandlers.put(inputStreamId, this.siddhiRuntime.getInputHandler(inputStreamId));
            }
        }
    }
}

