/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.elephantbird.mapreduce.io;

import com.twitter.data.proto.BlockStorage;
import com.twitter.elephantbird.mapreduce.io.BinaryConverter;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.StreamSearcher;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads records out of a stream made of serialized blocks.
 *
 * <p>As implemented here, each block is located by searching the stream for
 * {@code Protobufs.KNOWN_GOOD_POSITION_MARKER}, followed by a 4-byte
 * little-endian length and that many bytes holding a
 * {@link BlockStorage.SerializedBlock}. The individual records are the
 * block's {@code protoBlobs} entries, handed out one at a time in order.
 *
 * <p>The reader keeps mutable position state and performs no
 * synchronization of its own.
 *
 * @param <M> the record type produced by the supplied {@link BinaryConverter}
 */
public abstract class BinaryBlockReader<M> {
    private static final Logger LOG = LoggerFactory.getLogger(BinaryBlockReader.class);
    private InputStream in_;
    private final StreamSearcher searcher_;
    private final BinaryConverter<M> protoConverter_;
    /** Block currently being drained; null before the first block and after end of input. */
    private BlockStorage.SerializedBlock curBlock_;
    /** Number of blobs of {@link #curBlock_} that have not been returned yet. */
    private int numLeftToReadThisBlock_ = 0;
    /** When false, no further blocks are parsed once the current one is exhausted. */
    private boolean readNewBlocks_ = true;

    /**
     * Creates a reader over the given stream.
     *
     * @param inputStream     stream to read blocks from
     * @param binaryConverter converter used to turn a record's bytes into an {@code M}
     */
    protected BinaryBlockReader(InputStream inputStream, BinaryConverter<M> binaryConverter) {
        this.in_ = inputStream;
        this.protoConverter_ = binaryConverter;
        this.searcher_ = new StreamSearcher(Protobufs.KNOWN_GOOD_POSITION_MARKER);
    }

    /**
     * Closes the underlying stream, if one is set.
     *
     * @throws IOException if closing the stream fails
     */
    public void close() throws IOException {
        if (this.in_ != null) {
            this.in_.close();
        }
    }

    /**
     * Replaces the stream that subsequent reads use. The previous stream is
     * not closed and the current block state is left untouched.
     *
     * @param inputStream the new stream to read from
     */
    public void setInputStream(InputStream inputStream) {
        this.in_ = inputStream;
    }

    /**
     * Reads the next record and converts it with the configured converter.
     *
     * @return the converted record, or {@code null} when no more records are available
     * @throws IOException if reading from the stream fails
     */
    public M readNext() throws IOException {
        byte[] recordBytes = this.readNextProtoBytes();
        if (recordBytes == null) {
            return null;
        }
        return this.protoConverter_.fromBytes(recordBytes);
    }

    /**
     * Reads the next record, converts it, and stores it in the given writable.
     *
     * @param binaryWritable destination for the converted record; untouched when
     *                       no record is available
     * @return {@code true} if a record was read, {@code false} when no more
     *         records are available
     * @throws IOException if reading from the stream fails
     */
    public boolean readNext(BinaryWritable<M> binaryWritable) throws IOException {
        byte[] recordBytes = this.readNextProtoBytes();
        if (recordBytes == null) {
            return false;
        }
        binaryWritable.set(this.protoConverter_.fromBytes(recordBytes));
        return true;
    }

    /**
     * Returns the raw bytes of the next record, loading a new block first if
     * the current one is exhausted.
     *
     * @return the record's bytes, or {@code null} when no more records are available
     * @throws IOException if reading from the stream fails
     */
    public byte[] readNextProtoBytes() throws IOException {
        if (!this.setupNewBlockIfNeeded()) {
            return null;
        }
        // Blobs are handed out front to back: the index of the next one is the
        // number already consumed from the current block.
        int nextIndex = this.curBlock_.getProtoBlobsCount() - this.numLeftToReadThisBlock_;
        --this.numLeftToReadThisBlock_;
        return this.curBlock_.getProtoBlobs(nextIndex).toByteArray();
    }

