/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.producer.avro;

import com.flipkart.aesop.runtime.producer.mapper.BinLogEventMapper;
import com.flipkart.aesop.runtime.producer.mapper.impl.DefaultBinLogEventMapper;
import com.flipkart.aesop.runtime.producer.mapper.impl.ORToAvroMapper;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Row;
import com.linkedin.databus.core.DatabusRuntimeException;
import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventInfo;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.DbusOpcode;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.core.DatabusException;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.ds.DbChangeEntry;
import com.linkedin.databus2.producers.ds.KeyPair;
import com.linkedin.databus2.producers.ds.PrimaryKeySchema;
import com.linkedin.databus2.schemas.NoSuchSchemaException;
import com.linkedin.databus2.schemas.SchemaId;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class MysqlAvroEventManager<T extends GenericRecord> {
    private static final Logger LOGGER = LogFactory.getLogger(MysqlAvroEventManager.class);
    protected final int lSourceId;
    protected final int pSourceId;

    public MysqlAvroEventManager(int lSourceId, int pSourceId) throws DatabusException {
        this.lSourceId = lSourceId;
        this.pSourceId = pSourceId;
    }

    public int createAndAppendEvent(DbChangeEntry changeEntry, DbusEventBufferAppendable eventBuffer, boolean enableTracing, DbusEventsStatisticsCollector dbusEventsStatisticsCollector) throws EventCreationException, UnsupportedKeyException, DatabusException {
        LOGGER.debug("Request received for create and append event for " + changeEntry);
        Object keyObj = this.obtainKey(changeEntry);
        DbusEventKey eventKey = new DbusEventKey(keyObj);
        SchemaId schemaId = SchemaId.createWithMd5((Schema)changeEntry.getSchema());
        byte[] payload = this.serializeEvent(changeEntry.getRecord());
        DbusEventInfo eventInfo = new DbusEventInfo(changeEntry.getOpCode(), changeEntry.getScn(), (short)this.pSourceId, (short)this.pSourceId, changeEntry.getTimestampInNanos(), (short)this.lSourceId, schemaId.getByteArray(), payload, enableTracing, false);
        boolean success = eventBuffer.appendEvent(eventKey, eventInfo, dbusEventsStatisticsCollector);
        LOGGER.debug("Successfully created and appended event for " + changeEntry);
        return success ? payload.length : -1;
    }

    public List<DbChangeEntry> frameAvroRecord(BinlogEventV4Header eventHeader, List<Row> rowList, DbusOpcode dbusOpCode, Map<Integer, BinLogEventMapper<T>> binLogEventMappers, Schema schema, long scn) {
        ArrayList<DbChangeEntry> entryList = new ArrayList<DbChangeEntry>();
        LOGGER.debug("Received frame avro record request for " + eventHeader);
        try {
            long timestampInNanos = eventHeader.getTimestamp() * 1000000L;
            boolean isReplicated = false;
            for (Row row : rowList) {
                List columns = row.getColumns();
                BinLogEventMapper binLogEventMapper = binLogEventMappers.get(this.lSourceId) == null ? new DefaultBinLogEventMapper(new ORToAvroMapper()) : binLogEventMappers.get(this.lSourceId);
                Object genericRecord = binLogEventMapper.mapBinLogEvent(eventHeader, row, dbusOpCode, schema);
                List<KeyPair> keyPairList = this.generateKeyPair(columns, schema);
                DbChangeEntry dbChangeEntry = new DbChangeEntry(scn, timestampInNanos, genericRecord, dbusOpCode, false, schema, keyPairList);
                entryList.add(dbChangeEntry);
                LOGGER.debug("Successfully Processed the Row " + dbChangeEntry);
            }
        }
        catch (NoSuchSchemaException ne) {
            LOGGER.error("No Such element exception : " + ne.getMessage() + " Cause: " + ne.getCause());
            throw new DatabusRuntimeException((Throwable)ne);
        }
        catch (DatabusException de) {
            LOGGER.error("Databus exception : " + de.getMessage() + " Cause: " + de.getCause());
            throw new DatabusRuntimeException((Throwable)de);
        }
        return entryList;
    }

    protected byte[] serializeEvent(GenericRecord record) throws EventCreationException {
        byte[] serializedValue;
        ByteArrayOutputStream bos = null;
        try {
            bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((OutputStream)bos, null);
            GenericDatumWriter writer = new GenericDatumWriter(record.getSchema());
            writer.write((Object)record, (Encoder)encoder);
            serializedValue = bos.toByteArray();
        }
        catch (IOException ex) {
            LOGGER.error("Failed to serialize avro record : " + record + " Exception : " + ex.getMessage() + "  Cause: " + ex.getCause());
            throw new EventCreationException("Failed to serialize the Avro GenericRecord", (Throwable)ex);
        }
        catch (RuntimeException ex) {
            LOGGER.error("Failed to serialize avro record : " + record + " Exception : " + ex.getMessage() + "  Cause: " + ex.getCause());
            throw new EventCreationException("Failed to serialize the Avro GenericRecord", (Throwable)ex);
        }
        finally {
            if (bos != null) {
                try {
                    bos.close();
                }
                catch (IOException e) {
                    LOGGER.error("Exception occurred while closing output stream");
                }
            }
        }
        return serializedValue;
    }

    private List<KeyPair> generateKeyPair(List<Column> columns, Schema schema) throws DatabusException {
        Object value = null;
        Schema.Type schemaType = null;
        String pkFieldName = SchemaHelper.getMetaField((Schema)schema, (String)"pk");
        LOGGER.debug("Generate Key Pair is called for columns " + columns);
        if (pkFieldName == null) {
            LOGGER.error("Primary key not defined for schema " + schema);
            throw new DatabusException("No primary key specified in the schema");
        }
        PrimaryKeySchema pkSchema = new PrimaryKeySchema(pkFieldName);
        List fields = schema.getFields();
        ArrayList<KeyPair> keyPairList = new ArrayList<KeyPair>();
        int index = 0;
        for (Schema.Field field : fields) {
            if (pkSchema.isPartOfPrimaryKey(field)) {
                value = columns.get(index).getValue();
                schemaType = field.schema().getType();
                KeyPair keyPair = new KeyPair(value, schemaType);
                keyPairList.add(keyPair);
            }
            ++index;
        }
        LOGGER.debug("Generated keypairs " + keyPairList + "for columns " + columns);
        return keyPairList;
    }

    /*
     * Enabled aggressive block sorting
     */
    private Object obtainKey(DbChangeEntry dbChangeEntry) throws DatabusException {
        if (null == dbChangeEntry) {
            LOGGER.error("Received null dbChangeEntry");
            throw new DatabusException("DBUpdateImage is null");
        }
        List keyPairList = dbChangeEntry.getPkeys();
        if (null == keyPairList || keyPairList.size() == 0) {
            LOGGER.error("Received null dbChangeEntry key pairs");
            throw new DatabusException("There do not seem to be any keys");
        }
        LOGGER.debug("Obtain Key is called for pairs " + keyPairList);
        if (keyPairList.size() == 1) {
            Object key = ((KeyPair)keyPairList.get(0)).getKey();
            Schema.Type pKeyType = ((KeyPair)keyPairList.get(0)).getKeyType();
            Object keyObj = null;
            if (pKeyType == Schema.Type.INT) {
                if (key instanceof Integer) {
                    return key;
                }
                String message = "Schema.Type does not match actual key type (INT) " + key.getClass().getName();
                LOGGER.error(message);
                throw new DatabusException(message);
            }
            if (pKeyType != Schema.Type.LONG) {
                return key;
            }
            if (key instanceof Long) {
                keyObj = key;
                return key;
            }
            String message = "Schema.Type does not match actual key type (LONG) " + key.getClass().getName();
            LOGGER.error(message);
            throw new DatabusException(message);
        }
        Iterator li = keyPairList.iterator();
        StringBuilder compositeKey = new StringBuilder();
        while (li.hasNext()) {
            KeyPair kp = (KeyPair)li.next();
            Schema.Type pKeyType = kp.getKeyType();
            Object key = kp.getKey();
            if (pKeyType == Schema.Type.INT) {
                if (!(key instanceof Integer)) {
                    String message = "Schema.Type does not match actual key type (INT) " + key.getClass().getName();
                    LOGGER.error(message);
                    throw new DatabusException(message);
                }
                compositeKey.append(kp.getKey().toString());
            } else if (pKeyType == Schema.Type.LONG) {
                if (!(key instanceof Long)) {
                    String message = "Schema.Type does not match actual key type (LONG) " + key.getClass().getName();
                    LOGGER.error(message);
                    throw new DatabusException(message);
                }
                compositeKey.append(key.toString());
            } else {
                compositeKey.append(key);
            }
            if (!li.hasNext()) continue;
            compositeKey.append("\t");
        }
        return compositeKey.toString();
    }
}

