/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core;

import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.CheckpointMult;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventBufferBatchReadable;
import com.linkedin.databus.core.DbusEventBufferMetaInfo;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.DbusEventV2Factory;
import com.linkedin.databus.core.Encoding;
import com.linkedin.databus.core.OffsetNotFoundException;
import com.linkedin.databus.core.ScnNotFoundException;
import com.linkedin.databus.core.StreamEventsArgs;
import com.linkedin.databus.core.StreamEventsResult;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.data_model.LogicalPartition;
import com.linkedin.databus.core.data_model.LogicalSource;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.data_model.PhysicalSource;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectors;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus2.core.BufferNotFoundException;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.filter.AllowAllDbusFilter;
import com.linkedin.databus2.core.filter.ConjunctionDbusFilter;
import com.linkedin.databus2.core.filter.DbusFilter;
import com.linkedin.databus2.core.filter.LogicalSourceAndPartitionDbusFilter;
import com.linkedin.databus2.core.filter.PhysicalPartitionDbusFilter;
import com.linkedin.databus2.relay.config.LogicalSourceConfig;
import com.linkedin.databus2.relay.config.LogicalSourceStaticConfig;
import com.linkedin.databus2.relay.config.PhysicalSourceStaticConfig;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.log4j.Logger;

public class DbusEventBufferMult {
    /** Fully-qualified class name; used as the name of the main logger. */
    public static final String MODULE = DbusEventBufferMult.class.getName();
    /** Main logger of this class. */
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    /** Name of the timing logger: {@link #MODULE} with a "Perf" suffix. */
    public static final String PERF_MODULE = MODULE + "Perf";
    /** Logger that receives the debug-level timing messages. */
    public static final Logger PERF_LOG = Logger.getLogger((String)PERF_MODULE);
    /** Distinct buffer instances held by this object; the bulk operations iterate over this set. */
    private final Set<DbusEventBuffer> _uniqBufs = new HashSet<DbusEventBuffer>();
    /** Physical partition key to buffer, kept in key order. */
    private final TreeMap<PhysicalPartitionKey, DbusEventBuffer> _bufsMap = new TreeMap();
    /** Physical sources registered for each physical partition key. */
    private final Map<PhysicalPartitionKey, Set<PhysicalSource>> _partKey2PhysiscalSources = new HashMap<PhysicalPartitionKey, Set<PhysicalSource>>();
    /** Logical (source, partition) key to the physical partition key whose buffer serves it. */
    private final Map<LogicalPartitionKey, PhysicalPartitionKey> _logicalPKey2PhysicalPKey = new HashMap<LogicalPartitionKey, PhysicalPartitionKey>();
    /** Logical source id to logical source. */
    private final Map<Integer, LogicalSource> _logicalId2LogicalSource = new HashMap<Integer, LogicalSource>();
    /** Memory-map directory; assigned by the config constructor only for the MMAPPED_MEMORY allocation policy, otherwise null. */
    private File _mmapDirectory = null;
    /** Event factory handed to every buffer created by addNewBuffer. */
    private DbusEventFactory _eventFactory;
    /** Drop-old-events flag applied to each buffer added and pushed to all buffers by setDropOldEvents. */
    boolean _dropOldEvents = false;
    // NOTE(review): not referenced in the visible part of the file; the timing code uses the literal 1000000.0 directly.
    private final double _nanoSecsInMSec = 1000000.0;
    /** Suffix appended to the mmap directory's absolute path to form the backup directory used by close(). */
    public static final String BAK_DIRNAME_SUFFIX = ".BAK";

    /**
     * Creates an empty multi-buffer that uses a {@link DbusEventV2Factory} as its event factory.
     */
    public DbusEventBufferMult() {
        _eventFactory = new DbusEventV2Factory();
    }

    public DbusEventBufferMult(PhysicalSourceStaticConfig[] pConfigs, DbusEventBuffer.StaticConfig config, DbusEventFactory eventFactory) throws InvalidConfigException {
        this._eventFactory = eventFactory;
        if (pConfigs == null) {
            LOG.warn((Object)"Creating empty MULT buffer. No pConfigs passed");
            return;
        }
        LOG.info((Object)("Creating new DbusEventBufferMult for " + pConfigs.length + " physical configurations"));
        for (PhysicalSourceStaticConfig pConfig : pConfigs) {
            this.addNewBuffer(pConfig, config);
        }
        if (config.getAllocationPolicy() == DbusEventBuffer.AllocationPolicy.MMAPPED_MEMORY) {
            this._mmapDirectory = config.getMmapDirectory();
        }
    }

    /**
     * Looks up the buffer serving a logical source/partition pair.
     *
     * @param lSource    logical source; must not be null
     * @param lPartition logical partition; null selects the default logical source partition
     * @return the buffer, or null when the pair is not mapped to a physical partition
     * @throws IllegalArgumentException if {@code lSource} is null
     */
    public DbusEventBuffer getOneBuffer(LogicalSource lSource, LogicalPartition lPartition) {
        final LogicalPartition effectivePartition = lPartition != null
                ? lPartition
                : LogicalSourceStaticConfig.getDefaultLogicalSourcePartition();
        if (null == lSource) {
            throw new IllegalArgumentException("cannot find buffer without source");
        }
        final PhysicalPartitionKey physicalKey =
                _logicalPKey2PhysicalPKey.get(new LogicalPartitionKey(lSource, effectivePartition));
        return physicalKey == null ? null : _bufsMap.get(physicalKey);
    }

    /**
     * Looks up the buffer registered for a physical partition.
     *
     * @param pPartition physical partition; null selects the default physical partition
     * @return the buffer, or null when none is registered for that partition
     */
    public DbusEventBuffer getOneBuffer(PhysicalPartition pPartition) {
        final PhysicalPartition effectivePartition = pPartition != null
                ? pPartition
                : PhysicalSourceStaticConfig.getDefaultPhysicalPartition();
        return _bufsMap.get(new PhysicalPartitionKey(effectivePartition));
    }

