/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.CurrentThreadScheduler;
import rx.concurrency.ImmediateScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;

public class OperationObserveOn {
    public static <T> Observable.OnSubscribeFunc<T> observeOn(Observable<? extends T> source, Scheduler scheduler) {
        return new ObserveOn<T>(source, scheduler);
    }

    private static class ObserveOn<T>
    implements Observable.OnSubscribeFunc<T> {
        private final Observable<? extends T> source;
        private final Scheduler scheduler;
        private volatile Scheduler recursiveScheduler;
        final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue();
        final AtomicInteger counter = new AtomicInteger(0);

        public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        public Subscription onSubscribe(Observer<? super T> observer) {
            if (this.scheduler instanceof ImmediateScheduler) {
                return this.source.subscribe(observer);
            }
            if (this.scheduler instanceof CurrentThreadScheduler) {
                return this.source.subscribe(observer);
            }
            return this.observeOn(observer, this.scheduler);
        }

        public Subscription observeOn(final Observer<? super T> observer, final Scheduler scheduler) {
            final CompositeSubscription s = new CompositeSubscription(new Subscription[0]);
            s.add(this.source.materialize().subscribe(new Action1<Notification<? extends T>>(){

                @Override
                public void call(Notification<? extends T> e) {
                    ObserveOn.this.queue.offer(e);
                    if (ObserveOn.this.counter.getAndIncrement() == 0) {
                        if (ObserveOn.this.recursiveScheduler == null) {
                            s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>(){

                                @Override
                                public Subscription call(Scheduler innerScheduler, T state) {
                                    ObserveOn.this.recursiveScheduler = innerScheduler;
                                    ObserveOn.this.processQueue(s, observer);
                                    return Subscriptions.empty();
                                }
                            }));
                        } else {
                            ObserveOn.this.processQueue(s, observer);
                        }
                    }
                }
            }));
            return s;
        }

        private void processQueue(CompositeSubscription s, final Observer<? super T> observer) {
            s.add(this.recursiveScheduler.schedule(new Action1<Action0>(){

                @Override
                public void call(Action0 self) {
                    Notification not = ObserveOn.this.queue.poll();
                    if (not != null) {
                        not.accept(observer);
                    }
                    if (ObserveOn.this.counter.decrementAndGet() > 0) {
                        self.call();
                    }
                }
            }));
        }
    }
}

