/*
 * Decompiled with CFR 0.152.
 */
package kafka.consumer;

import java.util.concurrent.CountDownLatch;
import kafka.api.FetchRequest;
import kafka.api.MultiFetchResponse;
import kafka.api.OffsetRequest$;
import kafka.cluster.Broker;
import kafka.cluster.Partition;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.FetcherRunnable$;
import kafka.consumer.PartitionTopicInfo;
import kafka.consumer.SimpleConsumer;
import kafka.utils.Utils$;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.ScalaObject;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005ec\u0001C\u0001\u0003\t\u0003\u0005\t\u0011A\u0004\u0003\u001f\u0019+Go\u00195feJ+hN\\1cY\u0016T!a\u0001\u0003\u0002\u0011\r|gn];nKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\r\u0001\u0001\u0002\u0005\t\u0003\u00139i\u0011A\u0003\u0006\u0003\u00171\tA\u0001\\1oO*\tQ\"\u0001\u0003kCZ\f\u0017BA\b\u000b\u0005\u0019!\u0006N]3bIB\u0011\u0011\u0003F\u0007\u0002%)\t1#A\u0003tG\u0006d\u0017-\u0003\u0002\u0016%\tY1kY1mC>\u0013'.Z2u\u0011!9\u0002A!b\u0001\n\u0003A\u0012\u0001\u00028b[\u0016,\u0012!\u0007\t\u00035uq!!E\u000e\n\u0005q\u0011\u0012A\u0002)sK\u0012,g-\u0003\u0002\u001f?\t11\u000b\u001e:j]\u001eT!\u0001\b\n\t\u0011\u0005\u0002!\u0011!Q\u0001\ne\tQA\\1nK\u0002B\u0001b\t\u0001\u0003\u0006\u0004%\t\u0001J\u0001\tu.\u001cE.[3oiV\tQ\u0005\u0005\u0002'[5\tqE\u0003\u0002)S\u0005A!p[2mS\u0016tGO\u0003\u0002+W\u00051\u0011\nM%uK\u000eT\u0011\u0001L\u0001\u0004_J<\u0017B\u0001\u0018(\u0005!Q6n\u00117jK:$\b\u0002\u0003\u0019\u0001\u0005\u0003\u0005\u000b\u0011B\u0013\u0002\u0013i\\7\t\\5f]R\u0004\u0003\u0002\u0003\u001a\u0001\u0005\u000b\u0007I\u0011A\u001a\u0002\r\r|gNZ5h+\u0005!\u0004CA\u001b7\u001b\u0005\u0011\u0011BA\u001c\u0003\u00059\u0019uN\\:v[\u0016\u00148i\u001c8gS\u001eD\u0001\"\u000f\u0001\u0003\u0002\u0003\u0006I\u0001N\u0001\bG>tg-[4!\u0011!Y\u0004A!b\u0001\n\u0003a\u0014A\u00022s_.,'/F\u0001>!\tq\u0014)D\u0001@\u0015\t\u0001E!A\u0004dYV\u001cH/\u001a:\n\u0005\t{$A\u0002\"s_.,'\u000f\u0003\u0005E\u0001\t\u0005\t\u0015!\u0003>\u0003\u001d\u0011'o\\6fe\u0002B\u0001B\u0012\u0001\u0003\u0006\u0004%\taR\u0001\u0014a\u0006\u0014H/\u001b;j_:$v\u000e]5d\u0013:4wn]\u000b\u0002\u0011B\u0019\u0011*\u0015+\u000f\u0005){eBA&O\u001b\u0005a%BA'\u0007\u0003\u0019a$o\\8u}%\t1#\u0003\u0002Q%\u00059\u0001/Y2lC\u001e,\u0017B\u0001*T\u0005\u0011a\u0015n\u001d;\u000b\u0005A\u0013\u0002CA\u001bV\u0013\t1&A\u0001\nQCJ$\u0018\u000e^5p]R{\u0007/[2J]\u001a|\u0007\u0002\u0003-\u0001\u0005\u0003\u0005\u000b\u0011\u0002%\u0002)A\f'\u000f^5uS>tGk\u0
01c9jG&sgm\\:!\u0011\u0015Q\u0006\u0001\"\u0001\\\u0003\u0019a\u0014N\\5u}Q1A,\u00180`A\u0006\u0004\"!\u000e\u0001\t\u000b]I\u0006\u0019A\r\t\u000b\rJ\u0006\u0019A\u0013\t\u000bIJ\u0006\u0019\u0001\u001b\t\u000bmJ\u0006\u0019A\u001f\t\u000b\u0019K\u0006\u0019\u0001%\t\u000f\r\u0004!\u0019!C\u0005I\u00061An\\4hKJ,\u0012!\u001a\t\u0003M.l\u0011a\u001a\u0006\u0003Q&\fQ\u0001\\8hi)T!A[\u0016\u0002\r\u0005\u0004\u0018m\u00195f\u0013\tawM\u0001\u0004M_\u001e<WM\u001d\u0005\u0007]\u0002\u0001\u000b\u0011B3\u0002\u000f1|wmZ3sA!9\u0001\u000f\u0001b\u0001\n\u0013\t\u0018!D:ikR$wn\u001e8MCR\u001c\u0007.F\u0001s!\t\u0019\b0D\u0001u\u0015\t)h/\u0001\u0006d_:\u001cWO\u001d:f]RT!a\u001e\u0007\u0002\tU$\u0018\u000e\\\u0005\u0003sR\u0014abQ8v]R$un\u001e8MCR\u001c\u0007\u000e\u0003\u0004|\u0001\u0001\u0006IA]\u0001\u000fg\",H\u000fZ8x]2\u000bGo\u00195!\u0011\u001di\bA1A\u0005\ny\fab]5na2,7i\u001c8tk6,'/F\u0001\u0000!\r)\u0014\u0011A\u0005\u0004\u0003\u0007\u0011!AD*j[BdWmQ8ogVlWM\u001d\u0005\b\u0003\u000f\u0001\u0001\u0015!\u0003\u0000\u0003=\u0019\u0018.\u001c9mK\u000e{gn];nKJ\u0004\u0003\"CA\u0006\u0001\u0001\u0007I\u0011BA\u0007\u0003\u001d\u0019Ho\u001c9qK\u0012,\"!a\u0004\u0011\u0007E\t\t\"C\u0002\u0002\u0014I\u0011qAQ8pY\u0016\fg\u000eC\u0005\u0002\u0018\u0001\u0001\r\u0011\"\u0003\u0002\u001a\u0005Y1\u000f^8qa\u0016$w\fJ3r)\u0011\tY\"!\t\u0011\u0007E\ti\"C\u0002\u0002 
I\u0011A!\u00168ji\"Q\u00111EA\u000b\u0003\u0003\u0005\r!a\u0004\u0002\u0007a$\u0013\u0007\u0003\u0005\u0002(\u0001\u0001\u000b\u0015BA\b\u0003!\u0019Ho\u001c9qK\u0012\u0004\u0003\u0006BA\u0013\u0003W\u00012!EA\u0017\u0013\r\tyC\u0005\u0002\tm>d\u0017\r^5mK\"9\u00111\u0007\u0001\u0005\u0002\u0005U\u0012\u0001C:ikR$wn\u001e8\u0015\u0005\u0005m\u0001bBA\u001d\u0001\u0011\u0005\u0013QG\u0001\u0004eVt\u0007bBA\u001f\u0001\u0011%\u0011QG\u0001\u0011g\",H\u000fZ8x]\u000e{W\u000e\u001d7fi\u0016Dq!!\u0011\u0001\t\u0013\t\u0019%\u0001\u000bsKN,GoQ8ogVlWM](gMN,Go\u001d\u000b\u0007\u0003\u000b\nY%a\u0014\u0011\u0007E\t9%C\u0002\u0002JI\u0011A\u0001T8oO\"9\u0011QJA \u0001\u0004I\u0012!\u0002;pa&\u001c\u0007\u0002CA)\u0003\u007f\u0001\r!a\u0015\u0002\u0013A\f'\u000f^5uS>t\u0007c\u0001 \u0002V%\u0019\u0011qK \u0003\u0013A\u000b'\u000f^5uS>t\u0007")
/**
 * Thread that repeatedly issues multifetch requests to a single broker for a
 * fixed list of partitions until {@link #shutdown()} is called or an error
 * escapes the fetch loop.
 *
 * <p>The synthetic {@code kafka$consumer$FetcherRunnable$$...} members are
 * accessors the Scala compiler generated for the closure classes
 * ({@code $anonfun$...}) used by this class; their names must not change.
 */
