/*
 * Decompiled with CFR 0.152.
 */
package rx.operators;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.concurrency.Schedulers;
import rx.operators.OperationParallelMerge;
import rx.subjects.PublishSubject;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class OperationParallelMergeTest {
    @Test
    public void testParallelMerge() {
        PublishSubject p1 = PublishSubject.create();
        PublishSubject p2 = PublishSubject.create();
        PublishSubject p3 = PublishSubject.create();
        PublishSubject p4 = PublishSubject.create();
        Observable fourStreams = Observable.from(p1, p2, p3, p4);
        Observable twoStreams = OperationParallelMerge.parallelMerge(fourStreams, 2);
        Observable threeStreams = OperationParallelMerge.parallelMerge(fourStreams, 3);
        List fourList = fourStreams.toList().toBlockingObservable().last();
        List threeList = threeStreams.toList().toBlockingObservable().last();
        List twoList = twoStreams.toList().toBlockingObservable().last();
        System.out.println("two list: " + twoList);
        System.out.println("three list: " + threeList);
        System.out.println("four list: " + fourList);
        Assert.assertEquals((long)4L, (long)fourList.size());
        Assert.assertEquals((long)3L, (long)threeList.size());
        Assert.assertEquals((long)2L, (long)twoList.size());
    }

    @Test
    public void testNumberOfThreads() {
        final ConcurrentHashMap threads = new ConcurrentHashMap();
        Observable.merge(OperationParallelMergeTest.getStreams()).toBlockingObservable().forEach(new Action1<String>(){

            @Override
            public void call(String o) {
                System.out.println("o: " + o + " Thread: " + Thread.currentThread());
                threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
            }
        });
        Assert.assertEquals((long)Runtime.getRuntime().availableProcessors(), (long)threads.keySet().size());
        threads.clear();
        OperationParallelMerge.parallelMerge(OperationParallelMergeTest.getStreams(), 3).flatMap(new Func1<Observable<String>, Observable<String>>(){

            @Override
            public Observable<String> call(Observable<String> o) {
                return o.observeOn(Schedulers.newThread());
            }
        }).toBlockingObservable().forEach(new Action1<String>(){

            @Override
            public void call(String o) {
                System.out.println("o: " + o + " Thread: " + Thread.currentThread());
                threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
            }
        });
        Assert.assertEquals((long)3L, (long)threads.keySet().size());
    }

    @Test
    public void testNumberOfThreadsOnScheduledMerge() {
        final ConcurrentHashMap threads = new ConcurrentHashMap();
        Observable.merge(OperationParallelMerge.parallelMerge(OperationParallelMergeTest.getStreams(), 3, Schedulers.newThread())).toBlockingObservable().forEach(new Action1<String>(){

            @Override
            public void call(String o) {
                System.out.println("o: " + o + " Thread: " + Thread.currentThread());
                threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
            }
        });
        Assert.assertEquals((long)3L, (long)threads.keySet().size());
    }

    private static Observable<Observable<String>> getStreams() {
        return Observable.range(0, 10).map(new Func1<Integer, Observable<String>>(){

            @Override
            public Observable<String> call(final Integer i) {
                return Observable.interval(10L, TimeUnit.MILLISECONDS).map(new Func1<Long, String>(){

                    @Override
                    public String call(Long l) {
                        return "Stream " + i + "  Value: " + l;
                    }
                }).take(5);
            }
        });
    }
}

