/*
 * Decompiled with CFR 0.152.
 */
package io.dajac.kfn.invoker;

import io.dajac.kfn.invoker.Function;
import io.dajac.kfn.invoker.KeyValue;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionInvoker {
    private static final Logger LOG = LoggerFactory.getLogger(FunctionInvoker.class);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final String inputTopic;
    private final String outputTopic;
    private final KafkaConsumer consumer;
    private final KafkaProducer producer;
    private final String functionName;
    private final String functionClass;
    private final Function function;

    public FunctionInvoker(Properties properties) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Properties functionProps = this.getProperitesWithPrefix(properties, "function.", true);
        this.functionName = functionProps.getProperty("name");
        this.functionClass = functionProps.getProperty("class");
        this.inputTopic = functionProps.getProperty("input");
        this.outputTopic = functionProps.getProperty("output");
        LOG.info("Loading {} function", (Object)this.functionClass);
        this.function = (Function)Class.forName(this.functionClass).newInstance();
        this.function.configure(functionProps);
        Properties consumerProps = this.getProperitesWithPrefix(properties, "consumer.", true);
        this.consumer = new KafkaConsumer(consumerProps);
        Properties producerProps = this.getProperitesWithPrefix(properties, "producer.", true);
        this.producer = new KafkaProducer(producerProps);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        LOG.info("Starting Function {}", (Object)this.functionName);
        this.isRunning.set(true);
        this.consumer.subscribe(Arrays.asList(this.inputTopic));
        try {
            while (this.isRunning.get()) {
                LOG.debug("Polling new records");
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
                LOG.debug("Consumed {} records", (Object)records.count());
                for (Object obj : records) {
                    final ConsumerRecord record = (ConsumerRecord)obj;
                    KeyValue result = this.function.apply(record.key(), record.value());
                    if (result == null) continue;
                    this.producer.send(new ProducerRecord(this.outputTopic, result.key, result.value), new Callback(){

                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (exception == null) {
                                LOG.info("Produced {}, Consumer Offset {}", (Object)metadata.offset(), (Object)record.offset());
                            } else {
                                LOG.error("Error while producing {}, Consumer Offset {}", (Object)exception.getMessage(), (Object)record.offset());
                            }
                        }
                    });
                }
                LOG.debug("Processed {} records", (Object)records.count());
            }
        }
        catch (WakeupException records) {
        }
        catch (Throwable t) {
            LOG.error("Got following error: {}", (Object)t.getMessage(), (Object)t);
        }
        finally {
            LOG.info("Shutting down Function {}", (Object)this.functionName);
            this.consumer.close();
            this.producer.close();
            this.function.close();
            this.shutdownLatch.countDown();
        }
        LOG.info("Shut down of Function {} complete", (Object)this.functionName);
    }

    public void shutdown() {
        LOG.info("Request shutting down the invoker");
        this.isRunning.set(false);
        this.consumer.wakeup();
        try {
            this.shutdownLatch.await();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    public static Properties getPropsFromFile(String propsFile) throws IOException {
        Properties props = new Properties();
        if (propsFile == null) {
            return props;
        }
        try (FileInputStream propStream = new FileInputStream(propsFile);){
            props.load(propStream);
        }
        catch (IOException e) {
            throw new IOException("Couldn't load properties from " + propsFile, e);
        }
        return props;
    }

    public Properties getProperitesWithPrefix(Properties properties, String prefix, boolean strip) {
        Properties result = new Properties();
        for (String property : properties.stringPropertyNames()) {
            if (!property.startsWith(prefix) || property.length() <= prefix.length()) continue;
            if (strip) {
                result.put(property.substring(prefix.length()), properties.getProperty(property));
                continue;
            }
            result.put(property, properties.getProperty(property));
        }
        return result;
    }

    public static void main(String[] args) throws IllegalAccessException, InstantiationException, ClassNotFoundException, IOException {
        if (args.length != 1) {
            LOG.error("Properties file is required to start the instance");
            System.exit(1);
        }
        Properties properties = FunctionInvoker.getPropsFromFile(args[0]);
        FunctionInvoker invoker = new FunctionInvoker(properties);
        Runtime.getRuntime().addShutdownHook(new Thread(invoker::shutdown));
        invoker.run();
    }
}