public class FetcherRunnable
extends Thread
implements ScalaObject {
    private final String name;
    private final ZkClient zkClient;
    private final ConsumerConfig config;
    private final Broker broker;
    private final List<PartitionTopicInfo> partitionTopicInfos;
    private final Logger kafka$consumer$FetcherRunnable$$logger;
    /** Released exactly once by {@link #run()} when the thread is about to exit. */
    private final CountDownLatch shutdownLatch;
    private final SimpleConsumer kafka$consumer$FetcherRunnable$$simpleConsumer;
    /** Set by {@link #shutdown()}; polled by the fetch loop. */
    private volatile boolean kafka$consumer$FetcherRunnable$$stopped;

    /** @return the name this fetcher was constructed with (also used as the thread name) */
    public String name() {
        return this.name;
    }

    /** @return the ZooKeeper client used to persist reset offsets */
    public ZkClient zkClient() {
        return this.zkClient;
    }

    /** @return the consumer configuration supplied at construction */
    public ConsumerConfig config() {
        return this.config;
    }

    /** @return the broker this fetcher connects to */
    public Broker broker() {
        return this.broker;
    }

    /** @return the partitions this fetcher requests data for */
    public List<PartitionTopicInfo> partitionTopicInfos() {
        return this.partitionTopicInfos;
    }

    /** Compiler-generated accessor for the logger, used by the closure classes. */
    public final Logger kafka$consumer$FetcherRunnable$$logger() {
        return this.kafka$consumer$FetcherRunnable$$logger;
    }

    private CountDownLatch shutdownLatch() {
        return this.shutdownLatch;
    }

    /** Compiler-generated accessor for the consumer, used by the closure classes. */
    public final SimpleConsumer kafka$consumer$FetcherRunnable$$simpleConsumer() {
        return this.kafka$consumer$FetcherRunnable$$simpleConsumer;
    }

    /** Compiler-generated accessor for the stop flag, used by the closure classes. */
    public final boolean kafka$consumer$FetcherRunnable$$stopped() {
        return this.kafka$consumer$FetcherRunnable$$stopped;
    }

    private void kafka$consumer$FetcherRunnable$$stopped_$eq(boolean bl) {
        this.kafka$consumer$FetcherRunnable$$stopped = bl;
    }

    /**
     * Requests the fetcher to stop and blocks until {@link #run()} has finished.
     *
     * <p>Sets the stop flag, interrupts this thread (so a pending sleep or
     * blocking call is abandoned) and then waits on the shutdown latch.
     *
     * <p>NOTE(review): {@code CountDownLatch.await()} can throw
     * {@code InterruptedException}. The Scala original does not declare it and
     * lets it propagate to the caller; that contract is left untouched here.
     */
    public void shutdown() {
        this.kafka$consumer$FetcherRunnable$$stopped_$eq(true);
        this.interrupt();
        this.kafka$consumer$FetcherRunnable$$logger().debug((Object)("awaiting shutdown on fetcher " + this.name()));
        this.shutdownLatch().await();
        this.kafka$consumer$FetcherRunnable$$logger().debug((Object)("shutdown of fetcher " + this.name() + " thread complete"));
    }

    /**
     * Fetch loop. Until the stop flag is set, builds one fetch request per
     * partition, sends them as a single multifetch, hands each response paired
     * with its partition info to a closure that accumulates the fetched byte
     * count, and sleeps for {@code config.backoffIncrementMs()} when nothing
     * was fetched.
     *
     * <p>Any {@code Throwable} escaping the loop ends it. The shutdown latch is
     * always released on exit so that {@link #shutdown()} cannot block forever.
     */
    @Override
    public void run() {
        try {
            // Per-partition start-up closure; presumably logs the starting offsets (body not in this file).
            this.partitionTopicInfos().foreach((Function1)new $anonfun$run$2(this));
            try {
                while (!this.kafka$consumer$FetcherRunnable$$stopped()) {
                    List fetches = (List)this.partitionTopicInfos().map((Function1)new $anonfun$1(this), List$.MODULE$.canBuildFrom());
                    if (this.kafka$consumer$FetcherRunnable$$logger().isTraceEnabled()) {
                        this.kafka$consumer$FetcherRunnable$$logger().trace((Object)("fetch request: " + fetches));
                    }
                    MultiFetchResponse response = this.kafka$consumer$FetcherRunnable$$simpleConsumer().multifetch((Seq<FetchRequest>)fetches);
                    // Mutable counter shared with the closure below.
                    LongRef bytesRead = new LongRef(0L);
                    ((IterableLike)response.zip((Iterable)this.partitionTopicInfos(), Iterable$.MODULE$.canBuildFrom())).foreach((Function1)new $anonfun$run$3(this, bytesRead));
                    if (this.kafka$consumer$FetcherRunnable$$logger().isTraceEnabled()) {
                        this.kafka$consumer$FetcherRunnable$$logger().trace((Object)("fetched bytes: " + bytesRead.elem));
                    }
                    if (bytesRead.elem == 0L) {
                        // Nothing new on any partition: back off before polling again.
                        this.kafka$consumer$FetcherRunnable$$logger().debug((Object)("backing off " + this.config().backoffIncrementMs() + " ms"));
                        Thread.sleep(this.config().backoffIncrementMs());
                    }
                }
            }
            catch (Throwable throwable) {
                if (this.kafka$consumer$FetcherRunnable$$stopped()) {
                    // Expected: shutdown() interrupted us. Not an error.
                    this.kafka$consumer$FetcherRunnable$$logger().info((Object)("FecherRunnable " + this + " interrupted"));
                } else {
                    this.kafka$consumer$FetcherRunnable$$logger().error((Object)"error in FetcherRunnable ", throwable);
                }
            }
            this.kafka$consumer$FetcherRunnable$$logger().info((Object)("stopping fetcher " + this.name() + " to host " + this.broker().host()));
            // Best-effort cleanup action; presumably closes the consumer (closure bodies not in this file).
            Utils$.MODULE$.swallow((Function2<Object, Throwable, Object>)new $anonfun$run$4(this), (Function0<Object>)new $anonfun$run$1(this));
        }
        finally {
            // Always release waiters in shutdown(), even if start-up or cleanup threw.
            this.shutdownComplete();
        }
    }

    private void shutdownComplete() {
        this.shutdownLatch().countDown();
    }

    /**
     * Resets the consumer offset of a partition according to
     * {@code config.autoOffsetReset()} and stores the new offset in ZooKeeper.
     *
     * @param topic     topic the partition belongs to
     * @param partition partition whose offset is reset
     * @return the offset obtained from the broker and written to ZooKeeper, or
     *         {@code -1} when the configured reset policy matches neither the
     *         "smallest" nor the "largest" setting (in which case nothing is
     *         queried or written)
     */
    public final long kafka$consumer$FetcherRunnable$$resetConsumerOffsets(String topic, Partition partition) {
        String resetPolicy = this.config().autoOffsetReset();
        long offset;
        if (FetcherRunnable.sameString(resetPolicy, OffsetRequest$.MODULE$.SmallestTimeString())) {
            offset = OffsetRequest$.MODULE$.EarliestTime();
        } else if (FetcherRunnable.sameString(resetPolicy, OffsetRequest$.MODULE$.LargestTimeString())) {
            offset = OffsetRequest$.MODULE$.LatestTime();
        } else {
            return -1L;
        }
        // Ask the broker for the single offset at/before the chosen time marker.
        long[] offsets = this.kafka$consumer$FetcherRunnable$$simpleConsumer().getOffsetsBefore(topic, partition.partId(), offset, 1);
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(this.config().groupId(), topic);
        this.kafka$consumer$FetcherRunnable$$logger().info((Object)("updating partition " + partition.name() + " for topic " + topic + " with " + (offset == OffsetRequest$.MODULE$.EarliestTime() ? "earliest " : " latest ") + "offset " + offsets[0]));
        ZkUtils$.MODULE$.updatePersistentPath(this.zkClient(), topicDirs.consumerOffsetDir() + "/" + partition.name(), Long.toString(offsets[0]));
        return offsets[0];
    }

    /** Null-safe string equality (two nulls are equal). */
    private static boolean sameString(String a, String b) {
        return a == null ? b == null : a.equals(b);
    }

    /**
     * Creates a fetcher thread named {@code name} and a {@link SimpleConsumer}
     * connected to {@code broker} using the socket timeout and buffer size
     * from {@code config}. The thread is not started.
     *
     * @param name                thread name, also returned by {@link #name()}
     * @param zkClient            ZooKeeper client used when resetting offsets
     * @param config              consumer configuration
     * @param broker              broker to fetch from
     * @param partitionTopicInfos partitions to fetch
     */
    public FetcherRunnable(String name, ZkClient zkClient, ConsumerConfig config, Broker broker, List<PartitionTopicInfo> partitionTopicInfos) {
        // The superclass constructor call must be the first statement.
        super(name);
        this.name = name;
        this.zkClient = zkClient;
        this.config = config;
        this.broker = broker;
        this.partitionTopicInfos = partitionTopicInfos;
        this.kafka$consumer$FetcherRunnable$$logger = Logger.getLogger(this.getClass());
        this.shutdownLatch = new CountDownLatch(1);
        this.kafka$consumer$FetcherRunnable$$simpleConsumer = new SimpleConsumer(broker.host(), broker.port(), config.socketTimeoutMs(), config.socketBufferSize());
        this.kafka$consumer$FetcherRunnable$$stopped = false;
    }
}

