/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.tracing.zipkin;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.tracing.zipkin.ZipkinBaseTest;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import zipkin2.Span;

public class EventBusTest
extends ZipkinBaseTest {
    private static final String ADDRESS = "the-address";

    @Override
    protected HttpClientOptions getHttpClientOptions() {
        return new HttpClientOptions().setDefaultPort(8080);
    }

    @Test
    public void testEventBusSendPropagate(TestContext ctx) throws Exception {
        this.testSend(ctx, TracingPolicy.PROPAGATE, 2);
    }

    @Test
    public void testEventBusSendIgnore(TestContext ctx) throws Exception {
        this.testSend(ctx, TracingPolicy.IGNORE, 0);
    }

    @Test
    public void testEventBusSendAlways(TestContext ctx) throws Exception {
        this.testSend(ctx, TracingPolicy.ALWAYS, 2);
    }

    private void testSend(TestContext ctx, TracingPolicy policy, int expected) throws Exception {
        Async latch = ctx.async();
        ProducerVerticle producerVerticle = new ProducerVerticle(this.getHttpServerPolicy(policy), vertx -> {
            vertx.eventBus().send(ADDRESS, (Object)"ping", new DeliveryOptions().setTracingPolicy(policy));
            return Future.succeededFuture();
        });
        this.vertx.deployVerticle((Verticle)producerVerticle, ctx.asyncAssertSuccess(d1 -> {
            Promise consumerPromise = Promise.promise();
            this.vertx.deployVerticle((Verticle)new ConsumerVerticle((Promise<Void>)consumerPromise), ctx.asyncAssertSuccess(d2 -> this.client.request(HttpMethod.GET, "/", ctx.asyncAssertSuccess(req -> req.send(ctx.asyncAssertSuccess(resp -> {
                ctx.assertEquals((Object)200, (Object)resp.statusCode());
                consumerPromise.future().onComplete(ctx.asyncAssertSuccess(v -> latch.complete()));
            }))))));
        }));
        latch.awaitSuccess();
        List spans = (List)EventBusTest.waitUntilTrace(() -> Optional.of(this.zipkin.getTraces()).filter(traces -> !traces.isEmpty()).map(traces -> (List)traces.get(0)).map(trace -> trace.stream().filter(span -> !span.tags().containsKey("http.path")).collect(Collectors.toList())).filter(trace -> trace.size() == expected));
        for (Span span : spans) {
            Assert.assertEquals((Object)"send", (Object)span.name());
            Assert.assertEquals((Object)ADDRESS, (Object)span.remoteEndpoint().serviceName());
        }
    }

    private TracingPolicy getHttpServerPolicy(TracingPolicy policy) {
        return policy == TracingPolicy.ALWAYS ? TracingPolicy.IGNORE : TracingPolicy.ALWAYS;
    }

    @Test
    @Ignore
    public void testEventBusPublishProgagate(TestContext ctx) throws Exception {
        this.testPublish(ctx, TracingPolicy.PROPAGATE, 3);
    }

    @Test
    public void testEventBusPublishIgnore(TestContext ctx) throws Exception {
        this.testPublish(ctx, TracingPolicy.IGNORE, 0);
    }

    @Test
    @Ignore
    public void testEventBusPublishAlways(TestContext ctx) throws Exception {
        this.testPublish(ctx, TracingPolicy.ALWAYS, 3);
    }

    private void testPublish(TestContext ctx, TracingPolicy policy, int expected) throws Exception {
        Async latch = ctx.async();
        ProducerVerticle producerVerticle = new ProducerVerticle(this.getHttpServerPolicy(policy), vertx -> {
            vertx.eventBus().publish(ADDRESS, (Object)"ping", new DeliveryOptions().setTracingPolicy(policy));
            return Future.succeededFuture();
        });
        this.vertx.deployVerticle((Verticle)producerVerticle, ctx.asyncAssertSuccess(d1 -> {
            Promise consumer1Promise = Promise.promise();
            Promise consumer2Promise = Promise.promise();
            this.vertx.deployVerticle((Verticle)new ConsumerVerticle((Promise<Void>)consumer1Promise), ctx.asyncAssertSuccess(d2 -> this.vertx.deployVerticle((Verticle)new ConsumerVerticle((Promise<Void>)consumer2Promise), ctx.asyncAssertSuccess(d3 -> this.client.request(HttpMethod.GET, "/", ctx.asyncAssertSuccess(req -> req.send(ctx.asyncAssertSuccess(resp -> {
                ctx.assertEquals((Object)200, (Object)resp.statusCode());
                CompositeFuture.all((Future)consumer1Promise.future(), (Future)consumer2Promise.future()).onComplete(ctx.asyncAssertSuccess(v -> latch.complete()));
            }))))))));
        }));
        latch.awaitSuccess();
        List spans = (List)EventBusTest.waitUntilTrace(() -> Optional.of(this.zipkin.getTraces()).filter(traces -> !traces.isEmpty()).map(traces -> (List)traces.get(0)).map(trace -> trace.stream().filter(span -> !span.tags().containsKey("http.path")).collect(Collectors.toList())).filter(trace -> trace.size() == expected));
        for (Span span : spans) {
            Assert.assertEquals((Object)"publish", (Object)span.name());
            Assert.assertEquals((Object)ADDRESS, (Object)span.remoteEndpoint().serviceName());
        }
    }

    @Test
    public void testEventBusRequestReplyPropagate(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.PROPAGATE, false, 2);
    }

    @Test
    public void testEventBusRequestReplyIgnore(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.IGNORE, false, 0);
    }

    @Test
    public void testEventBusRequestReplyAlways(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.ALWAYS, false, 2);
    }

    @Test
    public void testEventBusRequestReplyFailurePropagate(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.PROPAGATE, true, 2);
    }

    @Test
    public void testEventBusRequestReplyFailureIgnore(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.IGNORE, true, 0);
    }

    @Test
    public void testEventBusRequestReplyFailureAlways(TestContext ctx) throws Exception {
        this.testRequestReply(ctx, TracingPolicy.ALWAYS, true, 2);
    }

    private void testRequestReply(TestContext ctx, TracingPolicy policy, boolean fail, int expected) throws Exception {
        Async latch = ctx.async();
        ProducerVerticle producerVerticle = new ProducerVerticle(this.getHttpServerPolicy(policy), vertx -> {
            Promise promise = Promise.promise();
            vertx.eventBus().request(ADDRESS, (Object)"ping", new DeliveryOptions().setTracingPolicy(policy), ar -> {
                if (ar.failed() == fail) {
                    vertx.runOnContext(v -> promise.complete());
                } else {
                    vertx.runOnContext(v -> promise.fail("Unexpected"));
                }
            });
            return promise.future();
        });
        this.vertx.deployVerticle((Verticle)producerVerticle, ctx.asyncAssertSuccess(d1 -> this.vertx.deployVerticle((Verticle)new ReplyVerticle(fail), ctx.asyncAssertSuccess(d2 -> this.client.request(HttpMethod.GET, "/", ctx.asyncAssertSuccess(req -> req.send(ctx.asyncAssertSuccess(resp -> {
            ctx.assertEquals((Object)200, (Object)resp.statusCode());
            latch.complete();
        }))))))));
        latch.awaitSuccess();
        List spans = (List)EventBusTest.waitUntilTrace(() -> Optional.of(this.zipkin.getTraces()).filter(traces -> !traces.isEmpty()).map(traces -> (List)traces.get(0)).map(trace -> trace.stream().filter(span -> !span.tags().containsKey("http.path")).collect(Collectors.toList())).filter(trace -> trace.size() == expected));
        for (Span span : spans) {
            Assert.assertEquals((Object)"send", (Object)span.name());
            Assert.assertEquals((Object)ADDRESS, (Object)span.remoteEndpoint().serviceName());
        }
    }

    private static class ReplyVerticle
    extends AbstractVerticle {
        final boolean fail;

        ReplyVerticle(boolean fail) {
            this.fail = fail;
        }

        public void start(Promise<Void> startPromise) {
            this.vertx.eventBus().consumer(EventBusTest.ADDRESS, msg -> {
                if (this.fail) {
                    msg.fail(10, "boom");
                } else {
                    msg.reply(msg.body());
                }
            }).completionHandler(startPromise);
        }
    }

    private static class ConsumerVerticle
    extends AbstractVerticle {
        final Promise<Void> promise;

        ConsumerVerticle(Promise<Void> promise) {
            this.promise = promise;
        }

        public void start(Promise<Void> startPromise) {
            this.vertx.eventBus().consumer(EventBusTest.ADDRESS, msg -> this.vertx.runOnContext(v -> this.promise.complete())).completionHandler(startPromise);
        }
    }

    private static class ProducerVerticle
    extends AbstractVerticle {
        private final TracingPolicy httpServerPolicy;
        private final Function<Vertx, Future<Void>> action;

        private ProducerVerticle(TracingPolicy httpServerPolicy, Function<Vertx, Future<Void>> action) {
            this.httpServerPolicy = httpServerPolicy;
            this.action = action;
        }

        public void start(Promise<Void> startPromise) {
            this.vertx.createHttpServer(new HttpServerOptions().setTracingPolicy(this.httpServerPolicy)).requestHandler(this::onRequest).listen(8080).mapEmpty().onComplete(startPromise);
        }

        private void onRequest(HttpServerRequest request) {
            this.action.apply(this.vertx).onComplete(ar -> {
                if (ar.succeeded()) {
                    request.response().end();
                } else {
                    ar.cause().printStackTrace();
                    request.response().setStatusCode(500).end();
                }
            });
        }
    }
}

