/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.server;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportResponseHandler;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.protocol.RequestMessage;
import org.apache.spark.network.protocol.ResponseMessage;
import org.apache.spark.network.server.TransportRequestHandler;
import org.apache.spark.network.util.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.io.netty.channel.ChannelHandlerContext;
import org.spark_project.io.netty.channel.SimpleChannelInboundHandler;
import org.spark_project.io.netty.handler.timeout.IdleState;
import org.spark_project.io.netty.handler.timeout.IdleStateEvent;

public class TransportChannelHandler
extends SimpleChannelInboundHandler<Message> {
    private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);
    private final TransportClient client;
    private final TransportResponseHandler responseHandler;
    private final TransportRequestHandler requestHandler;
    private final long requestTimeoutNs;
    private final boolean closeIdleConnections;
    private final TransportContext transportContext;

    public TransportChannelHandler(TransportClient client, TransportResponseHandler responseHandler, TransportRequestHandler requestHandler, long requestTimeoutMs, boolean closeIdleConnections, TransportContext transportContext) {
        this.client = client;
        this.responseHandler = responseHandler;
        this.requestHandler = requestHandler;
        this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000L;
        this.closeIdleConnections = closeIdleConnections;
        this.transportContext = transportContext;
    }

    public TransportClient getClient() {
        return this.client;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()), cause);
        this.requestHandler.exceptionCaught(cause);
        this.responseHandler.exceptionCaught(cause);
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        try {
            this.requestHandler.channelActive();
        }
        catch (RuntimeException e) {
            logger.error("Exception from request handler while channel is active", (Throwable)e);
        }
        try {
            this.responseHandler.channelActive();
        }
        catch (RuntimeException e) {
            logger.error("Exception from response handler while channel is active", (Throwable)e);
        }
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            this.requestHandler.channelInactive();
        }
        catch (RuntimeException e) {
            logger.error("Exception from request handler while channel is inactive", (Throwable)e);
        }
        try {
            this.responseHandler.channelInactive();
        }
        catch (RuntimeException e) {
            logger.error("Exception from response handler while channel is inactive", (Throwable)e);
        }
        super.channelInactive(ctx);
    }

    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
        if (msg instanceof ChunkFetchRequest) {
            return false;
        }
        return super.acceptInboundMessage(msg);
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
        if (request instanceof RequestMessage) {
            this.requestHandler.handle((RequestMessage)request);
        } else if (request instanceof ResponseMessage) {
            this.responseHandler.handle((ResponseMessage)request);
        } else {
            ctx.fireChannelRead(request);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent)evt;
            TransportChannelHandler transportChannelHandler = this;
            synchronized (transportChannelHandler) {
                boolean isActuallyOverdue;
                boolean bl = isActuallyOverdue = System.nanoTime() - this.responseHandler.getTimeOfLastRequestNs() > this.requestTimeoutNs;
                if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
                    if (this.responseHandler.numOutstandingRequests() > 0) {
                        String address = NettyUtils.getRemoteAddress(ctx.channel());
                        logger.error("Connection to {} has been quiet for {} ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.", (Object)address, (Object)(this.requestTimeoutNs / 1000L / 1000L));
                        this.client.timeOut();
                        ctx.close();
                    } else if (this.closeIdleConnections) {
                        this.client.timeOut();
                        ctx.close();
                    }
                }
            }
        }
        ctx.fireUserEventTriggered(evt);
    }

    public TransportResponseHandler getResponseHandler() {
        return this.responseHandler;
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.transportContext.getRegisteredConnections().inc();
        super.channelRegistered(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.transportContext.getRegisteredConnections().dec();
        super.channelUnregistered(ctx);
    }
}

