/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.client;

import com.google.inject.Inject;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Ordering;
import org.apache.hive.druid.com.metamx.emitter.service.ServiceEmitter;
import org.apache.hive.druid.com.metamx.http.client.HttpClient;
import org.apache.hive.druid.io.druid.client.BrokerSegmentWatcherConfig;
import org.apache.hive.druid.io.druid.client.DirectDruidClient;
import org.apache.hive.druid.io.druid.client.DruidServer;
import org.apache.hive.druid.io.druid.client.FilteredServerInventoryView;
import org.apache.hive.druid.io.druid.client.ServerView;
import org.apache.hive.druid.io.druid.client.TimelineServerView;
import org.apache.hive.druid.io.druid.client.selector.QueryableDruidServer;
import org.apache.hive.druid.io.druid.client.selector.ServerSelector;
import org.apache.hive.druid.io.druid.client.selector.TierSelectorStrategy;
import org.apache.hive.druid.io.druid.concurrent.Execs;
import org.apache.hive.druid.io.druid.guice.annotations.Client;
import org.apache.hive.druid.io.druid.guice.annotations.Smile;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.query.DataSource;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryToolChestWarehouse;
import org.apache.hive.druid.io.druid.query.QueryWatcher;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionChunk;

/**
 * Broker-side {@link TimelineServerView} that tracks, per data source, a
 * {@link VersionedIntervalTimeline} of {@link ServerSelector}s and, per server name, a
 * {@link QueryableDruidServer} wrapping a {@link DirectDruidClient}.
 *
 * <p>The view is populated by segment and server callbacks registered on the supplied
 * {@link FilteredServerInventoryView} in the constructor. Mutations of {@code selectors} and
 * {@code timelines} happen while holding {@code lock}; {@code clients} and
 * {@code timelineCallbacks} are concurrent maps.
 */
