/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.elephantbird.mapreduce.input;

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class LzoRecordReader<K, V>
extends RecordReader<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LzoRecordReader.class);
    public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "elephantbird.mapred.input.bad.record.threshold";
    protected long start_;
    protected long pos_;
    protected long end_;
    private FSDataInputStream fileIn_;
    protected InputErrorTracker errorTracker;

    public float getProgress() {
        if (this.start_ == this.end_) {
            return 0.0f;
        }
        return Math.min(1.0f, (float)(this.pos_ - this.start_) / (float)(this.end_ - this.start_));
    }

    public synchronized long getPos() throws IOException {
        return this.pos_;
    }

    public long getLzoFilePos() throws IOException {
        return this.fileIn_.getPos();
    }

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit)inputSplit;
        this.start_ = fileSplit.getStart();
        this.end_ = this.start_ + fileSplit.getLength();
        Path path = fileSplit.getPath();
        Configuration configuration = taskAttemptContext.getConfiguration();
        this.errorTracker = new InputErrorTracker(configuration);
        LOG.info("input split: " + path + " " + this.start_ + ":" + this.end_);
        FileSystem fileSystem = path.getFileSystem(configuration);
        CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(configuration);
        CompressionCodec compressionCodec = compressionCodecFactory.getCodec(path);
        if (compressionCodec == null) {
            throw new IOException("No codec for file " + path + " found, cannot run");
        }
        this.fileIn_ = fileSystem.open(fileSplit.getPath());
        this.createInputReader((InputStream)compressionCodec.createInputStream((InputStream)this.fileIn_), configuration);
        if (this.start_ != 0L) {
            this.fileIn_.seek(this.start_);
            this.skipToNextSyncPoint(false);
            this.start_ = this.fileIn_.getPos();
            LOG.info("Start is now " + this.start_);
        } else {
            this.skipToNextSyncPoint(true);
        }
        this.pos_ = this.start_;
    }

    protected abstract void createInputReader(InputStream var1, Configuration var2) throws IOException;

    protected abstract void skipToNextSyncPoint(boolean var1) throws IOException;

    static class InputErrorTracker {
        long numRecords;
        long numErrors;
        double errorThreshold;

        InputErrorTracker(Configuration configuration) {
            this.errorThreshold = configuration.getFloat(LzoRecordReader.BAD_RECORD_THRESHOLD_CONF_KEY, 1.0E-4f);
            this.numRecords = 0L;
            this.numErrors = 0L;
        }

        void incRecords() {
            ++this.numRecords;
        }

        void incErrors(Throwable throwable) {
            ++this.numErrors;
            if (this.numErrors > this.numRecords) {
                throw new RuntimeException("Forgot to invoke incRecords()?");
            }
            if (throwable == null) {
                throwable = new Exception("Unknown error");
            }
            if (this.errorThreshold <= 0.0) {
                throw new RuntimeException("error while reading input records", throwable);
            }
            LOG.warn("Error while reading an input record (" + this.numErrors + " out of " + this.numRecords + " so far ): ", throwable);
            double d = (double)this.numErrors / (double)this.numRecords;
            if (this.numErrors > 1L && d > this.errorThreshold) {
                LOG.error(this.numErrors + " out of " + this.numRecords + " crosses configured threshold (" + this.errorThreshold + ")");
                throw new RuntimeException("error rate while reading input records crossed threshold", throwable);
            }
        }
    }
}

