/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.siddhi.operator;

import java.io.IOException;
import java.util.PriorityQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.siddhi.operator.AbstractSiddhiOperator;
import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.contrib.siddhi.schema.StreamSchema;
import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Siddhi operator whose input records are {@code Tuple2<String, IN>} pairs:
 * field {@code f0} carries the stream id and field {@code f1} carries the event payload.
 *
 * @param <IN>  payload type carried in {@code f1} of each input tuple
 * @param <OUT> output type, passed through to {@link AbstractSiddhiOperator}
 */
public class TupleStreamSiddhiOperator<IN, OUT>
        extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {

    /**
     * Creates the operator for the given Siddhi operator context.
     *
     * @param siddhiPlan context handed to the superclass constructor
     */
    public TupleStreamSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
        super(siddhiPlan);
    }

    /**
     * Builds the stream record serializer for a schema by wrapping the serializer of the
     * tuple type information derived from the schema's type info.
     *
     * @param streamSchema    schema whose type info describes the tuple payload
     * @param executionConfig execution config used to create the underlying serializer
     * @return a multiplexing serializer for the tuple records of that schema
     */
    @Override
    // The schema arrives as a raw type, so the derived type information and serializer are raw too.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected MultiplexingStreamRecordSerializer<Tuple2<String, IN>> createStreamRecordSerializer(StreamSchema streamSchema, ExecutionConfig executionConfig) {
        TypeInformation tuple2TypeInformation = SiddhiTypeFactory.getStreamTupleTypeInformation(streamSchema.getTypeInfo());
        return new MultiplexingStreamRecordSerializer(tuple2TypeInformation.createSerializer(executionConfig));
    }

    /**
     * Converts the tuple payload into a row using the serializer of the input stream schema
     * registered for the tuple's own stream id, and sends it with the given timestamp.
     *
     * <p>The {@code streamId} and {@code schema} arguments are not used; the stream id is
     * always taken from {@code value.f0}.
     *
     * @param streamId  unused
     * @param schema    unused
     * @param value     tuple of (stream id, payload)
     * @param timestamp timestamp forwarded to {@code send}
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void processEvent(String streamId, StreamSchema<Tuple2<String, IN>> schema, Tuple2<String, IN> value, long timestamp) throws InterruptedException {
        this.send(value.f0, this.getSiddhiPlan().getInputStreamSchema(value.f0).getStreamSerializer().getRow(value.f1), timestamp);
    }

    /**
     * Returns the stream id of a record, which is the first tuple field.
     *
     * @param record tuple of (stream id, payload)
     * @return {@code record.f0}
     */
    @Override
    public String getStreamId(Tuple2<String, IN> record) {
        return record.f0;
    }

    /**
     * Writes the queued records to the output view.
     *
     * <p>Layout: the queue size as an int, then for each record its stream id (modified UTF-8)
     * followed by the record serialized with the serializer registered for that stream id.
     * Records are written in the queue's iteration order, which {@link PriorityQueue} does not
     * guarantee to be sorted.
     *
     * @param queue          records to persist
     * @param dataOutputView target of the snapshot
     * @throws IOException if writing to the view fails
     */
    @Override
    protected void snapshotQueuerState(PriorityQueue<StreamRecord<Tuple2<String, IN>>> queue, DataOutputView dataOutputView) throws IOException {
        dataOutputView.writeInt(queue.size());
        for (StreamRecord<Tuple2<String, IN>> record : queue) {
            String streamId = record.getValue().f0;
            dataOutputView.writeUTF(streamId);
            this.getStreamRecordSerializer(streamId).serialize(record, dataOutputView);
        }
    }

    /**
     * Reads back a queue written by {@link #snapshotQueuerState}.
     *
     * <p>An empty snapshot (size 0) yields an empty queue. A negative size is treated as
     * corrupt data.
     *
     * @param dataInputView source of the snapshot
     * @return a queue holding every record read from the view
     * @throws IOException if reading fails or the persisted size is negative
     */
    @Override
    protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
        int sizeOfQueue = dataInputView.readInt();
        if (sizeOfQueue < 0) {
            throw new IOException("Invalid queue size in snapshot: " + sizeOfQueue);
        }
        // PriorityQueue(int) throws IllegalArgumentException for a capacity below 1, but a
        // snapshot of an empty queue legitimately stores 0, so clamp the initial capacity.
        // NOTE(review): the queue is created without a comparator and therefore relies on the
        // natural ordering of its elements; confirm this matches the ordering the base operator
        // expects for its queue.
        PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<StreamRecord<Tuple2<String, IN>>>(Math.max(1, sizeOfQueue));
        for (int i = 0; i < sizeOfQueue; ++i) {
            String streamId = dataInputView.readUTF();
            StreamElement streamElement = this.getStreamRecordSerializer(streamId).deserialize(dataInputView);
            priorityQueue.offer(streamElement.asRecord());
        }
        return priorityQueue;
    }
}

