/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.utils;

import com.linkedin.databus.bootstrap.utils.BootstrapEventBuffer;
import com.linkedin.databus.bootstrap.utils.DbusSeederBaseThread;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import com.linkedin.databus.core.util.RateMonitor;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.db.EventReaderSummary;
import com.linkedin.databus2.producers.db.OracleTriggerMonitoredSourceInfo;
import com.linkedin.databus2.producers.db.ReadEventCycleSummary;
import com.linkedin.databus2.producers.db.SourceDBEventReader;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.log4j.Logger;

public class BootstrapAvroFileEventReader
extends DbusSeederBaseThread
implements SourceDBEventReader {
    public static final Logger LOG = Logger.getLogger((String)BootstrapAvroFileEventReader.class.getName());
    private static final long MILLISEC_TO_MIN = 60000L;
    private StaticConfig _config;
    private BootstrapEventBuffer _bootstrapEventBuffer;
    private List<OracleTriggerMonitoredSourceInfo> _sources;
    private final Map<String, Long> _lastRows;

    public BootstrapAvroFileEventReader(StaticConfig config, List<OracleTriggerMonitoredSourceInfo> sources, Map<String, Long> lastRows, BootstrapEventBuffer bootstrapEventBuffer) {
        super("BootstrapAvroFileEventReader");
        this._config = config;
        this._sources = sources;
        this._lastRows = new HashMap<String, Long>(lastRows);
        this._bootstrapEventBuffer = bootstrapEventBuffer;
    }

    @Override
    public void run() {
        try {
            this.readEventsFromAllSources(0L);
        }
        catch (Exception ex) {
            LOG.error((Object)"Got Error when executing readEventsFromAllSources !!", (Throwable)ex);
        }
        LOG.info((Object)(Thread.currentThread().getName() + " done seeding ||"));
    }

    public ReadEventCycleSummary readEventsFromAllSources(long sinceSCN) throws DatabusException, EventCreationException, UnsupportedKeyException {
        ArrayList<EventReaderSummary> summaries = new ArrayList<EventReaderSummary>();
        boolean error = false;
        long startTS = System.currentTimeMillis();
        long endScn = -1L;
        long minScn = Long.MAX_VALUE;
        try {
            for (OracleTriggerMonitoredSourceInfo sourceInfo : this._sources) {
                endScn = this._config.getSeedWindowSCNMap().get(sourceInfo.getEventView());
                minScn = Math.min(endScn, minScn);
                LOG.info((Object)("Bootstrapping " + sourceInfo.getEventView()));
                this._bootstrapEventBuffer.start(endScn);
                String dir = this._config.getAvroSeedInputDirMap().get(sourceInfo.getEventView());
                File d = new File(dir);
                EventReaderSummary summary = this.readEventsFromHadoopFiles(sourceInfo, d, endScn);
                this._bootstrapEventBuffer.endEvents(-2L, endScn, null);
                summaries.add(summary);
            }
        }
        catch (Exception ex) {
            error = true;
            throw new DatabusException((Throwable)ex);
        }
        finally {
            if (error) {
                this._bootstrapEventBuffer.endEvents(-3L, endScn, null);
                LOG.error((Object)"Seeder stopping unexpectedly !!");
            } else {
                this._bootstrapEventBuffer.endEvents(-1L, endScn, null);
                LOG.info((Object)"Completed Seeding !!");
            }
        }
        LOG.info((Object)("Start SCN :" + minScn));
        long endTS = System.currentTimeMillis();
        ReadEventCycleSummary cycleSummary = new ReadEventCycleSummary("seeder", summaries, minScn, endTS - startTS);
        return cycleSummary;
    }

    private EventReaderSummary readEventsFromHadoopFiles(OracleTriggerMonitoredSourceInfo sourceInfo, File avroSeedDir, Long windowSCN) {
        long timestamp;
        DataFileReader reader = null;
        File[] files = avroSeedDir.listFiles();
        List<File> fileList = Arrays.asList(files);
        Collections.sort(fileList);
        long numRead = 0L;
        long prevNumRead = 0L;
        long numBytes = 0L;
        long timeStart = timestamp = System.currentTimeMillis();
        long lastTime = timestamp;
        long commitInterval = this._config.getCommitInterval();
        long totLatency = 0L;
        GenericRecord record = null;
        RateMonitor seedingRate = new RateMonitor("Seeding Rate");
        seedingRate.start();
        seedingRate.suspend();
        long startRowId = this._lastRows.get(sourceInfo.getEventView());
        LOG.info((Object)("Last Known Row Id is :" + startRowId));
        boolean resumeSeedingRate = true;
        for (File avroSeedFile : files) {
            if (!avroSeedFile.isFile()) continue;
            LOG.info((Object)("Seeding from File : " + avroSeedFile));
            try {
                reader = new DataFileReader(avroSeedFile, (DatumReader)new GenericDatumReader());
            }
            catch (IOException e) {
                LOG.fatal((Object)("Failed to bootstrap from file " + avroSeedFile.getAbsolutePath()), (Throwable)e);
                throw new RuntimeException("Failed to bootstrap from file " + avroSeedFile.getAbsolutePath(), e);
            }
            try {
                boolean committed = false;
                Iterator i$ = reader.iterator();
                while (i$.hasNext()) {
                    GenericRecord hdfsRecord;
                    record = hdfsRecord = (GenericRecord)i$.next();
                    committed = false;
                    if (++numRead < startRowId) continue;
                    if (resumeSeedingRate) {
                        seedingRate.resume();
                        resumeSeedingRate = false;
                    }
                    seedingRate.tick();
                    long start = System.nanoTime();
                    long eventSize = sourceInfo.getFactory().createAndAppendEvent(windowSCN.longValue(), timestamp, hdfsRecord, (DbusEventBufferAppendable)this._bootstrapEventBuffer, false, null);
                    numBytes += eventSize;
                    long latency = System.nanoTime() - start;
                    totLatency += latency;
                    if (numRead % commitInterval != 0L) continue;
                    this._bootstrapEventBuffer.endEvents(numRead, timestamp, null);
                    this._bootstrapEventBuffer.startEvents();
                    long procTime = totLatency / 1000000000L;
                    long currTime = System.currentTimeMillis();
                    long diff = (currTime - lastTime) / 1000L;
                    long timeSinceStart = (currTime - timeStart) / 1000L;
                    LOG.info((Object)("Processed " + commitInterval + " rows in " + diff + " seconds, Avro Processing Time (seconds) so far :" + procTime + ",Seconds elapsed since start :" + timeSinceStart + ",Overall Row Rate:" + seedingRate.getRate() + ", NumRows Fetched so far:" + numRead + ". TotalEventSize :" + numBytes));
                    lastTime = currTime;
                    seedingRate.resume();
                    committed = true;
                }
                if (!committed) {
                    this._bootstrapEventBuffer.endEvents(numRead, timestamp, null);
                    this._bootstrapEventBuffer.startEvents();
                    long procTime = totLatency / 1000000000L;
                    long currTime = System.currentTimeMillis();
                    long diff = (currTime - lastTime) / 1000L;
                    long timeSinceStart = (currTime - timeStart) / 1000L;
                    LOG.info((Object)("Completed Seeding from : " + avroSeedFile + ", Processed " + commitInterval + " rows in " + diff + " seconds, Avro Processing Time (seconds) so far :" + procTime + ",Seconds elapsed since start :" + timeSinceStart + ",Overall Row Rate:" + seedingRate.getRate() + ", NumRows Fetched so far:" + numRead + ". TotalEventSize :" + numBytes));
                    lastTime = currTime;
                    seedingRate.resume();
                }
            }
            catch (Exception e) {
                LOG.fatal((Object)("NumRead :" + numRead + ", Got Exception while processing generic record :" + record), (Throwable)e);
                throw new RuntimeException(e);
            }
            LOG.info((Object)("Processed " + (numRead - prevNumRead) + " rows of Source: " + sourceInfo.getSourceName() + " from file " + avroSeedFile));
            prevNumRead = numRead;
        }
        long timeEnd = System.currentTimeMillis();
        long elapsedMin = (timeEnd - timeStart) / 60000L;
        LOG.info((Object)("Processed " + numRead + " rows of Source: " + sourceInfo.getSourceName() + " in " + elapsedMin + " minutes"));
        return new EventReaderSummary(sourceInfo.getSourceId(), sourceInfo.getSourceName(), -1L, (int)numRead, numBytes, timeEnd - timeStart, (timeEnd - timeStart) / numRead, 0L, 0L, 0L);
    }

    public List<OracleTriggerMonitoredSourceInfo> getSources() {
        return this._sources;
    }

    public Map<String, String> getPKeyNameMap() {
        return this._config.getPKeyNameMap();
    }

    public static class Config
    implements ConfigBuilder<StaticConfig> {
        private static final int DEFAULT_COMMIT_INTERVAL = 10000;
        private static final String DEFAULT_AVRO_SEED_INPUT_FILE = "DEFAULT_FILE_NAME";
        private static final Long DEFAULT_WINDOW_SCN = -1L;
        private static final String DEFAULT_PKEY_NAME = "key";
        private HashMap<String, String> avroSeedInputDirMap = new HashMap();
        private int commitInterval = 10000;
        private HashMap<String, Long> seedWindowSCNMap = new HashMap();
        private Map<String, String> pKeyNameMap = new HashMap<String, String>();

        public Long getSeedWindowSCN(String sourceName) {
            Long scn = this.seedWindowSCNMap.get(sourceName);
            if (null == scn) {
                this.seedWindowSCNMap.put(sourceName, DEFAULT_WINDOW_SCN);
                return DEFAULT_WINDOW_SCN;
            }
            return scn;
        }

        public String getAvroSeedInputDir(String sourceName) {
            String file = this.avroSeedInputDirMap.get(sourceName);
            if (null == file) {
                this.avroSeedInputDirMap.put(sourceName, DEFAULT_AVRO_SEED_INPUT_FILE);
                return DEFAULT_AVRO_SEED_INPUT_FILE;
            }
            return file;
        }

        public void setSeedWindowSCN(String sourceName, Long scn) {
            this.seedWindowSCNMap.put(sourceName, scn);
        }

        public void setAvroSeedInputDir(String sourceName, String file) {
            this.avroSeedInputDirMap.put(sourceName, file);
        }

        public int getCommitInterval() {
            return this.commitInterval;
        }

        public void setCommitInterval(int commitInterval) {
            this.commitInterval = commitInterval;
        }

        public String getPKeyName(String srcName) {
            String key = this.pKeyNameMap.get(srcName);
            if (null == key) {
                this.pKeyNameMap.put(srcName, DEFAULT_PKEY_NAME);
                return DEFAULT_PKEY_NAME;
            }
            return key;
        }

        public void setPKeyName(String srcName, String key) {
            this.pKeyNameMap.put(srcName, key);
        }

        public StaticConfig build() throws InvalidConfigException {
            LOG.info((Object)("BootstrapAvroFileEventReader starting with config :" + this.toString()));
            for (String file : this.avroSeedInputDirMap.values()) {
                File f = new File(file);
                if (f.isDirectory() && f.canRead()) continue;
                LOG.error((Object)("File (" + f + ") does not exist or cannot be read !!"));
                throw new InvalidConfigException("File (" + f + ") does not exist or cannot be read !!");
            }
            return new StaticConfig(this.avroSeedInputDirMap, this.seedWindowSCNMap, this.pKeyNameMap, this.commitInterval);
        }

        public String toString() {
            return "Config [avroSeedInputDirMap=" + this.avroSeedInputDirMap + ", commitInterval=" + this.commitInterval + ", seedWindowSCNMap=" + this.seedWindowSCNMap + ", _pKeyNameMap=" + this.pKeyNameMap + "]";
        }
    }

    public static class StaticConfig {
        private final Map<String, String> avroSeedInputDirMap;
        private final Map<String, Long> seedWindowSCNMap;
        private final Map<String, String> pKeyNameMap;
        private final int commitInterval;

        public StaticConfig(Map<String, String> sourceAvroSchemaMap, Map<String, Long> seedWindowSCNMap, Map<String, String> pKeyNameMap, int commitInterval) {
            this.avroSeedInputDirMap = sourceAvroSchemaMap;
            this.seedWindowSCNMap = seedWindowSCNMap;
            this.pKeyNameMap = pKeyNameMap;
            this.commitInterval = commitInterval;
        }

        public Map<String, String> getAvroSeedInputDirMap() {
            return this.avroSeedInputDirMap;
        }

        public Map<String, Long> getSeedWindowSCNMap() {
            return this.seedWindowSCNMap;
        }

        public Map<String, String> getPKeyNameMap() {
            return this.pKeyNameMap;
        }

        public int getCommitInterval() {
            return this.commitInterval;
        }
    }
}

