/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client;

import com.linkedin.databus.client.pub.DbusEventDecoder;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusEvent;
import com.linkedin.databus.core.DbusEventPart;
import com.linkedin.databus2.schemas.SchemaId;
import com.linkedin.databus2.schemas.VersionedSchema;
import com.linkedin.databus2.schemas.VersionedSchemaSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;

public class DbusEventAvroDecoder
implements DbusEventDecoder {
    public static final String MODULE = DbusEventAvroDecoder.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    public static final String SRC_ID_FIELD_NAME = "srcId";
    public static final String VALUE_FIELD_NAME = "value";
    public static final String OPCODE_FIELD_NAME = "opCode";
    private static final ThreadLocal<BinaryDecoder> binDecoder = new ThreadLocal();
    private final VersionedSchemaSet _schemaSet;
    private final VersionedSchemaSet _metadataSchemaSet;

    public DbusEventAvroDecoder(VersionedSchemaSet schemaSet) {
        this(schemaSet, null);
    }

    public DbusEventAvroDecoder(VersionedSchemaSet schemaSet, VersionedSchemaSet metadataSchemaSet) {
        this._schemaSet = schemaSet;
        this._metadataSchemaSet = metadataSchemaSet;
    }

    public GenericRecord getGenericRecord(DbusEvent e, GenericRecord reuse) {
        byte[] md5 = new byte[16];
        e.schemaId(md5);
        SchemaId schemaId = new SchemaId(md5);
        VersionedSchema writerSchema = this._schemaSet.getById(schemaId);
        if (null == writerSchema) {
            LOG.error((Object)("Unable to find schema for id " + schemaId + "; event = " + e));
            throw new DatabusRuntimeException("No schema available to decode event " + e);
        }
        ByteBuffer valueBuffer = e.value();
        byte[] valueBytes = null;
        if (valueBuffer.hasArray()) {
            valueBytes = valueBuffer.array();
        } else {
            valueBytes = new byte[valueBuffer.remaining()];
            valueBuffer.get(valueBytes);
        }
        return this.getGenericRecord(valueBytes, writerSchema.getSchema(), reuse);
    }

    public GenericRecord getGenericRecord(DbusEvent e) {
        return this.getGenericRecord(e, null);
    }

    public GenericRecord getGenericRecord(byte[] valueBytes, Schema schema, GenericRecord reuse) {
        GenericRecord result = null;
        try {
            binDecoder.set(DecoderFactory.defaultFactory().createBinaryDecoder(valueBytes, binDecoder.get()));
            GenericDatumReader reader = new GenericDatumReader(schema);
            result = (GenericRecord)reader.read((Object)reuse, (Decoder)binDecoder.get());
            return result;
        }
        catch (Exception ex) {
            LOG.error((Object)("getGenericRecord Avro error: " + ex.getMessage()), (Throwable)ex);
            return result;
        }
    }

    public GenericRecord getMetadata(DbusEvent e, GenericRecord reuse) {
        DbusEventPart metadataPart = e.getPayloadMetadataPart();
        ByteBuffer dataBuffer = null;
        if (null == metadataPart || null == (dataBuffer = metadataPart.getData()) || dataBuffer.remaining() <= 0) {
            LOG.debug((Object)("No metadata for event " + e));
            return null;
        }
        VersionedSchema schema = this.getMetadataSchema(metadataPart);
        if (null == schema) {
            throw new DatabusRuntimeException("No schema available to decode metadata for event " + e);
        }
        byte[] dataBytes = null;
        if (dataBuffer.hasArray()) {
            dataBytes = dataBuffer.array();
        } else {
            dataBytes = new byte[dataBuffer.remaining()];
            try {
                dataBuffer.get(dataBytes);
            }
            catch (BufferUnderflowException ex) {
                LOG.error((Object)("metadata buffer error (remaining = " + dataBuffer.remaining() + ") for event " + e), (Throwable)ex);
                return null;
            }
        }
        return this.getGenericRecord(dataBytes, schema.getSchema(), reuse);
    }

    public <T extends SpecificRecord> T getTypedValue(DbusEvent e, T reuse, Class<T> targetClass) {
        if (null == reuse) {
            try {
                reuse = (SpecificRecord)targetClass.newInstance();
            }
            catch (InstantiationException e1) {
                LOG.error((Object)("getTypedValue class instantiation error (" + e1.getMessage() + ") for event " + e), (Throwable)e1);
                return null;
            }
            catch (IllegalAccessException e1) {
                LOG.error((Object)("getTypedValue access error (" + e1.getMessage() + ") for event " + e), (Throwable)e1);
                return null;
            }
        }
        byte[] md5 = new byte[16];
        e.schemaId(md5);
        SchemaId schemaId = new SchemaId(md5);
        VersionedSchema writerSchema = this._schemaSet.getById(schemaId);
        if (null == writerSchema) {
            LOG.error((Object)("Unable to find schema for id " + schemaId + "; event = " + e));
            throw new DatabusRuntimeException("No schema available to decode event " + e);
        }
        ByteBuffer valueBuffer = e.value();
        byte[] valueBytes = new byte[valueBuffer.remaining()];
        valueBuffer.get(valueBytes);
        try {
            binDecoder.set(DecoderFactory.defaultFactory().createBinaryDecoder(valueBytes, binDecoder.get()));
            SpecificDatumReader reader = new SpecificDatumReader(writerSchema.getSchema(), reuse.getSchema());
            return (T)((SpecificRecord)targetClass.cast(reader.read(reuse, (Decoder)binDecoder.get())));
        }
        catch (IOException e1) {
            LOG.error((Object)("getTypedValue IO error (" + e1.getMessage() + ") for event " + e), (Throwable)e1);
            return reuse;
        }
    }

    public void dumpEventValueInJSON(DbusEvent e, OutputStream out) {
        byte[] md5 = new byte[16];
        e.schemaId(md5);
        SchemaId schemaId = new SchemaId(md5);
        VersionedSchema sourceSchema = this._schemaSet.getById(schemaId);
        ByteBuffer valueBuffer = e.value();
        byte[] valueBytes = new byte[valueBuffer.remaining()];
        valueBuffer.get(valueBytes);
        try {
            Schema schema = sourceSchema.getSchema();
            GenericDatumReader reader = new GenericDatumReader(schema);
            binDecoder.set(DecoderFactory.defaultFactory().createBinaryDecoder(valueBytes, binDecoder.get()));
            Object datum = reader.read(null, (Decoder)binDecoder.get());
            GenericDatumWriter writer = new GenericDatumWriter(schema);
            JsonGenerator g = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
            g.writeStartObject();
            g.writeFieldName(SRC_ID_FIELD_NAME);
            g.writeNumber(e.getSourceId());
            g.writeFieldName(OPCODE_FIELD_NAME);
            g.writeString(e.getOpcode().toString());
            g.writeFieldName("partId");
            g.writeNumber(Integer.valueOf(e.getPartitionId()).intValue());
            g.writeFieldName(VALUE_FIELD_NAME);
            writer.write(datum, (Encoder)new JsonEncoder(schema, g));
            g.writeEndObject();
            g.writeEndObject();
            try {
                g.writeEndObject();
            }
            catch (JsonGenerationException e_json) {
                // empty catch block
            }
            g.flush();
        }
        catch (IOException e1) {
            LOG.error((Object)("event value serialization error; event = " + e), (Throwable)e1);
        }
    }

    public void dumpEventValueInJSON(DbusEvent e, WritableByteChannel writeChannel) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            this.dumpEventValueInJSON(e, baos);
            baos.write("\n".getBytes("UTF-8"));
            ByteBuffer writeBuffer = ByteBuffer.wrap(baos.toByteArray());
            writeChannel.write(writeBuffer);
        }
        catch (IOException e1) {
            LOG.error((Object)("event value serialization error; event = " + e), (Throwable)e1);
        }
    }

    public VersionedSchema getPayloadSchema(DbusEvent e) {
        byte[] md5 = new byte[16];
        e.schemaId(md5);
        SchemaId schemaId = new SchemaId(md5);
        VersionedSchema writerSchema = this._schemaSet.getById(schemaId);
        return writerSchema;
    }

    protected VersionedSchemaSet getSchemaSet() {
        return this._schemaSet;
    }

    public VersionedSchema getMetadataSchema(DbusEvent e) {
        DbusEventPart metadataPart = e.getPayloadMetadataPart();
        if (null == metadataPart) {
            LOG.debug((Object)("No metadata for event " + e));
            return null;
        }
        VersionedSchema schema = this.getMetadataSchema(metadataPart);
        if (null == schema) {
            throw new DatabusRuntimeException("No schema available to decode metadata for event " + e);
        }
        return schema;
    }

    public VersionedSchema getMetadataSchema(DbusEventPart metadataPart) {
        if (null == this._metadataSchemaSet) {
            return null;
        }
        SchemaId id = new SchemaId(metadataPart.getSchemaDigest());
        return this._metadataSchemaSet.getById(id);
    }

    public VersionedSchema getMetadataSchema(short version) {
        if (this._metadataSchemaSet != null) {
            return this._metadataSchemaSet.getSchemaByNameVersion("metadata-source", version);
        }
        return null;
    }

    VersionedSchema getLatestMetadataSchema() {
        if (this._metadataSchemaSet != null) {
            return this._metadataSchemaSet.getLatestVersionByName("metadata-source");
        }
        return null;
    }

    public void dumpMetadata(DbusEvent e, FileChannel writeChannel) {
        GenericRecord genericRecord = this.getMetadata(e, null);
        if (genericRecord == null) {
            return;
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            String metadataInfo = genericRecord.toString() + "\n";
            baos.write(metadataInfo.getBytes("UTF-8"));
            ByteBuffer writeBuffer = ByteBuffer.wrap(baos.toByteArray());
            writeChannel.write(writeBuffer);
        }
        catch (UnsupportedEncodingException e1) {
            LOG.error((Object)("event metadata serialization error; event = " + e + "; metadata = " + genericRecord), (Throwable)e1);
        }
        catch (IOException e1) {
            LOG.error((Object)("event metadata serialization error; event = " + e + "; metadata = " + genericRecord), (Throwable)e1);
        }
    }
}