public class BrokerServerView
implements TimelineServerView {
    private static final Logger log = new Logger(BrokerServerView.class);
    /** Guards {@link #selectors} and {@link #timelines}. */
    private final Object lock = new Object();
    /** Server name -> queryable server (server plus its direct client). */
    private final ConcurrentMap<String, QueryableDruidServer> clients;
    /** Segment identifier -> selector over the servers holding that segment. */
    private final Map<String, ServerSelector> selectors;
    /** Data source name -> timeline of selectors. */
    private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
    /** Registered timeline callbacks and the executor each one is run on. */
    private final ConcurrentMap<TimelineServerView.TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>();
    private final QueryToolChestWarehouse warehouse;
    private final QueryWatcher queryWatcher;
    private final ObjectMapper smileMapper;
    private final HttpClient httpClient;
    private final FilteredServerInventoryView baseView;
    private final TierSelectorStrategy tierSelectorStrategy;
    private final ServiceEmitter emitter;
    /** Accepts only (server, segment) pairs matching the watched tiers and data sources. */
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
    /** Set to {@code true} once the base view reports that its segment view is initialized. */
    private volatile boolean initialized = false;

    /**
     * Creates the view and registers segment and server callbacks on {@code baseView}.
     * Both callbacks are run on one single-threaded executor created here.
     *
     * @param warehouse            passed to every {@link DirectDruidClient} this view creates
     * @param queryWatcher         passed to every {@link DirectDruidClient} this view creates
     * @param smileMapper          passed to every {@link DirectDruidClient} this view creates
     * @param httpClient           passed to every {@link DirectDruidClient} this view creates
     * @param baseView             inventory view that delivers segment and server events
     * @param tierSelectorStrategy strategy handed to each new {@link ServerSelector}
     * @param emitter              passed to every {@link DirectDruidClient} this view creates
     * @param segmentWatcherConfig supplies the watched tiers / data sources; a {@code null}
     *                             collection for either means "no restriction" on that dimension
     */
    @Inject
    public BrokerServerView(QueryToolChestWarehouse warehouse, QueryWatcher queryWatcher, @Smile ObjectMapper smileMapper, @Client HttpClient httpClient, FilteredServerInventoryView baseView, TierSelectorStrategy tierSelectorStrategy, ServiceEmitter emitter, final BrokerSegmentWatcherConfig segmentWatcherConfig) {
        this.warehouse = warehouse;
        this.queryWatcher = queryWatcher;
        this.smileMapper = smileMapper;
        this.httpClient = httpClient;
        this.baseView = baseView;
        this.tierSelectorStrategy = tierSelectorStrategy;
        this.emitter = emitter;
        this.clients = Maps.newConcurrentMap();
        this.selectors = Maps.newHashMap();
        this.timelines = Maps.newHashMap();
        this.segmentFilter = new Predicate<Pair<DruidServerMetadata, DataSegment>>(){

            @Override
            public boolean apply(Pair<DruidServerMetadata, DataSegment> input) {
                // Reject segments served from a tier that is not being watched.
                if (segmentWatcherConfig.getWatchedTiers() != null && !segmentWatcherConfig.getWatchedTiers().contains(input.lhs.getTier())) {
                    return false;
                }
                // Accept when data sources are unrestricted or this one is watched.
                return segmentWatcherConfig.getWatchedDataSources() == null || segmentWatcherConfig.getWatchedDataSources().contains(input.rhs.getDataSource());
            }
        };
        ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
        baseView.registerSegmentCallback(exec, new ServerView.SegmentCallback(){

            @Override
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) {
                BrokerServerView.this.serverAddedSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                BrokerServerView.this.serverRemovedSegment(server, segment);
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override
            public ServerView.CallbackAction segmentViewInitialized() {
                BrokerServerView.this.initialized = true;
                BrokerServerView.this.runTimelineCallbacks(TimelineServerView.TimelineCallback::timelineInitialized);
                return ServerView.CallbackAction.CONTINUE;
            }
        }, this.segmentFilter);
        baseView.registerServerCallback(exec, new ServerView.ServerCallback(){

            @Override
            public ServerView.CallbackAction serverRemoved(DruidServer server) {
                BrokerServerView.this.removeServer(server);
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    /**
     * @return {@code true} once the base view has signalled {@code segmentViewInitialized}
     */
    public boolean isInitialized() {
        return this.initialized;
    }

    /**
     * Drops all known clients, timelines and selectors. Each selector is removed from the
     * selector map and then drained by repeatedly picking a server and removing it until the
     * selector reports empty.
     */
    public void clear() {
        synchronized (this.lock) {
            // The previous implementation looped on hasNext() and called Iterator.remove()
            // without ever calling next(), which throws IllegalStateException as soon as at
            // least one client is registered. Clearing the map is what was intended.
            this.clients.clear();
            this.timelines.clear();
            Iterator<ServerSelector> selectorsIter = this.selectors.values().iterator();
            while (selectorsIter.hasNext()) {
                ServerSelector selector = selectorsIter.next();
                selectorsIter.remove();
                while (!selector.isEmpty()) {
                    QueryableDruidServer pick = selector.pick();
                    selector.removeServer(pick);
                }
            }
        }
    }

    /**
     * Creates a {@link QueryableDruidServer} for {@code server} and stores it under the
     * server's name, replacing (and logging a warning about) any existing entry.
     *
     * @param server the server to register
     * @return the newly created queryable server
     */
    private QueryableDruidServer addServer(DruidServer server) {
        QueryableDruidServer retVal = new QueryableDruidServer(server, this.makeDirectClient(server));
        QueryableDruidServer exists = this.clients.put(server.getName(), retVal);
        if (exists != null) {
            log.warn("QueryRunner for server[%s] already existed!? Well it's getting replaced", server);
        }
        return retVal;
    }

    /**
     * Builds a {@link DirectDruidClient} targeting {@code server.getHost()} using this view's
     * shared collaborators.
     */
    private DirectDruidClient makeDirectClient(DruidServer server) {
        return new DirectDruidClient(this.warehouse, this.queryWatcher, this.smileMapper, this.httpClient, server.getHost(), this.emitter);
    }

    /**
     * Removes every segment of {@code server} from this view and then unregisters its client.
     *
     * @param server the server that went away
     * @return the client entry previously registered under the server's name, or {@code null}
     *         if there was none
     */
    private QueryableDruidServer removeServer(DruidServer server) {
        for (DataSegment segment : server.getSegments().values()) {
            this.serverRemovedSegment(server.getMetadata(), segment);
        }
        return this.clients.remove(server.getName());
    }

    /**
     * Records that {@code server} now serves {@code segment}: creates the segment's selector
     * and timeline entry on first sight, registers the server's client if it is not yet known,
     * adds the server to the selector, and notifies timeline callbacks via {@code segmentAdded}.
     */
    private void serverAddedSegment(DruidServerMetadata server, DataSegment segment) {
        String segmentId = segment.getIdentifier();
        synchronized (this.lock) {
            log.debug("Adding segment[%s] for server[%s]", segment, server);
            ServerSelector selector = this.selectors.get(segmentId);
            if (selector == null) {
                // First time this segment is seen: create its selector and timeline entry.
                selector = new ServerSelector(segment, this.tierSelectorStrategy);
                VersionedIntervalTimeline<String, ServerSelector> timeline = this.timelines.get(segment.getDataSource());
                if (timeline == null) {
                    timeline = new VersionedIntervalTimeline<>(Ordering.natural());
                    this.timelines.put(segment.getDataSource(), timeline);
                }
                timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
                this.selectors.put(segmentId, selector);
            }
            QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
            if (queryableDruidServer == null) {
                // Unknown server: look it up in the base inventory and register a client for it.
                queryableDruidServer = this.addServer(this.baseView.getInventoryValue(server.getName()));
            }
            selector.addServerAndUpdateSegment(queryableDruidServer, segment);
            this.runTimelineCallbacks(callback -> callback.segmentAdded(server, segment));
        }
    }

    /**
     * Records that {@code server} no longer serves {@code segment}. When the segment's selector
     * becomes empty, the selector and its timeline entry are removed and, if a timeline entry
     * was actually removed, timeline callbacks are notified via {@code segmentRemoved}.
     */
    private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) {
        String segmentId = segment.getIdentifier();
        synchronized (this.lock) {
            log.debug("Removing segment[%s] from server[%s].", segmentId, server);
            ServerSelector selector = this.selectors.get(segmentId);
            if (selector == null) {
                log.warn("Told to remove non-existant segment[%s]", segmentId);
                return;
            }
            QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
            if (!selector.removeServer(queryableDruidServer)) {
                log.warn("Asked to disassociate non-existant association between server[%s] and segment[%s]", server, segmentId);
            }
            if (selector.isEmpty()) {
                // No server serves this segment any more: drop it from the view entirely.
                VersionedIntervalTimeline<String, ServerSelector> timeline = this.timelines.get(segment.getDataSource());
                this.selectors.remove(segmentId);
                PartitionChunk<ServerSelector> removedPartition = timeline.remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
                if (removedPartition == null) {
                    log.warn("Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist", segment.getInterval(), segment.getVersion());
                } else {
                    this.runTimelineCallbacks(callback -> callback.segmentRemoved(segment));
                }
            }
        }
    }

    /**
     * Looks up the timeline for a data source. The lookup key is obtained with
     * {@code Iterables.getOnlyElement(dataSource.getNames())}, so the data source is expected
     * to expose exactly one name.
     *
     * @param dataSource the data source to look up
     * @return the timeline registered under that name, or {@code null} if there is none
     */
    @Nullable
    public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource) {
        String table = Iterables.getOnlyElement(dataSource.getNames());
        synchronized (this.lock) {
            return this.timelines.get(table);
        }
    }

    /**
     * Registers {@code callback} to be invoked on {@code exec} for timeline events. Registering
     * the same callback again replaces its executor.
     */
    @Override
    public void registerTimelineCallback(Executor exec, TimelineServerView.TimelineCallback callback) {
        this.timelineCallbacks.put(callback, exec);
    }

    /**
     * Returns the client registered for {@code server}.
     *
     * @param server the server whose client is wanted
     * @return the client, or {@code null} (after logging an error) if no client is registered
     *         under the server's name
     */
    @Override
    public <T> QueryRunner<T> getQueryRunner(DruidServer server) {
        synchronized (this.lock) {
            QueryableDruidServer queryableDruidServer = this.clients.get(server.getName());
            if (queryableDruidServer == null) {
                log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
                return null;
            }
            return queryableDruidServer.getClient();
        }
    }

    /** Delegates to the base view. */
    @Override
    public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) {
        this.baseView.registerServerCallback(exec, callback);
    }

    /** Delegates to the base view, applying this view's segment filter. */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
        this.baseView.registerSegmentCallback(exec, callback, this.segmentFilter);
    }

    /**
     * Submits {@code function} to each registered timeline callback on that callback's
     * executor. A callback whose invocation returns {@code UNREGISTER} is removed from the
     * registry.
     */
    private void runTimelineCallbacks(Function<TimelineServerView.TimelineCallback, ServerView.CallbackAction> function) {
        for (Map.Entry<TimelineServerView.TimelineCallback, Executor> entry : this.timelineCallbacks.entrySet()) {
            entry.getValue().execute(() -> {
                if (ServerView.CallbackAction.UNREGISTER == function.apply(entry.getKey())) {
                    this.timelineCallbacks.remove(entry.getKey());
                }
            });
        }
    }
}

