/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.container.request;

import com.linkedin.databus.container.netty.HttpRelay;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.CheckpointMult;
import com.linkedin.databus.core.DbusEventBufferBatchReadable;
import com.linkedin.databus.core.DbusEventBufferMult;
import com.linkedin.databus.core.Encoding;
import com.linkedin.databus.core.OffsetNotFoundException;
import com.linkedin.databus.core.ScnNotFoundException;
import com.linkedin.databus.core.StreamEventsResult;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.data_model.LogicalSource;
import com.linkedin.databus.core.data_model.PhysicalPartition;
import com.linkedin.databus.core.monitoring.mbean.StatsCollectors;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.core.container.monitoring.mbean.HttpStatisticsCollector;
import com.linkedin.databus2.core.container.request.DatabusRequest;
import com.linkedin.databus2.core.container.request.InvalidRequestParamValueException;
import com.linkedin.databus2.core.container.request.RequestProcessingException;
import com.linkedin.databus2.core.container.request.RequestProcessor;
import com.linkedin.databus2.core.filter.ConjunctionDbusFilter;
import com.linkedin.databus2.core.filter.DbusFilter;
import com.linkedin.databus2.core.filter.DbusKeyCompositeFilter;
import com.linkedin.databus2.core.filter.KeyFilterConfigJSONFactory;
import com.linkedin.databus2.core.filter.SourceDbusFilter;
import com.linkedin.databus2.schemas.SourceIdNameRegistry;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.security.spec.InvalidParameterSpecException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