    /**
     * Reads the raw bytes of the next record into the given writable.
     *
     * @param bytesWritable destination for the record's bytes; untouched when
     *                      no record is available
     * @return {@code true} if a record was read, {@code false} when no more
     *         records are available
     * @throws IOException if reading from the stream fails
     */
    public boolean readNextProtoBytes(BytesWritable bytesWritable) throws IOException {
        byte[] recordBytes = this.readNextProtoBytes();
        if (recordBytes == null) {
            return false;
        }
        bytesWritable.set(recordBytes, 0, recordBytes.length);
        return true;
    }

    /**
     * Stops the reader from loading further blocks. Records remaining in the
     * current block are still returned; after that the read methods report
     * end of input.
     */
    public void markNoMoreNewBlocks() {
        this.readNewBlocks_ = false;
    }

    /**
     * Runs the marker search over the current stream.
     *
     * @return the result of {@code StreamSearcher.search}; {@link #parseNextBlock()}
     *         treats {@code false} as end of input
     * @throws IOException if reading from the stream fails
     */
    public boolean skipToNextSyncPoint() throws IOException {
        return this.searcher_.search(this.in_);
    }

    /**
     * Skips to the next sync point and parses the block that follows it.
     *
     * <p>On success the remaining-record counter is reset to the new block's
     * blob count. The block is returned but not installed as the current
     * block by this method.
     *
     * @return the parsed block, or {@code null} if no sync point was found or
     *         the size read after it is negative (which includes end of input
     *         while reading the size)
     * @throws IOException if reading from the stream fails, including when the
     *                     stream ends before the announced number of bytes
     */
    public BlockStorage.SerializedBlock parseNextBlock() throws IOException {
        LOG.debug("BlockReader: none left to read, skipping to sync point");
        if (!this.skipToNextSyncPoint()) {
            LOG.debug("BlockReader: SYNC point eof");
            return null;
        }
        int blockSize = this.readInt();
        LOG.debug("BlockReader: found sync point, next block has size {}", blockSize);
        if (blockSize < 0) {
            LOG.debug("ProtobufReader: reading size after sync point eof");
            return null;
        }
        byte[] blockBytes = new byte[blockSize];
        IOUtils.readFully(this.in_, blockBytes, 0, blockSize);
        BlockStorage.SerializedBlock block = BlockStorage.SerializedBlock.parseFrom(blockBytes);
        this.numLeftToReadThisBlock_ = block.getProtoBlobsCount();
        LOG.debug("ProtobufReader: number in next block is {}", this.numLeftToReadThisBlock_);
        return block;
    }

    /**
     * Ensures the current block has at least one unread record, parsing new
     * blocks from the stream as required.
     *
     * @return {@code true} if a record is available in the current block,
     *         {@code false} if input is exhausted or new blocks are disabled
     * @throws IOException if reading from the stream fails
     */
    private boolean setupNewBlockIfNeeded() throws IOException {
        // Loop rather than test once: a block may legitimately contain zero
        // blobs, and accepting it would make the caller index into an empty
        // list and drive the remaining-record counter negative.
        while (this.numLeftToReadThisBlock_ == 0) {
            if (!this.readNewBlocks_) {
                return false;
            }
            this.curBlock_ = this.parseNextBlock();
            if (this.curBlock_ == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads a 4-byte little-endian integer from the stream.
     *
     * @return the decoded value, or {@code -1} if the stream ends before all
     *         four bytes have been read
     * @throws IOException if reading from the stream fails
     */
    private int readInt() throws IOException {
        int b0 = this.in_.read();
        if (b0 == -1) {
            return -1;
        }
        int b1 = this.in_.read();
        int b2 = this.in_.read();
        int b3 = this.in_.read();
        // read() returns -1 only at end of stream, so a negative OR of the
        // remaining bytes means the length prefix was truncated.
        if ((b1 | b2 | b3) < 0) {
            return -1;
        }
        return b0 | b1 << 8 | b2 << 16 | b3 << 24;
    }
}

