/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.net.SocketTimeoutException;
import kafka.admin.AdminUtils$;
import kafka.api.KAFKA_0_10_0_IV0$;
import kafka.api.KAFKA_0_9_0$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Replica;
import kafka.common.KafkaStorageException;
import kafka.common.TopicAndPartition;
import kafka.log.LogConfig$;
import kafka.message.ByteBufferMessageSet;
import kafka.server.AbstractFetcherThread;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.LogOffsetMetadata;
import kafka.server.LogOffsetMetadata$;
import kafka.server.PartitionFetchState;
import kafka.server.ReplicaManager;
import kafka.utils.NetworkClientBlockingOps$;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenMap;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong;

@ScalaSignature(bytes="\u0006\u0001\t\rf\u0001B\u0001\u0003\u0001\u001d\u0011ACU3qY&\u001c\u0017MR3uG\",'\u000f\u00165sK\u0006$'BA\u0002\u0005\u0003\u0019\u0019XM\u001d<fe*\tQ!A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001A\u0001CA\u0005\u000b\u001b\u0005\u0011\u0011BA\u0006\u0003\u0005U\t%m\u001d;sC\u000e$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012D\u0011\"\u0004\u0001\u0003\u0002\u0003\u0006IA\u0004\r\u0002\t9\fW.\u001a\t\u0003\u001fUq!\u0001E\n\u000e\u0003EQ\u0011AE\u0001\u0006g\u000e\fG.Y\u0005\u0003)E\ta\u0001\u0015:fI\u00164\u0017B\u0001\f\u0018\u0005\u0019\u0019FO]5oO*\u0011A#E\u0005\u0003\u001beI!AG\u000e\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u0006\u00039\u0011\tQ!\u001e;jYND\u0001B\b\u0001\u0003\u0002\u0003\u0006IaH\u0001\nM\u0016$8\r[3s\u0013\u0012\u0004\"\u0001\u0005\u0011\n\u0005\u0005\n\"aA%oi\"A1\u0005\u0001B\u0001B\u0003%A%\u0001\u0007t_V\u00148-\u001a\"s_.,'\u000f\u0005\u0002&Q5\taE\u0003\u0002(\t\u000591\r\\;ti\u0016\u0014\u0018BA\u0015'\u00059\u0011%o\\6fe\u0016sG\rU8j]RD\u0001b\u000b\u0001\u0003\u0002\u0003\u0006I\u0001L\u0001\rEJ|7.\u001a:D_:4\u0017n\u001a\t\u0003\u00135J!A\f\u0002\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\ta\u0001\u0011\t\u0011)A\u0005c\u0005Q!/\u001a9mS\u000e\fWj\u001a:\u0011\u0005%\u0011\u0014BA\u001a\u0003\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJD\u0001\"\u000e\u0001\u0003\u0002\u0003\u0006IAN\u0001\b[\u0016$(/[2t!\t9\u0004)D\u00019\u0015\t)\u0014H\u0003\u0002;w\u000511m\\7n_:T!!\u0002\u001f\u000b\u0005ur\u0014AB1qC\u000eDWMC\u0001@\u0003\ry'oZ\u0005\u0003\u0003b\u0012q!T3ue&\u001c7\u000f\u0003\u0005D\u0001\t\u0005\t\u0015!\u0003E\u0003\u0011!\u0018.\\3\u0011\u0005\u0015;U\"\u0001$\u000b\u0005qI\u0014B\u0001%G\u0005\u0011!\u0016.\\3\t\u000b)\u0003A\u0011A&\u0002\rqJg.\u001b;?)!aUJT(Q#J\u001b\u0006CA\u0005\u0001\u0011\u0015i\u0011\n1\u0001\u000f\u0011\u0015q\u0012\n1\u0001 
\u0011\u0015\u0019\u0013\n1\u0001%\u0011\u0015Y\u0013\n1\u0001-\u0011\u0015\u0001\u0014\n1\u00012\u0011\u0015)\u0014\n1\u00017\u0011\u0015\u0019\u0015\n1\u0001E\u000b\u0011)\u0006\u0001\u0001,\u0003\u0007I+\u0015\u000b\u0005\u0002XE:\u0011\u0011\u0002W\u0004\u00063\nA\tAW\u0001\u0015%\u0016\u0004H.[2b\r\u0016$8\r[3s)\"\u0014X-\u00193\u0011\u0005%Yf!B\u0001\u0003\u0011\u0003a6CA.^!\t\u0001b,\u0003\u0002`#\t1\u0011I\\=SK\u001aDQAS.\u0005\u0002\u0005$\u0012A\u0017\u0004\u0006Gn\u0003!\u0001\u001a\u0002\r\r\u0016$8\r\u001b*fcV,7\u000f^\n\u0004Ev+\u0007C\u00014j\u001d\tIq-\u0003\u0002i\u0005\u0005)\u0012IY:ue\u0006\u001cGOR3uG\",'\u000f\u00165sK\u0006$\u0017BA2k\u0015\tA'\u0001\u0003\u0005mE\n\u0015\r\u0011\"\u0001n\u0003))h\u000eZ3sYfLgnZ\u000b\u0002]B\u0011qN]\u0007\u0002a*\u0011\u0011/O\u0001\te\u0016\fX/Z:ug&\u00111\r\u001d\u0005\ti\n\u0014\t\u0011)A\u0005]\u0006YQO\u001c3fe2L\u0018N\\4!\u0011\u0015Q%\r\"\u0001w)\t9\u0018\u0010\u0005\u0002yE6\t1\fC\u0003mk\u0002\u0007a\u000eC\u0003|E\u0012\u0005A0A\u0004jg\u0016k\u0007\u000f^=\u0016\u0003u\u0004\"\u0001\u0005@\n\u0005}\f\"a\u0002\"p_2,\u0017M\u001c\u0005\b\u0003\u0007\u0011G\u0011AA\u0003\u0003\u0019ygMZ:fiR!\u0011qAA\u0007!\r\u0001\u0012\u0011B\u0005\u0004\u0003\u0017\t\"\u0001\u0002'p]\u001eD\u0001\"a\u0004\u0002\u0002\u0001\u0007\u0011\u0011C\u0001\u0012i>\u0004\u0018nY!oIB\u000b'\u000f^5uS>t\u0007\u0003BA\n\u0003/i!!!\u0006\u000b\u0005i\"\u0011\u0002BA\r\u0003+\u0011\u0011\u0003V8qS\u000e\fe\u000e\u001a)beRLG/[8o\r\u001d\tib\u0017\u0001\u0003\u0003?\u0011Q\u0002U1si&$\u0018n\u001c8ECR\f7#BA\u000e;\u0006\u0005\u0002c\u00014\u0002$%\u0019\u0011Q\u00046\t\u00151\fYB!b\u0001\n\u0003\t9#\u0006\u0002\u0002*A!\u00111FA\u0019\u001d\ry\u0017QF\u0005\u0004\u0003_\u0001\u0018!\u0004$fi\u000eD'+Z:q_:\u001cX-\u0003\u0003\u0002\u001e\u0005M\"bAA\u0018a\"QA/a\u0007\u0003\u0002\u0003\u0006I!!\u000b\t\u000f)\u000bY\u0002\"\u0001\u0002:Q!\u00111HA\u001f!\rA\u00181\u0004\u0005\bY\u0006]\u0002\u0019AA\u0015\u0011!\t\t%a\u0007\u0005\
u0002\u0005\r\u0013!C3se>\u00148i\u001c3f+\t\t)\u0005E\u0002\u0011\u0003\u000fJ1!!\u0013\u0012\u0005\u0015\u0019\u0006n\u001c:u\u0011!\ti%a\u0007\u0005\u0002\u0005=\u0013A\u0006;p\u0005f$XMQ;gM\u0016\u0014X*Z:tC\u001e,7+\u001a;\u0016\u0005\u0005E\u0003\u0003BA*\u00033j!!!\u0016\u000b\u0007\u0005]C!A\u0004nKN\u001c\u0018mZ3\n\t\u0005m\u0013Q\u000b\u0002\u0015\u0005f$XMQ;gM\u0016\u0014X*Z:tC\u001e,7+\u001a;\t\u0011\u0005}\u00131\u0004C\u0001\u0003C\nQ\u0002[5hQ^\u000bG/\u001a:nCJ\\WCAA\u0004\u0011!\t)'a\u0007\u0005\u0002\u0005\u001d\u0014!C3yG\u0016\u0004H/[8o+\t\tI\u0007E\u0003\u0011\u0003W\ny'C\u0002\u0002nE\u0011aa\u00149uS>t\u0007\u0003BA9\u0003\u0003sA!a\u001d\u0002~9!\u0011QOA>\u001b\t\t9HC\u0002\u0002z\u0019\ta\u0001\u0010:p_Rt\u0014\"\u0001\n\n\u0007\u0005}\u0014#A\u0004qC\u000e\\\u0017mZ3\n\t\u0005\r\u0015Q\u0011\u0002\n)\"\u0014xn^1cY\u0016T1!a \u0012\u000b\u0019\tI\t\u0001\u0001\u0002\f\n\u0011\u0001\u000b\u0012\t\u0004/\u0006m\u0001\"CAH\u0001\t\u0007I\u0011BA\"\u0003M1W\r^2i%\u0016\fX/Z:u-\u0016\u00148/[8o\u0011!\t\u0019\n\u0001Q\u0001\n\u0005\u0015\u0013\u0001\u00064fi\u000eD'+Z9vKN$h+\u001a:tS>t\u0007\u0005C\u0005\u0002\u0018\u0002\u0011\r\u0011\"\u0003\u0002\u001a\u0006i1o\\2lKR$\u0016.\\3pkR,\u0012a\b\u0005\b\u0003;\u0003\u0001\u0015!\u0003 
\u00039\u0019xnY6fiRKW.Z8vi\u0002B\u0011\"!)\u0001\u0005\u0004%I!!'\u0002\u0013I,\u0007\u000f\\5dC&#\u0007bBAS\u0001\u0001\u0006IaH\u0001\u000be\u0016\u0004H.[2b\u0013\u0012\u0004\u0003\"CAU\u0001\t\u0007I\u0011BAV\u0003\u001di\u0017\r_,bSR,\"!!,\u0011\t\u0005=\u0016\u0011X\u0007\u0003\u0003cSA!a-\u00026\u0006!A.\u00198h\u0015\t\t9,\u0001\u0003kCZ\f\u0017\u0002BA^\u0003c\u0013q!\u00138uK\u001e,'\u000f\u0003\u0005\u0002@\u0002\u0001\u000b\u0011BAW\u0003!i\u0017\r_,bSR\u0004\u0003\"CAb\u0001\t\u0007I\u0011BAV\u0003!i\u0017N\u001c\"zi\u0016\u001c\b\u0002CAd\u0001\u0001\u0006I!!,\u0002\u00135LgNQ=uKN\u0004\u0003\"CAf\u0001\t\u0007I\u0011BAV\u0003%1W\r^2i'&TX\r\u0003\u0005\u0002P\u0002\u0001\u000b\u0011BAW\u0003)1W\r^2i'&TX\r\t\u0005\b\u0003'\u0004A\u0011BAk\u0003!\u0019G.[3oi&#W#\u0001\b\t\u0013\u0005e\u0007A1A\u0005\n\u0005m\u0017AC:pkJ\u001cWMT8eKV\u0011\u0011Q\u001c\t\u0005\u0003?\f\t/D\u0001:\u0013\r\t\u0019/\u000f\u0002\u0005\u001d>$W\r\u0003\u0005\u0002h\u0002\u0001\u000b\u0011BAo\u0003-\u0019x.\u001e:dK:{G-\u001a\u0011\t\u0013\u0005-\bA1A\u0005\n\u00055\u0018!\u00048fi^|'o[\"mS\u0016tG/\u0006\u0002\u0002pB!\u0011\u0011_A|\u001b\t\t\u0019PC\u0002\u0002vn\nqa\u00197jK:$8/\u0003\u0003\u0002z\u0006M(!\u0004(fi^|'o[\"mS\u0016tG\u000f\u0003\u0005\u0002~\u0002\u0001\u000b\u0011BAx\u00039qW\r^<pe.\u001cE.[3oi\u0002BqA!\u0001\u0001\t\u0003\u0012\u0019!\u0001\u0005tQV$Hm\\<o)\t\u0011)\u0001E\u0002\u0011\u0005\u000fI1A!\u0003\u0012\u0005\u0011)f.\u001b;\t\u000f\t5\u0001\u0001\"\u0001\u0003\u0010\u0005!\u0002O]8dKN\u001c\b+\u0019:uSRLwN\u001c#bi\u0006$\u0002B!\u0002\u0003\u0012\tM!q\u0003\u0005\t\u0003\u001f\u0011Y\u00011\u0001\u0002\u0012!A!Q\u0003B\u0006\u0001\u0004\t9!A\u0006gKR\u001c\u0007n\u00144gg\u0016$\b\u0002\u0003B\r\u0005\u0017\u0001\r!a#\u0002\u001bA\f'\u000f^5uS>tG)\u0019;b\u0011\u001d\u0011i\u0002\u0001C\u0001\u0005?\tac^1s]&3W*Z:tC\u001e,wJ^3sg&TX\r\u001a\u000b\u0007\u0005\u000b\u0011\tC!\n\t\u0011\t\r\"1\u0004a\u0001\u0003#\n!\"\\3tg\u0006<WmU3u\u0011!\tyAa\u
0007A\u0002\u0005E\u0001b\u0002B\u0015\u0001\u0011\u0005!1F\u0001\u0017Q\u0006tG\r\\3PM\u001a\u001cX\r^(vi>3'+\u00198hKR!\u0011q\u0001B\u0017\u0011!\tyAa\nA\u0002\u0005E\u0001b\u0002B\u0019\u0001\u0011\u0005!1G\u0001\u001bQ\u0006tG\r\\3QCJ$\u0018\u000e^5p]N<\u0016\u000e\u001e5FeJ|'o\u001d\u000b\u0005\u0005\u000b\u0011)\u0004\u0003\u0005\u00038\t=\u0002\u0019\u0001B\u001d\u0003)\u0001\u0018M\u001d;ji&|gn\u001d\t\u0007\u0003c\u0012Y$!\u0005\n\t\tu\u0012Q\u0011\u0002\t\u0013R,'/\u00192mK\"9!\u0011\t\u0001\u0005\u0012\t\r\u0013!\u00024fi\u000eDG\u0003\u0002B#\u0005#\u0002\u0002Ba\u0012\u0003N\u0005E\u00111R\u0007\u0003\u0005\u0013R1Aa\u0013\u0012\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0005\u001f\u0012IEA\u0002NCBDqAa\u0015\u0003@\u0001\u0007a+\u0001\u0007gKR\u001c\u0007NU3rk\u0016\u001cH\u000fC\u0004\u0003X\u0001!IA!\u0017\u0002\u0017M,g\u000e\u001a*fcV,7\u000f\u001e\u000b\t\u00057\u0012\tG!\u001d\u0003xA!\u0011\u0011\u001fB/\u0013\u0011\u0011y&a=\u0003\u001d\rc\u0017.\u001a8u%\u0016\u001c\bo\u001c8tK\"A!1\rB+\u0001\u0004\u0011)'\u0001\u0004ba&\\U-\u001f\t\u0005\u0005O\u0012i'\u0004\u0002\u0003j)\u0019!1N\u001d\u0002\u0011A\u0014x\u000e^8d_2LAAa\u001c\u0003j\t9\u0011\t]5LKf\u001c\b\u0002\u0003B:\u0005+\u0002\rA!\u001e\u0002\u0015\u0005\u0004\u0018NV3sg&|g\u000eE\u0003\u0011\u0003W\n)\u0005\u0003\u0005\u0003z\tU\u0003\u0019\u0001B>\u0003\u001d\u0011X-];fgR\u00042a\u001cB?\u0013\r\u0011y\b\u001d\u0002\u0010\u0003\n\u001cHO]1diJ+\u0017/^3ti\"9!1\u0011\u0001\u0005\n\t\u0015\u0015AF3be2LWm\u001d;Pe2\u000bG/Z:u\u001f\u001a47/\u001a;\u0015\u0011\u0005\u001d!q\u0011BE\u0005\u001bC\u0001\"a\u0004\u0003\u0002\u0002\u0007\u0011\u0011\u0003\u0005\t\u0005\u0017\u0013\t\t1\u0001\u0002\b\u0005\u0001R-\u0019:mS\u0016\u001cHo\u0014:MCR,7\u000f\u001e\u0005\b\u0005\u001f\u0013\t\t1\u0001 
\u0003)\u0019wN\\:v[\u0016\u0014\u0018\n\u001a\u0005\b\u0005'\u0003A\u0011\u0003BK\u0003E\u0011W/\u001b7e\r\u0016$8\r\u001b*fcV,7\u000f\u001e\u000b\u0004-\n]\u0005\u0002\u0003BM\u0005#\u0003\rAa'\u0002\u0019A\f'\u000f^5uS>tW*\u00199\u0011\u0011\t\u001d#QJA\t\u0005;\u00032!\u0003BP\u0013\r\u0011\tK\u0001\u0002\u0014!\u0006\u0014H/\u001b;j_:4U\r^2i'R\fG/\u001a")
public class ReplicaFetcherThread
extends AbstractFetcherThread {
    // NOTE(review): the kafka$server$ReplicaFetcherThread$$ prefix looks like scalac name mangling applied to
    // private members that are reached from synthetic closure classes — confirm against the Scala source.

    // Broker this thread fetches from; its id() appears in log messages and is used as the connection id in sendRequest.
    public final BrokerEndPoint kafka$server$ReplicaFetcherThread$$sourceBroker;
    // Local broker configuration; read here for brokerId(), originals() and replicaFetchBackoffMs().
    public final KafkaConfig kafka$server$ReplicaFetcherThread$$brokerConfig;
    // Used to look up local replicas, the ZooKeeper utilities and the log manager.
    private final ReplicaManager replicaMgr;
    // Clock passed to the blocking network helpers and used to timestamp client requests.
    private final Time time;
    // API version supplied for FETCH requests in fetch().
    private final short fetchRequestVersion;
    // Passed to blockingReady in sendRequest; reported in milliseconds by the connect-timeout message there.
    private final int socketTimeout;
    // The next four values are not read by any method in the visible part of this file;
    // presumably they parameterize the fetch request that is built further down — TODO confirm.
    private final int replicaId;
    private final Integer maxWait;
    private final Integer minBytes;
    private final Integer kafka$server$ReplicaFetcherThread$$fetchSize;
    // Node handed to blockingReady before a request is sent.
    private final Node sourceNode;
    // Client used for all requests to the source broker; closed in shutdown().
    private final NetworkClient kafka$server$ReplicaFetcherThread$$networkClient;

    /**
     * Accessor for the {@code fetchRequestVersion} field, which {@code fetch} supplies as the
     * API version of the FETCH request.
     *
     * @return the stored fetch request version
     */
    private short fetchRequestVersion() {
        return fetchRequestVersion;
    }

    /**
     * Accessor for the {@code socketTimeout} field, which {@code sendRequest} passes to the
     * blocking readiness check and reports in milliseconds when the connection is not ready.
     *
     * @return the stored socket timeout
     */
    private int socketTimeout() {
        return socketTimeout;
    }

    /**
     * Accessor for the {@code replicaId} field.
     *
     * @return the stored replica id
     */
    private int replicaId() {
        return replicaId;
    }

    /**
     * Accessor for the {@code maxWait} field.
     *
     * @return the stored (boxed) maximum wait value
     */
    private Integer maxWait() {
        return maxWait;
    }

    /**
     * Accessor for the {@code minBytes} field.
     *
     * @return the stored (boxed) minimum bytes value
     */
    private Integer minBytes() {
        return minBytes;
    }

    /**
     * Accessor for the name-mangled {@code fetchSize} field.
     *
     * @return the stored (boxed) fetch size
     */
    public Integer kafka$server$ReplicaFetcherThread$$fetchSize() {
        return kafka$server$ReplicaFetcherThread$$fetchSize;
    }

    /**
     * Client id of this fetcher, taken directly from the superclass's {@code name()}.
     *
     * @return whatever {@code super.name()} returns
     */
    private String clientId() {
        // Must stay a super call: a plain name() call would be dispatched virtually.
        String inheritedName = super.name();
        return inheritedName;
    }

    /**
     * Accessor for the {@code sourceNode} field, the node {@code sendRequest} waits on before
     * sending a request.
     *
     * @return the stored source node
     */
    private Node sourceNode() {
        return sourceNode;
    }

    /**
     * Accessor for the name-mangled {@code networkClient} field; used by {@code shutdown},
     * {@code sendRequest} and the closures created there.
     *
     * @return the network client used to talk to the source broker
     */
    public NetworkClient kafka$server$ReplicaFetcherThread$$networkClient() {
        return kafka$server$ReplicaFetcherThread$$networkClient;
    }

    /**
     * Shuts the thread down through the superclass and then closes the network client.
     * The superclass shutdown always runs first.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        NetworkClient client = this.kafka$server$ReplicaFetcherThread$$networkClient();
        client.close();
    }

    /**
     * Appends one partition's fetched messages to the local replica log and updates the
     * replica's high watermark.
     *
     * <p>Steps, in order: look up the local replica, convert the fetched data to a
     * {@code ByteBufferMessageSet}, run the oversized-message check, verify that
     * {@code fetchOffset} equals the replica's current log end offset, append the messages,
     * then set the high watermark to the smaller of the new log end offset and the high
     * watermark carried by {@code partitionData}. Trace logging is emitted around each step
     * when trace level is enabled.
     *
     * <p>A {@code KafkaStorageException} raised anywhere in that sequence is logged as fatal
     * and the JVM is halted with status 1.
     *
     * @param topicAndPartition partition the data belongs to
     * @param fetchOffset offset the fetch was issued at; must equal the replica's log end offset
     * @param partitionData fetched data for the partition
     * @throws RuntimeException if {@code fetchOffset} differs from the replica's log end offset
     * @throws MatchError if {@code topicAndPartition} is null
     */
    public void processPartitionData(TopicAndPartition topicAndPartition, long fetchOffset, PartitionData partitionData) {
        block7: {
            try {
                TopicAndPartition topicAndPartition2 = topicAndPartition;
                if (topicAndPartition2 != null) {
                    // Decompiled form of a pattern match that pulls topic and partition out of the argument.
                    Tuple2<String, Integer> tuple2;
                    String topic = topicAndPartition2.topic();
                    int partitionId = topicAndPartition2.partition();
                    Tuple2<String, Integer> tuple22 = tuple2 = new Tuple2<String, Integer>(topic, BoxesRunTime.boxToInteger(partitionId));
                    String topic2 = tuple22._1();
                    int partitionId2 = tuple22._2$mcI$sp();
                    // Option.get(): throws if the replica manager has no such replica.
                    Replica replica = this.replicaMgr.getReplica(topic2, partitionId2, this.replicaMgr.getReplica$default$3()).get();
                    ByteBufferMessageSet messageSet = partitionData.toByteBufferMessageSet();
                    this.warnIfMessageOversized(messageSet, topicAndPartition);
                    // The fetched data must start exactly where the local log ends.
                    if (fetchOffset != replica.logEndOffset().messageOffset()) {
                        throw new RuntimeException(new StringOps(Predef$.MODULE$.augmentString("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{topicAndPartition, BoxesRunTime.boxToLong(fetchOffset), BoxesRunTime.boxToLong(replica.logEndOffset().messageOffset())})));
                    }
                    if (this.logger().isTraceEnabled()) {
                        this.trace((Function0<String>)((Object)new Serializable(this, topicAndPartition, partitionData, replica, messageSet){
                            public static final long serialVersionUID = 0L;
                            private final TopicAndPartition topicAndPartition$1;
                            private final PartitionData partitionData$1;
                            private final Replica replica$1;
                            private final ByteBufferMessageSet messageSet$1;

                            public final String apply() {
                                return new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.replica$1.brokerId()), BoxesRunTime.boxToLong(this.replica$1.logEndOffset().messageOffset()), this.topicAndPartition$1, BoxesRunTime.boxToInteger(this.messageSet$1.sizeInBytes()), BoxesRunTime.boxToLong(this.partitionData$1.highWatermark())}));
                            }
                            {
                                this.topicAndPartition$1 = topicAndPartition$1;
                                this.partitionData$1 = partitionData$1;
                                this.replica$1 = replica$1;
                                this.messageSet$1 = messageSet$1;
                            }
                        }));
                    }
                    // NOTE(review): the `false` argument presumably tells the log not to assign new offsets — confirm against Log.append.
                    replica.log().get().append(messageSet, false);
                    if (this.logger().isTraceEnabled()) {
                        this.trace((Function0<String>)((Object)new Serializable(this, topicAndPartition, replica, messageSet){
                            public static final long serialVersionUID = 0L;
                            private final TopicAndPartition topicAndPartition$1;
                            private final Replica replica$1;
                            private final ByteBufferMessageSet messageSet$1;

                            public final String apply() {
                                return new StringOps(Predef$.MODULE$.augmentString("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.replica$1.brokerId()), BoxesRunTime.boxToLong(this.replica$1.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger(this.messageSet$1.sizeInBytes()), this.topicAndPartition$1}));
                            }
                            {
                                this.topicAndPartition$1 = topicAndPartition$1;
                                this.replica$1 = replica$1;
                                this.messageSet$1 = messageSet$1;
                            }
                        }));
                    }
                    // Follower high watermark = min(local log end offset after the append, high watermark from the fetched data).
                    long followerHighWatermark = BoxesRunTime.unboxToLong(new RichLong(Predef$.MODULE$.longWrapper(replica.logEndOffset().messageOffset())).min(BoxesRunTime.boxToLong(partitionData.highWatermark())));
                    replica.highWatermark_$eq(new LogOffsetMetadata(followerHighWatermark, LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(), LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()));
                    if (this.logger().isTraceEnabled()) {
                        this.trace((Function0<String>)((Object)new Serializable(this, topic2, partitionId2, replica, followerHighWatermark){
                            public static final long serialVersionUID = 0L;
                            private final String topic$1;
                            private final int partitionId$1;
                            private final Replica replica$1;
                            private final long followerHighWatermark$1;

                            public final String apply() {
                                return new StringOps(Predef$.MODULE$.augmentString("Follower %d set replica high watermark for partition [%s,%d] to %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.replica$1.brokerId()), this.topic$1, BoxesRunTime.boxToInteger(this.partitionId$1), BoxesRunTime.boxToLong(this.followerHighWatermark$1)}));
                            }
                            {
                                this.topic$1 = topic$1;
                                this.partitionId$1 = partitionId$1;
                                this.replica$1 = replica$1;
                                this.followerHighWatermark$1 = followerHighWatermark$1;
                            }
                        }));
                    }
                    break block7;
                }
                // Only reached when topicAndPartition is null.
                throw new MatchError(topicAndPartition2);
            }
            catch (KafkaStorageException kafkaStorageException) {
                // Storage failure: log at fatal level (message plus the exception), then stop the JVM.
                this.fatal((Function0<String>)((Object)new Serializable(this, topicAndPartition){
                    public static final long serialVersionUID = 0L;
                    private final TopicAndPartition topicAndPartition$1;

                    public final String apply() {
                        return new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Disk error while replicating data for ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.topicAndPartition$1}));
                    }
                    {
                        this.topicAndPartition$1 = topicAndPartition$1;
                    }
                }), (Function0<Throwable>)((Object)new Serializable(this, kafkaStorageException){
                    public static final long serialVersionUID = 0L;
                    private final KafkaStorageException e$1;

                    public final KafkaStorageException apply() {
                        return this.e$1;
                    }
                    {
                        this.e$1 = e$1;
                    }
                }));
                // halt(), unlike System.exit(), terminates without running shutdown hooks.
                Runtime.getRuntime().halt(1);
            }
        }
    }

    /**
     * Logs an error when a fetched message set contains bytes but none of them are valid,
     * which the logged text attributes to a message larger than
     * {@code replica.fetch.max.bytes}.
     *
     * @param messageSet fetched message set to inspect
     * @param topicAndPartition partition named in the error message
     */
    public void warnIfMessageOversized(ByteBufferMessageSet messageSet, TopicAndPartition topicAndPartition) {
        // Nothing to report unless the set is non-empty and has no valid bytes.
        if (messageSet.sizeInBytes() <= 0 || messageSet.validBytes() > 0) {
            return;
        }
        Function0<String> oversizedMessage = (Function0<String>)((Object)new Serializable(this, topicAndPartition){
            public static final long serialVersionUID = 0L;
            private final TopicAndPartition topicAndPartition$2;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition ", ". "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.topicAndPartition$2}))).append((Object)"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large ").append((Object)"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be ").append((Object)"equal or larger than your settings for max.message.bytes, both at a broker and topic level.").toString();
            }
            {
                this.topicAndPartition$2 = topicAndPartition$2;
            }
        });
        this.error(oversizedMessage);
    }

    /**
     * Picks the offset to resume fetching from after the current fetch offset for a
     * partition was out of range on the source broker, truncating the local log if needed.
     *
     * <p>Two cases, decided by comparing the leader's latest offset with the local replica's
     * log end offset:
     * <ul>
     *   <li>Leader's latest offset is smaller: if the topic's configuration has unclean
     *       leader election disabled, a fatal message is logged and {@code System.exit(1)}
     *       is called; otherwise the local log is truncated to the leader's latest offset,
     *       which is returned.</li>
     *   <li>Otherwise: the leader's start offset is fetched and the larger of that and the
     *       local log end offset is returned; the local log is fully truncated and restarted
     *       at the leader's start offset only when that offset is beyond the local log end
     *       offset.</li>
     * </ul>
     *
     * @param topicAndPartition partition whose fetch offset was out of range
     * @return the offset to fetch from next
     */
    @Override
    public long handleOffsetOutOfRange(TopicAndPartition topicAndPartition) {
        long l;
        // Option.get(): throws if the replica manager has no such replica.
        Replica replica = this.replicaMgr.getReplica(topicAndPartition.topic(), topicAndPartition.partition(), this.replicaMgr.getReplica$default$3()).get();
        // NOTE(review): -1L presumably is the "latest" sentinel of the list-offsets API (the result is logged as the leader's latest offset) — confirm.
        long leaderEndOffset = this.earliestOrLatestOffset(topicAndPartition, -1L, this.kafka$server$ReplicaFetcherThread$$brokerConfig.brokerId());
        if (leaderEndOffset < replica.logEndOffset().messageOffset()) {
            // The local log is ahead of the leader. The topic config is built from the broker's original
            // properties plus the topic entity config fetched through the replica manager's zkUtils.
            if (!Predef$.MODULE$.Boolean2boolean(LogConfig$.MODULE$.fromProps(this.kafka$server$ReplicaFetcherThread$$brokerConfig.originals(), AdminUtils$.MODULE$.fetchEntityConfig(this.replicaMgr.zkUtils(), ConfigType$.MODULE$.Topic(), topicAndPartition.topic())).uncleanLeaderElectionEnable())) {
                this.fatal((Function0<String>)((Object)new Serializable(this, topicAndPartition, replica, leaderEndOffset){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ ReplicaFetcherThread $outer;
                    private final TopicAndPartition topicAndPartition$3;
                    private final Replica replica$2;
                    private final long leaderEndOffset$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)new StringOps(Predef$.MODULE$.augmentString("Exiting because log truncation is not allowed for topic %s,")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.topicAndPartition$3.topic()}))).append((Object)new StringOps(Predef$.MODULE$.augmentString(" Current leader %d's latest offset %d is less than replica %d's latest offset %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$sourceBroker.id()), BoxesRunTime.boxToLong(this.leaderEndOffset$1), BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$brokerConfig.brokerId()), BoxesRunTime.boxToLong(this.replica$2.logEndOffset().messageOffset())}))).toString();
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                        this.topicAndPartition$3 = topicAndPartition$3;
                        this.replica$2 = replica$2;
                        this.leaderEndOffset$1 = leaderEndOffset$1;
                    }
                }));
                // Truncation is not permitted for this topic: terminate the process instead of discarding local data.
                System.exit(1);
            }
            this.warn((Function0<String>)((Object)new Serializable(this, topicAndPartition, replica, leaderEndOffset){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ReplicaFetcherThread $outer;
                private final TopicAndPartition topicAndPartition$3;
                private final Replica replica$2;
                private final long leaderEndOffset$1;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$brokerConfig.brokerId()), this.topicAndPartition$3, BoxesRunTime.boxToLong(this.replica$2.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$sourceBroker.id()), BoxesRunTime.boxToLong(this.leaderEndOffset$1)}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.topicAndPartition$3 = topicAndPartition$3;
                    this.replica$2 = replica$2;
                    this.leaderEndOffset$1 = leaderEndOffset$1;
                }
            }));
            // Cut the local log back to the leader's latest offset and resume fetching from there.
            this.replicaMgr.logManager().truncateTo((Map)scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(topicAndPartition), BoxesRunTime.boxToLong(leaderEndOffset))})));
            l = leaderEndOffset;
        } else {
            // NOTE(review): -2L presumably is the "earliest" sentinel of the list-offsets API (the result is logged as the leader's start offset) — confirm.
            long leaderStartOffset = this.earliestOrLatestOffset(topicAndPartition, -2L, this.kafka$server$ReplicaFetcherThread$$brokerConfig.brokerId());
            this.warn((Function0<String>)((Object)new Serializable(this, topicAndPartition, replica, leaderStartOffset){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ReplicaFetcherThread $outer;
                private final TopicAndPartition topicAndPartition$3;
                private final Replica replica$2;
                private final long leaderStartOffset$1;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$brokerConfig.brokerId()), this.topicAndPartition$3, BoxesRunTime.boxToLong(this.replica$2.logEndOffset().messageOffset()), BoxesRunTime.boxToInteger(this.$outer.kafka$server$ReplicaFetcherThread$$sourceBroker.id()), BoxesRunTime.boxToLong(this.leaderStartOffset$1)}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.topicAndPartition$3 = topicAndPartition$3;
                    this.replica$2 = replica$2;
                    this.leaderStartOffset$1 = leaderStartOffset$1;
                }
            }));
            // Computed before the truncation below so it uses the pre-truncation log end offset.
            long offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset().messageOffset());
            // Discard the local log only when the leader's start offset is beyond the local log end offset.
            if (leaderStartOffset > replica.logEndOffset().messageOffset()) {
                this.replicaMgr.logManager().truncateFullyAndStartAt(topicAndPartition, leaderStartOffset);
            }
            l = offsetToFetch;
        }
        return l;
    }

    /**
     * Delays the given partitions by the {@code replicaFetchBackoffMs} value from the broker
     * configuration.
     *
     * @param partitions partitions that hit an error
     */
    @Override
    public void handlePartitionsWithErrors(Iterable<TopicAndPartition> partitions) {
        int backoffMs = Predef$.MODULE$.Integer2int(this.kafka$server$ReplicaFetcherThread$$brokerConfig.replicaFetchBackoffMs());
        this.delayPartitions(partitions, backoffMs);
    }

    /**
     * Sends a FETCH request to the source broker and converts the response into a Scala map
     * keyed by {@code TopicAndPartition}.
     *
     * <p>The request is sent through {@code sendRequest} with the configured fetch request
     * version. Each {@code (TopicPartition, FetchResponse.PartitionData)} entry of the
     * response is re-keyed as a {@code TopicAndPartition} and its value wrapped in this
     * class's {@code PartitionData}.
     *
     * @param fetchRequest wrapper whose {@code underlying()} request is sent
     * @return per-partition fetch results
     */
    public Map<TopicAndPartition, PartitionData> fetch(FetchRequest fetchRequest) {
        ClientResponse clientResponse = this.sendRequest(ApiKeys.FETCH, new Some<Object>(BoxesRunTime.boxToShort(this.fetchRequestVersion())), fetchRequest.underlying());
        // Java response map -> Scala view -> entries rewritten by the closure below.
        return ((TraversableLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(new FetchResponse(clientResponse.responseBody()).responseData()).asScala()).map(new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Tuple2<TopicAndPartition, PartitionData> apply(Tuple2<TopicPartition, FetchResponse.PartitionData> x0$1) {
                Tuple2<TopicPartition, FetchResponse.PartitionData> tuple2 = x0$1;
                if (tuple2 != null) {
                    TopicPartition key = tuple2._1();
                    FetchResponse.PartitionData value2 = tuple2._2();
                    Tuple2<TopicAndPartition, PartitionData> tuple22 = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(new TopicAndPartition(key.topic(), key.partition())), new PartitionData(value2));
                    return tuple22;
                }
                // Only reached for a null entry.
                throw new MatchError(tuple2);
            }
        }, Map$.MODULE$.canBuildFrom());
    }

    /**
     * Sends one request to the source broker and blocks until its response arrives.
     *
     * <p>The request header is built first: without an explicit version the network client's
     * single-argument {@code nextRequestHeader} is used, otherwise the overload that takes the
     * version. The method then waits up to {@code socketTimeout} for the connection to the
     * source node to become ready, sends the request and returns the response.
     *
     * <p>If anything in the send path throws, the client's connection to the source broker is
     * closed and the original throwable is rethrown.
     *
     * @param apiKey API being called
     * @param apiVersion optional explicit API version (a boxed short when present)
     * @param request request to serialize and send
     * @return the response received from the source broker
     * @throws SocketTimeoutException if the connection is not ready within {@code socketTimeout}
     */
    private ClientResponse sendRequest(ApiKeys apiKey, Option<Object> apiVersion, AbstractRequest request) {
        // Option.fold: the first closure handles the empty case, the second receives the version.
        RequestHeader header = apiVersion.fold(new Serializable(this, apiKey){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReplicaFetcherThread $outer;
            private final ApiKeys apiKey$1;

            public final RequestHeader apply() {
                return this.$outer.kafka$server$ReplicaFetcherThread$$networkClient().nextRequestHeader(this.apiKey$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.apiKey$1 = apiKey$1;
            }
        }, new Serializable(this, apiKey){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReplicaFetcherThread $outer;
            private final ApiKeys apiKey$1;

            public final RequestHeader apply(short x$2) {
                return this.$outer.kafka$server$ReplicaFetcherThread$$networkClient().nextRequestHeader(this.apiKey$1, x$2);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.apiKey$1 = apiKey$1;
            }
        });
        try {
            if (NetworkClientBlockingOps$.MODULE$.blockingReady$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.kafka$server$ReplicaFetcherThread$$networkClient()), this.sourceNode(), this.socketTimeout(), this.time)) {
                // The destination is the source broker's id rendered as a string.
                RequestSend send2 = new RequestSend(((Object)BoxesRunTime.boxToInteger(this.kafka$server$ReplicaFetcherThread$$sourceBroker.id())).toString(), header, request.toStruct());
                // NOTE(review): `true` / `null` presumably mean "expect a response" / "no callback" — confirm against the ClientRequest constructor.
                ClientRequest clientRequest = new ClientRequest(this.time.milliseconds(), true, send2, null);
                return NetworkClientBlockingOps$.MODULE$.blockingSendAndReceive$extension(NetworkClientBlockingOps$.MODULE$.networkClientBlockingOps(this.kafka$server$ReplicaFetcherThread$$networkClient()), clientRequest, this.time);
            }
            throw new SocketTimeoutException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to connect within ", " ms"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.socketTimeout())})));
        }
        catch (Throwable throwable) {
            // Close the connection to the source broker on any failure (including the timeout thrown above), then propagate.
            this.kafka$server$ReplicaFetcherThread$$networkClient().close(((Object)BoxesRunTime.boxToInteger(this.kafka$server$ReplicaFetcherThread$$sourceBroker.id())).toString());
            throw throwable;
        }
    }

    /**
     * Looks up one offset for a partition by sending a LIST_OFFSETS request to the source
     * broker via {@code sendRequest} (with no explicit API version).
     *
     * @param topicAndPartition partition to query
     * @param earliestOrLatest  value forwarded as the first argument of
     *                          {@code ListOffsetRequest.PartitionData} (presumably the
     *                          earliest/latest sentinel timestamp -- confirm against callers)
     * @param consumerId        value forwarded as the first argument of {@code ListOffsetRequest}
     * @return the first offset listed for the partition in the response
     */
    private long earliestOrLatestOffset(TopicAndPartition topicAndPartition, long earliestOrLatest, int consumerId) {
        TopicPartition tp = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
        // Single-entry request map: tp -> (earliestOrLatest, 1).
        ListOffsetRequest.PartitionData query = new ListOffsetRequest.PartitionData(earliestOrLatest, 1);
        Map offsetQueries = (Map)scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(tp), query)}));
        ListOffsetRequest listOffsetRequest = new ListOffsetRequest(consumerId, JavaConverters$.MODULE$.mapAsJavaMapConverter(offsetQueries).asJava());

        ClientResponse rawResponse = this.sendRequest(ApiKeys.LIST_OFFSETS, None$.MODULE$, listOffsetRequest);
        ListOffsetResponse.PartitionData answer = new ListOffsetResponse(rawResponse.responseBody()).responseData().get(tp);

        // Any error code other than NONE is surfaced as the exception mapped to that code.
        Errors error = Errors.forCode(answer.errorCode);
        if (!Errors.NONE.equals(error)) {
            throw error.exception();
        }
        return Predef$.MODULE$.Long2long((Long)((IterableLike)JavaConverters$.MODULE$.asScalaBufferConverter(answer.offsets).asScala()).head());
    }

    /**
     * Builds the fetch request for the given partition states.
     *
     * <p>Every partition whose state reports {@code isActive()} contributes one entry
     * (its current {@code offset()} plus this thread's fetch size); inactive partitions are
     * left out. The resulting map is wrapped, together with {@code replicaId()},
     * {@code maxWait()} and {@code minBytes()}, in a Kafka {@code FetchRequest}, which is in
     * turn wrapped in this class's {@code FetchRequest} adapter.
     *
     * @param partitionMap fetch state per partition
     * @return the adapter around the newly built request; its map is empty when no
     *         partition is active
     */
    @Override
    public FetchRequest buildFetchRequest(Map<TopicAndPartition, PartitionFetchState> partitionMap) {
        GenMap requestMap = Map$.MODULE$.empty();
        // Decompiler-generated closure for the foreach body; it captures this thread ($outer)
        // and requestMap (cast to a mutable map) and fills the map as a side effect.
        partitionMap.foreach(new Serializable(this, (scala.collection.mutable.Map)requestMap){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReplicaFetcherThread $outer;
            private final scala.collection.mutable.Map requestMap$1;

            public final void apply(Tuple2<TopicAndPartition, PartitionFetchState> x0$2) {
                Tuple2<TopicAndPartition, PartitionFetchState> tuple2 = x0$2;
                if (tuple2 != null) {
                    TopicAndPartition topicAndPartition = tuple2._1();
                    PartitionFetchState partitionFetchState = tuple2._2();
                    if (topicAndPartition != null) {
                        // The BoxedUnit locals are decompilation residue of a Unit-valued if/else; they carry no data.
                        BoxedUnit boxedUnit;
                        String topic = topicAndPartition.topic();
                        int partition2 = topicAndPartition.partition();
                        if (partitionFetchState.isActive()) {
                            // Only active partitions are added to the request.
                            this.requestMap$1.update(new TopicPartition(topic, partition2), new FetchRequest.PartitionData(partitionFetchState.offset(), Predef$.MODULE$.Integer2int(this.$outer.kafka$server$ReplicaFetcherThread$$fetchSize())));
                            boxedUnit = BoxedUnit.UNIT;
                        } else {
                            boxedUnit = BoxedUnit.UNIT;
                        }
                        BoxedUnit boxedUnit2 = boxedUnit;
                        return;
                    }
                }
                // Reached only for a null tuple or a null partition key (failed pattern match).
                throw new MatchError(tuple2);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.requestMap$1 = requestMap$1;
            }
        });
        return new FetchRequest(new org.apache.kafka.common.requests.FetchRequest(this.replicaId(), Predef$.MODULE$.Integer2int(this.maxWait()), Predef$.MODULE$.Integer2int(this.minBytes()), JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(requestMap).asJava()));
    }

    /**
     * Creates a fetcher thread that pulls from {@code sourceBroker}.
     *
     * <p>Besides storing its arguments, the constructor derives the fetch request version
     * from the configured inter-broker protocol version, copies the replica fetch settings
     * out of {@code brokerConfig}, and builds the {@code Selector} and {@code NetworkClient}
     * used to talk to the source broker.
     *
     * @param name         passed twice to the superclass constructor (presumably as thread
     *                     name and client id -- confirm against AbstractFetcherThread)
     * @param fetcherId    rendered as the {@code "fetcher-id"} entry of the selector's tag map
     * @param sourceBroker endpoint of the broker to fetch from
     * @param brokerConfig broker configuration supplying timeouts, sizes and security settings
     * @param replicaMgr   replica manager stored on this thread
     * @param metrics      metrics registry handed to the selector
     * @param time         clock stored on this thread and handed to the selector and network client
     */
    public ReplicaFetcherThread(String name, int fetcherId, BrokerEndPoint sourceBroker, KafkaConfig brokerConfig, ReplicaManager replicaMgr, Metrics metrics, Time time) {
        // NOTE(review): these field writes precede super(...) exactly as the decompiler emitted them
        // (the bytecode initialises captured constructor parameters first); keep the order as is.
        this.kafka$server$ReplicaFetcherThread$$sourceBroker = sourceBroker;
        this.kafka$server$ReplicaFetcherThread$$brokerConfig = brokerConfig;
        this.replicaMgr = replicaMgr;
        this.time = time;
        super(name, name, sourceBroker, Predef$.MODULE$.Integer2int(brokerConfig.replicaFetchBackoffMs()), false);
        // Fetch request version: 2 when the inter-broker protocol is >= KAFKA_0_10_0_IV0, 1 when >= KAFKA_0_9_0, otherwise 0.
        this.fetchRequestVersion = (short)(brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_10_0_IV0$.MODULE$) ? 2 : (brokerConfig.interBrokerProtocolVersion().$greater$eq(KAFKA_0_9_0$.MODULE$) ? 1 : 0));
        this.socketTimeout = Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketTimeoutMs());
        this.replicaId = brokerConfig.brokerId();
        this.maxWait = brokerConfig.replicaFetchWaitMaxMs();
        this.minBytes = brokerConfig.replicaFetchMinBytes();
        this.kafka$server$ReplicaFetcherThread$$fetchSize = brokerConfig.replicaFetchMaxBytes();
        this.sourceNode = new Node(sourceBroker.id(), sourceBroker.host(), sourceBroker.port());
        // Channel builder for the inter-broker security protocol, in client mode.
        ChannelBuilder channelBuilder = ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol(), Mode.CLIENT, LoginType.SERVER, brokerConfig.values(), brokerConfig.saslMechanismInterBrokerProtocol(), brokerConfig.saslInterBrokerHandshakeRequestEnable());
        // Selector named "replica-fetcher" with a {"broker-id", "fetcher-id"} string map (presumably metric tags -- confirm against Selector).
        Selector selector = new Selector(-1, Predef$.MODULE$.Long2long(brokerConfig.connectionsMaxIdleMs()), metrics, time, "replica-fetcher", JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)scala.collection.Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("broker-id"), ((Object)BoxesRunTime.boxToInteger(sourceBroker.id())).toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc("fetcher-id"), ((Object)BoxesRunTime.boxToInteger(fetcherId)).toString())}))).asJava(), false, channelBuilder);
        // NOTE(review): the literals 1, 0L and -1 are positional NetworkClient settings whose meaning is not visible here -- confirm against the NetworkClient constructor.
        this.kafka$server$ReplicaFetcherThread$$networkClient = new NetworkClient((Selectable)selector, new ManualMetadataUpdater(), this.clientId(), 1, 0L, -1, Predef$.MODULE$.Integer2int(brokerConfig.replicaSocketReceiveBufferBytes()), Predef$.MODULE$.Integer2int(brokerConfig.requestTimeoutMs()), time);
    }

    /**
     * Adapter that exposes an {@code org.apache.kafka.common.requests.FetchRequest}
     * through the {@code AbstractFetcherThread.FetchRequest} interface.
     */
    public static class FetchRequest
    implements AbstractFetcherThread.FetchRequest {
        private final org.apache.kafka.common.requests.FetchRequest underlying;

        /**
         * @param underlying the wrapped Kafka fetch request
         */
        public FetchRequest(org.apache.kafka.common.requests.FetchRequest underlying) {
            this.underlying = underlying;
        }

        /**
         * @return the wrapped Kafka fetch request
         */
        public org.apache.kafka.common.requests.FetchRequest underlying() {
            return this.underlying;
        }

        /**
         * @return {@code true} when the wrapped request's fetch data has no entries
         */
        @Override
        public boolean isEmpty() {
            org.apache.kafka.common.requests.FetchRequest wrapped = this.underlying();
            return wrapped.fetchData().isEmpty();
        }

        /**
         * Returns the fetch offset requested for the given partition. The lookup goes through
         * the Scala view of the fetch data and uses its {@code apply}, so it does not return
         * {@code null} for a partition that is not part of the request.
         *
         * @param topicAndPartition partition to look up
         * @return the {@code offset} field of that partition's request entry
         */
        @Override
        public long offset(TopicAndPartition topicAndPartition) {
            MapLike requestedPartitions = (MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(this.underlying().fetchData()).asScala();
            TopicPartition lookupKey = new TopicPartition((String)topicAndPartition.topic(), (int)topicAndPartition.partition());
            FetchRequest.PartitionData requested = (FetchRequest.PartitionData)requestedPartitions.apply(lookupKey);
            return requested.offset;
        }
    }

    public static class PartitionData
    implements AbstractFetcherThread.PartitionData {
        private final FetchResponse.PartitionData underlying;

        public FetchResponse.PartitionData underlying() {
            return this.underlying;
        }

        @Override
        public short errorCode() {
            return this.underlying().errorCode;
        }

        @Override
        public ByteBufferMessageSet toByteBufferMessageSet() {
            return new ByteBufferMessageSet(this.underlying().recordSet);
        }

        @Override
        public long highWatermark() {
            return this.underlying().highWatermark;
        }

        @Override
        public Option<Throwable> exception() {
            Errors errors = Errors.forCode(this.errorCode());
            Errors errors2 = Errors.NONE;
            Errors errors3 = errors;
            Option option = !(errors2 != null ? !((Object)((Object)errors2)).equals((Object)errors3) : errors3 != null) ? None$.MODULE$ : new Some<ApiException>(errors.exception());
            return option;
        }

        public PartitionData(FetchResponse.PartitionData underlying) {
            this.underlying = underlying;
        }
    }
}

