/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.TimeZone;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper;
import org.apache.hadoop.fs.azure.SelfRenewingLease;
import org.apache.hadoop.fs.azure.StorageInterface;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockBlobAppendStream
extends OutputStream {
    // Key of the blob this stream appends to; used in log messages and thread names.
    private final String key;
    // Upper bound on buffered bytes; writeInternal() cuts the data into blocks of this size.
    private final int bufferSize;
    // Bytes accepted by write() that have not yet been handed to the upload pool.
    private ByteArrayOutputStream outBuffer;
    // Wrapper through which all storage operations on the blob are issued.
    private final StorageInterface.CloudBlockBlobWrapper blob;
    // Operation context passed along with the storage calls.
    private final OperationContext opContext;
    // Set to true by cleanup(); a closed stream rejects writes and ignores further close() calls.
    private boolean closed = false;
    // True once the append lease is no longer held: set by cleanup() and by AppendRenewer when
    // a renewal fails. Volatile because it is read and written by the renewer thread as well.
    private volatile boolean leaseFreed;
    // Set to true by initialize(); write() and close() fail until then.
    private boolean initialized = false;
    // Most recent failure recorded by an upload task or by cleanup(); rethrown from close().
    private volatile IOException lastError = null;
    // Blocks that were queued for upload but are not yet part of the committed block list.
    private final List<BlockEntry> uncommittedBlockEntries;
    // Sentinel meaning "nextBlockCount has not been computed yet".
    private static final int UNSET_BLOCKS_COUNT = -1;
    // Counter from which block ids are derived; incremented for every block queued for upload.
    private long nextBlockCount = -1L;
    // Source of the random starting value of nextBlockCount.
    private final Random sequenceGenerator = new Random();
    // Milliseconds the AppendRenewer sleeps between two renewals of the append lease.
    private static final int LEASE_RENEWAL_PERIOD = 10000;
    // Number of attempts updateBlobAppendMetadata() makes before rethrowing the storage error.
    private static final int MAX_LEASE_RENEWAL_RETRY_COUNT = 3;
    // Milliseconds slept between two attempts of updateBlobAppendMetadata().
    private static final int LEASE_RENEWAL_RETRY_SLEEP_PERIOD = 500;
    // Blob metadata key holding the append-lease flag ("true"/"false").
    public static final String APPEND_LEASE = "append_lease";
    // Milliseconds after which an append lease that was not renewed is treated as expired.
    public static final int APPEND_LEASE_TIMEOUT = 30000;
    // Blob metadata key holding the time (epoch milliseconds) of the last lease update.
    public static final String APPEND_LEASE_LAST_MODIFIED = "append_lease_last_modified";
    // Number of attempts an upload task makes for a single block.
    private static final int MAX_BLOCK_UPLOAD_RETRIES = 3;
    // Milliseconds an upload task sleeps between two attempts.
    private static final int BLOCK_UPLOAD_RETRY_INTERVAL = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(BlockBlobAppendStream.class);
    // Subtracted from Integer.MAX_VALUE when the random start of nextBlockCount is drawn.
    // NOTE(review): presumably the maximum number of blocks a blob may hold -- confirm.
    private static final int MAX_BLOCK_COUNT = 100000;
    // Pool that runs the block uploads; created by initialize(), shut down by close().
    private ThreadPoolExecutor ioThreadPool;
    // Running number appended to the names of the upload threads.
    private final AtomicInteger threadSequenceNumber;
    // Name prefix of the upload threads.
    private static final String THREAD_ID_PREFIX = "BlockBlobAppendStream";
    // Time zone id used when the current time is taken for the lease metadata.
    private static final String UTC_STR = "UTC";

    public BlockBlobAppendStream(StorageInterface.CloudBlockBlobWrapper blob, String aKey, int bufferSize, OperationContext opContext) throws IOException {
        if (null == aKey || 0 == aKey.length()) {
            throw new IllegalArgumentException("Illegal argument: The key string is null or empty");
        }
        if (0 >= bufferSize) {
            throw new IllegalArgumentException("Illegal argument bufferSize cannot be zero or negative");
        }
        this.blob = blob;
        this.opContext = opContext;
        this.key = aKey;
        this.bufferSize = bufferSize;
        this.threadSequenceNumber = new AtomicInteger(0);
        this.setBlocksCount();
        this.outBuffer = new ByteArrayOutputStream(bufferSize);
        this.uncommittedBlockEntries = new ArrayList<BlockEntry>();
        try {
            if (!this.updateBlobAppendMetadata(true, false)) {
                LOG.error("Unable to set Append Lease on the Blob : {} Possibly because another client already has a create or append stream open on the Blob", (Object)this.key);
                throw new IOException("Unable to set Append lease on the Blob. Possibly because another client already had an append stream open on the Blob.");
            }
        }
        catch (StorageException ex) {
            LOG.error("Encountered Storage exception while acquiring append lease on blob : {}. Storage Exception : {} ErrorCode : {}", new Object[]{this.key, ex, ex.getErrorCode()});
            throw new IOException(ex);
        }
        this.leaseFreed = false;
    }

    /**
     * Starts the background machinery of the stream: a daemon thread that keeps renewing the
     * append lease and the thread pool that uploads blocks. Calls after the first one do nothing.
     */
    public synchronized void initialize() {
        if (!this.initialized) {
            Thread leaseRenewer = new Thread(new AppendRenewer());
            leaseRenewer.setName(String.format("%s-AppendLeaseRenewer", this.key));
            // Daemon, so a stream that is never closed does not keep the JVM alive.
            leaseRenewer.setDaemon(true);
            leaseRenewer.start();

            final int uploaderThreads = 4;
            final long keepAliveSeconds = 2L;
            this.ioThreadPool = new ThreadPoolExecutor(
                uploaderThreads,
                uploaderThreads,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new UploaderThreadFactory());

            this.initialized = true;
        }
    }

    /**
     * Returns the key of the blob this stream appends to.
     *
     * @return the key passed to the constructor
     */
    public String getKey() {
        return key;
    }

    /**
     * Returns the size in bytes of the blocks the written data is cut into.
     *
     * @return the buffer size passed to the constructor
     */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Writes a single byte to the stream.
     *
     * @param byteVal value whose low-order eight bits are written
     * @throws IOException if the stream is not initialized, is closed, or no longer holds the
     *         append lease
     */
    @Override
    public void write(int byteVal) throws IOException {
        byte[] single = new byte[1];
        // The narrowing cast keeps exactly the low-order eight bits.
        single[0] = (byte) byteVal;
        write(single);
    }

    /**
     * Writes the complete array to the stream.
     *
     * @param data bytes to write
     * @throws IOException if the stream is not initialized, is closed, or no longer holds the
     *         append lease
     */
    @Override
    public void write(byte[] data) throws IOException {
        final int count = data.length;
        write(data, 0, count);
    }

    /**
     * Writes {@code length} bytes of {@code data}, starting at {@code offset}, to the stream.
     *
     * @param data array holding the bytes to write
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws IndexOutOfBoundsException if {@code offset} or {@code length} is negative, or the
     *         requested range does not fit into {@code data}
     * @throws IOException if the stream is not initialized, is closed, or no longer holds the
     *         append lease
     */
    @Override
    public void write(byte[] data, int offset, int length) throws IOException {
        boolean negativeArgument = offset < 0 || length < 0;
        if (negativeArgument || data.length - offset < length) {
            throw new IndexOutOfBoundsException("write API in append stream called with invalid arguments");
        }
        writeInternal(data, offset, length);
    }

    /**
     * Flushes the buffered bytes, waits for all block uploads, commits the uploaded blocks and
     * releases the append lease.
     *
     * <p>Once the upload pool has been shut down the stream cannot be used any more, so the
     * append lease is released on every exit path from that point on, including a timed-out or
     * interrupted wait and a failed commit. Calling {@code close()} on an already closed stream
     * does nothing.
     *
     * @throws IOException if the stream was never initialized, no longer holds the append lease,
     *         the uploads do not finish in time or the wait is interrupted, a block upload
     *         failed, the commit failed, or the lease metadata could not be reset
     */
    @Override
    public synchronized void close() throws IOException {
        if (!this.initialized) {
            throw new IOException("Trying to close an uninitialized Append stream");
        }
        if (this.closed) {
            return;
        }
        if (this.leaseFreed) {
            throw new IOException(String.format("Attempting to close an append stream on blob : %s  that does not have lease on the Blob. Failing close", this.key));
        }
        // Queue whatever is still buffered as the final (possibly short) block.
        if (this.outBuffer.size() > 0) {
            this.uploadBlockToStorage(this.outBuffer.toByteArray());
        }
        this.ioThreadPool.shutdown();
        try {
            this.awaitPendingUploads();
            // Commit only when every block made it to storage.
            if (this.lastError == null) {
                this.commitAppendBlocks();
            }
        }
        finally {
            // Always mark the stream closed and give up the append lease; otherwise the renewer
            // thread would keep the lease alive for a stream nobody can use any more.
            this.cleanup();
        }
        IOException failure = this.lastError;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Blocks until the already shut down upload pool has finished all queued uploads.
     *
     * @throws IOException if the uploads do not finish within ten minutes or the calling thread
     *         is interrupted while waiting
     */
    private void awaitPendingUploads() throws IOException {
        try {
            if (!this.ioThreadPool.awaitTermination(10L, TimeUnit.MINUTES)) {
                LOG.error("Time out occured while waiting for IO request to finish in append for blob : {}", this.key);
                NativeAzureFileSystemHelper.logAllLiveStackTraces();
                throw new IOException("Timed out waiting for IO requests to finish");
            }
        }
        catch (InterruptedException intrEx) {
            // Restore the interrupt status for the caller.
            Thread.currentThread().interrupt();
            LOG.error("Upload block operation in append interrupted for blob {}. Failing close", this.key);
            throw new IOException("Append Commit interrupted.", intrEx);
        }
    }

    /**
     * Marks the stream as closed and resets the append-lease flag in the blob metadata.
     * A storage failure while resetting the flag is stored in {@code lastError} rather than
     * thrown; the stream is flagged as not holding the lease afterwards.
     */
    private synchronized void cleanup() {
        closed = true;
        try {
            // Only clears the flag if this stream is the one that currently holds it.
            updateBlobAppendMetadata(false, true);
        }
        catch (StorageException storageFailure) {
            LOG.debug("Append metadata update on the Blob : {} encountered Storage Exception : {} Error Code : {}", key, storageFailure, storageFailure.getErrorCode());
            lastError = new IOException(storageFailure);
        }
        leaseFreed = true;
    }

    /**
     * Commits all blocks that were uploaded but are not yet part of the blob.
     *
     * <p>The new block list is the currently committed block list followed by the uncommitted
     * blocks. The commit is performed under a blob lease, which is released on every exit path,
     * including unchecked exceptions. A failure to release the lease is only logged. Does
     * nothing when there are no uncommitted blocks.
     *
     * @throws IOException if the storage service rejects one of the operations
     */
    private synchronized void commitAppendBlocks() throws IOException {
        if (this.uncommittedBlockEntries.isEmpty()) {
            return;
        }
        SelfRenewingLease lease = null;
        try {
            lease = new SelfRenewingLease(this.blob);
            List<BlockEntry> blockEntries = this.blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), this.opContext);
            blockEntries.addAll(this.uncommittedBlockEntries);
            AccessCondition accessCondition = new AccessCondition();
            accessCondition.setLeaseID(lease.getLeaseID());
            this.blob.commitBlockList(blockEntries, accessCondition, new BlobRequestOptions(), this.opContext);
            this.uncommittedBlockEntries.clear();
        }
        catch (StorageException ex) {
            LOG.error("Storage exception encountered during block commit phase of append for blob : {} Storage Exception : {} Error Code: {}", this.key, ex, ex.getErrorCode());
            throw new IOException("Encountered Exception while committing append blocks", ex);
        }
        finally {
            if (lease != null) {
                try {
                    lease.free();
                }
                catch (StorageException ex) {
                    // Best effort: the commit outcome is what matters to the caller.
                    LOG.debug("Exception encountered while releasing lease for blob : {} StorageException : {} ErrorCode : {}", this.key, ex, ex.getErrorCode());
                }
            }
        }
    }

    /**
     * Seeds {@code nextBlockCount} if it has not been set yet: the sum of two random
     * non-negative ints plus the number of blocks currently committed on the blob.
     *
     * @throws IOException if the committed block list cannot be downloaded
     */
    private void setBlocksCount() throws IOException {
        if (nextBlockCount != UNSET_BLOCKS_COUNT) {
            return;
        }
        try {
            long randomStart = sequenceGenerator.nextInt(Integer.MAX_VALUE);
            randomStart += sequenceGenerator.nextInt(Integer.MAX_VALUE - MAX_BLOCK_COUNT);
            // Stored before the download, so the counter is no longer "unset" even if it fails.
            nextBlockCount = randomStart;

            List<BlockEntry> committedBlocks = blob.downloadBlockList(BlockListingFilter.COMMITTED, new BlobRequestOptions(), opContext);
            nextBlockCount += committedBlocks.size();
        }
        catch (StorageException ex) {
            LOG.debug("Encountered storage exception during setting next Block Count. StorageException : {} ErrorCode : {}", ex, ex.getErrorCode());
            throw new IOException(ex);
        }
    }

    /**
     * Builds the block id for the current value of {@code nextBlockCount}: the eight bytes of
     * the counter, Base64 encoded.
     *
     * @return the Base64 text of the current block counter
     * @throws IOException if the block counter has not been set
     */
    private String generateBlockId() throws IOException {
        if (nextBlockCount == UNSET_BLOCKS_COUNT) {
            throw new IOException("Append Stream in invalid state. nextBlockCount not set correctly");
        }
        byte[] encodedCounter = Base64.encodeBase64(getBytesFromLong(nextBlockCount));
        return new String(encodedCounter, StandardCharsets.UTF_8);
    }

    /**
     * Converts a long into its eight bytes, most significant byte first.
     *
     * @param value number to convert
     * @return new array of length eight holding {@code value} in big-endian order
     */
    private static byte[] getBytesFromLong(long value) {
        byte[] bigEndian = new byte[8];
        long remaining = value;
        // Fill from the last position backwards, peeling off the lowest byte each time.
        for (int index = bigEndian.length - 1; index >= 0; index--) {
            bigEndian[index] = (byte) remaining;
            remaining >>>= 8;
        }
        return bigEndian;
    }

    /**
     * Assigns the next block id to {@code payload}, remembers the block as uncommitted and
     * queues its upload on the I/O thread pool. The upload itself runs asynchronously.
     *
     * @param payload content of the block
     * @throws IOException if no block id can be generated
     */
    private synchronized void uploadBlockToStorage(byte[] payload) throws IOException {
        nextBlockCount++;
        final String newBlockId = generateBlockId();
        // Recorded before the upload is queued, in the order the blocks must be committed.
        BlockEntry pendingEntry = new BlockEntry(newBlockId);
        uncommittedBlockEntries.add(pendingEntry);
        WriteRequest uploadTask = new WriteRequest(payload, newBlockId);
        ioThreadPool.execute(uploadTask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive exception aggregation
     */
    private boolean updateBlobAppendMetadata(boolean holdLease, boolean testCondition) throws StorageException {
        SelfRenewingLease lease = null;
        StorageException lastStorageException = null;
        int leaseRenewalRetryCount = 0;
        while (leaseRenewalRetryCount < 3) {
            lastStorageException = null;
            BlockBlobAppendStream blockBlobAppendStream = this;
            synchronized (blockBlobAppendStream) {
                block32: {
                    boolean bl;
                    block31: {
                        HashMap<String, String> metadata;
                        long currentTime;
                        block29: {
                            boolean bl2;
                            block30: {
                                Calendar currentCalendar = Calendar.getInstance(Locale.US);
                                currentCalendar.setTimeZone(TimeZone.getTimeZone(UTC_STR));
                                currentTime = currentCalendar.getTime().getTime();
                                lease = new SelfRenewingLease(this.blob);
                                this.blob.downloadAttributes(this.opContext);
                                metadata = this.blob.getMetadata();
                                if (!metadata.containsKey(APPEND_LEASE) || currentTime - Long.parseLong(metadata.get(APPEND_LEASE_LAST_MODIFIED)) > 30000L || metadata.get(APPEND_LEASE).equals(Boolean.toString(testCondition))) break block29;
                                bl2 = false;
                                if (lease == null) break block30;
                                try {
                                    try {
                                        lease.free();
                                        lease = null;
                                    }
                                    catch (StorageException ex) {
                                        LOG.debug("Encountered Storage exception while releasing lease for Blob {} during Append  metadata operation. Storage Exception {} Error Code : {} ", new Object[]{this.key, ex, ex.getErrorCode()});
                                        lease = null;
                                    }
                                }
                                catch (Throwable throwable) {
                                    lease = null;
                                    throw throwable;
                                }
                            }
                            return bl2;
                        }
                        metadata.put(APPEND_LEASE, Boolean.toString(holdLease));
                        metadata.put(APPEND_LEASE_LAST_MODIFIED, Long.toString(currentTime));
                        this.blob.setMetadata(metadata);
                        AccessCondition accessCondition = new AccessCondition();
                        accessCondition.setLeaseID(lease.getLeaseID());
                        this.blob.uploadMetadata(accessCondition, null, this.opContext);
                        bl = true;
                        if (lease == null) break block31;
                        try {
                            try {
                                lease.free();
                                lease = null;
                            }
                            catch (StorageException ex) {
                                LOG.debug("Encountered Storage exception while releasing lease for Blob {} during Append  metadata operation. Storage Exception {} Error Code : {} ", new Object[]{this.key, ex, ex.getErrorCode()});
                                lease = null;
                            }
                        }
                        catch (Throwable throwable) {
                            lease = null;
                            throw throwable;
                        }
                    }
                    return bl;
                    catch (StorageException ex) {
                        try {
                            lastStorageException = ex;
                            LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} Error Code : {}", new Object[]{this.key, ex, ex.getErrorCode()});
                            ++leaseRenewalRetryCount;
                            if (lease == null) break block32;
                        }
                        catch (Throwable throwable) {
                            if (lease != null) {
                                try {
                                    try {
                                        lease.free();
                                        lease = null;
                                    }
                                    catch (StorageException ex2) {
                                        LOG.debug("Encountered Storage exception while releasing lease for Blob {} during Append  metadata operation. Storage Exception {} Error Code : {} ", new Object[]{this.key, ex2, ex2.getErrorCode()});
                                        lease = null;
                                    }
                                }
                                catch (Throwable throwable2) {
                                    lease = null;
                                    throw throwable2;
                                }
                            }
                            throw throwable;
                        }
                        try {
                            try {
                                lease.free();
                                lease = null;
                            }
                            catch (StorageException ex3) {
                                LOG.debug("Encountered Storage exception while releasing lease for Blob {} during Append  metadata operation. Storage Exception {} Error Code : {} ", new Object[]{this.key, ex3, ex3.getErrorCode()});
                                lease = null;
                            }
                        }
                        catch (Throwable throwable) {
                            lease = null;
                            throw throwable;
                        }
                    }
                }
            }
            if (leaseRenewalRetryCount == 3) {
                throw lastStorageException;
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ex) {
                LOG.debug("Blob append metadata updated method interrupted");
                Thread.currentThread().interrupt();
            }
        }
        return false;
    }

    /**
     * Appends the given range to the internal buffer, queueing a block upload each time the
     * buffer would grow beyond {@code bufferSize} bytes.
     *
     * <p>The input is consumed in place through a moving offset, so each byte is copied once
     * into a block payload or into the buffer. Bytes that do not overflow the buffer stay
     * buffered until a later write or {@code close()}.
     *
     * @param data array holding the bytes to write
     * @param offset index of the first byte to write
     * @param length number of bytes to write
     * @throws IOException if the stream is not initialized, is closed, or no longer holds the
     *         append lease
     */
    private synchronized void writeInternal(byte[] data, int offset, int length) throws IOException {
        if (!this.initialized) {
            throw new IOException("Trying to write to an un-initialized Append stream");
        }
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }
        if (this.leaseFreed) {
            throw new IOException("Write called on a append stream not holding lease. Failing Write");
        }
        int cursor = offset;
        int remaining = length;
        // Written as a subtraction so that a very large length cannot overflow the comparison;
        // the buffer never holds more than bufferSize bytes.
        while (remaining > this.bufferSize - this.outBuffer.size()) {
            int buffered = this.outBuffer.size();
            int fill = this.bufferSize - buffered;
            // Full block: the buffered bytes first, topped up from the input.
            byte[] payload = new byte[this.bufferSize];
            System.arraycopy(this.outBuffer.toByteArray(), 0, payload, 0, buffered);
            System.arraycopy(data, cursor, payload, buffered, fill);
            this.uploadBlockToStorage(payload);
            cursor += fill;
            remaining -= fill;
            this.outBuffer = new ByteArrayOutputStream(this.bufferSize);
        }
        this.outBuffer.write(data, cursor, remaining);
    }

    private class AppendRenewer
    implements Runnable {
        private AppendRenewer() {
        }

        @Override
        public void run() {
            while (!BlockBlobAppendStream.this.leaseFreed) {
                try {
                    Thread.sleep(10000L);
                }
                catch (InterruptedException ie) {
                    LOG.debug("Appender Renewer thread interrupted");
                    Thread.currentThread().interrupt();
                }
                Log.debug((String)"Attempting to renew append lease on {}", (Object)BlockBlobAppendStream.this.key);
                try {
                    if (BlockBlobAppendStream.this.leaseFreed || BlockBlobAppendStream.this.updateBlobAppendMetadata(true, true)) continue;
                    LOG.error("Unable to re-acquire append lease on the Blob {} ", (Object)BlockBlobAppendStream.this.key);
                    BlockBlobAppendStream.this.leaseFreed = true;
                }
                catch (StorageException ex) {
                    LOG.debug("Lease renewal for Blob : {} encountered Storage Exception : {} Error Code : {}", new Object[]{BlockBlobAppendStream.this.key, ex, ex.getErrorCode()});
                    BlockBlobAppendStream.this.leaseFreed = true;
                }
            }
        }
    }

    /**
     * Creates the threads of the upload pool and names them
     * {@code BlockBlobAppendStream-<key>-<n>}, where {@code n} is a per-stream running number.
     */
    class UploaderThreadFactory
    implements ThreadFactory {
        UploaderThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread uploader = new Thread(r);
            int sequence = threadSequenceNumber.getAndIncrement();
            String threadName = String.format("%s-%s-%d", THREAD_ID_PREFIX, key, sequence);
            uploader.setName(threadName);
            return uploader;
        }
    }

    /**
     * Upload task for a single block. The upload is attempted up to
     * {@code MAX_BLOCK_UPLOAD_RETRIES} times with a pause of
     * {@code BLOCK_UPLOAD_RETRY_INTERVAL} milliseconds between attempts. If the block could not
     * be uploaded, either because every attempt failed or because the task was interrupted
     * while waiting to retry, the failure is stored in the stream's {@code lastError}.
     */
    private class WriteRequest
    implements Runnable {
        private final byte[] dataPayload;
        private final String blockId;

        /**
         * @param dataPayload content of the block
         * @param blockId id under which the block is uploaded
         */
        public WriteRequest(byte[] dataPayload, String blockId) {
            this.dataPayload = dataPayload;
            this.blockId = blockId;
        }

        @Override
        public void run() {
            IOException failure = null;
            for (int attempt = 1; attempt <= MAX_BLOCK_UPLOAD_RETRIES; ++attempt) {
                try {
                    BlockBlobAppendStream.this.blob.uploadBlock(this.blockId, new ByteArrayInputStream(this.dataPayload), this.dataPayload.length, new BlobRequestOptions(), BlockBlobAppendStream.this.opContext);
                    // Uploaded: nothing to report.
                    return;
                }
                catch (Exception ioe) {
                    LOG.debug("Encountered exception during uploading block for Blob : {} Exception : {}", BlockBlobAppendStream.this.key, ioe);
                    failure = new IOException("Encountered Exception while uploading block", ioe);
                }
                if (attempt == MAX_BLOCK_UPLOAD_RETRIES) {
                    // No further attempt follows, so there is nothing to wait for.
                    break;
                }
                try {
                    Thread.sleep(BLOCK_UPLOAD_RETRY_INTERVAL);
                }
                catch (InterruptedException ie) {
                    // Stop retrying, but still report the block as failed below.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // Reached only when the block was not uploaded.
            BlockBlobAppendStream.this.lastError = failure;
        }
    }
}

