/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPeriodicWatermarks;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateWithPunctuatedWatermarks;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

@Internal
public abstract class AbstractFetcher<T, KPH> {
    // Values stored in timestampWatermarkMode. The constructor selects 0 when neither
    // watermark assigner is supplied, 1 when only the periodic assigner is supplied and
    // 2 when only the punctuated assigner is supplied.
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    private static final int PERIODIC_WATERMARKS = 1;
    private static final int PUNCTUATED_WATERMARKS = 2;
    // Context that records and watermarks are emitted through.
    protected final SourceFunction.SourceContext<T> sourceContext;
    // Lock obtained from sourceContext.getCheckpointLock(); record emission and offset
    // updates in this class are performed while holding it.
    private final Object checkpointLock;
    // State holders for every partition handed to this fetcher (seed partitions plus
    // those added through addDiscoveredPartitions).
    private final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates;
    // Queue that receives every partition state created by the constructor and by
    // addDiscoveredPartitions.
    protected final ClosableBlockingQueue<KafkaTopicPartitionState<KPH>> unassignedPartitionsQueue;
    // One of the three mode constants above.
    private final int timestampWatermarkMode;
    // Serialized assigners; a fresh instance is deserialized for each partition state.
    // At most one of the two is non-null (enforced by the constructor).
    private final SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic;
    private final SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated;
    // Class loader passed to SerializedValue.deserializeValue for the assigners.
    private final ClassLoader userCodeClassLoader;
    // Largest watermark emitted so far by updateMinPunctuatedWatermark.
    private volatile long maxWatermarkSoFar = Long.MIN_VALUE;
    // When true, offset gauges are registered for every partition state.
    private final boolean useMetrics;
    // Parent group for the per-topic / per-partition offset gauges.
    private final MetricGroup consumerMetricGroup;
    // Sub-group "current-offsets" of consumerMetricGroup.
    @Deprecated
    private final MetricGroup legacyCurrentOffsetsMetricGroup;
    // Sub-group "committed-offsets" of consumerMetricGroup.
    @Deprecated
    private final MetricGroup legacyCommittedOffsetsMetricGroup;

    /**
     * Creates the fetcher, builds a state holder for every seed partition and queues all of
     * them as unassigned.
     *
     * <p>The timestamp/watermark mode is derived from which assigner is supplied: none,
     * periodic only, or punctuated only. In periodic mode a {@link PeriodicWatermarkEmitter}
     * is created and started before the constructor returns.
     *
     * @param sourceContext context used to emit records and watermarks; must not be null
     * @param seedPartitionsWithInitialOffsets initial partitions mapped to their start offsets
     * @param watermarksPeriodic serialized periodic assigner, or null
     * @param watermarksPunctuated serialized punctuated assigner, or null
     * @param processingTimeProvider timer service used by the periodic watermark emitter
     * @param autoWatermarkInterval delay added to the current processing time when the
     *     periodic emitter registers its timer
     * @param userCodeClassLoader class loader used to deserialize the assigners; must not be null
     * @param consumerMetricGroup metric group the offset gauges are registered under; must not be null
     * @param useMetrics whether offset gauges are registered
     * @throws IllegalArgumentException if both assigners are supplied, or if any seed partition
     *     state reports an undefined offset
     * @throws Exception if creating the partition state holders fails
     */
    protected AbstractFetcher(
            SourceFunction.SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            ProcessingTimeService processingTimeProvider,
            long autoWatermarkInterval,
            ClassLoader userCodeClassLoader,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        this.sourceContext = Preconditions.checkNotNull(sourceContext);
        this.checkpointLock = sourceContext.getCheckpointLock();
        this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
        this.useMetrics = useMetrics;
        this.consumerMetricGroup = Preconditions.checkNotNull(consumerMetricGroup);
        this.legacyCurrentOffsetsMetricGroup = consumerMetricGroup.addGroup("current-offsets");
        this.legacyCommittedOffsetsMetricGroup = consumerMetricGroup.addGroup("committed-offsets");
        this.watermarksPeriodic = watermarksPeriodic;
        this.watermarksPunctuated = watermarksPunctuated;

        // Exactly one mode applies; supplying both assigners is rejected.
        if (watermarksPeriodic == null) {
            this.timestampWatermarkMode =
                    watermarksPunctuated == null ? NO_TIMESTAMPS_WATERMARKS : PUNCTUATED_WATERMARKS;
        } else if (watermarksPunctuated == null) {
            this.timestampWatermarkMode = PERIODIC_WATERMARKS;
        } else {
            throw new IllegalArgumentException("Cannot have both periodic and punctuated watermarks");
        }

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
        this.subscribedPartitionStates = createPartitionStateHolders(
                seedPartitionsWithInitialOffsets,
                this.timestampWatermarkMode,
                watermarksPeriodic,
                watermarksPunctuated,
                userCodeClassLoader);

        // Validate every seed partition before any of them is queued.
        for (KafkaTopicPartitionState<KPH> partitionState : this.subscribedPartitionStates) {
            if (!partitionState.isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }
        for (KafkaTopicPartitionState<KPH> partitionState : this.subscribedPartitionStates) {
            this.unassignedPartitionsQueue.add(partitionState);
        }

        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, this.subscribedPartitionStates);
        }

