/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.olacabs.fabric.common.util.PropertyReader;
import com.olacabs.fabric.common.util.metrics.MetricConstants;
import com.olacabs.fabric.common.util.metrics.MetricFactory;
import com.olacabs.fabric.compute.ProcessingContext;
import com.olacabs.fabric.compute.pipeline.CommsIdGenerator;
import com.olacabs.fabric.compute.pipeline.MessageSource;
import com.olacabs.fabric.compute.pipeline.NotificationBus;
import com.olacabs.fabric.compute.pipeline.PipelineMessage;
import com.olacabs.fabric.compute.pipeline.SourceIdBasedTransactionIdGenerator;
import com.olacabs.fabric.compute.pipeline.TransactionIdGenerator;
import com.olacabs.fabric.compute.source.PipelineSource;
import com.olacabs.fabric.compute.util.MetaConstants;
import com.olacabs.fabric.model.common.ComponentMetadata;
import com.olacabs.fabric.model.event.EventSet;
import com.olacabs.fabric.model.event.RawEventBundle;
import io.astefanutti.metrics.aspectj.AnnotatedMetric;
import io.astefanutti.metrics.aspectj.MetricAspect;
import io.astefanutti.metrics.aspectj.MetricStaticAspect;
import io.astefanutti.metrics.aspectj.Metrics;
import io.astefanutti.metrics.aspectj.Profiled;
import io.astefanutti.metrics.aspectj.TimedAspect;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
import org.aspectj.runtime.internal.AroundClosure;
import org.aspectj.runtime.reflect.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

