/*
 * Decompiled with CFR 0.152.
 */
package rx.quasar;

import co.paralleluniverse.fibers.FiberAsync;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.quasar.OnSubscribeFromChannel;

public final class ChannelObservable {
    private ChannelObservable() {
    }

    public static <T> Observable<T> from(ReceivePort<T> channel) {
        return Observable.create(new OnSubscribeFromChannel<T>(channel));
    }

    public static <T> Observable<T> from(ReceivePort<T> channel, Scheduler scheduler) {
        return Observable.create(new OnSubscribeFromChannel<T>(channel)).subscribeOn(scheduler);
    }

    public static <T> Observer<T> to(final SendPort<T> channel) {
        return new Observer<T>(){

            @Suspendable
            public void onNext(T t) {
                try {
                    channel.send(t);
                }
                catch (InterruptedException ex) {
                    Strand.interrupted();
                }
                catch (SuspendExecution ex) {
                    throw new AssertionError((Object)ex);
                }
            }

            public void onCompleted() {
                channel.close();
            }

            public void onError(Throwable e) {
                channel.close(e);
            }
        };
    }

    public static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<? extends T> o) {
        final Channel channel = Channels.newChannel((int)bufferSize, (Channels.OverflowPolicy)policy);
        o.subscribe(new Observer<T>(){

            @Suspendable
            public void onNext(T t) {
                try {
                    channel.send(t);
                }
                catch (InterruptedException ex) {
                    Strand.interrupted();
                }
                catch (SuspendExecution ex) {
                    throw new AssertionError((Object)ex);
                }
            }

            public void onCompleted() {
                channel.close();
            }

            public void onError(Throwable e) {
                channel.close(e);
            }
        });
        return channel;
    }

    public static <T> T get(Observable<T> o) throws ExecutionException, SuspendExecution, InterruptedException {
        return (T)new AsyncObservable<T>(o).run();
    }

    public static <T> T get(Observable<T> o, long timeout, TimeUnit unit) throws ExecutionException, SuspendExecution, InterruptedException, TimeoutException {
        return (T)new AsyncObservable<T>(o).run(timeout, unit);
    }

    private static class AsyncObservable<T>
    extends FiberAsync<T, ExecutionException>
    implements Observer<T> {
        private final Observable<T> o;

        public AsyncObservable(Observable<T> o) {
            this.o = o;
        }

        protected void requestAsync() {
            this.o.subscribe((Observer)this);
        }

        public void onNext(T t) {
            if (this.isCompleted()) {
                throw new IllegalStateException("Operation already completed");
            }
            this.asyncCompleted(t);
        }

        public void onError(Throwable e) {
            if (this.isCompleted()) {
                throw new IllegalStateException("Operation already completed");
            }
            this.asyncFailed(e);
        }

        public void onCompleted() {
            if (!this.isCompleted()) {
                this.asyncCompleted(null);
            }
        }

        protected ExecutionException wrapException(Throwable t) {
            return new ExecutionException(t);
        }
    }
}

