/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.ConnectionState;
import com.linkedin.databus.client.ConnectionStateFactory;
import com.linkedin.databus.client.ConnectionStateMessage;
import com.linkedin.databus.client.DatabusSourcesConnection;
import com.linkedin.databus.client.ServerSetChangeMessage;
import com.linkedin.databus.client.pub.ServerInfo;
import com.linkedin.databus.client.pub.mbean.UnifiedClientStats;
import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.DbusEventBuffer;
import com.linkedin.databus.core.DbusEventFactory;
import com.linkedin.databus.core.async.AbstractActorMessageQueue;
import com.linkedin.databus.core.async.LifecycleMessage;
import com.linkedin.databus2.core.BackoffTimerStaticConfig;
import com.linkedin.databus2.core.mbean.DatabusReadOnlyStatus;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServer;
import org.apache.log4j.Logger;

/**
 * Base class for puller threads that talk to one server out of a configurable set.
 *
 * <p>On top of the message handling provided by the superclass, this class keeps the set of
 * candidate servers, tracks which one is currently selected, and reacts to
 * {@link ServerSetChangeMessage}s that replace the set or add/remove a single server. When the
 * currently selected server disappears from the set, the connection is torn down (immediately or
 * after the in-flight response, as decided by {@link #shouldDelayTearConnection}).
 *
 * <p>{@code _log} and {@code _componentStatus} used below are not declared in this class;
 * presumably they are inherited from {@link AbstractActorMessageQueue} — confirm against the
 * superclass.
 */
