/*
 * Decompiled with CFR 0.152.
 */
package com.github.ylgrgyq.reservoir;

import com.github.ylgrgyq.reservoir.AutomaticObjectQueueConsumerBuilder;
import com.github.ylgrgyq.reservoir.ConsumeObjectHandler;
import com.github.ylgrgyq.reservoir.ConsumeObjectListener;
import com.github.ylgrgyq.reservoir.HandleFailedStrategy;
import com.github.ylgrgyq.reservoir.NamedThreadFactory;
import com.github.ylgrgyq.reservoir.ObjectQueueConsumer;
import com.github.ylgrgyq.reservoir.Verifiable;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class AutomaticObjectQueueConsumer<E extends Verifiable>
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(AutomaticObjectQueueConsumer.class);
    private static final ThreadFactory threadFactory = new NamedThreadFactory("automatic-object-queue-consumer-");
    private final ObjectQueueConsumer<E> backupQueue;
    private final Thread worker;
    private final Executor listenerExecutor;
    private final Set<ConsumeObjectListener<E>> listeners;
    private volatile boolean closed;

    AutomaticObjectQueueConsumer(AutomaticObjectQueueConsumerBuilder<E> builder) {
        Objects.requireNonNull(builder, "builder");
        this.backupQueue = builder.getConsumer();
        this.worker = threadFactory.newThread(new Worker(builder.getConsumeObjectHandler()));
        this.listenerExecutor = builder.getListenerExecutor();
        this.listeners = new CopyOnWriteArraySet<ConsumeObjectListener<ConsumeObjectListener<E>>>(builder.getConsumeElementListeners());
        this.worker.start();
    }

    public void addListener(ConsumeObjectListener<E> listener) {
        Objects.requireNonNull(listener, "listener");
        this.listeners.add(listener);
    }

    public boolean removeListener(ConsumeObjectListener<E> listener) {
        Objects.requireNonNull(listener, "listener");
        return this.listeners.remove(listener);
    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        if (this.worker != Thread.currentThread()) {
            this.worker.interrupt();
            this.worker.join();
        }
        this.backupQueue.close();
    }

    boolean closed() {
        return this.closed;
    }

    private boolean handleObjectFailed(ConsumeObjectHandler<E> handler, E obj, Throwable ex) throws Exception {
        HandleFailedStrategy strategy;
        boolean commit = false;
        try {
            strategy = handler.onHandleObjectFailed(obj, ex);
            if (strategy == null) {
                logger.error("ConsumeObjectHandler.onHandleObjectFailed returned null on handle object {}, and exception {}, shutdown anyway.", (Object)obj, (Object)ex);
                strategy = HandleFailedStrategy.SHUTDOWN;
            }
        }
        catch (Exception ex2) {
            logger.error("Got unexpected exception from ConsumeObjectHandler.onHandleObjectFailed on handle object {}, and exception {}. Shutdown anyway.", (Object)obj, (Object)ex2);
            strategy = HandleFailedStrategy.SHUTDOWN;
        }
        switch (strategy) {
            case RETRY: {
                break;
            }
            case IGNORE: {
                commit = true;
                break;
            }
            case SHUTDOWN: {
                this.close();
            }
        }
        return commit;
    }

    private void notifyInvalidObject(E obj) {
        this.sendNotification(l -> l.onInvalidObject(obj));
    }

    private void notifyOnHandleSuccess(E obj) {
        this.sendNotification(l -> l.onHandleSuccess(obj));
    }

    private void notifyOnHandleFailed(E obj, Throwable ex) {
        this.sendNotification(l -> l.onHandleFailed(obj, ex));
    }

    private void sendNotification(Consumer<ConsumeObjectListener<E>> consumer) {
        try {
            this.listenerExecutor.execute(() -> {
                for (ConsumeObjectListener<E> l : this.listeners) {
                    try {
                        consumer.accept(l);
                    }
                    catch (Exception ex) {
                        l.onListenerNotificationFailed(ex);
                    }
                }
            });
        }
        catch (Exception ex) {
            logger.error("Notification failed", ex);
        }
    }

    private final class Worker
    implements Runnable {
        private final ConsumeObjectHandler<E> handler;

        Worker(ConsumeObjectHandler<E> handler) {
            this.handler = handler;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ConsumeObjectHandler<Verifiable> handler = this.handler;
            ObjectQueueConsumer consumer = AutomaticObjectQueueConsumer.this.backupQueue;
            while (!AutomaticObjectQueueConsumer.this.closed) {
                try {
                    boolean commit = false;
                    try {
                        Verifiable obj = (Verifiable)consumer.fetch();
                        if (!obj.isValid()) {
                            AutomaticObjectQueueConsumer.this.notifyInvalidObject(obj);
                            commit = true;
                            continue;
                        }
                        try {
                            handler.onHandleObject(obj);
                            AutomaticObjectQueueConsumer.this.notifyOnHandleSuccess(obj);
                            commit = true;
                        }
                        catch (Exception ex) {
                            AutomaticObjectQueueConsumer.this.notifyOnHandleFailed(obj, ex);
                            commit = AutomaticObjectQueueConsumer.this.handleObjectFailed(handler, obj, ex);
                        }
                    }
                    finally {
                        if (!commit) continue;
                        consumer.commit();
                    }
                }
                catch (InterruptedException commit) {
                }
                catch (Exception ex) {
                    logger.warn("Got unexpected exception on processing object in reservoir queue.", ex);
                }
            }
        }
    }
}

