/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.netty.HttpResponseProcessor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
import org.jboss.netty.handler.codec.http.HttpResponse;

/**
 * Exposes the body of an HTTP response, delivered piecewise through the
 * {@link HttpResponseProcessor} callbacks, as a blocking {@link ReadableByteChannel}.
 *
 * <p>The producer side ({@link #startResponse}, {@link #addChunk}, {@link #addTrailer},
 * {@link #finishResponse}) appends buffers to a bounded queue; the consumer side
 * ({@link #read}) drains that queue. The queue is bounded both by the number of chunks
 * ({@code MAX_BUFFERED_CHUNKS}) and by the total number of buffered bytes
 * ({@link #MAX_BUFFERED_BYTES}). Producers block while the queue is full, consumers block
 * while it is empty and more chunks may still arrive.
 *
 * <p>The queue, the byte counter and both conditions are guarded by a single fair
 * {@link ReentrantLock}.
 */
public class ChunkedBodyReadableByteChannel implements ReadableByteChannel, HttpResponseProcessor {
    public static final String MODULE = ChunkedBodyReadableByteChannel.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);

    /** Maximum number of chunks that may be queued at once. */
    private static final int MAX_BUFFERED_CHUNKS = 1000;
    /** Maximum number of bytes that may be buffered at once (8,192,000). */
    static final int MAX_BUFFERED_BYTES = 0x7D0000;
    /** Upper bound, in milliseconds, of a single wait for free queue space. */
    static final int MAX_CHUNK_SPACE_WAIT_MS = 15000;

    private final AtomicBoolean _open = new AtomicBoolean(true);
    /** Buffer currently being drained by {@link #read}; {@code null} when a new chunk is needed. */
    private ChannelBuffer _currentBuffer = null;
    /** Readable size of {@link #_currentBuffer} at the moment it was dequeued. */
    private int _currentBufferSize = 0;
    private HttpResponse _response = null;
    private HttpChunkTrailer _trailer = null;
    private final Lock _chunkQueueLock = new ReentrantLock(true);
    /** Signalled when a chunk is queued into an empty queue or when no more chunks will arrive. */
    private final Condition _hasChunksCondition = _chunkQueueLock.newCondition();
    /** Signalled when queue space may have been freed or the channel is closed. */
    private final Condition _hasChunkSpaceCondition = _chunkQueueLock.newCondition();
    private final Queue<ChannelBuffer> _chunks = new ArrayDeque<ChannelBuffer>(MAX_BUFFERED_CHUNKS);
    /** Bytes accounted for queued chunks plus the chunk currently being read. */
    private int _totalBufferedBytes = 0;
    private volatile boolean _noMoreChunks = false;

    /**
     * Marks the channel as closed and wakes up every thread blocked waiting for chunks or
     * for queue space.
     */
    @Override
    public void close() throws IOException {
        _open.set(false);
        _chunkQueueLock.lock();
        try {
            signalNoMoreChunks();
            _hasChunkSpaceCondition.signalAll();
        } finally {
            _chunkQueueLock.unlock();
        }
    }

    @Override
    public boolean isOpen() {
        return _open.get();
    }

    /**
     * Copies buffered body bytes into {@code buffer}, blocking until the buffer is full or
     * no further chunk can be obtained.
     *
     * @param buffer destination; filled from its current position up to its limit
     * @return the number of bytes copied, or {@code -1} if the channel is closed or if no
     *         byte could be copied because no further chunk could be obtained; {@code 0}
     *         only when {@code buffer} has no remaining space
     */
    @Override
    public int read(ByteBuffer buffer) throws IOException {
        if (!_open.get()) {
            return -1;
        }
        final int requested = buffer.remaining();
        int destRemaining = requested;
        while (destRemaining > 0) {
            if (null == _currentBuffer && !getChunk()) {
                int bytesAlreadyWritten = requested - destRemaining;
                return 0 == bytesAlreadyWritten ? -1 : bytesAlreadyWritten;
            }
            int bufferBytes = _currentBuffer.readableBytes();
            // readBytes(ByteBuffer) fills the destination up to its limit, so the limit is
            // temporarily lowered when the chunk holds fewer bytes than the destination.
            int saveLimit = -1;
            if (bufferBytes < destRemaining) {
                saveLimit = buffer.limit();
                buffer.limit(buffer.position() + bufferBytes);
            }
            _currentBuffer.readBytes(buffer);
            if (-1 != saveLimit) {
                buffer.limit(saveLimit);
            }
            if (0 == _currentBuffer.readableBytes()) {
                // Chunk fully consumed: release its share of the byte budget using the size
                // recorded at dequeue time, and let blocked producers re-check for space.
                _chunkQueueLock.lock();
                try {
                    _totalBufferedBytes -= _currentBufferSize;
                    _currentBuffer = null;
                    _currentBufferSize = 0;
                    _hasChunkSpaceCondition.signalAll();
                } finally {
                    _chunkQueueLock.unlock();
                }
            }
            destRemaining = buffer.remaining();
        }
        return requested - destRemaining;
    }

    /**
     * Decides whether a chunk of {@code newChunkSize} bytes may be queued right now. A chunk
     * at least as large as the whole byte budget is accepted only into an empty queue; any
     * other chunk needs both a free slot and enough remaining byte budget. Called with the
     * queue lock held.
     */
    private boolean checkIfEnoughSpace(int newChunkSize) {
        if (newChunkSize >= MAX_BUFFERED_BYTES) {
            return _chunks.size() == 0;
        }
        return _chunks.size() < MAX_BUFFERED_CHUNKS
            && _totalBufferedBytes + newChunkSize <= MAX_BUFFERED_BYTES;
    }

    /**
     * Queues {@code buffer} for the reader, blocking while the queue has no room for it.
     * Does nothing if the channel is closed. If the wait is interrupted and there is still
     * no room, the buffer is not queued and the thread's interrupt status is restored.
     *
     * @throws TimeoutException if a single wait for space lasts
     *         {@link #MAX_CHUNK_SPACE_WAIT_MS} without being signalled; a wake-up that
     *         finds no space starts a new wait
     */
    private void addBytes(ChannelBuffer buffer) throws TimeoutException {
        if (!_open.get()) {
            return;
        }
        _chunkQueueLock.lock();
        try {
            int contentSize = buffer.readableBytes();
            boolean canBuffer = checkIfEnoughSpace(contentSize);
            boolean doLoop = true;
            while (doLoop && !canBuffer) {
                try {
                    if (!_hasChunkSpaceCondition.await(MAX_CHUNK_SPACE_WAIT_MS, TimeUnit.MILLISECONDS)) {
                        throw new TimeoutException("waiting for chunk space");
                    }
                } catch (InterruptedException ie) {
                    LOG.info("interrupted");
                    // Keep the interruption visible to the owner of this thread.
                    Thread.currentThread().interrupt();
                    doLoop = false;
                }
                if (!_open.get()) {
                    return;
                }
                canBuffer = checkIfEnoughSpace(contentSize);
            }
            if (canBuffer) {
                _chunks.add(buffer);
                _totalBufferedBytes += contentSize;
                // Readers only wait on an empty queue, so signal on the empty -> non-empty edge.
                if (_chunks.size() == 1) {
                    _hasChunksCondition.signalAll();
                }
            }
        } finally {
            _chunkQueueLock.unlock();
        }
    }

    /**
     * Queues the content of {@code chunk}; a {@code null} chunk is logged and ignored.
     *
     * @throws TimeoutException if no queue space became available in time
     */
    @Override
    public void addChunk(HttpChunk chunk) throws TimeoutException {
        if (null == chunk) {
            LOG.error("unexpected null chunk");
        } else {
            addBytes(chunk.getContent());
        }
    }

    /**
     * Dequeues the next chunk into {@link #_currentBuffer}, blocking while the queue is empty
     * and more chunks may still arrive. An empty chunk is treated as the end-of-body marker
     * and sets the no-more-chunks flag.
     *
     * @return {@code true} if a chunk was dequeued; {@code false} if none is available
     *         because no more chunks will arrive, the channel was closed, or the wait was
     *         interrupted (in which case the interrupt status is restored)
     */
    private boolean getChunk() {
        boolean result = false;
        _chunkQueueLock.lock();
        try {
            ChannelBuffer nextChunk = _chunks.poll();
            boolean doLoop = !_noMoreChunks;
            while (doLoop && null == nextChunk && !_noMoreChunks) {
                try {
                    _hasChunksCondition.await();
                } catch (InterruptedException ie) {
                    LOG.info("interrupted");
                    // Keep the interruption visible to the owner of this thread.
                    Thread.currentThread().interrupt();
                    doLoop = false;
                }
                if (!_open.get()) {
                    break;
                }
                nextChunk = _chunks.poll();
            }
            if (null != nextChunk) {
                _currentBuffer = nextChunk;
                // Remember the size now; read() releases exactly this amount once drained.
                _currentBufferSize = nextChunk.readableBytes();
                if (0 == _currentBufferSize) {
                    signalNoMoreChunks();
                }
                // Wake producers when the queue just left the "full" state or became empty
                // (the latter is what an oversized chunk waits for).
                int queued = _chunks.size();
                if (0 == queued || MAX_BUFFERED_CHUNKS - 1 == queued) {
                    _hasChunkSpaceCondition.signalAll();
                }
                result = true;
            }
        } finally {
            _chunkQueueLock.unlock();
        }
        return result;
    }

    /**
     * Remembers the trailer (its headers become available through {@link #getMetadata}) and
     * queues its content like any other chunk.
     */
    @Override
    public void addTrailer(HttpChunkTrailer trailer) throws TimeoutException {
        _trailer = trailer;
        addChunk(trailer);
    }

    /** Marks the response as complete so that readers stop waiting for further chunks. */
    @Override
    public void finishResponse() {
        if (LOG.isDebugEnabled()) {
            String dbusReqLatencyStr = getMetadata("x-dbus-req-latency");
            if (null != dbusReqLatencyStr) {
                LOG.debug("Databus request processing latency (ms): " + dbusReqLatencyStr);
            }
        }
        signalNoMoreChunksWithLock();
    }

    /**
     * Starts processing of a new response. Any partially read buffer is discarded. When the
     * response carries a {@code Content-Length} header its content is queued immediately and
     * the body is marked complete; otherwise the channel waits for chunks.
     *
     * @throws TimeoutException if the response content could not be queued in time
     */
    @Override
    public void startResponse(HttpResponse response) throws TimeoutException {
        if (LOG.isDebugEnabled()) {
            String dbusReqId = response.getHeader("x-dbus-req-id");
            if (null != dbusReqId) {
                LOG.debug("Received response for databus reqid: " + dbusReqId);
            }
        }
        String contentLengthStr = response.getHeader("Content-Length");
        _chunkQueueLock.lock();
        try {
            _response = response;
            if (null != _currentBuffer) {
                // The discarded buffer still counts against the byte budget; release it so
                // the budget is not permanently reduced.
                _totalBufferedBytes -= _currentBufferSize;
                _currentBuffer = null;
                _currentBufferSize = 0;
                _hasChunkSpaceCondition.signalAll();
            }
            if (null == contentLengthStr) {
                _noMoreChunks = false;
            } else {
                addBytes(response.getContent());
                signalNoMoreChunks();
            }
        } finally {
            _chunkQueueLock.unlock();
        }
    }

    /**
     * Looks up a header value, preferring the trailer over the response headers.
     *
     * @return the header value, or {@code null} if neither the trailer nor the response
     *         provides it
     */
    public String getMetadata(String key) {
        String result = null;
        if (null != _trailer) {
            result = _trailer.getHeader(key);
        }
        if (null == result && null != _response) {
            result = _response.getHeader(key);
        }
        return result;
    }

    /**
     * Logs the failure at a severity depending on its type and closes this channel.
     *
     * @param cause the failure; a {@code null} cause is logged and the channel is still closed
     */
    @Override
    public void channelException(Throwable cause) {
        if (cause instanceof ClosedChannelException) {
            LOG.warn("channel unexpectedly closed.");
        } else if (cause instanceof RejectedExecutionException) {
            LOG.info("shutdown in progress");
        } else {
            String causeName = null == cause ? "null" : cause.getClass().getSimpleName();
            String causeMessage = null == cause ? null : cause.getMessage();
            LOG.error("channel exception(" + causeName + "):" + causeMessage, cause);
        }
        try {
            close();
        } catch (IOException ioe) {
            LOG.error("Error closing channel:" + ioe.getMessage(), ioe);
        }
    }

    /** Sets the no-more-chunks flag and wakes waiting readers. Called with the queue lock held. */
    private void signalNoMoreChunks() {
        _noMoreChunks = true;
        _hasChunksCondition.signalAll();
    }

    /** Same as {@link #signalNoMoreChunks()} but acquires the queue lock itself. */
    private void signalNoMoreChunksWithLock() {
        _chunkQueueLock.lock();
        try {
            signalNoMoreChunks();
        } finally {
            _chunkQueueLock.unlock();
        }
    }

    /** @return {@code true} once the body has been marked complete or the channel was closed */
    public boolean hasNoMoreChunks() {
        return _noMoreChunks;
    }
}

