/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.data.spi.filesystem;

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetOperationException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.UnknownFormatException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.data.spi.RollingWriter;
import org.kitesdk.data.spi.filesystem.AvroAppender;
import org.kitesdk.data.spi.filesystem.CSVAppender;
import org.kitesdk.data.spi.filesystem.DurableParquetAppender;
import org.kitesdk.data.spi.filesystem.FileSystemUtil;
import org.kitesdk.data.spi.filesystem.ParquetAppender;
import org.kitesdk.shaded.com.google.common.annotations.VisibleForTesting;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.base.Throwables;
import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileSystemWriter<E>
extends AbstractDatasetWriter<E>
implements RollingWriter {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWriter.class);
    /** Minimum number of records that must be written between two size-based roll checks. */
    private static final int MIN_RECORDS_BEFORE_ROLL_CHECK = 1000;
    /**
     * Appender position below which no per-record size estimate is attempted
     * (presumably bytes, i.e. 5 KiB -- the value is compared against {@code appender.pos()}).
     */
    private static final long MIN_SIZE_BEFORE_ROLL_CHECK = 5120L;
    /** Formats that are always writable; CSV is only accepted when explicitly enabled on the descriptor. */
    private static final Set<Format> SUPPORTED_FORMATS = ImmutableSet.of(Formats.AVRO, Formats.PARQUET);
    /** Directory in which output files are created. */
    private final Path directory;
    private final DatasetDescriptor descriptor;
    /** Schema handed to the Avro and Parquet appenders. */
    private final Schema schema;
    /** Size threshold used by the size-based roll check; values {@code <= 0} disable the check. */
    private long targetFileSize;
    /** Interval used by the time-based roll check; values {@code <= 0} leave it disabled. */
    private long rollIntervalMillis;
    /** Path the appender writes to; equal to {@link #finalPath} when no temp file is used. */
    private Path tempPath;
    /** Path the data is visible under once the file has been committed. */
    private Path finalPath;
    /** Number of entities appended to the current file. */
    private long count = 0L;
    /** Value of {@link #count} at which the next size-based roll check runs. */
    private long nextRollCheck = MIN_RECORDS_BEFORE_ROLL_CHECK;
    /** Wall-clock time (epoch millis) at which the next time-based roll is due. */
    private long nextRollTime = Long.MAX_VALUE;
    protected final FileSystem fs;
    protected FileAppender<E> appender;
    /** Set by {@code IncrementalWriter} once a flush or sync has succeeded. */
    protected boolean flushed = false;
    @VisibleForTesting
    ReaderWriterState state;
    @VisibleForTesting
    final Configuration conf;
    /** Whether data is written to a hidden temp file and renamed on commit. */
    @VisibleForTesting
    final boolean useTempPath;

    /**
     * Tells whether this writer can produce files in the descriptor's format.
     *
     * @param descriptor descriptor whose format is inspected
     * @return {@code true} for the always-supported formats, or for CSV when the
     *         {@code kite.allow.csv} property is enabled on the descriptor
     */
    static boolean isSupportedFormat(DatasetDescriptor descriptor) {
        Format requested = descriptor.getFormat();
        if (SUPPORTED_FORMATS.contains(requested)) {
            return true;
        }
        if (!Formats.CSV.equals(requested)) {
            return false;
        }
        // CSV is opt-in and must be switched on through a descriptor property.
        return DescriptorUtil.isEnabled("kite.allow.csv", descriptor);
    }

    private FileSystemWriter(FileSystem fs, Path path, long rollIntervalMillis, long targetFileSize, DatasetDescriptor descriptor, Schema writerSchema) {
        Preconditions.checkNotNull(fs, "File system is not defined");
        Preconditions.checkNotNull(path, "Destination directory is not defined");
        Preconditions.checkNotNull(descriptor, "Descriptor is not defined");
        this.fs = fs;
        this.directory = path;
        this.rollIntervalMillis = rollIntervalMillis;
        this.targetFileSize = targetFileSize;
        this.descriptor = descriptor;
        this.conf = new Configuration(fs.getConf());
        this.state = ReaderWriterState.NEW;
        this.schema = writerSchema;
        for (String prop : descriptor.listProperties()) {
            this.conf.set(prop, descriptor.getProperty(prop));
        }
        this.useTempPath = FileSystemUtil.supportsRename(fs.getUri(), this.conf);
    }

    /**
     * Opens a fresh output file: creates the target directory, picks a unique
     * final path (and a hidden temp path when renames are supported), opens an
     * appender on it and resets all per-file bookkeeping.
     *
     * <p>This is also invoked for every roll, so every piece of per-file state
     * has to be reset here, including {@code flushed} and {@code nextRollTime}.
     *
     * @throws IllegalStateException if the writer is not in state {@code NEW}
     * @throws ValidationException if the descriptor's format is not supported
     * @throws DatasetOperationException if a runtime failure occurs while
     *         creating the directory, choosing paths or opening the appender
     * @throws DatasetIOException if the directory cannot be created or the
     *         appender cannot be opened because of an I/O error
     */
    @Override
    public final void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);
        ValidationException.check(FileSystemWriter.isSupportedFormat(this.descriptor), "Not a supported format: %s", this.descriptor.getFormat());
        try {
            this.fs.mkdirs(this.directory);
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to create path %s", this.directory);
        } catch (IOException ex) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to create path " + this.directory, ex);
        }
        try {
            this.finalPath = new Path(this.directory, FileSystemWriter.uniqueFilename(this.descriptor.getFormat()));
            this.tempPath = this.useTempPath ? FileSystemWriter.tempFilename(this.finalPath) : this.finalPath;
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to initialize file paths under %s", this.directory);
        }
        try {
            this.appender = this.newAppender(this.tempPath);
            this.appender.open();
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to open appender %s", this.appender);
        } catch (IOException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to open appender " + this.appender, e);
        }
        this.count = 0L;
        // Nothing has been flushed to the new file yet. Without this reset a roll
        // would carry over "flushed" from the previous file, and close() would
        // commit the new file after an error even though none of its data is durable.
        this.flushed = false;
        this.nextRollCheck = MIN_RECORDS_BEFORE_ROLL_CHECK;
        // Always recompute the deadline: leaving a stale value in place when the
        // interval is not positive would make every subsequent tick() roll again.
        this.nextRollTime = this.rollIntervalMillis > 0L
                ? System.currentTimeMillis() + this.rollIntervalMillis
                : Long.MAX_VALUE;
        LOG.info("Opened output appender {} for {}", this.appender, (Object)this.finalPath);
        this.state = ReaderWriterState.OPEN;
    }

    /**
     * Appends one entity to the current file and then runs the size-based roll check.
     *
     * @param entity the entity to append
     * @throws IllegalStateException if the writer is not {@code OPEN}
     * @throws DatasetRecordException rethrown as-is, without moving the writer to {@code ERROR}
     * @throws DatasetOperationException on any other runtime failure; the writer moves to {@code ERROR}
     * @throws DatasetIOException on an I/O failure; the writer moves to {@code ERROR}
     */
    @Override
    public final void write(E entity) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);
        try {
            this.appender.append(entity);
            this.count += 1L;
            this.checkSizeBasedFileRoll();
        } catch (RuntimeException failure) {
            // A rejected record is passed through untouched and leaves the writer usable.
            if (failure instanceof DatasetRecordException) {
                throw (DatasetRecordException) failure;
            }
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(failure, "Failed to append %s to %s", entity, this.appender);
        } catch (IOException failure) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to append " + entity + " to " + this.appender, failure);
        }
    }

    /**
     * Finishes the current output file and moves the writer to {@code CLOSED}.
     *
     * <ul>
     *   <li>Does nothing when the writer is {@code NEW} or already {@code CLOSED}.</li>
     *   <li>Closes the appender unless the writer is in {@code ERROR}.</li>
     *   <li>Commits the file when at least one entity was written and either
     *       data was flushed or the writer is still {@code OPEN}; otherwise the
     *       temp file is discarded.</li>
     *   <li>Finally asks the appender to clean up.</li>
     * </ul>
     *
     * <p>The method is safe to call after a failed {@link #initialize()}: in
     * that case the temp path and/or the appender may never have been created,
     * and the corresponding steps are skipped instead of failing.
     *
     * @throws DatasetOperationException if a step fails with a runtime exception,
     *         or the file cannot be moved or deleted
     * @throws DatasetIOException if a step fails with an I/O error
     */
    @Override
    public final void close() {
        try {
            if (ReaderWriterState.NEW.equals(this.state) || ReaderWriterState.CLOSED.equals(this.state)) {
                return;
            }
            // The appender is only closed when the writer is healthy; after an
            // error the file is either committed as flushed or thrown away.
            if (!ReaderWriterState.ERROR.equals(this.state)) {
                this.closeAppender();
            }
            if (this.count > 0L && (this.flushed || ReaderWriterState.OPEN.equals(this.state))) {
                this.commitTempFile();
            } else {
                this.discardTempFile();
            }
            this.cleanupAppender();
        } finally {
            this.state = ReaderWriterState.CLOSED;
        }
    }

    /** Closes the appender, translating failures into dataset exceptions. */
    private void closeAppender() {
        try {
            this.appender.close();
        } catch (RuntimeException e) {
            throw new DatasetOperationException(e, "Failed to close appender %s", this.appender);
        } catch (IOException e) {
            throw new DatasetIOException("Failed to close appender " + this.appender, e);
        }
    }

    /** Makes the written data visible under the final path (a rename when a temp file is used). */
    private void commitTempFile() {
        if (this.useTempPath) {
            try {
                if (!this.fs.rename(this.tempPath, this.finalPath)) {
                    throw new DatasetOperationException("Failed to move %s to %s", this.tempPath, this.finalPath);
                }
            } catch (RuntimeException e) {
                throw new DatasetOperationException(e, "Failed to commit %s", this.finalPath);
            } catch (IOException e) {
                throw new DatasetIOException("Failed to commit " + this.finalPath, e);
            }
        }
        LOG.debug("Committed {} for appender {} ({} entities)", new Object[]{this.finalPath, this.appender, this.count});
    }

    /** Removes the temp file; an already-absent file counts as successfully discarded. */
    private void discardTempFile() {
        if (this.tempPath == null) {
            // initialize() failed before an output path was chosen, so nothing was created.
            return;
        }
        try {
            // delete() also returns false when the file never existed (for example
            // when opening the appender failed); only a file that is still present
            // after the call is a real failure.
            if (!this.fs.delete(this.tempPath, true) && this.fs.exists(this.tempPath)) {
                throw new DatasetOperationException("Failed to delete %s", this.tempPath);
            }
        } catch (RuntimeException e) {
            throw new DatasetOperationException(e, "Failed to remove temporary file %s", this.tempPath);
        } catch (IOException e) {
            throw new DatasetIOException("Failed to remove temporary file " + this.tempPath, e);
        }
        LOG.debug("Discarded {} ({} entities)", this.tempPath, (Object)this.count);
    }

    /** Lets the appender release whatever it still holds; skipped when no appender was ever created. */
    private void cleanupAppender() {
        if (this.appender == null) {
            return;
        }
        try {
            this.appender.cleanup();
        } catch (IOException e) {
            throw new DatasetIOException("Failed to clean up " + this.appender, e);
        }
    }

    /**
     * Changes the time-based roll interval.
     *
     * <p>On an open writer the pending deadline is adjusted as well:
     * <ul>
     *   <li>a non-positive interval disables time-based rolling for the current file;</li>
     *   <li>when rolling was disabled so far, the new interval is counted from now,
     *       because the time the current file was opened is not recorded;</li>
     *   <li>otherwise the deadline is shifted so that the new interval is counted
     *       from the moment the current interval started.</li>
     * </ul>
     *
     * @param rollIntervalMillis new interval in milliseconds; {@code <= 0} disables time-based rolling
     */
    @Override
    public void setRollIntervalMillis(long rollIntervalMillis) {
        if (ReaderWriterState.OPEN == this.state) {
            if (rollIntervalMillis <= 0L) {
                // Without this the old deadline would stay in place and every tick() would roll.
                this.nextRollTime = Long.MAX_VALUE;
            } else if (this.rollIntervalMillis <= 0L || this.nextRollTime == Long.MAX_VALUE) {
                // No deadline to shift: Long.MAX_VALUE - old + new would overflow to a
                // time in the past and trigger an immediate roll.
                this.nextRollTime = System.currentTimeMillis() + rollIntervalMillis;
            } else {
                long lastRollTime = this.nextRollTime - this.rollIntervalMillis;
                this.nextRollTime = lastRollTime + rollIntervalMillis;
            }
        }
        this.rollIntervalMillis = rollIntervalMillis;
    }

    /**
     * Sets the threshold used by the size-based roll check. The new value is
     * picked up the next time that check runs; values {@code <= 0} disable it.
     *
     * @param targetSizeBytes new target file size
     */
    @Override
    public void setTargetFileSize(long targetSizeBytes) {
        this.targetFileSize = targetSizeBytes;
    }

    /**
     * Gives the writer a chance to roll its file based on elapsed time.
     * Has no effect unless the writer is {@code OPEN}.
     */
    @Override
    public void tick() {
        if (ReaderWriterState.OPEN != this.state) {
            return;
        }
        this.checkTimeBasedFileRoll();
    }

    /**
     * Finishes the current file and starts a new one: closes this writer,
     * puts it back into state {@code NEW} (which {@code initialize()} requires)
     * and re-initializes it.
     */
    private void roll() {
        this.close();
        // close() always leaves the writer CLOSED; initialize() only accepts NEW.
        this.state = ReaderWriterState.NEW;
        this.initialize();
    }

    /**
     * Rolls the file once the appender position reaches the target file size,
     * or is estimated to be within ten records of it.
     *
     * <p>The check is skipped when size-based rolling is disabled
     * ({@code targetFileSize <= 0}) or fewer than {@code nextRollCheck} records
     * have been written. Otherwise the average record size is estimated from
     * the appender position and the next check is scheduled roughly half-way
     * to the target.
     *
     * @throws IOException if the appender cannot report its position
     */
    protected void checkSizeBasedFileRoll() throws IOException {
        if (this.targetFileSize <= 0L || this.count < this.nextRollCheck) {
            return;
        }
        long written = this.appender.pos();
        if (written < MIN_SIZE_BEFORE_ROLL_CHECK) {
            // Too little output to estimate a record size; look again later.
            this.nextRollCheck = this.count + MIN_RECORDS_BEFORE_ROLL_CHECK;
            return;
        }
        double avgRecordSize = (double) written / (double) this.count;
        long remainingRecords = (long) ((double) (this.targetFileSize - written) / avgRecordSize);
        boolean roomLeft = written < this.targetFileSize && remainingRecords > 10L;
        if (roomLeft) {
            this.nextRollCheck = this.count + Math.max((long) MIN_RECORDS_BEFORE_ROLL_CHECK, remainingRecords / 2L);
            return;
        }
        this.nextRollCheck = this.nextRollCheck / 2L;
        this.roll();
    }

    /** Rolls the file when the current wall-clock time has reached {@code nextRollTime}. */
    private void checkTimeBasedFileRoll() {
        boolean due = System.currentTimeMillis() >= this.nextRollTime;
        if (due) {
            this.roll();
        }
    }

    /**
     * @return {@code true} only while the writer is in state {@code OPEN}
     */
    @Override
    public final boolean isOpen() {
        return this.state.equals((Object)ReaderWriterState.OPEN);
    }

    /**
     * Builds a file name of the form {@code <random UUID>.<format extension>}.
     *
     * @param format format whose extension is appended
     * @return the generated file name
     */
    private static String uniqueFilename(Format format) {
        String stem = UUID.randomUUID().toString();
        String extension = format.getExtension();
        return stem + "." + extension;
    }

    /**
     * Derives the temp path for a final path: same parent directory, with the
     * file name wrapped as {@code .<name>.tmp}.
     *
     * @param location the final path
     * @return the sibling temp path
     */
    private static Path tempFilename(Path location) {
        Path parent = location.getParent();
        String hiddenName = "." + location.getName() + ".tmp";
        return new Path(parent, hiddenName);
    }

    /**
     * Creates the appender matching the descriptor's format.
     *
     * <ul>
     *   <li>Parquet: a {@code DurableParquetAppender} when the
     *       {@code kite.parquet.non-durable-writes} property is disabled,
     *       otherwise a plain {@code ParquetAppender};</li>
     *   <li>Avro: an {@code AvroAppender};</li>
     *   <li>CSV: a {@code CSVAppender}, only when {@code kite.allow.csv} is enabled.</li>
     * </ul>
     *
     * @param temp path the appender will write to
     * @return a new appender that has not been opened yet
     * @throws UnknownFormatException for any other format; the writer is moved
     *         to {@code ERROR} before throwing
     */
    @VisibleForTesting
    <E> FileAppender<E> newAppender(Path temp) {
        Format format = this.descriptor.getFormat();

        if (Formats.PARQUET.equals(format)) {
            boolean durable = DescriptorUtil.isDisabled("kite.parquet.non-durable-writes", this.descriptor);
            if (!durable) {
                return new ParquetAppender(this.fs, temp, this.schema, this.conf, this.descriptor.getCompressionType());
            }
            return new DurableParquetAppender(this.fs, temp, this.schema, this.conf, this.descriptor.getCompressionType());
        }

        if (Formats.AVRO.equals(format)) {
            return new AvroAppender(this.fs, temp, this.schema, this.descriptor.getCompressionType());
        }

        boolean csvAllowed = Formats.CSV.equals(format) && DescriptorUtil.isEnabled("kite.allow.csv", this.descriptor);
        if (csvAllowed) {
            return new CSVAppender(this.fs, temp, this.descriptor);
        }

        this.state = ReaderWriterState.ERROR;
        throw new UnknownFormatException("Unknown format " + this.descriptor);
    }

    /**
     * Factory for writers.
     *
     * <p>An {@link IncrementalWriter} (which adds flush and sync) is returned for
     * Avro and CSV, and for Parquet when the
     * {@code kite.parquet.non-durable-writes} property is disabled. Every other
     * case gets a plain {@code FileSystemWriter}.
     *
     * @param fs file system to write to
     * @param path directory that will receive the output files
     * @param rollIntervalMillis time-based roll interval
     * @param targetFileSize size-based roll threshold
     * @param descriptor dataset descriptor; its format selects the writer type
     * @param writerSchema schema passed on to the appenders
     * @return a new writer in state {@code NEW}
     */
    static <E> FileSystemWriter<E> newWriter(FileSystem fs, Path path, long rollIntervalMillis, long targetFileSize, DatasetDescriptor descriptor, Schema writerSchema) {
        Format format = descriptor.getFormat();
        // The durability property is only consulted for Parquet.
        boolean incremental = Formats.PARQUET.equals(format)
                ? DescriptorUtil.isDisabled("kite.parquet.non-durable-writes", descriptor)
                : Formats.AVRO.equals(format) || Formats.CSV.equals(format);
        if (incremental) {
            // Parameterized instead of the raw type, so the result needs no unchecked conversion.
            return new IncrementalWriter<E>(fs, path, rollIntervalMillis, targetFileSize, descriptor, writerSchema);
        }
        return new FileSystemWriter<E>(fs, path, rollIntervalMillis, targetFileSize, descriptor, writerSchema);
    }

    /**
     * Exposes this writer through the Hadoop MapReduce {@code RecordWriter} API.
     *
     * @return a new adapter that forwards writes and close to this writer
     */
    public RecordWriter<E, Void> asRecordWriter() {
        return new KiteRecordWriter();
    }

    private class KiteRecordWriter
    extends RecordWriter<E, Void> {
        private KiteRecordWriter() {
        }

        public void write(E e, Void aVoid) throws IOException, InterruptedException {
            FileSystemWriter.this.write(e);
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            FileSystemWriter.this.close();
        }
    }

    /**
     * Writer variant that additionally supports {@code flush()} and
     * {@code sync()}. A successful call to either marks the writer as
     * {@code flushed}, which {@code close()} takes into account when deciding
     * whether to commit a file after an error.
     */
    static class IncrementalWriter<E>
    extends FileSystemWriter<E>
    implements Flushable,
    Syncable {
        private IncrementalWriter(FileSystem fs, Path path, long rollIntervalMillis, long targetFileSize, DatasetDescriptor descriptor, Schema writerSchema) {
            super(fs, path, rollIntervalMillis, targetFileSize, descriptor, writerSchema);
        }

        /**
         * Flushes the appender, records that data has been flushed and then
         * runs the size-based roll check.
         *
         * @throws IllegalStateException if the writer is not open
         * @throws DatasetOperationException on a runtime failure; the writer moves to {@code ERROR}
         * @throws DatasetIOException on an I/O failure; the writer moves to {@code ERROR}
         */
        @Override
        public void flush() {
            Preconditions.checkState(isOpen(), "Attempt to flush a writer in state:%s", this.state);
            try {
                this.appender.flush();
                this.flushed = true;
                checkSizeBasedFileRoll();
            } catch (RuntimeException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetOperationException(failure, "Failed to flush appender %s", this.appender);
            } catch (IOException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetIOException("Failed to flush appender " + this.appender, failure);
            }
        }

        /**
         * Syncs the appender and records that data has been flushed. Unlike
         * {@link #flush()}, no roll check is performed.
         *
         * @throws IllegalStateException if the writer is not open
         * @throws DatasetOperationException on a runtime failure; the writer moves to {@code ERROR}
         * @throws DatasetIOException on an I/O failure; the writer moves to {@code ERROR}
         */
        @Override
        public void sync() {
            Preconditions.checkState(isOpen(), "Attempt to sync a writer in state:%s", this.state);
            try {
                this.appender.sync();
                this.flushed = true;
            } catch (RuntimeException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetOperationException(failure, "Failed to sync appender %s", this.appender);
            } catch (IOException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetIOException("Failed to sync appender " + this.appender, failure);
            }
        }
    }

    /**
     * Format-specific sink used by {@code FileSystemWriter} to write entities
     * to a single file. The writer calls {@link #open()} right after creating
     * an appender, {@link #append(Object)} for every entity,
     * {@code close()} when the file is finished without error, and
     * {@link #cleanup()} as the last step of closing.
     */
    interface FileAppender<E>
    extends java.io.Flushable,
    Closeable {
        /** Prepares the appender for writing. */
        void open() throws IOException;

        /** Adds one entity to the output. */
        void append(E entity) throws IOException;

        /**
         * Reports the current output position; the writer compares it against
         * the target file size (presumably a byte offset -- confirm with implementations).
         */
        long pos() throws IOException;

        /** Invoked by {@code IncrementalWriter.sync()}; exact durability guarantees depend on the implementation. */
        void sync() throws IOException;

        /** Invoked once at the end of the writer's {@code close()}, after the file was committed or discarded. */
        void cleanup() throws IOException;
    }
}