public abstract class BasePullThread
extends AbstractActorMessageQueue {
    /** Candidate servers; initialised as a {@link TreeSet} copy of the constructor argument. */
    protected Set<ServerInfo> _servers;
    /** Position of {@link #_curServer} in the iteration order of the server set, or -1 if none. */
    protected int _curServerIdx = -1;
    /** Currently selected server, or {@code null} if none is selected. */
    protected ServerInfo _curServer = null;
    /** Connection state object created once by the factory and reused for this thread. */
    protected final ConnectionState _currentState;
    /** Status object created by this class; exposed through {@link #_statusMbean}. */
    protected final DatabusComponentStatus _status;
    /** Read-only MBean wrapper around {@link #_status}; registered in the constructor. */
    protected final DatabusReadOnlyStatus _statusMbean;
    protected DatabusSourcesConnection _sourcesConn;
    private final DbusEventFactory _eventFactory;
    /** Set when a server switch had to be postponed; cleared by {@link #resetConnectionAndSetFlag()}. */
    private boolean tearConnAfterResponse = false;
    private final MBeanServer _mbeanServer;
    /** Filter passed to {@code enqueueMessageAfterFilter} when a pick-server state is enqueued. */
    private final AbstractActorMessageQueue.MessageQueueFilter pickServerFilter = new PickServerEnqueueFilter();

    /**
     * Creates the pull thread, copies the initial server set, and registers the status MBean.
     *
     * @param name              name passed to the superclass and to the component status
     * @param pullerRetries     retry/backoff configuration
     * @param sourcesConn       owning sources connection; its connection config decides whether
     *                          message-queue logging is enabled
     * @param dbusEventBuffer   buffer handed to {@code connStateFactory} to create the state
     * @param connStateFactory  factory for the {@link ConnectionState}
     * @param servers           initial servers; {@code null} is treated as an empty set
     * @param mbeanServer       server on which the status MBean is registered
     * @param eventFactory      event factory exposed through {@link #getEventFactory()}
     * @param log               logger passed to the superclass
     */
    public BasePullThread(String name, BackoffTimerStaticConfig pullerRetries, DatabusSourcesConnection sourcesConn, DbusEventBuffer dbusEventBuffer, ConnectionStateFactory connStateFactory, Set<ServerInfo> servers, MBeanServer mbeanServer, DbusEventFactory eventFactory, Logger log) {
        super(name, pullerRetries, sourcesConn.getConnectionConfig().isPullerMessageQueueLoggingEnabled(), log);
        this._sourcesConn = sourcesConn;
        this._currentState = connStateFactory.create(dbusEventBuffer);
        this._servers = null != servers ? new TreeSet<ServerInfo>(servers) : new TreeSet<ServerInfo>();
        this._eventFactory = eventFactory;
        this._mbeanServer = mbeanServer;
        this._status = new DatabusComponentStatus(name, pullerRetries);
        this._statusMbean = new DatabusReadOnlyStatus(this.getName(), this._status, -1L);
        this._statusMbean.registerAsMbean(this._mbeanServer);
        // NOTE(review): overridable method invoked from the constructor; subclasses that
        // override resetServerRetries() will see partially constructed state.
        this.resetServerRetries();
    }

    /**
     * Processes one message. {@link ServerSetChangeMessage}s are handled here; everything else
     * is delegated to the superclass.
     *
     * <p>After a successfully handled server-set change, a resume message is enqueued if
     * {@code _componentStatus} reports {@code SUSPENDED_ON_ERROR}.
     *
     * @param message the message to process
     * @return {@code false} for a server-set change with an unrecognised type id, {@code true}
     *         for a recognised one; otherwise whatever the superclass returns
     */
    protected boolean executeAndChangeState(Object message) {
        if (!(message instanceof ServerSetChangeMessage)) {
            return super.executeAndChangeState(message);
        }
        boolean success = true;
        ServerSetChangeMessage serverSetChangeMsg = (ServerSetChangeMessage)message;
        switch (serverSetChangeMsg.getTypeId()) {
            case SET_SERVERS: {
                this.doSetServers(serverSetChangeMsg);
                break;
            }
            case ADD_SERVER: {
                this.doAddServer(serverSetChangeMsg);
                break;
            }
            case REMOVE_SERVER: {
                this.doRemoveServer(serverSetChangeMsg);
                break;
            }
            default: {
                this._log.error("Unkown ServerSetChangeMessage in ServerPullThread: " + serverSetChangeMsg.getTypeId());
                success = false;
            }
        }
        if (success && this._componentStatus.getStatus() == DatabusComponentStatus.Status.SUSPENDED_ON_ERROR) {
            this.enqueueMessage(LifecycleMessage.createResumeMessage());
        }
        return success;
    }

    /**
     * Returns the number of elements that precede the first element equal to {@code target}
     * in the iteration order of {@code servers}; returns {@code servers.size()} if no element
     * is equal to {@code target}.
     */
    private static int indexOfServer(Set<ServerInfo> servers, ServerInfo target) {
        int index = 0;
        for (ServerInfo candidate : servers) {
            if (target.equals(candidate)) {
                break;
            }
            ++index;
        }
        return index;
    }

    /**
     * Removes the server carried by the message from the server set.
     *
     * <p>If the removed server sits before the current one, the current index is shifted down.
     * If it is the current one, {@link #handleServerSwitch()} is invoked and the current
     * server/index are cleared. A {@code null} or unknown server is only logged.
     */
    private void doRemoveServer(ServerSetChangeMessage serverSetChangeMsg) {
        ServerInfo server = serverSetChangeMsg.getServer();
        this._log.info("About to remove Server (" + server + ") from Server set. Current Server set is :" + this._servers);
        if (null == server) {
            this._log.error("No Server to remove");
            return;
        }
        if (!this._servers.contains(server)) {
            this._log.warn("Trying to remove a Server that does not exist:" + server.toString());
            return;
        }
        this._log.info("Removing Server: " + server.toString());
        int index = BasePullThread.indexOfServer(this._servers, server);
        if (index < this._curServerIdx) {
            // An earlier element goes away, so the current server moves one position forward.
            --this._curServerIdx;
        } else if (index == this._curServerIdx) {
            this._log.info("Trying to remove the active Server !!");
            this.handleServerSwitch();
            this._curServerIdx = -1;
            this._curServer = null;
        }
        this._servers.remove(server);
    }

    /**
     * Replaces the whole server set with the one carried by the message.
     *
     * <p>Nothing happens if the new set is non-null and equal to the current one. If a current
     * server exists and is not part of the new set (or the new set is {@code null}),
     * {@link #handleServerSwitch()} is invoked. Otherwise the current index is recomputed
     * against the new set. A {@code null} server set is treated as "no servers".
     * {@link #resetServerRetries()} is called whenever the set was changed.
     */
    protected void doSetServers(ServerSetChangeMessage serverSetChangeMsg) {
        TreeSet<ServerInfo> newServers = null == serverSetChangeMsg.getServerSet() ? null : new TreeSet<ServerInfo>(serverSetChangeMsg.getServerSet());
        if (newServers != null && this._servers != null && this._servers.equals(newServers)) {
            this._log.info("doSetServers : Both old set and new set is same. Skipping this message. ServerSet is :" + newServers);
            return;
        }
        boolean tearConnection = this._curServer != null && (null == newServers || !newServers.contains(this._curServer));
        this._log.info("About to change Server set. Old Server set was :" + this._servers + ", New Server Set is :" + newServers);
        if (this._servers == null) {
            // The field is protected and non-final; tolerate a subclass having nulled it,
            // consistent with the null check in the equality test above.
            this._servers = new TreeSet<ServerInfo>();
        } else {
            this._servers.clear();
        }
        if (tearConnection) {
            this.handleServerSwitch();
            if (null != newServers) {
                this._servers.addAll(newServers);
            }
        } else {
            // Guard against a null set: addAll(null) would throw NullPointerException when
            // there is no current server and the message carries no server set.
            if (null != newServers) {
                this._servers.addAll(newServers);
            }
            if (this._curServer != null) {
                // tearConnection is false with a current server, so newServers is non-null
                // and contains it.
                this._curServerIdx = BasePullThread.indexOfServer(newServers, this._curServer);
            } else {
                this._curServerIdx = -1;
                this._curServer = null;
            }
        }
        this.resetServerRetries();
    }

    /**
     * Adds the server carried by the message to the server set, unless it is {@code null}
     * (logged as an error) or already present (logged as a warning).
     * {@link #resetServerRetries()} is called for any non-null server.
     */
    private void doAddServer(ServerSetChangeMessage serverSetChangeMsg) {
        ServerInfo newServer = serverSetChangeMsg.getServer();
        this._log.info("About to add new Server (" + newServer + ") to Server set. Current Server set is :" + this._servers);
        if (null == newServer) {
            this._log.error("No new Server to add");
            return;
        }
        if (this._servers.contains(newServer)) {
            this._log.warn("Server already exists:" + newServer.toString() + " Skipping this addition !!");
        } else {
            this._log.info("Adding new Server: " + newServer.toString());
            this._servers.add(newServer);
        }
        this.resetServerRetries();
    }

    /** Calls {@code resume()} on {@link #_status}. */
    protected void resetServerRetries() {
        this._status.resume();
    }

    /** Unregisters the status MBean (if any) and then delegates to the superclass shutdown. */
    public void shutdown() {
        if (this._statusMbean != null) {
            this._statusMbean.unregisterMbean(this._mbeanServer);
            this._log.info("mbean unregistered");
        }
        super.shutdown();
    }

    /**
     * Records a pull error on {@link #_status}: {@code retryOnError("pull error")} when the
     * status is currently running, {@code retryOnLastError()} otherwise.
     */
    protected void backoffOnPullError() {
        if (this._status.isRunningStatus()) {
            this._status.retryOnError("pull error");
        } else {
            this._status.retryOnLastError();
        }
    }

    /**
     * @return {@code true} for {@link ServerSetChangeMessage}s; otherwise the superclass decision
     */
    protected boolean shouldRetainMessageOnPause(Object msg) {
        if (msg instanceof ServerSetChangeMessage) {
            return true;
        }
        return super.shouldRetainMessageOnPause(msg);
    }

    /**
     * @return {@code true} for {@link ServerSetChangeMessage}s; otherwise the superclass
     *         decision for the <em>pause</em> case
     */
    protected boolean shouldRetainMessageOnSuspend(Object msg) {
        if (msg instanceof ServerSetChangeMessage) {
            return true;
        }
        // NOTE(review): delegates to shouldRetainMessageOnPause, not a suspend-specific
        // superclass method. Looks like a copy/paste slip — confirm against
        // AbstractActorMessageQueue before changing.
        return super.shouldRetainMessageOnPause(msg);
    }

    /**
     * Decides whether tearing the connection must wait, given the current state id.
     *
     * @param var1 state id of {@link #_currentState} at the time of the server switch
     * @return {@code true} to postpone the tear, {@code false} to tear immediately
     */
    protected abstract boolean shouldDelayTearConnection(ConnectionState.StateId var1);

    /** Resets the underlying connection; invoked from {@link #resetConnectionAndSetFlag()}. */
    protected abstract void resetConnection();

    /**
     * Reacts to the current server becoming unavailable.
     *
     * <p>If the tear must be delayed, only the {@code tearConnAfterResponse} flag is set.
     * Otherwise the connection is torn down, and a pick-server state is enqueued unless the
     * status is {@code PAUSED} or {@code SUSPENDED_ON_ERROR}, or the state id is {@code INITIAL}.
     */
    protected void handleServerSwitch() {
        if (this.shouldDelayTearConnection(this._currentState.getStateId())) {
            this.tearConnAfterResponse = true;
            return;
        }
        this.tearConnection();
        DatabusComponentStatus.Status currStatus = this._status.getStatus();
        if (currStatus != DatabusComponentStatus.Status.PAUSED && currStatus != DatabusComponentStatus.Status.SUSPENDED_ON_ERROR && this._currentState.getStateId() != ConnectionState.StateId.INITIAL) {
            this.enqueuePickServer(this._currentState);
        }
    }

    /** Calls {@link #resetConnection()} and clears the delayed-tear flag. */
    protected void resetConnectionAndSetFlag() {
        this.resetConnection();
        this.tearConnAfterResponse = false;
    }

    /** Closes the relay connection held by {@link #_currentState}. */
    protected void killConnection() {
        this._currentState.getRelayConnection().close();
    }

    /** Resets the connection and forgets the current server and its index. */
    protected void tearConnection() {
        this.resetConnectionAndSetFlag();
        this._curServer = null;
        this._curServerIdx = -1;
    }

    /** Tears the connection down and enqueues a pick-server state. */
    protected void tearConnectionAndEnqueuePickServer() {
        this.tearConnection();
        this.enqueuePickServer(this._currentState);
    }

    /** @return whether a server switch was postponed and the tear is still pending */
    protected boolean toTearConnAfterHandlingResponse() {
        return this.tearConnAfterResponse;
    }

    /**
     * Switches {@code connState} to pick-server and enqueues it through the pick-server filter.
     *
     * @param connState the state to switch and enqueue
     */
    protected void enqueuePickServer(ConnectionState connState) {
        connState.switchToPickServer();
        this.enqueueMessageAfterFilter(connState, this.pickServerFilter);
    }

    /** @return the internal server set itself (not a copy) */
    public Set<ServerInfo> getServers() {
        return this._servers;
    }

    /** @return the index of the current server, or -1 if none is selected */
    public int getCurrentServerIdx() {
        return this._curServerIdx;
    }

    /** @return the current server, or {@code null} if none is selected */
    public ServerInfo getCurentServer() {
        return this._curServer;
    }

    /** @return the connection state object of this thread */
    public ConnectionState getConnectionState() {
        return this._currentState;
    }

    /** @return the sources connection this thread belongs to */
    public DatabusSourcesConnection getSourcesConnection() {
        return this._sourcesConn;
    }

    /** Replaces the sources connection reference. */
    public void setSourcesConnection(DatabusSourcesConnection conn) {
        this._sourcesConn = conn;
    }

    /**
     * Converts a message before it is enqueued: a {@link ConnectionState} is wrapped in a
     * {@link ConnectionStateMessage} carrying the state's id at this moment; any other message
     * is returned unchanged.
     */
    protected Object preEnqueue(Object message) {
        if (message instanceof ConnectionState) {
            ConnectionState state = (ConnectionState)message;
            return new ConnectionStateMessage(state.getStateId(), state);
        }
        return message;
    }

    /** @return the logger used by this thread */
    protected Logger getLog() {
        return this._log;
    }

    /** @return the event factory supplied at construction */
    protected DbusEventFactory getEventFactory() {
        return this._eventFactory;
    }

    /**
     * Sets the heartbeat timestamp on the stats object to the current wall-clock time.
     *
     * @param unifiedClientStats stats object to update; {@code null} is ignored
     */
    protected static void sendHeartbeat(UnifiedClientStats unifiedClientStats) {
        BasePullThread.sendHeartbeat(unifiedClientStats, System.currentTimeMillis());
    }

    /**
     * Sets the heartbeat timestamp on the stats object.
     *
     * @param unifiedClientStats stats object to update; {@code null} is ignored
     * @param timestampMs        timestamp to record, in milliseconds
     */
    protected static void sendHeartbeat(UnifiedClientStats unifiedClientStats, long timestampMs) {
        if (unifiedClientStats != null) {
            unifiedClientStats.setHeartbeatTimestamp(timestampMs);
        }
    }

    /**
     * Queue filter used when enqueuing a pick-server state; it retains exactly the messages
     * that {@link BasePullThread#shouldRetainMessageOnPause(Object)} retains.
     */
    private class PickServerEnqueueFilter
    implements AbstractActorMessageQueue.MessageQueueFilter {
        private PickServerEnqueueFilter() {
        }

        public boolean shouldRetain(Object msg) {
            return BasePullThread.this.shouldRetainMessageOnPause(msg);
        }
    }
}

