/*
 * Decompiled with CFR 0.152.
 */
package org.trpr.platform.batch.impl.job.ha.service;

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.x.discovery.ServiceCache;
import com.netflix.curator.x.discovery.ServiceDiscovery;
import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
import com.netflix.curator.x.discovery.ServiceInstance;
import com.netflix.curator.x.discovery.details.InstanceSerializer;
import com.netflix.curator.x.discovery.details.JsonInstanceSerializer;
import com.netflix.curator.x.discovery.details.ServiceCacheListener;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.trpr.platform.batch.common.JobHost;
import org.trpr.platform.batch.impl.job.ha.JobInstanceDetails;
import org.trpr.platform.batch.impl.job.ha.service.SyncServiceImpl;
import org.trpr.platform.batch.spi.spring.admin.JobConfigurationService;
import org.trpr.platform.batch.spi.spring.admin.SyncService;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.event.PlatformEventConsumer;
import org.trpr.platform.core.spi.logging.Logger;
import org.trpr.platform.model.event.PlatformEvent;
import org.trpr.platform.runtime.impl.event.BootstrapProgressMonitor;

/**
 * Keeps the job-to-host mapping held by the {@link JobConfigurationService} in step with the
 * service instances registered through Curator service discovery under
 * {@value #ZK_DEP_PATH_PREFIX}.
 *
 * <p>One {@link ServiceCache} is kept per job name; every cache change triggers a full rebuild of
 * the host list via {@link #updateHosts()}. The handler also registers itself as a bootstrap event
 * listener and, on a "started" bootstrap event, sends a pull request to the instance with the
 * earliest registration time before asking the {@link SyncService} to sync all hosts.
 */
