/*
 * Decompiled with CFR 0.152.
 */
package com.twosigma.waiter.courier;

import com.twosigma.waiter.courier.CourierGrpc;
import com.twosigma.waiter.courier.CourierReply;
import com.twosigma.waiter.courier.CourierRequest;
import com.twosigma.waiter.courier.CourierSummary;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GrpcClient {
    private static Function<String, Void> logFunction = new Function<String, Void>(){

        @Override
        public Void apply(String message) {
            System.out.println(message);
            return null;
        }
    };

    public static void setLogFunction(Function<String, Void> logFunction) {
        GrpcClient.logFunction = logFunction;
    }

    private static ManagedChannel initializeChannel(String host, int port) {
        logFunction.apply("initializing plaintext client at " + host + ":" + port);
        return ManagedChannelBuilder.forAddress((String)host, (int)port).usePlaintext().build();
    }

    private static void shutdownChannel(ManagedChannel channel) throws InterruptedException {
        logFunction.apply("shutting down channel");
        channel.shutdown().awaitTermination(1L, TimeUnit.SECONDS);
        logFunction.apply("channel shutdown successfully");
    }

    private static Metadata attachRequestHeaders(Map<String, Object> headers) {
        Metadata headerMetadata = new Metadata();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            String key = entry.getKey();
            String value = String.valueOf(entry.getValue());
            headerMetadata.put(Metadata.Key.of((String)key, (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER), (Object)value);
        }
        return headerMetadata;
    }

    private static Channel wrapResponseLogger(ManagedChannel channel) {
        return ClientInterceptors.intercept((Channel)channel, (ClientInterceptor[])new ClientInterceptor[]{new ClientInterceptor(){

            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)){

                    public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
                        super.start((ClientCall.Listener)new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener){

                            public void onHeaders(Metadata headers) {
                                logFunction.apply("headers received from server:" + headers);
                                super.onHeaders(headers);
                            }

                            public void onClose(Status status, Metadata trailers) {
                                logFunction.apply("status received from server:" + status);
                                logFunction.apply("trailers received from server:" + trailers);
                                super.onClose(status, trailers);
                            }
                        }, headers);
                    }
                };
            }
        }});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public static CourierReply sendPackage(String host, int port, Map<String, Object> headers, String id, String from, String message) throws InterruptedException {
        ManagedChannel channel = GrpcClient.initializeChannel(host, port);
        try {
            CourierReply response;
            Channel wrappedChannel = GrpcClient.wrapResponseLogger(channel);
            Metadata headerMetadata = GrpcClient.attachRequestHeaders(headers);
            CourierGrpc.CourierFutureStub rawStub = CourierGrpc.newFutureStub(wrappedChannel);
            CourierGrpc.CourierFutureStub futureStub = (CourierGrpc.CourierFutureStub)MetadataUtils.attachHeaders((AbstractStub)rawStub, (Metadata)headerMetadata);
            logFunction.apply("will try to send package from " + from + " ...");
            CourierRequest request = CourierRequest.newBuilder().setId(id).setFrom(from).setMessage(message).build();
            try {
                response = (CourierReply)futureStub.sendPackage(request).get();
            }
            catch (StatusRuntimeException e) {
                logFunction.apply("RPC failed, status: " + e.getStatus());
                CourierReply courierReply = null;
                GrpcClient.shutdownChannel(channel);
                return courierReply;
            }
            catch (Exception e) {
                logFunction.apply("RPC failed, message: " + e.getMessage());
                CourierReply courierReply = null;
                GrpcClient.shutdownChannel(channel);
                return courierReply;
            }
            logFunction.apply("received response CourierReply{id=" + response.getId() + ", response=" + response.getResponse() + ", message.length=" + response.getMessage().length() + "}");
            logFunction.apply("messages equal = " + message.equals(response.getMessage()));
            CourierReply courierReply = response;
            return courierReply;
            {
                catch (Throwable throwable) {
                    throw throwable;
                }
            }
        }
        finally {
            GrpcClient.shutdownChannel(channel);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    public static CourierSummary collectPackages(String host, int port, Map<String, Object> headers, String idPrefix, String from, List<String> messages, int interMessageSleepMs, final boolean lockStepMode) throws InterruptedException {
        ManagedChannel channel = GrpcClient.initializeChannel(host, port);
        try {
            final Semaphore lockStep = new Semaphore(1);
            Channel wrappedChannel = GrpcClient.wrapResponseLogger(channel);
            Metadata headerMetadata = GrpcClient.attachRequestHeaders(headers);
            CourierGrpc.CourierStub rawStub = CourierGrpc.newStub(wrappedChannel);
            CourierGrpc.CourierStub futureStub = (CourierGrpc.CourierStub)MetadataUtils.attachHeaders((AbstractStub)rawStub, (Metadata)headerMetadata);
            logFunction.apply("will try to send package from " + from + " ...");
            final CompletableFuture responsePromise = new CompletableFuture();
            try {
                StreamObserver<CourierRequest> collector = futureStub.collectPackages(new StreamObserver<CourierSummary>(){
                    private long numMessages = 0L;
                    private long totalLength = 0L;

                    public void onNext(CourierSummary response) {
                        logFunction.apply("received response CourierSummary{count=" + response.getNumMessages() + ", length=" + response.getTotalLength() + "}");
                        this.numMessages += response.getNumMessages();
                        this.totalLength += response.getTotalLength();
                        if (lockStepMode) {
                            logFunction.apply("releasing semaphore after receiving response");
                            lockStep.release();
                        }
                    }

                    public void onError(Throwable throwable) {
                        logFunction.apply("error in collecting summaries " + throwable);
                        responsePromise.complete(null);
                    }

                    public void onCompleted() {
                        logFunction.apply("completed collecting summaries");
                        responsePromise.complete(CourierSummary.newBuilder().setNumMessages(this.numMessages).setTotalLength(this.totalLength).build());
                    }
                });
                for (int i = 0; i < messages.size(); ++i) {
                    String requestId = idPrefix + i;
                    if (lockStepMode) {
                        logFunction.apply("acquiring semaphore before sending request " + requestId);
                        lockStep.acquire();
                    }
                    CourierRequest request = CourierRequest.newBuilder().setId(requestId).setFrom(from).setMessage(messages.get(i)).build();
                    logFunction.apply("sending message CourierRequest{id=" + request.getId() + ", from=" + request.getFrom() + ", message.length=" + request.getMessage().length() + "}");
                    collector.onNext((Object)request);
                    Thread.sleep(interMessageSleepMs);
                }
                logFunction.apply("completed sending packages");
                collector.onCompleted();
                CourierSummary courierSummary = (CourierSummary)responsePromise.get();
                return courierSummary;
            }
            catch (StatusRuntimeException e) {
                logFunction.apply("RPC failed, status: " + e.getStatus());
                CourierSummary courierSummary = null;
                GrpcClient.shutdownChannel(channel);
                return courierSummary;
            }
            catch (Exception e) {
                logFunction.apply("RPC failed, message: " + e.getMessage());
                CourierSummary courierSummary = null;
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
                GrpcClient.shutdownChannel(channel);
                return courierSummary;
            }
        }
        finally {
            GrpcClient.shutdownChannel(channel);
        }
    }

    public static void main(String ... args) throws Exception {
        String host = "localhost";
        int port = 8080;
        HashMap<String, Object> headers = new HashMap<String, Object>();
        String id = UUID.randomUUID().toString();
        String user = "Jim";
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < 100000; ++i2) {
            sb.append("a");
            if (i2 % 1000 != 0) continue;
            sb.append(".");
        }
        CourierReply courierReply = GrpcClient.sendPackage("localhost", 8080, headers, id, "Jim", sb.toString());
        logFunction.apply("sendPackage response = " + courierReply);
        List<String> messages = IntStream.range(0, 100).mapToObj(i -> "message-" + i).collect(Collectors.toList());
        CourierSummary courierSummary = GrpcClient.collectPackages("localhost", 8080, headers, "id-", "Jim", messages, 10, true);
        logFunction.apply("collectPackages response = " + courierSummary);
    }
}

