/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator.helper;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.MinMaxPriorityQueue;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.client.ImmutableDruidServer;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.guava.Comparators;
import org.apache.hive.druid.io.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.hive.druid.io.druid.server.coordinator.BalancerStrategy;
import org.apache.hive.druid.io.druid.server.coordinator.CoordinatorStats;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinator;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.hive.druid.io.druid.server.coordinator.LoadPeonCallback;
import org.apache.hive.druid.io.druid.server.coordinator.LoadQueuePeon;
import org.apache.hive.druid.io.druid.server.coordinator.ServerHolder;
import org.apache.hive.druid.io.druid.server.coordinator.helper.DruidCoordinatorHelper;
import org.apache.hive.druid.io.druid.timeline.DataSegment;

public class DruidCoordinatorBalancer
implements DruidCoordinatorHelper {
    public static final Comparator<ServerHolder> percentUsedComparator = Comparators.inverse(new Comparator<ServerHolder>(){

        @Override
        public int compare(ServerHolder lhs, ServerHolder rhs) {
            return lhs.getPercentUsed().compareTo(rhs.getPercentUsed());
        }
    });
    protected static final EmittingLogger log = new EmittingLogger(DruidCoordinatorBalancer.class);
    protected final DruidCoordinator coordinator;
    protected final Map<String, ConcurrentHashMap<String, BalancerSegmentHolder>> currentlyMovingSegments = Maps.newHashMap();

    public DruidCoordinatorBalancer(DruidCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    protected void reduceLifetimes(String tier) {
        for (BalancerSegmentHolder holder : this.currentlyMovingSegments.get(tier).values()) {
            holder.reduceLifetime();
            if (holder.getLifetime() > 0) continue;
            log.makeAlert("[%s]: Balancer move segments queue has a segment stuck", tier).addData("segment", holder.getSegment().getIdentifier()).addData("server", holder.getFromServer().getMetadata()).emit();
        }
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        CoordinatorStats stats = new CoordinatorStats();
        BalancerStrategy strategy = params.getBalancerStrategy();
        int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
        for (Map.Entry<String, MinMaxPriorityQueue<ServerHolder>> entry : params.getDruidCluster().getHistoricals().entrySet()) {
            String tier = entry.getKey();
            if (this.currentlyMovingSegments.get(tier) == null) {
                this.currentlyMovingSegments.put(tier, new ConcurrentHashMap());
            }
            if (!this.currentlyMovingSegments.get(tier).isEmpty()) {
                this.reduceLifetimes(tier);
                log.info("[%s]: Still waiting on %,d segments to be moved", tier, this.currentlyMovingSegments.size());
                continue;
            }
            ArrayList<ServerHolder> serverHolderList = Lists.newArrayList((Iterable)entry.getValue());
            if (serverHolderList.size() <= 1) {
                log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
                continue;
            }
            int numSegments = 0;
            for (ServerHolder server : serverHolderList) {
                numSegments += server.getServer().getSegments().size();
            }
            if (numSegments == 0) {
                log.info("No segments found.  Cannot balance.", new Object[0]);
                continue;
            }
            long unmoved = 0L;
            for (int iter = 0; iter < maxSegmentsToMove; ++iter) {
                BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
                if (segmentToMove == null || !params.getAvailableSegments().contains(segmentToMove.getSegment())) continue;
                ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
                if (holder != null) {
                    this.moveSegment(segmentToMove, holder.getServer(), params);
                    continue;
                }
                ++unmoved;
            }
            if (unmoved == (long)maxSegmentsToMove) {
                log.info("No good moves found in tier [%s]", tier);
            }
            stats.addToTieredStat("unmovedCount", tier, unmoved);
            stats.addToTieredStat("movedCount", tier, this.currentlyMovingSegments.get(tier).size());
            if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
                strategy.emitStats(tier, stats, serverHolderList);
            }
            log.info("[%s]: Segments Moved: [%d] Segments Let Alone: [%d]", tier, this.currentlyMovingSegments.get(tier).size(), unmoved);
        }
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    protected void moveSegment(BalancerSegmentHolder segment, final ImmutableDruidServer toServer, DruidCoordinatorRuntimeParams params) {
        LoadQueuePeon toPeon = params.getLoadManagementPeons().get(toServer.getName());
        ImmutableDruidServer fromServer = segment.getFromServer();
        DataSegment segmentToMove = segment.getSegment();
        final String segmentName = segmentToMove.getIdentifier();
        if (!toPeon.getSegmentsToLoad().contains(segmentToMove) && toServer.getSegment(segmentName) == null && new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
            log.info("Moving [%s] from [%s] to [%s]", segmentName, fromServer.getName(), toServer.getName());
            LoadPeonCallback callback = null;
            try {
                this.currentlyMovingSegments.get(toServer.getTier()).put(segmentName, segment);
                callback = new LoadPeonCallback(){

                    @Override
                    public void execute() {
                        Map movingSegments = DruidCoordinatorBalancer.this.currentlyMovingSegments.get(toServer.getTier());
                        if (movingSegments != null) {
                            movingSegments.remove(segmentName);
                        }
                    }
                };
                this.coordinator.moveSegment(fromServer, toServer, segmentToMove, callback);
            }
            catch (Exception e) {
                log.makeAlert(e, StringUtils.format("[%s] : Moving exception", segmentName), new Object[0]).emit();
                if (callback != null) {
                    callback.execute();
                }
            }
        } else {
            this.currentlyMovingSegments.get(toServer.getTier()).remove(segmentName);
        }
    }
}

