/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.bootstrap.common;

import com.linkedin.databus.bootstrap.common.BootstrapCleanerStaticConfig;
import com.linkedin.databus.bootstrap.common.BootstrapDBSingleSourceCleaner;
import com.linkedin.databus.bootstrap.common.BootstrapReadOnlyConfig;
import com.linkedin.databus.core.DatabusThreadBase;
import com.linkedin.databus.core.util.NamedThreadFactory;
import com.linkedin.databus2.core.DatabusException;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * Runs one {@link BootstrapDBSingleSourceCleaner} per source on a cached thread pool and
 * tracks the {@link Future} of the most recently submitted run for each source.
 *
 * <p>{@link #doClean()}, {@link #isAnyCleanerRunning()} and
 * {@link #sleepTillNoCleanerIsRunning()} synchronize on this instance; {@link #close()}
 * does not.
 */
public class BootstrapDBCleaner {
    public static final String MODULE = BootstrapDBCleaner.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);

    /** Upper bound, in milliseconds, for waiting on running cleaners to finish. */
    private static final long TERMINATION_TIMEOUT_IN_MS = 10000L;
    /** Polling interval, in milliseconds, used while waiting for cleaners to finish. */
    private static final long POLL_INTERVAL_IN_MS = 100L;

    public BootstrapCleanerStaticConfig _cleanerConfig;
    public BootstrapReadOnlyConfig _bootstrapConfig;
    private final Map<String, DatabusThreadBase> _appliers;
    private final Map<String, BootstrapDBSingleSourceCleaner> _cleaners;
    private final ExecutorService _cleanerThreadPoolService;
    /** Source name to the future of the last cleaner task submitted for it. */
    private final Map<String, Future<?>> _cleanerFutures;

    /**
     * Builds one single-source cleaner per entry in {@code sources}.
     *
     * @param name            prefix for per-source cleaner names ({@code name + "_" + source});
     *                        also passed to the thread factory
     * @param config          cleaner configuration handed to every single-source cleaner
     * @param bootstrapConfig bootstrap configuration handed to every single-source cleaner
     * @param appliers        per-source applier threads; {@code null} is treated as an empty map,
     *                        and a source without an entry gets a {@code null} applier
     * @param sources         sources to create cleaners for; {@code null} means none
     * @throws SQLException declared by this constructor; presumably propagated from
     *                      constructing a single-source cleaner
     */
    public BootstrapDBCleaner(String name, BootstrapCleanerStaticConfig config, BootstrapReadOnlyConfig bootstrapConfig, Map<String, DatabusThreadBase> appliers, List<String> sources) throws SQLException {
        this._cleanerConfig = config;
        this._bootstrapConfig = bootstrapConfig;
        this._appliers = null != appliers ? appliers : new HashMap<String, DatabusThreadBase>();
        this._cleaners = new HashMap<String, BootstrapDBSingleSourceCleaner>();
        if (null != sources) {
            for (String source : sources) {
                String perSourceName = name + "_" + source;
                DatabusThreadBase perSourceApplier = this._appliers.get(source);
                BootstrapDBSingleSourceCleaner cleaner = new BootstrapDBSingleSourceCleaner(perSourceName, source, perSourceApplier, config, bootstrapConfig);
                this._cleaners.put(source, cleaner);
            }
        }
        NamedThreadFactory tf = new NamedThreadFactory(name);
        this._cleanerThreadPoolService = Executors.newCachedThreadPool((ThreadFactory) tf);
        this._cleanerFutures = new HashMap<String, Future<?>>();
        LOG.info("Cleaner Config is :" + this._cleanerConfig);
    }

    /**
     * Submits a cleaner task for every source whose previous task (if any) has completed.
     * Sources with a task still in flight are skipped.
     */
    public synchronized void doClean() {
        for (Map.Entry<String, BootstrapDBSingleSourceCleaner> entry : this._cleaners.entrySet()) {
            String source = entry.getKey();
            BootstrapDBSingleSourceCleaner singleSourceCleaner = entry.getValue();
            Future<?> previousRun = this._cleanerFutures.get(source);
            if (previousRun != null && !previousRun.isDone()) {
                LOG.info("Skipping running cleaner as it is already running for source = " + source);
                continue;
            }
            LOG.info("Submitting a cleaner task for source = " + source);
            Future<?> currentRun = this._cleanerThreadPoolService.submit(singleSourceCleaner);
            this._cleanerFutures.put(source, currentRun);
        }
    }

    /**
     * @return {@code true} if at least one tracked cleaner future is not yet done
     */
    public synchronized boolean isAnyCleanerRunning() {
        for (Map.Entry<String, Future<?>> entry : this._cleanerFutures.entrySet()) {
            Future<?> cleanerFuture = entry.getValue();
            if (cleanerFuture.isDone()) {
                continue;
            }
            LOG.debug("Cleaner process is running for source = " + entry.getKey());
            return true;
        }
        LOG.info("There are no cleaner processes running");
        return false;
    }

    /**
     * Polls {@link #isAnyCleanerRunning()} until it returns {@code false} or the accumulated
     * sleep time reaches the termination timeout.
     *
     * <p>The method is {@code synchronized} and sleeps between polls, so this instance's
     * monitor is held for the whole wait.
     *
     * @throws DatabusException     if cleaners are still running once the timeout has elapsed
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public synchronized void sleepTillNoCleanerIsRunning() throws DatabusException, InterruptedException {
        long waitTime = 0L;
        while (this.isAnyCleanerRunning()) {
            if (waitTime >= TERMINATION_TIMEOUT_IN_MS) {
                throw new DatabusException("The cleaners have not terminated within " + TERMINATION_TIMEOUT_IN_MS + " ms");
            }
            Thread.sleep(POLL_INTERVAL_IN_MS);
            waitTime += POLL_INTERVAL_IN_MS;
        }
    }

    /**
     * Shuts the thread pool down immediately, waits up to the termination timeout for it to
     * terminate, and then closes every single-source cleaner.
     *
     * <p>If the wait is interrupted, the per-source cleaners are still closed and the calling
     * thread's interrupt status is restored before this method returns.
     */
    public void close() {
        List<Runnable> incompleteCleaners = this._cleanerThreadPoolService.shutdownNow();
        if (!incompleteCleaners.isEmpty()) {
            LOG.info("Number of cleaners that have not completed = " + incompleteCleaners.size());
            LOG.info("Printing out sources for which cleaners what not completed ");
            for (Runnable task : incompleteCleaners) {
                LOG.error(describeTask(task));
            }
        }
        boolean interrupted = false;
        try {
            boolean hasTerminated = this._cleanerThreadPoolService.awaitTermination(TERMINATION_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);
            LOG.info("Result of terminating cleaner thread pool service: " + (hasTerminated ? "success" : "failure"));
        }
        catch (InterruptedException e) {
            interrupted = true;
            LOG.error("Cleaner thread pool service termination has been interrupted", e);
        }
        finally {
            try {
                for (Map.Entry<String, BootstrapDBSingleSourceCleaner> entry : this._cleaners.entrySet()) {
                    LOG.info("Invoking close on cleaner for source = " + entry.getKey());
                    entry.getValue().close();
                }
            }
            finally {
                // Restore the flag only after the cleaners were closed so their close() calls
                // do not run with a pending interrupt, while callers can still observe it.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Returns something loggable for a task handed back by {@code shutdownNow()}.
     * Tasks passed to {@code submit(Runnable)} are wrapped by the executor, so the returned
     * runnable is not necessarily the cleaner itself; an unconditional cast could throw
     * {@link ClassCastException} and abort {@link #close()}.
     */
    private static Object describeTask(Runnable task) {
        if (task instanceof BootstrapDBSingleSourceCleaner) {
            return ((BootstrapDBSingleSourceCleaner) task).getName();
        }
        return task;
    }
}