public class ReadEventsRequestProcessor
implements RequestProcessor {
    public static final String MODULE = ReadEventsRequestProcessor.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    public static final String COMMAND_NAME = "stream";
    public static final String CHECKPOINT_PARAM = "checkPoint";
    public static final String CHECKPOINT_PARAM_MULT = "checkPointMult";
    public static final String FETCH_SIZE_PARAM = "size";
    public static final String OUTPUT_FORMAT_PARAM = "output";
    public static final String SOURCES_PARAM = "sources";
    public static final String KEY_MIN_PARAM = "keyMin";
    public static final String KEY_MAX_PARAM = "keyMax";
    public static final String OFFSET_PARAM = "offset";
    public static final String JSON_FORMAT_STRING = "json";
    public static final String BINARY_FORMAT_STRING = "binary";
    public static final String PARTITION_INFO_STRING = "filters";
    public static final String STREAM_FROM_LATEST_SCN = "streamFromLatestScn";
    public static final String SUBS_PARAM = "subs";
    private final ExecutorService _executorService;
    private final DbusEventBufferMult _eventBuffer;
    private final HttpRelay _relay;

    public ReadEventsRequestProcessor(ExecutorService executorService, HttpRelay relay) {
        this._executorService = executorService;
        this._relay = relay;
        this._eventBuffer = this._relay.getEventBuffer();
    }

    public ExecutorService getExecutorService() {
        return this._executorService;
    }

    public DatabusRequest process(DatabusRequest request) throws IOException, RequestProcessingException, DatabusException {
        boolean isDebug = LOG.isDebugEnabled();
        try {
            int clientEventVersion;
            ObjectMapper objMapper = new ObjectMapper();
            String checkpointString = request.getParams().getProperty(CHECKPOINT_PARAM, null);
            String checkpointStringMult = request.getParams().getProperty(CHECKPOINT_PARAM_MULT, null);
            int fetchSize = request.getRequiredIntParam(FETCH_SIZE_PARAM);
            String formatStr = request.getRequiredStringParam(OUTPUT_FORMAT_PARAM);
            Encoding enc = Encoding.valueOf((String)formatStr.toUpperCase());
            String sourcesListStr = request.getParams().getProperty(SOURCES_PARAM, null);
            String subsStr = request.getParams().getProperty(SUBS_PARAM, null);
            String partitionInfoStr = request.getParams().getProperty(PARTITION_INFO_STRING);
            String streamFromLatestSCNStr = request.getParams().getProperty(STREAM_FROM_LATEST_SCN);
            String clientMaxEventVersionStr = request.getParams().getProperty("maxev");
            int n = clientEventVersion = clientMaxEventVersionStr != null ? Integer.parseInt(clientMaxEventVersionStr) : 0;
            if (clientEventVersion < 0 || clientEventVersion == 1 || clientEventVersion > 2) {
                throw new InvalidRequestParamValueException(COMMAND_NAME, "maxev", clientMaxEventVersionStr);
            }
            if (null == sourcesListStr && null == subsStr) {
                throw new InvalidRequestParamValueException(COMMAND_NAME, "sources|subs", "null");
            }
            boolean v2Mode = null == subsStr;
            DbusKeyCompositeFilter keyCompositeFilter = null;
            if (null != partitionInfoStr) {
                try {
                    Map fMap = KeyFilterConfigJSONFactory.parseSrcIdFilterConfigMap((String)partitionInfoStr);
                    keyCompositeFilter = new DbusKeyCompositeFilter();
                    keyCompositeFilter.setFilterMap(fMap);
                    if (isDebug) {
                        LOG.debug((Object)("keyCompositeFilter is :" + keyCompositeFilter));
                    }
                }
                catch (Exception ex) {
                    String msg = "Got exception while parsing partition Configs. PartitionInfo is:" + partitionInfoStr;
                    LOG.error((Object)msg, (Throwable)ex);
                    throw new InvalidRequestParamValueException(COMMAND_NAME, PARTITION_INFO_STRING, partitionInfoStr);
                }
            }
            boolean streamFromLatestSCN = false;
            if (null != streamFromLatestSCNStr) {
                streamFromLatestSCN = Boolean.valueOf(streamFromLatestSCNStr);
            }
            long start = System.currentTimeMillis();
            ArrayList<DatabusSubscription> subs = null;
            SourceIdNameRegistry srcRegistry = this._relay.getSourcesIdNameRegistry();
            HashSet<Integer> sourceIds = new HashSet<Integer>();
            if (null != sourcesListStr) {
                String[] sourcesList;
                for (String sourceId : sourcesList = sourcesListStr.split(",")) {
                    try {
                        Integer srcId = Integer.valueOf(sourceId);
                        sourceIds.add(srcId);
                    }
                    catch (NumberFormatException nfe) {
                        HttpStatisticsCollector globalHttpStatsCollector = this._relay.getHttpStatisticsCollector();
                        if (null != globalHttpStatsCollector) {
                            globalHttpStatsCollector.registerInvalidStreamRequest();
                        }
                        throw new InvalidRequestParamValueException(COMMAND_NAME, SOURCES_PARAM, sourceId);
                    }
                }
            }
            NavigableSet<Object> ppartKeys = null;
            if (null != subsStr) {
                List subsBuilder = null;
                subsBuilder = (List)objMapper.readValue(subsStr, (TypeReference)new TypeReference<List<DatabusSubscription.Builder>>(){});
                subs = new ArrayList(subsBuilder.size());
                for (DatabusSubscription.Builder subBuilder : subsBuilder) {
                    subs.add(subBuilder.build());
                }
                ppartKeys = new TreeSet<DbusEventBufferMult.PhysicalPartitionKey>();
                for (DatabusSubscription sub : subs) {
                    PhysicalPartition ppart = sub.getPhysicalPartition();
                    if (ppart.isAnyPartitionWildcard()) {
                        ppartKeys = this._eventBuffer.getAllPhysicalPartitionKeys();
                        break;
                    }
                    ppartKeys.add(new DbusEventBufferMult.PhysicalPartitionKey(ppart));
                }
            }
            if (subs != null && checkpointStringMult == null && checkpointString != null) {
                throw new RequestProcessingException("Both Subscriptions and CheckpointMult should be present");
            }
            if (null == subs) {
                subs = new ArrayList<DatabusSubscription>();
            }
            for (Integer srcId : sourceIds) {
                LogicalSource lsource = srcRegistry.getSource(srcId);
                if (lsource == null) {
                    throw new InvalidRequestParamValueException(COMMAND_NAME, SOURCES_PARAM, srcId.toString());
                }
                if (isDebug) {
                    LOG.debug((Object)("registry returns " + lsource + " for srcid=" + srcId));
                }
                DatabusSubscription newSub = DatabusSubscription.createSimpleSourceSubscription((LogicalSource)lsource);
                subs.add(newSub);
            }
            DbusFilter ppartFilters = null;
            if (subs.size() > 0) {
                try {
                    ppartFilters = this._eventBuffer.constructFilters(subs);
                }
                catch (DatabusException de) {
                    throw new RequestProcessingException("unable to generate physical partitions filters:" + de.getMessage(), (Throwable)de);
                }
            }
            ConjunctionDbusFilter filters = new ConjunctionDbusFilter();
            if (v2Mode) {
                filters.addFilter((DbusFilter)new SourceDbusFilter(sourceIds));
            } else if (null != ppartFilters) {
                filters.addFilter(ppartFilters);
            }
            if (null != keyCompositeFilter) {
                filters.addFilter((DbusFilter)keyCompositeFilter);
            }
            Checkpoint cp = null;
            CheckpointMult cpMult = null;
            if (checkpointStringMult != null) {
                try {
                    cpMult = new CheckpointMult(checkpointStringMult);
                }
                catch (InvalidParameterSpecException e) {
                    LOG.error((Object)("Invalid CheckpointMult:" + checkpointStringMult), (Throwable)e);
                    throw new InvalidRequestParamValueException(COMMAND_NAME, "CheckpointMult", checkpointStringMult);
                }
            } else {
                cpMult = new CheckpointMult();
                for (Integer srcId : sourceIds) {
                    PhysicalPartition pPartition = this._eventBuffer.getPhysicalPartition(srcId.intValue());
                    if (pPartition == null) {
                        throw new RequestProcessingException("unable to find physical partitions for source:" + srcId);
                    }
                    if (checkpointString != null) {
                        cp = new Checkpoint(checkpointString);
                    } else {
                        cp = new Checkpoint();
                        cp.setFlexible();
                    }
                    cpMult.addCheckpoint(pPartition, cp);
                }
            }
            if (isDebug) {
                LOG.debug((Object)("checkpointStringMult = " + checkpointStringMult + ";singlecheckpointString=" + checkpointString + ";CPM=" + cpMult));
            }
            if (cpMult.getCursorPartition() == null) {
                cpMult.setCursorPartition(request.getCursorPartition());
            }
            if (isDebug && cpMult.getCursorPartition() != null) {
                LOG.debug((Object)("Using physical paritition cursor " + cpMult.getCursorPartition()));
            }
            if (cp == null) {
                Iterator it = sourceIds.iterator();
                if (it.hasNext()) {
                    Integer srcId;
                    srcId = (Integer)it.next();
                    PhysicalPartition pPartition = this._eventBuffer.getPhysicalPartition(srcId.intValue());
                    cp = cpMult.getCheckpoint(pPartition);
                } else {
                    cp = new Checkpoint();
                    cp.setFlexible();
                }
            }
            if (null != checkpointString && isDebug) {
                LOG.debug((Object)("About to stream from cp: " + checkpointString.toString()));
            }
            HttpStatisticsCollector globalHttpStatsCollector = this._relay.getHttpStatisticsCollector();
            HttpStatisticsCollector connHttpStatsCollector = null;
            if (null != globalHttpStatsCollector) {
                connHttpStatsCollector = (HttpStatisticsCollector)request.getParams().get(globalHttpStatsCollector.getName());
            }
            if (null != globalHttpStatsCollector) {
                globalHttpStatsCollector.registerStreamRequest(cp, sourceIds);
            }
            StatsCollectors statsCollectors = this._relay.getOutBoundStatsCollectors();
            try {
                DbusEventBufferBatchReadable bufRead = v2Mode ? this._eventBuffer.getDbusEventBufferBatchReadable(sourceIds, cpMult, statsCollectors) : this._eventBuffer.getDbusEventBufferBatchReadable(cpMult, ppartKeys, statsCollectors);
                int eventsRead = 0;
                int minPendingEventSize = 0;
                StreamEventsResult result = null;
                bufRead.setClientMaxEventVersion(clientEventVersion);
                if (v2Mode) {
                    result = bufRead.streamEvents(streamFromLatestSCN, fetchSize, (WritableByteChannel)request.getResponseContent(), enc, (DbusFilter)filters);
                    eventsRead = result.getNumEventsStreamed();
                    minPendingEventSize = result.getSizeOfPendingEvent();
                    if (isDebug) {
                        LOG.debug((Object)("Process: streamed " + eventsRead + " from sources " + Arrays.toString(sourceIds.toArray())));
                        LOG.debug((Object)("CP=" + cpMult));
                    }
                } else {
                    result = bufRead.streamEvents(streamFromLatestSCN, fetchSize, (WritableByteChannel)request.getResponseContent(), enc, (DbusFilter)filters);
                    eventsRead = result.getNumEventsStreamed();
                    minPendingEventSize = result.getSizeOfPendingEvent();
                    if (isDebug) {
                        LOG.debug((Object)("Process: streamed " + eventsRead + " with subscriptions " + subs));
                    }
                    if ((cpMult = bufRead.getCheckpointMult()) != null) {
                        request.setCursorPartition(cpMult.getCursorPartition());
                    }
                }
                if (eventsRead == 0 && minPendingEventSize > 0) {
                    request.getResponseContent().addMetadata("x-dbus-pending-event-size", (Object)minPendingEventSize);
                    LOG.debug((Object)("Returning 0 events but have pending event of size " + minPendingEventSize));
                }
            }
            catch (ScnNotFoundException snfe) {
                if (null != globalHttpStatsCollector) {
                    globalHttpStatsCollector.registerScnNotFoundStreamResponse();
                }
                throw new RequestProcessingException((Throwable)snfe);
            }
            catch (OffsetNotFoundException snfe) {
                LOG.error((Object)"OffsetNotFound", (Throwable)snfe);
                if (null != globalHttpStatsCollector) {
                    globalHttpStatsCollector.registerScnNotFoundStreamResponse();
                }
                throw new RequestProcessingException((Throwable)snfe);
            }
            if (null != connHttpStatsCollector) {
                connHttpStatsCollector.registerStreamResponse(System.currentTimeMillis() - start);
                globalHttpStatsCollector.merge(connHttpStatsCollector);
                connHttpStatsCollector.reset();
            } else if (null != globalHttpStatsCollector) {
                globalHttpStatsCollector.registerStreamResponse(System.currentTimeMillis() - start);
            }
        }
        catch (InvalidRequestParamValueException e) {
            HttpStatisticsCollector globalHttpStatsCollector = this._relay.getHttpStatisticsCollector();
            if (null != globalHttpStatsCollector) {
                globalHttpStatsCollector.registerInvalidStreamRequest();
            }
            throw e;
        }
        return request;
    }
}

