/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator.rules;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.MinMaxPriorityQueue;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.java.util.common.IAE;
import org.apache.hive.druid.io.druid.server.coordinator.BalancerStrategy;
import org.apache.hive.druid.io.druid.server.coordinator.CoordinatorStats;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinator;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.hive.druid.io.druid.server.coordinator.LoadPeonCallback;
import org.apache.hive.druid.io.druid.server.coordinator.ReplicationThrottler;
import org.apache.hive.druid.io.druid.server.coordinator.ServerHolder;
import org.apache.hive.druid.io.druid.server.coordinator.rules.Rule;
import org.apache.hive.druid.io.druid.timeline.DataSegment;

/**
 * Base class for coordinator rules that assign a segment to historical servers tier by tier and
 * then drop surplus copies.
 *
 * <p>Subclasses supply the per-tier replica targets through {@link #getTieredReplicants()} and
 * {@link #getNumReplicants(String)}.
 */
public abstract class LoadRule implements Rule {
    private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
    static final String ASSIGNED_COUNT = "assignedCount";
    static final String DROPPED_COUNT = "droppedCount";

    /**
     * Applies this rule to one segment: for every tier in {@link #getTieredReplicants()} it
     * requests missing copies (only if the segment is in {@code params.getAvailableSegments()}),
     * then hands the per-tier shortfall to the drop phase.
     *
     * @param coordinator the coordinator invoking the rule (not used by this implementation)
     * @param params      runtime parameters giving access to the cluster view, replicant lookup,
     *                    balancer strategy and replication throttler
     * @param segment     the segment the rule is being applied to
     * @return the accumulated {@value #ASSIGNED_COUNT} and {@value #DROPPED_COUNT} tiered stats
     */
    @Override
    public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment) {
        CoordinatorStats stats = new CoordinatorStats();
        Set<DataSegment> availableSegments = params.getAvailableSegments();
        // tier -> (expected replicas - loaded replicas); a positive value means the tier is still short.
        Map<String, Integer> loadStatus = Maps.newHashMap();
        int totalReplicantsInCluster = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier());
        for (Map.Entry<String, Integer> entry : this.getTieredReplicants().entrySet()) {
            String tier = entry.getKey();
            int expectedReplicantsInTier = entry.getValue();
            int totalReplicantsInTier = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier);
            int loadedReplicantsInTier = params.getSegmentReplicantLookup().getLoadedReplicants(segment.getIdentifier(), tier);
            MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
            if (serverQueue == null) {
                log.makeAlert("Tier[%s] has no servers! Check your cluster configuration!", tier).emit();
                // NOTE(review): a tier skipped here adds no entry to loadStatus, so it does not
                // hold back drop() below even though nothing is loaded in it - confirm intended.
                continue;
            }
            // Copy the queue so the balancer strategy works on a list without touching the cluster's queue.
            List<ServerHolder> serverHolderList = Lists.newArrayList(serverQueue);
            BalancerStrategy strategy = params.getBalancerStrategy();
            if (availableSegments.contains(segment)) {
                CoordinatorStats assignStats = this.assign(params.getReplicationManager(), tier, totalReplicantsInCluster, expectedReplicantsInTier, totalReplicantsInTier, strategy, serverHolderList, segment);
                stats.accumulate(assignStats);
                // Carry the copies just requested into the next tier's view of the cluster total.
                // The stat is a long; the sum is narrowed back to int explicitly.
                totalReplicantsInCluster = (int) (totalReplicantsInCluster + assignStats.getTieredStat(ASSIGNED_COUNT, tier));
            }
            loadStatus.put(tier, expectedReplicantsInTier - loadedReplicantsInTier);
        }
        stats.accumulate(this.drop(loadStatus, segment, params));
        return stats;
    }

    /**
     * Requests loads of {@code segment} in {@code tier} until the tier reaches
     * {@code expectedReplicantsInTier}, the throttler refuses another replica, or the strategy
     * cannot find a server.
     *
     * @param replicationManager       throttler consulted and updated for replica (non-first) loads
     * @param tier                     tier being filled
     * @param totalReplicantsInCluster copies of the segment counted across the cluster so far
     * @param expectedReplicantsInTier target number of copies for this tier
     * @param totalReplicantsInTier    copies currently counted for this tier
     * @param strategy                 balancer strategy used to pick the target server
     * @param serverHolderList         candidate servers of this tier
     * @param segment                  segment to load
     * @return stats whose {@value #ASSIGNED_COUNT} entry for {@code tier} is the number of loads requested
     */
    private CoordinatorStats assign(final ReplicationThrottler replicationManager, final String tier, int totalReplicantsInCluster, int expectedReplicantsInTier, int totalReplicantsInTier, BalancerStrategy strategy, List<ServerHolder> serverHolderList, final DataSegment segment) {
        CoordinatorStats stats = new CoordinatorStats();
        // Make sure the tier shows up in the stats even when nothing gets assigned.
        stats.addToTieredStat(ASSIGNED_COUNT, tier, 0L);
        int currReplicantsInTier = totalReplicantsInTier;
        int currTotalReplicantsInCluster = totalReplicantsInCluster;
        while (currReplicantsInTier < expectedReplicantsInTier) {
            // Only the very first copy in the cluster bypasses the throttler; every further
            // copy is treated as a replica.
            final boolean replicate = currTotalReplicantsInCluster > 0;
            if (replicate && !replicationManager.canCreateReplicant(tier)) {
                break;
            }
            final ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList);
            if (holder == null) {
                log.warn("Not enough [%s] servers or node capacity to assign segment[%s]! Expected Replicants[%d]", tier, segment.getIdentifier(), expectedReplicantsInTier);
                break;
            }
            if (replicate) {
                replicationManager.registerReplicantCreation(tier, segment.getIdentifier(), holder.getServer().getHost());
            }
            holder.getPeon().loadSegment(segment, new LoadPeonCallback() {
                @Override
                public void execute() {
                    // Unregisters unconditionally, including for first copies that were never
                    // registered above. Presumably run by the peon once the load completes.
                    replicationManager.unregisterReplicantCreation(tier, segment.getIdentifier(), holder.getServer().getHost());
                }
            });
            stats.addToTieredStat(ASSIGNED_COUNT, tier, 1L);
            ++currReplicantsInTier;
            ++currTotalReplicantsInCluster;
        }
        return stats;
    }

    /**
     * Drops surplus copies of {@code segment}, but only when no tier in {@code loadStatus} is
     * still short of loaded replicas.
     *
     * @param loadStatus tier -> remaining replicas to load, as computed by {@link #run}
     * @param segment    segment whose surplus copies should be dropped
     * @param params     runtime parameters giving access to the replicant lookup and cluster view
     * @return stats whose {@value #DROPPED_COUNT} entries count the drops requested per tier
     */
    private CoordinatorStats drop(Map<String, Integer> loadStatus, DataSegment segment, DruidCoordinatorRuntimeParams params) {
        CoordinatorStats stats = new CoordinatorStats();
        // Do not drop anything while any tier covered by the rule is still waiting for loads.
        for (Integer leftToLoad : loadStatus.values()) {
            if (leftToLoad > 0) {
                return stats;
            }
        }
        Map<String, Integer> replicantsByTier = params.getSegmentReplicantLookup().getClusterTiers(segment.getIdentifier());
        for (Map.Entry<String, Integer> entry : replicantsByTier.entrySet()) {
            String tier = entry.getKey();
            int loadedNumReplicantsForTier = entry.getValue();
            int expectedNumReplicantsForTier = this.getNumReplicants(tier);
            stats.addToTieredStat(DROPPED_COUNT, tier, 0L);
            MinMaxPriorityQueue<ServerHolder> serverQueue = params.getDruidCluster().getHistoricalsByTier(tier);
            if (serverQueue == null) {
                log.makeAlert("No holders found for tier[%s]", entry.getKey()).emit();
                continue;
            }
            // Servers polled off the queue are collected here and re-added afterwards, so the
            // queue ends up with the same members it started with.
            List<ServerHolder> droppedServers = Lists.newArrayList();
            while (loadedNumReplicantsForTier > expectedNumReplicantsForTier) {
                ServerHolder holder = serverQueue.pollLast();
                if (holder == null) {
                    // Queue exhausted before the surplus was removed.
                    log.warn("Wtf, holder was null?  I have no servers serving [%s]?", segment.getIdentifier());
                    break;
                }
                if (holder.isServingSegment(segment)) {
                    holder.getPeon().dropSegment(segment, null);
                    --loadedNumReplicantsForTier;
                    stats.addToTieredStat(DROPPED_COUNT, tier, 1L);
                }
                droppedServers.add(holder);
            }
            serverQueue.addAll(droppedServers);
        }
        return stats;
    }

    /**
     * Checks that a tier -> replica-count map is usable by a load rule.
     *
     * @param tieredReplicants the map to validate; must be non-empty with non-null, non-negative values
     * @throws IAE if the map is empty, or any value is {@code null} or negative
     */
    protected void validateTieredReplicants(Map<String, Integer> tieredReplicants) {
        if (tieredReplicants.isEmpty()) {
            throw new IAE("A rule with empty tiered replicants is invalid");
        }
        for (Map.Entry<String, Integer> entry : tieredReplicants.entrySet()) {
            if (entry.getValue() == null) {
                throw new IAE("Replicant value cannot be empty");
            }
            if (entry.getValue() < 0) {
                throw new IAE("Replicant value [%d] is less than 0, which is not allowed", entry.getValue());
            }
        }
    }

    /**
     * @return the tier -> expected replica count map that {@link #run} iterates over
     */
    public abstract Map<String, Integer> getTieredReplicants();

    /**
     * @param var1 tier name
     * @return the expected number of replicas for that tier; the drop phase removes copies above this number
     */
    public abstract int getNumReplicants(String var1);
}

