/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.CallQueueManager;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.IpcException;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.RpcServerException;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;

public abstract class Server {
    // Whether service-level authorization is enforced; read from "hadoop.security.authorization" in the constructor.
    private final boolean authorize;
    // Auth methods accepted by this server; computed by getAuthMethods() in the constructor.
    private List<SaslRpcServer.AuthMethod> enabledAuthMethods;
    // SASL negotiate message built once from enabledAuthMethods by buildNegotiateResponse().
    private RpcHeaderProtos.RpcSaslProto negotiateResponse;
    // Holds the terse / suppressed exception classes consulted by logException().
    private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
    // UTF-8 bytes of "GET ".
    private static final ByteBuffer HTTP_GET_BYTES = ByteBuffer.wrap("GET ".getBytes(Charsets.UTF_8));
    static final String RECEIVED_HTTP_REQ_RESPONSE = "HTTP/1.1 404 Not Found\r\nContent-type: text/plain\r\n\r\nIt looks like you are making an HTTP request to a Hadoop IPC port. This is not the correct port for the web interface on this daemon.\r\n";
    // Initial capacity of each handler thread's response buffer.
    static int INITIAL_RESP_BUF_SIZE = 10240;
    // Registry of RPC kinds, populated by registerProtocolEngine(). Plain HashMap: not synchronized.
    static Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap = new HashMap<RPC.RpcKind, RpcKindMapValue>(4);
    public static final Log LOG = LogFactory.getLog(Server.class);
    public static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + Server.class.getName());
    private static final String AUTH_FAILED_FOR = "Auth failed for ";
    private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
    // Server instance bound to the current thread; set by handler threads and returned by get().
    private static final ThreadLocal<Server> SERVER = new ThreadLocal();
    // Cache of protocol name -> class, filled lazily by getProtocolClass().
    private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap();
    // Call associated with the current thread; read by the static getCallId()/getRemoteIp()/... accessors.
    private static final ThreadLocal<Call> CurCall = new ThreadLocal();
    private String bindAddress;
    // Requested port at construction; overwritten with the listener's actual port once the Listener exists.
    private int port;
    private int handlerCount;
    private int readThreads;
    private int readerPendingConnectionQueue;
    // When non-null, getRpcRequestWrapper() returns this class regardless of RPC kind.
    private Class<? extends Writable> rpcRequestClass;
    protected final RpcMetrics rpcMetrics;
    protected final RpcDetailedMetrics rpcDetailedMetrics;
    private Configuration conf;
    private String portRangeConfig = null;
    private SecretManager<TokenIdentifier> secretManager;
    private SaslPropertiesResolver saslPropsResolver;
    private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
    private int maxQueueSize;
    // Read from "ipc.server.max.response.size" (default 0x100000).
    private final int maxRespSize;
    private int socketSendBufferSize;
    // Read from "ipc.maximum.data.length" (default 0x4000000).
    private final int maxDataLength;
    private final boolean tcpNoDelay;
    // Cleared by stop(); join() waits until it becomes false.
    private volatile boolean running = true;
    private CallQueueManager<Call> callQueue;
    private ConnectionManager connectionManager;
    private Listener listener = null;
    private Responder responder = null;
    // Created in start(); null until then.
    private Handler[] handlers = null;
    private boolean logSlowRPC = false;
    // Buffers with more remaining bytes than this are transferred in chunks by channelIO().
    private static int NIO_BUFFER_LIMIT = 8192;

    /**
     * Registers exception classes that should be logged tersely by this server.
     *
     * @param exceptionClass the exception classes to add to the terse-logging set
     */
    public void addTerseExceptions(Class<?> ... exceptionClass) {
        exceptionsHandler.addTerseLoggingExceptions(exceptionClass);
    }

    /**
     * Registers exception classes whose occurrences should not be logged at all
     * by {@code logException}.
     *
     * @param exceptionClass the exception classes to add to the suppressed set
     */
    public void addSuppressedLoggingExceptions(Class<?> ... exceptionClass) {
        exceptionsHandler.addSuppressedLoggingExceptions(exceptionClass);
    }

    /**
     * Registers the request wrapper class and invoker for an RPC kind.
     *
     * @param rpcKind the RPC kind being registered
     * @param rpcRequestWrapperClass the class used to deserialize requests of this kind
     * @param rpcInvoker the invoker used to dispatch calls of this kind
     * @throws IllegalArgumentException if {@code rpcKind} was already registered; the
     *         earlier registration is restored before throwing
     */
    public static void registerProtocolEngine(RPC.RpcKind rpcKind, Class<? extends Writable> rpcRequestWrapperClass, RPC.RpcInvoker rpcInvoker) {
        RpcKindMapValue registration = new RpcKindMapValue(rpcRequestWrapperClass, rpcInvoker);
        RpcKindMapValue previous = rpcKindMap.put(rpcKind, registration);
        if (previous == null) {
            LOG.debug("rpcKind=" + rpcKind + ", rpcRequestWrapperClass=" + rpcRequestWrapperClass + ", rpcInvoker=" + rpcInvoker);
            return;
        }
        // Put the original entry back so a failed re-registration leaves the map unchanged.
        rpcKindMap.put(rpcKind, previous);
        throw new IllegalArgumentException("ReRegistration of rpcKind: " + rpcKind);
    }

    /**
     * Resolves the request wrapper class for an RPC kind.
     *
     * @param rpcKind the protobuf RPC kind from the request header
     * @return this server's explicit request class if one was set; otherwise the class
     *         registered for the kind, or {@code null} if the kind is not registered
     */
    public Class<? extends Writable> getRpcRequestWrapper(RpcHeaderProtos.RpcKindProto rpcKind) {
        if (rpcRequestClass != null) {
            return rpcRequestClass;
        }
        RpcKindMapValue entry = rpcKindMap.get(ProtoUtil.convert(rpcKind));
        if (entry == null) {
            return null;
        }
        return entry.rpcRequestWrapperClass;
    }

    /**
     * Looks up the invoker registered for an RPC kind.
     *
     * @param rpcKind the RPC kind
     * @return the registered invoker, or {@code null} if the kind is not registered
     */
    public static RPC.RpcInvoker getRpcInvoker(RPC.RpcKind rpcKind) {
        RpcKindMapValue entry = rpcKindMap.get(rpcKind);
        if (entry == null) {
            return null;
        }
        return entry.rpcInvoker;
    }

    /**
     * Returns the class for a protocol name, loading it through the configuration on
     * first use and caching it afterwards.
     *
     * @param protocolName the fully qualified protocol class name
     * @param conf the configuration used to load the class on a cache miss
     * @return the protocol class
     * @throws ClassNotFoundException if the class cannot be loaded
     */
    static Class<?> getProtocolClass(String protocolName, Configuration conf) throws ClassNotFoundException {
        Class<?> cached = PROTOCOL_CACHE.get(protocolName);
        if (cached != null) {
            return cached;
        }
        Class<?> loaded = conf.getClassByName(protocolName);
        PROTOCOL_CACHE.put(protocolName, loaded);
        return loaded;
    }

    /**
     * Returns the server bound to the current thread.
     *
     * @return the thread-local server, or {@code null} if none was set on this thread
     */
    public static Server get() {
        Server current = SERVER.get();
        return current;
    }

    /**
     * Exposes the thread-local holder of the current call.
     *
     * @return the {@code ThreadLocal} itself, not its current value
     */
    @VisibleForTesting
    public static ThreadLocal<Call> getCurCall() {
        ThreadLocal<Call> holder = CurCall;
        return holder;
    }

    /**
     * Returns the id of the call associated with the current thread.
     *
     * @return the call id, or {@code -2} when the thread has no current call
     */
    public static int getCallId() {
        Call current = CurCall.get();
        if (current == null) {
            return -2;
        }
        return current.callId;
    }

    /**
     * Returns the retry count of the call associated with the current thread.
     *
     * @return the retry count, or {@code -1} when the thread has no current call
     */
    public static int getCallRetryCount() {
        Call current = CurCall.get();
        if (current == null) {
            return -1;
        }
        return current.retryCount;
    }

    /**
     * Returns the remote address of the connection behind the current call.
     *
     * @return the client's address, or {@code null} when the thread has no current
     *         call or the call has no connection
     */
    public static InetAddress getRemoteIp() {
        Call current = CurCall.get();
        if (current == null || current.connection == null) {
            return null;
        }
        return current.connection.getHostInetAddress();
    }

    /**
     * Returns the client id of the call associated with the current thread.
     *
     * @return the call's client id, or {@code RpcConstants.DUMMY_CLIENT_ID} when the
     *         thread has no current call
     */
    public static byte[] getClientId() {
        Call current = CurCall.get();
        if (current == null) {
            return RpcConstants.DUMMY_CLIENT_ID;
        }
        return current.clientId;
    }

    /**
     * Returns the textual host address of the current call's client.
     *
     * @return the client's host address, or {@code null} when {@link #getRemoteIp()}
     *         returns {@code null}
     */
    public static String getRemoteAddress() {
        InetAddress remote = Server.getRemoteIp();
        if (remote == null) {
            return null;
        }
        return remote.getHostAddress();
    }

    /**
     * Returns the user recorded on the connection behind the current call.
     *
     * @return the connection's user, or {@code null} when the thread has no current
     *         call or the call has no connection
     */
    public static UserGroupInformation getRemoteUser() {
        Call current = CurCall.get();
        if (current == null || current.connection == null) {
            return null;
        }
        return current.connection.user;
    }

    /**
     * Tells whether the current thread has a call set in the thread-local holder.
     *
     * @return {@code true} if a current call is set on this thread
     */
    public static boolean isRpcInvocation() {
        Call current = CurCall.get();
        return current != null;
    }

    /**
     * Returns the slow-RPC logging flag.
     *
     * @return the current value of the {@code logSlowRPC} flag
     */
    protected boolean isLogSlowRPC() {
        return logSlowRPC;
    }

    /**
     * Sets the slow-RPC logging flag.
     *
     * @param logSlowRPCFlag the new value of the flag
     */
    @VisibleForTesting
    protected void setLogSlowRPC(boolean logSlowRPCFlag) {
        logSlowRPC = logSlowRPCFlag;
    }

    /**
     * Records a call as slow when its processing time exceeds the mean processing
     * time by more than three standard deviations, provided more than 1024 samples
     * have been collected. A slow call bumps the slow-RPC metric and, if WARN logging
     * is enabled, is logged together with the current call's connection.
     *
     * @param methodName the name of the RPC method that was processed
     * @param processingTime the processing time, reported in the log as milliseconds
     */
    void logSlowRpcCalls(String methodName, int processingTime) {
        final int deviation = 3;
        final int minSampleSize = 1024;
        final double threeSigma = rpcMetrics.getProcessingMean() + rpcMetrics.getProcessingStdDev() * deviation;
        boolean slow = rpcMetrics.getProcessingSampleCount() > minSampleSize && processingTime > threeSigma;
        if (!slow) {
            return;
        }
        if (LOG.isWarnEnabled()) {
            String client = CurCall.get().connection.toString();
            LOG.warn("Slow RPC : " + methodName + " took " + processingTime + " milliseconds to process from client " + client);
        }
        rpcMetrics.incrSlowRpc();
    }

    /**
     * Binds a server socket to an address without any port-range configuration.
     *
     * @param socket the socket to bind
     * @param address the address to bind to
     * @param backlog the listen backlog
     * @throws IOException if the bind fails
     */
    public static void bind(ServerSocket socket, InetSocketAddress address, int backlog) throws IOException {
        final Configuration noConf = null;
        final String noRange = null;
        Server.bind(socket, address, backlog, noConf, noRange);
    }

    /**
     * Binds a server socket, optionally choosing the port from a configured range.
     *
     * <p>The range is only consulted when {@code rangeConf} is non-null, the range it
     * names is non-empty, and {@code address} asks for port 0. In that case each port
     * in the range is tried in turn until one binds. Otherwise the socket is bound
     * directly to {@code address}.
     *
     * @param socket the socket to bind
     * @param address the requested address
     * @param backlog the listen backlog
     * @param conf the configuration holding the range; dereferenced only when
     *        {@code rangeConf} is non-null
     * @param rangeConf the configuration key of the port range, or {@code null}
     * @throws IOException if binding fails; a {@link SocketException} is wrapped via
     *         {@code NetUtils.wrapException}
     */
    public static void bind(ServerSocket socket, InetSocketAddress address, int backlog, Configuration conf, String rangeConf) throws IOException {
        try {
            Configuration.IntegerRanges candidatePorts = rangeConf == null ? null : conf.getRange(rangeConf, "");
            boolean useRange = candidatePorts != null && !candidatePorts.isEmpty() && address.getPort() == 0;
            if (!useRange) {
                socket.bind(address, backlog);
                return;
            }
            for (Integer candidate : candidatePorts) {
                if (socket.isBound()) {
                    break;
                }
                try {
                    socket.bind(new InetSocketAddress(address.getAddress(), candidate), backlog);
                }
                catch (BindException ignored) {
                    // Port is taken; move on to the next candidate in the range.
                }
            }
            if (!socket.isBound()) {
                throw new BindException("Could not find a free port in " + candidatePorts);
            }
        }
        catch (SocketException e) {
            throw NetUtils.wrapException(null, 0, address.getHostName(), address.getPort(), e);
        }
    }

    /**
     * Returns this server's RPC metrics object.
     *
     * @return the RPC metrics created in the constructor
     */
    @VisibleForTesting
    public RpcMetrics getRpcMetrics() {
        return rpcMetrics;
    }

    /**
     * Returns this server's detailed RPC metrics object.
     *
     * @return the detailed metrics created in the constructor
     */
    @VisibleForTesting
    public RpcDetailedMetrics getRpcDetailedMetrics() {
        return rpcDetailedMetrics;
    }

    /**
     * Returns a list view over the handler thread array.
     *
     * @return a fixed-size list backed by the handler array; the array is only
     *         created by {@link #start()}
     */
    @VisibleForTesting
    Iterable<? extends Thread> getHandlers() {
        return Arrays.asList(handlers);
    }

    /**
     * Returns a snapshot array of the connections tracked by the connection manager.
     *
     * @return the currently registered connections
     */
    @VisibleForTesting
    Connection[] getConnections() {
        return connectionManager.toArray();
    }

    /**
     * Refreshes the service authorization manager from the given configuration and
     * policy provider.
     *
     * @param conf the configuration to refresh from
     * @param provider the policy provider to refresh from
     */
    public void refreshServiceAcl(Configuration conf, PolicyProvider provider) {
        serviceAuthorizationManager.refresh(conf, provider);
    }

    /**
     * Refreshes the service authorization manager by delegating to its
     * {@code refreshWithLoadedConfiguration} method.
     *
     * @param conf the configuration to refresh from
     * @param provider the policy provider to refresh from
     */
    @InterfaceAudience.Private
    public void refreshServiceAclWithLoadedConfiguration(Configuration conf, PolicyProvider provider) {
        serviceAuthorizationManager.refreshWithLoadedConfiguration(conf, provider);
    }

    /**
     * Returns the service authorization manager used by this server.
     *
     * @return this server's {@link ServiceAuthorizationManager}
     */
    @InterfaceAudience.LimitedPrivate(value={"HDFS", "MapReduce"})
    public ServiceAuthorizationManager getServiceAuthorizationManager() {
        return serviceAuthorizationManager;
    }

    /**
     * Resolves the call-queue implementation class from configuration.
     *
     * @param prefix the configuration key prefix; the key read is
     *        {@code <prefix>.callqueue.impl}
     * @param conf the configuration to read
     * @return the configured queue class, defaulting to {@link LinkedBlockingQueue},
     *         converted by {@code CallQueueManager.convertQueueClass}
     */
    static Class<? extends BlockingQueue<Call>> getQueueClass(String prefix, Configuration conf) {
        Class<?> configured = conf.getClass(prefix + "." + "callqueue.impl", LinkedBlockingQueue.class);
        return CallQueueManager.convertQueueClass(configured, Call.class);
    }

    /**
     * Builds the configuration key prefix for this server's call queue settings.
     *
     * @return {@code "ipc."} followed by the current value of the port field
     */
    private String getQueueClassPrefix() {
        return "ipc." + port;
    }

    /**
     * Swaps the call queue for the implementation currently configured under this
     * server's queue prefix, keeping the existing maximum queue size.
     *
     * @param conf the configuration to read the queue class from
     */
    public synchronized void refreshCallQueue(Configuration conf) {
        String queuePrefix = getQueueClassPrefix();
        Class<? extends BlockingQueue<Call>> queueClass = Server.getQueueClass(queuePrefix, conf);
        callQueue.swapQueue(queueClass, maxQueueSize, queuePrefix, conf);
    }

    /**
     * Reads the client-backoff flag from configuration.
     *
     * @param prefix the configuration key prefix; the key read is
     *        {@code <prefix>.backoff.enable}
     * @param conf the configuration to read
     * @return the configured value, or {@code false} when the key is absent
     */
    static boolean getClientBackoffEnable(String prefix, Configuration conf) {
        return conf.getBoolean(prefix + "." + "backoff.enable", false);
    }

    /**
     * Logs an exception raised while handling a call.
     *
     * <p>Suppressed exception classes are not logged. Terse classes are logged at INFO
     * with the exception's string form appended to the message. Runtime exceptions and
     * errors are logged at WARN with the throwable; everything else at INFO with the
     * throwable.
     *
     * @param logger the log to write to
     * @param e the exception to report
     * @param call the call being handled, included in the message
     */
    @VisibleForTesting
    void logException(Log logger, Throwable e, Call call) {
        if (exceptionsHandler.isSuppressedLog(e.getClass())) {
            return;
        }
        final String logMsg = Thread.currentThread().getName() + ", call " + call;
        if (exceptionsHandler.isTerseLog(e.getClass())) {
            logger.info(logMsg + ": " + e);
            return;
        }
        boolean unchecked = e instanceof RuntimeException || e instanceof Error;
        if (unchecked) {
            logger.warn(logMsg, e);
        } else {
            logger.info(logMsg, e);
        }
    }

    /**
     * Creates a server with default reader count and queue size (both passed as -1),
     * the port number as the server name, and no secret manager or port-range key.
     *
     * @param bindAddress the address to bind to
     * @param port the requested port
     * @param paramClass the request class, passed through as the RPC request class
     * @param handlerCount the number of handler threads
     * @param conf the configuration
     * @throws IOException if the delegated constructor fails
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> paramClass, int handlerCount, Configuration conf) throws IOException {
        this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null, null);
    }

    /**
     * Creates a server without a port-range configuration key; delegates to the full
     * constructor with {@code portRangeConfig} set to {@code null}.
     *
     * @param bindAddress the address to bind to
     * @param port the requested port
     * @param rpcRequestClass the RPC request class, may be {@code null}
     * @param handlerCount the number of handler threads
     * @param numReaders the number of reader threads, or -1 to read it from configuration
     * @param queueSizePerHandler the maximum queue size, or -1 to derive it from configuration
     * @param conf the configuration
     * @param serverName the server name
     * @param secretManager the secret manager, may be {@code null}
     * @throws IOException if the delegated constructor fails
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> rpcRequestClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
        this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, queueSizePerHandler, conf, serverName, secretManager, null);
    }

    /**
     * Full constructor: reads sizing limits from configuration, builds the call queue,
     * computes the accepted auth methods, then creates the listener, connection
     * manager, metrics and responder. Statement order matters here because later
     * objects read fields assigned earlier.
     *
     * @param bindAddress the address to bind to
     * @param port the requested port
     * @param rpcRequestClass the RPC request class, may be {@code null}
     * @param handlerCount the number of handler threads
     * @param numReaders the number of reader threads, or -1 to use
     *        "ipc.server.read.threadpool.size" (default 1)
     * @param queueSizePerHandler the maximum queue size, or -1 to use
     *        {@code handlerCount * "ipc.server.handler.queue.size"} (default 100)
     * @param conf the configuration
     * @param serverName the server name; not referenced in this constructor body
     * @param secretManager the secret manager, may be {@code null}
     * @param portRangeConfig the configuration key naming a port range, may be {@code null}
     * @throws IOException declared by this constructor; raised by one of the calls below
     */
    protected Server(String bindAddress, int port, Class<? extends Writable> rpcRequestClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
        this.bindAddress = bindAddress;
        this.conf = conf;
        this.portRangeConfig = portRangeConfig;
        this.port = port;
        this.rpcRequestClass = rpcRequestClass;
        this.handlerCount = handlerCount;
        this.socketSendBufferSize = 0;
        this.maxDataLength = conf.getInt("ipc.maximum.data.length", 0x4000000);
        // When given explicitly, queueSizePerHandler is used as the total queue size, not multiplied by handlerCount.
        this.maxQueueSize = queueSizePerHandler != -1 ? queueSizePerHandler : handlerCount * conf.getInt("ipc.server.handler.queue.size", 100);
        this.maxRespSize = conf.getInt("ipc.server.max.response.size", 0x100000);
        this.readThreads = numReaders != -1 ? numReaders : conf.getInt("ipc.server.read.threadpool.size", 1);
        this.readerPendingConnectionQueue = conf.getInt("ipc.server.read.connection-queue.size", 100);
        // The prefix is computed from the requested port; this.port is only updated to the listener's port further down.
        String prefix = this.getQueueClassPrefix();
        this.callQueue = new CallQueueManager(Server.getQueueClass(prefix, conf), Server.getClientBackoffEnable(prefix, conf), this.maxQueueSize, prefix, conf);
        this.secretManager = secretManager;
        this.authorize = conf.getBoolean("hadoop.security.authorization", false);
        this.enabledAuthMethods = this.getAuthMethods(secretManager, conf);
        this.negotiateResponse = this.buildNegotiateResponse(this.enabledAuthMethods);
        this.listener = new Listener();
        // Replace the requested port with the port the listener reports.
        this.port = this.listener.getAddress().getPort();
        // ConnectionManager reads conf, maxQueueSize, readThreads and getPort(), so it must come after those are set.
        this.connectionManager = new ConnectionManager();
        this.rpcMetrics = RpcMetrics.create(this, conf);
        this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
        this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", true);
        this.setLogSlowRPC(conf.getBoolean("ipc.server.log.slow.rpc", false));
        this.responder = new Responder();
        // SASL is initialised when a secret manager is supplied or security is enabled.
        if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
            SaslRpcServer.init(conf);
            this.saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
        }
        this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
    }

    /**
     * Builds the SASL message describing which auth methods this server accepts.
     *
     * <p>When SIMPLE is the only method the message carries state SUCCESS and no auth
     * entries. Otherwise the state is NEGOTIATE and one auth entry is added per method,
     * with protocol and server id set only when the method provides them.
     *
     * @param authMethods the enabled auth methods
     * @return the built SASL message
     * @throws IOException declared by this method; presumably from constructing a
     *         {@code SaslRpcServer}
     */
    private RpcHeaderProtos.RpcSaslProto buildNegotiateResponse(List<SaslRpcServer.AuthMethod> authMethods) throws IOException {
        RpcHeaderProtos.RpcSaslProto.Builder response = RpcHeaderProtos.RpcSaslProto.newBuilder();
        boolean simpleOnly = authMethods.contains(SaslRpcServer.AuthMethod.SIMPLE) && authMethods.size() == 1;
        if (simpleOnly) {
            response.setState(RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS);
            return response.build();
        }
        response.setState(RpcHeaderProtos.RpcSaslProto.SaslState.NEGOTIATE);
        for (SaslRpcServer.AuthMethod method : authMethods) {
            SaslRpcServer saslRpcServer = new SaslRpcServer(method);
            RpcHeaderProtos.RpcSaslProto.SaslAuth.Builder auth = response.addAuthsBuilder()
                    .setMethod(method.toString())
                    .setMechanism(saslRpcServer.mechanism);
            if (saslRpcServer.protocol != null) {
                auth.setProtocol(saslRpcServer.protocol);
            }
            if (saslRpcServer.serverId != null) {
                auth.setServerId(saslRpcServer.serverId);
            }
        }
        return response.build();
    }

    /**
     * Computes the list of auth methods the server accepts.
     *
     * <p>The configured method is always included, last. When the configured method is
     * not TOKEN but a secret manager is supplied, TOKEN is added ahead of it.
     *
     * @param secretManager the secret manager, may be {@code null}
     * @param conf the configuration the authentication method is read from
     * @return the accepted auth methods in preference order
     * @throws IllegalArgumentException if TOKEN is configured without a secret manager
     */
    private List<SaslRpcServer.AuthMethod> getAuthMethods(SecretManager<?> secretManager, Configuration conf) {
        UserGroupInformation.AuthenticationMethod configured = SecurityUtil.getAuthenticationMethod(conf);
        List<SaslRpcServer.AuthMethod> accepted = new ArrayList<SaslRpcServer.AuthMethod>();
        boolean tokenConfigured = configured == UserGroupInformation.AuthenticationMethod.TOKEN;
        if (tokenConfigured && secretManager == null) {
            throw new IllegalArgumentException(UserGroupInformation.AuthenticationMethod.TOKEN + " authentication requires a secret manager");
        }
        if (!tokenConfigured && secretManager != null) {
            LOG.debug(UserGroupInformation.AuthenticationMethod.TOKEN + " authentication enabled for secret manager");
            accepted.add(UserGroupInformation.AuthenticationMethod.TOKEN.getAuthMethod());
        }
        accepted.add(configured.getAuthMethod());
        LOG.debug("Server accepts auth methods:" + accepted);
        return accepted;
    }

    /**
     * Closes a connection through the connection manager.
     *
     * @param connection the connection to close
     */
    private void closeConnection(Connection connection) {
        connectionManager.close(connection);
    }

    /**
     * Serializes the response for a call into {@code responseBuf} and attaches it to
     * the call.
     *
     * <p>The frame is a 4-byte length, the length-delimited response header and, for a
     * successful call, the serialized return value. If serializing a successful
     * response throws, the buffer is rebuilt as an ERROR response with code
     * ERROR_SERIALIZING_RESPONSE. When the connection uses SASL wrapping, the frame is
     * wrapped before being attached.
     *
     * @param responseBuf the scratch buffer; reset on entry
     * @param call the call being answered
     * @param status the response status
     * @param erCode the error code, used only for non-SUCCESS statuses
     * @param rv the return value, used only for SUCCESS
     * @param errorClass the exception class name, used only for non-SUCCESS statuses
     * @param error the error message, used only for non-SUCCESS statuses
     * @throws IOException if writing an error response or SASL wrapping fails
     */
    private void setupResponse(ByteArrayOutputStream responseBuf, Call call, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto status, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto erCode, Writable rv, String errorClass, String error) throws IOException {
        responseBuf.reset();
        DataOutputStream out = new DataOutputStream(responseBuf);
        RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder();
        headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
        headerBuilder.setCallId(call.callId);
        headerBuilder.setRetryCount(call.retryCount);
        headerBuilder.setStatus(status);
        headerBuilder.setServerIpcVersionNum(9);
        if (status == RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) {
            RpcHeaderProtos.RpcResponseHeaderProto header = headerBuilder.build();
            int headerLen = header.getSerializedSize();
            int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
            try {
                if (rv instanceof ProtobufRpcEngine.RpcWrapper) {
                    // The wrapper knows its own length, so it can be streamed directly.
                    ProtobufRpcEngine.RpcWrapper resWrapper = (ProtobufRpcEngine.RpcWrapper)rv;
                    fullLength += resWrapper.getLength();
                    out.writeInt(fullLength);
                    header.writeDelimitedTo(out);
                    rv.write(out);
                } else {
                    // Length is unknown up front: serialize to a side buffer first.
                    // These two branches must be exclusive, otherwise a wrapper
                    // response would be written twice into the same frame.
                    DataOutputBuffer buf = new DataOutputBuffer();
                    rv.write(buf);
                    byte[] data = buf.getData();
                    fullLength += buf.getLength();
                    out.writeInt(fullLength);
                    header.writeDelimitedTo(out);
                    out.write(data, 0, buf.getLength());
                }
            }
            catch (Throwable t) {
                LOG.warn("Error serializing call response for call " + call, t);
                // The recursive call resets responseBuf, discarding any partial output.
                this.setupResponse(responseBuf, call, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_SERIALIZING_RESPONSE, null, t.getClass().getName(), StringUtils.stringifyException(t));
                return;
            }
        } else {
            headerBuilder.setExceptionClassName(errorClass);
            headerBuilder.setErrorMsg(error);
            headerBuilder.setErrorDetail(erCode);
            RpcHeaderProtos.RpcResponseHeaderProto header = headerBuilder.build();
            int headerLen = header.getSerializedSize();
            int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen;
            out.writeInt(fullLength);
            header.writeDelimitedTo(out);
        }
        if (call.connection.useWrap) {
            this.wrapWithSasl(responseBuf, call);
        }
        call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
    }

    /**
     * Builds a fatal response in the layout written here for old-version clients:
     * call id, a status of -1, then the error class and error message as strings.
     *
     * @param response the scratch buffer; reset on entry
     * @param call the call being answered
     * @param rv unused
     * @param errorClass the error class name to send
     * @param error the error message to send
     * @throws IOException if writing or SASL wrapping fails
     */
    private void setupResponseOldVersionFatal(ByteArrayOutputStream response, Call call, Writable rv, String errorClass, String error) throws IOException {
        final int OLD_VERSION_FATAL_STATUS = -1;
        response.reset();
        DataOutputStream stream = new DataOutputStream(response);
        stream.writeInt(call.callId);
        stream.writeInt(OLD_VERSION_FATAL_STATUS);
        WritableUtils.writeString(stream, errorClass);
        WritableUtils.writeString(stream, error);
        if (call.connection.useWrap) {
            wrapWithSasl(response, call);
        }
        byte[] payload = response.toByteArray();
        call.setResponse(ByteBuffer.wrap(payload));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the contents of {@code response} with a SASL-wrapped version of itself.
     * Does nothing when the call's connection has no SASL server.
     *
     * <p>The current bytes are wrapped by the connection's SASL server (under that
     * object's monitor) and re-framed as a WRAP-state SASL message preceded by a
     * 4-byte length.
     *
     * @param response the buffer holding the plain response; overwritten in place
     * @param call the call whose connection supplies the SASL server
     * @throws IOException if wrapping or writing fails
     */
    private void wrapWithSasl(ByteArrayOutputStream response, Call call) throws IOException {
        if (call.connection.saslServer == null) {
            return;
        }
        byte[] token = response.toByteArray();
        SaslServer lock = call.connection.saslServer;
        synchronized (lock) {
            token = call.connection.saslServer.wrap(token, 0, token.length);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Adding saslServer wrapped token of size " + token.length + " as call response.");
        }
        response.reset();
        RpcHeaderProtos.RpcResponseHeaderProto saslHeader = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder()
                .setCallId(AuthProtocol.SASL.callId)
                .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS)
                .build();
        RpcHeaderProtos.RpcSaslProto saslMessage = RpcHeaderProtos.RpcSaslProto.newBuilder()
                .setState(RpcHeaderProtos.RpcSaslProto.SaslState.WRAP)
                .setToken(ByteString.copyFrom(token, 0, token.length))
                .build();
        ProtobufRpcEngine.RpcResponseMessageWrapper saslResponse = new ProtobufRpcEngine.RpcResponseMessageWrapper(saslHeader, (Message)saslMessage);
        DataOutputStream stream = new DataOutputStream(response);
        stream.writeInt(saslResponse.getLength());
        saslResponse.write(stream);
    }

    /**
     * Returns the configuration this server was constructed with.
     *
     * @return the server configuration
     */
    Configuration getConf() {
        return conf;
    }

    /**
     * Sets the socket send buffer size field.
     *
     * @param size the new send buffer size
     */
    public void setSocketSendBufSize(int size) {
        socketSendBufferSize = size;
    }

    /**
     * Starts the responder, then the listener, then creates and starts
     * {@code handlerCount} handler threads.
     */
    public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        for (int index = 0; index < handlerCount; index++) {
            Handler handler = new Handler(index);
            handlers[index] = handler;
            handler.start();
        }
    }

    /**
     * Stops the server: clears the running flag, interrupts the handler threads (if
     * any were created), interrupts and stops the listener, interrupts the responder,
     * wakes threads waiting in {@link #join()}, and shuts down both metrics objects.
     */
    public synchronized void stop() {
        LOG.info("Stopping server on " + port);
        running = false;
        if (handlers != null) {
            for (int index = 0; index < handlerCount; index++) {
                Handler handler = handlers[index];
                if (handler != null) {
                    handler.interrupt();
                }
            }
        }
        listener.interrupt();
        listener.doStop();
        responder.interrupt();
        // Wake anyone blocked in join().
        notifyAll();
        rpcMetrics.shutdown();
        rpcDetailedMetrics.shutdown();
    }

    /**
     * Blocks the caller until the running flag is cleared, waiting on this server's
     * monitor.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public synchronized void join() throws InterruptedException {
        for (;;) {
            if (!running) {
                return;
            }
            wait();
        }
    }

    /**
     * Returns the address reported by the listener.
     *
     * @return the listener's socket address
     */
    public synchronized InetSocketAddress getListenerAddress() {
        return listener.getAddress();
    }

    /**
     * Handles a call as RPC kind {@code RPC_BUILTIN} with a {@code null} second
     * argument.
     *
     * @param param the call parameter
     * @param receiveTime the receive time passed through to the four-argument overload
     * @return the call result
     * @throws Exception if the delegated call throws
     * @deprecated use {@link #call(RPC.RpcKind, String, Writable, long)} instead
     */
    @Deprecated
    public Writable call(Writable param, long receiveTime) throws Exception {
        final String noProtocol = null;
        return call(RPC.RpcKind.RPC_BUILTIN, noProtocol, param, receiveTime);
    }

    /**
     * Handles a single call; implemented by concrete servers.
     *
     * @param var1 the RPC kind of the call
     * @param var2 NOTE(review): presumably the protocol name; the deprecated
     *        two-argument overload passes {@code null} here. Confirm against implementations.
     * @param var3 the call parameter
     * @param var4 the receive time of the call
     * @return the call result
     * @throws Exception if handling the call fails
     */
    public abstract Writable call(RPC.RpcKind var1, String var2, Writable var3, long var4) throws Exception;

    /**
     * Authorizes a user for a protocol. Does nothing when authorization is disabled
     * on this server.
     *
     * @param user the user making the request
     * @param protocolName the protocol class name requested
     * @param addr the client address
     * @throws AuthorizationException if the protocol name is {@code null}, the protocol
     *         class cannot be loaded, or the authorization manager rejects the request
     */
    private void authorize(UserGroupInformation user, String protocolName, InetAddress addr) throws AuthorizationException {
        if (!this.authorize) {
            return;
        }
        if (protocolName == null) {
            throw new AuthorizationException("Null protocol not authorized");
        }
        Class<?> protocolClass;
        try {
            protocolClass = Server.getProtocolClass(protocolName, getConf());
        }
        catch (ClassNotFoundException cfne) {
            throw new AuthorizationException("Unknown protocol: " + protocolName);
        }
        serviceAuthorizationManager.authorize(user, protocolClass, getConf(), addr);
    }

    /**
     * Returns the port field. After construction this holds the port reported by the
     * listener.
     *
     * @return the server port
     */
    public int getPort() {
        return port;
    }

    /**
     * Returns the number of connections tracked by the connection manager.
     *
     * @return the connection count
     */
    public int getNumOpenConnections() {
        return connectionManager.size();
    }

    /**
     * Returns the current size of the call queue.
     *
     * @return the number of queued calls
     */
    public int getCallQueueLen() {
        return callQueue.size();
    }

    /**
     * Returns the maximum call queue size computed in the constructor.
     *
     * @return the maximum queue size
     */
    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    /**
     * Returns the number of reader threads determined in the constructor.
     *
     * @return the reader thread count
     */
    public int getNumReaders() {
        return readThreads;
    }

    /**
     * Writes a buffer to a channel and records the bytes sent in the RPC metrics.
     * Buffers with more than {@code NIO_BUFFER_LIMIT} bytes remaining are written in
     * chunks through {@code channelIO}.
     *
     * @param channel the channel to write to
     * @param buffer the data to write
     * @return the value returned by the underlying write
     * @throws IOException if the write fails
     */
    private int channelWrite(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        final int written;
        if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
            written = channel.write(buffer);
        } else {
            written = Server.channelIO(null, channel, buffer);
        }
        if (written > 0) {
            rpcMetrics.incrSentBytes(written);
        }
        return written;
    }

    /**
     * Reads from a channel into a buffer and records the bytes received in the RPC
     * metrics. Buffers with more than {@code NIO_BUFFER_LIMIT} bytes remaining are
     * filled in chunks through {@code channelIO}.
     *
     * @param channel the channel to read from
     * @param buffer the buffer to fill
     * @return the value returned by the underlying read
     * @throws IOException if the read fails
     */
    private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
        final int received;
        if (buffer.remaining() <= NIO_BUFFER_LIMIT) {
            received = channel.read(buffer);
        } else {
            received = Server.channelIO(channel, null, buffer);
        }
        if (received > 0) {
            rpcMetrics.incrReceivedBytes(received);
        }
        return received;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Transfers a buffer to or from a channel in chunks of at most
     * {@code NIO_BUFFER_LIMIT} bytes. Exactly one of {@code readCh} and
     * {@code writeCh} is expected to be non-null; a null {@code readCh} selects
     * writing.
     *
     * <p>The loop stops as soon as one operation transfers less than the chunk it was
     * offered. The buffer's limit is restored after every chunk, even on failure.
     *
     * @param readCh the channel to read from, or {@code null} to write
     * @param writeCh the channel to write to when {@code readCh} is {@code null}
     * @param buf the buffer to fill or drain
     * @return the total bytes transferred if positive, otherwise the result of the
     *         last channel operation
     * @throws IOException if a channel operation fails
     */
    private static int channelIO(ReadableByteChannel readCh, WritableByteChannel writeCh, ByteBuffer buf) throws IOException {
        final int originalLimit = buf.limit();
        final int initialRemaining = buf.remaining();
        int lastResult = 0;
        while (buf.remaining() > 0) {
            final int chunk = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
            try {
                // Narrow the window so the channel sees at most one chunk.
                buf.limit(buf.position() + chunk);
                if (readCh == null) {
                    lastResult = writeCh.write(buf);
                } else {
                    lastResult = readCh.read(buf);
                }
            }
            finally {
                buf.limit(originalLimit);
            }
            if (lastResult < chunk) {
                break;
            }
        }
        int transferred = initialRemaining - buf.remaining();
        return transferred > 0 ? transferred : lastResult;
    }

    private class ConnectionManager {
        private final AtomicInteger count = new AtomicInteger();
        private final Set<Connection> connections;
        private final Timer idleScanTimer;
        private final int idleScanThreshold;
        private final int idleScanInterval;
        private final int maxIdleTime;
        private final int maxIdleToClose;
        private final int maxConnections;

        ConnectionManager() {
            this.idleScanTimer = new Timer("IPC Server idle connection scanner for port " + Server.this.getPort(), true);
            this.idleScanThreshold = Server.this.conf.getInt("ipc.client.idlethreshold", 4000);
            this.idleScanInterval = Server.this.conf.getInt("ipc.client.connection.idle-scan-interval.ms", 10000);
            this.maxIdleTime = 2 * Server.this.conf.getInt("ipc.client.connection.maxidletime", 10000);
            this.maxIdleToClose = Server.this.conf.getInt("ipc.client.kill.max", 10);
            this.maxConnections = Server.this.conf.getInt("ipc.server.max.connections", 0);
            this.connections = Collections.newSetFromMap(new ConcurrentHashMap(Server.this.maxQueueSize, 0.75f, Server.this.readThreads + 2));
        }

        private boolean add(Connection connection) {
            boolean added = this.connections.add(connection);
            if (added) {
                this.count.getAndIncrement();
            }
            return added;
        }

        private boolean remove(Connection connection) {
            boolean removed = this.connections.remove(connection);
            if (removed) {
                this.count.getAndDecrement();
            }
            return removed;
        }

        int size() {
            return this.count.get();
        }

        boolean isFull() {
            return this.maxConnections > 0 && this.size() >= this.maxConnections;
        }

        Connection[] toArray() {
            return this.connections.toArray(new Connection[0]);
        }

        Connection register(SocketChannel channel) {
            if (this.isFull()) {
                return null;
            }
            Connection connection = new Connection(channel, Time.now());
            this.add(connection);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Server connection from " + connection + "; # active connections: " + this.size() + "; # queued calls: " + Server.this.callQueue.size());
            }
            return connection;
        }

        boolean close(Connection connection) {
            boolean exists = this.remove(connection);
            if (exists) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(Thread.currentThread().getName() + ": disconnecting client " + connection + ". Number of active connections: " + this.size());
                }
                connection.close();
            }
            return exists;
        }

        synchronized void closeIdle(boolean scanAll) {
            long minLastContact = Time.now() - (long)this.maxIdleTime;
            int closed = 0;
            for (Connection connection : this.connections) {
                if ((scanAll || this.size() >= this.idleScanThreshold) && (!connection.isIdle() || connection.getLastContact() >= minLastContact || !this.close(connection) || scanAll || ++closed != this.maxIdleToClose)) continue;
                break;
            }
        }

        /**
         * Closes every connection found in a snapshot taken via {@link #toArray()}.
         */
        void closeAll() {
            Connection[] snapshot = toArray();
            for (int i = 0; i < snapshot.length; ++i) {
                close(snapshot[i]);
            }
        }

        /**
         * Starts idle scanning by scheduling the first idle-scan task.
         */
        void startIdleScan() {
            scheduleIdleScanTask();
        }

        /**
         * Cancels the idle-scan timer.
         */
        void stopIdleScan() {
            idleScanTimer.cancel();
        }

        /**
         * Schedules a one-shot idle scan {@code idleScanInterval} from now. Each task
         * re-arms the next one itself after its scan, so the delay is measured from the
         * end of one scan to the start of the next. Nothing is scheduled once the
         * server is no longer running.
         */
        private void scheduleIdleScanTask() {
            if (Server.this.running) {
                this.idleScanTimer.schedule(new TimerTask(){

                    @Override
                    public void run() {
                        if (!Server.this.running) {
                            return;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(Thread.currentThread().getName() + ": task running");
                        }
                        try {
                            ConnectionManager.this.closeIdle(false);
                        }
                        finally {
                            // Re-arm even if the scan threw, so scanning continues.
                            ConnectionManager.this.scheduleIdleScanTask();
                        }
                    }
                }, this.idleScanInterval);
            }
        }
    }

    private class Handler
    extends Thread {
        public Handler(int instanceNumber) {
            this.setDaemon(true);
            this.setName("IPC Server handler " + instanceNumber + " on " + Server.this.port);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        @Override
        public void run() {
            LOG.debug(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
            while (Server.this.running) {
                TraceScope traceScope;
                block26: {
                    Call call;
                    block23: {
                        block24: {
                            traceScope = null;
                            call = (Call)Server.this.callQueue.take();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + (Object)((Object)call.rpcKind));
                            }
                            if (call.connection.channel.isOpen()) break block23;
                            LOG.info(Thread.currentThread().getName() + ": skipped " + call);
                            if (traceScope == null) break block24;
                            traceScope.close();
                        }
                        IOUtils.cleanup(LOG, traceScope);
                        continue;
                    }
                    try {
                        Writable value;
                        RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto detailedErr;
                        RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto returnStatus;
                        String error;
                        String errorClass;
                        block25: {
                            errorClass = null;
                            error = null;
                            returnStatus = RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS;
                            detailedErr = null;
                            value = null;
                            CurCall.set(call);
                            if (call.traceSpan != null) {
                                traceScope = Trace.continueSpan(call.traceSpan);
                            }
                            CallerContext.setCurrent(call.callerContext);
                            try {
                                value = ((Call)call).connection.user == null ? Server.this.call(call.rpcKind, ((Call)call).connection.protocolName, call.rpcRequest, call.timestamp) : ((Call)call).connection.user.doAs(new PrivilegedExceptionAction<Writable>(){

                                    @Override
                                    public Writable run() throws Exception {
                                        return Server.this.call(call.rpcKind, ((Call)call).connection.protocolName, call.rpcRequest, call.timestamp);
                                    }
                                });
                            }
                            catch (Throwable e) {
                                if (e instanceof UndeclaredThrowableException) {
                                    e = e.getCause();
                                }
                                Server.this.logException(LOG, e, call);
                                if (e instanceof RpcServerException) {
                                    RpcServerException rse = (RpcServerException)e;
                                    returnStatus = rse.getRpcStatusProto();
                                    detailedErr = rse.getRpcErrorCodeProto();
                                } else {
                                    returnStatus = RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR;
                                    detailedErr = RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_APPLICATION;
                                }
                                errorClass = e.getClass().getName();
                                error = StringUtils.stringifyException(e);
                                String exceptionHdr = errorClass + ": ";
                                if (!error.startsWith(exceptionHdr)) break block25;
                                error = error.substring(exceptionHdr.length());
                            }
                        }
                        CurCall.set(null);
                        LinkedList linkedList = call.connection.responseQueue;
                        synchronized (linkedList) {
                            Server.this.setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error);
                            if (buf.size() > Server.this.maxRespSize) {
                                LOG.warn("Large response size " + buf.size() + " for call " + call.toString());
                                buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
                            }
                            Server.this.responder.doRespond(call);
                        }
                        if (traceScope == null) break block26;
                        traceScope.close();
                    }
                    catch (InterruptedException e) {
                        block27: {
                            if (Server.this.running) {
                                LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
                                if (Trace.isTracing()) {
                                    traceScope.getSpan().addTimelineAnnotation("unexpectedly interrupted: " + StringUtils.stringifyException(e));
                                }
                            }
                            if (traceScope == null) break block27;
                            traceScope.close();
                        }
                        IOUtils.cleanup(LOG, traceScope);
                        continue;
                    }
                    catch (Exception e2) {
                        block28: {
                            LOG.info(Thread.currentThread().getName() + " caught an exception", e2);
                            if (Trace.isTracing()) {
                                traceScope.getSpan().addTimelineAnnotation("Exception: " + StringUtils.stringifyException(e2));
                            }
                            if (traceScope == null) break block28;
                            traceScope.close();
                            {
                                catch (Throwable throwable) {
                                    if (traceScope != null) {
                                        traceScope.close();
                                    }
                                    IOUtils.cleanup(LOG, traceScope);
                                    throw throwable;
                                }
                            }
                        }
                        IOUtils.cleanup(LOG, traceScope);
                        continue;
                    }
                }
                IOUtils.cleanup(LOG, traceScope);
            }
            LOG.debug(Thread.currentThread().getName() + ": exiting");
        }
    }

    public class Connection {
        // Set once readAndProcess() has validated the fixed connection header.
        private boolean connectionHeaderRead = false;
        // Set once processConnectionContext() has completed successfully.
        private boolean connectionContextRead = false;
        private SocketChannel channel;
        // Payload of the RPC currently being read; null between RPCs.
        private ByteBuffer data;
        // 4-byte buffer: holds the header magic first, then each RPC's length prefix.
        private ByteBuffer dataLengthBuffer;
        private LinkedList<Call> responseQueue;
        // isIdle() reports true while this counter is zero.
        private AtomicInteger rpcCount = new AtomicInteger();
        // Timestamp compared against the idle cutoff by the idle scan.
        private long lastContact;
        private int dataLength;
        private Socket socket;
        // Textual remote address, or "*Unknown*" when the socket has no InetAddress.
        private String hostAddress;
        private int remotePort;
        private InetAddress addr;
        IpcConnectionContextProtos.IpcConnectionContextProto connectionContext;
        String protocolName;
        // Null when no SASL negotiation is in progress or after switchToSimple().
        SaslServer saslServer;
        private SaslRpcServer.AuthMethod authMethod;
        private AuthProtocol authProtocol;
        // Set once SASL negotiation has completed and the user was resolved.
        private boolean saslContextEstablished;
        // 3-byte buffer (version, service class, auth protocol); released after the header is read.
        private ByteBuffer connectionHeaderBuf = null;
        // Buffers for RPCs carried inside SASL-wrapped packets.
        private ByteBuffer unwrappedData;
        private ByteBuffer unwrappedDataLengthBuffer;
        private int serviceClass;
        UserGroupInformation user = null;
        public UserGroupInformation attemptingUser = null;
        // Placeholder call/buffer pair used to send fatal authentication errors.
        private final Call authFailedCall = new Call(-1, -1, null, this);
        private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
        // Placeholder call/buffer pair used to send SASL negotiation replies.
        private final Call saslCall;
        private final ByteArrayOutputStream saslResponse;
        // True after a NEGOTIATE response has been built for this connection.
        private boolean sentNegotiate;
        // True when the negotiated QoP is anything other than "auth".
        private boolean useWrap;

        /**
         * Creates the per-client connection state for an accepted channel.
         *
         * @param channel     the accepted client channel; its socket supplies the
         *                    remote address and port
         * @param lastContact initial last-contact timestamp
         */
        public Connection(SocketChannel channel, long lastContact) {
            this.saslCall = new Call(AuthProtocol.SASL.callId, -1, null, this);
            this.saslResponse = new ByteArrayOutputStream();
            this.sentNegotiate = false;
            this.useWrap = false;
            this.channel = channel;
            this.lastContact = lastContact;
            this.data = null;
            this.dataLengthBuffer = ByteBuffer.allocate(4);
            this.unwrappedData = null;
            this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
            this.socket = channel.socket();
            this.addr = this.socket.getInetAddress();
            this.hostAddress = this.addr == null ? "*Unknown*" : this.addr.getHostAddress();
            this.remotePort = this.socket.getPort();
            this.responseQueue = new LinkedList<Call>();
            // A configured size of 0 means: leave the socket's send buffer untouched.
            if (Server.this.socketSendBufferSize != 0) {
                try {
                    this.socket.setSendBufferSize(Server.this.socketSendBufferSize);
                }
                catch (IOException e) {
                    // Best effort: the connection still works with the existing buffer size.
                    LOG.warn("Connection: unable to set socket send buffer size to " + Server.this.socketSendBufferSize, e);
                }
            }
        }

        /**
         * @return the remote endpoint formatted as {@code hostAddress:remotePort}
         */
        @Override
        public String toString() {
            String host = getHostAddress();
            return host + ":" + remotePort;
        }

        /**
         * @return the textual remote address captured when the connection was created
         */
        public String getHostAddress() {
            return hostAddress;
        }

        /**
         * @return the remote {@link InetAddress} taken from the socket at construction;
         *         may be {@code null}
         */
        public InetAddress getHostInetAddress() {
            return addr;
        }

        /**
         * Records a new last-contact timestamp for this connection.
         *
         * @param lastContact the timestamp to store
         */
        public void setLastContact(long lastContact) {
            this.lastContact = lastContact;
        }

        /**
         * @return the most recently stored last-contact timestamp
         */
        public long getLastContact() {
            return lastContact;
        }

        /**
         * @return {@code true} when the RPC counter for this connection is zero
         */
        private boolean isIdle() {
            int outstanding = rpcCount.get();
            return outstanding == 0;
        }

        /** Atomically decrements this connection's RPC counter by one. */
        private void decRpcCount() {
            rpcCount.getAndDecrement();
        }

        /** Atomically increments this connection's RPC counter by one. */
        private void incRpcCount() {
            rpcCount.getAndIncrement();
        }

        /**
         * Resolves the user for an authorization id produced by SASL.
         *
         * @param authorizedId the authorization id to resolve
         * @return for TOKEN authentication, the user carried by the token identifier
         *         (with that identifier attached); otherwise a remote user created
         *         from the id and the current auth method
         * @throws SecretManager.InvalidToken declared for the token-identifier lookup
         * @throws AccessControlException if the token identifier yields no user
         */
        private UserGroupInformation getAuthorizedUgi(String authorizedId) throws SecretManager.InvalidToken, AccessControlException {
            if (this.authMethod != SaslRpcServer.AuthMethod.TOKEN) {
                return UserGroupInformation.createRemoteUser(authorizedId, this.authMethod);
            }
            TokenIdentifier tokenId = (TokenIdentifier)SaslRpcServer.getIdentifier(authorizedId, Server.this.secretManager);
            UserGroupInformation tokenUser = tokenId.getUser();
            if (tokenUser == null) {
                throw new AccessControlException("Can't retrieve username from tokenIdentifier.");
            }
            tokenUser.addTokenIdentifier(tokenId);
            return tokenUser;
        }

        /**
         * Decodes one SASL message from the stream and dispatches it: WRAP messages
         * are unwrapped and their contained RPCs processed, every other state is
         * handed to the negotiation logic.
         *
         * @param dis stream positioned at the SASL message
         * @throws WrappedRpcServerException if a WRAP message arrives before a SASL
         *         context with wrapping is established
         */
        private void saslReadAndProcess(DataInputStream dis) throws WrappedRpcServerException, IOException, InterruptedException {
            RpcHeaderProtos.RpcSaslProto saslMessage = (RpcHeaderProtos.RpcSaslProto)this.decodeProtobufFromStream(RpcHeaderProtos.RpcSaslProto.newBuilder(), dis);
            if (saslMessage.getState() != RpcHeaderProtos.RpcSaslProto.SaslState.WRAP) {
                this.saslProcess(saslMessage);
                return;
            }
            boolean wrappingActive = this.saslContextEstablished && this.useWrap;
            if (!wrappingActive) {
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, new SaslException("Server is not wrapping data"));
            }
            this.unwrapPacketAndProcessRpcs(saslMessage.getToken().toByteArray());
        }

        /**
         * Walks the cause chain of {@code e} looking for the exception that should be
         * reported to the client.
         *
         * @param e the failure to inspect
         * @return the first {@code RetriableException} or {@code StandbyException} in
         *         the chain; for the first {@code InvalidToken}, its cause if it has
         *         one, else the {@code InvalidToken} itself; otherwise {@code e}
         */
        private Throwable getTrueCause(IOException e) {
            Throwable current = e;
            while (current != null) {
                if (current instanceof RetriableException || current instanceof StandbyException) {
                    return current;
                }
                if (current instanceof SecretManager.InvalidToken) {
                    Throwable nested = current.getCause();
                    return nested != null ? nested : current;
                }
                current = current.getCause();
            }
            return e;
        }

        /**
         * Runs one step of SASL negotiation and, when a reply was produced, sends it.
         * When the SASL server reports completion, the authenticated user is resolved
         * and the context is marked established; afterwards wrapping is enabled if the
         * negotiated QoP is anything other than "auth".
         *
         * @param saslMessage the client's negotiation message
         * @throws WrappedRpcServerException with FATAL_INVALID_RPC_HEADER if
         *         negotiation already completed, or FATAL_UNAUTHORIZED wrapping any
         *         {@link IOException} raised during negotiation
         */
        private void saslProcess(RpcHeaderProtos.RpcSaslProto saslMessage) throws WrappedRpcServerException, IOException, InterruptedException {
            if (this.saslContextEstablished) {
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, new SaslException("Negotiation is already complete"));
            }
            RpcHeaderProtos.RpcSaslProto saslResponse = null;
            try {
                try {
                    saslResponse = this.processSaslMessage(saslMessage);
                }
                catch (IOException e) {
                    Server.this.rpcMetrics.incrAuthenticationFailures();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(StringUtils.stringifyException(e));
                    }
                    Throwable trueCause = this.getTrueCause(e);
                    AUDITLOG.warn(Server.AUTH_FAILED_FOR + this.toString() + ":" + this.attemptingUser + " (" + e.getLocalizedMessage() + ") with true cause: (" + trueCause.getLocalizedMessage() + ")");
                    // getTrueCause() may return the cause of an InvalidToken, which need not be
                    // an IOException; a blind cast would surface as a ClassCastException instead
                    // of an authentication failure.
                    if (trueCause instanceof IOException) {
                        throw (IOException)trueCause;
                    }
                    throw e;
                }
                if (this.saslServer != null && this.saslServer.isComplete()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("SASL server context established. Negotiated QoP is " + this.saslServer.getNegotiatedProperty("javax.security.sasl.qop"));
                    }
                    this.user = this.getAuthorizedUgi(this.saslServer.getAuthorizationID());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("SASL server successfully authenticated client: " + this.user);
                    }
                    Server.this.rpcMetrics.incrAuthenticationSuccesses();
                    AUDITLOG.info(Server.AUTH_SUCCESSFUL_FOR + this.user);
                    this.saslContextEstablished = true;
                }
            }
            catch (WrappedRpcServerException wrse) {
                // Already carries its own error code; must not be re-wrapped below.
                throw wrse;
            }
            catch (IOException ioe) {
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, ioe);
            }
            if (saslResponse != null) {
                this.doSaslReply(saslResponse);
            }
            // Decided only after the reply above has been handed to doSaslReply().
            if (this.saslContextEstablished) {
                String qop = (String)this.saslServer.getNegotiatedProperty("javax.security.sasl.qop");
                this.useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
            }
        }

        /**
         * Handles one client SASL message according to its state.
         *
         * <ul>
         * <li>NEGOTIATE: answers with the negotiate response (allowed once per
         *     connection); a SUCCESS response switches the connection to simple auth.</li>
         * <li>INITIATE: validates the single requested mechanism; an unadvertised
         *     mechanism gets a negotiate response the first time and is rejected
         *     after that. SIMPLE switches to simple auth with no reply; otherwise
         *     the client's token is evaluated.</li>
         * <li>RESPONSE: evaluates the client's token.</li>
         * </ul>
         *
         * @param saslMessage the client's message
         * @return the reply to send, or {@code null} when none is needed
         * @throws IOException on a malformed, repeated, or unsupported request
         */
        private RpcHeaderProtos.RpcSaslProto processSaslMessage(RpcHeaderProtos.RpcSaslProto saslMessage) throws IOException, InterruptedException {
            RpcHeaderProtos.RpcSaslProto reply;
            RpcHeaderProtos.RpcSaslProto.SaslState state = saslMessage.getState();
            switch (state) {
                case NEGOTIATE: {
                    if (this.sentNegotiate) {
                        throw new AccessControlException("Client already attempted negotiation");
                    }
                    reply = this.buildSaslNegotiateResponse();
                    if (reply.getState() == RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS) {
                        this.switchToSimple();
                    }
                    break;
                }
                case INITIATE: {
                    if (saslMessage.getAuthsCount() != 1) {
                        throw new SaslException("Client mechanism is malformed");
                    }
                    RpcHeaderProtos.RpcSaslProto.SaslAuth requested = saslMessage.getAuths(0);
                    boolean advertised = Server.this.negotiateResponse.getAuthsList().contains(requested);
                    if (!advertised) {
                        if (this.sentNegotiate) {
                            throw new AccessControlException(requested.getMethod() + " authentication is not enabled." + "  Available:" + Server.this.enabledAuthMethods);
                        }
                        reply = this.buildSaslNegotiateResponse();
                    } else {
                        this.authMethod = SaslRpcServer.AuthMethod.valueOf(requested.getMethod());
                        if (this.authMethod == SaslRpcServer.AuthMethod.SIMPLE) {
                            this.switchToSimple();
                            reply = null;
                        } else {
                            // An existing SASL server is kept only for TOKEN auth.
                            if (this.saslServer == null || this.authMethod != SaslRpcServer.AuthMethod.TOKEN) {
                                this.saslServer = this.createSaslServer(this.authMethod);
                            }
                            reply = this.processSaslToken(saslMessage);
                        }
                    }
                    break;
                }
                case RESPONSE: {
                    reply = this.processSaslToken(saslMessage);
                    break;
                }
                default: {
                    throw new SaslException("Client sent unsupported state " + state);
                }
            }
            return reply;
        }

        /**
         * Feeds the client's token to the SASL server and builds the reply.
         *
         * @param saslMessage message that must carry a token
         * @return a SUCCESS reply if the SASL server reports completion after
         *         evaluating the token, otherwise a CHALLENGE reply
         * @throws SaslException if no token is present or evaluation fails
         */
        private RpcHeaderProtos.RpcSaslProto processSaslToken(RpcHeaderProtos.RpcSaslProto saslMessage) throws SaslException {
            if (!saslMessage.hasToken()) {
                throw new SaslException("Client did not send a token");
            }
            byte[] clientToken = saslMessage.getToken().toByteArray();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Have read input token of size " + clientToken.length + " for processing by saslServer.evaluateResponse()");
            }
            byte[] serverToken = this.saslServer.evaluateResponse(clientToken);
            RpcHeaderProtos.RpcSaslProto.SaslState nextState;
            if (this.saslServer.isComplete()) {
                nextState = RpcHeaderProtos.RpcSaslProto.SaslState.SUCCESS;
            } else {
                nextState = RpcHeaderProtos.RpcSaslProto.SaslState.CHALLENGE;
            }
            return this.buildSaslResponse(nextState, serverToken);
        }

        /**
         * Drops the SASL server reference and marks the connection's auth protocol
         * as {@code NONE}.
         */
        private void switchToSimple() {
            saslServer = null;
            authProtocol = AuthProtocol.NONE;
        }

        /**
         * Builds a SASL reply message.
         *
         * @param state      state to set on the reply
         * @param replyToken token to attach; omitted from the message when {@code null}
         * @return the built message
         */
        private RpcHeaderProtos.RpcSaslProto buildSaslResponse(RpcHeaderProtos.RpcSaslProto.SaslState state, byte[] replyToken) {
            if (LOG.isDebugEnabled()) {
                Integer tokenSize = replyToken != null ? Integer.valueOf(replyToken.length) : null;
                LOG.debug("Will send " + state + " token of size " + tokenSize + " from saslServer.");
            }
            RpcHeaderProtos.RpcSaslProto.Builder builder = RpcHeaderProtos.RpcSaslProto.newBuilder();
            builder.setState(state);
            if (replyToken != null) {
                builder.setToken(ByteString.copyFrom(replyToken));
            }
            return builder.build();
        }

        /**
         * Sends a SASL negotiation message to the client as a SUCCESS response on
         * the connection's dedicated SASL call.
         *
         * @param message the protobuf message to send
         */
        private void doSaslReply(Message message) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending sasl message " + message);
            }
            ProtobufRpcEngine.RpcResponseWrapper wrapped = new ProtobufRpcEngine.RpcResponseWrapper(message);
            Server.this.setupResponse(saslResponse, saslCall, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS, null, wrapped, null, null);
            Server.this.responder.doRespond(saslCall);
        }

        /**
         * Sends a FATAL / FATAL_UNAUTHORIZED response describing an authentication
         * failure, using the connection's dedicated auth-failed call.
         *
         * @param ioe the failure; its class name and localized message are sent
         */
        private void doSaslReply(Exception ioe) throws IOException {
            String failureClass = ioe.getClass().getName();
            String failureText = ioe.getLocalizedMessage();
            Server.this.setupResponse(authFailedResponse, authFailedCall, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, null, failureClass, failureText);
            Server.this.responder.doRespond(authFailedCall);
        }

        /**
         * Disposes the SASL server, if one exists. Disposal is best effort: a
         * failure is logged at debug level and otherwise ignored.
         */
        private void disposeSasl() {
            if (this.saslServer == null) {
                return;
            }
            try {
                this.saslServer.dispose();
            }
            catch (SaslException ignored) {
                // Nothing to recover here; keep a trace for diagnosis only.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignoring failure while disposing SASL server for " + this, ignored);
                }
            }
        }

        /**
         * Validates a length prefix received from the client.
         *
         * @param dataLength the announced payload length
         * @throws IOException if the length is negative or exceeds the configured
         *         maximum; the problem is also logged as a warning
         */
        private void checkDataLength(int dataLength) throws IOException {
            String problem = null;
            if (dataLength < 0) {
                problem = "Unexpected data length " + dataLength + "!! from " + this.getHostAddress();
            } else if (dataLength > Server.this.maxDataLength) {
                problem = "Requested data length " + dataLength + " is longer than maximum configured RPC length " + Server.this.maxDataLength + ".  RPC came from " + this.getHostAddress();
            }
            if (problem != null) {
                LOG.warn(problem);
                throw new IOException(problem);
            }
        }

        /**
         * Reads whatever is available on the channel and advances the connection's
         * read state: first the 4-byte magic plus the 3-byte connection header, then
         * length-prefixed RPC payloads, each passed to processOneRpc() once complete.
         *
         * @return the result of the last channel read, or -1 when the header was an
         *         HTTP request or had a bad magic/version (a response is queued first)
         * @throws IOException if a length prefix fails checkDataLength() or the
         *         channel read fails
         */
        public int readAndProcess() throws WrappedRpcServerException, IOException, InterruptedException {
            int count;
            while (true) {
                count = -1;
                // Fill the 4-byte buffer; bail out on EOF/error or while it is still partial.
                if (this.dataLengthBuffer.remaining() > 0 && ((count = Server.this.channelRead(this.channel, this.dataLengthBuffer)) < 0 || this.dataLengthBuffer.remaining() > 0)) {
                    return count;
                }
                if (!this.connectionHeaderRead) {
                    // First pass: dataLengthBuffer holds the magic; read the remaining 3 header bytes.
                    if (this.connectionHeaderBuf == null) {
                        this.connectionHeaderBuf = ByteBuffer.allocate(3);
                    }
                    if ((count = Server.this.channelRead(this.channel, this.connectionHeaderBuf)) < 0 || this.connectionHeaderBuf.remaining() > 0) {
                        return count;
                    }
                    // Header bytes: [0] version, [1] service class, [2] auth protocol.
                    byte version = this.connectionHeaderBuf.get(0);
                    this.setServiceClass(this.connectionHeaderBuf.get(1));
                    this.dataLengthBuffer.flip();
                    // The first four bytes matching HTTP_GET_BYTES means an HTTP client hit the IPC port.
                    if (HTTP_GET_BYTES.equals(this.dataLengthBuffer)) {
                        this.setupHttpRequestOnIpcPortResponse();
                        return -1;
                    }
                    if (!RpcConstants.HEADER.equals(this.dataLengthBuffer) || version != 9) {
                        LOG.warn("Incorrect header or version mismatch from " + this.hostAddress + ":" + this.remotePort + " got version " + version + " expected version " + 9);
                        this.setupBadVersionResponse(version);
                        return -1;
                    }
                    this.authProtocol = this.initializeAuthContext(this.connectionHeaderBuf.get(2));
                    // Header accepted: reuse the 4-byte buffer for length prefixes from now on.
                    this.dataLengthBuffer.clear();
                    this.connectionHeaderBuf = null;
                    this.connectionHeaderRead = true;
                    continue;
                }
                if (this.data == null) {
                    // A complete length prefix is available: validate it and size the payload buffer.
                    this.dataLengthBuffer.flip();
                    this.dataLength = this.dataLengthBuffer.getInt();
                    this.checkDataLength(this.dataLength);
                    this.data = ByteBuffer.allocate(this.dataLength);
                }
                count = Server.this.channelRead(this.channel, this.data);
                // Payload still incomplete: return and resume on the next call.
                if (this.data.remaining() != 0) break;
                this.dataLengthBuffer.clear();
                this.data.flip();
                // Sampled before processing, since processOneRpc() may establish the context.
                boolean isHeaderRead = this.connectionContextRead;
                this.processOneRpc(this.data.array());
                this.data = null;
                // Keep looping only while the connection context was not yet read before this RPC.
                if (isHeaderRead) break;
            }
            return count;
        }

        /**
         * Maps the auth byte of the connection header to an {@code AuthProtocol} and
         * rejects it when it cannot be served.
         *
         * @param authType auth protocol code from the connection header
         * @return the resolved protocol
         * @throws IOException if the code is unknown, or if it selects no
         *         authentication while SIMPLE is not enabled; a fatal reply is
         *         queued to the client before throwing
         */
        private AuthProtocol initializeAuthContext(int authType) throws IOException {
            AuthProtocol resolved = AuthProtocol.valueOf(authType);
            if (resolved == null) {
                IpcException unknown = new IpcException("Unknown auth protocol:" + authType);
                this.doSaslReply(unknown);
                throw unknown;
            }
            boolean simpleEnabled = Server.this.enabledAuthMethods.contains(SaslRpcServer.AuthMethod.SIMPLE);
            if (resolved == AuthProtocol.NONE && !simpleEnabled) {
                AccessControlException denied = new AccessControlException("SIMPLE authentication is not enabled.  Available:" + Server.this.enabledAuthMethods);
                this.doSaslReply(denied);
                throw denied;
            }
            return resolved;
        }

        /**
         * Produces the negotiate response for this connection and records that one
         * was sent. When TOKEN auth is enabled, a TOKEN SASL server is created and its
         * initial challenge is embedded in the first advertised auth entry; otherwise
         * the server-wide negotiate response is returned as is.
         *
         * @return the negotiate message to send
         */
        private RpcHeaderProtos.RpcSaslProto buildSaslNegotiateResponse() throws IOException, InterruptedException {
            RpcHeaderProtos.RpcSaslProto result;
            if (Server.this.enabledAuthMethods.contains(SaslRpcServer.AuthMethod.TOKEN)) {
                this.saslServer = this.createSaslServer(SaslRpcServer.AuthMethod.TOKEN);
                byte[] initialChallenge = this.saslServer.evaluateResponse(new byte[0]);
                RpcHeaderProtos.RpcSaslProto.Builder builder = RpcHeaderProtos.RpcSaslProto.newBuilder(Server.this.negotiateResponse);
                builder.getAuthsBuilder(0).setChallenge(ByteString.copyFrom(initialChallenge));
                result = builder.build();
            } else {
                result = Server.this.negotiateResponse;
            }
            this.sentNegotiate = true;
            return result;
        }

        /**
         * Creates a SASL server for the given method, using the SASL properties
         * resolved for this connection's remote address.
         *
         * @param authMethod the authentication method to serve
         * @return the new SASL server
         */
        private SaslServer createSaslServer(SaslRpcServer.AuthMethod authMethod) throws IOException, InterruptedException {
            Map<String, String> resolvedProps = Server.this.saslPropsResolver.getServerProperties(this.addr);
            SaslRpcServer factory = new SaslRpcServer(authMethod);
            return factory.create(this, resolvedProps, Server.this.secretManager);
        }

        /**
         * Queues a version-mismatch error in a format chosen by the client's version:
         * the current response format for versions 9 and above, the old fatal format
         * for versions 3 to 8, and a hand-written response for version 2. Other
         * versions get no response.
         *
         * @param clientVersion the version byte received from the client
         */
        private void setupBadVersionResponse(int clientVersion) throws IOException {
            String errMsg = "Server IPC version 9 cannot communicate with client version " + clientVersion;
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            if (clientVersion >= 9) {
                Call placeholder = new Call(-1, -1, null, this);
                Server.this.setupResponse(buffer, placeholder, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL, RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_VERSION_MISMATCH, null, RPC.VersionMismatch.class.getName(), errMsg);
                Server.this.responder.doRespond(placeholder);
                return;
            }
            if (clientVersion >= 3) {
                Call placeholder = new Call(-1, -1, null, this);
                Server.this.setupResponseOldVersionFatal(buffer, placeholder, null, RPC.VersionMismatch.class.getName(), errMsg);
                Server.this.responder.doRespond(placeholder);
                return;
            }
            if (clientVersion != 2) {
                return;
            }
            // Version 2 layout as written here: int 0, boolean true, class name, message.
            Call placeholder = new Call(0, -1, null, this);
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(0);
            out.writeBoolean(true);
            WritableUtils.writeString(out, RPC.VersionMismatch.class.getName());
            WritableUtils.writeString(out, errMsg);
            placeholder.setResponse(ByteBuffer.wrap(buffer.toByteArray()));
            Server.this.responder.doRespond(placeholder);
        }

        /**
         * Queues the canned reply used when an HTTP request is received on the IPC
         * port, encoded as UTF-8.
         */
        private void setupHttpRequestOnIpcPortResponse() throws IOException {
            byte[] payload = Server.RECEIVED_HTTP_REQ_RESPONSE.getBytes(Charsets.UTF_8);
            Call placeholder = new Call(0, -1, null, this);
            placeholder.setResponse(ByteBuffer.wrap(payload));
            Server.this.responder.doRespond(placeholder);
        }

        /**
         * Decodes the connection context, determines the effective user, and
         * authorizes the connection.
         *
         * Without a SASL server the user claimed in the context is taken as is. With
         * one, the authenticated user is kept unless the context names a different
         * user: that is rejected for TOKEN auth and otherwise turned into a proxy
         * user on top of the authenticated one.
         *
         * @param dis stream positioned at the connection context
         * @throws WrappedRpcServerException if the context was already processed, or
         *         if a TOKEN-authenticated client claims to be a different user
         */
        private void processConnectionContext(DataInputStream dis) throws WrappedRpcServerException {
            if (this.connectionContextRead) {
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection context already processed");
            }
            this.connectionContext = (IpcConnectionContextProtos.IpcConnectionContextProto)this.decodeProtobufFromStream(IpcConnectionContextProtos.IpcConnectionContextProto.newBuilder(), dis);
            if (this.connectionContext.hasProtocol()) {
                this.protocolName = this.connectionContext.getProtocol();
            } else {
                this.protocolName = null;
            }
            UserGroupInformation claimedUser = ProtoUtil.getUgi(this.connectionContext);
            if (this.saslServer == null) {
                this.user = claimedUser;
            } else {
                this.user.setAuthenticationMethod(this.authMethod);
                boolean claimsOtherUser = claimedUser != null && !claimedUser.getUserName().equals(this.user.getUserName());
                if (claimsOtherUser) {
                    if (this.authMethod == SaslRpcServer.AuthMethod.TOKEN) {
                        throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, new AccessControlException("Authenticated user (" + this.user + ") doesn't match what the client claims to be (" + claimedUser + ")"));
                    }
                    UserGroupInformation authenticated = this.user;
                    this.user = UserGroupInformation.createProxyUser(claimedUser.getUserName(), authenticated);
                }
            }
            this.authorizeConnection();
            this.connectionContextRead = true;
        }

        /**
         * Unwraps one SASL-protected packet and processes every complete
         * length-prefixed RPC it contains. A partially received length prefix or
         * payload is kept in the connection's unwrapped buffers and resumed when the
         * next packet arrives.
         *
         * @param inBuf the wrapped bytes received from the client
         * @throws IOException if unwrapping fails or an inner length prefix is
         *         negative or exceeds the configured maximum RPC length
         */
        private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws WrappedRpcServerException, IOException, InterruptedException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Have read input token of size " + inBuf.length + " for processing by saslServer.unwrap()");
            }
            inBuf = this.saslServer.unwrap(inBuf, 0, inBuf.length);
            ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
            while (true) {
                // Complete the 4-byte length prefix; stop when the packet is exhausted.
                if (this.unwrappedDataLengthBuffer.remaining() > 0) {
                    int prefixRead = Server.this.channelRead(ch, this.unwrappedDataLengthBuffer);
                    if (prefixRead <= 0 || this.unwrappedDataLengthBuffer.remaining() > 0) {
                        return;
                    }
                }
                if (this.unwrappedData == null) {
                    this.unwrappedDataLengthBuffer.flip();
                    int unwrappedDataLength = this.unwrappedDataLengthBuffer.getInt();
                    // The length comes from the client: validate it the same way
                    // readAndProcess() does before allocating a buffer of that size.
                    this.checkDataLength(unwrappedDataLength);
                    this.unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
                }
                int payloadRead = Server.this.channelRead(ch, this.unwrappedData);
                if (payloadRead <= 0 || this.unwrappedData.remaining() > 0) {
                    return;
                }
                // Payload complete: reset the prefix buffer and dispatch the RPC.
                this.unwrappedDataLengthBuffer.clear();
                this.unwrappedData.flip();
                this.processOneRpc(this.unwrappedData.array());
                this.unwrappedData = null;
            }
        }

        /**
         * Decodes the request header from {@code buf} and routes the RPC.
         *
         * <p>Negative call ids go to {@link #processRpcOutOfBandRequest}; all others require
         * that the connection context has already been read and go to
         * {@link #processRpcRequest}. When a {@link WrappedRpcServerException} is raised, a
         * FATAL response is set up and sent for the call before the exception is rethrown.
         *
         * @param buf the bytes of exactly one RPC (header followed by the request)
         * @throws WrappedRpcServerException if the header or request is rejected
         * @throws IOException if sending the failure response fails
         * @throws InterruptedException if request processing is interrupted
         */
        private void processOneRpc(byte[] buf) throws IOException, WrappedRpcServerException, InterruptedException {
            // Defaults used for the failure response when the header itself cannot be decoded.
            int callId = -1;
            int retry = -1;
            try {
                DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
                RpcHeaderProtos.RpcRequestHeaderProto header = (RpcHeaderProtos.RpcRequestHeaderProto)this.decodeProtobufFromStream(RpcHeaderProtos.RpcRequestHeaderProto.newBuilder(), in);
                callId = header.getCallId();
                retry = header.getRetryCount();
                if (LOG.isDebugEnabled()) {
                    LOG.debug(" got #" + callId);
                }
                this.checkRpcHeaders(header);
                if (callId < 0) {
                    this.processRpcOutOfBandRequest(header, in);
                    return;
                }
                if (!this.connectionContextRead) {
                    throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection context not established");
                }
                this.processRpcRequest(header, in);
            }
            catch (WrappedRpcServerException wrse) {
                Throwable cause = wrse.getCause();
                Call failedCall = new Call(callId, retry, null, this);
                Server.this.setupResponse(this.authFailedResponse, failedCall, RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, cause.getClass().getName(), cause.getMessage());
                Server.this.responder.doRespond(failedCall);
                throw wrse;
            }
        }

        /**
         * Validates the mandatory fields of a request header.
         *
         * <p>The header must carry an rpc op, that op must be {@code RPC_FINAL_PACKET}, and
         * an rpc kind must be present. The checks are evaluated in that order.
         *
         * @param header the decoded request header
         * @throws WrappedRpcServerException with {@code FATAL_INVALID_RPC_HEADER} when a check fails
         */
        private void checkRpcHeaders(RpcHeaderProtos.RpcRequestHeaderProto header) throws WrappedRpcServerException {
            String problem = null;
            if (!header.hasRpcOp()) {
                problem = " IPC Server: No rpc op in rpcRequestHeader";
            } else if (header.getRpcOp() != RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET) {
                problem = "IPC Server does not implement rpc header operation" + header.getRpcOp();
            } else if (!header.hasRpcKind()) {
                problem = " IPC Server: No rpc kind in rpcRequestHeader";
            }
            if (problem != null) {
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, problem);
            }
        }

        /**
         * Deserializes the request that follows {@code header} and enqueues it as a {@link Call}.
         *
         * <p>The request wrapper class is looked up from the header's rpc kind, instantiated
         * reflectively and populated from {@code dis}. Optional trace and caller-context
         * information from the header is attached to the call. The call is then either
         * offered to the queue (client backoff enabled) or put on it, and the connection's
         * rpc count is incremented.
         *
         * @param header the already validated request header
         * @param dis stream positioned at the serialized request
         * @throws WrappedRpcServerException if the rpc kind is unknown, the request cannot be
         *         read, or the queue rejects the call while backoff is enabled
         * @throws InterruptedException if interrupted while putting the call on the queue
         */
        private void processRpcRequest(RpcHeaderProtos.RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, InterruptedException {
            Class<? extends Writable> wrapperClass = Server.this.getRpcRequestWrapper(header.getRpcKind());
            if (wrapperClass == null) {
                LOG.warn("Unknown rpc kind " + header.getRpcKind() + " from client " + this.getHostAddress());
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Unknown rpc kind in rpc header" + header.getRpcKind());
            }

            Writable request;
            try {
                request = ReflectionUtils.newInstance(wrapperClass, Server.this.conf);
                request.readFields(dis);
            }
            catch (Throwable failure) {
                LOG.warn("Unable to read call parameters for client " + this.getHostAddress() + "on connection protocol " + this.protocolName + " for rpcKind " + header.getRpcKind(), failure);
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, "IPC server unable to read call parameters: " + failure.getMessage());
            }

            // Start a detached span only when the client supplied trace information.
            Span span = null;
            if (header.hasTraceInfo()) {
                TraceInfo parent = new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId());
                span = Trace.startSpan(request.toString(), parent).detach();
            }

            CallerContext context = null;
            if (header.hasCallerContext()) {
                CallerContext.Builder contextBuilder = new CallerContext.Builder(header.getCallerContext().getContext());
                contextBuilder.setSignature(header.getCallerContext().getSignature().toByteArray());
                context = contextBuilder.build();
            }

            Call call = new Call(header.getCallId(), header.getRetryCount(), request, this, ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), span, context);
            if (!Server.this.callQueue.isClientBackoffEnabled()) {
                Server.this.callQueue.put(call);
            } else {
                this.queueRequestOrAskClientToBackOff(call);
            }
            this.incRpcCount();
        }

        /**
         * Offers {@code call} to the call queue without blocking.
         *
         * <p>If the queue does not accept the call, the client-backoff metric is incremented
         * and a {@link RetriableException} is reported to the client via a
         * {@link WrappedRpcServerExceptionSuppressed}.
         *
         * @param call the call to enqueue
         * @throws WrappedRpcServerException if the queue rejected the call
         * @throws InterruptedException declared for callers; not raised by the visible code
         */
        private void queueRequestOrAskClientToBackOff(Call call) throws WrappedRpcServerException, InterruptedException {
            if (Server.this.callQueue.offer(call)) {
                return;
            }
            Server.this.rpcMetrics.incrClientBackoff();
            throw new WrappedRpcServerExceptionSuppressed(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.ERROR_RPC_SERVER, new RetriableException("Server is too busy."));
        }

        /**
         * Handles a request whose call id is negative, i.e. one that is not a regular RPC.
         *
         * <p>Three call ids are recognised: {@code -3} (connection context),
         * {@code AuthProtocol.SASL.callId} (SASL negotiation) and {@code -4} (ping). Any
         * other id is rejected.
         *
         * @param header the decoded request header
         * @param dis stream positioned after the header
         * @throws WrappedRpcServerException with {@code FATAL_INVALID_RPC_HEADER} if the id is
         *         unknown or the message is not allowed in the current connection state
         * @throws IOException if SASL processing fails
         * @throws InterruptedException if interrupted while processing
         */
        private void processRpcOutOfBandRequest(RpcHeaderProtos.RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, IOException, InterruptedException {
            final int callId = header.getCallId();

            // NOTE(review): -3 presumably mirrors a named connection-context call id constant -- confirm.
            if (callId == -3) {
                boolean negotiating = this.authProtocol == AuthProtocol.SASL && !this.saslContextEstablished;
                if (negotiating) {
                    throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Connection header sent during SASL negotiation");
                }
                this.processConnectionContext(dis);
                return;
            }

            if (callId == AuthProtocol.SASL.callId) {
                if (this.authProtocol != AuthProtocol.SASL) {
                    throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "SASL protocol not requested by client");
                }
                this.saslReadAndProcess(dis);
                return;
            }

            // NOTE(review): -4 presumably mirrors a named ping call id constant -- confirm.
            if (callId == -4) {
                LOG.debug("Received ping message");
                return;
            }

            throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, "Unknown out of band call #" + callId);
        }

        /**
         * Authorizes the connection's user for the requested protocol.
         *
         * <p>When the user has a real user behind it and authentication was not by token,
         * proxy-user authorization is checked first. Success and failure are both counted in
         * the rpc metrics.
         *
         * @throws WrappedRpcServerException with {@code FATAL_UNAUTHORIZED} wrapping the
         *         {@link AuthorizationException} when authorization is denied
         */
        private void authorizeConnection() throws WrappedRpcServerException {
            try {
                boolean actsAsProxy = this.user != null
                        && this.user.getRealUser() != null
                        && this.authMethod != SaslRpcServer.AuthMethod.TOKEN;
                if (actsAsProxy) {
                    ProxyUsers.authorize(this.user, this.getHostAddress());
                }
                Server.this.authorize(this.user, this.protocolName, this.getHostInetAddress());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully authorized " + this.connectionContext);
                }
                Server.this.rpcMetrics.incrAuthorizationSuccesses();
            }
            catch (AuthorizationException denied) {
                LOG.info("Connection from " + this + " for protocol " + this.connectionContext.getProtocol() + " is unauthorized for user " + this.user);
                Server.this.rpcMetrics.incrAuthorizationFailures();
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_UNAUTHORIZED, denied);
            }
        }

        /**
         * Reads one length-delimited protobuf message from {@code dis} into {@code builder}
         * and returns the built message.
         *
         * @param builder builder of the expected message type
         * @param dis stream to read the delimited message from
         * @param <T> the message type the caller expects; the cast is unchecked
         * @return the decoded message
         * @throws WrappedRpcServerException with {@code FATAL_DESERIALIZING_REQUEST} if
         *         merging or building raises any exception
         */
        private <T extends Message> T decodeProtobufFromStream(Message.Builder builder, DataInputStream dis) throws WrappedRpcServerException {
            try {
                builder.mergeDelimitedFrom(dis);
                @SuppressWarnings("unchecked")
                T decoded = (T)builder.build();
                return decoded;
            }
            catch (Exception failure) {
                String protoName = builder.getDefaultInstanceForType().getClass().getSimpleName();
                throw new WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, "Error decoding " + protoName + ": " + failure);
            }
        }

        /**
         * Returns the service class currently recorded for this connection.
         *
         * @return the value of the {@code serviceClass} field
         */
        public int getServiceClass() {
            return serviceClass;
        }

        /**
         * Records the service class for this connection.
         *
         * @param serviceClass the value to store in the {@code serviceClass} field
         */
        public void setServiceClass(int serviceClass) {
            this.serviceClass = serviceClass;
        }

        /**
         * Releases the resources held by this connection.
         *
         * <p>SASL state is disposed and the read buffers are dropped unconditionally. If the
         * channel is still open, the socket's output side is shut down (failures are logged
         * at debug level and ignored), then the channel and the socket are closed.
         */
        private synchronized void close() {
            this.disposeSasl();
            this.data = null;
            this.dataLengthBuffer = null;
            if (this.channel.isOpen()) {
                try {
                    this.socket.shutdownOutput();
                }
                catch (Exception e) {
                    LOG.debug("Ignoring socket shutdown exception", e);
                }
                // Re-check: the channel may have been closed while shutting down output.
                if (this.channel.isOpen()) {
                    IOUtils.cleanup(null, this.channel);
                }
                IOUtils.cleanup(null, this.socket);
            }
        }
    }

    /**
     * Marker subtype of {@link WrappedRpcServerException}. The read path checks for this
     * type with {@code instanceof} and skips its usual log line when it matches.
     */
    private static class WrappedRpcServerExceptionSuppressed
    extends WrappedRpcServerException {
        /**
         * @param errCode the rpc error code to report to the client
         * @param ioe the underlying cause
         */
        public WrappedRpcServerExceptionSuppressed(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode, IOException ioe) {
            super(errCode, ioe);
        }
    }

    /**
     * An {@link RpcServerException} that pairs an underlying {@link IOException} with the
     * rpc error code to send back to the client. Its message and {@link #toString()} are
     * those of the wrapped cause.
     */
    private static class WrappedRpcServerException
    extends RpcServerException {
        private final RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode;

        /**
         * Wraps an existing exception.
         *
         * @param errCode the rpc error code to report
         * @param ioe the cause; its {@code toString()} becomes this exception's message
         */
        public WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode, IOException ioe) {
            super(ioe.toString(), ioe);
            this.errCode = errCode;
        }

        /**
         * Wraps a plain message by first turning it into an {@link RpcServerException}.
         *
         * @param errCode the rpc error code to report
         * @param message the failure description
         */
        public WrappedRpcServerException(RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto errCode, String message) {
            this(errCode, new RpcServerException(message));
        }

        /** @return the rpc error code supplied at construction */
        @Override
        public RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto getRpcErrorCodeProto() {
            return errCode;
        }

        /** @return the string form of the wrapped cause */
        @Override
        public String toString() {
            Throwable wrapped = this.getCause();
            return wrapped.toString();
        }
    }

    /**
     * Authentication protocols a client can select, each identified by the call id that
     * is carried on the wire for it.
     */
    @InterfaceAudience.Private
    public static enum AuthProtocol {
        NONE(0),
        SASL(-33);

        /** The call id associated with this protocol. */
        public final int callId;

        AuthProtocol(int callId) {
            this.callId = callId;
        }

        /**
         * Looks up the protocol registered for a call id.
         *
         * @param callId the call id to resolve
         * @return the matching protocol, or {@code null} if no constant uses that id
         */
        static AuthProtocol valueOf(int callId) {
            for (AuthProtocol candidate : AuthProtocol.values()) {
                if (candidate.callId == callId) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /**
     * Daemon thread that finishes sending RPC responses which could not be written in one
     * go by the thread that produced them.
     *
     * <p>{@link #doRespond(Call)} appends a response to its connection's queue and, when it
     * is the only queued entry, tries to write it immediately. If the write is partial,
     * the channel is registered with {@link #writeSelector} and this thread completes the
     * write once the channel becomes writable. Every {@link #PURGE_INTERVAL} milliseconds
     * the thread also closes connections that still hold a response older than that
     * interval.
     */
    private class Responder
    extends Thread {
        private final Selector writeSelector;
        /** Number of threads currently registering a channel with {@link #writeSelector}. */
        private int pending;
        /** Purge period and maximum age of a queued response, in milliseconds. */
        static final int PURGE_INTERVAL = 900000;

        Responder() throws IOException {
            this.setName("IPC Server Responder");
            this.setDaemon(true);
            this.writeSelector = Selector.open();
            this.pending = 0;
        }

        /** Runs the write loop until the server stops, then closes the write selector. */
        @Override
        public void run() {
            LOG.info(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            try {
                this.doRunLoop();
            }
            finally {
                LOG.info("Stopping " + Thread.currentThread().getName());
                try {
                    this.writeSelector.close();
                }
                catch (IOException ioe) {
                    LOG.error("Couldn't close write selector in " + Thread.currentThread().getName(), ioe);
                }
            }
        }

        /**
         * Main loop: waits for writable channels, continues their pending writes, and
         * periodically purges connections with stale responses.
         */
        private void doRunLoop() {
            long lastPurgeTime = 0L;
            while (Server.this.running) {
                try {
                    // Do not enter select() while another thread is registering a channel.
                    this.waitPending();
                    this.writeSelector.select(PURGE_INTERVAL);
                    Iterator<SelectionKey> iter = this.writeSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid() && key.isWritable()) {
                                this.doAsyncWrite(key);
                            }
                        }
                        catch (IOException e) {
                            LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
                        }
                    }
                    long now = Time.now();
                    if (now < lastPurgeTime + PURGE_INTERVAL) {
                        continue;
                    }
                    lastPurgeTime = now;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Checking for old call responses.");
                    }
                    // Snapshot the calls under the key-set lock, purge outside of it.
                    ArrayList<Call> calls;
                    Set<SelectionKey> registeredKeys = this.writeSelector.keys();
                    synchronized (registeredKeys) {
                        calls = new ArrayList<Call>(registeredKeys.size());
                        for (SelectionKey key : registeredKeys) {
                            Call call = (Call)key.attachment();
                            if (call != null && key.channel() == call.connection.channel) {
                                calls.add(call);
                            }
                        }
                    }
                    for (Call call : calls) {
                        this.doPurge(call, now);
                    }
                }
                catch (OutOfMemoryError e) {
                    LOG.warn("Out of Memory in server select", e);
                    try {
                        Thread.sleep(60000L);
                    }
                    catch (Exception ignored) {
                        // Deliberately ignored: the loop re-checks the running flag next.
                    }
                }
                catch (Exception e) {
                    LOG.warn("Exception in Responder", e);
                }
            }
        }

        /**
         * Continues writing the queued responses of the connection attached to {@code key}.
         * When the queue has been drained, write interest on the key is cleared.
         *
         * @param key a writable selection key whose attachment is a {@link Call}
         * @throws IOException if the key's channel is not the call's channel, or the write fails
         */
        private void doAsyncWrite(SelectionKey key) throws IOException {
            Call call = (Call)key.attachment();
            if (call == null) {
                return;
            }
            if (key.channel() != call.connection.channel) {
                throw new IOException("doAsyncWrite: bad channel");
            }
            LinkedList<Call> responseQueue = call.connection.responseQueue;
            synchronized (responseQueue) {
                if (this.processResponse(responseQueue, false)) {
                    try {
                        key.interestOps(0);
                    }
                    catch (CancelledKeyException e) {
                        LOG.warn("Exception while changing ops : " + e);
                    }
                }
            }
        }

        /**
         * Closes the connection of {@code call} if any response in its queue is older than
         * {@link #PURGE_INTERVAL}.
         *
         * @param call a call identifying the connection to inspect
         * @param now the current time in milliseconds
         */
        private void doPurge(Call call, long now) {
            LinkedList<Call> responseQueue = call.connection.responseQueue;
            synchronized (responseQueue) {
                for (Call queued : responseQueue) {
                    if (now > queued.timestamp + PURGE_INTERVAL) {
                        Server.this.closeConnection(queued.connection);
                        break;
                    }
                }
            }
        }

        /**
         * Writes as much as possible of the first response in {@code responseQueue}.
         *
         * <p>If the response is only partially written it is put back at the head of the
         * queue and, when called from a handler ({@code inHandler}), the channel is
         * registered for write interest so this thread finishes the job. Any failure,
         * including an exception from the write itself or a negative write count, closes
         * the connection.
         *
         * @param responseQueue the connection's response queue
         * @param inHandler {@code true} when called from the thread that produced the response
         * @return {@code true} if there is nothing more to write for this connection
         * @throws IOException if writing to the channel fails
         */
        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
            boolean error = true;
            boolean done = false;
            Call call = null;
            try {
                synchronized (responseQueue) {
                    int numElements = responseQueue.size();
                    if (numElements == 0) {
                        error = false;
                        return true;
                    }
                    call = responseQueue.removeFirst();
                    SocketChannel channel = call.connection.channel;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
                    }
                    // The write is inside the try so that a failure reaches the cleanup below.
                    int numBytes = Server.this.channelWrite(channel, call.rpcResponse);
                    if (numBytes < 0) {
                        // error is still true: the finally block closes the connection.
                        return true;
                    }
                    if (!call.rpcResponse.hasRemaining()) {
                        call.rpcResponse = null;
                        call.connection.decRpcCount();
                        // Done only if this was the last queued response.
                        done = numElements == 1;
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(Thread.currentThread().getName() + ": responding to " + call + " Wrote " + numBytes + " bytes.");
                        }
                    } else {
                        // Partial write: keep the call at the head of the queue.
                        call.connection.responseQueue.addFirst(call);
                        if (inHandler) {
                            call.timestamp = Time.now();
                            this.incPending();
                            try {
                                // Wake the selector so register() is not blocked by select().
                                this.writeSelector.wakeup();
                                channel.register(this.writeSelector, SelectionKey.OP_WRITE, call);
                            }
                            catch (ClosedChannelException e) {
                                done = true;
                            }
                            finally {
                                this.decPending();
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(Thread.currentThread().getName() + ": responding to " + call + " Wrote partial " + numBytes + " bytes.");
                        }
                    }
                    error = false;
                }
            }
            finally {
                if (error && call != null) {
                    LOG.warn(Thread.currentThread().getName() + ", call " + call + ": output error");
                    Server.this.closeConnection(call.connection);
                }
            }
            return done;
        }

        /**
         * Queues the response of {@code call} and, if it is the only queued response for
         * its connection, attempts to write it immediately on the calling thread.
         *
         * @param call the call whose response is ready
         * @throws IOException if the immediate write fails
         */
        void doRespond(Call call) throws IOException {
            LinkedList<Call> responseQueue = call.connection.responseQueue;
            synchronized (responseQueue) {
                responseQueue.addLast(call);
                if (responseQueue.size() == 1) {
                    this.processResponse(responseQueue, true);
                }
            }
        }

        /** Marks that a thread is about to register a channel with the write selector. */
        private synchronized void incPending() {
            ++this.pending;
        }

        /** Marks a registration as finished and wakes a thread blocked in {@link #waitPending()}. */
        private synchronized void decPending() {
            --this.pending;
            this.notify();
        }

        /** Blocks until no channel registration is in progress. */
        private synchronized void waitPending() throws InterruptedException {
            while (this.pending > 0) {
                this.wait();
            }
        }
    }

    /**
     * Thread that accepts incoming connections on the server socket and hands each one to
     * one of a fixed set of {@link Reader} threads, chosen in round-robin order.
     *
     * <p>The constructor binds the socket, starts the readers and registers the accept
     * channel; {@link #run()} then loops on the accept selector until the server stops.
     */
    private class Listener
    extends Thread {
        private ServerSocketChannel acceptChannel = null;
        private Selector selector = null;
        private Reader[] readers = null;
        private int currentReader = 0;
        private InetSocketAddress address;
        private int backlogLength = Server.this.conf.getInt("ipc.server.listen.queue.size", 128);

        /**
         * Binds the listening socket, records the actual port on the server, and creates
         * and starts {@code readThreads} reader threads.
         *
         * @throws IOException if the channel, selector or binding cannot be set up
         */
        public Listener() throws IOException {
            this.address = new InetSocketAddress(Server.this.bindAddress, Server.this.port);
            this.acceptChannel = ServerSocketChannel.open();
            this.acceptChannel.configureBlocking(false);
            Server.bind(this.acceptChannel.socket(), this.address, this.backlogLength, Server.this.conf, Server.this.portRangeConfig);
            // Read the port back: the requested port may have been resolved by bind().
            Server.this.port = this.acceptChannel.socket().getLocalPort();
            this.selector = Selector.open();
            this.readers = new Reader[Server.this.readThreads];
            for (int i = 0; i < Server.this.readThreads; ++i) {
                Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + Server.this.port);
                this.readers[i] = reader;
                reader.start();
            }
            this.acceptChannel.register(this.selector, SelectionKey.OP_ACCEPT);
            this.setName("IPC Server listener on " + Server.this.port);
            this.setDaemon(true);
        }

        /**
         * Accept loop. Runs until the server stops, then closes the accept channel and the
         * selector, stops the idle scan and closes all connections.
         */
        @Override
        public void run() {
            LOG.info(Thread.currentThread().getName() + ": starting");
            SERVER.set(Server.this);
            Server.this.connectionManager.startIdleScan();
            while (Server.this.running) {
                // Tracked outside the try so the catch blocks can close its connection.
                SelectionKey key = null;
                try {
                    this.getSelector().select();
                    Iterator<SelectionKey> iter = this.getSelector().selectedKeys().iterator();
                    while (iter.hasNext()) {
                        key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid() && key.isAcceptable()) {
                                this.doAccept(key);
                            }
                        }
                        catch (IOException e) {
                            // Best effort: a failed accept must not stop the listener.
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(Thread.currentThread().getName() + ": ignoring IOException while accepting a connection", e);
                            }
                        }
                        key = null;
                    }
                }
                catch (OutOfMemoryError e) {
                    LOG.warn("Out of Memory in server select", e);
                    this.closeCurrentConnection(key, e);
                    Server.this.connectionManager.closeIdle(true);
                    try {
                        Thread.sleep(60000L);
                    }
                    catch (Exception ignored) {
                        // Deliberately ignored: the loop re-checks the running flag next.
                    }
                }
                catch (Exception e) {
                    this.closeCurrentConnection(key, e);
                }
            }
            LOG.info("Stopping " + Thread.currentThread().getName());
            synchronized (this) {
                // Close each resource on its own so one failure cannot leak the other.
                try {
                    this.acceptChannel.close();
                }
                catch (IOException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": ignoring IOException while closing accept channel", e);
                    }
                }
                try {
                    this.selector.close();
                }
                catch (IOException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(Thread.currentThread().getName() + ": ignoring IOException while closing selector", e);
                    }
                }
                this.selector = null;
                this.acceptChannel = null;
                Server.this.connectionManager.stopIdleScan();
                Server.this.connectionManager.closeAll();
            }
        }

        /**
         * Closes the connection attached to {@code key}, if there is one.
         *
         * @param key the key being processed when the failure occurred; may be {@code null}
         * @param e the failure that triggered the close (not used by this method)
         */
        private void closeCurrentConnection(SelectionKey key, Throwable e) {
            if (key == null) {
                return;
            }
            Connection c = (Connection)key.attachment();
            if (c != null) {
                Server.this.closeConnection(c);
            }
        }

        /** @return the local address the accept channel's socket is bound to */
        InetSocketAddress getAddress() {
            return (InetSocketAddress)this.acceptChannel.socket().getLocalSocketAddress();
        }

        /**
         * Accepts every connection currently pending on the server channel, configures it
         * as non-blocking with keep-alive, registers it with the connection manager and
         * passes it to the next reader.
         *
         * @param key the acceptable key of the server channel
         * @throws InterruptedException if interrupted while handing over to a reader
         * @throws IOException if accepting or configuring a channel fails
         */
        void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
            SocketChannel channel;
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            while ((channel = server.accept()) != null) {
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(Server.this.tcpNoDelay);
                channel.socket().setKeepAlive(true);
                Reader reader = this.getReader();
                Connection c = Server.this.connectionManager.register(channel);
                if (c == null) {
                    // The connection manager refused the channel: drop it.
                    if (channel.isOpen()) {
                        IOUtils.cleanup(null, channel);
                    }
                    continue;
                }
                // Attach so closeCurrentConnection() can find it if this pass fails.
                key.attach(c);
                reader.addConnection(c);
            }
        }

        /**
         * Reads and processes available data for the connection attached to {@code key}.
         * A negative read count or any exception other than an interrupt closes the
         * connection.
         *
         * @param key a readable key whose attachment is a {@link Connection}
         * @throws InterruptedException if processing was interrupted
         */
        void doRead(SelectionKey key) throws InterruptedException {
            int count;
            Connection c = (Connection)key.attachment();
            if (c == null) {
                return;
            }
            c.setLastContact(Time.now());
            try {
                count = c.readAndProcess();
            }
            catch (InterruptedException ieo) {
                LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
                throw ieo;
            }
            catch (Exception e) {
                // Suppressed exceptions are not logged; wrapped ones are logged without a stack trace.
                if (!(e instanceof WrappedRpcServerExceptionSuppressed)) {
                    LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c.getHostAddress() + " threw exception [" + e + "]", e instanceof WrappedRpcServerException ? null : e);
                }
                count = -1;
            }
            if (count < 0) {
                Server.this.closeConnection(c);
            } else {
                c.setLastContact(Time.now());
            }
        }

        /** Wakes the accept selector, closes the listening socket and shuts down all readers. */
        synchronized void doStop() {
            if (this.selector != null) {
                this.selector.wakeup();
                Thread.yield();
            }
            if (this.acceptChannel != null) {
                try {
                    this.acceptChannel.socket().close();
                }
                catch (IOException e) {
                    LOG.info(Thread.currentThread().getName() + ":Exception in closing listener socket. " + e);
                }
            }
            for (Reader r : this.readers) {
                r.shutdown();
            }
        }

        /** @return the accept selector, or {@code null} once the listener has stopped */
        synchronized Selector getSelector() {
            return this.selector;
        }

        /** @return the next reader in round-robin order */
        Reader getReader() {
            this.currentReader = (this.currentReader + 1) % this.readers.length;
            return this.readers[this.currentReader];
        }

        /**
         * Thread that owns a read selector and processes incoming data for the
         * connections assigned to it through {@link #addConnection(Connection)}.
         */
        private class Reader
        extends Thread {
            private final BlockingQueue<Connection> pendingConnections;
            private final Selector readSelector;

            Reader(String name) throws IOException {
                super(name);
                this.pendingConnections = new LinkedBlockingQueue<Connection>(Server.this.readerPendingConnectionQueue);
                this.readSelector = Selector.open();
            }

            /** Runs the read loop until the server stops, then closes the read selector. */
            @Override
            public void run() {
                LOG.info("Starting " + Thread.currentThread().getName());
                try {
                    this.doRunLoop();
                }
                finally {
                    try {
                        this.readSelector.close();
                    }
                    catch (IOException ioe) {
                        LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
                    }
                }
            }

            /**
             * Registers newly assigned connections with the read selector, then reads
             * from every connection that has data available.
             */
            private synchronized void doRunLoop() {
                while (Server.this.running) {
                    try {
                        // Take only what is queued right now so the loop cannot block here.
                        for (int i = this.pendingConnections.size(); i > 0; --i) {
                            Connection conn = this.pendingConnections.take();
                            conn.channel.register(this.readSelector, SelectionKey.OP_READ, conn);
                        }
                        this.readSelector.select();
                        Iterator<SelectionKey> iter = this.readSelector.selectedKeys().iterator();
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            iter.remove();
                            if (key.isValid() && key.isReadable()) {
                                Listener.this.doRead(key);
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt is expected during shutdown; log it only otherwise.
                        if (Server.this.running) {
                            LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
                        }
                    }
                    catch (IOException ex) {
                        LOG.error("Error in Reader", ex);
                    }
                }
            }

            /**
             * Queues a connection for registration and wakes the selector so the run loop
             * picks it up.
             *
             * @param conn the connection to add
             * @throws InterruptedException if interrupted while waiting for queue space
             */
            public void addConnection(Connection conn) throws InterruptedException {
                this.pendingConnections.put(conn);
                this.readSelector.wakeup();
            }

            /** Wakes and interrupts this reader, then waits for it to terminate. */
            void shutdown() {
                assert (!Server.this.running);
                this.readSelector.wakeup();
                try {
                    super.interrupt();
                    super.join();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * One RPC received on a connection: its identifiers, the deserialized request, the
     * owning connection, and (once set) the serialized response to send back.
     */
    public static class Call
    implements Schedulable {
        private final int callId;
        private final int retryCount;
        private final Writable rpcRequest;
        private final Connection connection;
        /** Initialised to the creation time. */
        private long timestamp;
        /** {@code null} until {@link #setResponse(ByteBuffer)} is called. */
        private ByteBuffer rpcResponse;
        private final RPC.RpcKind rpcKind;
        private final byte[] clientId;
        private final Span traceSpan;
        private final CallerContext callerContext;

        /** Creates a call of kind {@code RPC_BUILTIN} with the dummy client id. */
        public Call(int id, int retryCount, Writable param, Connection connection) {
            this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID);
        }

        /** Creates a call without a trace span or caller context. */
        public Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId) {
            this(id, retryCount, param, connection, kind, clientId, null, null);
        }

        /**
         * @param id the call id
         * @param retryCount the retry count reported by the client
         * @param param the deserialized request; may be {@code null}
         * @param connection the connection the call arrived on
         * @param kind the rpc kind of the request
         * @param clientId the client id bytes
         * @param span the trace span for this call, or {@code null}
         * @param callerContext the caller context, or {@code null}
         */
        public Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId, Span span, CallerContext callerContext) {
            this.callId = id;
            this.retryCount = retryCount;
            this.clientId = clientId;
            this.rpcKind = kind;
            this.rpcRequest = param;
            this.connection = connection;
            this.traceSpan = span;
            this.callerContext = callerContext;
            this.rpcResponse = null;
            this.timestamp = Time.now();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(this.rpcRequest).append(" from ").append(this.connection);
            text.append(" Call#").append(this.callId);
            text.append(" Retry#").append(this.retryCount);
            return text.toString();
        }

        /** @param response the serialized response to send for this call */
        public void setResponse(ByteBuffer response) {
            this.rpcResponse = response;
        }

        /** @return the user of the connection this call arrived on */
        @Override
        public UserGroupInformation getUserGroupInformation() {
            return this.connection.user;
        }
    }

    /**
     * Immutable pair holding a request wrapper class together with an
     * {@code RPC.RpcInvoker}.
     */
    static class RpcKindMapValue {
        /** Class of the {@code Writable} that wraps requests. */
        final Class<? extends Writable> rpcRequestWrapperClass;
        /** Invoker paired with the wrapper class. */
        final RPC.RpcInvoker rpcInvoker;

        /**
         * @param rpcRequestWrapperClass the request wrapper class
         * @param rpcInvoker the invoker to pair with it
         */
        RpcKindMapValue(Class<? extends Writable> rpcRequestWrapperClass, RPC.RpcInvoker rpcInvoker) {
            this.rpcRequestWrapperClass = rpcRequestWrapperClass;
            this.rpcInvoker = rpcInvoker;
        }
    }

    /**
     * Keeps two sets of exception classes, "terse" and "suppressed", each
     * stored as the {@code Class.toString()} form of its members.
     *
     * <p>Every addition builds a new unmodifiable set and reassigns the
     * corresponding volatile field; existing sets are never mutated in place.
     */
    static class ExceptionsHandler {
        private volatile Set<String> terseExceptions = new HashSet<String>();
        private volatile Set<String> suppressedExceptions = new HashSet<String>();

        ExceptionsHandler() {
        }

        /**
         * Adds the given classes to the terse set.
         *
         * @param exceptionClass classes to add
         */
        void addTerseLoggingExceptions(Class<?> ... exceptionClass) {
            Set<String> current = terseExceptions;
            terseExceptions = addExceptions(current, exceptionClass);
        }

        /**
         * Adds the given classes to the suppressed set.
         *
         * @param exceptionClass classes to add
         */
        void addSuppressedLoggingExceptions(Class<?> ... exceptionClass) {
            Set<String> current = suppressedExceptions;
            suppressedExceptions = addExceptions(current, exceptionClass);
        }

        /**
         * @param t the class to look up
         * @return {@code true} if {@code t} was added to the terse set
         */
        boolean isTerseLog(Class<?> t) {
            Set<String> snapshot = terseExceptions;
            return snapshot.contains(t.toString());
        }

        /**
         * @param t the class to look up
         * @return {@code true} if {@code t} was added to the suppressed set
         */
        boolean isSuppressedLog(Class<?> t) {
            Set<String> snapshot = suppressedExceptions;
            return snapshot.contains(t.toString());
        }

        /**
         * Returns a new unmodifiable set holding everything in
         * {@code exceptionsSet} plus the {@code toString()} of each given class.
         * The input set is left untouched.
         */
        private static Set<String> addExceptions(Set<String> exceptionsSet, Class<?>[] exceptionClass) {
            Set<String> merged = new HashSet<String>(exceptionsSet);
            for (int i = 0; i < exceptionClass.length; i++) {
                merged.add(exceptionClass[i].toString());
            }
            return Collections.unmodifiableSet(merged);
        }
    }
}