@Metrics
public class PipelineStreamSource
implements MessageSource,
Profiled {
    private static final Logger log;
    private final ExecutorService executorService;
    private final int id;
    private final String instanceId;
    private final Properties properties;
    private final TransactionIdGenerator transactionIdGenerator;
    private final ComponentMetadata sourceMetadata;
    private final NotificationBus notificationBus;
    private final PipelineSource source;
    private final ProcessingContext processingContext;
    private final ObjectMapper objectMapper;
    private final Histogram batchSizeHistogram;
    private LinkedBlockingQueue<EventSet> delivered;
    private ConcurrentMap<Long, EventSet> messages;
    private Future<Boolean> generatorFuture;
    private boolean jsonConversion;
    private String topologyName;
    public Map<String, AnnotatedMetric<Gauge>> gauges;
    public Map<String, AnnotatedMetric<Meter>> meters;
    public Map<String, AnnotatedMetric<Timer>> timers;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;

    public PipelineStreamSource(String instanceId, Properties properties, NotificationBus notificationBus, ComponentMetadata sourceMetadata, String topologyName, PipelineSource source, ProcessingContext processingContext, ObjectMapper objectMapper, MetricRegistry registry) {
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges((Profiled)this);
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters((Profiled)this);
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers((Profiled)this);
        try {
            this.executorService = Executors.newSingleThreadExecutor();
            this.id = CommsIdGenerator.nextId();
            this.transactionIdGenerator = new SourceIdBasedTransactionIdGenerator(this);
            this.jsonConversion = true;
            this.instanceId = instanceId;
            this.properties = properties;
            this.notificationBus = notificationBus;
            this.sourceMetadata = sourceMetadata;
            this.source = source;
            this.processingContext = processingContext;
            this.objectMapper = objectMapper;
            this.topologyName = topologyName;
            this.batchSizeHistogram = registry.histogram(MetricRegistry.name(PipelineStreamSource.class, (String[])new String[]{topologyName, instanceId, "batchSize"}));
        }
        finally {
            MetricAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricAspect$1$c735687d((Profiled)this);
        }
    }

    @Override
    public int communicationId() {
        return this.id;
    }

    @Override
    public boolean sendsNormalMessage() {
        return true;
    }

    public void initialize(Properties globalProperties) throws Exception {
        Integer count = PropertyReader.readInt((Properties)this.properties, (Properties)globalProperties, (String)MetaConstants.getComputationKey("eventset.in_flight_count"), (Integer)5);
        this.jsonConversion = PropertyReader.readBoolean((Properties)this.properties, (Properties)globalProperties, (String)MetaConstants.getComputationKey("eventset.is_serialized"), (Boolean)true);
        this.delivered = new LinkedBlockingQueue(count);
        this.messages = Maps.newConcurrentMap();
        this.source.initialize(this.instanceId, globalProperties, this.properties, this.processingContext, this.sourceMetadata);
        this.transactionIdGenerator.seed(this.seedTransactionId());
        this.notificationBus.source(this);
    }

    protected long seedTransactionId() {
        return 0L;
    }

    @Timed(name="${this.topologyName}.${this.instanceId}.acks")
    public synchronized void ackMessage(EventSet eventSet) {
        EventSet eventSet2 = eventSet;
        PipelineStreamSource.ackMessage_aroundBody1$advice(this, eventSet2, TimedAspect.aspectOf(), this, null, ajc$tjp_0);
    }

    public void start() {
        this.generatorFuture = this.executorService.submit(() -> {
            MDC.put((String)"componentId", (String)this.instanceId);
            try {
                this.generateMessage();
            }
            catch (Exception e) {
                log.error("Error thrown by source while generating message: ", (Throwable)e);
            }
            log.info("Done with message sourcing..");
            MDC.remove((String)"componentId");
            return null;
        });
    }

    public void stop() {
        log.info("Stopping source: {}", (Object)this.source.getClass().getName());
        this.source.stop();
        log.info("Stopped source: {}", (Object)this.source.getClass().getName());
        if (null != this.generatorFuture) {
            this.generatorFuture.cancel(true);
        }
        this.executorService.shutdownNow();
    }

    @Timed(name="${this.topologyName}.${this.instanceId}.batches")
    private RawEventBundle generator() {
        return (RawEventBundle)PipelineStreamSource.generator_aroundBody3$advice(this, TimedAspect.aspectOf(), this, null, ajc$tjp_1);
    }

    public void generateMessage() throws InterruptedException {
        MDC.put((String)"componentId", (String)this.instanceId);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                RawEventBundle eventBundle = this.generator();
                eventBundle.getEvents().forEach(event -> {
                    if (this.jsonConversion) {
                        try {
                            if (event.getData() instanceof byte[]) {
                                event.setJsonNode(this.objectMapper.readTree((byte[])event.getData()));
                            } else if (event.getData() instanceof String) {
                                event.setJsonNode((JsonNode)this.objectMapper.readValue((String)event.getData(), ObjectNode.class));
                            } else {
                                event.setJsonNode(this.objectMapper.valueToTree(event.getData()));
                            }
                        }
                        catch (Throwable t) {
                            log.error("Error generating json payload: ", t);
                        }
                    }
                });
                EventSet eventSet = EventSet.eventFromSourceBuilder().id(this.transactionIdGenerator.transactionId()).sourceId(this.communicationId()).transactionId(eventBundle.getTransactionId()).meta(eventBundle.getMeta()).events((Collection)eventBundle.getEvents()).partitionId(eventBundle.getPartitionId()).build();
                this.batchSizeHistogram.update(eventBundle.getEvents().size());
                this.messages.put(eventSet.getId(), eventSet);
                this.delivered.put(eventSet);
                this.notificationBus.publish(PipelineMessage.userspaceMessageBuilder().messages(eventSet).build(), this.id);
            }
            catch (Exception e) {
                log.error("Blocked exception while reading message: ", (Throwable)e);
                MetricFactory.getMetricRegistry().meter(Joiner.on((String)".").join((Object)MetricConstants.getSourceExceptionPrefix(), (Object)this.getTopologyName(), new Object[]{this.getInstanceId()})).mark();
            }
        }
        MDC.remove((String)"componentId");
    }

    public boolean healthcheck() {
        return this.source.healthcheck();
    }

    public static PipelineStreamSourceBuilder builder() {
        return new PipelineStreamSourceBuilder();
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    public ComponentMetadata getSourceMetadata() {
        return this.sourceMetadata;
    }

    public String getTopologyName() {
        return this.topologyName;
    }

    static {
        PipelineStreamSource.ajc$preClinit();
        try {
            log = LoggerFactory.getLogger(PipelineStreamSource.class);
        }
        catch (Throwable throwable) {
            if (throwable instanceof ExceptionInInitializerError) {
                throw (ExceptionInInitializerError)throwable;
            }
            MetricStaticAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricStaticAspect$1$be47261c(ajc$tjp_2);
            throw throwable;
        }
        MetricStaticAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricStaticAspect$1$be47261c(ajc$tjp_2);
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges() {
        return this.gauges;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges(Map map) {
        this.gauges = map;
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters() {
        return this.meters;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters(Map map) {
        this.meters = map;
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers() {
        return this.timers;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers(Map map) {
        this.timers = map;
    }

    private static final /* synthetic */ void ackMessage_aroundBody0(PipelineStreamSource ajc$this, EventSet eventSet) {
        MDC.put((String)"componentId", (String)ajc$this.instanceId);
        if (!ajc$this.messages.containsKey(eventSet.getId())) {
            log.error("Event set {} has already been acked. Maybe the topology is weird!!", (Object)eventSet.getId());
            return;
        }
        EventSet minMessage = ajc$this.delivered.peek();
        if (null == minMessage) {
            log.error("There are no unacked messages!! This is impossible!!");
            return;
        }
        if (minMessage.getId() != eventSet.getId()) {
            log.error("Got an out of bound message. Acceptable: {} Got: {}", (Object)minMessage.getId(), (Object)eventSet.getId());
            return;
        }
        minMessage = ajc$this.delivered.poll();
        log.debug("Acked message set: {} Partition id: {}", (Object)minMessage.getId(), (Object)minMessage.getPartitionId());
        ajc$this.messages.remove(eventSet.getId());
        ajc$this.source.ack(RawEventBundle.builder().events(minMessage.getEvents()).partitionId(minMessage.getPartitionId()).transactionId(eventSet.getTransactionId()).meta(minMessage.getMeta()).build());
        MDC.remove((String)"componentId");
    }

    private static final /* synthetic */ Object ackMessage_aroundBody1$advice(PipelineStreamSource ajc$this, EventSet eventSet, TimedAspect ajc$aspectInstance, Profiled object, AroundClosure ajc$aroundClosure, JoinPoint.StaticPart thisJoinPointStaticPart) {
        String methodSignature = ((MethodSignature)thisJoinPointStaticPart.getSignature()).getMethod().toString();
        Timer timer = (Timer)((AnnotatedMetric)MetricAspect.ajc$interFieldGetDispatch$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers((Profiled)object).get(methodSignature)).getMetric();
        Timer.Context context = timer.time();
        try {
            AroundClosure aroundClosure = ajc$aroundClosure;
            Profiled profiled = object;
            PipelineStreamSource.ackMessage_aroundBody0((PipelineStreamSource)profiled, eventSet);
            Object var10_11 = null;
            return var10_11;
        }
        finally {
            context.stop();
        }
    }

    private static final /* synthetic */ Object generator_aroundBody3$advice(PipelineStreamSource ajc$this, TimedAspect ajc$aspectInstance, Profiled object, AroundClosure ajc$aroundClosure, JoinPoint.StaticPart thisJoinPointStaticPart) {
        String methodSignature = ((MethodSignature)thisJoinPointStaticPart.getSignature()).getMethod().toString();
        Timer timer = (Timer)((AnnotatedMetric)MetricAspect.ajc$interFieldGetDispatch$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers((Profiled)object).get(methodSignature)).getMetric();
        Timer.Context context = timer.time();
        try {
            AroundClosure aroundClosure = ajc$aroundClosure;
            Profiled profiled = object;
            RawEventBundle rawEventBundle = ((PipelineStreamSource)profiled).source.getNewEvents();
            return rawEventBundle;
        }
        finally {
            context.stop();
        }
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("PipelineStreamSource.java", PipelineStreamSource.class);
        ajc$tjp_0 = factory.makeSJP("method-execution", (Signature)factory.makeMethodSig("21", "ackMessage", "com.olacabs.fabric.compute.source.PipelineStreamSource", "com.olacabs.fabric.model.event.EventSet", "eventSet", "", "void"), 130);
        ajc$tjp_1 = factory.makeSJP("method-execution", (Signature)factory.makeMethodSig("2", "generator", "com.olacabs.fabric.compute.source.PipelineStreamSource", "", "", "", "com.olacabs.fabric.model.event.RawEventBundle"), 183);
        ajc$tjp_2 = factory.makeSJP("staticinitialization", (Signature)factory.makeInitializerSig("8", "com.olacabs.fabric.compute.source.PipelineStreamSource"), 50);
    }

    public static class PipelineStreamSourceBuilder {
        private String instanceId;
        private Properties properties;
        private NotificationBus notificationBus;
        private ComponentMetadata sourceMetadata;
        private String topologyName;
        private PipelineSource source;
        private ProcessingContext processingContext;
        private ObjectMapper objectMapper;
        private MetricRegistry registry;

        PipelineStreamSourceBuilder() {
        }

        public PipelineStreamSourceBuilder instanceId(String instanceId) {
            this.instanceId = instanceId;
            return this;
        }

        public PipelineStreamSourceBuilder properties(Properties properties) {
            this.properties = properties;
            return this;
        }

        public PipelineStreamSourceBuilder notificationBus(NotificationBus notificationBus) {
            this.notificationBus = notificationBus;
            return this;
        }

        public PipelineStreamSourceBuilder sourceMetadata(ComponentMetadata sourceMetadata) {
            this.sourceMetadata = sourceMetadata;
            return this;
        }

        public PipelineStreamSourceBuilder topologyName(String topologyName) {
            this.topologyName = topologyName;
            return this;
        }

        public PipelineStreamSourceBuilder source(PipelineSource source) {
            this.source = source;
            return this;
        }

        public PipelineStreamSourceBuilder processingContext(ProcessingContext processingContext) {
            this.processingContext = processingContext;
            return this;
        }

        public PipelineStreamSourceBuilder objectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        public PipelineStreamSourceBuilder registry(MetricRegistry registry) {
            this.registry = registry;
            return this;
        }

        public PipelineStreamSource build() {
            return new PipelineStreamSource(this.instanceId, this.properties, this.notificationBus, this.sourceMetadata, this.topologyName, this.source, this.processingContext, this.objectMapper, this.registry);
        }

        public String toString() {
            return "PipelineStreamSource.PipelineStreamSourceBuilder(instanceId=" + this.instanceId + ", properties=" + this.properties + ", notificationBus=" + this.notificationBus + ", sourceMetadata=" + this.sourceMetadata + ", topologyName=" + this.topologyName + ", source=" + this.source + ", processingContext=" + this.processingContext + ", objectMapper=" + this.objectMapper + ", registry=" + this.registry + ")";
        }
    }
}

