/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.krystal.lattice.ext.grpc;

import com.flipkart.krystal.data.ImmutableRequest;
import com.flipkart.krystal.krystex.kryon.KryonExecutorConfig;
import com.flipkart.krystal.lattice.core.di.Bindings;
import com.flipkart.krystal.lattice.core.doping.Dopant;
import com.flipkart.krystal.lattice.core.doping.DopantType;
import com.flipkart.krystal.lattice.core.headers.Header;
import com.flipkart.krystal.lattice.core.headers.SingleValueHeader;
import com.flipkart.krystal.lattice.ext.grpc.GrpcServer;
import com.flipkart.krystal.lattice.ext.grpc.GrpcServerConfig;
import com.flipkart.krystal.lattice.ext.grpc.GrpcServerSpec;
import com.flipkart.krystal.lattice.ext.grpc.StandardHeadersInterceptor;
import com.flipkart.krystal.lattice.vajram.VajramDopant;
import com.flipkart.krystal.lattice.vajram.VajramRequestExecutionContext;
import com.flipkart.krystal.pooling.LeaseUnavailableException;
import com.flipkart.krystal.tags.Names;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Message;
import io.grpc.BindableService;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Generated;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dopant that hosts a gRPC {@link Server} and relays incoming RPCs to the {@link VajramDopant}.
 *
 * <p>Subclasses supply the gRPC services to expose via {@link #serviceDefinitions()}. The server
 * is created in {@link #start()} and listens on the port given by {@link GrpcServerConfig#port()}.
 */
@DopantType(value="krystal.lattice.grpcServer")
public abstract class GrpcServerDopant
implements Dopant<GrpcServer, GrpcServerConfig> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcServerDopant.class);
    public static final String DOPANT_TYPE = "krystal.lattice.grpcServer";

    /** Number of threads in the fixed pool that is handed to the gRPC server as its executor. */
    private static final int SERVER_EXECUTOR_THREADS = 8;

    /** How long the JVM shutdown hook waits for in-flight calls before forcing termination. */
    private static final long SHUTDOWN_GRACE_SECONDS = 30L;

    private final GrpcServerSpec grpcServerSpec;
    private final GrpcServerConfig config;
    private final GrpcServer annotation;
    private final VajramDopant vajramDopant;
    private final StandardHeadersInterceptor headerInterceptor;
    private @MonotonicNonNull Server server;

    /** Returns a fresh builder for a {@link GrpcServerSpec}. */
    public static GrpcServerSpec.GrpcServerSpecBuilder grpcServer() {
        return new GrpcServerSpec.GrpcServerSpecBuilder();
    }

    /**
     * Creates the dopant from the injected initialization bundle.
     *
     * @param initData the annotation, config, spec, header interceptor and vajram dopant to use
     */
    @Inject
    protected GrpcServerDopant(GrpcInitData initData) {
        this.grpcServerSpec = initData.spec();
        this.annotation = initData.annotation();
        this.config = initData.config();
        this.vajramDopant = initData.vajramDopant();
        this.headerInterceptor = initData.headerInterceptor();
    }

    /**
     * Builds and starts the gRPC server with insecure credentials on the configured port,
     * registers every service from {@link #serviceDefinitions()}, and installs a JVM shutdown
     * hook that stops the server and its executor.
     *
     * @throws IOException if the server fails to start
     */
    public void start() throws IOException {
        log.info("****** GrpcServerDopant : Starting GrpcServer {} ****** ", this.annotation.serverName());
        ExecutorService executor = Executors.newFixedThreadPool(SERVER_EXECUTOR_THREADS);
        Server startedServer;
        try {
            ServerBuilder<?> serverBuilder = Grpc.newServerBuilderForPort(this.config.port(), InsecureServerCredentials.create())
                .executor(executor)
                .intercept(this.headerInterceptor);
            this.serviceDefinitions().forEach(serverBuilder::addService);
            startedServer = serverBuilder.build().start();
        }
        catch (IOException | RuntimeException e) {
            // The pool was created before the server; without this its threads would outlive
            // a failed start.
            executor.shutdownNow();
            throw e;
        }
        log.info("************************************************");
        log.info("****** GrpcServerDopant : Started GrpcServer {}", this.annotation.serverName());
        log.info("************************************************");
        Runtime.getRuntime().addShutdownHook(new Thread(() -> this.shutDown(startedServer, executor)));
        this.server = startedServer;
    }

    /**
     * Stops the given server, waiting up to {@link #SHUTDOWN_GRACE_SECONDS} seconds for it to
     * terminate, and then shuts down the executor. Runs inside the JVM shutdown hook.
     */
    private void shutDown(Server runningServer, ExecutorService executor) {
        log.error("*** shutting down gRPC server '{}' since JVM is shutting down", this.annotation.serverName());
        try {
            boolean terminated = runningServer.shutdown().awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS);
            if (!terminated) {
                // Graceful shutdown did not finish within the grace period; force it.
                runningServer.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            runningServer.shutdownNow();
            log.error("", e);
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
        finally {
            executor.shutdown();
        }
        log.error("*** server shut down ***");
    }

    /** Returns the gRPC services that {@link #start()} registers with the server. */
    protected abstract ImmutableList<BindableService> serviceDefinitions();

    /**
     * Executes {@code request} through the {@link VajramDopant} and relays the outcome to
     * {@code responseObserver}.
     *
     * <p>On success the response is converted with {@code protoConverter}, emitted via
     * {@code onNext} and the stream is completed. On any failure (execution error, converter
     * error, {@code null} converted message, or {@link LeaseUnavailableException} while
     * submitting) only {@code onError} is invoked; {@code onCompleted} is never called after
     * {@code onError}.
     *
     * @param request the vajram request to execute
     * @param responseObserver the gRPC observer that receives the single response or the error
     * @param protoConverter converts the (possibly {@code null}) response into its proto message
     */
    public <RespT, RespProtoT extends Message> void executeRpc(ImmutableRequest<RespT> request, StreamObserver<@Nullable RespProtoT> responseObserver, Function<@Nullable RespT, RespProtoT> protoConverter) {
        Bindings seedMap = this.getRequestSeeds();
        KryonExecutorConfig.KryonExecutorConfigBuilder configBuilder = KryonExecutorConfig.builder();
        String requestId = this.grpcServerSpec.requestIdContextKey().get();
        if (requestId != null) {
            configBuilder.executorId(requestId);
        }
        try {
            this.vajramDopant
                .executeRequest(VajramRequestExecutionContext.builder().vajramRequest(request).requestScopeSeeds(seedMap).executorConfigBuilder(configBuilder).build())
                .whenComplete((response, throwable) -> GrpcServerDopant.relayOutcome(response, throwable, responseObserver, protoConverter));
        }
        catch (LeaseUnavailableException e) {
            log.error("Could not lease out single thread executor. Aborting request", e);
            responseObserver.onError(new StatusException(Status.RESOURCE_EXHAUSTED));
        }
    }

    /**
     * Delivers exactly one terminal signal to {@code responseObserver}: {@code onError} for any
     * failure, or {@code onNext} followed by {@code onCompleted} for a successful conversion.
     */
    private static <RespT, RespProtoT extends Message> void relayOutcome(@Nullable RespT response, @Nullable Throwable throwable, StreamObserver<@Nullable RespProtoT> responseObserver, Function<@Nullable RespT, RespProtoT> protoConverter) {
        if (throwable != null) {
            responseObserver.onError(throwable);
            return;
        }
        RespProtoT proto;
        try {
            proto = protoConverter.apply(response);
        }
        catch (Throwable e) {
            responseObserver.onError(e);
            return;
        }
        if (proto == null) {
            responseObserver.onError(GrpcServerDopant.getUnknownInternalError());
            return;
        }
        try {
            responseObserver.onNext(proto);
        }
        catch (Throwable e) {
            responseObserver.onError(e);
            return;
        }
        // Reached only when no onError was sent, so the stream is terminated exactly once.
        responseObserver.onCompleted();
    }

    /** Builds the request-scoped bindings; currently only the accept header, when present. */
    private Bindings getRequestSeeds() {
        Bindings.BindingsBuilder bindings = Bindings.builder();
        this.addHeader(this.grpcServerSpec.acceptHeaderContextKey(), bindings);
        return bindings.build();
    }

    /**
     * Binds a {@link SingleValueHeader} for {@code key} into {@code bindings} if the key
     * currently has a non-null value. The header name is {@code key.toString()}.
     */
    private void addHeader(Context.Key<String> key, Bindings.BindingsBuilder bindings) {
        String headerValue = key.get();
        if (headerValue == null) {
            return;
        }
        String headerKey = key.toString();
        bindings.bind(Header.class, (Annotation)Names.named((String)headerKey), (Object)new SingleValueHeader(headerKey, headerValue));
    }

    /** Error reported to the client when the converter yields no proto message. */
    private static IllegalArgumentException getUnknownInternalError() {
        return new IllegalArgumentException("Unknown internal error");
    }

    /**
     * Blocks until the server terminates. Returns immediately if {@link #start()} has not yet
     * assigned a server.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void tryMainMethodExit() throws InterruptedException {
        Server runningServer = this.server;
        if (runningServer != null) {
            runningServer.awaitTermination();
        }
    }

    /** Injection bundle carrying everything the {@link GrpcServerDopant} constructor needs. */
    @Singleton
    protected record GrpcInitData(GrpcServer annotation, GrpcServerConfig config, GrpcServerSpec spec, StandardHeadersInterceptor headerInterceptor, VajramDopant vajramDopant) {
    }
}

