/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.appenderator;

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.com.metamx.emitter.service.ServiceEmitter;
import org.apache.hive.druid.io.druid.client.CachingQueryRunner;
import org.apache.hive.druid.io.druid.client.cache.Cache;
import org.apache.hive.druid.io.druid.client.cache.CacheConfig;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.guava.CloseQuietly;
import org.apache.hive.druid.io.druid.java.util.common.guava.FunctionalIterable;
import org.apache.hive.druid.io.druid.query.BySegmentQueryRunner;
import org.apache.hive.druid.io.druid.query.CPUTimeMetricQueryRunner;
import org.apache.hive.druid.io.druid.query.MetricsEmittingQueryRunner;
import org.apache.hive.druid.io.druid.query.NoopQueryRunner;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryMetrics;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactory;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QueryRunnerHelper;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.QueryToolChest;
import org.apache.hive.druid.io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.query.TableDataSource;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentSpec;
import org.apache.hive.druid.io.druid.segment.Segment;
import org.apache.hive.druid.io.druid.segment.realtime.FireHydrant;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Sink;
import org.apache.hive.druid.io.druid.timeline.TimelineObjectHolder;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionChunk;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

public class SinkQuerySegmentWalker
implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
    private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
    private final String dataSource;
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
    private final ObjectMapper objectMapper;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final ExecutorService queryExecutorService;
    private final Cache cache;
    private final CacheConfig cacheConfig;

    public SinkQuerySegmentWalker(String dataSource, VersionedIntervalTimeline<String, Sink> sinkTimeline, ObjectMapper objectMapper, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, ExecutorService queryExecutorService, Cache cache, CacheConfig cacheConfig) {
        this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
        this.sinkTimeline = Preconditions.checkNotNull(sinkTimeline, "sinkTimeline");
        this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
        this.emitter = Preconditions.checkNotNull(emitter, "emitter");
        this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate");
        this.queryExecutorService = Preconditions.checkNotNull(queryExecutorService, "queryExecutorService");
        this.cache = Preconditions.checkNotNull(cache, "cache");
        this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig");
        if (!cache.isLocal()) {
            log.warn("Configured cache[%s] is not local, caching will not be enabled.", cache.getClass().getName());
        }
    }

    @Override
    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        FunctionalIterable<SegmentDescriptor> specs = FunctionalIterable.create(intervals).transformCat(new Function<Interval, Iterable<TimelineObjectHolder<String, Sink>>>(){

            @Override
            public Iterable<TimelineObjectHolder<String, Sink>> apply(Interval interval) {
                return SinkQuerySegmentWalker.this.sinkTimeline.lookup(interval);
            }
        }).transformCat(new Function<TimelineObjectHolder<String, Sink>, Iterable<SegmentDescriptor>>(){

            @Override
            public Iterable<SegmentDescriptor> apply(final TimelineObjectHolder<String, Sink> holder) {
                return FunctionalIterable.create(holder.getObject()).transform(new Function<PartitionChunk<Sink>, SegmentDescriptor>(){

                    @Override
                    public SegmentDescriptor apply(PartitionChunk<Sink> chunk) {
                        return new SegmentDescriptor(holder.getInterval(), (String)holder.getVersion(), chunk.getChunkNumber());
                    }
                });
            }
        });
        return this.getQueryRunnerForSegments(query, specs);
    }

    @Override
    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs) {
        if (!(query.getDataSource() instanceof TableDataSource) || !this.dataSource.equals(((TableDataSource)query.getDataSource()).getName())) {
            log.makeAlert("Received query for unknown dataSource", new Object[0]).addData("dataSource", query.getDataSource()).emit();
            return new NoopQueryRunner();
        }
        final QueryRunnerFactory factory = this.conglomerate.findFactory(query);
        if (factory == null) {
            throw new ISE("Unknown query type[%s].", query.getClass());
        }
        final QueryToolChest toolChest = factory.getToolchest();
        final boolean skipIncrementalSegment = query.getContextValue(CONTEXT_SKIP_INCREMENTAL_SEGMENT, false);
        final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
        return CPUTimeMetricQueryRunner.safeBuild(toolChest.mergeResults(factory.mergeRunners(this.queryExecutorService, FunctionalIterable.create(specs).transform(new Function<SegmentDescriptor, QueryRunner<T>>(){

            @Override
            public QueryRunner<T> apply(final SegmentDescriptor descriptor) {
                PartitionHolder holder = SinkQuerySegmentWalker.this.sinkTimeline.findEntry(descriptor.getInterval(), descriptor.getVersion());
                if (holder == null) {
                    return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                }
                PartitionChunk chunk = holder.getChunk(descriptor.getPartitionNumber());
                if (chunk == null) {
                    return new ReportTimelineMissingSegmentQueryRunner(descriptor);
                }
                Sink theSink = (Sink)chunk.getObject();
                String sinkSegmentIdentifier = theSink.getSegment().getIdentifier();
                return new SpecificSegmentQueryRunner(SinkQuerySegmentWalker.this.withPerSinkMetrics(new BySegmentQueryRunner(sinkSegmentIdentifier, descriptor.getInterval().getStart(), factory.mergeRunners(MoreExecutors.sameThreadExecutor(), Iterables.transform(theSink, new Function<FireHydrant, QueryRunner<T>>(){

                    @Override
                    public QueryRunner<T> apply(FireHydrant hydrant) {
                        boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
                        if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
                            return new NoopQueryRunner();
                        }
                        Pair<Segment, Closeable> segment = hydrant.getAndIncrementSegment();
                        try {
                            QueryRunner baseRunner = QueryRunnerHelper.makeClosingQueryRunner(factory.createRunner((Segment)segment.lhs), (Closeable)segment.rhs);
                            if (hydrantDefinitelySwapped && SinkQuerySegmentWalker.this.cache.isLocal()) {
                                return new CachingQueryRunner(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant), descriptor, SinkQuerySegmentWalker.this.objectMapper, SinkQuerySegmentWalker.this.cache, toolChest, baseRunner, MoreExecutors.sameThreadExecutor(), SinkQuerySegmentWalker.this.cacheConfig);
                            }
                            return baseRunner;
                        }
                        catch (RuntimeException e) {
                            CloseQuietly.close((Closeable)segment.rhs);
                            throw e;
                        }
                    }
                }))), toolChest, sinkSegmentIdentifier, cpuTimeAccumulator), new SpecificSegmentSpec(descriptor));
            }
        }))), toolChest, this.emitter, cpuTimeAccumulator, true);
    }

    private <T> QueryRunner<T> withPerSinkMetrics(QueryRunner<T> sinkRunner, QueryToolChest<T, ? extends Query<T>> queryToolChest, String sinkSegmentIdentifier, AtomicLong cpuTimeAccumulator) {
        return CPUTimeMetricQueryRunner.safeBuild(new MetricsEmittingQueryRunner<T>(this.emitter, queryToolChest, new MetricsEmittingQueryRunner<T>(this.emitter, queryToolChest, sinkRunner, QueryMetrics::reportSegmentTime, queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier)), QueryMetrics::reportSegmentAndCacheTime, queryMetrics -> queryMetrics.segment(sinkSegmentIdentifier)).withWaitMeasuredFromNow(), queryToolChest, this.emitter, cpuTimeAccumulator, false);
    }

    public static String makeHydrantCacheIdentifier(FireHydrant input) {
        return input.getSegment().getIdentifier() + "_" + input.getCount();
    }
}