    /**
     * Resets the buffer of the given physical partition.
     *
     * @param pPartition physical partition whose buffer is reset
     * @param prevScn    value forwarded to the buffer's {@code reset}
     * @throws BufferNotFoundException if no buffer is registered for the partition
     */
    public void resetBuffer(PhysicalPartition pPartition, long prevScn) throws BufferNotFoundException {
        final DbusEventBuffer target = getOneBuffer(pPartition);
        if (null != target) {
            target.reset(prevScn);
            return;
        }
        throw new BufferNotFoundException("cannot find buf for partition " + pPartition);
    }

    /**
     * Returns the appendable view of the buffer registered for a physical partition.
     *
     * @param pPartition physical partition; null selects the default physical partition
     * @return the buffer, or null when none is registered
     */
    public DbusEventBufferAppendable getDbusEventBufferAppendable(PhysicalPartition pPartition) {
        final DbusEventBuffer found = getOneBuffer(pPartition);
        return found;
    }

    /**
     * Returns the appendable buffer for a logical source id, using the default logical partition.
     *
     * @param lSrcId logical source id
     * @return the buffer, or null when the source is not mapped to a physical partition
     * @throws IllegalArgumentException if the id is unknown (the lookup then runs with a null source)
     */
    public DbusEventBufferAppendable getDbusEventBufferAppendable(int lSrcId) {
        return getDbusEventBufferAppendable(_logicalId2LogicalSource.get(lSrcId));
    }

    /**
     * Returns the appendable buffer for a logical source, using the default logical partition.
     *
     * @param lSource logical source
     * @return the buffer, or null when the source is not mapped to a physical partition
     */
    public DbusEventBufferAppendable getDbusEventBufferAppendable(LogicalSource lSource) {
        final LogicalPartition defaultPartitionRequested = null;
        return getDbusEventBufferAppendable(lSource, defaultPartitionRequested);
    }

    /**
     * Returns the appendable buffer for a logical source/partition pair.
     *
     * @param lSource    logical source
     * @param lPartition logical partition; null selects the default logical source partition
     * @return the buffer, or null when the pair is not mapped to a physical partition
     */
    public DbusEventBufferAppendable getDbusEventBufferAppendable(LogicalSource lSource, LogicalPartition lPartition) {
        final DbusEventBuffer found = getOneBuffer(lSource, lPartition);
        return found;
    }

    /**
     * Returns the buffer for a logical source, using the default logical partition.
     *
     * @param lSource logical source
     * @return the buffer, or null when the source is not mapped to a physical partition
     */
    public DbusEventBuffer getDbusEventBuffer(LogicalSource lSource) {
        final LogicalPartition defaultPartitionRequested = null;
        return getOneBuffer(lSource, defaultPartitionRequested);
    }

    /**
     * Creates a batch reader over the given set of physical partition keys.
     *
     * @param cpMult          checkpoints handed to the reader
     * @param ppartKeys       physical partition keys to read from
     * @param statsCollectors statistics collectors handed to the reader
     * @return a new batch reader
     * @throws IOException declared by the reader's constructor
     */
    public DbusEventBufferBatchReadable getDbusEventBufferBatchReadable(CheckpointMult cpMult, Set<PhysicalPartitionKey> ppartKeys, StatsCollectors<DbusEventsStatisticsCollector> statsCollectors) throws IOException {
        final DbusEventBufferBatchReader reader = new DbusEventBufferBatchReader(cpMult, ppartKeys, statsCollectors);
        return reader;
    }

    /**
     * Creates a batch reader for the given logical source ids.
     *
     * @param ids            logical source ids
     * @param cpMult         checkpoints handed to the reader
     * @param statsCollector statistics collectors handed to the reader
     * @return a new batch reader
     * @throws IOException declared by the reader's constructor
     */
    public DbusEventBufferBatchReadable getDbusEventBufferBatchReadable(Collection<Integer> ids, CheckpointMult cpMult, StatsCollectors<DbusEventsStatisticsCollector> statsCollector) throws IOException {
        final DbusEventBufferBatchReader reader = new DbusEventBufferBatchReader(ids, cpMult, statsCollector);
        return reader;
    }

    /**
     * Creates a batch reader over the given collection of physical partition keys.
     *
     * @param cpMult             checkpoints handed to the reader
     * @param physicalPartitions physical partition keys to read from
     * @param statsCollector     statistics collectors handed to the reader
     * @return a new batch reader
     * @throws IOException declared by the reader's constructor
     */
    public DbusEventBufferBatchReadable getDbusEventBufferBatchReadable(CheckpointMult cpMult, Collection<PhysicalPartitionKey> physicalPartitions, StatsCollectors<DbusEventsStatisticsCollector> statsCollector) throws IOException {
        final DbusEventBufferBatchReader reader = new DbusEventBufferBatchReader(cpMult, physicalPartitions, statsCollector);
        return reader;
    }

    /**
     * Resolves the physical partition of a logical source id for the default logical partition.
     *
     * @param srcId logical source id
     * @return the physical partition, or null when the id or the mapping is unknown
     */
    public PhysicalPartition getPhysicalPartition(int srcId) {
        final LogicalPartition defaultPartition = new LogicalPartition(LogicalSourceConfig.DEFAULT_LOGICAL_SOURCE_PARTITION);
        return getPhysicalPartition(srcId, defaultPartition);
    }

    /**
     * Resolves the physical partition serving a logical source id and logical partition.
     *
     * @param srcId      logical source id
     * @param lPartition logical partition
     * @return the physical partition, or null when the id or the mapping is unknown
     */
    public PhysicalPartition getPhysicalPartition(int srcId, LogicalPartition lPartition) {
        final LogicalSource source = _logicalId2LogicalSource.get(srcId);
        if (null == source) {
            return null;
        }
        final PhysicalPartitionKey physicalKey =
                _logicalPKey2PhysicalPKey.get(new LogicalPartitionKey(source, lPartition));
        if (null == physicalKey) {
            return null;
        }
        return physicalKey.getPhysicalPartition();
    }

    /**
     * Exposes the distinct buffers for iteration.
     *
     * @return the internal set of buffers itself (not a copy)
     */
    public Iterable<DbusEventBuffer> bufIterable() {
        final Iterable<DbusEventBuffer> buffers = _uniqBufs;
        return buffers;
    }

