/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.client;

import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.base.Predicates;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.MapMaker;
import org.apache.hive.druid.com.google.common.net.HostAndPort;
import org.apache.hive.druid.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.com.metamx.http.client.HttpClient;
import org.apache.hive.druid.com.metamx.http.client.Request;
import org.apache.hive.druid.com.metamx.http.client.io.AppendableByteArrayInputStream;
import org.apache.hive.druid.com.metamx.http.client.response.ClientResponse;
import org.apache.hive.druid.com.metamx.http.client.response.InputStreamResponseHandler;
import org.apache.hive.druid.io.druid.client.DruidServer;
import org.apache.hive.druid.io.druid.client.DruidServerDiscovery;
import org.apache.hive.druid.io.druid.client.FilteredServerInventoryView;
import org.apache.hive.druid.io.druid.client.HttpServerInventoryViewConfig;
import org.apache.hive.druid.io.druid.client.ServerInventoryView;
import org.apache.hive.druid.io.druid.client.ServerView;
import org.apache.hive.druid.io.druid.concurrent.LifecycleLock;
import org.apache.hive.druid.io.druid.guice.annotations.Global;
import org.apache.hive.druid.io.druid.guice.annotations.Json;
import org.apache.hive.druid.io.druid.guice.annotations.Smile;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestHistory;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpMethod;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;

/**
 * Server inventory view that learns about servers from a {@link DruidServerDiscovery} and keeps
 * each server's segment list in sync by polling it over HTTP at
 * {@code /druid-internal/v1/segments}.
 *
 * <p>A worker task submitted in {@link #start()} drains {@code queue}; every server name taken
 * from it triggers the next asynchronous fetch for that server.
 */