public class CuratorJobSyncHandler
implements InitializingBean,
PlatformEventConsumer {
    /** Base path handed to the service discovery builder. */
    private static final String ZK_DEP_PATH_PREFIX = "/Batch/Deployment";
    private static final Logger LOGGER = LogFactory.getLogger(CuratorJobSyncHandler.class);

    private final JobConfigurationService jobConfigurationService;
    private final CuratorFramework curatorFramework;
    private final SyncService syncService;
    private final ServiceDiscovery<JobInstanceDetails> serviceDiscovery;
    /** Started service caches, keyed by job (service) name. */
    private final Map<String, ServiceCache<JobInstanceDetails>> serviceCacheMap;
    private final BootstrapProgressMonitor bootstrapProgressMonitor;

    /**
     * Creates the handler, builds the service discovery client and installs a
     * {@link SyncServiceImpl} on the job configuration service if it has no sync service yet.
     *
     * @param jobConfigurationService service holding job configuration and job-to-host mappings
     * @param curatorFramework Curator client used by service discovery
     * @param bootstrapProgressMonitor monitor this handler subscribes to in {@link #afterPropertiesSet()}
     */
    @Autowired
    public CuratorJobSyncHandler(JobConfigurationService jobConfigurationService, CuratorFramework curatorFramework, BootstrapProgressMonitor bootstrapProgressMonitor) {
        this.curatorFramework = curatorFramework;
        this.jobConfigurationService = jobConfigurationService;
        this.syncService = new SyncServiceImpl(this.jobConfigurationService);
        if (this.jobConfigurationService.getSyncService() == null) {
            this.jobConfigurationService.setSyncService(this.syncService);
        }
        JsonInstanceSerializer<JobInstanceDetails> serializer = new JsonInstanceSerializer<JobInstanceDetails>(JobInstanceDetails.class);
        this.serviceDiscovery = ServiceDiscoveryBuilder.builder(JobInstanceDetails.class)
                .client(this.curatorFramework)
                .basePath(ZK_DEP_PATH_PREFIX)
                .serializer(serializer)
                .build();
        this.serviceCacheMap = new HashMap<String, ServiceCache<JobInstanceDetails>>();
        this.bootstrapProgressMonitor = bootstrapProgressMonitor;
    }

    /**
     * Rebuilds the job-to-host mapping from service discovery.
     *
     * <p>Clears all job instances, closes and drops caches for job names that are no longer
     * registered, creates caches for newly seen job names and re-adds every cached instance as a
     * {@link JobHost}. Any exception is logged and not rethrown.
     *
     * <p>NOTE(review): this method is not synchronized although it is also invoked from cache
     * listener callbacks; confirm whether additional locking is required.
     */
    public void updateHosts() {
        LOGGER.info("Updating list of servers");
        this.jobConfigurationService.clearJobInstances();
        try {
            Collection<String> serviceNames = this.serviceDiscovery.queryForNames();
            // Iterate over a snapshot so that removing stale entries from the live map
            // cannot trigger a ConcurrentModificationException.
            Map<String, ServiceCache<JobInstanceDetails>> snapshot = new HashMap<String, ServiceCache<JobInstanceDetails>>(this.serviceCacheMap);
            for (Map.Entry<String, ServiceCache<JobInstanceDetails>> entry : snapshot.entrySet()) {
                if (serviceNames.contains(entry.getKey())) {
                    continue;
                }
                entry.getValue().close();
                this.serviceCacheMap.remove(entry.getKey());
            }
            for (String serviceName : serviceNames) {
                if (!this.serviceCacheMap.containsKey(serviceName)) {
                    this.startCacheFor(serviceName);
                    LOGGER.info("Job: " + serviceName + "'s Cache listener added.");
                }
                List<ServiceInstance<JobInstanceDetails>> instances = this.serviceCacheMap.get(serviceName).getInstances();
                for (ServiceInstance<JobInstanceDetails> instance : instances) {
                    JobHost instHost = new JobHost(instance.getPayload().getHostName(), instance.getAddress(), instance.getPort().intValue());
                    this.jobConfigurationService.addJobInstance(serviceName, instHost);
                }
            }
        }
        catch (Exception e) {
            LOGGER.error("Error while updating server list", (Throwable)e);
        }
    }

    /**
     * Registers the current host as an instance of the given job unless an instance with the same
     * address and port is already registered, makes sure a cache with a listener exists for the
     * job, and then refreshes the host list. Exceptions during registration are logged and not
     * rethrown; the host list is refreshed regardless.
     *
     * @param jobName name of the job (used as the service name in service discovery)
     */
    public synchronized void addJobInstance(String jobName) {
        JobHost currentHost = this.jobConfigurationService.getCurrentHostName();
        try {
            boolean isRegistered = false;
            LOGGER.info("Querying zookepper to see if already there is an entry ");
            Collection<String> serviceNames = this.serviceDiscovery.queryForNames();
            if (serviceNames.contains(jobName)) {
                for (ServiceInstance<JobInstanceDetails> serviceInstance : this.serviceDiscovery.queryForInstances(jobName)) {
                    boolean sameAddress = serviceInstance.getAddress().equals(currentHost.getIP());
                    boolean samePort = serviceInstance.getPort().intValue() == currentHost.getPort();
                    if (sameAddress && samePort) {
                        isRegistered = true;
                        break;
                    }
                }
            }
            if (!isRegistered) {
                ServiceInstance<JobInstanceDetails> thisInstance = ServiceInstance.<JobInstanceDetails>builder()
                        .name(jobName)
                        .address(currentHost.getIP())
                        .port(currentHost.getPort())
                        .payload(new JobInstanceDetails(currentHost.getHostName()))
                        .build();
                this.serviceDiscovery.registerService(thisInstance);
                LOGGER.info("Registering " + jobName + " to " + currentHost.getAddress());
            }
            if (!this.serviceCacheMap.containsKey(jobName)) {
                this.startCacheFor(jobName);
            }
            LOGGER.info("Added listener");
        }
        catch (Exception e) {
            LOGGER.error("Exception while adding job instance", (Throwable)e);
        }
        this.updateHosts();
    }

    /**
     * Builds a service cache for the given job name, attaches a change listener, starts the cache
     * and records it in {@link #serviceCacheMap}.
     *
     * @param serviceName job (service) name to cache
     * @throws Exception if the cache cannot be started
     */
    private void startCacheFor(String serviceName) throws Exception {
        ServiceCache<JobInstanceDetails> cache = this.serviceDiscovery.serviceCacheBuilder().name(serviceName).build();
        // Attach the listener before starting so the listener is in place from the first update.
        cache.addListener(new ServiceListner());
        cache.start();
        this.serviceCacheMap.put(serviceName, cache);
    }

    /**
     * Closes every service cache and the service discovery client. The discovery client and the
     * superclass finalizer are still invoked if closing a cache fails.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            try {
                for (ServiceCache<JobInstanceDetails> cache : this.serviceCacheMap.values()) {
                    cache.close();
                }
            }
            finally {
                this.serviceDiscovery.close();
            }
        }
        finally {
            super.finalize();
        }
    }

    /**
     * Finds the registered instance with the smallest registration time across all services and,
     * when it is not the current host, sends it a pull request. Afterwards asks the sync service
     * to sync all hosts. Exceptions while locating the instance or sending the request are logged
     * and not rethrown.
     */
    private void sendPullRequests() {
        try {
            long minTime = Long.MAX_VALUE;
            ServiceInstance<JobInstanceDetails> minInst = null;
            for (String serviceName : this.serviceDiscovery.queryForNames()) {
                for (ServiceInstance<JobInstanceDetails> inst : this.serviceDiscovery.queryForInstances(serviceName)) {
                    if (inst.getRegistrationTimeUTC() < minTime) {
                        minInst = inst;
                        minTime = inst.getRegistrationTimeUTC();
                    }
                }
            }
            LOGGER.info("The oldest host is: " + minInst);
            if (minInst != null) {
                JobHost instHost = new JobHost(minInst.getPayload().getHostName(), minInst.getAddress(), minInst.getPort().intValue());
                if (!this.jobConfigurationService.getCurrentHostName().equals(instHost)) {
                    LOGGER.info("Sending pull request to: " + minInst.getAddress() + ":" + minInst.getPort());
                    this.syncService.pullRequest(minInst.getAddress() + ":" + minInst.getPort());
                }
            }
        }
        catch (Exception e) {
            LOGGER.error("Exception while finding the oldest host and sending the pull request", (Throwable)e);
        }
        this.syncService.syncAllHosts();
    }

    /**
     * Reacts to bootstrap events: when the event source is a {@link PlatformEvent} of type
     * "BootstrapMonitoredEvent" with status "started" (both compared case-insensitively), sends
     * pull requests. All other events are ignored.
     *
     * @param event the application event to inspect
     */
    public void onApplicationEvent(ApplicationEvent event) {
        if (!(event.getSource() instanceof PlatformEvent)) {
            return;
        }
        PlatformEvent platformEvent = (PlatformEvent)event.getSource();
        String eventType = platformEvent.getEventType();
        if (eventType == null || !eventType.equalsIgnoreCase("BootstrapMonitoredEvent")) {
            return;
        }
        // The status check and the pull requests run under the BootstrapProgressMonitor class lock.
        synchronized (BootstrapProgressMonitor.class) {
            String eventStatus = platformEvent.getEventStatus();
            if (eventStatus == null || !eventStatus.equalsIgnoreCase("started")) {
                return;
            }
            LOGGER.info("Finding oldest host and Sending pull requests");
            this.sendPullRequests();
        }
    }

    /**
     * Subscribes this handler to bootstrap events once all properties have been set.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.bootstrapProgressMonitor.addBootstrapEventListener((PlatformEventConsumer)this);
    }

    /**
     * Cache listener that refreshes the host list whenever a service cache changes.
     */
    private class ServiceListner
    implements ServiceCacheListener {
        private ServiceListner() {
        }

        /** Only logs the connection state change. */
        @Override
        public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
            LOGGER.info("State changed");
        }

        /** Logs the change and rebuilds the host list. */
        @Override
        public void cacheChanged() {
            LOGGER.info("Curator Cache changed");
            CuratorJobSyncHandler.this.updateHosts();
        }
    }
}

