/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.writer;

import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.StatsUtil;
import java.io.IOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Appends parsed messages to local files handed out by a {@link FileRegistry}
 * and keeps the last-seen offset stored in an {@link OffsetTracker} current.
 */
public class MessageWriter {
    private static final Logger LOG = LoggerFactory.getLogger(MessageWriter.class);

    /** Configuration the writer was built from. */
    private SecorConfig mConfig;
    /** Source of last-seen and committed offsets per topic partition. */
    private OffsetTracker mOffsetTracker;
    /** Hands out (and deletes) the writers/files belonging to a topic partition. */
    private FileRegistry mFileRegistry;
    /** Extension passed to every {@link LogFilePath}; never {@code null} after construction. */
    private String mFileExtension;
    /** Codec passed to the registry; stays {@code null} when no codec name is configured. */
    private CompressionCodec mCodec;
    /** Configured local path joined with {@code IdUtil.getLocalMessageDir()} by a {@code '/'}. */
    private String mLocalPrefix;
    /** Generation value read from the configuration, passed to every {@link LogFilePath}. */
    private final int mGeneration;

    /**
     * Wires the writer to its collaborators and resolves codec, file extension,
     * local prefix and generation from the configuration.
     *
     * <p>The file extension is chosen in this order: a non-empty extension from the
     * configuration, otherwise the default extension of the configured codec,
     * otherwise the empty string.
     *
     * @param config        configuration to read codec name, file extension, local path and generation from
     * @param offsetTracker tracker consulted and updated by {@link #adjustOffset} and {@link #write}
     * @param fileRegistry  registry used to obtain writers and to delete a topic partition
     * @throws Exception propagated unchanged from the calls made during setup
     */
    public MessageWriter(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry) throws Exception {
        mConfig = config;
        mOffsetTracker = offsetTracker;
        mFileRegistry = fileRegistry;

        // A codec is only created when the configuration names one; its default
        // extension is a provisional choice that an explicit extension overrides below.
        String codecName = mConfig.getCompressionCodec();
        if (!isBlankOrNull(codecName)) {
            mCodec = CompressionUtil.createCompressionCodec(codecName);
            mFileExtension = mCodec.getDefaultExtension();
        }

        String configuredExtension = mConfig.getFileExtension();
        if (!isBlankOrNull(configuredExtension)) {
            mFileExtension = configuredExtension;
        } else if (mFileExtension == null) {
            // Neither an explicit extension nor a codec default is available.
            mFileExtension = "";
        }

        mLocalPrefix = mConfig.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
        mGeneration = mConfig.getGeneration();
    }

    /** Returns {@code true} when {@code value} is {@code null} or has length zero. */
    private static boolean isBlankOrNull(String value) {
        return value == null || value.isEmpty();
    }

    /**
     * Records the offset of {@code message} as the last seen one for its topic partition.
     *
     * <p>When the message offset is not exactly one past the previously recorded
     * offset, a {@code secor.consumer_rebalance_count.<topic>} counter is incremented
     * and the registry is asked to delete the topic partition before the new offset
     * is stored.
     *
     * @param message message whose topic, partition and offset are inspected
     * @throws IOException propagated from the file registry or offset tracker calls
     */
    public void adjustOffset(Message message) throws IOException {
        TopicPartition partition = new TopicPartition(message.getTopic(), message.getKafkaPartition());
        long previousOffset = mOffsetTracker.getLastSeenOffset(partition);

        boolean sequential = message.getOffset() == previousOffset + 1L;
        if (!sequential) {
            StatsUtil.incr("secor.consumer_rebalance_count." + partition.getTopic());
            Object[] details = new Object[]{message, previousOffset, partition.getTopic(), partition.getPartition()};
            LOG.debug("offset of message {} does not follow sequentially the last seen offset {}. "
                    + "Deleting files in topic {} partition {}", details);
            mFileRegistry.deleteTopicPartition(partition);
        }

        // The last-seen offset is updated whether or not a gap was detected.
        mOffsetTracker.setLastSeenOffset(partition, message.getOffset());
    }

    /**
     * Appends {@code message} to the writer the registry returns for its log file path.
     *
     * <p>The path is built from the local prefix, the generation, the tracker's
     * adjusted committed offset count for the topic partition, the message itself
     * and the resolved file extension.
     *
     * @param message parsed message to append; its offset, Kafka key and payload are written
     * @throws Exception propagated unchanged from the tracker, registry or writer
     */
    public void write(ParsedMessage message) throws Exception {
        TopicPartition partition = new TopicPartition(message.getTopic(), message.getKafkaPartition());
        long committedOffsetCount = mOffsetTracker.getAdjustedCommittedOffsetCount(partition);

        LogFilePath destination =
                new LogFilePath(mLocalPrefix, mGeneration, committedOffsetCount, message, mFileExtension);
        FileWriter out = mFileRegistry.getOrCreateWriter(destination, mCodec);

        KeyValue entry = new KeyValue(message.getOffset(), message.getKafkaKey(), message.getPayload());
        out.write(entry);

        Object[] details = new Object[]{message, destination, out.getLength()};
        LOG.debug("appended message {} to file {}.  File length {}", details);
    }
}