public class HttpServerInventoryView
implements ServerInventoryView,
FilteredServerInventoryView {
    private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
    /** Source of server added/updated/removed/initialized events. */
    private final DruidServerDiscovery serverDiscovery;
    /** Guards the start/stop transitions; also used as the monitor for both of them. */
    private final LifecycleLock lifecycleLock = new LifecycleLock();
    /** Server callbacks mapped to the executor each one is run on. */
    private final ConcurrentMap<ServerView.ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
    /** Segment callbacks mapped to the executor each one is run on. */
    private final ConcurrentMap<ServerView.SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();
    /** Filters supplied with segment callbacks registered through the filtered overload. */
    private final ConcurrentMap<ServerView.SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker().makeMap();
    /** Filter that applies regardless of which callbacks are registered. */
    private final Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter;
    /** OR of {@code defaultFilter} and every registered predicate; a segment is only added when it matches. */
    private volatile Predicate<Pair<DruidServerMetadata, DataSegment>> finalPredicate;
    /** Known servers keyed by server name. Plain HashMap: accessors are expected to synchronize on the map itself. */
    private final Map<String, DruidServerHolder> servers = new HashMap<String, DruidServerHolder>();
    /** Created in {@code start()}, shut down and cleared in {@code stop()}; also runs the HTTP response callbacks. */
    private volatile ExecutorService executor;
    /** Names of servers whose next segment-list fetch is due. */
    private final BlockingQueue<String> queue = new LinkedBlockingDeque<String>();
    private final HttpClient httpClient;
    /** Mapper used to decode the segment-list responses. */
    private final ObjectMapper smileMapper;
    private final HttpServerInventoryViewConfig config;

    /**
     * Wires up the collaborators of the view. Nothing is started here; see {@link #start()}.
     *
     * @param jsonMapper      injected JSON mapper; not used by this class
     * @param smileMapper     mapper used to decode segment-list responses
     * @param httpClient      client used to poll the servers
     * @param serverDiscovery source of server membership events
     * @param defaultFilter   filter that is always part of the effective segment predicate
     * @param config          thread-count and timeout settings
     */
    @Inject
    public HttpServerInventoryView(
            @Json ObjectMapper jsonMapper,
            @Smile ObjectMapper smileMapper,
            @Global HttpClient httpClient,
            DruidServerDiscovery serverDiscovery,
            Predicate<Pair<DruidServerMetadata, DataSegment>> defaultFilter,
            HttpServerInventoryViewConfig config) {
        this.config = config;
        this.serverDiscovery = serverDiscovery;
        this.httpClient = httpClient;
        this.smileMapper = smileMapper;
        // Until a filtered callback is registered, the effective predicate is just the default one.
        this.defaultFilter = defaultFilter;
        this.finalPredicate = defaultFilter;
    }

    /**
     * Starts the view: creates the thread pool, submits the worker task that drains the sync
     * queue, registers a listener with the server discovery and starts the discovery.
     *
     * <p>The worker task waits for the lifecycle to reach the started state, then repeatedly takes
     * a server name from the queue and triggers that server's next segment-list fetch. It exits
     * when its thread is interrupted or the lifecycle is no longer started.
     *
     * @throws ISE       if the lifecycle lock does not allow starting
     * @throws Exception declared by the original signature; anything thrown while registering the
     *                   listener or starting the discovery propagates to the caller
     */
    @LifecycleStart
    public void start() throws Exception {
        synchronized (this.lifecycleLock) {
            if (!this.lifecycleLock.canStart()) {
                throw new ISE("can't start.", new Object[0]);
            }
            this.log.info("Starting HttpServerInventoryView.", new Object[0]);
            try {
                this.executor = Executors.newFixedThreadPool(this.config.getNumThreads(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HttpServerInventoryView-%s").build());
                this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        if (!HttpServerInventoryView.this.lifecycleLock.awaitStarted()) {
                            HttpServerInventoryView.this.log.error("WTF! lifecycle not started, segments will not be discovered.", new Object[0]);
                            return;
                        }
                        while (!Thread.interrupted() && HttpServerInventoryView.this.lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                            try {
                                String name = HttpServerInventoryView.this.queue.take();
                                DruidServerHolder holder;
                                // Hold the map monitor only for the lookup. The fetch below can block
                                // (it sleeps on its error path), and running it under the monitor
                                // would stall every other reader and writer of the server map.
                                synchronized (HttpServerInventoryView.this.servers) {
                                    holder = HttpServerInventoryView.this.servers.get(name);
                                }
                                // A missing holder means the server was removed; its polling simply stops.
                                if (holder != null) {
                                    holder.updateSegmentsListAsync();
                                }
                            }
                            catch (InterruptedException ex) {
                                HttpServerInventoryView.this.log.info("main thread interrupted, served segments list is not synced anymore.", new Object[0]);
                                // Restore the flag so the loop condition observes it and exits.
                                Thread.currentThread().interrupt();
                            }
                            catch (Throwable th) {
                                HttpServerInventoryView.this.log.makeAlert(th, "main thread ignored error", new Object[0]).emit();
                            }
                        }
                        HttpServerInventoryView.this.log.info("HttpServerInventoryView main thread exited.", new Object[0]);
                    }
                });
                this.serverDiscovery.registerListener(new DruidServerDiscovery.Listener(){

                    @Override
                    public void serverAdded(DruidServer server) {
                        HttpServerInventoryView.this.serverAddedOrUpdated(server);
                    }

                    @Override
                    public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) {
                        return HttpServerInventoryView.this.serverAddedOrUpdated(newServer);
                    }

                    @Override
                    public void serverRemoved(DruidServer server) {
                        HttpServerInventoryView.this.serverRemoved(server);
                        HttpServerInventoryView.this.runServerCallbacks(server);
                    }

                    @Override
                    public void initialized() {
                        HttpServerInventoryView.this.serverInventoryInitialized();
                    }
                });
                this.serverDiscovery.start();
                this.log.info("Started HttpServerInventoryView.", new Object[0]);
                this.lifecycleLock.started();
            }
            finally {
                this.lifecycleLock.exitStart();
            }
        }
    }

    /**
     * Stops the view: stops the server discovery, shuts the thread pool down immediately and
     * discards any pending sync requests.
     *
     * @throws ISE         if the lifecycle lock does not allow stopping
     * @throws IOException declared by the signature for callers
     */
    @LifecycleStop
    public void stop() throws IOException {
        synchronized (this.lifecycleLock) {
            if (!this.lifecycleLock.canStop()) {
                throw new ISE("can't stop.", new Object[0]);
            }
            log.info("Stopping HttpServerInventoryView.", new Object[0]);

            serverDiscovery.stop();

            ExecutorService running = this.executor;
            if (running != null) {
                running.shutdownNow();
                this.executor = null;
            }

            queue.clear();
            log.info("Stopped HttpServerInventoryView.", new Object[0]);
        }
    }

    /**
     * Registers a segment callback together with a filter, then rebuilds the effective predicate
     * as the OR of the default filter and every registered filter.
     *
     * @param exec     executor the callback is run on
     * @param callback callback to register
     * @param filter   predicate contributed by this callback
     */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter) {
        segmentCallbacks.put(callback, exec);
        segmentPredicates.put(callback, filter);
        Predicate<Pair<DruidServerMetadata, DataSegment>> anyRegistered = Predicates.or(segmentPredicates.values());
        finalPredicate = Predicates.or(defaultFilter, anyRegistered);
    }

    /**
     * Registers a server callback.
     *
     * @param exec     executor the callback is run on
     * @param callback callback to register
     */
    @Override
    public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) {
        serverCallbacks.put(callback, exec);
    }

    /**
     * Registers a segment callback without contributing a filter; the effective predicate is left
     * unchanged.
     *
     * @param exec     executor the callback is run on
     * @param callback callback to register
     */
    @Override
    public void registerSegmentCallback(Executor exec, ServerView.SegmentCallback callback) {
        segmentCallbacks.put(callback, exec);
    }

    /**
     * Looks up a server by name.
     *
     * @param containerKey server name
     * @return the server registered under that name, or {@code null} if there is none
     */
    @Override
    public DruidServer getInventoryValue(String containerKey) {
        synchronized (servers) {
            DruidServerHolder match = servers.get(containerKey);
            return match == null ? null : match.druidServer;
        }
    }

    /**
     * Returns the servers currently known to this view.
     *
     * <p>The result is a snapshot copied while holding the server-map monitor. A lazy view over
     * the map's values would be iterated by the caller after the monitor has been released,
     * racing with concurrent additions and removals on the underlying {@code HashMap}.
     *
     * @return a point-in-time copy of the known servers; never {@code null}
     */
    @Override
    public Iterable<DruidServer> getInventory() {
        synchronized (this.servers) {
            List<DruidServer> snapshot = new ArrayList<DruidServer>(this.servers.size());
            for (DruidServerHolder holder : this.servers.values()) {
                snapshot.add(holder.druidServer);
            }
            return snapshot;
        }
    }

    /**
     * Submits {@code fn} for every registered segment callback on that callback's executor.
     * A callback whose invocation returns {@code UNREGISTER} is removed; if it had contributed a
     * filter, the effective predicate is rebuilt without it.
     *
     * @param fn invocation to perform on each callback
     */
    private void runSegmentCallbacks(final Function<ServerView.SegmentCallback, ServerView.CallbackAction> fn) {
        // The entry must be typed: with a raw Map.Entry the key is an Object and cannot be passed to fn.
        for (final Map.Entry<ServerView.SegmentCallback, Executor> entry : this.segmentCallbacks.entrySet()) {
            final ServerView.SegmentCallback callback = entry.getKey();
            entry.getValue().execute(new Runnable(){

                @Override
                public void run() {
                    if (ServerView.CallbackAction.UNREGISTER != fn.apply(callback)) {
                        return;
                    }
                    HttpServerInventoryView.this.segmentCallbacks.remove(callback);
                    if (HttpServerInventoryView.this.segmentPredicates.remove(callback) != null) {
                        HttpServerInventoryView.this.finalPredicate = Predicates.or(HttpServerInventoryView.this.defaultFilter, Predicates.or(HttpServerInventoryView.this.segmentPredicates.values()));
                    }
                }
            });
        }
    }

    /**
     * Notifies every registered server callback, on its own executor, that {@code server} was
     * removed. Callbacks that answer {@code UNREGISTER} are dropped.
     *
     * @param server the server that was removed
     */
    private void runServerCallbacks(final DruidServer server) {
        for (final Map.Entry<ServerView.ServerCallback, Executor> registration : serverCallbacks.entrySet()) {
            Executor target = registration.getValue();
            target.execute(new Runnable(){

                @Override
                public void run() {
                    ServerView.CallbackAction outcome = registration.getKey().serverRemoved(server);
                    if (ServerView.CallbackAction.UNREGISTER == outcome) {
                        serverCallbacks.remove(registration.getKey());
                    }
                }
            });
        }
    }

    /**
     * Invoked when the server discovery reports that it is initialized. Waits (bounded per server)
     * for every known server to finish its first sync and then fires
     * {@code segmentViewInitialized()} on all segment callbacks.
     */
    private void serverInventoryInitialized() {
        // Copy the holders under the map monitor: the map is a plain HashMap that other threads
        // mutate under that monitor, so iterating it unguarded is unsafe. The waiting itself
        // happens outside the monitor because each await can block up to the HTTP timeout.
        List<DruidServerHolder> holders;
        synchronized (this.servers) {
            holders = new ArrayList<DruidServerHolder>(this.servers.values());
        }
        for (DruidServerHolder holder : holders) {
            holder.awaitInitialization();
        }
        this.log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks.", new Object[0]);
        this.runSegmentCallbacks(new Function<ServerView.SegmentCallback, ServerView.CallbackAction>(){

            @Override
            public ServerView.CallbackAction apply(ServerView.SegmentCallback input) {
                return input.segmentViewInitialized();
            }
        });
    }

    /**
     * Registers a new server or replaces the holder of an already known one, then kicks off a
     * segment-list fetch for it. For a known server the replacement holder is derived from the
     * current one via {@code updatedHolder}.
     *
     * @param server the server as reported by discovery
     * @return the server instance now tracked by this view
     */
    private DruidServer serverAddedOrUpdated(DruidServer server) {
        final String name = server.getName();
        final DruidServerHolder replacement;
        synchronized (servers) {
            DruidServerHolder existing = servers.get(name);
            if (existing == null) {
                replacement = new DruidServerHolder(server);
            } else {
                replacement = existing.updatedHolder(server);
            }
            servers.put(name, replacement);
        }
        // The fetch is started after the map monitor has been released.
        replacement.updateSegmentsListAsync();
        return replacement.druidServer;
    }

    /**
     * Forgets the given server. Callbacks are not fired here.
     *
     * @param server the server to drop, identified by its name
     */
    private void serverRemoved(DruidServer server) {
        synchronized (servers) {
            servers.remove(server.getName());
        }
    }

    /**
     * Handles a server update by treating the new state like an add-or-update.
     *
     * @param oldServer previous state of the server; not consulted
     * @param newServer new state of the server
     * @return the server instance now tracked by this view
     */
    public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) {
        DruidServer tracked = serverAddedOrUpdated(newServer);
        return tracked;
    }

    /**
     * Reports whether the lifecycle is in the started state, waiting at most one millisecond.
     *
     * @return the result of the bounded {@code awaitStarted} check
     */
    @Override
    public boolean isStarted() {
        boolean started = lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS);
        return started;
    }

    /**
     * Checks whether this view currently lists {@code segment} on the given server.
     *
     * @param serverKey server name
     * @param segment   segment to look for, matched by its identifier
     * @return {@code true} if the server is known and holds the segment; {@code false} otherwise
     */
    @Override
    public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) {
        synchronized (servers) {
            DruidServerHolder holder = servers.get(serverKey);
            return holder != null && holder.druidServer.getSegment(segment.getIdentifier()) != null;
        }
    }

    /**
     * Response handler that additionally remembers the HTTP status code and reason phrase of the
     * response so the caller can inspect them once the request completes.
     */
    private static class BytesAccumulatingResponseHandler
    extends InputStreamResponseHandler {
        /** Status code of the most recently handled response. */
        private int status;
        /** Reason phrase of the most recently handled response. */
        private String description;

        private BytesAccumulatingResponseHandler() {
        }

        /**
         * Records status and reason phrase, then delegates to the parent handler and re-wraps
         * its object as an unfinished response.
         */
        @Override
        public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response) {
            status = response.getStatus().getCode();
            description = response.getStatus().getReasonPhrase();
            ClientResponse<AppendableByteArrayInputStream> delegated = super.handleResponse(response);
            return ClientResponse.unfinished(delegated.getObj());
        }
    }

    private class DruidServerHolder {
        /** Monitor held while a response is applied to {@code druidServer} and {@code counter}, and while a successor holder is built. */
        private final Object lock = new Object();
        /** The server whose segment list this holder keeps in sync. */
        private final DruidServer druidServer;
        /** Sync point from the last applied response; while {@code null} the next request asks with {@code counter=-1}. */
        private volatile SegmentChangeRequestHistory.Counter counter = null;
        /** Host and port parsed from the server's host string; target of the HTTP requests. */
        private final HostAndPort serverHostAndPort;
        /** Applies individual add/remove requests from a response to {@code druidServer}. */
        private final DataSegmentChangeHandler changeHandler;
        /**
         * Client-side timeout in milliseconds: the configured server timeout plus 1000 ms.
         * Reads the enclosing instance's {@code config} directly; the decompiler had emitted a
         * call to a synthetic accessor here, which is not valid source.
         */
        private final long serverHttpTimeout = HttpServerInventoryView.this.config.getServerTimeout() + 1000L;
        /** Released after the first response has been fully applied. */
        private final CountDownLatch initializationLatch = new CountDownLatch(1);

        /**
         * Creates a holder for a server with no sync point yet ({@code counter} is {@code null}).
         *
         * @param druidServer the server to track
         */
        DruidServerHolder(DruidServer druidServer) {
            this(druidServer, null);
        }

        /**
         * Creates a holder that continues from the given sync point and builds the handler that
         * applies segment add/remove requests to {@code druidServer}.
         *
         * @param druidServer the server to track
         * @param counter     sync point to resume from, or {@code null} for none
         */
        private DruidServerHolder(final DruidServer druidServer, SegmentChangeRequestHistory.Counter counter) {
            this.counter = counter;
            this.druidServer = druidServer;
            this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost());
            this.changeHandler = new DataSegmentChangeHandler(){

                /** Adds the segment and notifies callbacks, but only if it passes the effective predicate. */
                @Override
                public void addSegment(final DataSegment segment, DataSegmentChangeCallback callback) {
                    boolean accepted = HttpServerInventoryView.this.finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment));
                    if (!accepted) {
                        return;
                    }
                    druidServer.addDataSegment(segment.getIdentifier(), segment);
                    HttpServerInventoryView.this.runSegmentCallbacks(new Function<ServerView.SegmentCallback, ServerView.CallbackAction>(){

                        @Override
                        public ServerView.CallbackAction apply(ServerView.SegmentCallback listener) {
                            return listener.segmentAdded(druidServer.getMetadata(), segment);
                        }
                    });
                }

                /** Removes the segment and notifies callbacks; no predicate is consulted on removal. */
                @Override
                public void removeSegment(final DataSegment segment, DataSegmentChangeCallback callback) {
                    druidServer.removeDataSegment(segment.getIdentifier());
                    HttpServerInventoryView.this.runSegmentCallbacks(new Function<ServerView.SegmentCallback, ServerView.CallbackAction>(){

                        @Override
                        public ServerView.CallbackAction apply(ServerView.SegmentCallback listener) {
                            return listener.segmentRemoved(druidServer.getMetadata(), segment);
                        }
                    });
                }
            };
        }

        /**
         * Blocks until the first response from this server has been applied, for at most
         * {@code serverHttpTimeout} milliseconds. A timeout or an interrupt is logged as a
         * warning; on interrupt the thread's interrupt flag is restored.
         */
        void awaitInitialization() {
            boolean initialized;
            try {
                initialized = initializationLatch.await(serverHttpTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                HttpServerInventoryView.this.log.warn("Await initialization interrupted while waiting on server [%s].", druidServer.getName());
                Thread.currentThread().interrupt();
                return;
            }
            if (!initialized) {
                HttpServerInventoryView.this.log.warn("Await initialization timed out for server [%s].", druidServer.getName());
            }
        }

        /**
         * Builds the successor of this holder for an updated server description. The new server
         * receives this holder's segments via {@code addDataSegments}, and the current sync point
         * is carried over. Runs under this holder's lock.
         *
         * @param server the updated server description
         * @return a new holder for {@code server}
         */
        DruidServerHolder updatedHolder(DruidServer server) {
            synchronized (lock) {
                SegmentChangeRequestHistory.Counter carriedOver = counter;
                return new DruidServerHolder(server.addDataSegments(druidServer), carriedOver);
            }
        }

        /**
         * Sends one asynchronous segment-list request to this server and installs the callback
         * that applies the response.
         *
         * <p>With a known sync point the request carries its counter and hash; otherwise it asks
         * with {@code counter=-1}. Every outcome puts the server's name back on the sync queue
         * exactly once, which is what keeps the polling going.
         *
         * @return the future of the in-flight HTTP request
         * @throws RuntimeException whatever failed while building or sending the request, rethrown
         *                          via {@code Throwables.propagate} after the server has been
         *                          re-queued, an alert emitted and a 5 second pause taken
         */
        Future<?> updateSegmentsListAsync() {
            try {
                // Read the volatile sync point once: a response callback may reset it to null
                // between a null check and a later dereference.
                final SegmentChangeRequestHistory.Counter lastSeen = this.counter;
                final String req;
                if (lastSeen != null) {
                    req = StringUtils.format("/druid-internal/v1/segments?counter=%s&hash=%s&timeout=%s", lastSeen.getCounter(), lastSeen.getHash(), HttpServerInventoryView.this.config.getServerTimeout());
                } else {
                    req = StringUtils.format("/druid-internal/v1/segments?counter=-1&timeout=%s", HttpServerInventoryView.this.config.getServerTimeout());
                }
                URL url = new URL("http", this.serverHostAndPort.getHostText(), this.serverHostAndPort.getPort(), req);
                final BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
                HttpServerInventoryView.this.log.debug("Sending segment list fetch request to [%s] on URL [%s]", this.druidServer.getName(), url);
                ListenableFuture<InputStream> future = HttpServerInventoryView.this.httpClient.go(
                        new Request(HttpMethod.GET, url)
                                .addHeader("Accept", "application/x-jackson-smile")
                                .addHeader("Content-Type", "application/x-jackson-smile"),
                        responseHandler,
                        new Duration(this.serverHttpTimeout));
                HttpServerInventoryView.this.log.debug("Sent segment list fetch request to [%s]", this.druidServer.getName());
                Futures.addCallback(future, new FutureCallback<InputStream>(){

                    @Override
                    public void onSuccess(InputStream stream) {
                        final int status = responseHandler.status;
                        // Anything other than 200 (OK) or 204 (No Content) is a failure. onFailure()
                        // re-queues the server itself, so it has to run outside the try/finally
                        // below; otherwise the server would be queued twice per failed response and
                        // the number of pending polls would keep doubling.
                        if (status != 200 && status != 204) {
                            this.onFailure(null);
                            return;
                        }
                        try {
                            if (status == 204) {
                                HttpServerInventoryView.this.log.debug("Received NO CONTENT from [%s]", DruidServerHolder.this.druidServer.getName());
                                return;
                            }
                            HttpServerInventoryView.this.log.debug("Received segment list response from [%s]", DruidServerHolder.this.druidServer.getName());
                            SegmentChangeRequestsSnapshot delta = HttpServerInventoryView.this.smileMapper.readValue(stream, SegmentChangeRequestsSnapshot.class);
                            HttpServerInventoryView.this.log.debug("Finished reading segment list response from [%s]", DruidServerHolder.this.druidServer.getName());
                            synchronized (DruidServerHolder.this.lock) {
                                if (delta.isResetCounter()) {
                                    HttpServerInventoryView.this.log.debug("Server [%s] requested resetCounter for reason [%s].", DruidServerHolder.this.druidServer.getName(), delta.getResetCause());
                                    // Dropping the sync point makes the next request ask with counter=-1.
                                    DruidServerHolder.this.counter = null;
                                    return;
                                }
                                // Without a sync point the response is applied on top of an empty
                                // segment set rather than on top of the current one.
                                if (DruidServerHolder.this.counter == null) {
                                    DruidServerHolder.this.druidServer.removeAllSegments();
                                }
                                for (DataSegmentChangeRequest request : delta.getRequests()) {
                                    request.go(DruidServerHolder.this.changeHandler, null);
                                }
                                DruidServerHolder.this.counter = delta.getCounter();
                            }
                            DruidServerHolder.this.initializationLatch.countDown();
                        }
                        catch (Exception ex) {
                            HttpServerInventoryView.this.log.error(ex, "error processing segment list response from server [%s]", DruidServerHolder.this.druidServer.getName());
                        }
                        finally {
                            // Schedule the next poll for this server.
                            HttpServerInventoryView.this.queue.add(DruidServerHolder.this.druidServer.getName());
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        try {
                            if (t != null) {
                                HttpServerInventoryView.this.log.error(t, "failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", DruidServerHolder.this.druidServer.getName(), responseHandler.status, responseHandler.description);
                            } else {
                                HttpServerInventoryView.this.log.error("failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", DruidServerHolder.this.druidServer.getName(), responseHandler.status, responseHandler.description);
                            }
                            // Pause before the retry is queued.
                            try {
                                Thread.sleep(5000L);
                            }
                            catch (InterruptedException ex) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        finally {
                            HttpServerInventoryView.this.queue.add(DruidServerHolder.this.druidServer.getName());
                        }
                    }
                }, HttpServerInventoryView.this.executor);
                return future;
            }
            catch (Throwable th) {
                // Re-queue first so the server keeps being polled even though this attempt failed.
                HttpServerInventoryView.this.queue.add(this.druidServer.getName());
                HttpServerInventoryView.this.log.makeAlert(th, "Fatal error while fetching segment list from server [%s].", this.druidServer.getName()).emit();
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                throw Throwables.propagate(th);
            }
        }
    }
}

