/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.appenderator;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.ImmutableSet;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.google.common.util.concurrent.SettableFuture;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.Appenderator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.FiniteAppenderatorDriverMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentAllocator;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentNotWritableException;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.UsedSegmentChecker;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;

public class AppenderatorDriver
implements Closeable {
    private static final Logger log = new Logger(AppenderatorDriver.class);
    // Collaborators supplied through the constructor (all null-checked there).
    private final Appenderator appenderator;
    private final SegmentAllocator segmentAllocator;
    // Created from the SegmentHandoffNotifierFactory passed to the constructor.
    private final SegmentHandoffNotifier handoffNotifier;
    private final UsedSegmentChecker usedSegmentChecker;
    private final ObjectMapper objectMapper;
    private final FireDepartmentMetrics metrics;
    // sequenceName -> (segment interval start millis -> segment). Every access in this class
    // happens inside synchronized (activeSegments); the map also serves as the monitor for
    // publishPendingSegments and lastSegmentIds below.
    private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<String, NavigableMap<Long, SegmentIdentifier>>();
    // sequenceName -> segments added for that sequence and not yet removed by a successful publish.
    // Accessed only while holding the activeSegments monitor.
    private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<String, List<SegmentIdentifier>>();
    // sequenceName -> identifier string of the most recently added segment; passed to the
    // allocator on the next allocation. Accessed only while holding the activeSegments monitor.
    private final Map<String, String> lastSegmentIds = Maps.newHashMap();
    // Executor on which the push/publish task of the private publish(...) method is submitted.
    private final ListeningExecutorService publishExecutor;

    public AppenderatorDriver(Appenderator appenderator, SegmentAllocator segmentAllocator, SegmentHandoffNotifierFactory handoffNotifierFactory, UsedSegmentChecker usedSegmentChecker, ObjectMapper objectMapper, FireDepartmentMetrics metrics) {
        this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
        this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
        this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory").createSegmentHandoffNotifier(appenderator.getDataSource());
        this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics");
        this.publishExecutor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the handoff notifier and the appenderator, then restores this driver's bookkeeping
     * from the metadata the appenderator hands back.
     *
     * <p>The object returned by {@code appenderator.startJob()} is converted to
     * {@link FiniteAppenderatorDriverMetadata}. When the conversion yields a non-null value, its
     * active segments (re-keyed by interval start millis), publish-pending segments and last
     * segment ids are merged into the driver's maps.
     *
     * @return the caller metadata stored inside the restored metadata, or {@code null} when
     *         nothing was restored
     */
    public Object startJob() {
        handoffNotifier.start();

        final FiniteAppenderatorDriverMetadata restored =
            objectMapper.convertValue(appenderator.startJob(), FiniteAppenderatorDriverMetadata.class);
        log.info("Restored metadata[%s].", restored);

        if (restored == null) {
            return null;
        }

        synchronized (activeSegments) {
            for (Map.Entry<String, List<SegmentIdentifier>> perSequence : restored.getActiveSegments().entrySet()) {
                final TreeMap<Long, SegmentIdentifier> byIntervalStart = new TreeMap<>();
                activeSegments.put(perSequence.getKey(), byIntervalStart);
                for (SegmentIdentifier segment : perSequence.getValue()) {
                    byIntervalStart.put(segment.getInterval().getStartMillis(), segment);
                }
            }
            publishPendingSegments.putAll(restored.getPublishPendingSegments());
            lastSegmentIds.putAll(restored.getLastSegmentIds());
        }
        return restored.getCallerMetadata();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a segment for a sequence in all three bookkeeping maps, under the
     * {@code activeSegments} monitor.
     *
     * <p>The segment is registered as active under its interval start millis only if no segment is
     * already registered at that start; it is always appended to the publish-pending list, and it
     * always becomes the sequence's last segment id.
     *
     * @param sequenceName sequence the segment belongs to
     * @param identifier   segment to record
     */
    private void addSegment(String sequenceName, SegmentIdentifier identifier) {
        synchronized (activeSegments) {
            final NavigableMap<Long, SegmentIdentifier> activeForSequence =
                activeSegments.computeIfAbsent(sequenceName, unused -> new TreeMap<>());
            activeForSequence.putIfAbsent(identifier.getInterval().getStartMillis(), identifier);

            final List<SegmentIdentifier> pendingForSequence =
                publishPendingSegments.computeIfAbsent(sequenceName, unused -> new ArrayList<>());
            pendingForSequence.add(identifier);

            lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Forgets all active segments and then clears the appenderator.
     *
     * <p>Only {@code activeSegments} is emptied here; {@code publishPendingSegments} and
     * {@code lastSegmentIds} are left as they are. The appenderator is cleared after the monitor
     * has been released.
     *
     * @throws InterruptedException if {@code appenderator.clear()} throws it
     */
    public void clear() throws InterruptedException {
        synchronized (activeSegments) {
            activeSegments.clear();
        }
        appenderator.clear();
    }

    /**
     * Adds a row to the segment covering its timestamp, allocating a new segment if needed.
     *
     * @param row               row to add
     * @param sequenceName      sequence the row belongs to
     * @param committerSupplier supplies the committer handed (wrapped) to the appenderator
     * @return an "ok" result carrying the segment and the row count reported by the appenderator,
     *         or a "fail" result when no segment could be found or allocated for the row
     * @throws IOException          if looking up or allocating the segment fails
     * @throws NullPointerException if any argument is null
     * @throws ISE                  if the appenderator reports the chosen segment as not writable
     */
    public AppenderatorDriverAddResult add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier) throws IOException {
        Preconditions.checkNotNull(row, "row");
        Preconditions.checkNotNull(sequenceName, "sequenceName");
        Preconditions.checkNotNull(committerSupplier, "committerSupplier");

        final SegmentIdentifier target = getSegment(row.getTimestamp(), sequenceName);
        if (target == null) {
            // No active segment fits and the allocator declined to create one.
            return AppenderatorDriverAddResult.fail();
        }

        try {
            final int rowsInSegment = appenderator.add(target, row, wrapCommitterSupplier(committerSupplier));
            return AppenderatorDriverAddResult.ok(target, rowsInSegment);
        }
        catch (SegmentNotWritableException e) {
            throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", target);
        }
    }

    /**
     * Persists all pending data through the appenderator and blocks until that completes.
     *
     * @param committer committer to wrap with the driver's metadata before persisting
     * @return whatever the future returned by {@code appenderator.persistAll(...)} yields
     * @throws InterruptedException if the wait is interrupted; rethrown unchanged
     * @throws RuntimeException     for any other failure, via {@code Throwables.propagate}
     */
    public Object persist(Committer committer) throws InterruptedException {
        try {
            log.info("Persisting data.", new Object[0]);
            final long startedAt = System.currentTimeMillis();
            final Object persistedMetadata = appenderator.persistAll(wrapCommitter(committer)).get();
            log.info("Persisted pending data in %,dms.", System.currentTimeMillis() - startedAt);
            return persistedMetadata;
        }
        catch (InterruptedException interrupted) {
            // Kept separate from the generic handler so interruption is not wrapped.
            throw interrupted;
        }
        catch (Exception failure) {
            throw Throwables.propagate(failure);
        }
    }

    /**
     * Registers handoff callbacks for every segment in {@code segmentsAndMetadata} and returns a
     * future that completes once all of them have been handed off and dropped.
     *
     * <p>For each segment a callback is registered with the handoff notifier. When it fires, the
     * handoff metric is incremented and the segment is dropped from the appenderator. After the
     * last drop succeeds the returned future is set to a new {@link SegmentsAndMetadata} carrying
     * the same segments and the caller metadata unwrapped from the commit metadata. If any drop
     * fails, the returned future is failed with that exception.
     *
     * @param segmentsAndMetadata published segments and their commit metadata; may be {@code null}
     * @return an immediate future of {@code null} when the argument is {@code null}; an immediate
     *         future of the unwrapped result when there are no segments; otherwise a future
     *         completed by the handoff callbacks
     */
    public ListenableFuture<SegmentsAndMetadata> registerHandoff(final SegmentsAndMetadata segmentsAndMetadata) {
        if (segmentsAndMetadata == null) {
            return Futures.immediateFuture(null);
        }
        // Typed list (not a raw List) so the enhanced-for below can iterate SegmentIdentifier.
        final List<SegmentIdentifier> waitingSegmentIdList = segmentsAndMetadata.getSegments()
            .stream()
            .map(SegmentIdentifier::fromDataSegment)
            .collect(Collectors.toList());
        if (waitingSegmentIdList.isEmpty()) {
            return Futures.immediateFuture(new SegmentsAndMetadata(segmentsAndMetadata.getSegments(), ((FiniteAppenderatorDriverMetadata)segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()));
        }
        log.info("Register handoff of segments: [%s]", waitingSegmentIdList);
        final SettableFuture<SegmentsAndMetadata> resultFuture = SettableFuture.create();
        final AtomicInteger numRemainingHandoffSegments = new AtomicInteger(waitingSegmentIdList.size());
        for (final SegmentIdentifier segmentIdentifier : waitingSegmentIdList) {
            final SegmentDescriptor descriptor = new SegmentDescriptor(
                segmentIdentifier.getInterval(),
                segmentIdentifier.getVersion(),
                segmentIdentifier.getShardSpec().getPartitionNum()
            );
            this.handoffNotifier.registerSegmentHandoffCallback(descriptor, MoreExecutors.sameThreadExecutor(), () -> {
                log.info("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
                this.metrics.incrementHandOffCount();
                ListenableFuture<?> dropFuture = this.appenderator.drop(segmentIdentifier);
                Futures.addCallback(dropFuture, new FutureCallback<Object>(){

                    @Override
                    public void onSuccess(Object result) {
                        // Only the callback that brings the counter to zero completes the future.
                        if (numRemainingHandoffSegments.decrementAndGet() == 0) {
                            log.info("All segments handed off.", new Object[0]);
                            resultFuture.set(new SegmentsAndMetadata(segmentsAndMetadata.getSegments(), ((FiniteAppenderatorDriverMetadata)segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()));
                        }
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        log.warn(e, "Failed to drop segment[%s]?!", segmentIdentifier);
                        numRemainingHandoffSegments.decrementAndGet();
                        resultFuture.setException(e);
                    }
                });
            });
        }
        return resultFuture;
    }

    /**
     * Shuts the driver down: calls {@code shutdownNow()} on the publish executor, then closes the
     * handoff notifier. The appenderator itself is not closed here.
     */
    @Override
    public void close() {
        publishExecutor.shutdownNow();
        handoffNotifier.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Looks up the active segment of a sequence whose interval contains the given timestamp.
     *
     * <p>The candidate is the segment with the greatest interval start at or before the
     * timestamp; it is returned only if its interval actually contains the timestamp.
     *
     * @param timestamp    instant to locate
     * @param sequenceName sequence to search
     * @return the matching active segment, or {@code null} if the sequence is unknown or no
     *         active segment contains the timestamp
     */
    private SegmentIdentifier getActiveSegment(DateTime timestamp, String sequenceName) {
        synchronized (activeSegments) {
            final NavigableMap<Long, SegmentIdentifier> forSequence = activeSegments.get(sequenceName);
            if (forSequence == null) {
                return null;
            }

            final Map.Entry<Long, SegmentIdentifier> floor = forSequence.floorEntry(timestamp.getMillis());
            if (floor == null) {
                return null;
            }

            final boolean covers = floor.getValue().getInterval().contains((ReadableInstant)timestamp);
            return covers ? floor.getValue() : null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the segment a row with the given timestamp should go to, allocating one through the
     * segment allocator when no active segment of the sequence contains the timestamp.
     *
     * <p>The whole lookup-or-allocate step runs under the {@code activeSegments} monitor. A newly
     * allocated segment is checked against the segments the appenderator already knows about and
     * then recorded via {@link #addSegment}.
     *
     * @param timestamp    row timestamp
     * @param sequenceName sequence the row belongs to
     * @return the existing or newly allocated segment, or {@code null} if the allocator returned
     *         {@code null}
     * @throws IOException if allocation fails
     * @throws ISE         if the allocated segment equals one the appenderator already has
     */
    private SegmentIdentifier getSegment(DateTime timestamp, String sequenceName) throws IOException {
        synchronized (activeSegments) {
            final SegmentIdentifier active = getActiveSegment(timestamp, sequenceName);
            if (active != null) {
                return active;
            }

            final SegmentIdentifier allocated =
                segmentAllocator.allocate(timestamp, sequenceName, lastSegmentIds.get(sequenceName));
            if (allocated == null) {
                log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
                return null;
            }

            for (SegmentIdentifier known : appenderator.getSegments()) {
                if (known.equals(allocated)) {
                    throw new ISE("WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", allocated, known);
                }
            }

            log.info("New segment[%s] for sequenceName[%s].", allocated, sequenceName);
            addSegment(sequenceName, allocated);
            return allocated;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the given segments from the active set of a sequence, so that later rows for the
     * same intervals no longer resolve to them.
     *
     * <p>Segments are removed one by one in list order; if a removal fails the segments processed
     * before it stay removed. {@code publishPendingSegments} and {@code lastSegmentIds} are not
     * touched.
     *
     * @param sequenceName sequence whose active segments are modified
     * @param identifiers  segments to remove
     * @throws ISE if the sequence has no active-segment map, or if for some identifier no segment
     *             is registered at its interval start, or the registered one is a different segment
     */
    public void moveSegmentOut(String sequenceName, List<SegmentIdentifier> identifiers) {
        synchronized (activeSegments) {
            final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
            if (activeSegmentsForSequence == null) {
                throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
            }
            for (SegmentIdentifier identifier : identifiers) {
                log.info("Moving segment[%s] out of active list.", identifier);
                final long key = identifier.getInterval().getStartMillis();
                // remove() yields null when nothing is registered at this start time; treat that
                // the same as a mismatch instead of dereferencing it and failing with an NPE.
                final SegmentIdentifier removed = activeSegmentsForSequence.remove(key);
                if (removed == null || !removed.equals(identifier)) {
                    throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Pushes and publishes all publish-pending segments of the given sequences.
     *
     * <p>The pending segments are collected under the {@code activeSegments} monitor; sequences
     * without a pending list are skipped. The actual work is delegated to the private
     * {@code publish} overload. When that future succeeds with a non-null result, the sequences
     * are removed from both {@code activeSegments} and {@code publishPendingSegments}; a failure
     * is only logged.
     *
     * @param publisher     publisher used to commit the pushed segments
     * @param committer     committer to wrap with the driver's metadata
     * @param sequenceNames sequences whose pending segments should be published
     * @return the future of the underlying push-and-publish task
     */
    public ListenableFuture<SegmentsAndMetadata> publish(TransactionalSegmentPublisher publisher, Committer committer, final Collection<String> sequenceNames) {
        final List<SegmentIdentifier> segmentsToPublish;
        synchronized (activeSegments) {
            segmentsToPublish = sequenceNames.stream()
                .map(publishPendingSegments::get)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        }

        final ListenableFuture<SegmentsAndMetadata> published =
            publish(publisher, wrapCommitter(committer), segmentsToPublish);

        Futures.addCallback(published, new FutureCallback<SegmentsAndMetadata>(){

            @Override
            public void onSuccess(@Nullable SegmentsAndMetadata result) {
                if (result == null) {
                    // A null result means the publish was given up on; keep the bookkeeping.
                    return;
                }
                synchronized (activeSegments) {
                    sequenceNames.forEach(finished -> {
                        activeSegments.remove(finished);
                        publishPendingSegments.remove(finished);
                    });
                }
            }

            @Override
            public void onFailure(Throwable t) {
                log.error(t, "Failed to publish segments[%s]", segmentsToPublish);
            }
        });
        return published;
    }

    /**
     * Submits a task to the publish executor that pushes the given segments and then publishes
     * them, retrying until it succeeds, gives up, or is interrupted.
     *
     * <p>Each attempt pushes through the appenderator, verifies that exactly the requested
     * segments were pushed, and (unless nothing was pushed) calls the publisher. If the publisher
     * reports a transaction failure, the used-segment checker is consulted: when the segments
     * exist anyway the attempt counts as successful, otherwise the task gives up and yields
     * {@code null}.
     *
     * <p>Any exception other than {@link InterruptedException} (including the ISE raised for a
     * pushed/requested mismatch) is logged and retried after the delay computed by
     * {@link #computeNextRetrySleep(long)}; there is no upper bound on the number of attempts.
     *
     * @param publisher          publisher used to commit the pushed segments
     * @param wrappedCommitter   committer already wrapped with the driver's metadata
     * @param segmentIdentifiers segments to push and publish
     * @return future yielding the pushed segments and metadata, or {@code null} when publishing
     *         was given up on
     */
    private ListenableFuture<SegmentsAndMetadata> publish(TransactionalSegmentPublisher publisher, WrappedCommitter wrappedCommitter, List<SegmentIdentifier> segmentIdentifiers) {
        return this.publishExecutor.submit(() -> {
            long nTry = 0L;
            // Every path through the loop body returns, throws, or falls through to the next
            // attempt, so no statement may follow the try/catch.
            while (true) {
                try {
                    log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers));
                    SegmentsAndMetadata segmentsAndMetadata = (SegmentsAndMetadata)this.appenderator.push(segmentIdentifiers, wrappedCommitter).get();
                    Set<SegmentIdentifier> pushedSegments = segmentsAndMetadata.getSegments().stream().map(SegmentIdentifier::fromDataSegment).collect(Collectors.toSet());
                    if (!pushedSegments.equals(Sets.newHashSet(segmentIdentifiers))) {
                        throw new ISE("WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", pushedSegments, segmentIdentifiers);
                    }
                    log.info("Publishing segments with commitMetadata[%s]: [%s]", segmentsAndMetadata.getCommitMetadata(), Joiner.on(", ").join(segmentsAndMetadata.getSegments()));
                    if (segmentsAndMetadata.getSegments().isEmpty()) {
                        log.info("Nothing to publish, skipping publish step.", new Object[0]);
                    } else {
                        boolean published = publisher.publishSegments(ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), ((FiniteAppenderatorDriverMetadata)segmentsAndMetadata.getCommitMetadata()).getCallerMetadata());
                        if (published) {
                            log.info("Published segments, awaiting handoff.", new Object[0]);
                        } else {
                            log.info("Transaction failure while publishing segments, checking if someone else beat us to it.", new Object[0]);
                            if (this.usedSegmentChecker.findUsedSegments(pushedSegments).equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
                                log.info("Our segments really do exist, awaiting handoff.", new Object[0]);
                            } else {
                                log.warn("Our segments don't exist, giving up.", new Object[0]);
                                return null;
                            }
                        }
                    }
                    return segmentsAndMetadata;
                }
                catch (InterruptedException e) {
                    // Interruption ends the task instead of being retried.
                    throw e;
                }
                catch (Exception e) {
                    long sleepMillis = AppenderatorDriver.computeNextRetrySleep(++nTry);
                    log.warn(e, "Failed publish (try %d), retrying in %,dms.", nTry, sleepMillis);
                    Thread.sleep(sleepMillis);
                }
            }
        });
    }

    /**
     * Publishes the pending segments of the given sequences and chains handoff registration onto
     * the result.
     *
     * @param publisher     publisher used to commit the pushed segments
     * @param committer     committer to wrap with the driver's metadata
     * @param sequenceNames sequences whose pending segments should be published
     * @return the future produced by transforming the publish future with {@link #registerHandoff}
     */
    public ListenableFuture<SegmentsAndMetadata> publishAndRegisterHandoff(TransactionalSegmentPublisher publisher, Committer committer, Collection<String> sequenceNames) {
        final ListenableFuture<SegmentsAndMetadata> published = publish(publisher, committer, sequenceNames);
        return Futures.transform(published, this::registerHandoff);
    }

    /**
     * Adapts a committer supplier so that every committer it produces is wrapped with the driver's
     * metadata at the time {@code get()} is called.
     *
     * @param committerSupplier supplier of the caller's committers
     * @return supplier that wraps each supplied committer via {@link #wrapCommitter}
     */
    private Supplier<Committer> wrapCommitterSupplier(Supplier<Committer> committerSupplier) {
        return new Supplier<Committer>() {
            @Override
            public Committer get() {
                return wrapCommitter(committerSupplier.get());
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Wraps a committer so that its metadata becomes a {@link FiniteAppenderatorDriverMetadata}
     * combining a snapshot of the driver's bookkeeping with the committer's own metadata.
     *
     * <p>The snapshot (immutable copies of the active segments, publish-pending segments and last
     * segment ids, plus {@code committer.getMetadata()}) is taken once, here, under the
     * {@code activeSegments} monitor. The returned committer always reports that same snapshot
     * and delegates {@code run()} to the original committer.
     *
     * @param committer committer supplied by the caller
     * @return committer exposing the combined metadata
     */
    private WrappedCommitter wrapCommitter(final Committer committer) {
        final Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>> toSegmentList =
            segmentsByStart -> ImmutableList.copyOf(segmentsByStart.values());

        final FiniteAppenderatorDriverMetadata snapshot;
        synchronized (activeSegments) {
            final Map<String, List<SegmentIdentifier>> activeView = Maps.transformValues(activeSegments, toSegmentList);
            snapshot = new FiniteAppenderatorDriverMetadata(
                ImmutableMap.copyOf(activeView),
                ImmutableMap.copyOf(publishPendingSegments),
                ImmutableMap.copyOf(lastSegmentIds),
                committer.getMetadata()
            );
        }

        return new WrappedCommitter(){

            @Override
            public Object getMetadata() {
                return snapshot;
            }

            @Override
            public void run() {
                committer.run();
            }
        };
    }

    /**
     * Computes how long to sleep before the next publish retry: exponential backoff with a
     * randomised multiplier.
     *
     * <p>The base delay is {@code 1000 * 2^nTry} milliseconds, capped at 60000. It is then scaled
     * by a factor drawn as {@code 1 + 0.2 * gaussian} and clamped to the range {@code [0, 2]}, so
     * the result lies between 0 and twice the capped base delay.
     *
     * @param nTry number of the attempt that just failed
     * @return sleep time in milliseconds
     */
    private static long computeNextRetrySleep(long nTry) {
        final long baseSleepMillis = 1000L;
        final long maxSleepMillis = 60000L;

        final double jitter = 1.0 + 0.2 * new Random().nextGaussian();
        final double fuzzyMultiplier = Math.min(Math.max(jitter, 0.0), 2.0);

        final double cappedBackoff = Math.min((double)maxSleepMillis, baseSleepMillis * Math.pow(2.0, nTry));
        return (long)(cappedBackoff * fuzzyMultiplier);
    }

    private static interface WrappedCommitter
    extends Committer {
    }
}

