/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.data.input.impl;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import org.apache.hive.druid.io.druid.data.input.impl.FileIteratingFirehose;
import org.apache.hive.druid.io.druid.data.input.impl.StringInputRowParser;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;

public abstract class PrefetchableTextFilesFirehoseFactory<ObjectType>
extends AbstractTextFilesFirehoseFactory<ObjectType> {
    private static final Logger LOG = new Logger(PrefetchableTextFilesFirehoseFactory.class);
    private static final long DEFAULT_MAX_CACHE_CAPACITY_BYTES = 0x40000000L;
    private static final long DEFAULT_MAX_FETCH_CAPACITY_BYTES = 0x40000000L;
    private static final long DEFAULT_FETCH_TIMEOUT = 60000L;
    private static final int DEFAULT_MAX_FETCH_RETRY = 3;
    private static final String FETCH_FILE_PREFIX = "fetch-";
    private final long maxCacheCapacityBytes;
    private final long maxFetchCapacityBytes;
    private final long prefetchTriggerBytes;
    private final long fetchTimeout;
    private final int maxFetchRetry;
    private final List<FetchedFile> cacheFiles = new ArrayList<FetchedFile>();
    private long totalCachedBytes;
    private List<ObjectType> objects;

    private static ExecutorService createFetchExecutor() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("firehose_fetch_%d").build());
    }

    public PrefetchableTextFilesFirehoseFactory(Long maxCacheCapacityBytes, Long maxFetchCapacityBytes, Long prefetchTriggerBytes, Long fetchTimeout, Integer maxFetchRetry) {
        this.maxCacheCapacityBytes = maxCacheCapacityBytes == null ? 0x40000000L : maxCacheCapacityBytes;
        this.maxFetchCapacityBytes = maxFetchCapacityBytes == null ? 0x40000000L : maxFetchCapacityBytes;
        this.prefetchTriggerBytes = prefetchTriggerBytes == null ? this.maxFetchCapacityBytes / 2L : prefetchTriggerBytes;
        this.fetchTimeout = fetchTimeout == null ? 60000L : fetchTimeout;
        this.maxFetchRetry = maxFetchRetry == null ? 3 : maxFetchRetry;
    }

    @Override
    public Firehose connect(StringInputRowParser firehoseParser, final File temporaryDirectory) throws IOException {
        if (this.maxCacheCapacityBytes == 0L && this.maxFetchCapacityBytes == 0L) {
            return super.connect(firehoseParser, temporaryDirectory);
        }
        if (this.objects == null) {
            this.objects = ImmutableList.copyOf(Preconditions.checkNotNull(this.initObjects(), "objects"));
        }
        Preconditions.checkState(temporaryDirectory.exists(), "temporaryDirectory[%s] does not exist", temporaryDirectory);
        Preconditions.checkState(temporaryDirectory.isDirectory(), "temporaryDirectory[%s] is not a directory", temporaryDirectory);
        final ExecutorService fetchExecutor = PrefetchableTextFilesFirehoseFactory.createFetchExecutor();
        return new FileIteratingFirehose(new Iterator<LineIterator>(){
            private final Object fetchLock = new Object();
            private final LinkedBlockingQueue<FetchedFile> fetchFiles = new LinkedBlockingQueue();
            private final AtomicLong fetchedBytes = new AtomicLong(0L);
            private final boolean cacheInitialized = PrefetchableTextFilesFirehoseFactory.access$000(PrefetchableTextFilesFirehoseFactory.this) > 0L;
            private final boolean prefetchEnabled;
            private Future<Void> fetchFuture;
            private int cacheIterateIndex;
            private int nextFetchIndex;
            {
                boolean bl = this.prefetchEnabled = PrefetchableTextFilesFirehoseFactory.this.maxFetchCapacityBytes > 0L;
                if (this.cacheInitialized) {
                    this.nextFetchIndex = PrefetchableTextFilesFirehoseFactory.this.cacheFiles.size();
                }
                if (this.prefetchEnabled) {
                    this.fetchIfNeeded(PrefetchableTextFilesFirehoseFactory.this.totalCachedBytes);
                }
            }

            private void fetchIfNeeded(long remainingBytes) {
                if ((this.fetchFuture == null || this.fetchFuture.isDone()) && remainingBytes <= PrefetchableTextFilesFirehoseFactory.this.prefetchTriggerBytes) {
                    this.fetchFuture = fetchExecutor.submit(() -> {
                        this.fetch();
                        return null;
                    });
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void fetch() throws Exception {
                for (int i = this.nextFetchIndex; i < PrefetchableTextFilesFirehoseFactory.this.objects.size() && this.fetchedBytes.get() <= PrefetchableTextFilesFirehoseFactory.this.maxFetchCapacityBytes; ++i) {
                    Object object = PrefetchableTextFilesFirehoseFactory.this.objects.get(i);
                    LOG.info("Fetching object[%s], fetchedBytes[%d]", object, this.fetchedBytes.get());
                    File outFile = File.createTempFile(PrefetchableTextFilesFirehoseFactory.FETCH_FILE_PREFIX, null, temporaryDirectory);
                    this.fetchedBytes.addAndGet(this.download(object, outFile, 0));
                    Object object2 = this.fetchLock;
                    synchronized (object2) {
                        this.fetchFiles.put(new FetchedFile(object, outFile));
                        ++this.nextFetchIndex;
                        continue;
                    }
                }
            }

            /*
             * Exception decompiling
             */
            private long download(ObjectType object, File outFile, int tryCount) throws IOException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                throw new IllegalStateException("Decompilation failed");
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public boolean hasNext() {
                Object object = this.fetchLock;
                synchronized (object) {
                    return this.cacheInitialized && this.cacheIterateIndex < PrefetchableTextFilesFirehoseFactory.this.cacheFiles.size() || !this.fetchFiles.isEmpty() || this.nextFetchIndex < PrefetchableTextFilesFirehoseFactory.this.objects.size();
                }
            }

            @Override
            public LineIterator next() {
                if (!this.hasNext()) {
                    throw new NoSuchElementException();
                }
                this.checkFetchException();
                try {
                    OpenedObject openedObject;
                    if (this.cacheInitialized && this.cacheIterateIndex < PrefetchableTextFilesFirehoseFactory.this.cacheFiles.size()) {
                        FetchedFile fetchedFile = (FetchedFile)PrefetchableTextFilesFirehoseFactory.this.cacheFiles.get(this.cacheIterateIndex++);
                        openedObject = new OpenedObject(fetchedFile, PrefetchableTextFilesFirehoseFactory.this.getNoopCloser());
                    } else {
                        openedObject = this.prefetchEnabled ? this.openObjectFromLocal() : this.openObjectFromRemote();
                    }
                    InputStream stream = PrefetchableTextFilesFirehoseFactory.this.wrapObjectStream(openedObject.object, openedObject.objectStream);
                    return new ResourceCloseableLineIterator(new InputStreamReader(stream, Charsets.UTF_8), openedObject.resourceCloser);
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }

            private void checkFetchException() {
                if (this.fetchFuture != null && this.fetchFuture.isDone()) {
                    try {
                        this.fetchFuture.get();
                        this.fetchFuture = null;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw Throwables.propagate(e);
                    }
                }
            }

            private OpenedObject openObjectFromLocal() throws IOException {
                Closeable resourceCloser;
                FetchedFile fetchedFile;
                if (!this.fetchFiles.isEmpty()) {
                    fetchedFile = this.fetchFiles.poll();
                    resourceCloser = PrefetchableTextFilesFirehoseFactory.this.cacheIfPossibleAndGetCloser(fetchedFile, this.fetchedBytes);
                    this.fetchIfNeeded(this.fetchedBytes.get());
                } else {
                    try {
                        this.fetchIfNeeded(this.fetchedBytes.get());
                        fetchedFile = this.fetchFiles.poll(PrefetchableTextFilesFirehoseFactory.this.fetchTimeout, TimeUnit.MILLISECONDS);
                        if (fetchedFile == null) {
                            this.checkFetchException();
                            throw new RuntimeException(new TimeoutException());
                        }
                        resourceCloser = PrefetchableTextFilesFirehoseFactory.this.cacheIfPossibleAndGetCloser(fetchedFile, this.fetchedBytes);
                        this.fetchIfNeeded(this.fetchedBytes.get());
                    }
                    catch (InterruptedException e) {
                        throw Throwables.propagate(e);
                    }
                }
                return new OpenedObject(fetchedFile, resourceCloser);
            }

            private OpenedObject openObjectFromRemote() throws IOException {
                OpenedObject openedObject;
                Closeable resourceCloser = PrefetchableTextFilesFirehoseFactory.this.getNoopCloser();
                if (PrefetchableTextFilesFirehoseFactory.this.totalCachedBytes < PrefetchableTextFilesFirehoseFactory.this.maxCacheCapacityBytes) {
                    LOG.info("Caching object[%s]", PrefetchableTextFilesFirehoseFactory.this.objects.get(this.nextFetchIndex));
                    try {
                        this.fetch();
                        FetchedFile fetchedFile = this.fetchFiles.poll();
                        if (fetchedFile == null) {
                            throw new ISE("Cannot fetch object[%s]", PrefetchableTextFilesFirehoseFactory.this.objects.get(this.nextFetchIndex));
                        }
                        PrefetchableTextFilesFirehoseFactory.this.cacheIfPossible(fetchedFile);
                        this.fetchedBytes.addAndGet(-fetchedFile.length());
                        openedObject = new OpenedObject(fetchedFile, resourceCloser);
                    }
                    catch (Exception e) {
                        throw Throwables.propagate(e);
                    }
                } else {
                    Object object = PrefetchableTextFilesFirehoseFactory.this.objects.get(this.nextFetchIndex++);
                    LOG.info("Reading object[%s]", object);
                    openedObject = new OpenedObject(object, PrefetchableTextFilesFirehoseFactory.this.openObjectStream(object), resourceCloser);
                }
                return openedObject;
            }
        }, firehoseParser, () -> {
            fetchExecutor.shutdownNow();
            try {
                Preconditions.checkState(fetchExecutor.awaitTermination(this.fetchTimeout, TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ISE("Failed to shutdown fetch executor during close", new Object[0]);
            }
        });
    }

    private boolean cacheIfPossible(FetchedFile fetchedFile) {
        if (this.totalCachedBytes < this.maxCacheCapacityBytes) {
            this.cacheFiles.add(fetchedFile);
            this.totalCachedBytes += fetchedFile.length();
            return true;
        }
        return false;
    }

    private Closeable cacheIfPossibleAndGetCloser(FetchedFile fetchedFile, AtomicLong fetchedBytes) {
        Closeable closeable;
        if (this.cacheIfPossible(fetchedFile)) {
            closeable = this.getNoopCloser();
            fetchedBytes.addAndGet(-fetchedFile.length());
        } else {
            closeable = this.getFetchedFileCloser(fetchedFile, fetchedBytes);
        }
        return closeable;
    }

    private Closeable getNoopCloser() {
        return () -> {};
    }

    private Closeable getFetchedFileCloser(FetchedFile fetchedFile, AtomicLong fetchedBytes) {
        return () -> {
            long fileSize = fetchedFile.length();
            fetchedFile.delete();
            fetchedBytes.addAndGet(-fileSize);
        };
    }

    static /* synthetic */ int access$600(PrefetchableTextFilesFirehoseFactory x0) {
        return x0.maxFetchRetry;
    }

    private class OpenedObject {
        private final ObjectType object;
        private final InputStream objectStream;
        private final Closeable resourceCloser;

        public OpenedObject(FetchedFile fetchedFile, Closeable resourceCloser) throws IOException {
            this.object = fetchedFile.object;
            this.objectStream = FileUtils.openInputStream((File)fetchedFile.file);
            this.resourceCloser = resourceCloser;
        }

        public OpenedObject(ObjectType object, InputStream objectStream, Closeable resourceCloser) {
            this.object = object;
            this.objectStream = objectStream;
            this.resourceCloser = resourceCloser;
        }
    }

    private class FetchedFile {
        private final ObjectType object;
        private final File file;

        public FetchedFile(ObjectType object, File file) {
            this.object = object;
            this.file = file;
        }

        public long length() {
            return this.file.length();
        }

        public void delete() {
            this.file.delete();
        }
    }

    static class ResourceCloseableLineIterator
    extends LineIterator {
        private final Closeable resourceCloser;

        public ResourceCloseableLineIterator(Reader reader, Closeable resourceCloser) throws IllegalArgumentException {
            super(reader);
            this.resourceCloser = resourceCloser;
        }

        public void close() {
            super.close();
            try {
                this.resourceCloser.close();
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }
    }
}

