/*
 * Decompiled with CFR 0.152.
 */
package zeph.http;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import zeph.http.ByteArrayPool;

public class StreamingBodyInputStream
extends InputStream {
    private static final PooledChunk END_MARKER = new PooledChunk(null, 0, false);
    private static final PooledChunk ERROR_MARKER = new PooledChunk(null, 0, false);
    private final BlockingQueue<PooledChunk> dataQueue = new LinkedBlockingQueue<PooledChunk>();
    private final long contentLength;
    private PooledChunk currentChunk;
    private int currentOffset;
    private long totalRead;
    private volatile boolean closed;
    private volatile boolean completed;
    private volatile IOException error;

    public StreamingBodyInputStream(long contentLength) {
        this.contentLength = contentLength;
        this.totalRead = 0L;
        this.closed = false;
        this.completed = false;
    }

    public void feedData(byte[] data) {
        this.feedData(data, data.length, false);
    }

    public void feedData(byte[] data, int length) {
        this.feedData(data, length, true);
    }

    private void feedData(byte[] data, int length, boolean pooled) {
        if (this.closed || this.completed) {
            if (pooled) {
                ByteArrayPool.getInstance().release(data);
            }
            return;
        }
        try {
            this.dataQueue.put(new PooledChunk(data, length, pooled));
        }
        catch (InterruptedException e) {
            if (pooled) {
                ByteArrayPool.getInstance().release(data);
            }
            Thread.currentThread().interrupt();
        }
    }

    public void complete() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        try {
            this.dataQueue.put(END_MARKER);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void signalError(IOException e) {
        this.error = e;
        try {
            this.dataQueue.put(ERROR_MARKER);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public int read() throws IOException {
        byte[] buf = new byte[1];
        int n = this.read(buf, 0, 1);
        return n <= 0 ? -1 : buf[0] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        if (this.error != null) {
            throw this.error;
        }
        if (this.currentChunk != null && this.currentOffset < this.currentChunk.length) {
            int available = this.currentChunk.length - this.currentOffset;
            int toRead = Math.min(available, len);
            System.arraycopy(this.currentChunk.data, this.currentOffset, b, off, toRead);
            this.currentOffset += toRead;
            this.totalRead += (long)toRead;
            if (this.currentOffset >= this.currentChunk.length) {
                this.currentChunk.release();
                this.currentChunk = null;
            }
            return toRead;
        }
        try {
            PooledChunk chunk = this.dataQueue.poll(100L, TimeUnit.MILLISECONDS);
            while (chunk == null && !this.closed && this.error == null) {
                chunk = this.dataQueue.poll(100L, TimeUnit.MILLISECONDS);
            }
            if (this.error != null) {
                throw this.error;
            }
            if (this.closed) {
                return -1;
            }
            if (chunk == END_MARKER || chunk == null) {
                return -1;
            }
            if (chunk == ERROR_MARKER) {
                throw this.error != null ? this.error : new IOException("Stream error");
            }
            this.currentChunk = chunk;
            this.currentOffset = 0;
            int available = chunk.length;
            int toRead = Math.min(available, len);
            System.arraycopy(chunk.data, 0, b, off, toRead);
            this.currentOffset = toRead;
            this.totalRead += (long)toRead;
            if (this.currentOffset >= this.currentChunk.length) {
                this.currentChunk.release();
                this.currentChunk = null;
            }
            return toRead;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Read interrupted", e);
        }
    }

    @Override
    public int available() throws IOException {
        if (this.currentChunk != null) {
            return this.currentChunk.length - this.currentOffset;
        }
        PooledChunk peeked = (PooledChunk)this.dataQueue.peek();
        if (peeked != null && peeked != END_MARKER && peeked != ERROR_MARKER) {
            return peeked.length;
        }
        return 0;
    }

    @Override
    public void close() throws IOException {
        PooledChunk chunk;
        this.closed = true;
        if (this.currentChunk != null) {
            this.currentChunk.release();
            this.currentChunk = null;
        }
        while ((chunk = (PooledChunk)this.dataQueue.poll()) != null) {
            if (chunk == END_MARKER || chunk == ERROR_MARKER) continue;
            chunk.release();
        }
    }

    public long getTotalRead() {
        return this.totalRead;
    }

    public long getContentLength() {
        return this.contentLength;
    }

    public byte[] drainAllData() {
        PooledChunk chunk;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        if (this.currentChunk != null && this.currentOffset < this.currentChunk.length) {
            baos.write(this.currentChunk.data, this.currentOffset, this.currentChunk.length - this.currentOffset);
            this.currentChunk.release();
            this.currentChunk = null;
            this.currentOffset = 0;
        }
        while ((chunk = (PooledChunk)this.dataQueue.poll()) != null && chunk != END_MARKER && chunk != ERROR_MARKER) {
            baos.write(chunk.data, 0, chunk.length);
            chunk.release();
        }
        return baos.toByteArray();
    }

    private static class PooledChunk {
        final byte[] data;
        final int length;
        final boolean pooled;

        PooledChunk(byte[] data, int length, boolean pooled) {
            this.data = data;
            this.length = length;
            this.pooled = pooled;
        }

        void release() {
            if (this.pooled && this.data != null) {
                ByteArrayPool.getInstance().release(this.data);
            }
        }
    }
}

