/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationThrottler;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationWALReaderManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

@InterfaceAudience.Private
public class ReplicationSource
extends Thread
implements ReplicationSourceInterface {
    public static final Log LOG = LogFactory.getLog(ReplicationSource.class);
    private PriorityBlockingQueue<Path> queue;
    private ReplicationQueues replicationQueues;
    private ReplicationPeers replicationPeers;
    private Configuration conf;
    private ReplicationQueueInfo replicationQueueInfo;
    private String peerId;
    private ReplicationSourceManager manager;
    private Stoppable stopper;
    private long sleepForRetries;
    private long replicationQueueSizeCapacity;
    private int replicationQueueNbCapacity;
    private WAL.Reader reader;
    private long lastLoggedPosition = -1L;
    private volatile Path currentPath;
    private FileSystem fs;
    private UUID clusterId;
    private UUID peerClusterId;
    private long totalReplicatedEdits = 0L;
    private long totalReplicatedOperations = 0L;
    private String peerClusterZnode;
    private int maxRetriesMultiplier;
    private int currentNbOperations = 0;
    private int currentSize = 0;
    private long currentNbHFiles = 0L;
    private volatile boolean running = true;
    private MetricsSource metrics;
    private ReplicationWALReaderManager repLogReader;
    private int logQueueWarnThreshold;
    private ReplicationEndpoint replicationEndpoint;
    private WALEntryFilter walEntryFilter;
    private ReplicationThrottler throttler;

    @Override
    public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint, MetricsSource metrics) throws IOException {
        this.stopper = stopper;
        this.conf = HBaseConfiguration.create(conf);
        this.decorateConf();
        this.replicationQueueSizeCapacity = this.conf.getLong("replication.source.size.capacity", 0x4000000L);
        this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.queue = new PriorityBlockingQueue<Path>(this.conf.getInt("hbase.regionserver.maxlogs", 32), new LogsComparator());
        long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0L);
        this.throttler = new ReplicationThrottler((double)bandwidth / 10.0);
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.manager = manager;
        this.fs = fs;
        this.metrics = metrics;
        this.repLogReader = new ReplicationWALReaderManager(this.fs, this.conf);
        this.clusterId = clusterId;
        this.peerClusterZnode = peerClusterZnode;
        this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
        this.peerId = this.replicationQueueInfo.getPeerId();
        this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
        this.replicationEndpoint = replicationEndpoint;
    }

    private void decorateConf() {
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
    }

    @Override
    public void enqueueLog(Path log) {
        this.queue.put(log);
        int queueSize = this.queue.size();
        this.metrics.setSizeOfLogQueue(queueSize);
        if (queueSize > this.logQueueWarnThreshold) {
            LOG.warn("Queue size: " + queueSize + " exceeds value of replication.source.log.queue.warn: " + this.logQueueWarnThreshold);
        }
    }

    @Override
    public void addHFileRefs(TableName tableName, byte[] family, List<String> files) throws ReplicationException {
        Map<TableName, List<String>> tableCFMap;
        String peerId = this.peerClusterZnode;
        if (peerId.contains("-")) {
            peerId = this.peerClusterZnode.split("-")[0];
        }
        if ((tableCFMap = this.replicationPeers.getPeer(peerId).getTableCFs()) != null) {
            List<String> tableCfs = tableCFMap.get(tableName);
            if (tableCFMap.containsKey(tableName) && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
                this.replicationQueues.addHFileRefs(peerId, files);
                this.metrics.incrSizeOfHFileRefsQueue(files.size());
            } else {
                LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family " + Bytes.toString(family) + " to peer id " + peerId);
            }
        } else {
            this.replicationQueues.addHFileRefs(peerId, files);
            this.metrics.incrSizeOfHFileRefsQueue(files.size());
        }
    }

    private void uninitialize() {
        LOG.debug("Source exiting " + this.peerId);
        this.metrics.clear();
        if (this.replicationEndpoint.state() == Service.State.STARTING || this.replicationEndpoint.state() == Service.State.RUNNING) {
            this.replicationEndpoint.stopAndWait();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        if (!this.isActive()) {
            this.uninitialize();
            return;
        }
        try {
            Service.State state = (Service.State)((Object)this.replicationEndpoint.start().get());
            if (state != Service.State.RUNNING) {
                LOG.warn("ReplicationEndpoint was not started. Exiting");
                this.uninitialize();
                return;
            }
        }
        catch (Exception ex) {
            LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
            throw new RuntimeException(ex);
        }
        ArrayList<WALEntryFilter> filters = Lists.newArrayList(new SystemTableWALEntryFilter());
        WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
        if (filterFromEndpoint != null) {
            filters.add(filterFromEndpoint);
        }
        this.walEntryFilter = new ChainWALEntryFilter(filters);
        int sleepMultiplier = 1;
        while (this.isActive() && this.peerClusterId == null) {
            this.peerClusterId = this.replicationEndpoint.getPeerUUID();
            if (!this.isActive() || this.peerClusterId != null || !this.sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) continue;
            ++sleepMultiplier;
        }
        if (!this.isActive()) {
            this.uninitialize();
            return;
        }
        sleepMultiplier = 1;
        if (this.clusterId.equals(this.peerClusterId) && !this.replicationEndpoint.canReplicateToSameCluster()) {
            this.terminate("ClusterId " + this.clusterId + " is replicating to itself: peerClusterId " + this.peerClusterId + " which is not allowed by ReplicationEndpoint:" + this.replicationEndpoint.getClass().getName(), null, false);
        }
        LOG.info("Replicating " + this.clusterId + " -> " + this.peerClusterId);
        if (this.replicationQueueInfo.isQueueRecovered()) {
            try {
                this.repLogReader.setPosition(this.replicationQueues.getLogPosition(this.peerClusterZnode, this.queue.peek().getName()));
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + this.repLogReader.getPosition());
                }
            }
            catch (ReplicationException e) {
                this.terminate("Couldn't get the position of this recovered queue " + this.peerClusterZnode, e);
            }
        }
        while (this.isActive()) {
            ArrayList<WAL.Entry> entries;
            boolean gotIOE;
            boolean currentWALisBeingWrittenTo;
            block43: {
                if (!this.isPeerEnabled()) {
                    if (!this.sleepForRetries("Replication is disabled", sleepMultiplier)) continue;
                    ++sleepMultiplier;
                    continue;
                }
                Path oldPath = this.getCurrentPath();
                boolean hasCurrentPath = this.getNextPath();
                if (this.getCurrentPath() != null && oldPath == null) {
                    sleepMultiplier = 1;
                }
                if (!hasCurrentPath) {
                    if (!this.sleepForRetries("No log to process", sleepMultiplier)) continue;
                    ++sleepMultiplier;
                    continue;
                }
                currentWALisBeingWrittenTo = false;
                if (!this.replicationQueueInfo.isQueueRecovered() && this.queue.size() == 0) {
                    currentWALisBeingWrittenTo = true;
                }
                if (!this.openReader(sleepMultiplier)) {
                    sleepMultiplier = 1;
                    continue;
                }
                if (this.reader == null) {
                    if (!this.sleepForRetries("Unable to open a reader", sleepMultiplier)) continue;
                    ++sleepMultiplier;
                    continue;
                }
                gotIOE = false;
                this.currentNbOperations = 0;
                this.currentNbHFiles = 0L;
                entries = new ArrayList<WAL.Entry>(1);
                this.currentSize = 0;
                try {
                    if (this.readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
                        continue;
                    }
                }
                catch (IOException ioe) {
                    LOG.warn(this.peerClusterZnode + " Got: ", ioe);
                    gotIOE = true;
                    if (!(ioe.getCause() instanceof EOFException)) break block43;
                    boolean considerDumping = false;
                    if (this.replicationQueueInfo.isQueueRecovered()) {
                        try {
                            FileStatus stat = this.fs.getFileStatus(this.currentPath);
                            if (stat.getLen() == 0L) {
                                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
                            }
                            considerDumping = true;
                        }
                        catch (IOException e) {
                            LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
                        }
                    }
                    if (considerDumping && sleepMultiplier == this.maxRetriesMultiplier && this.processEndOfFile()) {
                        continue;
                    }
                }
                finally {
                    try {
                        this.reader = null;
                        this.repLogReader.closeReader();
                    }
                    catch (IOException e) {
                        gotIOE = true;
                        LOG.warn("Unable to finalize the tailing of a file", e);
                    }
                    continue;
                }
            }
            if (this.isActive() && (gotIOE || entries.isEmpty())) {
                if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
                    this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
                    this.lastLoggedPosition = this.repLogReader.getPosition();
                }
                if (!gotIOE) {
                    sleepMultiplier = 1;
                    this.metrics.setAgeOfLastShippedOp(System.currentTimeMillis());
                }
                if (!this.sleepForRetries("Nothing to replicate", sleepMultiplier)) continue;
                ++sleepMultiplier;
                continue;
            }
            sleepMultiplier = 1;
            this.shipEdits(currentWALisBeingWrittenTo, entries);
        }
        this.uninitialize();
    }

    protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) throws IOException {
        long seenEntries = 0L;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Seeking in " + this.currentPath + " at position " + this.repLogReader.getPosition());
        }
        this.repLogReader.seek();
        long positionBeforeRead = this.repLogReader.getPosition();
        WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
        while (entry != null) {
            this.metrics.incrLogEditsRead();
            ++seenEntries;
            if (this.replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(this.peerClusterId)) {
                entry = this.walEntryFilter.filter(entry);
                WALEdit edit = null;
                WALKey logKey = null;
                if (entry != null) {
                    edit = entry.getEdit();
                    logKey = entry.getKey();
                }
                if (edit != null && edit.size() != 0) {
                    logKey.addClusterId(this.clusterId);
                    this.currentNbOperations += this.countDistinctRowKeys(edit);
                    entries.add(entry);
                    this.currentSize += this.calculateTotalSizeOfStoreFiles(edit);
                } else {
                    this.metrics.incrLogEditsFiltered();
                }
            }
            if ((long)this.currentSize >= this.replicationQueueSizeCapacity || entries.size() >= this.replicationQueueNbCapacity) break;
            try {
                entry = this.repLogReader.readNextAndSetPosition();
            }
            catch (IOException ie) {
                LOG.debug("Break on IOE: " + ie.getMessage());
                break;
            }
        }
        this.metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
        if (currentWALisBeingWrittenTo) {
            return false;
        }
        return seenEntries == 0L && this.processEndOfFile();
    }

    private int calculateTotalSizeOfStoreFiles(WALEdit edit) {
        ArrayList<Cell> cells = edit.getCells();
        int totalStoreFilesSize = 0;
        int totalCells = edit.size();
        for (int i = 0; i < totalCells; ++i) {
            if (!CellUtil.matchingQualifier((Cell)cells.get(i), WALEdit.BULK_LOAD)) continue;
            try {
                WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor((Cell)cells.get(i));
                List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
                int totalStores = stores.size();
                for (int j = 0; j < totalStores; ++j) {
                    totalStoreFilesSize = (int)((long)totalStoreFilesSize + stores.get(j).getStoreFileSize());
                }
                continue;
            }
            catch (IOException e) {
                LOG.error("Failed to deserialize bulk load entry from wal edit. Size of HFiles part of cell will not be considered in replication request size calculation.", e);
            }
        }
        return totalStoreFilesSize;
    }

    private void cleanUpHFileRefs(WALEdit edit) throws IOException {
        String peerId = this.peerClusterZnode;
        if (peerId.contains("-")) {
            peerId = this.peerClusterZnode.split("-")[0];
        }
        ArrayList<Cell> cells = edit.getCells();
        int totalCells = cells.size();
        for (int i = 0; i < totalCells; ++i) {
            Cell cell = (Cell)cells.get(i);
            if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) continue;
            WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
            int totalStores = stores.size();
            for (int j = 0; j < totalStores; ++j) {
                List<String> storeFileList = stores.get(j).getStoreFileList();
                this.manager.cleanUpHFileRefs(peerId, storeFileList);
                this.metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
            }
        }
    }

    protected boolean getNextPath() {
        try {
            if (this.currentPath == null) {
                this.currentPath = this.queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
                this.metrics.setSizeOfLogQueue(this.queue.size());
                if (this.currentPath != null) {
                    this.manager.cleanOldLogs(this.currentPath.getName(), this.peerId, this.replicationQueueInfo.isQueueRecovered());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("New log: " + this.currentPath);
                    }
                }
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while reading edits", e);
        }
        return this.currentPath != null;
    }

    protected boolean openReader(int sleepMultiplier) {
        block14: {
            try {
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Opening log " + this.currentPath);
                    }
                    this.reader = this.repLogReader.openReader(this.currentPath);
                }
                catch (FileNotFoundException fnfe) {
                    if (this.replicationQueueInfo.isQueueRecovered()) {
                        List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
                        LOG.info("NB dead servers : " + deadRegionServers.size());
                        Path rootDir = FSUtils.getRootDir(this.conf);
                        for (String curDeadServerName : deadRegionServers) {
                            Path[] locs;
                            Path deadRsDirectory = new Path(rootDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName));
                            for (Path possibleLogLocation : locs = new Path[]{new Path(deadRsDirectory, this.currentPath.getName()), new Path(deadRsDirectory.suffix("-splitting"), this.currentPath.getName())}) {
                                LOG.info("Possible location " + possibleLogLocation.toUri().toString());
                                if (!this.manager.getFs().exists(possibleLogLocation)) continue;
                                LOG.info("Log " + this.currentPath + " still exists at " + possibleLogLocation);
                                return true;
                            }
                        }
                        if (this.stopper instanceof ReplicationSyncUp.DummyServer) {
                            FileStatus[] rss;
                            for (FileStatus rs : rss = this.fs.listStatus(this.manager.getLogDir())) {
                                FileStatus[] logs;
                                Path p = rs.getPath();
                                for (FileStatus log : logs = this.fs.listStatus(p)) {
                                    if (!(p = new Path(p, log.getPath().getName())).getName().equals(this.currentPath.getName())) continue;
                                    this.currentPath = p;
                                    LOG.info("Log " + this.currentPath.getName() + " found at " + this.currentPath);
                                    this.openReader(sleepMultiplier);
                                    return true;
                                }
                            }
                        }
                        throw new IOException("File from recovered queue is nowhere to be found", fnfe);
                    }
                    Path archivedLogLocation = new Path(this.manager.getOldLogDir(), this.currentPath.getName());
                    if (this.manager.getFs().exists(archivedLogLocation)) {
                        this.currentPath = archivedLogLocation;
                        LOG.info("Log " + this.currentPath + " was moved to " + archivedLogLocation);
                        this.openReader(sleepMultiplier);
                    }
                }
            }
            catch (IOException ioe) {
                if (ioe instanceof EOFException && this.isCurrentLogEmpty()) {
                    return true;
                }
                LOG.warn(this.peerClusterZnode + " Got: ", ioe);
                this.reader = null;
                if (ioe.getCause() instanceof NullPointerException) {
                    LOG.warn("Got NPE opening reader, will retry.");
                }
                if (sleepMultiplier != this.maxRetriesMultiplier) break block14;
                LOG.warn("Waited too long for this file, considering dumping");
                return !this.processEndOfFile();
            }
        }
        return true;
    }

    private boolean isCurrentLogEmpty() {
        return this.repLogReader.getPosition() == 0L && !this.replicationQueueInfo.isQueueRecovered() && this.queue.size() == 0;
    }

    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(msg + ", sleeping " + this.sleepForRetries + " times " + sleepMultiplier);
            }
            Thread.sleep(this.sleepForRetries * (long)sleepMultiplier);
        }
        catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
            Thread.currentThread().interrupt();
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    private int countDistinctRowKeys(WALEdit edit) {
        ArrayList<Cell> cells = edit.getCells();
        int distinctRowKeys = 1;
        int totalHFileEntries = 0;
        Cell lastCell = (Cell)cells.get(0);
        int totalCells = edit.size();
        for (int i = 0; i < totalCells; ++i) {
            if (CellUtil.matchingQualifier((Cell)cells.get(i), WALEdit.BULK_LOAD)) {
                try {
                    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor((Cell)cells.get(i));
                    List<WALProtos.StoreDescriptor> stores = bld.getStoresList();
                    int totalStores = stores.size();
                    for (int j = 0; j < totalStores; ++j) {
                        totalHFileEntries += stores.get(j).getStoreFileList().size();
                    }
                }
                catch (IOException e) {
                    LOG.error("Failed to deserialize bulk load entry from wal edit. Then its hfiles count will not be added into metric.");
                }
            }
            if (!CellUtil.matchingRow((Cell)cells.get(i), lastCell)) {
                ++distinctRowKeys;
            }
            lastCell = (Cell)cells.get(i);
        }
        this.currentNbHFiles += (long)totalHFileEntries;
        return distinctRowKeys + totalHFileEntries;
    }

    protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
        int sleepMultiplier = 0;
        if (entries.isEmpty()) {
            LOG.warn("Was given 0 edits to ship");
            return;
        }
        while (this.isActive()) {
            try {
                long sleepTicks;
                if (this.throttler.isEnabled() && (sleepTicks = this.throttler.getNextSleepInterval(this.currentSize)) > 0L) {
                    try {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
                        }
                        Thread.sleep(sleepTicks);
                    }
                    catch (InterruptedException e) {
                        LOG.debug("Interrupted while sleeping for throttling control");
                        Thread.currentThread().interrupt();
                        continue;
                    }
                    this.throttler.resetStartTick();
                }
                ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
                replicateContext.setEntries(entries).setSize(this.currentSize);
                long startTimeNs = System.nanoTime();
                boolean replicated = this.replicationEndpoint.replicate(replicateContext);
                long endTimeNs = System.nanoTime();
                if (!replicated) continue;
                sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
                if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
                    int size = entries.size();
                    for (int i = 0; i < size; ++i) {
                        this.cleanUpHFileRefs(entries.get(i).getEdit());
                    }
                    this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
                    this.lastLoggedPosition = this.repLogReader.getPosition();
                }
                if (this.throttler.isEnabled()) {
                    this.throttler.addPushSize(this.currentSize);
                }
                this.totalReplicatedEdits += (long)entries.size();
                this.totalReplicatedOperations += (long)this.currentNbOperations;
                this.metrics.shipBatch(this.currentNbOperations, this.currentSize / 1024, this.currentNbHFiles);
                this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
                if (!LOG.isTraceEnabled()) break;
                LOG.trace("Replicated " + this.totalReplicatedEdits + " entries in total, or " + this.totalReplicatedOperations + " operations in " + (endTimeNs - startTimeNs) / 1000000L + " ms");
                break;
            }
            catch (Exception ex) {
                LOG.warn(this.replicationEndpoint.getClass().getName() + " threw unknown exception:" + org.apache.hadoop.util.StringUtils.stringifyException(ex));
                if (!this.sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) continue;
                ++sleepMultiplier;
            }
        }
    }

    protected boolean isPeerEnabled() {
        return this.replicationPeers.getStatusOfPeer(this.peerId);
    }

    @SuppressWarnings(value={"DE_MIGHT_IGNORE"}, justification="Yeah, this is how it works")
    protected boolean processEndOfFile() {
        if (this.queue.size() != 0) {
            if (LOG.isTraceEnabled()) {
                String filesize = "N/A";
                try {
                    FileStatus stat = this.fs.getFileStatus(this.currentPath);
                    filesize = stat.getLen() + "";
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                LOG.trace("Reached the end of a log, stats: " + this.getStats() + ", and the length of the file is " + filesize);
            }
            this.currentPath = null;
            this.repLogReader.finishCurrentFile();
            this.reader = null;
            return true;
        }
        if (this.replicationQueueInfo.isQueueRecovered()) {
            this.manager.closeRecoveredQueue(this);
            LOG.info("Finished recovering the queue with the following stats " + this.getStats());
            this.running = false;
            return true;
        }
        return false;
    }

    @Override
    public void startup() {
        String n = Thread.currentThread().getName();
        Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                LOG.error("Unexpected exception in ReplicationSource, currentPath=" + ReplicationSource.this.currentPath, e);
            }
        };
        Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
    }

    @Override
    public void terminate(String reason) {
        this.terminate(reason, null);
    }

    @Override
    public void terminate(String reason, Exception cause) {
        this.terminate(reason, cause, true);
    }

    public void terminate(String reason, Exception cause, boolean join) {
        if (cause == null) {
            LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason);
        } else {
            LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, cause);
        }
        this.running = false;
        this.interrupt();
        ListenableFuture<Service.State> future = null;
        if (this.replicationEndpoint != null) {
            future = this.replicationEndpoint.stop();
        }
        if (join) {
            Threads.shutdown(this, this.sleepForRetries);
            if (future != null) {
                try {
                    future.get();
                }
                catch (Exception e) {
                    LOG.warn("Got exception:" + e);
                }
            }
        }
    }

    @Override
    public String getPeerClusterZnode() {
        return this.peerClusterZnode;
    }

    @Override
    public String getPeerClusterId() {
        return this.peerId;
    }

    @Override
    public Path getCurrentPath() {
        return this.currentPath;
    }

    private boolean isActive() {
        return !this.stopper.isStopped() && this.running && !this.isInterrupted();
    }

    @Override
    public String getStats() {
        long position = this.repLogReader.getPosition();
        return "Total replicated edits: " + this.totalReplicatedEdits + ", currently replicating from: " + this.currentPath + " at position: " + position;
    }

    public MetricsSource getSourceMetrics() {
        return this.metrics;
    }

    public static class LogsComparator
    implements Comparator<Path> {
        @Override
        public int compare(Path o1, Path o2) {
            return Long.valueOf(this.getTS(o1)).compareTo(this.getTS(o2));
        }

        private long getTS(Path p) {
            String[] parts = p.getName().split("\\.");
            return Long.parseLong(parts[parts.length - 1]);
        }
    }
}