    /**
     * Registers a physical source and makes sure a buffer exists for its physical partition.
     *
     * <p>The source is recorded against the partition. An existing buffer for the partition is
     * reused; otherwise a new one is created from the source-specific buffer config when the
     * source has one, or from the global {@code config}. The buffer's reference counter is then
     * increased and a non-forced pass over removable buffers is run.
     *
     * @param pConfig physical source configuration
     * @param config  global buffer configuration; must not be null
     * @return the buffer registered for the source's physical partition
     * @throws InvalidConfigException if {@code config} is null
     */
    public synchronized DbusEventBuffer addNewBuffer(PhysicalSourceStaticConfig pConfig, DbusEventBuffer.StaticConfig config) throws InvalidConfigException {
        final long startedAtNanos = System.nanoTime();
        if (null == config) {
            throw new InvalidConfigException("config cannot be null for addNewBuffer");
        }
        final PhysicalPartition partition = pConfig.getPhysicalPartition();
        final PhysicalPartitionKey partitionKey = new PhysicalPartitionKey(partition);
        final PhysicalSource source = pConfig.getPhysicalSource();

        // Record the source against its partition, creating the per-partition set on first use.
        Set<PhysicalSource> registeredSources = _partKey2PhysiscalSources.get(partitionKey);
        if (null == registeredSources) {
            registeredSources = new HashSet<PhysicalSource>();
            _partKey2PhysiscalSources.put(partitionKey, registeredSources);
        }
        registeredSources.add(source);

        DbusEventBuffer buffer = _bufsMap.get(partitionKey);
        if (null == buffer) {
            if (pConfig.isDbusEventBufferSet()) {
                buffer = new DbusEventBuffer(pConfig.getDbusEventBuffer(), partition, _eventFactory);
                LOG.info("Using- source specific event buffer config, the event buffer size allocated is: " + buffer.getAllocatedSize());
            } else {
                buffer = new DbusEventBuffer(config, partition, _eventFactory);
                LOG.info("Using- global event buffer config, the buffer size allocated is: " + buffer.getAllocatedSize());
            }
            addBuffer(pConfig, buffer);
        } else {
            LOG.info("Adding new buffer. Buffer " + buffer.hashCode() + " already exists for: " + pConfig);
        }

        buffer.increaseRefCounter();
        deallocateRemovedBuffers(false);

        final long finishedAtNanos = System.nanoTime();
        if (PERF_LOG.isDebugEnabled()) {
            PERF_LOG.debug("addNewBuffer took:" + (double)(finishedAtNanos - startedAtNanos) / 1000000.0 + "ms");
        }
        return buffer;
    }

    /**
     * Unregisters a physical source (or all of them) from a partition and decreases the
     * reference counter of the partition's buffer.
     *
     * <p>The buffer itself is not removed here; see {@link #deallocateRemovedBuffers(boolean)}.
     *
     * @param pKey    physical partition key
     * @param pSource source to unregister; null unregisters every source of the partition
     */
    public synchronized void removeBuffer(PhysicalPartitionKey pKey, PhysicalSource pSource) {
        final long startedAtNanos = System.nanoTime();
        final DbusEventBuffer buffer = _bufsMap.get(pKey);
        if (null == buffer) {
            LOG.error("Cannot find buffer for key = " + pKey);
            return;
        }
        final Set<PhysicalSource> registeredSources = _partKey2PhysiscalSources.get(pKey);
        if (null == pSource) {
            LOG.info("removing all physicalSources for key = " + pKey);
            _partKey2PhysiscalSources.remove(pKey);
        } else {
            LOG.info("removing physicalSource = " + pSource + " for key = " + pKey);
            final boolean removed = registeredSources != null && registeredSources.remove(pSource);
            if (!removed) {
                LOG.warn("couldn't remove pSource for key=" + pKey + ";set = " + registeredSources + "; psource=" + pSource);
            }
        }
        buffer.decreaseRefCounter();
        final long finishedAtNanos = System.nanoTime();
        if (PERF_LOG.isDebugEnabled()) {
            PERF_LOG.debug("removeNewBuffer took:" + (double)(finishedAtNanos - startedAtNanos) / 1000000.0 + "ms");
        }
    }

    /**
     * Unregisters the physical source described by a configuration from its partition.
     *
     * @param pConfig physical source configuration supplying the partition and the source
     */
    public synchronized void removeBuffer(PhysicalSourceStaticConfig pConfig) {
        final PhysicalPartitionKey partitionKey = new PhysicalPartitionKey(pConfig.getPhysicalPartition());
        removeBuffer(partitionKey, pConfig.getPhysicalSource());
    }