        if (this.timestampWatermarkMode == PERIODIC_WATERMARKS) {
            PeriodicWatermarkEmitter<KPH> periodicEmitter = new PeriodicWatermarkEmitter<>(
                    this.subscribedPartitionStates, sourceContext, processingTimeProvider, autoWatermarkInterval);
            periodicEmitter.start();
        }
    }

    /**
     * Creates state holders for newly discovered partitions, optionally registers their
     * offset gauges, and adds each one to the subscribed list and then to the unassigned queue.
     *
     * @param newPartitions partitions to start tracking
     * @throws IOException if deserializing a watermark assigner fails
     * @throws ClassNotFoundException if a watermark assigner class cannot be resolved
     */
    public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) throws IOException, ClassNotFoundException {
        // NOTE(review): this literal looks like one of the sentinel offsets recognised by
        // KafkaTopicPartitionStateSentinel - confirm which one and prefer the named constant.
        final long initialOffsetForDiscovered = -915623761775L;

        final List<KafkaTopicPartitionState<KPH>> discoveredStates = createPartitionStateHolders(
                newPartitions,
                initialOffsetForDiscovered,
                timestampWatermarkMode,
                watermarksPeriodic,
                watermarksPunctuated,
                userCodeClassLoader);

        if (useMetrics) {
            registerOffsetMetrics(consumerMetricGroup, discoveredStates);
        }

        // Each state is registered as subscribed before it is offered to the queue.
        for (KafkaTopicPartitionState<KPH> discovered : discoveredStates) {
            subscribedPartitionStates.add(discovered);
            unassignedPartitionsQueue.add(discovered);
        }
    }

    /**
     * Returns the live list of partition states this fetcher is subscribed to (not a copy).
     *
     * @return the internal list of subscribed partition states
     */
    protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates() {
        final List<KafkaTopicPartitionState<KPH>> states = this.subscribedPartitionStates;
        return states;
    }

    /**
     * Subclass hook; presumably runs the main fetch loop (name-based assumption, confirm
     * against the concrete implementations).
     *
     * @throws Exception if the implementation fails
     */
    public abstract void runFetchLoop() throws Exception;

    /**
     * Subclass hook; presumably stops the work started by {@link #runFetchLoop()}
     * (name-based assumption, confirm against the concrete implementations).
     */
    public abstract void cancel();

    /**
     * Removes every entry whose offset is a sentinel value and passes the remaining offsets
     * to {@link #doCommitInternalOffsetsToKafka}.
     *
     * @param offsets offsets to commit, keyed by partition
     * @param commitCallback callback forwarded unchanged to the subclass implementation
     * @throws Exception if the subclass implementation fails
     */
    public final void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception {
        final Map<KafkaTopicPartition, Long> committableOffsets = filterOutSentinels(offsets);
        doCommitInternalOffsetsToKafka(committableOffsets, commitCallback);
    }

    /**
     * Subclass hook that performs the actual commit. It is invoked by
     * {@link #commitInternalOffsetsToKafka} after sentinel offsets have been filtered out.
     *
     * @param offsets offsets to commit, keyed by partition; contains no sentinel values when
     *     called through {@link #commitInternalOffsetsToKafka}
     * @param commitCallback callback supplied by the caller of the commit
     * @throws Exception if the commit fails
     */
    protected abstract void doCommitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception;

    /**
     * Builds a new map holding only the entries whose offset is not reported as a sentinel
     * by {@code KafkaTopicPartitionStateSentinel.isSentinel}.
     *
     * @param offsets offsets keyed by partition
     * @return a new map without the sentinel-valued entries
     */
    private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
        return offsets.entrySet()
                .stream()
                .filter(candidate -> !KafkaTopicPartitionStateSentinel.isSentinel(candidate.getValue()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Subclass hook that creates the handle stored in a partition's state holder. It is
     * called once per partition while the state holders are being created.
     *
     * @param partition the partition to create a handle for
     * @return the handle passed to the partition state's constructor
     */
    protected abstract KPH createKafkaPartitionHandle(KafkaTopicPartition partition);

    /**
     * Copies the current offset of every subscribed partition into a new map.
     *
     * <p>The caller is expected to hold the checkpoint lock; this is only checked by an
     * {@code assert}, so it has no effect when assertions are disabled.
     *
     * @return a new map from partition to its current offset
     */
    public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        assert Thread.holdsLock(checkpointLock);

        final HashMap<KafkaTopicPartition, Long> offsetsByPartition =
                new HashMap<>(subscribedPartitionStates.size());
        subscribedPartitionStates.forEach(
                tracked -> offsetsByPartition.put(tracked.getKafkaTopicPartition(), tracked.getOffset()));
        return offsetsByPartition;
    }

    /**
     * Emits a record and stores its offset in the partition state.
     *
     * <p>A null record is not emitted; only the offset is updated, under the checkpoint
     * lock. Without timestamps/watermarks the record is collected and the offset updated
     * under the checkpoint lock. In the periodic and punctuated modes the work is delegated
     * with {@code Long.MIN_VALUE} as the Kafka event timestamp.
     *
     * @param record the record to emit, or null to advance the offset only
     * @param partitionState state of the partition the record came from
     * @param offset offset of the record
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
        if (record == null) {
            synchronized (checkpointLock) {
                partitionState.setOffset(offset);
            }
            return;
        }

        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                synchronized (checkpointLock) {
                    sourceContext.collect(record);
                    partitionState.setOffset(offset);
                }
                break;
            case PERIODIC_WATERMARKS:
                emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, Long.MIN_VALUE);
                break;
            default:
                emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, Long.MIN_VALUE);
                break;
        }
    }

    /**
     * Emits a record with the supplied timestamp and stores its offset in the partition state.
     *
     * <p>A null record is not emitted; only the offset is updated, under the checkpoint
     * lock. Without timestamps/watermarks the record is collected with {@code timestamp}
     * and the offset updated under the checkpoint lock. In the periodic and punctuated
     * modes the work is delegated, passing {@code timestamp} as the Kafka event timestamp.
     *
     * @param record the record to emit, or null to advance the offset only
     * @param partitionState state of the partition the record came from
     * @param offset offset of the record
     * @param timestamp timestamp attached to the record
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    protected void emitRecordWithTimestamp(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long timestamp) throws Exception {
        if (record == null) {
            synchronized (checkpointLock) {
                partitionState.setOffset(offset);
            }
            return;
        }

        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS:
                synchronized (checkpointLock) {
                    sourceContext.collectWithTimestamp(record, timestamp);
                    partitionState.setOffset(offset);
                }
                break;
            case PERIODIC_WATERMARKS:
                emitRecordWithTimestampAndPeriodicWatermark(record, partitionState, offset, timestamp);
                break;
            default:
                emitRecordWithTimestampAndPunctuatedWatermark(record, partitionState, offset, timestamp);
                break;
        }
    }

    /**
     * Emits a record in periodic-watermark mode.
     *
     * <p>The record timestamp is obtained from the partition state while holding that
     * state's monitor, the same monitor {@code PeriodicWatermarkEmitter.onProcessingTime}
     * takes when it reads the partition's current watermark. The record is then collected
     * and the offset updated under the checkpoint lock.
     *
     * @param record the record to emit
     * @param partitionState partition state; cast to the periodic-watermark state type
     * @param offset offset of the record
     * @param kafkaEventTimestamp timestamp passed on to the state's timestamp extraction
     */
    private void emitRecordWithTimestampAndPeriodicWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
        final KafkaTopicPartitionStateWithPeriodicWatermarks periodicState =
                (KafkaTopicPartitionStateWithPeriodicWatermarks) partitionState;

        final long recordTimestamp;
        synchronized (periodicState) {
            recordTimestamp = periodicState.getTimestampForRecord(record, kafkaEventTimestamp);
        }

        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            partitionState.setOffset(offset);
        }
    }

    /**
     * Emits a record in punctuated-watermark mode.
     *
     * <p>The record timestamp and a possible new watermark are obtained from the partition
     * state before the checkpoint lock is taken. The record is then collected and the
     * offset updated under the lock. If the state returned a watermark,
     * {@link #updateMinPunctuatedWatermark} is called after the lock is released.
     *
     * @param record the record to emit
     * @param partitionState partition state; cast to the punctuated-watermark state type
     * @param offset offset of the record
     * @param kafkaEventTimestamp timestamp passed on to the state's timestamp extraction
     */
    private void emitRecordWithTimestampAndPunctuatedWatermark(T record, KafkaTopicPartitionState<KPH> partitionState, long offset, long kafkaEventTimestamp) {
        final KafkaTopicPartitionStateWithPunctuatedWatermarks punctuatedState =
                (KafkaTopicPartitionStateWithPunctuatedWatermarks) partitionState;

        final long recordTimestamp = punctuatedState.getTimestampForRecord(record, kafkaEventTimestamp);
        final Watermark candidateWatermark = punctuatedState.checkAndGetNewWatermark(record, recordTimestamp);

        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            partitionState.setOffset(offset);
        }

        if (candidateWatermark == null) {
            return;
        }
        updateMinPunctuatedWatermark(candidateWatermark);
    }

    /**
     * Emits a new watermark when the minimum partition watermark has advanced.
     *
     * <p>Nothing happens unless {@code nextWatermark} is beyond the largest watermark
     * emitted so far. Otherwise the minimum of the current watermarks of all subscribed
     * partitions is computed. If that minimum is beyond the largest emitted watermark, it
     * is re-checked under the checkpoint lock, recorded and emitted.
     *
     * @param nextWatermark the watermark just produced by one partition
     */
    private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
        if (nextWatermark.getTimestamp() <= maxWatermarkSoFar) {
            return;
        }

        long minAcrossPartitions = Long.MAX_VALUE;
        for (KafkaTopicPartitionState<KPH> tracked : subscribedPartitionStates) {
            final long partitionWatermark =
                    ((KafkaTopicPartitionStateWithPunctuatedWatermarks) tracked).getCurrentPartitionWatermark();
            minAcrossPartitions = Math.min(minAcrossPartitions, partitionWatermark);
        }

        // Cheap check outside the lock first; the decision is repeated under the lock.
        if (minAcrossPartitions <= maxWatermarkSoFar) {
            return;
        }
        synchronized (checkpointLock) {
            if (minAcrossPartitions > maxWatermarkSoFar) {
                maxWatermarkSoFar = minAcrossPartitions;
                sourceContext.emitWatermark(new Watermark(minAcrossPartitions));
            }
        }
    }

    /**
     * Creates one state holder per entry of {@code partitionsToInitialOffsets}, of the state
     * type matching {@code timestampWatermarkMode}, and sets its initial offset.
     *
     * <p>For each partition a handle is created through {@link #createKafkaPartitionHandle}.
     * In the periodic and punctuated modes a separate assigner instance is deserialized for
     * every partition.
     *
     * @param partitionsToInitialOffsets partitions mapped to their initial offsets
     * @param timestampWatermarkMode one of the three mode constants of this class
     * @param watermarksPeriodic serialized periodic assigner; only used in periodic mode
     * @param watermarksPunctuated serialized punctuated assigner; only used in punctuated mode
     * @param userCodeClassLoader class loader used to deserialize the assigner
     * @return a new {@link CopyOnWriteArrayList} holding the created states
     * @throws IOException if deserializing an assigner fails
     * @throws ClassNotFoundException if an assigner class cannot be resolved
     * @throws RuntimeException if {@code timestampWatermarkMode} is not a known mode
     */
    private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(Map<KafkaTopicPartition, Long> partitionsToInitialOffsets, int timestampWatermarkMode, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
        switch (timestampWatermarkMode) {
            case NO_TIMESTAMPS_WATERMARKS: {
                for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
                    KafkaTopicPartitionState<KPH> partitionState =
                            new KafkaTopicPartitionState<>(partitionEntry.getKey(), kafkaHandle);
                    partitionState.setOffset(partitionEntry.getValue());
                    partitionStates.add(partitionState);
                }
                return partitionStates;
            }
            case PERIODIC_WATERMARKS: {
                for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
                    // Deserialized per partition so that no assigner instance is shared.
                    AssignerWithPeriodicWatermarks<T> assignerInstance =
                            watermarksPeriodic.deserializeValue(userCodeClassLoader);
                    KafkaTopicPartitionState<KPH> partitionState =
                            new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
                                    partitionEntry.getKey(), kafkaHandle, assignerInstance);
                    partitionState.setOffset(partitionEntry.getValue());
                    partitionStates.add(partitionState);
                }
                return partitionStates;
            }
            case PUNCTUATED_WATERMARKS: {
                for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
                    KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
                    // Deserialized per partition so that no assigner instance is shared.
                    AssignerWithPunctuatedWatermarks<T> assignerInstance =
                            watermarksPunctuated.deserializeValue(userCodeClassLoader);
                    KafkaTopicPartitionState<KPH> partitionState =
                            new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
                                    partitionEntry.getKey(), kafkaHandle, assignerInstance);
                    partitionState.setOffset(partitionEntry.getValue());
                    partitionStates.add(partitionState);
                }
                return partitionStates;
            }
        }
        // Reached only for a mode value that matches none of the constants above.
        throw new RuntimeException("Unknown timestamp/watermark mode: " + timestampWatermarkMode);
    }

    /**
     * Convenience overload that gives every listed partition the same initial offset and
     * delegates to the map-based overload.
     *
     * @param partitions partitions to create state holders for
     * @param initialOffset offset assigned to every partition
     * @param timestampWatermarkMode one of the three mode constants of this class
     * @param watermarksPeriodic serialized periodic assigner, forwarded unchanged
     * @param watermarksPunctuated serialized punctuated assigner, forwarded unchanged
     * @param userCodeClassLoader class loader forwarded unchanged
     * @return the state holders produced by the map-based overload
     * @throws IOException if deserializing an assigner fails
     * @throws ClassNotFoundException if an assigner class cannot be resolved
     */
    private List<KafkaTopicPartitionState<KPH>> createPartitionStateHolders(List<KafkaTopicPartition> partitions, long initialOffset, int timestampWatermarkMode, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        final Map<KafkaTopicPartition, Long> offsetsByPartition = new HashMap<>(partitions.size());
        partitions.forEach(discovered -> offsetsByPartition.put(discovered, initialOffset));

        return createPartitionStateHolders(
                offsetsByPartition,
                timestampWatermarkMode,
                watermarksPeriodic,
                watermarksPunctuated,
                userCodeClassLoader);
    }

    /**
     * Registers current-offset and committed-offset gauges for each partition state.
     *
     * <p>Every partition gets two gauges under
     * {@code consumerMetricGroup / topic=<topic> / partition=<partition>} and two more in
     * the legacy groups, named by {@link #getLegacyOffsetsMetricsGaugeName}.
     *
     * @param consumerMetricGroup parent group for the per-topic / per-partition gauges
     * @param partitionOffsetStates partition states to expose
     */
    private void registerOffsetMetrics(MetricGroup consumerMetricGroup, List<KafkaTopicPartitionState<KPH>> partitionOffsetStates) {
        for (KafkaTopicPartitionState<KPH> partitionState : partitionOffsetStates) {
            final MetricGroup perTopicGroup = consumerMetricGroup.addGroup("topic", partitionState.getTopic());
            final MetricGroup perPartitionGroup =
                    perTopicGroup.addGroup("partition", Integer.toString(partitionState.getPartition()));

            perPartitionGroup.gauge("currentOffsets", new OffsetGauge(partitionState, OffsetGaugeType.CURRENT_OFFSET));
            perPartitionGroup.gauge("committedOffsets", new OffsetGauge(partitionState, OffsetGaugeType.COMMITTED_OFFSET));

            legacyCurrentOffsetsMetricGroup.gauge(
                    getLegacyOffsetsMetricsGaugeName(partitionState),
                    new OffsetGauge(partitionState, OffsetGaugeType.CURRENT_OFFSET));
            legacyCommittedOffsetsMetricGroup.gauge(
                    getLegacyOffsetsMetricsGaugeName(partitionState),
                    new OffsetGauge(partitionState, OffsetGaugeType.COMMITTED_OFFSET));
        }
    }

    /**
     * Builds the gauge name used in the legacy metric groups: {@code <topic>-<partition>}.
     *
     * @param ktp partition state to name
     * @return the topic and partition number joined by a hyphen
     */
    private static String getLegacyOffsetsMetricsGaugeName(KafkaTopicPartitionState<?> ktp) {
        final String topicPart = ktp.getTopic();
        final int partitionPart = ktp.getPartition();
        return topicPart + "-" + partitionPart;
    }

    private static class PeriodicWatermarkEmitter<KPH>
    implements ProcessingTimeCallback {
        private final List<KafkaTopicPartitionState<KPH>> allPartitions;
        private final SourceFunction.SourceContext<?> emitter;
        private final ProcessingTimeService timerService;
        private final long interval;
        private long lastWatermarkTimestamp;

        PeriodicWatermarkEmitter(List<KafkaTopicPartitionState<KPH>> allPartitions, SourceFunction.SourceContext<?> emitter, ProcessingTimeService timerService, long autoWatermarkInterval) {
            this.allPartitions = (List)Preconditions.checkNotNull(allPartitions);
            this.emitter = (SourceFunction.SourceContext)Preconditions.checkNotNull(emitter);
            this.timerService = (ProcessingTimeService)Preconditions.checkNotNull((Object)timerService);
            this.interval = autoWatermarkInterval;
            this.lastWatermarkTimestamp = Long.MIN_VALUE;
        }

        public void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeCallback)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onProcessingTime(long timestamp) throws Exception {
            long minAcrossAll = Long.MAX_VALUE;
            boolean isEffectiveMinAggregation = false;
            Iterator<KafkaTopicPartitionState<KPH>> iterator = this.allPartitions.iterator();
            while (iterator.hasNext()) {
                long curr;
                KafkaTopicPartitionState<KPH> state;
                KafkaTopicPartitionState<KPH> kafkaTopicPartitionState = state = iterator.next();
                synchronized (kafkaTopicPartitionState) {
                    curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks)state).getCurrentWatermarkTimestamp();
                }
                minAcrossAll = Math.min(minAcrossAll, curr);
                isEffectiveMinAggregation = true;
            }
            if (isEffectiveMinAggregation && minAcrossAll > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = minAcrossAll;
                this.emitter.emitWatermark(new Watermark(minAcrossAll));
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, (ProcessingTimeCallback)this);
        }
    }

    private static class OffsetGauge
    implements Gauge<Long> {
        private final KafkaTopicPartitionState<?> ktp;
        private final OffsetGaugeType gaugeType;

        OffsetGauge(KafkaTopicPartitionState<?> ktp, OffsetGaugeType gaugeType) {
            this.ktp = ktp;
            this.gaugeType = gaugeType;
        }

        public Long getValue() {
            switch (this.gaugeType) {
                case COMMITTED_OFFSET: {
                    return this.ktp.getCommittedOffset();
                }
                case CURRENT_OFFSET: {
                    return this.ktp.getOffset();
                }
            }
            throw new RuntimeException("Unknown gauge type: " + (Object)((Object)this.gaugeType));
        }
    }

    private static enum OffsetGaugeType {
        CURRENT_OFFSET,
        COMMITTED_OFFSET;

    }
}

