/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.io.impl;

import com.google.common.io.CountingOutputStream;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileReaderWriterFactory;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.util.FileUtil;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

/**
 * Factory for readers and writers of delimiter-separated message files.
 *
 * <p>Every message is stored as its raw value bytes followed by a single
 * {@link #DELIMITER} byte. When a {@link CompressionCodec} is supplied the
 * file content is passed through that codec, using a compressor or
 * decompressor borrowed from {@link CodecPool}.
 */
public class DelimitedTextFileReaderWriterFactory
implements FileReaderWriterFactory {
    /** Byte written after every message and used to split messages on read. */
    private static final byte DELIMITER = '\n';

    /**
     * Creates a reader for the file denoted by {@code logFilePath}.
     *
     * @param logFilePath location of the file to read; also supplies the
     *                    offset assigned to the first message
     * @param codec       codec used to decompress the file, or {@code null}
     *                    to read it as-is
     * @return a reader positioned at the start of the file
     * @throws IOException if the file cannot be opened
     */
    @Override
    public FileReader BuildFileReader(LogFilePath logFilePath, CompressionCodec codec) throws IllegalAccessException, IOException, InstantiationException {
        return new DelimitedTextFileReader(logFilePath, codec);
    }

    /**
     * Creates a writer for the file denoted by {@code logFilePath}.
     *
     * @param logFilePath location of the file to create
     * @param codec       codec used to compress the file, or {@code null}
     *                    to write it uncompressed
     * @return a writer for the newly created file
     * @throws IOException if the file cannot be created
     */
    @Override
    public FileWriter BuildFileWriter(LogFilePath logFilePath, CompressionCodec codec) throws IOException {
        return new DelimitedTextFileWriter(logFilePath, codec);
    }

    /** Writes message values to a file, each followed by {@link #DELIMITER}. */
    protected class DelimitedTextFileWriter
    implements FileWriter {
        /** Sits directly on top of the file stream, below buffering and compression. */
        private final CountingOutputStream mCountingStream;
        private final BufferedOutputStream mWriter;
        /** Compressor borrowed from {@link CodecPool}; {@code null} when none is held. */
        private Compressor mCompressor = null;

        /**
         * Creates the target file and prepares the output stream chain.
         *
         * @param path  location of the file to create
         * @param codec codec to compress with, or {@code null} for no compression
         * @throws IOException if the file or the compression stream cannot be created
         */
        public DelimitedTextFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
            Path fsPath = new Path(path.getLogFilePath());
            FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
            this.mCountingStream = new CountingOutputStream(fs.create(fsPath));

            OutputStream sink = this.mCountingStream;
            if (codec != null) {
                Compressor compressor = null;
                boolean wrapped = false;
                try {
                    compressor = CodecPool.getCompressor(codec);
                    sink = codec.createOutputStream(this.mCountingStream, compressor);
                    wrapped = true;
                } finally {
                    if (!wrapped) {
                        // Construction is failing: release what was acquired so far,
                        // because no caller will ever get an object to close().
                        CodecPool.returnCompressor(compressor);
                        try {
                            this.mCountingStream.close();
                        } catch (IOException ignored) {
                            // The original failure is already propagating.
                        }
                    }
                }
                this.mCompressor = compressor;
            }
            this.mWriter = new BufferedOutputStream(sink);
        }

        /**
         * Returns the number of bytes that have reached the underlying file
         * stream. The count is taken below the buffering (and compression)
         * layers, so data still held in those layers is not included.
         */
        @Override
        public long getLength() throws IOException {
            return this.mCountingStream.getCount();
        }

        /**
         * Appends the value of {@code keyValue} followed by the delimiter byte.
         *
         * @param keyValue message whose value bytes are written
         * @throws IOException if the write fails
         */
        @Override
        public void write(KeyValue keyValue) throws IOException {
            this.mWriter.write(keyValue.getValue());
            this.mWriter.write(DELIMITER);
        }

        /**
         * Closes the stream chain and hands the compressor back to the pool.
         * The compressor is returned even when closing the stream fails.
         *
         * @throws IOException if closing the stream fails
         */
        @Override
        public void close() throws IOException {
            try {
                this.mWriter.close();
            } finally {
                CodecPool.returnCompressor(this.mCompressor);
                this.mCompressor = null;
            }
        }
    }

    /** Reads {@link #DELIMITER}-terminated messages from a file. */
    protected class DelimitedTextFileReader
    implements FileReader {
        private final BufferedInputStream mReader;
        /** Offset assigned to the next message returned by {@link #next()}. */
        private long mOffset;
        /** Decompressor borrowed from {@link CodecPool}; {@code null} when none is held. */
        private Decompressor mDecompressor = null;

        /**
         * Opens the file and prepares the input stream chain.
         *
         * @param path  location of the file to read; its offset is used for
         *              the first message
         * @param codec codec to decompress with, or {@code null} for none
         * @throws IOException if the file or the decompression stream cannot be opened
         */
        public DelimitedTextFileReader(LogFilePath path, CompressionCodec codec) throws IOException {
            Path fsPath = new Path(path.getLogFilePath());
            FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
            InputStream rawStream = fs.open(fsPath);

            InputStream source = rawStream;
            if (codec != null) {
                Decompressor decompressor = null;
                boolean wrapped = false;
                try {
                    decompressor = CodecPool.getDecompressor(codec);
                    source = codec.createInputStream(rawStream, decompressor);
                    wrapped = true;
                } finally {
                    if (!wrapped) {
                        // Construction is failing: release what was acquired so far,
                        // because no caller will ever get an object to close().
                        CodecPool.returnDecompressor(decompressor);
                        try {
                            rawStream.close();
                        } catch (IOException ignored) {
                            // The original failure is already propagating.
                        }
                    }
                }
                this.mDecompressor = decompressor;
            }
            this.mReader = new BufferedInputStream(source);
            this.mOffset = path.getOffset();
        }

        /**
         * Reads the next message.
         *
         * @return the message bytes (without the delimiter) paired with the
         *         current offset, or {@code null} when the end of the file is
         *         reached exactly on a message boundary
         * @throws EOFException if the file ends in the middle of a message
         * @throws IOException  if reading fails
         */
        @Override
        public KeyValue next() throws IOException {
            ByteArrayOutputStream messageBuffer = new ByteArrayOutputStream();
            int nextByte;
            while ((nextByte = this.mReader.read()) != DELIMITER) {
                if (nextByte == -1) {
                    if (messageBuffer.size() == 0) {
                        return null;
                    }
                    throw new EOFException("Non-empty message without delimiter");
                }
                messageBuffer.write(nextByte);
            }
            // The offset advances only once a complete message has been read.
            return new KeyValue(this.mOffset++, messageBuffer.toByteArray());
        }

        /**
         * Closes the stream chain and hands the decompressor back to the pool.
         * The decompressor is returned even when closing the stream fails.
         *
         * @throws IOException if closing the stream fails
         */
        @Override
        public void close() throws IOException {
            try {
                this.mReader.close();
            } finally {
                CodecPool.returnDecompressor(this.mDecompressor);
                this.mDecompressor = null;
            }
        }
    }
}

