/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.core.async;

import com.linkedin.databus.core.DatabusComponentStatus;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus.core.async.LifecycleMessage;
import com.linkedin.databus2.core.BackoffTimerStaticConfig;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.log4j.Logger;

public abstract class AbstractActorMessageQueue
implements Runnable,
ActorMessageQueue {
    /** Fully-qualified class name; used as the name of the default logger. */
    public static final String MODULE = AbstractActorMessageQueue.class.getName();
    /** Logger used by this queue: the one given to the constructor, or the {@link #MODULE} logger. */
    public final Logger _log;
    /** Capacity of the circular buffer that remembers the most recently processed messages. */
    public static final int MAX_QUEUED_MESSAGE_HISTORY_SIZE = 100;
    /** Initial capacity of the pending-message queue (an {@link ArrayDeque}, which grows on demand). */
    public static final int MAX_QUEUED_MESSAGES = 10;
    /** How long, in milliseconds, a single wait for a new message may last. */
    public static final long MESSAGE_QUEUE_POLL_TIMEOUT_MS = 100L;
    /** Name returned by {@code getName()} and used as a prefix in log messages. */
    private final String _name;
    /** Pending messages; sized from the named constant rather than a duplicated literal. */
    private final Queue<Object> _messageQueue = new ArrayDeque<Object>(MAX_QUEUED_MESSAGES);
    /** The last {@link #MAX_QUEUED_MESSAGE_HISTORY_SIZE} processed messages, oldest first. */
    protected final CircularFifoBuffer _messageProcessedHistory = new CircularFifoBuffer(MAX_QUEUED_MESSAGE_HISTORY_SIZE);
    /** Cached "queue is non-empty" flag, readable without taking the control lock. */
    volatile boolean _hasMessages;
    /** Fair lock guarding the message queue and the two conditions below. */
    private final Lock _controlLock = new ReentrantLock(true);
    /** Signalled once the message loop has exited and the status has been set to shutdown. */
    private final Condition _shutdownCondition = this._controlLock.newCondition();
    /** Signalled when a message is added to a previously empty queue. */
    private final Condition _newStateCondition = this._controlLock.newCondition();
    /** Non-null once {@code shutdown()} has been called. */
    private volatile LifecycleMessage _shutdownRequest = null;
    /** Initialised to {@code null}; not otherwise touched by this class. */
    protected LifecycleMessage _currentLifecycleState;
    /** Lifecycle status of this component (started / paused / suspended / shutdown). */
    protected final DatabusComponentStatus _componentStatus;
    /** Filter delegating to {@code shouldRetainMessageOnPause}. */
    private final MessageQueueFilter pauseFilter = new DefaultPauseFilter();
    /** Filter delegating to {@code shouldRetainMessageOnSuspend}. */
    private final MessageQueueFilter suspendFilter = new DefaultSuspendFilter();
    /** Filter delegating to {@code shouldRetainMessageOnShutdown}. */
    private final MessageQueueFilter shutdownFilter = new DefaultShutdownFilter();
    /** Count of messages handed to {@code doExecuteAndChangeState}. */
    private long _numEnqueuedMessages = 0L;
    /** When true, the periodic message-history dump is logged at INFO instead of DEBUG. */
    private final boolean _enablePullerMessageQueueLogging;

    /**
     * Creates a queue with message-history INFO logging disabled and the default logger.
     *
     * @param name             name of this actor, used in log messages
     * @param errorRetriesConf retry configuration passed to the component status
     */
    public AbstractActorMessageQueue(String name, BackoffTimerStaticConfig errorRetriesConf) {
        this(name,
             errorRetriesConf,
             false /* enablePullerMessageQueueLogging */,
             null /* log: fall back to the MODULE logger */);
    }

    /**
     * Creates a queue with full control over logging.
     *
     * @param name                            name of this actor, used in log messages
     * @param errorRetriesConf                retry configuration passed to the component status
     * @param enablePullerMessageQueueLogging whether the periodic history dump is logged at INFO
     * @param log                             logger to use; {@code null} selects the {@link #MODULE} logger
     */
    public AbstractActorMessageQueue(String name, BackoffTimerStaticConfig errorRetriesConf, boolean enablePullerMessageQueueLogging, Logger log) {
        _name = name;
        _enablePullerMessageQueueLogging = enablePullerMessageQueueLogging;
        _componentStatus = new DatabusComponentStatus(name, errorRetriesConf);
        _currentLifecycleState = null;
        _hasMessages = false;
        if (null == log) {
            _log = Logger.getLogger(MODULE);
        } else {
            _log = log;
        }
    }

    /**
     * Creates a queue that uses {@code BackoffTimerStaticConfig.UNLIMITED_RETRIES} as its retry
     * configuration.
     *
     * @param name name of this actor, used in log messages
     */
    protected AbstractActorMessageQueue(String name) {
        this(name,
             BackoffTimerStaticConfig.UNLIMITED_RETRIES);
    }

    /**
     * Subclass hook called by {@code performShutdown()} after the message loop has ended, before
     * the shutdown is logged and before the queue is cleared with the shutdown filter.
     */
    protected abstract void onShutdown();

    /**
     * Runs the shutdown sequence: subclass hook first, then the log line, then removal of every
     * queued message the shutdown filter does not retain.
     */
    private void performShutdown() {
        onShutdown();
        final String farewell = getName() + " shutdown.";
        _log.info(farewell);
        clearQueue(shutdownFilter);
    }

    /**
     * Subclass hook called by {@code doResume} after the component status has been resumed.
     * The default implementation does nothing.
     */
    protected void onResume() {
    }

    /**
     * Processes one message via {@link #executeAndChangeState(Object)}, records it in the
     * processed-message history and logs that history on failure or every
     * {@link #MAX_QUEUED_MESSAGE_HISTORY_SIZE} messages.
     *
     * @param message the message to process
     * @return {@code true} if processing succeeded; {@code false} if it reported failure or threw
     *         a {@link RuntimeException} (which is logged, not propagated)
     */
    public final boolean doExecuteAndChangeState(Object message) {
        boolean processed;
        try {
            processed = executeAndChangeState(message);
        }
        catch (RuntimeException re) {
            _log.error("Stopping because of runtime exception :", re);
            processed = false;
        }

        // Bookkeeping happens regardless of the outcome.
        _messageProcessedHistory.add(message);
        _numEnqueuedMessages++;

        if (!processed) {
            _log.info("Message Queue History (earliest first) at end:" + getMessageHistoryLog());
            return false;
        }

        final boolean historyDumpDue = (_numEnqueuedMessages % MAX_QUEUED_MESSAGE_HISTORY_SIZE) == 0L;
        if (historyDumpDue) {
            if (_enablePullerMessageQueueLogging) {
                _log.info("Message Queue History (earliest first) :" + getMessageHistoryLog());
            } else if (_log.isDebugEnabled()) {
                _log.debug("Message Queue History (earliest first) :" + getMessageHistoryLog());
            }
        }
        return true;
    }

    /**
     * Dispatches a lifecycle message to the matching {@code doXxx} handler.
     *
     * <p>START, PAUSE, SUSPEND_ON_ERROR and RESUME are handled. A SHUTDOWN message, an unknown
     * lifecycle type, a non-lifecycle message and a {@code null} message are all logged as errors
     * and reported as failure.
     *
     * @param message the message to process; may be {@code null}
     * @return {@code true} if the message was a handled lifecycle message, {@code false} otherwise
     */
    protected boolean executeAndChangeState(Object message) {
        if (!(message instanceof LifecycleMessage)) {
            // Null-safe: a null message is reported as an unknown message rather than raising an
            // NPE while building the log line.
            String typeName = null == message ? "null" : message.getClass().getName();
            this._log.error((Object)("Unknown message of type " + typeName + ": " + String.valueOf(message)));
            return false;
        }

        boolean success = true;
        LifecycleMessage lcMessage = (LifecycleMessage)message;
        switch (lcMessage.getTypeId()) {
            case START: {
                this.doStart(lcMessage);
                break;
            }
            case PAUSE: {
                this.doPause(lcMessage);
                break;
            }
            case SUSPEND_ON_ERROR: {
                this.doSuspendOnError(lcMessage);
                break;
            }
            case RESUME: {
                this.doResume(lcMessage);
                break;
            }
            case SHUTDOWN: {
                // Shutdown is requested through shutdown(), never through the queue.
                this._log.error((Object)("Shutdown message is seen in the queue but not expected : Message :" + lcMessage));
                success = false;
                break;
            }
            default: {
                // Report the actual actor name; this base class is not tied to one subclass.
                this._log.error((Object)("Unknown Lifecycle message in " + this.getName() + ": " + lcMessage.getTypeId()));
                success = false;
                break;
            }
        }
        return success;
    }

    /**
     * Handles a RESUME message: logs it, resumes the component status, then calls
     * {@link #onResume()}.
     *
     * @param lcMessage the lifecycle message that triggered the resume (not inspected here)
     */
    protected void doResume(LifecycleMessage lcMessage) {
        final String actorName = getName();
        _log.info(actorName + ": resuming");
        _componentStatus.resume();
        onResume();
    }

    /**
     * Handles a SUSPEND_ON_ERROR message: logs the cause if there is one, moves the component
     * status to suspended-on-error and removes queued messages the suspend filter does not retain.
     *
     * @param lcMessage the lifecycle message carrying the (possibly null) last error
     */
    protected void doSuspendOnError(LifecycleMessage lcMessage) {
        final Throwable cause = lcMessage.getLastError();
        if (null == cause) {
            _log.info(getName() + ": suspending");
        } else {
            _log.info(getName() + ": suspending due to " + cause, cause);
        }
        if (_log.isDebugEnabled()) {
            _log.debug(" because of message: " + cause);
        }
        _componentStatus.suspendOnError(cause);
        clearQueue(suspendFilter);
    }

    /**
     * Handles a PAUSE message: logs it, pauses the component status and removes queued messages
     * the pause filter does not retain.
     *
     * @param lcMessage the lifecycle message that triggered the pause (not inspected here)
     */
    protected void doPause(LifecycleMessage lcMessage) {
        final String actorName = getName();
        _log.info(actorName + ": pausing");
        _componentStatus.pause();
        clearQueue(pauseFilter);
    }

    /**
     * Handles a START message: logs it and starts the component status.
     *
     * @param lcMessage the lifecycle message that triggered the start (not inspected here)
     */
    protected void doStart(LifecycleMessage lcMessage) {
        final String actorName = getName();
        _log.info(actorName + ": starting");
        _componentStatus.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Message loop of the actor.
     *
     * <p>Repeatedly takes the next message from the queue and processes it until a shutdown is
     * requested, no message is returned, processing reports failure, or an exception escapes.
     * It then runs the shutdown sequence, marks the component status as shut down and wakes up
     * every thread blocked in {@link #awaitShutdown()}.
     */
    @Override
    public void run() {
        final boolean isDebugEnabled = this._log.isDebugEnabled();
        boolean running = true;
        try {
            while (running && !this.checkForShutdownRequest()) {
                Object nextState = this.pollNextState();
                if (null == nextState) {
                    // pollNextState() only returns null when a shutdown was requested.
                    running = false;
                    continue;
                }
                if (isDebugEnabled) {
                    this._log.debug((Object)(this.getName() + ": new state: " + nextState.toString()));
                }
                running = this.doExecuteAndChangeState(nextState);
            }
        }
        catch (Exception e) {
            this._log.error((Object)(this.getName() + ": stopping because of unhandled exception: "), (Throwable)e);
        }
        if (isDebugEnabled) {
            StringBuilder sb = new StringBuilder(10240);
            sb.append(this.getName());
            sb.append(": message queue at exit:");
            // Read the queue under the control lock: other threads may still be inside
            // enqueueMessage() and ArrayDeque is not thread-safe. Iterating instead of polling
            // leaves the queue intact, so onShutdown() and the shutdown filter see the same
            // contents whether or not debug logging is enabled.
            this._controlLock.lock();
            try {
                for (Object pending : this._messageQueue) {
                    sb.append(pending.toString());
                    sb.append(' ');
                }
            }
            finally {
                this._controlLock.unlock();
            }
            this._log.debug((Object)sb.toString());
        }
        try {
            this.performShutdown();
        }
        finally {
            // Always publish the shutdown status, even if the shutdown hook threw, so that
            // awaitShutdown() callers are released.
            this._controlLock.lock();
            try {
                this._componentStatus.shutdown();
                this._shutdownCondition.signalAll();
            }
            finally {
                this._controlLock.unlock();
            }
            this._log.info((Object)("Message Queue History (earliest first) at shutdown:" + this.getMessageHistoryLog()));
            if (isDebugEnabled) {
                this._log.debug((Object)(this.getName() + ": exited message loop."));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Atomically removes every queued message rejected by {@code filter} and then enqueues
     * {@code message}; both steps happen under one hold of the control lock.
     *
     * @param message the message to enqueue after filtering
     * @param filter  decides which already-queued messages are kept
     */
    public void enqueueMessageAfterFilter(Object message, MessageQueueFilter filter) {
        // Acquire before entering try: if lock() itself failed, the finally block must not try to
        // unlock a lock this thread does not hold.
        this._controlLock.lock();
        try {
            this.clearQueue(filter);
            this.enqueueMessage(message);
        }
        finally {
            this._controlLock.unlock();
        }
    }

    /**
     * Hook called by {@code enqueueMessage} (with the control lock held) before the status checks;
     * the object returned here is the one that is checked and queued.
     * The default implementation returns {@code message} unchanged.
     *
     * @param message the non-null message passed to {@code enqueueMessage}
     * @return the message to enqueue
     */
    protected Object preEnqueue(Object message) {
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds a message to the queue unless the current state says it must be dropped.
     *
     * <p>A {@code null} message is ignored with a warning. Otherwise the message is passed through
     * {@link #preEnqueue(Object)} and dropped (with a warning) when the component is shut down,
     * a shutdown has been requested, or the component is paused / suspended and the message is
     * not one that should be retained in that state. When the queue goes from empty to one
     * element, threads waiting for a new message are signalled.
     *
     * @param message the message to enqueue
     */
    @Override
    public void enqueueMessage(Object message) {
        if (null == message) {
            _log.warn("Attempt to queue empty state");
            return;
        }
        _controlLock.lock();
        try {
            message = preEnqueue(message);

            // Work out, in the same order as the checks have always run, why (if at all) the
            // message has to be dropped.
            final String dropReason;
            if (_componentStatus.getStatus() == DatabusComponentStatus.Status.SHUTDOWN) {
                dropReason = ": shutdown: ignoring ";
            } else if (checkForShutdownRequest()) {
                dropReason = ": shutdown requested: ignoring ";
            } else if (_componentStatus.getStatus() == DatabusComponentStatus.Status.PAUSED && !shouldRetainMessageOnPause(message)) {
                dropReason = ": ignoring message while paused: ";
            } else if (_componentStatus.getStatus() == DatabusComponentStatus.Status.SUSPENDED_ON_ERROR && !shouldRetainMessageOnSuspend(message)) {
                dropReason = ": ignoring message while suspended_on_error: ";
            } else {
                dropReason = null;
            }

            if (null != dropReason) {
                _log.warn(getName() + dropReason + message.toString());
                return;
            }

            if (!_messageQueue.offer(message)) {
                _log.error(getName() + ": adding a new state failed: " + message.toString() + "; queue.size=" + _messageQueue.size());
            }
            // Only the empty -> non-empty transition needs to wake the poller.
            if (1 == _messageQueue.size()) {
                _newStateCondition.signalAll();
            }
            _hasMessages = true;
        }
        finally {
            _controlLock.unlock();
        }
    }

    /**
     * Requests a shutdown by recording a shutdown message; the message loop notices the request
     * the next time it checks {@link #checkForShutdownRequest()}. Does not wait for the loop to exit.
     */
    public void shutdown() {
        final String actorName = getName();
        _log.info(actorName + ": shutdown requested.");
        _shutdownRequest = LifecycleMessage.createShutdownMessage();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks, ignoring interrupts, until the component status is SHUTDOWN or INITIALIZING.
     * The current status and queue contents are logged before waiting.
     */
    public void awaitShutdown() {
        _log.info(getName() + ": waiting for shutdown");
        _controlLock.lock();
        try {
            _log.info(getName() + ": status at shutdown: " + _componentStatus.getStatus());
            _log.info(getName() + ": queue at shutdown: " + _messageQueue);
            // Keep waiting until one of the two terminal-for-this-purpose states is observed.
            while (!(_componentStatus.getStatus() == DatabusComponentStatus.Status.SHUTDOWN
                     || _componentStatus.getStatus() == DatabusComponentStatus.Status.INITIALIZING)) {
                _shutdownCondition.awaitUninterruptibly();
            }
        }
        finally {
            _controlLock.unlock();
        }
    }

    /**
     * @return the name given to this actor at construction time
     */
    public String getName() {
        return _name;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the component status under the control lock.
     *
     * @return {@code true} if the component status is SHUTDOWN
     */
    public boolean isShutdown() {
        _controlLock.lock();
        try {
            return DatabusComponentStatus.Status.SHUTDOWN == _componentStatus.getStatus();
        }
        finally {
            _controlLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Waits for the next message, re-checking for a shutdown request at least every
     * {@link #MESSAGE_QUEUE_POLL_TIMEOUT_MS} milliseconds.
     *
     * @return the next queued message, or {@code null} if a shutdown has been requested
     */
    private Object pollNextState() {
        _controlLock.lock();
        try {
            while (!checkForShutdownRequest() && _messageQueue.isEmpty()) {
                try {
                    _newStateCondition.await(MESSAGE_QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException ignored) {
                    // Deliberately ignored: the loop condition is simply re-evaluated.
                }
            }
            if (checkForShutdownRequest()) {
                return null;
            }
            final Object head = _messageQueue.poll();
            _hasMessages = !_messageQueue.isEmpty();
            return head;
        }
        finally {
            _controlLock.unlock();
        }
    }

    /**
     * @return the status object tracking this component's lifecycle state
     */
    public DatabusComponentStatus getComponentStatus() {
        return _componentStatus;
    }

    /**
     * @return {@code true} once {@link #shutdown()} has recorded a shutdown request
     */
    public boolean checkForShutdownRequest() {
        return _shutdownRequest != null;
    }

    /**
     * @return the actor name followed by the current queue contents, as one string
     */
    public String getQueueListString() {
        final StringBuilder text = new StringBuilder(100);
        getQueueListString(text);
        return text.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends the actor name and the current queue contents to {@code sb}, reading the queue
     * under the control lock.
     *
     * @param sb the builder to append to
     */
    public void getQueueListString(StringBuilder sb) {
        _controlLock.lock();
        try {
            sb.append(getName())
              .append(" queue: ")
              .append(_messageQueue.toString());
        }
        finally {
            _controlLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes from the queue every message for which {@code filter.shouldRetain} returns
     * {@code false}, holding the control lock for the whole pass.
     *
     * @param filter decides which queued messages are kept
     */
    protected void clearQueue(MessageQueueFilter filter) {
        // Acquire before entering try: if lock() itself failed, the finally block must not try to
        // unlock a lock this thread does not hold.
        this._controlLock.lock();
        try {
            Iterator<Object> itr = this._messageQueue.iterator();
            while (itr.hasNext()) {
                if (!filter.shouldRetain(itr.next())) {
                    itr.remove();
                }
            }
            // Keep the lock-free flag in step with the queue; otherwise hasMessages() would keep
            // reporting true after the queue has been emptied here.
            this._hasMessages = !this._messageQueue.isEmpty();
        }
        finally {
            this._controlLock.unlock();
        }
    }

    /**
     * Removes every queued message that {@link #shouldRetainMessageOnPause(Object)} rejects.
     */
    protected void clearMessageQueueOnPause() {
        clearQueue(pauseFilter);
    }

    /**
     * Removes every queued message that {@link #shouldRetainMessageOnSuspend(Object)} rejects.
     */
    protected void clearMessageQueueOnSuspend() {
        clearQueue(suspendFilter);
    }

    /**
     * Removes every queued message that {@link #shouldRetainMessageOnShutdown(Object)} rejects.
     */
    protected void clearMessageQueueOnShutdown() {
        clearQueue(shutdownFilter);
    }

    /**
     * Decides whether a message survives a pause. Used by the pause filter when the queue is
     * cleared and by {@code enqueueMessage} while the status is PAUSED.
     * By default only lifecycle messages are kept.
     *
     * @param msg the message to test
     * @return {@code true} if {@code msg} is a {@link LifecycleMessage}
     */
    protected boolean shouldRetainMessageOnPause(Object msg) {
        return msg instanceof LifecycleMessage;
    }

    /**
     * Decides whether a message survives a suspend-on-error. Used by the suspend filter when the
     * queue is cleared and by {@code enqueueMessage} while the status is SUSPENDED_ON_ERROR.
     * By default only lifecycle messages are kept.
     *
     * @param msg the message to test
     * @return {@code true} if {@code msg} is a {@link LifecycleMessage}
     */
    protected boolean shouldRetainMessageOnSuspend(Object msg) {
        return msg instanceof LifecycleMessage;
    }

    /**
     * Decides whether a message survives shutdown; used by the shutdown filter.
     * By default nothing is kept.
     *
     * @param msg the message to test
     * @return always {@code false} in this implementation
     */
    protected boolean shouldRetainMessageOnShutdown(Object msg) {
        return false;
    }

    /**
     * Returns the live internal queue itself (not a copy); no lock is taken here.
     *
     * @return the queue of pending messages
     */
    public Queue<Object> getMessageQueue() {
        return _messageQueue;
    }

    /**
     * @return the string form of the processed-message history buffer
     */
    public String getMessageHistoryLog() {
        final String history = _messageProcessedHistory.toString();
        return history;
    }

    /**
     * Lock-free read of the cached "queue is non-empty" flag.
     *
     * @return the current value of the flag
     */
    protected boolean hasMessages() {
        return _hasMessages;
    }

    /** Filter that defers to the enclosing queue's {@code shouldRetainMessageOnShutdown}. */
    private class DefaultShutdownFilter implements MessageQueueFilter {
        private DefaultShutdownFilter() {
        }

        @Override
        public boolean shouldRetain(Object msg) {
            final boolean keep = shouldRetainMessageOnShutdown(msg);
            return keep;
        }
    }

    /** Filter that defers to the enclosing queue's {@code shouldRetainMessageOnSuspend}. */
    private class DefaultSuspendFilter implements MessageQueueFilter {
        private DefaultSuspendFilter() {
        }

        @Override
        public boolean shouldRetain(Object msg) {
            final boolean keep = shouldRetainMessageOnSuspend(msg);
            return keep;
        }
    }

    /** Filter that defers to the enclosing queue's {@code shouldRetainMessageOnPause}. */
    private class DefaultPauseFilter implements MessageQueueFilter {
        private DefaultPauseFilter() {
        }

        @Override
        public boolean shouldRetain(Object msg) {
            final boolean keep = shouldRetainMessageOnPause(msg);
            return keep;
        }
    }

    /** Predicate used when clearing the queue: decides which queued messages are kept. */
    public interface MessageQueueFilter {
        /**
         * @param msg a message currently in the queue
         * @return {@code true} to keep the message, {@code false} to remove it
         */
        boolean shouldRetain(Object msg);
    }
}

