/*
 * Decompiled with CFR 0.152.
 */
package com.hbase.haxwell;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.hbase.haxwell.HaxwellMetrics;
import com.hbase.haxwell.api.HaxwellEvent;
import com.hbase.haxwell.api.HaxwellEventListener;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HaxwellEventExecutor {
    private Log log = LogFactory.getLog(this.getClass());
    private HaxwellEventListener eventListener;
    private int numThreads;
    private int batchSize;
    private HaxwellMetrics HaxwellMetrics;
    private List<ThreadPoolExecutor> executors;
    private Multimap<Integer, HaxwellEvent> eventBuffers;
    private List<Future<?>> futures;
    private HashFunction hashFunction = Hashing.murmur3_32();
    private boolean stopped = false;

    public HaxwellEventExecutor(HaxwellEventListener eventListener, List<ThreadPoolExecutor> executors, int batchSize, HaxwellMetrics HaxwellMetrics2) {
        this.eventListener = eventListener;
        this.executors = executors;
        this.numThreads = executors.size();
        this.batchSize = batchSize;
        this.HaxwellMetrics = HaxwellMetrics2;
        this.eventBuffers = ArrayListMultimap.create((int)this.numThreads, (int)batchSize);
        this.futures = Lists.newArrayList();
    }

    public void scheduleHaxwellEvent(HaxwellEvent HaxwellEvent2) {
        if (this.stopped) {
            throw new IllegalStateException("This executor is stopped");
        }
        int partition = (this.hashFunction.hashBytes(HaxwellEvent2.getRow()).asInt() & Integer.MAX_VALUE) % this.numThreads;
        List eventBuffer = (List)this.eventBuffers.get((Object)partition);
        eventBuffer.add(HaxwellEvent2);
        if (eventBuffer.size() == this.batchSize) {
            this.scheduleEventBatch(partition, Lists.newArrayList((Iterable)eventBuffer));
            this.eventBuffers.removeAll((Object)partition);
        }
    }

    private void scheduleEventBatch(int partition, final List<HaxwellEvent> events) {
        Future<?> future = this.executors.get(partition).submit(new Runnable(){

            @Override
            public void run() {
                try {
                    long before = System.currentTimeMillis();
                    HaxwellEventExecutor.this.log.debug((Object)"Delivering message to listener");
                    HaxwellEventExecutor.this.eventListener.processEvents(events);
                    HaxwellEventExecutor.this.HaxwellMetrics.reportFilteredSepOperation(System.currentTimeMillis() - before);
                }
                catch (RuntimeException e) {
                    HaxwellEventExecutor.this.log.error((Object)"Error while processing event", (Throwable)e);
                    throw e;
                }
            }
        });
        this.futures.add(future);
    }

    public List<Future<?>> flush() {
        Iterator iterator = this.eventBuffers.keySet().iterator();
        while (iterator.hasNext()) {
            int partition = (Integer)iterator.next();
            List buffer = (List)this.eventBuffers.get((Object)partition);
            if (buffer.isEmpty()) continue;
            this.scheduleEventBatch(partition, Lists.newArrayList((Iterable)buffer));
        }
        this.eventBuffers.clear();
        ArrayList flushedFutures = Lists.newArrayList(this.futures);
        return flushedFutures;
    }
}

