/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client.api.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientRequest;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
import java.io.Closeable;
import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.PrettyPrinter;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TimelineClientImpl
extends TimelineClient {
    private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
    private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
    private static final Joiner JOINER = Joiner.on("");
    public static final int DEFAULT_SOCKET_TIMEOUT = 60000;
    private static Options opts = new Options();
    private static final String ENTITY_DATA_TYPE = "entity";
    private static final String DOMAIN_DATA_TYPE = "domain";
    private Client client;
    private ConnectionConfigurator connConfigurator;
    private DelegationTokenAuthenticator authenticator;
    private DelegationTokenAuthenticatedURL.Token token;
    private URI resURI;
    private UserGroupInformation authUgi;
    private String doAsUser;
    private Path activePath = null;
    private FileSystem fs = null;
    private Set<String> summaryEntityTypes;
    private ObjectMapper objMapper = null;
    private long flushIntervalSecs;
    private long cleanIntervalSecs;
    private long ttl;
    private LogFDsCache logFDsCache = null;
    private boolean isAppendSupported;
    private Configuration conf;
    private float timeLineServiceVersion;
    private AttemptDirCache attemptDirCache;
    private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND = "yarn.timeline-service.entity-file.fs-support-append";
    private static final short APP_LOG_DIR_PERMISSIONS = 504;
    private static final short FILE_LOG_PERMISSIONS = 416;
    private static final String DOMAIN_LOG_PREFIX = "domainlog-";
    private static final String SUMMARY_LOG_PREFIX = "summarylog-";
    private static final String ENTITY_LOG_PREFIX = "entitylog-";
    @InterfaceAudience.Private
    @VisibleForTesting
    TimelineClientConnectionRetry connectionRetry;
    private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR;

    public TimelineClientImpl() {
        super(TimelineClientImpl.class.getName());
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        this.conf = conf;
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        UserGroupInformation realUgi = ugi.getRealUser();
        if (realUgi != null) {
            this.authUgi = realUgi;
            this.doAsUser = ugi.getShortUserName();
        } else {
            this.authUgi = ugi;
            this.doAsUser = null;
        }
        DefaultClientConfig cc = new DefaultClientConfig();
        cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
        this.connConfigurator = TimelineClientImpl.newConnConfigurator(conf);
        this.authenticator = UserGroupInformation.isSecurityEnabled() ? new KerberosDelegationTokenAuthenticator() : new PseudoDelegationTokenAuthenticator();
        this.authenticator.setConnectionConfigurator(this.connConfigurator);
        this.token = new DelegationTokenAuthenticatedURL.Token();
        this.connectionRetry = new TimelineClientConnectionRetry(conf);
        this.client = new Client((ClientHandler)new URLConnectionClientHandler((HttpURLConnectionFactory)new TimelineURLConnectionFactory()), (ClientConfig)cc);
        TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
        this.client.addFilter((ClientFilter)retryFilter);
        this.resURI = YarnConfiguration.useHttps(conf) ? URI.create(JOINER.join("https://", conf.get("yarn.timeline-service.webapp.https.address", "0.0.0.0:8190"), RESOURCE_URI_STR)) : URI.create(JOINER.join("http://", conf.get("yarn.timeline-service.webapp.address", "0.0.0.0:8188"), RESOURCE_URI_STR));
        LOG.info("Timeline service address: " + this.resURI);
        this.timeLineServiceVersion = conf.getFloat("yarn.timeline-service.version", 1.0f);
        super.serviceInit(conf);
    }

    @Override
    protected void serviceStart() throws Exception {
        if ((double)this.timeLineServiceVersion == 1.5) {
            Configuration conf = new Configuration(this.getConfig());
            conf.setBoolean("dfs.client.retry.policy.enabled", true);
            String retryPolicy = conf.get("yarn.resourcemanager.fs.state-store.retry-policy-spec", "2000, 500");
            conf.set("dfs.client.retry.policy.spec", retryPolicy);
            this.activePath = new Path(conf.get("yarn.timeline-service.entity-group-fs-store.active-dir", "/tmp/entity-file-history/active"));
            this.fs = this.activePath.getFileSystem(conf);
            if (!this.fs.exists(this.activePath)) {
                throw new IOException(this.activePath + " does not exist");
            }
            this.summaryEntityTypes = new HashSet<String>(conf.getStringCollection("yarn.timeline-service.entity-group-fs-store.summary-entity-types"));
            this.objMapper = this.createObjectMapper();
            this.flushIntervalSecs = conf.getLong("yarn.timeline-service.client.fd-flush-interval-secs", 10L);
            this.cleanIntervalSecs = conf.getLong("yarn.timeline-service.client.fd-clean-interval-secs", 60L);
            this.ttl = conf.getLong("yarn.timeline-service.client.fd-retain-secs", 300L);
            long timerTaskTTL = conf.getLong("yarn.timeline-service.client.internal-timers-ttl-secs", 420L);
            this.logFDsCache = new LogFDsCache(this.flushIntervalSecs, this.cleanIntervalSecs, this.ttl, timerTaskTTL);
            this.isAppendSupported = conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
            int attemptDirCacheSize = conf.getInt("yarn.timeline-service.client.internal-attempt-dir-cache-size", 1000);
            this.attemptDirCache = new AttemptDirCache(attemptDirCacheSize, this.fs, this.activePath);
            if (LOG.isDebugEnabled()) {
                LOG.debug("yarn.timeline-service.client.fd-flush-interval-secs:" + this.flushIntervalSecs + ", " + "yarn.timeline-service.client.fd-clean-interval-secs" + ":" + this.cleanIntervalSecs + ", " + "yarn.timeline-service.client.fd-retain-secs" + ":" + this.ttl + ", " + TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + ":" + this.isAppendSupported);
            }
        }
    }

    @Override
    protected void serviceStop() throws Exception {
        if (this.logFDsCache != null) {
            this.logFDsCache.close();
        }
        super.serviceStop();
    }

    @Override
    public void flush() throws IOException {
        if (this.logFDsCache != null) {
            this.logFDsCache.flush();
        }
    }

    @Override
    public TimelinePutResponse putEntities(TimelineEntity ... entities) throws IOException, YarnException {
        TimelineEntities entitiesContainer = new TimelineEntities();
        entitiesContainer.addEntities(Arrays.asList(entities));
        ClientResponse resp = this.doPosting(entitiesContainer, null);
        return (TimelinePutResponse)resp.getEntity(TimelinePutResponse.class);
    }

    @Override
    public void putDomain(TimelineDomain domain) throws IOException, YarnException {
        this.doPosting(domain, DOMAIN_DATA_TYPE);
    }

    private ClientResponse doPosting(final Object obj, final String path) throws IOException, YarnException {
        ClientResponse resp;
        try {
            resp = this.authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>(){

                @Override
                public ClientResponse run() throws Exception {
                    return TimelineClientImpl.this.doPostingObject(obj, path);
                }
            });
        }
        catch (UndeclaredThrowableException e) {
            throw new IOException(e.getCause());
        }
        catch (InterruptedException ie) {
            throw new IOException(ie);
        }
        if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) {
            String msg = "Failed to get the response from the timeline server.";
            LOG.error(msg);
            if (LOG.isDebugEnabled() && resp != null) {
                String output = (String)resp.getEntity(String.class);
                LOG.debug("HTTP error code: " + resp.getStatus() + " Server response : \n" + output);
            }
            throw new YarnException(msg);
        }
        return resp;
    }

    @Override
    public Token<TimelineDelegationTokenIdentifier> getDelegationToken(final String renewer) throws IOException, YarnException {
        PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>> getDTAction = new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>(){

            @Override
            public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                return authUrl.getDelegationToken(TimelineClientImpl.this.resURI.toURL(), TimelineClientImpl.this.token, renewer, TimelineClientImpl.this.doAsUser);
            }
        };
        return (Token)this.operateDelegationToken(getDTAction);
    }

    @Override
    public long renewDelegationToken(final Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException {
        final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty();
        final String scheme = isTokenServiceAddrEmpty ? null : (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
        final InetSocketAddress address = isTokenServiceAddrEmpty ? null : SecurityUtil.getTokenServiceAddr(timelineDT);
        PrivilegedExceptionAction<Long> renewDTAction = new PrivilegedExceptionAction<Long>(){

            @Override
            public Long run() throws Exception {
                if (!timelineDT.equals(TimelineClientImpl.this.token.getDelegationToken())) {
                    TimelineClientImpl.this.token.setDelegationToken(timelineDT);
                }
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                URI serviceURI = isTokenServiceAddrEmpty ? TimelineClientImpl.this.resURI : new URI(scheme, null, address.getHostName(), address.getPort(), TimelineClientImpl.RESOURCE_URI_STR, null, null);
                return authUrl.renewDelegationToken(serviceURI.toURL(), TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
            }
        };
        return (Long)this.operateDelegationToken(renewDTAction);
    }

    @Override
    public void cancelDelegationToken(final Token<TimelineDelegationTokenIdentifier> timelineDT) throws IOException, YarnException {
        final boolean isTokenServiceAddrEmpty = timelineDT.getService().toString().isEmpty();
        final String scheme = isTokenServiceAddrEmpty ? null : (YarnConfiguration.useHttps(this.getConfig()) ? "https" : "http");
        final InetSocketAddress address = isTokenServiceAddrEmpty ? null : SecurityUtil.getTokenServiceAddr(timelineDT);
        PrivilegedExceptionAction<Void> cancelDTAction = new PrivilegedExceptionAction<Void>(){

            @Override
            public Void run() throws Exception {
                if (!timelineDT.equals(TimelineClientImpl.this.token.getDelegationToken())) {
                    TimelineClientImpl.this.token.setDelegationToken(timelineDT);
                }
                DelegationTokenAuthenticatedURL authUrl = new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator);
                URI serviceURI = isTokenServiceAddrEmpty ? TimelineClientImpl.this.resURI : new URI(scheme, null, address.getHostName(), address.getPort(), TimelineClientImpl.RESOURCE_URI_STR, null, null);
                authUrl.cancelDelegationToken(serviceURI.toURL(), TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
                return null;
            }
        };
        this.operateDelegationToken(cancelDTAction);
    }

    private Object operateDelegationToken(PrivilegedExceptionAction<?> action) throws IOException, YarnException {
        TimelineClientRetryOp tokenRetryOp = this.createTimelineClientRetryOpForOperateDelegationToken(action);
        return this.connectionRetry.retryOn(tokenRetryOp);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public ClientResponse doPostingObject(Object object, String path) {
        WebResource webResource = this.client.resource(this.resURI);
        if (path == null) {
            return (ClientResponse)((WebResource.Builder)webResource.accept(new String[]{"application/json"}).type("application/json")).post(ClientResponse.class, object);
        }
        if (path.equals(DOMAIN_DATA_TYPE)) {
            return (ClientResponse)((WebResource.Builder)webResource.path(path).accept(new String[]{"application/json"}).type("application/json")).put(ClientResponse.class, object);
        }
        throw new YarnRuntimeException("Unknown resource type");
    }

    private static ConnectionConfigurator newConnConfigurator(Configuration conf) {
        try {
            return TimelineClientImpl.newSslConnConfigurator(60000, conf);
        }
        catch (Exception e) {
            LOG.debug("Cannot load customized ssl related configuration. Fallback to system-generic settings.", e);
            return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
        }
    }

    private static ConnectionConfigurator newSslConnConfigurator(final int timeout, Configuration conf) throws IOException, GeneralSecurityException {
        SSLFactory factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
        factory.init();
        final SSLSocketFactory sf = factory.createSSLSocketFactory();
        final HostnameVerifier hv = factory.getHostnameVerifier();
        return new ConnectionConfigurator(){

            @Override
            public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
                if (conn instanceof HttpsURLConnection) {
                    HttpsURLConnection c = (HttpsURLConnection)conn;
                    c.setSSLSocketFactory(sf);
                    c.setHostnameVerifier(hv);
                }
                TimelineClientImpl.setTimeouts(conn, timeout);
                return conn;
            }
        };
    }

    private static void setTimeouts(URLConnection connection, int socketTimeout) {
        connection.setConnectTimeout(socketTimeout);
        connection.setReadTimeout(socketTimeout);
    }

    public static void main(String[] argv) throws Exception {
        String path;
        CommandLine cliParser = new GnuParser().parse(opts, argv);
        if (cliParser.hasOption("put") && (path = cliParser.getOptionValue("put")) != null && path.length() > 0) {
            if (cliParser.hasOption(ENTITY_DATA_TYPE)) {
                TimelineClientImpl.putTimelineDataInJSONFile(path, ENTITY_DATA_TYPE);
                return;
            }
            if (cliParser.hasOption(DOMAIN_DATA_TYPE)) {
                TimelineClientImpl.putTimelineDataInJSONFile(path, DOMAIN_DATA_TYPE);
                return;
            }
        }
        TimelineClientImpl.printUsage();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void putTimelineDataInJSONFile(String path, String type) {
        File jsonFile = new File(path);
        if (!jsonFile.exists()) {
            LOG.error("File [" + jsonFile.getAbsolutePath() + "] doesn't exist");
            return;
        }
        ObjectMapper mapper = new ObjectMapper();
        YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
        TimelineEntities entities = null;
        TimelineDomains domains = null;
        try {
            if (type.equals(ENTITY_DATA_TYPE)) {
                entities = (TimelineEntities)mapper.readValue(jsonFile, TimelineEntities.class);
            } else if (type.equals(DOMAIN_DATA_TYPE)) {
                domains = (TimelineDomains)mapper.readValue(jsonFile, TimelineDomains.class);
            }
        }
        catch (Exception e) {
            LOG.error("Error when reading  " + e.getMessage());
            e.printStackTrace(System.err);
            return;
        }
        YarnConfiguration conf = new YarnConfiguration();
        TimelineClient client = TimelineClient.createTimelineClient();
        client.init(conf);
        client.start();
        try {
            if (UserGroupInformation.isSecurityEnabled() && conf.getBoolean("yarn.timeline-service.enabled", false)) {
                Token<TimelineDelegationTokenIdentifier> token = client.getDelegationToken(UserGroupInformation.getCurrentUser().getUserName());
                UserGroupInformation.getCurrentUser().addToken(token);
            }
            if (type.equals(ENTITY_DATA_TYPE)) {
                TimelinePutResponse response = client.putEntities(entities.getEntities().toArray(new TimelineEntity[entities.getEntities().size()]));
                if (response.getErrors().size() == 0) {
                    LOG.info("Timeline entities are successfully put");
                } else {
                    for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
                        LOG.error("TimelineEntity [" + error.getEntityType() + ":" + error.getEntityId() + "] is not successfully put. Error code: " + error.getErrorCode());
                    }
                }
            } else if (type.equals(DOMAIN_DATA_TYPE)) {
                boolean hasError = false;
                for (TimelineDomain domain : domains.getDomains()) {
                    try {
                        client.putDomain(domain);
                    }
                    catch (Exception e) {
                        LOG.error("Error when putting domain " + domain.getId(), e);
                        hasError = true;
                    }
                }
                if (!hasError) {
                    LOG.info("Timeline domains are successfully put");
                }
            }
        }
        catch (RuntimeException e) {
            LOG.error("Error when putting the timeline data", e);
        }
        catch (Exception e) {
            LOG.error("Error when putting the timeline data", e);
        }
        finally {
            client.stop();
        }
    }

    private static void printUsage() {
        new HelpFormatter().printHelp("TimelineClient", opts);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public UserGroupInformation getUgi() {
        return this.authUgi;
    }

    @Override
    public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, TimelineEntity ... entities) throws IOException, YarnException {
        if ((double)this.timeLineServiceVersion != 1.5) {
            throw new YarnException("this API is not supported in current timeline service version:" + this.timeLineServiceVersion);
        }
        if (appAttemptId == null) {
            return this.putEntities(entities);
        }
        ArrayList<TimelineEntity> entitiesToDB = new ArrayList<TimelineEntity>();
        ArrayList<TimelineEntity> entitiesToSummary = new ArrayList<TimelineEntity>();
        ArrayList<TimelineEntity> entitiesToEntity = new ArrayList<TimelineEntity>();
        Path attemptDir = this.attemptDirCache.getAppAttemptDir(appAttemptId);
        for (TimelineEntity entity : entities) {
            if (this.summaryEntityTypes.contains(entity.getEntityType())) {
                entitiesToSummary.add(entity);
                continue;
            }
            if (groupId != null) {
                entitiesToEntity.add(entity);
                continue;
            }
            entitiesToDB.add(entity);
        }
        if (!entitiesToSummary.isEmpty()) {
            Path summaryLogPath = new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Writing summary log for " + appAttemptId.toString() + " to " + summaryLogPath);
            }
            this.logFDsCache.writeSummaryEntityLogs(this.fs, summaryLogPath, this.objMapper, appAttemptId, entitiesToSummary, this.isAppendSupported);
        }
        if (!entitiesToEntity.isEmpty()) {
            Path entityLogPath = new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Writing entity log for " + groupId.toString() + " to " + entityLogPath);
            }
            this.logFDsCache.writeEntityLogs(this.fs, entityLogPath, this.objMapper, appAttemptId, groupId, entitiesToEntity, this.isAppendSupported);
        }
        if (!entitiesToDB.isEmpty()) {
            this.putEntities(entitiesToDB.toArray(new TimelineEntity[entitiesToDB.size()]));
        }
        return new TimelinePutResponse();
    }

    @Override
    public void putDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) throws IOException, YarnException {
        if ((double)this.timeLineServiceVersion != 1.5) {
            throw new YarnException("this API is not supported in current timeline service version:" + this.timeLineServiceVersion);
        }
        if (appAttemptId == null) {
            this.putDomain(domain);
        } else {
            this.writeDomain(appAttemptId, domain);
        }
    }

    private ObjectMapper createObjectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setAnnotationIntrospector((AnnotationIntrospector)new JaxbAnnotationIntrospector());
        mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
        mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
        return mapper;
    }

    private void writeDomain(ApplicationAttemptId appAttemptId, TimelineDomain domain) throws IOException {
        Path domainLogPath = new Path(this.attemptDirCache.getAppAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX + appAttemptId.toString());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing domains for " + appAttemptId.toString() + " to " + domainLogPath);
        }
        this.logFDsCache.writeDomainLog(this.fs, domainLogPath, this.objMapper, domain, this.isAppendSupported);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public TimelineClientRetryOp createTimelineClientRetryOpForOperateDelegationToken(PrivilegedExceptionAction<?> action) throws IOException {
        return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi, action);
    }

    static {
        opts.addOption("put", true, "Put the timeline entities/domain in a JSON file");
        opts.getOption("put").setArgName("Path to the JSON file");
        opts.addOption(ENTITY_DATA_TYPE, false, "Specify the JSON file contains the entities");
        opts.addOption(DOMAIN_DATA_TYPE, false, "Specify the JSON file contains the domain");
        opts.addOption("help", false, "Print usage");
        DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator(){

            @Override
            public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
                TimelineClientImpl.setTimeouts(conn, 60000);
                return conn;
            }
        };
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public class TimelineClientRetryOpForOperateDelegationToken
    extends TimelineClientRetryOp {
        private final UserGroupInformation authUgi;
        private final PrivilegedExceptionAction<?> action;

        public TimelineClientRetryOpForOperateDelegationToken(UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
            this.authUgi = authUgi;
            this.action = action;
        }

        @Override
        public Object run() throws IOException {
            this.authUgi.checkTGTAndReloginFromKeytab();
            try {
                return this.authUgi.doAs(this.action);
            }
            catch (UndeclaredThrowableException e) {
                throw new IOException(e.getCause());
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
        }

        @Override
        public boolean shouldRetryOn(Exception e) {
            return e instanceof ConnectException || e instanceof SocketTimeoutException;
        }
    }

    private static class AttemptDirCache {
        private final int attemptDirCacheSize;
        private final Map<ApplicationAttemptId, Path> attemptDirCache;
        private final FileSystem fs;
        private final Path activePath;

        public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
            this.attemptDirCacheSize = cacheSize;
            this.attemptDirCache = new LinkedHashMap<ApplicationAttemptId, Path>(this.attemptDirCacheSize, 0.75f, true){
                private static final long serialVersionUID = 1L;

                @Override
                protected boolean removeEldestEntry(Map.Entry<ApplicationAttemptId, Path> eldest) {
                    return this.size() > AttemptDirCache.this.attemptDirCacheSize;
                }
            };
            this.fs = fs;
            this.activePath = activePath;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Path getAppAttemptDir(ApplicationAttemptId attemptId) throws IOException {
            Path attemptDir = this.attemptDirCache.get(attemptId);
            if (attemptDir == null) {
                AttemptDirCache attemptDirCache = this;
                synchronized (attemptDirCache) {
                    attemptDir = this.attemptDirCache.get(attemptId);
                    if (attemptDir == null) {
                        attemptDir = this.createAttemptDir(attemptId);
                        this.attemptDirCache.put(attemptId, attemptDir);
                    }
                }
            }
            return attemptDir;
        }

        private Path createAttemptDir(ApplicationAttemptId appAttemptId) throws IOException {
            Path appDir = this.createApplicationDir(appAttemptId.getApplicationId());
            Path attemptDir = new Path(appDir, appAttemptId.toString());
            if (!this.fs.exists(attemptDir)) {
                FileSystem.mkdirs(this.fs, attemptDir, new FsPermission(504));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("New attempt directory created - " + attemptDir);
                }
            }
            return attemptDir;
        }

        private Path createApplicationDir(ApplicationId appId) throws IOException {
            Path appDir = new Path(this.activePath, appId.toString());
            if (!this.fs.exists(appDir)) {
                FileSystem.mkdirs(this.fs, appDir, new FsPermission(504));
                if (LOG.isDebugEnabled()) {
                    LOG.debug("New app directory created - " + appDir);
                }
            }
            return appDir;
        }
    }

    private static class LogFDsCache
    implements Closeable,
    Flushable {
        private DomainLogFD domainLogFD = null;
        private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
        private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDs;
        private Timer flushTimer = null;
        private Timer cleanInActiveFDsTimer = null;
        private Timer monitorTaskTimer = null;
        private final long ttl;
        private final ReentrantLock domainFDLocker = new ReentrantLock();
        private final ReentrantLock summaryTableLocker = new ReentrantLock();
        private final ReentrantLock entityTableLocker = new ReentrantLock();
        private volatile boolean serviceStopped = false;
        private volatile boolean timerTaskStarted = false;
        private final ReentrantLock timerTaskLocker = new ReentrantLock();
        private final long flushIntervalSecs;
        private final long cleanIntervalSecs;
        private final long timerTaskRetainTTL;
        private volatile long timeStampOfLastWrite = System.currentTimeMillis();
        private final ReentrantReadWriteLock.ReadLock timerTasksMonitorReadLock;
        private final ReentrantReadWriteLock.WriteLock timerTasksMonitorWriteLock;

        public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs, long ttl, long timerTaskRetainTTL) {
            this.summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
            this.entityLogFDs = new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>>();
            this.flushIntervalSecs = flushIntervalSecs;
            this.cleanIntervalSecs = cleanIntervalSecs;
            this.ttl = ttl * 1000L;
            long timerTaskRetainTTLVar = timerTaskRetainTTL * 1000L;
            if (timerTaskRetainTTLVar > this.ttl) {
                this.timerTaskRetainTTL = timerTaskRetainTTLVar;
            } else {
                this.timerTaskRetainTTL = this.ttl + 120000L;
                LOG.warn("The specific yarn.timeline-service.client.internal-timers-ttl-secs : " + timerTaskRetainTTL + " is invalid, because it is less than or " + "equal to " + "yarn.timeline-service.client.fd-retain-secs" + " : " + ttl + ". Use " + "yarn.timeline-service.client.fd-retain-secs" + " : " + ttl + " + 120s instead.");
            }
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            this.timerTasksMonitorReadLock = lock.readLock();
            this.timerTasksMonitorWriteLock = lock.writeLock();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void flush() throws IOException {
            try {
                this.domainFDLocker.lock();
                if (this.domainLogFD != null) {
                    this.domainLogFD.flush();
                }
            }
            finally {
                this.domainFDLocker.unlock();
            }
            this.flushSummaryFDMap(this.summanyLogFDs);
            this.flushEntityFDMap(this.entityLogFDs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void flushSummaryFDMap(Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
            if (!logFDs.isEmpty()) {
                for (Map.Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs.entrySet()) {
                    EntityLogFD logFD = logFDEntry.getValue();
                    try {
                        logFD.lock();
                        logFD.flush();
                    }
                    finally {
                        logFD.unlock();
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
            if (!logFDs.isEmpty()) {
                for (Map.Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
                    HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap = logFDMapEntry.getValue();
                    for (Map.Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry : logFDMap.entrySet()) {
                        EntityLogFD logFD = logFDEntry.getValue();
                        try {
                            logFD.lock();
                            logFD.flush();
                        }
                        finally {
                            logFD.unlock();
                        }
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cleanInActiveFDs() {
            long currentTimeStamp = System.currentTimeMillis();
            try {
                this.domainFDLocker.lock();
                if (this.domainLogFD != null && currentTimeStamp - this.domainLogFD.getLastModifiedTime() >= this.ttl) {
                    this.domainLogFD.close();
                    this.domainLogFD = null;
                }
            }
            finally {
                this.domainFDLocker.unlock();
            }
            this.cleanInActiveSummaryFDsforMap(this.summanyLogFDs, currentTimeStamp);
            this.cleanInActiveEntityFDsforMap(this.entityLogFDs, currentTimeStamp);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cleanInActiveSummaryFDsforMap(Map<ApplicationAttemptId, EntityLogFD> logFDs, long currentTimeStamp) {
            if (!logFDs.isEmpty()) {
                for (Map.Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs.entrySet()) {
                    EntityLogFD logFD = logFDEntry.getValue();
                    try {
                        logFD.lock();
                        if (currentTimeStamp - logFD.getLastModifiedTime() < this.ttl) continue;
                        logFD.close();
                    }
                    finally {
                        logFD.unlock();
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs, long currentTimeStamp) {
            if (!logFDs.isEmpty()) {
                for (Map.Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
                    HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap = logFDMapEntry.getValue();
                    for (Map.Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry : logFDMap.entrySet()) {
                        EntityLogFD logFD = logFDEntry.getValue();
                        try {
                            logFD.lock();
                            if (currentTimeStamp - logFD.getLastModifiedTime() < this.ttl) continue;
                            logFD.close();
                        }
                        finally {
                            logFD.unlock();
                        }
                    }
                }
            }
        }

        private void monitorTimerTasks() {
            if (System.currentTimeMillis() - this.timeStampOfLastWrite >= this.timerTaskRetainTTL) {
                this.cancelAndCloseTimerTasks();
                this.timerTaskStarted = false;
            } else if (this.monitorTaskTimer != null) {
                this.monitorTaskTimer.schedule((TimerTask)new TimerMonitorTask(), this.timerTaskRetainTTL);
            }
        }

        @Override
        public void close() throws IOException {
            this.serviceStopped = true;
            this.cancelAndCloseTimerTasks();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void cancelAndCloseTimerTasks() {
            if (this.flushTimer != null) {
                this.flushTimer.cancel();
                this.flushTimer = null;
            }
            if (this.cleanInActiveFDsTimer != null) {
                this.cleanInActiveFDsTimer.cancel();
                this.cleanInActiveFDsTimer = null;
            }
            if (this.monitorTaskTimer != null) {
                this.monitorTaskTimer.cancel();
                this.monitorTaskTimer = null;
            }
            try {
                this.domainFDLocker.lock();
                if (this.domainLogFD != null) {
                    this.domainLogFD.close();
                    this.domainLogFD = null;
                }
            }
            finally {
                this.domainFDLocker.unlock();
            }
            this.closeSummaryFDs(this.summanyLogFDs);
            this.closeEntityFDs(this.entityLogFDs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeEntityFDs(Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
            try {
                this.entityTableLocker.lock();
                if (!logFDs.isEmpty()) {
                    for (Map.Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
                        HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap = logFDMapEntry.getValue();
                        for (Map.Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry : logFDMap.entrySet()) {
                            EntityLogFD logFD = logFDEntry.getValue();
                            try {
                                logFD.lock();
                                logFD.close();
                            }
                            finally {
                                logFD.unlock();
                            }
                        }
                    }
                }
            }
            finally {
                this.entityTableLocker.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeSummaryFDs(Map<ApplicationAttemptId, EntityLogFD> logFDs) {
            try {
                this.summaryTableLocker.lock();
                if (!logFDs.isEmpty()) {
                    for (Map.Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs.entrySet()) {
                        EntityLogFD logFD = logFDEntry.getValue();
                        try {
                            logFD.lock();
                            logFD.close();
                        }
                        finally {
                            logFD.unlock();
                        }
                    }
                }
            }
            finally {
                this.summaryTableLocker.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void writeDomainLog(FileSystem fs, Path logPath, ObjectMapper objMapper, TimelineDomain domain, boolean isAppendSupported) throws IOException {
            this.checkAndStartTimeTasks();
            try {
                this.domainFDLocker.lock();
                if (this.domainLogFD != null) {
                    this.domainLogFD.writeDomain(domain);
                } else {
                    this.domainLogFD = new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
                    this.domainLogFD.writeDomain(domain);
                }
            }
            finally {
                this.domainFDLocker.unlock();
            }
        }

        public void writeEntityLogs(FileSystem fs, Path entityLogPath, ObjectMapper objMapper, ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity, boolean isAppendSupported) throws IOException {
            this.checkAndStartTimeTasks();
            this.writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId, groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeEntityLogs(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, TimelineEntityGroupId groupId, List<TimelineEntity> entities, boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
            HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD = logFDs.get(attemptId);
            if (logMapFD != null) {
                EntityLogFD logFD = logMapFD.get(groupId);
                if (logFD != null) {
                    try {
                        logFD.lock();
                        if (this.serviceStopped) {
                            return;
                        }
                        logFD.writeEntities(entities);
                    }
                    finally {
                        logFD.unlock();
                    }
                } else {
                    this.createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, entities, isAppendSupported, logFDs);
                }
            } else {
                this.createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId, entities, isAppendSupported, logFDs);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void createEntityFDandWrite(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, TimelineEntityGroupId groupId, List<TimelineEntity> entities, boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
            try {
                EntityLogFD logFD;
                this.entityTableLocker.lock();
                if (this.serviceStopped) {
                    return;
                }
                HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap = logFDs.get(attemptId);
                if (logFDMap == null) {
                    logFDMap = new HashMap();
                }
                if ((logFD = logFDMap.get(groupId)) == null) {
                    logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
                }
                try {
                    logFD.lock();
                    logFD.writeEntities(entities);
                    logFDMap.put(groupId, logFD);
                    logFDs.put(attemptId, logFDMap);
                }
                finally {
                    logFD.unlock();
                }
            }
            finally {
                this.entityTableLocker.unlock();
            }
        }

        public void writeSummaryEntityLogs(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, List<TimelineEntity> entities, boolean isAppendSupported) throws IOException {
            this.checkAndStartTimeTasks();
            this.writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities, isAppendSupported, this.summanyLogFDs);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeSummmaryEntityLogs(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, List<TimelineEntity> entities, boolean isAppendSupported, Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
            EntityLogFD logFD = null;
            logFD = logFDs.get(attemptId);
            if (logFD != null) {
                try {
                    logFD.lock();
                    if (this.serviceStopped) {
                        return;
                    }
                    logFD.writeEntities(entities);
                }
                finally {
                    logFD.unlock();
                }
            } else {
                this.createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities, isAppendSupported, logFDs);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void createSummaryFDAndWrite(FileSystem fs, Path logPath, ObjectMapper objMapper, ApplicationAttemptId attemptId, List<TimelineEntity> entities, boolean isAppendSupported, Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
            try {
                this.summaryTableLocker.lock();
                if (this.serviceStopped) {
                    return;
                }
                EntityLogFD logFD = logFDs.get(attemptId);
                if (logFD == null) {
                    logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
                }
                try {
                    logFD.lock();
                    logFD.writeEntities(entities);
                    logFDs.put(attemptId, logFD);
                }
                finally {
                    logFD.unlock();
                }
            }
            finally {
                this.summaryTableLocker.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void checkAndStartTimeTasks() {
            block7: {
                try {
                    this.timerTasksMonitorReadLock.lock();
                    this.timeStampOfLastWrite = System.currentTimeMillis();
                    if (this.timerTaskStarted) break block7;
                    try {
                        this.timerTaskLocker.lock();
                        if (!this.timerTaskStarted) {
                            this.createAndStartTimerTasks();
                            this.timerTaskStarted = true;
                        }
                    }
                    finally {
                        this.timerTaskLocker.unlock();
                    }
                }
                finally {
                    this.timerTasksMonitorReadLock.unlock();
                }
            }
        }

        private void createAndStartTimerTasks() {
            this.flushTimer = new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer", true);
            this.flushTimer.schedule((TimerTask)new FlushTimerTask(), this.flushIntervalSecs * 1000L, this.flushIntervalSecs * 1000L);
            this.cleanInActiveFDsTimer = new Timer(LogFDsCache.class.getSimpleName() + "cleanInActiveFDsTimer", true);
            this.cleanInActiveFDsTimer.schedule((TimerTask)new CleanInActiveFDsTask(), this.cleanIntervalSecs * 1000L, this.cleanIntervalSecs * 1000L);
            this.monitorTaskTimer = new Timer(LogFDsCache.class.getSimpleName() + "MonitorTimer", true);
            this.monitorTaskTimer.schedule((TimerTask)new TimerMonitorTask(), this.timerTaskRetainTTL);
        }

        private class TimerMonitorTask
        extends TimerTask {
            private TimerMonitorTask() {
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    LogFDsCache.this.timerTasksMonitorWriteLock.lock();
                    LogFDsCache.this.monitorTimerTasks();
                }
                finally {
                    LogFDsCache.this.timerTasksMonitorWriteLock.unlock();
                }
            }
        }

        private class CleanInActiveFDsTask
        extends TimerTask {
            private CleanInActiveFDsTask() {
            }

            @Override
            public void run() {
                block2: {
                    try {
                        LogFDsCache.this.cleanInActiveFDs();
                    }
                    catch (Exception e) {
                        if (!LOG.isDebugEnabled()) break block2;
                        LOG.debug(e);
                    }
                }
            }
        }

        private class FlushTimerTask
        extends TimerTask {
            private FlushTimerTask() {
            }

            @Override
            public void run() {
                block2: {
                    try {
                        LogFDsCache.this.flush();
                    }
                    catch (Exception e) {
                        if (!LOG.isDebugEnabled()) break block2;
                        LOG.debug(e);
                    }
                }
            }
        }
    }

    private static class LogFD {
        private FSDataOutputStream stream;
        protected ObjectMapper objMapper;
        protected JsonGenerator jsonGenerator;
        protected long lastModifiedTime;
        private final boolean isAppendSupported;
        private final ReentrantLock fdLock = new ReentrantLock();
        private final FileSystem fs;
        private final Path logPath;

        public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, boolean isAppendSupported) throws IOException {
            this.fs = fs;
            this.logPath = logPath;
            this.isAppendSupported = isAppendSupported;
            this.objMapper = objMapper;
            this.prepareForWrite();
        }

        public void close() {
            if (this.stream != null) {
                IOUtils.cleanup(LOG, new Closeable[]{this.jsonGenerator});
                IOUtils.cleanup(LOG, this.stream);
                this.stream = null;
                this.jsonGenerator = null;
            }
        }

        public void flush() throws IOException {
            if (this.stream != null) {
                this.stream.hflush();
            }
        }

        public long getLastModifiedTime() {
            return this.lastModifiedTime;
        }

        protected void prepareForWrite() throws IOException {
            this.stream = this.createLogFileStream(this.fs, this.logPath);
            this.jsonGenerator = new JsonFactory().createJsonGenerator((OutputStream)this.stream);
            this.jsonGenerator.setPrettyPrinter((PrettyPrinter)new MinimalPrettyPrinter("\n"));
            this.jsonGenerator.configure(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM, false);
            this.lastModifiedTime = System.currentTimeMillis();
        }

        protected boolean writerClosed() {
            return this.stream == null;
        }

        private FSDataOutputStream createLogFileStream(FileSystem fs, Path logPath) throws IOException {
            FSDataOutputStream stream;
            if (!this.isAppendSupported) {
                logPath = new Path(logPath.getParent(), logPath.getName() + "_" + System.currentTimeMillis());
            }
            if (!fs.exists(logPath)) {
                stream = fs.create(logPath, false);
                fs.setPermission(logPath, new FsPermission(416));
            } else {
                stream = fs.append(logPath);
            }
            return stream;
        }

        public void lock() {
            this.fdLock.lock();
        }

        public void unlock() {
            this.fdLock.unlock();
        }
    }

    private static class EntityLogFD
    extends LogFD {
        public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, boolean isAppendSupported) throws IOException {
            super(fs, logPath, objMapper, isAppendSupported);
        }

        public synchronized void writeEntities(List<TimelineEntity> entities) throws IOException {
            if (this.writerClosed()) {
                this.prepareForWrite();
            }
            for (TimelineEntity entity : entities) {
                this.objMapper.writeValue(this.jsonGenerator, (Object)entity);
            }
            this.lastModifiedTime = System.currentTimeMillis();
        }
    }

    private static class DomainLogFD
    extends LogFD {
        public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper, boolean isAppendSupported) throws IOException {
            super(fs, logPath, objMapper, isAppendSupported);
        }

        public void writeDomain(TimelineDomain domain) throws IOException {
            this.objMapper.writeValue(this.jsonGenerator, (Object)domain);
            this.lastModifiedTime = System.currentTimeMillis();
        }
    }

    private class TimelineURLConnectionFactory
    implements HttpURLConnectionFactory {
        private TimelineURLConnectionFactory() {
        }

        public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
            TimelineClientImpl.this.authUgi.checkTGTAndReloginFromKeytab();
            try {
                return new DelegationTokenAuthenticatedURL(TimelineClientImpl.this.authenticator, TimelineClientImpl.this.connConfigurator).openConnection(url, TimelineClientImpl.this.token, TimelineClientImpl.this.doAsUser);
            }
            catch (UndeclaredThrowableException e) {
                throw new IOException(e.getCause());
            }
            catch (AuthenticationException ae) {
                throw new IOException(ae);
            }
        }
    }

    private class TimelineJerseyRetryFilter
    extends ClientFilter {
        private TimelineJerseyRetryFilter() {
        }

        public ClientResponse handle(final ClientRequest cr) throws ClientHandlerException {
            TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp(){

                @Override
                public Object run() {
                    return TimelineJerseyRetryFilter.this.getNext().handle(cr);
                }

                @Override
                public boolean shouldRetryOn(Exception e) {
                    return e instanceof ClientHandlerException && e.getCause() instanceof ConnectException;
                }
            };
            try {
                return (ClientResponse)TimelineClientImpl.this.connectionRetry.retryOn(jerseyRetryOp);
            }
            catch (IOException e) {
                throw new ClientHandlerException("Jersey retry failed!\nMessage: " + e.getMessage());
            }
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    static class TimelineClientConnectionRetry {
        @InterfaceAudience.Private
        @VisibleForTesting
        public int maxRetries;
        @InterfaceAudience.Private
        @VisibleForTesting
        public long retryInterval;
        private boolean retried = false;

        @InterfaceAudience.Private
        @VisibleForTesting
        boolean getRetired() {
            return this.retried;
        }

        public TimelineClientConnectionRetry(Configuration conf) {
            Preconditions.checkArgument(conf.getInt("yarn.timeline-service.client.max-retries", 30) >= -1, "%s property value should be greater than or equal to -1", "yarn.timeline-service.client.max-retries");
            Preconditions.checkArgument(conf.getLong("yarn.timeline-service.client.retry-interval-ms", 1000L) > 0L, "%s property value should be greater than zero", "yarn.timeline-service.client.retry-interval-ms");
            this.maxRetries = conf.getInt("yarn.timeline-service.client.max-retries", 30);
            this.retryInterval = conf.getLong("yarn.timeline-service.client.retry-interval-ms", 1000L);
        }

        public Object retryOn(TimelineClientRetryOp op) throws RuntimeException, IOException {
            int leftRetries = this.maxRetries;
            this.retried = false;
            while (true) {
                try {
                    return op.run();
                }
                catch (IOException | RuntimeException e) {
                    if (leftRetries != 0) {
                        if (!op.shouldRetryOn(e)) {
                            throw e;
                        }
                        this.logException(e, leftRetries);
                        if (leftRetries > 0) {
                            --leftRetries;
                        }
                        this.retried = true;
                        try {
                            Thread.sleep(this.retryInterval);
                        }
                        catch (InterruptedException ie) {
                            LOG.warn("Client retry sleep interrupted! ");
                        }
                        continue;
                    }
                    throw new RuntimeException("Failed to connect to timeline server. Connection retries limit exceeded. The posted timeline event may be missing");
                }
                break;
            }
        }

        private void logException(Exception e, int leftRetries) {
            if (leftRetries > 0) {
                LOG.info("Exception caught by TimelineClientConnectionRetry, will try " + leftRetries + " more time(s).\nMessage: " + e.getMessage());
            } else {
                LOG.info("ConnectionException caught by TimelineClientConnectionRetry, will keep retrying.\nMessage: " + e.getMessage());
            }
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public static abstract class TimelineClientRetryOp {
        public abstract Object run() throws IOException;

        public abstract boolean shouldRetryOn(Exception var1);
    }
}

