/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.netty;

import com.linkedin.databus.client.ChunkedBodyReadableByteChannel;
import com.linkedin.databus.client.DatabusRelayConnectionStateMessage;
import com.linkedin.databus.client.netty.AbstractHttpResponseProcessorDecorator;
import com.linkedin.databus.client.netty.AbstractNettyHttpConnection;
import com.linkedin.databus.client.netty.RemoteExceptionHandler;
import com.linkedin.databus.core.DbusPrettyLogUtils;
import com.linkedin.databus.core.async.ActorMessageQueue;
import com.linkedin.databus2.core.container.ExtendedReadTimeoutHandler;
import com.linkedin.databus2.core.container.request.RegisterResponseEntry;
import com.linkedin.databus2.core.container.request.RegisterResponseMetadataEntry;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import org.jboss.netty.handler.codec.http.HttpResponse;

class RegisterHttpResponseProcessor
extends AbstractNettyHttpConnection.BaseHttpResponseProcessor {
    public static final String MODULE = RegisterHttpResponseProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private final ActorMessageQueue _callback;
    private final DatabusRelayConnectionStateMessage _stateReuse;
    private String _registerResponseVersionHdr;

    public RegisterHttpResponseProcessor(AbstractNettyHttpConnection parent, ActorMessageQueue relayThread, DatabusRelayConnectionStateMessage stateReuse, ExtendedReadTimeoutHandler readTimeOutHandler) {
        super(parent, readTimeOutHandler);
        this._callback = relayThread;
        this._stateReuse = stateReuse;
    }

    @Override
    public void startResponse(HttpResponse response) throws Exception {
        this._decorated = new ChunkedBodyReadableByteChannel();
        this._registerResponseVersionHdr = response.getHeader("x-dbus-protocol-version");
        super.startResponse(response);
    }

    @Override
    public void finishResponse() throws Exception {
        block11: {
            super.finishResponse();
            if (this._errorHandled) {
                return;
            }
            String registerResponseError = "/register response error: ";
            try {
                String exceptionName = RemoteExceptionHandler.getExceptionName((ChunkedBodyReadableByteChannel)this._decorated);
                if (null != exceptionName) {
                    LOG.error((Object)("/register response error: " + RemoteExceptionHandler.getExceptionMessage((ChunkedBodyReadableByteChannel)this._decorated)));
                    this._stateReuse.switchToRegisterResponseError();
                    break block11;
                }
                InputStream bodyStream = Channels.newInputStream((ReadableByteChannel)((Object)this._decorated));
                ObjectMapper mapper = new ObjectMapper();
                int registerResponseVersion = 3;
                if (this._registerResponseVersionHdr != null) {
                    try {
                        registerResponseVersion = Integer.parseInt(this._registerResponseVersionHdr);
                    }
                    catch (NumberFormatException e) {
                        throw new RuntimeException("Could not parse /register response protocol version: " + this._registerResponseVersionHdr);
                    }
                    if (registerResponseVersion < 2 || registerResponseVersion > 4) {
                        throw new RuntimeException("Out-of-range /register response protocol version: " + this._registerResponseVersionHdr);
                    }
                }
                if (registerResponseVersion == 4) {
                    HashMap responseMap = (HashMap)mapper.readValue(bodyStream, (TypeReference)new TypeReference<HashMap<String, List<Object>>>(){});
                    Map sourcesSchemasMap = RegisterResponseEntry.createFromResponse((Map)responseMap, (String)"sourceSchemas", (boolean)false);
                    Map keysSchemasMap = RegisterResponseEntry.createFromResponse((Map)responseMap, (String)"keySchemas", (boolean)true);
                    List metadataSchemasList = RegisterResponseMetadataEntry.createFromResponse((Map)responseMap, (String)"metadataSchemas", (boolean)true);
                    this._stateReuse.switchToRegisterSuccess(sourcesSchemasMap, keysSchemasMap, metadataSchemasList);
                } else {
                    List schemasList = (List)mapper.readValue(bodyStream, (TypeReference)new TypeReference<List<RegisterResponseEntry>>(){});
                    Map sourcesSchemasMap = RegisterResponseEntry.convertSchemaListToMap((List)schemasList);
                    this._stateReuse.switchToRegisterSuccess(sourcesSchemasMap, null, null);
                }
            }
            catch (IOException ex) {
                LOG.error((Object)"/register response error: ", (Throwable)ex);
                this._stateReuse.switchToRegisterResponseError();
            }
            catch (RuntimeException ex) {
                LOG.error((Object)"/register response error: ", (Throwable)ex);
                this._stateReuse.switchToRegisterResponseError();
            }
        }
        this._callback.enqueueMessage((Object)this._stateReuse);
    }

    @Override
    public void handleChannelException(Throwable cause) {
        DbusPrettyLogUtils.logExceptionAtError((String)"Exception during /register response: ", (Throwable)cause, (Logger)LOG);
        if (this._responseStatus != AbstractHttpResponseProcessorDecorator.ResponseStatus.CHUNKS_FINISHED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)"Enqueueing /register response error state to puller queue");
            }
            this._stateReuse.switchToRegisterResponseError();
            this._callback.enqueueMessage((Object)this._stateReuse);
        } else if (LOG.isDebugEnabled()) {
            LOG.debug((Object)"Skipping enqueueing /register response error state to puller queue");
        }
        super.handleChannelException(cause);
    }
}

