/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.io.impl;

import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileReaderWriterFactory;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.util.FileUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

/**
 * Factory for readers and writers that store messages in Hadoop
 * {@link SequenceFile}s with {@link BytesWritable} keys and values.
 *
 * <p>Each record key is a MessagePack map with integer field tags:
 * {@link #KAFKA_MESSAGE_OFFSET} maps to the message offset (a long) and
 * {@link #KAFKA_HASH_KEY} maps to the Kafka key bytes. The Kafka key entry is
 * only written when the key is non-empty. The record value is the raw message
 * payload.
 */
public class MessagePackSequenceFileReaderWriterFactory
implements FileReaderWriterFactory {
    /** MessagePack map tag under which the message offset is stored. */
    private static final int KAFKA_MESSAGE_OFFSET = 1;
    /** MessagePack map tag under which the Kafka key bytes are stored. */
    private static final int KAFKA_HASH_KEY = 2;
    /** Shared empty array used when a record carries no Kafka key. */
    private static final byte[] EMPTY_BYTES = new byte[0];

    /**
     * Creates a reader for the sequence file at the given path.
     *
     * @param logFilePath location of the file to read
     * @param codec not used by this implementation; it is not passed on to the reader
     * @return a reader positioned at the start of the file
     * @throws Exception if the file cannot be opened or its key/value holders cannot be created
     */
    @Override
    public FileReader BuildFileReader(LogFilePath logFilePath, CompressionCodec codec) throws Exception {
        return new MessagePackSequenceFileReader(logFilePath);
    }

    /**
     * Creates a writer for the sequence file at the given path.
     *
     * @param logFilePath location of the file to write
     * @param codec compression codec to apply with block compression, or {@code null}
     *              to use the writer's default settings
     * @return a writer ready to accept records
     * @throws IOException if the underlying sequence file writer cannot be created
     */
    @Override
    public FileWriter BuildFileWriter(LogFilePath logFilePath, CompressionCodec codec) throws IOException {
        return new MessagePackSequenceFileWriter(logFilePath, codec);
    }

    /**
     * Writes {@link KeyValue} records to a sequence file, encoding the offset and
     * Kafka key into a MessagePack map used as the record key.
     */
    protected class MessagePackSequenceFileWriter
    implements FileWriter {
        private final SequenceFile.Writer mWriter;
        // Holders reused across write() calls.
        private final BytesWritable mKey;
        private final BytesWritable mValue;

        /**
         * Opens a sequence file writer at the given path.
         *
         * @param path location of the file to write
         * @param codec codec for block compression, or {@code null} for the default writer
         * @throws IOException if the writer cannot be created
         */
        public MessagePackSequenceFileWriter(LogFilePath path, CompressionCodec codec) throws IOException {
            Configuration config = new Configuration();
            Path fsPath = new Path(path.getLogFilePath());
            FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
            if (codec != null) {
                this.mWriter = SequenceFile.createWriter(fs, config, fsPath, BytesWritable.class,
                        BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
            } else {
                this.mWriter = SequenceFile.createWriter(fs, config, fsPath, BytesWritable.class,
                        BytesWritable.class);
            }
            this.mKey = new BytesWritable();
            this.mValue = new BytesWritable();
        }

        /**
         * @return the length reported by the underlying sequence file writer
         * @throws IOException if the underlying writer fails
         */
        @Override
        public long getLength() throws IOException {
            return this.mWriter.getLength();
        }

        /**
         * Appends one record. The record key is a MessagePack map containing the
         * offset and, if present, the Kafka key; the record value is the payload.
         *
         * @param keyValue record to append
         * @throws IOException if encoding or appending fails
         */
        @Override
        public void write(KeyValue keyValue) throws IOException {
            byte[] kafkaKey = keyValue.getKafkaKey();
            if (kafkaKey == null) {
                // Treat a missing key like an empty one instead of failing with an NPE.
                kafkaKey = EMPTY_BYTES;
            }
            boolean hasKafkaKey = kafkaKey.length != 0;

            // Initial capacity estimate: 17 bytes for MessagePack framing (map header,
            // two field tags, a 64-bit offset and a binary header) plus the key payload.
            ByteArrayOutputStream out = new ByteArrayOutputStream(17 + kafkaKey.length);
            MessagePacker packer = MessagePack.newDefaultPacker(out);
            try {
                packer.packMapHeader(hasKafkaKey ? 2 : 1);
                packer.packInt(KAFKA_MESSAGE_OFFSET).packLong(keyValue.getOffset());
                if (hasKafkaKey) {
                    packer.packInt(KAFKA_HASH_KEY).packBinaryHeader(kafkaKey.length).writePayload(kafkaKey);
                }
            } finally {
                // Closing also flushes the packer's buffer into 'out'.
                packer.close();
            }

            byte[] encodedKey = out.toByteArray();
            byte[] payload = keyValue.getValue();
            this.mKey.set(encodedKey, 0, encodedKey.length);
            this.mValue.set(payload, 0, payload.length);
            this.mWriter.append(this.mKey, this.mValue);
        }

        /**
         * Closes the underlying sequence file writer.
         *
         * @throws IOException if closing fails
         */
        @Override
        public void close() throws IOException {
            this.mWriter.close();
        }
    }

    /**
     * Reads records from a sequence file and decodes the MessagePack record key
     * back into an offset and Kafka key.
     */
    protected class MessagePackSequenceFileReader
    implements FileReader {
        private final SequenceFile.Reader mReader;
        // Holders reused across next() calls; their backing arrays may be longer
        // than the current record, so always honour getLength().
        private final BytesWritable mKey;
        private final BytesWritable mValue;

        /**
         * Opens the sequence file at the given path and creates key/value holders
         * from the classes recorded in the file.
         *
         * @param path location of the file to read
         * @throws Exception if the file cannot be opened, or its key/value classes
         *                   cannot be instantiated as {@link BytesWritable}
         */
        public MessagePackSequenceFileReader(LogFilePath path) throws Exception {
            Configuration config = new Configuration();
            Path fsPath = new Path(path.getLogFilePath());
            FileSystem fs = FileUtil.getFileSystem(path.getLogFilePath());
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, fsPath, config);
            BytesWritable key;
            BytesWritable value;
            try {
                key = (BytesWritable) reader.getKeyClass().newInstance();
                value = (BytesWritable) reader.getValueClass().newInstance();
            } catch (Exception e) {
                // Do not leak the already-opened reader when setup fails.
                try {
                    reader.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
                throw e;
            }
            this.mReader = reader;
            this.mKey = key;
            this.mValue = value;
        }

        /**
         * Reads the next record.
         *
         * <p>Map fields with unrecognised tags are skipped so that the decoder
         * stays aligned with the entries that follow. A record without an offset
         * entry yields offset {@code 0}; a record without a Kafka key entry yields
         * an empty key.
         *
         * @return the decoded record, or {@code null} when the underlying reader
         *         reports no further records
         * @throws IOException if reading or decoding fails
         */
        @Override
        public KeyValue next() throws IOException {
            if (!this.mReader.next(this.mKey, this.mValue)) {
                return null;
            }

            // Only the first getLength() bytes belong to this record; anything
            // beyond that is leftover capacity of the reused buffer.
            byte[] encodedKey = Arrays.copyOf(this.mKey.getBytes(), this.mKey.getLength());
            long offset = 0L;
            byte[] keyBytes = EMPTY_BYTES;

            MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(encodedKey);
            try {
                int mapSize = unpacker.unpackMapHeader();
                for (int i = 0; i < mapSize; ++i) {
                    int field = unpacker.unpackInt();
                    switch (field) {
                        case KAFKA_MESSAGE_OFFSET: {
                            offset = unpacker.unpackLong();
                            break;
                        }
                        case KAFKA_HASH_KEY: {
                            int keySize = unpacker.unpackBinaryHeader();
                            keyBytes = new byte[keySize];
                            unpacker.readPayload(keyBytes);
                            break;
                        }
                        default: {
                            // Unknown field: consume its value so the next
                            // iteration starts at a field tag again.
                            unpacker.skipValue();
                            break;
                        }
                    }
                }
            } finally {
                unpacker.close();
            }

            byte[] payload = Arrays.copyOf(this.mValue.getBytes(), this.mValue.getLength());
            return new KeyValue(offset, keyBytes, payload);
        }

        /**
         * Closes the underlying sequence file reader.
         *
         * @throws IOException if closing fails
         */
        @Override
        public void close() throws IOException {
            this.mReader.close();
        }
    }
}

