/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.siddhi.router;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.siddhi.control.ControlEvent;
import org.apache.flink.streaming.siddhi.control.MetadataControlEvent;
import org.apache.flink.streaming.siddhi.control.OperationControlEvent;
import org.apache.flink.streaming.siddhi.router.StreamRoute;
import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner;

public class AddRouteOperator
extends AbstractStreamOperator<Tuple2<StreamRoute, Object>>
implements OneInputStreamOperator<Tuple2<StreamRoute, Object>, Tuple2<StreamRoute, Object>> {
    private Map<String, Set<String>> inputStreamToExecutionPlans = new HashMap<String, Set<String>>();
    private Map<String, List<String>> executionPlanIdToPartitionKeys = new HashMap<String, List<String>>();
    private Map<String, Boolean> executionPlanEnabled = new HashMap<String, Boolean>();
    private Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;

    public AddRouteOperator(Map<String, SiddhiStreamSchema<?>> dataStreamSchemas) {
        this.dataStreamSchemas = new HashMap(dataStreamSchemas);
    }

    public void processElement(StreamRecord<Tuple2<StreamRoute, Object>> element) throws Exception {
        StreamRoute streamRoute = (StreamRoute)((Tuple2)element.getValue()).f0;
        Object value = ((Tuple2)element.getValue()).f1;
        if (value instanceof ControlEvent) {
            streamRoute.setBroadCastPartitioning(true);
            if (value instanceof OperationControlEvent) {
                this.handleOperationControlEvent((OperationControlEvent)value);
            } else if (value instanceof MetadataControlEvent) {
                this.handleMetadataControlEvent((MetadataControlEvent)value);
            }
            this.output.collect(element);
        } else {
            String inputStreamId = streamRoute.getInputStreamId();
            if (!this.inputStreamToExecutionPlans.containsKey(inputStreamId)) {
                return;
            }
            for (String executionPlanId : this.inputStreamToExecutionPlans.get(inputStreamId)) {
                if (!this.executionPlanEnabled.get(executionPlanId).booleanValue()) continue;
                streamRoute.getExecutionPlanIds().clear();
                List<String> partitionKeys = this.executionPlanIdToPartitionKeys.get(executionPlanId);
                SiddhiStreamSchema<?> schema = this.dataStreamSchemas.get(inputStreamId);
                String[] fieldNames = schema.getFieldNames();
                Object[] row = schema.getStreamSerializer().getRow(value);
                streamRoute.setPartitionKey(-1L);
                for (String partitionKey : partitionKeys) {
                    long partitionValue = 0L;
                    for (int i = 0; i < fieldNames.length; ++i) {
                        if (!partitionKey.equals(fieldNames[i])) continue;
                        partitionValue += (long)row[i].hashCode();
                    }
                    streamRoute.setPartitionKey(Math.abs(partitionValue));
                }
                streamRoute.addExecutionPlanId(executionPlanId);
                this.output.collect(element);
            }
        }
    }

    private void handleMetadataControlEvent(MetadataControlEvent event) throws Exception {
        String executionPlan;
        if (event.getDeletedExecutionPlanId() != null) {
            for (String executionPlanId : event.getDeletedExecutionPlanId()) {
                for (String string : this.inputStreamToExecutionPlans.keySet()) {
                    this.inputStreamToExecutionPlans.get(string).remove(executionPlanId);
                    if (!this.inputStreamToExecutionPlans.get(string).isEmpty()) continue;
                    this.inputStreamToExecutionPlans.remove(string);
                }
                this.executionPlanIdToPartitionKeys.remove(executionPlanId);
                this.executionPlanEnabled.remove(executionPlanId);
            }
        }
        if (event.getAddedExecutionPlanMap() != null) {
            for (String executionPlanId : event.getAddedExecutionPlanMap().keySet()) {
                for (Set set : this.inputStreamToExecutionPlans.values()) {
                    if (!set.contains(executionPlanId)) continue;
                    throw new Exception("Execution plan " + executionPlanId + " already exists!");
                }
                executionPlan = event.getAddedExecutionPlanMap().get(executionPlanId);
                this.handleExecutionPlan(executionPlanId, executionPlan);
                this.executionPlanEnabled.put(executionPlanId, true);
            }
        }
        if (event.getUpdatedExecutionPlanMap() != null) {
            for (String executionPlanId : event.getUpdatedExecutionPlanMap().keySet()) {
                if (!this.executionPlanEnabled.containsKey(executionPlanId)) {
                    throw new Exception("Execution plan " + executionPlanId + " does not exist!");
                }
                executionPlan = event.getUpdatedExecutionPlanMap().get(executionPlanId);
                this.handleExecutionPlan(executionPlanId, executionPlan);
            }
        }
    }

    private void handleOperationControlEvent(OperationControlEvent event) throws Exception {
        OperationControlEvent.Action action = event.getAction();
        if (action == null) {
            throw new Exception("OperationControlEvent.Action is null");
        }
        switch (action) {
            case ENABLE_QUERY: {
                this.executionPlanEnabled.put(event.getQueryId(), true);
                break;
            }
            case DISABLE_QUERY: {
                this.executionPlanEnabled.put(event.getQueryId(), false);
                break;
            }
            default: {
                throw new IllegalStateException("Illegal action type " + (Object)((Object)action) + ": " + event);
            }
        }
    }

    private void handleExecutionPlan(String executionPlanId, String executionPlan) throws Exception {
        Map<String, SiddhiExecutionPlanner.StreamPartition> streamPartitions = SiddhiExecutionPlanner.of(this.dataStreamSchemas, executionPlan).getStreamPartitions();
        for (String inputStreamId : streamPartitions.keySet()) {
            if (!this.inputStreamToExecutionPlans.containsKey(inputStreamId)) {
                this.inputStreamToExecutionPlans.put(inputStreamId, new HashSet());
            }
            if (!this.executionPlanIdToPartitionKeys.containsKey(executionPlanId)) {
                this.executionPlanIdToPartitionKeys.put(executionPlanId, new ArrayList());
            }
            this.inputStreamToExecutionPlans.get(inputStreamId).add(executionPlanId);
            if (streamPartitions.get(inputStreamId).getPartitonWithList().isEmpty()) {
                this.executionPlanIdToPartitionKeys.get(executionPlanId).addAll(streamPartitions.get(inputStreamId).getGroupByList());
                continue;
            }
            this.executionPlanIdToPartitionKeys.get(executionPlanId).addAll(streamPartitions.get(inputStreamId).getPartitonWithList());
        }
    }
}

