/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.tools;

import com.pinterest.secor.thrift.TestEnum;
import com.pinterest.secor.thrift.TestMessage;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TSimpleJSONProtocol;

/**
 * Thread that publishes a fixed number of serialized {@link TestMessage}s to a Kafka topic.
 *
 * <p>Each message is serialized with Thrift using either the simple JSON protocol
 * ({@code "json"}) or the binary protocol ({@code "binary"}) and sent keyed by its
 * sequence index. The broker address is hard-coded to {@code localhost:9092}.
 */
public class TestLogMessageProducer
extends Thread {
    private final String mTopic;
    private final int mNumMessages;
    private final String mType;

    /**
     * @param topic       Kafka topic the messages are sent to
     * @param numMessages number of messages to produce; nothing is sent when not positive
     * @param type        message encoding, either {@code "json"} or {@code "binary"}
     */
    public TestLogMessageProducer(String topic, int numMessages, String type) {
        this.mTopic = topic;
        this.mNumMessages = numMessages;
        this.mType = type;
    }

    /**
     * Serializes and sends {@code numMessages} test messages, then closes the producer.
     *
     * @throws RuntimeException if the encoding type is not recognized or a message
     *                          cannot be serialized
     */
    @Override
    public void run() {
        // Validate the encoding before opening any broker connection so that a bad
        // type cannot leave an unclosed producer behind.
        TSerializer serializer = new TSerializer(createProtocolFactory());

        Properties properties = new Properties();
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("partitioner.class", "com.pinterest.secor.tools.RandomPartitioner");
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("key.serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);

        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);
        try {
            for (int i = 0; i < this.mNumMessages; ++i) {
                // First constructor argument: current wall-clock millis scaled by 1e6,
                // offset by the index so values differ within the same millisecond.
                TestMessage testMessage = new TestMessage(
                        System.currentTimeMillis() * 1000000L + (long) i, "some_value_" + i);
                // Alternate the enum field between the two values.
                if (i % 2 == 0) {
                    testMessage.setEnumField(TestEnum.SOME_VALUE);
                } else {
                    testMessage.setEnumField(TestEnum.SOME_OTHER_VALUE);
                }

                byte[] bytes;
                try {
                    bytes = serializer.serialize(testMessage);
                } catch (TException e) {
                    throw new RuntimeException("Failed to serialize message " + testMessage, e);
                }

                producer.send(new KeyedMessage<String, byte[]>(
                        this.mTopic, Integer.toString(i), bytes));
            }
        } finally {
            // Release the producer even when serialization or sending fails.
            producer.close();
        }
    }

    /**
     * Maps the configured encoding type to the matching Thrift protocol factory.
     *
     * @throws RuntimeException if the type is neither {@code "json"} nor {@code "binary"}
     */
    private TProtocolFactory createProtocolFactory() {
        // Constant-first equals: a null type reports the descriptive error below
        // instead of a bare NullPointerException.
        if ("json".equals(this.mType)) {
            return new TSimpleJSONProtocol.Factory();
        }
        if ("binary".equals(this.mType)) {
            return new TBinaryProtocol.Factory();
        }
        throw new RuntimeException("Undefined message encoding type: " + this.mType);
    }
}