    /**
     * Drops every buffer that reports it should be removed.
     *
     * <p>For each such buffer the partition mapping and the logical-to-physical mappings that
     * point at it are deleted; afterwards the buffer is closed and taken out of the set of
     * distinct buffers.
     *
     * @param now forwarded to each buffer's {@code shouldBeRemoved}
     */
    public synchronized void deallocateRemovedBuffers(boolean now) {
        final Set<DbusEventBuffer> doomed = new HashSet<DbusEventBuffer>(5);
        final Iterator<Map.Entry<PhysicalPartitionKey, DbusEventBuffer>> entries = _bufsMap.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<PhysicalPartitionKey, DbusEventBuffer> entry = entries.next();
            final DbusEventBuffer candidate = entry.getValue();
            if (candidate.shouldBeRemoved(now)) {
                final PhysicalPartitionKey partitionKey = entry.getKey();
                entries.remove();
                removeAuxMapping(partitionKey);
                doomed.add(candidate);
            }
        }
        // Closing is deferred until the map walk is finished.
        for (DbusEventBuffer buffer : doomed) {
            buffer.closeBuffer(false);
            _uniqBufs.remove(buffer);
        }
    }

    /**
     * Closes every registered buffer.
     *
     * <p>When a memory-map directory is configured, the existing meta-info files in it are first
     * moved to the backup directory (the mmap directory path plus {@link #BAK_DIRNAME_SUFFIX}),
     * then all buffers are closed, and finally the session directories that are no longer
     * referenced by a meta-info file are moved to the backup directory as well.
     *
     * <p>A failure to close one buffer is logged and does not prevent the remaining buffers from
     * being closed.
     */
    public synchronized void close() {
        final File bakDir = _mmapDirectory == null
                ? null
                : new File(_mmapDirectory.getAbsolutePath() + BAK_DIRNAME_SUFFIX);
        if (bakDir != null) {
            FilePrefixFilter filter = new FilePrefixFilter(DbusEventBuffer.getMmapMetaInfoFileNamePrefix());
            File[] metaFiles = _mmapDirectory.listFiles(filter);
            if (metaFiles == null) {
                // listFiles returns null when the path is not a directory or an I/O error occurs;
                // the buffers below must still be closed in that case.
                LOG.warn("Could not list meta info files in " + _mmapDirectory + ". Nothing backed up.");
            } else {
                for (File f : metaFiles) {
                    if (f.isFile()) {
                        moveFile(f, bakDir);
                    }
                }
            }
        }
        for (Map.Entry<PhysicalPartitionKey, DbusEventBuffer> entry : _bufsMap.entrySet()) {
            try {
                entry.getValue().closeBuffer(true);
            }
            catch (RuntimeException e) {
                LOG.error("error closing buffer for partition: " + entry.getKey().getPhysicalPartition() + ": " + e.getMessage(), e);
            }
        }
        if (bakDir != null) {
            moveUnusedSessionDirs(bakDir);
        }
    }

    /**
     * Moves session directories that no valid meta-info file refers to into the backup directory.
     *
     * <p>Session directories and meta-info files are found in the mmap directory by file-name
     * prefix. Every session id named by a valid meta-info file is kept; the remaining session
     * directories are moved. Nothing is moved when the directory cannot be scanned or when a
     * meta-info file cannot be parsed.
     *
     * @param bakDir backup directory that receives the unused session directories
     */
    private void moveUnusedSessionDirs(File bakDir) {
        File[] sessionDirs;
        File[] metaFiles;
        LOG.info("Moving unused session directories from " + _mmapDirectory);
        FilePrefixFilter sessionFileFilter = new FilePrefixFilter(DbusEventBuffer.getSessionPrefix());
        FilePrefixFilter metaFileFilter = new FilePrefixFilter(DbusEventBuffer.getMmapMetaInfoFileNamePrefix());
        try {
            metaFiles = _mmapDirectory.listFiles(metaFileFilter);
            sessionDirs = _mmapDirectory.listFiles(sessionFileFilter);
        }
        catch (SecurityException e) {
            LOG.warn("Could not scan directories. Nothing moved.", e);
            return;
        }
        if (metaFiles == null || sessionDirs == null) {
            // listFiles returns null when the path is not a directory or an I/O error occurs.
            LOG.warn("Could not list contents of " + _mmapDirectory + ". Nothing moved.");
            return;
        }
        HashMap<String, File> sessionFileMap = new HashMap<String, File>(sessionDirs.length);
        for (File f : sessionDirs) {
            if (f.isDirectory()) {
                sessionFileMap.put(f.getName(), f);
            }
        }
        // Every session id referenced by a valid meta-info file is still in use: keep it.
        for (File f : metaFiles) {
            if (!f.isFile()) continue;
            try {
                DbusEventBufferMetaInfo mi = new DbusEventBufferMetaInfo(f);
                mi.loadMetaInfo();
                if (!mi.isValid()) continue;
                sessionFileMap.remove(mi.getSessionId());
            }
            catch (DbusEventBufferMetaInfo.DbusEventBufferMetaInfoException e) {
                LOG.warn("Error parsing meta info file" + f.getName() + ". Nothing moved", e);
                return;
            }
        }
        int nDirsMoved = 0;
        for (File d : sessionFileMap.values()) {
            try {
                if (moveFile(d, bakDir)) {
                    LOG.info("Moved directory " + d.getName());
                    ++nDirsMoved;
                    continue;
                }
                LOG.warn("Could not move directory " + d.getName() + ". Ignored");
            }
            catch (SecurityException e) {
                LOG.warn("Could not move directory " + d.getName() + ". Ignored", e);
            }
        }
        LOG.info("Moved " + nDirsMoved + " session directories from " + _mmapDirectory);
    }

    /**
     * Moves a file or directory into the backup directory, creating that directory if needed.
     *
     * @param element file or directory to move
     * @param bakDir  destination directory
     * @return true when the rename succeeded; false when the backup directory could not be
     *         created, exists but is not a directory, or the rename failed
     */
    private boolean moveFile(File element, File bakDir) {
        LOG.info("backing up " + element);
        final String elementName = element.getName();
        final String targetDirPath = bakDir.getAbsolutePath();
        if (bakDir.exists()) {
            if (!bakDir.isDirectory()) {
                LOG.error(bakDir.getName() + " is not a directory");
                return false;
            }
        } else if (!bakDir.mkdirs()) {
            LOG.warn("Could not create directory " + bakDir.getName());
            return false;
        }
        return element.renameTo(new File(targetDirPath, elementName));
    }

    /**
     * Deletes every logical-to-physical mapping that points at the given physical partition key.
     *
     * @param key physical partition key whose logical mappings are removed
     */
    private void removeAuxMapping(PhysicalPartitionKey key) {
        final Iterator<Map.Entry<LogicalPartitionKey, PhysicalPartitionKey>> mappings =
                _logicalPKey2PhysicalPKey.entrySet().iterator();
        while (mappings.hasNext()) {
            if (mappings.next().getValue().equals(key)) {
                mappings.remove();
            }
        }
    }

    /**
     * No-op: the body is empty, so calling this method performs no checks.
     */
    public void assertBuffers() {
    }

    /**
     * Registers an existing buffer for the physical partition of a source configuration.
     *
     * <p>The buffer is stored under the partition key, added to the set of distinct buffers,
     * given the current drop-old-events setting, and every logical source of the configuration
     * is mapped to the partition.
     *
     * @param pConfig physical source configuration
     * @param buf     buffer to register
     */
    public synchronized void addBuffer(PhysicalSourceStaticConfig pConfig, DbusEventBuffer buf) {
        LOG.info("addBuffer for phSrc=" + pConfig + "; buf=" + buf.hashCode());
        final PhysicalPartitionKey partitionKey = new PhysicalPartitionKey(pConfig.getPhysicalPartition());
        _bufsMap.put(partitionKey, buf);
        _uniqBufs.add(buf);
        buf.setDropOldEvents(_dropOldEvents);
        for (LogicalSourceStaticConfig logicalConfig : pConfig.getSources()) {
            updateLogicalSourceMapping(partitionKey, logicalConfig.getLogicalSource(), logicalConfig.getPartition());
        }
    }

    /**
     * Returns the physical sources registered for a physical partition.
     *
     * @param pPart physical partition
     * @return the internal set of sources (not a copy), or null when the partition is unknown
     */
    public Set<PhysicalSource> getPhysicalSourcesForPartition(PhysicalPartition pPart) {
        return _partKey2PhysiscalSources.get(new PhysicalPartitionKey(pPart));
    }

    /**
     * Builds an event filter from a collection of subscriptions.
     *
     * <p>Subscriptions are grouped by physical partition. A subscription with a wildcard logical
     * source and a concrete physical partition yields a partition filter with no nested filter.
     * A subscription with both wildcards is ignored with a warning. Any other subscription adds
     * its logical partition as a condition of the nested logical filter of its partition.
     *
     * @param subs subscriptions to translate; must not be null
     * @return {@link AllowAllDbusFilter#THE_INSTANCE} when no partition filter was produced
     *         (including an empty {@code subs}), the single partition filter when there is
     *         exactly one, otherwise a {@link ConjunctionDbusFilter} holding all of them
     * @throws DatabusException declared for callers; see the filter classes for when it is raised
     */
    public DbusFilter constructFilters(Collection<DatabusSubscription> subs) throws DatabusException {
        // Allocated eagerly: the lazily-created map of the previous version stayed null when no
        // subscription produced a filter, and the size check below then threw NullPointerException.
        final Map<PhysicalPartition, PhysicalPartitionDbusFilter> filterMap =
                new HashMap<PhysicalPartition, PhysicalPartitionDbusFilter>(10);
        for (DatabusSubscription sub : subs) {
            PhysicalPartition ppart = sub.getPhysicalPartition();
            if (sub.getLogicalSource().isWildcard()) {
                if (ppart.isWildcard()) {
                    LOG.warn("ignoring subscription with both physical partition and logical source wildcards");
                } else {
                    filterMap.put(ppart, new PhysicalPartitionDbusFilter(ppart, null));
                }
                continue;
            }
            PhysicalPartitionDbusFilter ppartFilter = filterMap.get(ppart);
            LogicalSourceAndPartitionDbusFilter logFilter;
            if (null == ppartFilter) {
                logFilter = new LogicalSourceAndPartitionDbusFilter();
                ppartFilter = new PhysicalPartitionDbusFilter(ppart, logFilter);
                filterMap.put(ppart, ppartFilter);
            } else {
                // Null when the partition was registered earlier through a wildcard logical source.
                logFilter = (LogicalSourceAndPartitionDbusFilter)ppartFilter.getNestedFilter();
            }
            if (null != logFilter) {
                logFilter.addSourceCondition(sub.getLogicalPartition());
            } else {
                LOG.error("unexpected null filter for logical source");
            }
        }
        if (filterMap.isEmpty()) {
            return AllowAllDbusFilter.THE_INSTANCE;
        }
        if (1 == filterMap.size()) {
            return (DbusFilter)filterMap.values().iterator().next();
        }
        ConjunctionDbusFilter conjunction = new ConjunctionDbusFilter();
        for (PhysicalPartitionDbusFilter partitionFilter : filterMap.values()) {
            conjunction.addFilter((DbusFilter)partitionFilter);
        }
        return conjunction;
    }

    /**
     * Returns the keys of all registered physical partitions in sorted order.
     *
     * @return the navigable key-set view of the buffer map
     */
    public NavigableSet<PhysicalPartitionKey> getAllPhysicalPartitionKeys() {
        final NavigableSet<PhysicalPartitionKey> sortedKeys = _bufsMap.navigableKeySet();
        return sortedKeys;
    }

    /**
     * Maps a logical source/partition pair to a physical partition key and indexes the logical
     * source by its id.
     *
     * @param pKey       physical partition key the pair is mapped to
     * @param lSource    logical source
     * @param lPartition logical partition; null selects the default logical source partition
     */
    private void updateLogicalSourceMapping(PhysicalPartitionKey pKey, LogicalSource lSource, LogicalPartition lPartition) {
        final LogicalPartitionKey logicalKey = new LogicalPartitionKey(lSource, lPartition);
        LOG.info("logical source " + logicalKey + " mapped to physical source " + pKey);
        _logicalPKey2PhysicalPKey.put(logicalKey, pKey);
        _logicalId2LogicalSource.put(lSource.getId(), lSource);
    }

    /**
     * Stores the drop-old-events flag and applies it to every registered buffer.
     *
     * @param val new flag value; also used for buffers added later
     */
    public void setDropOldEvents(boolean val) {
        _dropOldEvents = val;
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().setDropOldEvents(val);
        }
    }

    /**
     * Returns the number of distinct buffers currently registered.
     *
     * @return size of the set of distinct buffers
     */
    public int bufsNum() {
        final int distinctBuffers = _uniqBufs.size();
        return distinctBuffers;
    }

    /**
     * Calls {@code startEvents()} on every registered buffer.
     */
    public void startAllEvents() {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().startEvents();
        }
    }

    /**
     * Calls {@code endEvents(seq, stats)} on every registered buffer.
     *
     * @param seq   sequence value forwarded to each buffer
     * @param nTime not used by this method
     * @param stats statistics collector forwarded to each buffer
     */
    public void endAllEvents(long seq, long nTime, DbusEventsStatisticsCollector stats) {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().endEvents(seq, stats);
        }
    }

    /**
     * Calls {@code clear()} on every registered buffer.
     */
    public void clearAll() {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().clear();
        }
    }

    /**
     * Calls {@code saveBufferMetaInfo(infoOnly)} on every registered buffer.
     *
     * @param infoOnly forwarded unchanged to each buffer
     * @throws IOException propagated from the first buffer that fails; later buffers are not visited
     */
    public synchronized void saveBufferMetaInfo(boolean infoOnly) throws IOException {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().saveBufferMetaInfo(infoOnly);
        }
    }

    /**
     * Calls {@code validateEventsInBuffer()} on every registered buffer.
     *
     * @throws DbusEventBufferMetaInfo.DbusEventBufferMetaInfoException propagated from the first
     *         buffer that fails validation
     */
    public void validateRelayBuffers() throws DbusEventBufferMetaInfo.DbusEventBufferMetaInfoException {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().validateEventsInBuffer();
        }
    }

    /**
     * Calls {@code rollbackEvents()} on every registered buffer.
     */
    public void rollbackAllBuffers() {
        final Iterator<DbusEventBuffer> buffers = _uniqBufs.iterator();
        while (buffers.hasNext()) {
            buffers.next().rollbackEvents();
        }
    }

    /**
     * Map key wrapping a {@link PhysicalPartition}.
     *
     * <p>Equality and hash code are derived from the wrapped partition (a null partition is
     * tolerated by both); ordering delegates to the partition's own {@code compareTo}.
     */
    public static class PhysicalPartitionKey
    implements Comparable {
        private PhysicalPartition _pPartition;

        /** Creates a key around a newly constructed {@link PhysicalPartition}. */
        public PhysicalPartitionKey() {
            _pPartition = new PhysicalPartition();
        }

        /**
         * Creates a key around the given partition.
         *
         * @param pPartition partition to wrap
         */
        public PhysicalPartitionKey(PhysicalPartition pPartition) {
            _pPartition = pPartition;
        }

        public PhysicalPartition getPhysicalPartition() {
            return _pPartition;
        }

        public void setPhysicalPartition(PhysicalPartition p) {
            _pPartition = p;
        }

        @Override
        public int hashCode() {
            final int partitionHash = _pPartition == null ? 0 : _pPartition.hashCode();
            return 37 + partitionHash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final PhysicalPartition theirs = ((PhysicalPartitionKey)obj)._pPartition;
            if (_pPartition == null) {
                return theirs == null;
            }
            return _pPartition.equals(theirs);
        }

        /**
         * Renders the key as a JSON object with a single {@code physicalPartition} member.
         *
         * @return JSON text built from the partition's own JSON form
         */
        public String toJsonString() {
            return "{\"physicalPartition\":" + _pPartition.toJsonString() + "}";
        }

        @Override
        public String toString() {
            return toJsonString();
        }

        /**
         * Orders keys by their wrapped partitions.
         *
         * @param other key to compare with
         * @return the result of comparing the two wrapped partitions
         * @throws ClassCastException if {@code other} is a non-null object of another type
         */
        @Override
        public int compareTo(Object other) {
            if (other instanceof PhysicalPartitionKey) {
                return _pPartition.compareTo(((PhysicalPartitionKey)other).getPhysicalPartition());
            }
            throw new ClassCastException("PhysicalPartitionKey class expected instead of " + other.getClass().getSimpleName());
        }
    }

    /**
     * Immutable map key made of a logical source and a logical partition.
     *
     * <p>A null partition passed to the constructor is replaced by the default logical source
     * partition.
     */
    public static class LogicalPartitionKey {
        private final LogicalSource _lSource;
        private final LogicalPartition _lPartition;

        /**
         * Creates a key for the given source and partition.
         *
         * @param lSource    logical source
         * @param lPartition logical partition; null selects the default logical source partition
         */
        public LogicalPartitionKey(LogicalSource lSource, LogicalPartition lPartition) {
            _lSource = lSource;
            _lPartition = lPartition != null
                    ? lPartition
                    : LogicalSourceStaticConfig.getDefaultLogicalSourcePartition();
        }

        public LogicalSource getLogicalSource() {
            return _lSource;
        }

        public LogicalPartition getLogicalPartition() {
            return _lPartition;
        }

        @Override
        public int hashCode() {
            final int partitionHash = _lPartition == null ? 0 : _lPartition.hashCode();
            final int sourceHash = _lSource == null ? 0 : _lSource.hashCode();
            return 31 * (31 + partitionHash) + sourceHash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final LogicalPartitionKey that = (LogicalPartitionKey)obj;
            final boolean samePartition = _lPartition == null
                    ? that._lPartition == null
                    : _lPartition.equals(that._lPartition);
            if (!samePartition) {
                return false;
            }
            if (_lSource == null) {
                return that._lSource == null;
            }
            return _lSource.equals(that._lSource);
        }

        @Override
        public String toString() {
            return "" + _lSource + _lPartition;
        }
    }

    /**
     * File-name filter accepting every name that starts with a fixed prefix.
     */
    private static class FilePrefixFilter
    implements FilenameFilter {
        private final String _prefix;

        /**
         * @param prefix prefix a file name must start with to be accepted
         */
        FilePrefixFilter(String prefix) {
            _prefix = prefix;
        }

        /**
         * @param dir  directory containing the file; not consulted
         * @param name file name to test
         * @return true when {@code name} starts with the configured prefix
         */
        @Override
        public boolean accept(File dir, String name) {
            final boolean hasPrefix = name.startsWith(_prefix);
            return hasPrefix;
        }
    }

    public class DbusEventBufferBatchReader
    implements DbusEventBufferBatchReadable {
        /** Physical partition keys this reader streams from, kept in sorted order. */
        private final NavigableSet<PhysicalPartitionKey> _pKeys;
        /** Checkpoints supplied at construction; streamEvents reads them and adds checkpoints back. */
        CheckpointMult _checkPoints;
        /** Passed to each buffer's stream call as the maximum client event version. */
        int _clientEventVersion = 0;
        /** Source of per-partition statistics collectors; streamEvents tolerates null here. */
        private final StatsCollectors<DbusEventsStatisticsCollector> _statsCollectors;

        /**
         * Creates a reader over an explicit collection of physical partition keys.
         *
         * @param cpMult             checkpoints used while streaming
         * @param physicalPartitions keys to read from; null yields an initially empty key set
         * @param statsCollectors    statistics collectors; may be null
         * @throws IOException declared for callers; not raised by the visible body
         */
        public DbusEventBufferBatchReader(CheckpointMult cpMult, Collection<PhysicalPartitionKey> physicalPartitions, StatsCollectors<DbusEventsStatisticsCollector> statsCollectors) throws IOException {
            _statsCollectors = statsCollectors;
            _checkPoints = cpMult;
            if (physicalPartitions == null) {
                _pKeys = new TreeSet<PhysicalPartitionKey>();
            } else {
                _pKeys = new TreeSet<PhysicalPartitionKey>(physicalPartitions);
            }
        }

        /**
         * Creates a reader for a collection of logical source ids.
         *
         * <p>Each id is expanded with an all-partitions wildcard: every logical partition key of
         * the enclosing multi-buffer whose source has that id is resolved to its physical
         * partition key, and the key is added to this reader when a buffer is registered for it.
         * Logical keys without a physical mapping are skipped; a mapping without a buffer is
         * logged as a warning.
         *
         * @param ids             logical source ids to stream
         * @param cpMult          checkpoints used while streaming
         * @param statsCollectors statistics collectors; may be null
         * @throws IOException declared for callers; not raised by the visible body
         */
        public DbusEventBufferBatchReader(Collection<Integer> ids, CheckpointMult cpMult, StatsCollectors<DbusEventsStatisticsCollector> statsCollectors) throws IOException {
            this(cpMult, (Collection<PhysicalPartitionKey>)null, statsCollectors);
            // The maps belong to the enclosing instance; the previous text referred to an
            // undefined identifier here instead of the qualified `this`.
            final DbusEventBufferMult owner = DbusEventBufferMult.this;
            boolean debugEnabled = LOG.isDebugEnabled();
            for (int id : ids) {
                LogicalSource lSource = owner._logicalId2LogicalSource.get(id);
                LogicalPartition lPartition = LogicalPartition.createAllPartitionsWildcard();
                if (debugEnabled) {
                    LOG.debug("Streaming for logical source=" + lSource + "; partition=" + lPartition);
                }
                ArrayList<LogicalPartitionKey> lpKeys;
                if (lPartition.isAllPartitionsWildcard()) {
                    lpKeys = new ArrayList<LogicalPartitionKey>(owner._logicalPKey2PhysicalPKey.size());
                    for (LogicalPartitionKey lpKey : owner._logicalPKey2PhysicalPKey.keySet()) {
                        if (lpKey.getLogicalSource().getId().equals(id)) {
                            lpKeys.add(lpKey);
                        }
                    }
                } else {
                    lpKeys = new ArrayList<LogicalPartitionKey>(1);
                    lpKeys.add(new LogicalPartitionKey(lSource, lPartition));
                }
                for (LogicalPartitionKey lpKey : lpKeys) {
                    PhysicalPartitionKey pKey = owner._logicalPKey2PhysicalPKey.get(lpKey);
                    DbusEventBuffer buf = null;
                    if (pKey != null) {
                        buf = owner._bufsMap.get(pKey);
                        if (buf != null) {
                            _pKeys.add(pKey);
                        } else {
                            LOG.warn("couldn't find buffer for pKeyp=" + pKey + " and lKey=" + lpKey);
                        }
                    }
                    if (debugEnabled) {
                        LOG.debug("streaming:  for srcId=" + id + " and lKey=" + lpKey + " found pKey=" + pKey + " and buf=" + (null == buf ? "null" : Integer.valueOf(buf.hashCode())));
                    }
                }
            }
        }

        @Override
        public StreamEventsResult streamEvents(boolean streamFromLatestScn, int batchFetchSize, WritableByteChannel writeChannel, Encoding encoding, DbusFilter filter) throws ScnNotFoundException, BufferNotFoundException, OffsetNotFoundException {
            long startTimeTs = System.nanoTime();
            int numEventsStreamed = 0;
            int batchFetchSoFar = 0;
            DbusEventBuffer.StreamingMode mode = DbusEventBuffer.StreamingMode.WINDOW_AT_TIME;
            SizeControlledWritableByteChannel ch = new SizeControlledWritableByteChannel(writeChannel);
            boolean debugEnabled = LOG.isDebugEnabled();
            NavigableSet<PhysicalPartitionKey> workingSet = this._pKeys;
            PhysicalPartition partialWindowPartition = this._checkPoints.getPartialWindowPartition();
            if (partialWindowPartition != null) {
                workingSet = this._pKeys.tailSet(new PhysicalPartitionKey(partialWindowPartition), true);
                if (workingSet.size() == 0) {
                    throw new OffsetNotFoundException("Partial window offset not found" + partialWindowPartition);
                }
            } else {
                PhysicalPartitionKey ppKey;
                workingSet = this._pKeys;
                PhysicalPartition cursorPartition = this._checkPoints.getCursorPartition();
                if (cursorPartition != null && ((workingSet = this._pKeys.tailSet(ppKey = new PhysicalPartitionKey(cursorPartition), false)).isEmpty() || !this._pKeys.contains(ppKey))) {
                    workingSet = this._pKeys;
                }
            }
            HashSet<PhysicalPartitionKey> streamFromLatestState = new HashSet<PhysicalPartitionKey>();
            int minPendingEventSize = 0;
            boolean done = false;
            while (!done) {
                boolean somethingStreamed = false;
                for (PhysicalPartitionKey pKey : workingSet) {
                    DbusEventBuffer buf = (DbusEventBuffer)DbusEventBufferMult.this._bufsMap.get(pKey);
                    if (null == buf) {
                        String errMsg = "Buffer not found for physicalPartitionKey " + pKey;
                        LOG.error((Object)errMsg);
                        throw new BufferNotFoundException(errMsg);
                    }
                    PhysicalPartition pPartition = pKey.getPhysicalPartition();
                    DbusEventsStatisticsCollector statsCollector = this._statsCollectors == null ? null : this._statsCollectors.getStatsCollector(pPartition.toSimpleString());
                    Checkpoint cp = null;
                    cp = this._checkPoints.getCheckpoint(pKey.getPhysicalPartition());
                    if (debugEnabled) {
                        LOG.debug((Object)("get Checkpoint by pPartition" + pPartition + ";cp=" + cp));
                    }
                    if (cp == null) {
                        cp = new Checkpoint();
                        cp.setFlexible();
                        this._checkPoints.addCheckpoint(pPartition, cp);
                    }
                    if (this._pKeys.size() == 1) {
                        mode = DbusEventBuffer.StreamingMode.CONTINUOUS;
                    }
                    StreamEventsArgs args = new StreamEventsArgs(batchFetchSize - batchFetchSoFar);
                    boolean streamFromLatestScnForPartition = this.computeStreamFromLatestScnForPartition(pKey, streamFromLatestState, streamFromLatestScn);
                    args.setEncoding(encoding).setStreamFromLatestScn(streamFromLatestScnForPartition);
                    args.setSMode(mode).setFilter(filter).setStatsCollector(statsCollector).setMaxClientEventVersion(this._clientEventVersion);
                    StreamEventsResult result = buf.streamEvents(cp, ch, args);
                    int numEvents = result.getNumEventsStreamed();
                    if (numEvents == 0 && result.getSizeOfPendingEvent() > 0) {
                        if (minPendingEventSize == 0) {
                            minPendingEventSize = result.getSizeOfPendingEvent();
                        } else if (result.getSizeOfPendingEvent() < minPendingEventSize) {
                            minPendingEventSize = result.getSizeOfPendingEvent();
                        }
                    }
                    if (numEvents > 0) {
                        somethingStreamed = true;
                    }
                    numEventsStreamed += numEvents;
                    batchFetchSoFar = ch.writtenSoFar();
                    if (debugEnabled) {
                        LOG.debug((Object)("one iteration:  read " + numEvents + " from buf " + buf.hashCode() + "; read so far " + batchFetchSoFar + "(out of " + batchFetchSize + ")"));
                    }
                    this._checkPoints.addCheckpoint(pPartition, cp);
                    if (cp.getWindowOffset() < 0L) {
                        this._checkPoints.setCursorPartition(pPartition);
                    }
                    if (batchFetchSoFar < batchFetchSize) continue;
                    break;
                }
                if (batchFetchSoFar >= batchFetchSize) {
                    done = true;
                } else if (!somethingStreamed && workingSet.size() == this._pKeys.size()) {
                    done = true;
                }
                workingSet = this._pKeys;
            }
            long endTimeTs = System.nanoTime();
            if (PERF_LOG.isDebugEnabled()) {
                PERF_LOG.debug((Object)("streamEvents took: " + (double)(endTimeTs - startTimeTs) / 1000000.0 + "ms"));
            }
            return new StreamEventsResult(numEventsStreamed, minPendingEventSize > 0 ? minPendingEventSize : 0);
        }

        /**
         * Returns the {@link CheckpointMult} held by this object.
         *
         * <p>The live instance stored in {@code _checkPoints} is handed out, not a copy.
         *
         * @return the current value of {@code _checkPoints}
         */
        @Override
        public CheckpointMult getCheckpointMult() {
            return _checkPoints;
        }

        /**
         * Records the maximum event version to use for the client.
         *
         * <p>The value is stored in {@code _clientEventVersion}; {@code streamEvents} forwards it
         * to {@code StreamEventsArgs.setMaxClientEventVersion} for every buffer it streams from.
         *
         * @param version the event version to store; no validation is performed here
         */
        @Override
        public void setClientMaxEventVersion(int version) {
            _clientEventVersion = version;
        }

        /**
         * Decides whether the given partition should be streamed from the latest SCN on this call,
         * and remembers the partition so that the answer is {@code true} at most once per key.
         *
         * <p>Returns {@code true} only when {@code streamFromLatestScn} is set, {@code pKey} is
         * non-null, and {@code pKey} was not yet present in {@code ppList}; in that case the key is
         * added to {@code ppList} as a side effect. In every other case {@code ppList} is left
         * untouched and {@code false} is returned.
         *
         * @param pKey                the partition being streamed; {@code null} yields {@code false}
         * @param ppList              partitions for which {@code true} has already been returned;
         *                            must be modifiable, it is updated by this method
         * @param streamFromLatestScn whether the caller asked for stream-from-latest-SCN at all
         * @return {@code true} if this is the first time {@code pKey} is seen while
         *         {@code streamFromLatestScn} is enabled, {@code false} otherwise
         */
        protected boolean computeStreamFromLatestScnForPartition(PhysicalPartitionKey pKey, Set<PhysicalPartitionKey> ppList, boolean streamFromLatestScn) {
            if (!streamFromLatestScn || pKey == null) {
                return false;
            }
            // Set.add returns true only when the key was not already present, which is exactly
            // the "first time seen" answer; no separate contains() probe is needed.
            return ppList.add(pKey);
        }
    }

    /**
     * A {@link WritableByteChannel} decorator that forwards every operation to a wrapped channel
     * and keeps a running total of the bytes the wrapped channel reported as written.
     *
     * <p>The total is kept in an {@code int} and is never reset.
     */
    public static class SizeControlledWritableByteChannel
    implements WritableByteChannel {
        /** Channel that actually receives the data. */
        private final WritableByteChannel _channel;
        /** Sum of the return values of all {@link #write(ByteBuffer)} calls so far. */
        private int _totalWritten = 0;

        /**
         * Wraps the given channel; the byte counter starts at zero.
         *
         * @param ch the channel to delegate to
         */
        public SizeControlledWritableByteChannel(WritableByteChannel ch) {
            _channel = ch;
        }

        /**
         * Writes {@code src} to the wrapped channel and adds the reported byte count to the total.
         *
         * @param src the buffer to write from
         * @return the number of bytes the wrapped channel reported as written
         * @throws IOException if the wrapped channel's write fails; the total is not updated then
         */
        @Override
        public int write(ByteBuffer src) throws IOException {
            final int count = _channel.write(src);
            _totalWritten = _totalWritten + count;
            return count;
        }

        /**
         * @return the total number of bytes written through this wrapper so far
         */
        public int writtenSoFar() {
            return _totalWritten;
        }

        /**
         * @return whether the wrapped channel is open
         */
        @Override
        public boolean isOpen() {
            return _channel.isOpen();
        }

        /**
         * Closes the wrapped channel.
         *
         * @throws IOException if closing the wrapped channel fails
         */
        @Override
        public void close() throws IOException {
            _channel.close();
        }
    }
}

