/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus2.producers.db;

import com.linkedin.databus.core.DbusEventBufferAppendable;
import com.linkedin.databus.core.DbusEventKey;
import com.linkedin.databus.core.UnsupportedKeyException;
import com.linkedin.databus.core.monitoring.mbean.DbusEventsStatisticsCollector;
import com.linkedin.databus2.producers.EventCreationException;
import com.linkedin.databus2.producers.PartitionFunction;
import com.linkedin.databus2.producers.db.EventFactory;
import com.linkedin.databus2.relay.OracleJarUtils;
import com.linkedin.databus2.relay.config.ReplicationBitSetterStaticConfig;
import com.linkedin.databus2.schemas.utils.SchemaHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Struct;
import java.sql.Timestamp;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.log4j.Logger;

/**
 * {@link EventFactory} that turns a JDBC {@link ResultSet} row into an Avro
 * {@link GenericRecord} (driven by the event schema passed to the constructor),
 * serializes it with Avro binary encoding and appends it to a
 * {@link DbusEventBufferAppendable}.
 *
 * <p>Schema metadata read by this class:
 * <ul>
 *   <li>{@code pk} on the record schema - optional name of the key field
 *       (defaults to {@code "key"}, falling back to {@code "id"});</li>
 *   <li>{@code dbFieldName} on a field (or on an array's element schema) - the
 *       column to read from the result set;</li>
 *   <li>{@code dbFieldPosition} on a field of a nested record - the index into
 *       the attributes of the corresponding {@link Struct}.</li>
 * </ul>
 */
public class OracleAvroGenericEventFactory
implements EventFactory {
    protected final Schema _eventSchema;
    protected final byte[] _schemaId;
    protected final short _sourceId;
    protected final short _pSourceId;
    protected final PartitionFunction _partitionFunction;
    private final Logger _log = Logger.getLogger(this.getClass());
    /** Name of the schema field whose value is used as the event key. */
    protected String keyColumnName = "key";
    private final ReplicationBitSetterStaticConfig _replSetterConfig;
    /** Compiled only when the replication bit is configured to come from a column; otherwise null. */
    private final Pattern _replBitSetterPattern;
    public static final String MODULE = OracleAvroGenericEventFactory.class.getName();
    public static final Logger LOG = Logger.getLogger(MODULE);

    /**
     * Creates a factory for one source.
     *
     * @param sourceId          source id passed to the event buffer on append
     * @param pSourceId         physical source id passed to the event buffer on append
     * @param eventSchema       Avro schema (JSON text) of the events to produce
     * @param partitionFunction maps an event key to a partition id
     * @param replSetterConfig  optional replication-bit configuration; may be null
     * @throws EventCreationException if the schema has neither the key field
     *         (default {@code "key"} or the {@code pk} override) nor an {@code "id"} field
     */
    public OracleAvroGenericEventFactory(short sourceId, short pSourceId, String eventSchema, PartitionFunction partitionFunction, ReplicationBitSetterStaticConfig replSetterConfig) throws EventCreationException, UnsupportedKeyException {
        this._sourceId = sourceId;
        this._pSourceId = pSourceId;
        this._eventSchema = Schema.parse(eventSchema);
        this._schemaId = SchemaHelper.getSchemaId(eventSchema);
        this._partitionFunction = partitionFunction;
        this._replSetterConfig = replSetterConfig;
        if (null != replSetterConfig && ReplicationBitSetterStaticConfig.SourceType.COLUMN.equals(replSetterConfig.getSourceType())) {
            this._replBitSetterPattern = Pattern.compile(replSetterConfig.getRemoteUpdateValueRegex());
        } else {
            this._replBitSetterPattern = null;
        }

        String keyNameOverride = SchemaHelper.getMetaField(this._eventSchema, "pk");
        if (null != keyNameOverride) {
            this.keyColumnName = keyNameOverride;
            this._log.info(this._eventSchema.getFullName() + ": using primary key override:" + this.keyColumnName);
        }

        if (this._eventSchema.getField(this.keyColumnName) == null) {
            // Fall back to a field named "id" when the configured key field is absent.
            if (this._eventSchema.getField("id") == null) {
                // Report the name that was actually looked up (it may be a "pk" override).
                throw new EventCreationException("The event schema is missing the required field \"" + this.keyColumnName + "\".");
            }
            this.keyColumnName = "id";
        }
    }

    /**
     * Serializes {@code record} with Avro binary encoding using the record's own schema.
     *
     * @param row only used to enrich the error message; may be null
     * @return the serialized bytes
     * @throws EventCreationException if serialization fails with an
     *         {@link IOException} or any {@link RuntimeException}
     */
    protected byte[] serializeEvent(GenericRecord record, long scn, long timestamp, ResultSet row, DbusEventBufferAppendable eventBuffer, boolean enableTracing, DbusEventsStatisticsCollector dbusEventsStatisticsCollector) throws EventCreationException, UnsupportedKeyException {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            BinaryEncoder encoder = new BinaryEncoder(bos);
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
            writer.write(record, encoder);
            return bos.toByteArray();
        }
        catch (IOException ex) {
            throw new EventCreationException("Failed to serialize the Avro GenericRecord. ResultSet was: (" + row + ")", ex);
        }
        catch (RuntimeException ex) {
            throw new EventCreationException("Failed to serialize the Avro GenericRecord. ResultSet was: (" + row + ")", ex);
        }
    }

    /**
     * Builds a record from {@code row}, determines the replication bit from the
     * row, and appends the resulting event to {@code eventBuffer}.
     *
     * @return the size in bytes of the serialized event payload
     */
    public long createAndAppendEvent(long scn, long timestamp, ResultSet row, DbusEventBufferAppendable eventBuffer, boolean enableTracing, DbusEventsStatisticsCollector dbusEventsStatisticsCollector) throws SQLException, EventCreationException, UnsupportedKeyException {
        GenericRecord record = this.buildGenericRecord(row);
        boolean isReplicated = this.isReplicatedEvent(row);
        return this.createAndAppendEvent(scn, timestamp, record, row, eventBuffer, enableTracing, isReplicated, dbusEventsStatisticsCollector);
    }

    /**
     * Returns true when the replication bit is configured to come from a column
     * and that column's (non-null) string value fully matches the configured regex.
     *
     * @throws SQLException if the column cannot be read; the failure is logged and rethrown
     */
    private boolean isReplicatedEvent(ResultSet row) throws SQLException {
        if (null == row || null == this._replSetterConfig || null == this._replBitSetterPattern) {
            return false;
        }
        if (!ReplicationBitSetterStaticConfig.SourceType.COLUMN.equals(this._replSetterConfig.getSourceType())) {
            return false;
        }
        try {
            String value = row.getString(this._replSetterConfig.getFieldName());
            return null != value && this._replBitSetterPattern.matcher(value).matches();
        }
        catch (SQLException sqlEx) {
            LOG.error("Unable to identify if this row was externally replicated. Config Used :" + this._replSetterConfig, sqlEx);
            throw sqlEx;
        }
    }

    /**
     * Appends an already-built record as a non-replicated event (no result set available).
     *
     * @return the size in bytes of the serialized event payload
     */
    public long createAndAppendEvent(long scn, long timestamp, GenericRecord record, DbusEventBufferAppendable eventBuffer, boolean enableTracing, DbusEventsStatisticsCollector dbusEventsStatisticsCollector) throws EventCreationException, UnsupportedKeyException {
        return this.createAndAppendEvent(scn, timestamp, record, null, eventBuffer, enableTracing, false, dbusEventsStatisticsCollector);
    }

    /**
     * Serializes {@code record}, derives the event key from the key field and
     * the partition from the partition function, and appends the event.
     *
     * @param timestamp multiplied by 1,000,000 before being handed to the buffer
     *                  (presumably milliseconds to nanoseconds - confirm against callers)
     * @param row       only forwarded to {@link #serializeEvent}; may be null
     * @return the size in bytes of the serialized event payload
     */
    public long createAndAppendEvent(long scn, long timestamp, GenericRecord record, ResultSet row, DbusEventBufferAppendable eventBuffer, boolean enableTracing, boolean isReplicated, DbusEventsStatisticsCollector dbusEventsStatisticsCollector) throws EventCreationException, UnsupportedKeyException {
        byte[] serializedValue = this.serializeEvent(record, scn, timestamp, row, eventBuffer, enableTracing, dbusEventsStatisticsCollector);
        DbusEventKey eventKey = new DbusEventKey(record.get(this.keyColumnName));
        short lPartitionId = this._partitionFunction.getPartition(eventKey);
        eventBuffer.appendEvent(eventKey, this._pSourceId, lPartitionId, timestamp * 1000000L, this._sourceId, this._schemaId, serializedValue, enableTracing, isReplicated, dbusEventsStatisticsCollector);
        return serializedValue.length;
    }

    /**
     * Populates a new record of the event schema from the current row of {@code rs}.
     *
     * <p>Array fields are read with {@link ResultSet#getArray(String)} using the
     * {@code dbFieldName} of the element schema; all other fields are read with
     * {@link ResultSet#getObject(String)} using the field's own {@code dbFieldName}.
     *
     * @throws SQLException if a non-array column cannot be read (logged, then rethrown)
     * @throws EventCreationException if a value cannot be converted, or a null is
     *         read for a field whose schema is not nullable
     */
    protected GenericRecord buildGenericRecord(ResultSet rs) throws SQLException, EventCreationException {
        if (this._log.isTraceEnabled()) {
            this._log.trace("--- New Record ---");
        }
        GenericData.Record record = new GenericData.Record(this._eventSchema);
        List<Schema.Field> fields = this._eventSchema.getFields();
        for (Schema.Field field : fields) {
            Schema fieldSchema = SchemaHelper.unwindUnionSchema(field);
            if (fieldSchema.getType() == Schema.Type.ARRAY) {
                // Go through put() so that a SQL NULL array gets the same nullability
                // check as every other column instead of a NullPointerException.
                this.put(record, field, OracleAvroGenericEventFactory.getJdbcArray(rs, fieldSchema));
                continue;
            }
            String databaseFieldName = SchemaHelper.getMetaField(field, "dbFieldName");
            try {
                Object databaseFieldValue = rs.getObject(databaseFieldName);
                this.put(record, field, databaseFieldValue);
            }
            catch (SQLException ex) {
                this._log.error("Failed to read column (" + databaseFieldName + ") for source (" + this._sourceId + ")");
                throw ex;
            }
        }
        return record;
    }

    /**
     * Reads the JDBC array for an Avro array schema, using the {@code dbFieldName}
     * metadata of the array's element schema as the column name.
     *
     * @return the array as returned by the driver (may be null for SQL NULL)
     * @throws EventCreationException if the metadata is missing or the column cannot be read
     */
    private static Array getJdbcArray(ResultSet rs, Schema schema) throws EventCreationException {
        Schema elementSchema = schema.getElementType();
        String dbFieldName = SchemaHelper.getMetaField(elementSchema, "dbFieldName");
        if (dbFieldName == null) {
            throw new EventCreationException("array field is missing required metadata dbFieldName. " + schema.getName());
        }
        try {
            return rs.getArray(dbFieldName);
        }
        catch (SQLException e) {
            throw new EventCreationException("unable to read array field: " + dbFieldName + ": " + e.getMessage(), e);
        }
    }

    /**
     * Converts a JDBC array of structs into an Avro array of records and stores
     * it in {@code record} under {@code arrayFieldName}.
     *
     * @param schema the (unwound) Avro schema of the field; must be an ARRAY
     * @param array  the JDBC array; must not be null
     * @throws EventCreationException if {@code schema} is not an array, the array
     *         or one of its elements is null, or reading the array fails
     */
    private void putArray(GenericRecord record, String arrayFieldName, Schema schema, Array array) throws EventCreationException {
        if (schema.getType() != Schema.Type.ARRAY) {
            throw new EventCreationException("Not an array type. " + schema.getName());
        }
        if (array == null) {
            throw new EventCreationException("putArray error: null array for field " + arrayFieldName);
        }
        Schema elementSchema = schema.getElementType();
        GenericData.Array<GenericRecord> avroArray = new GenericData.Array<GenericRecord>(0, schema);
        try {
            ResultSet arrayResultSet = array.getResultSet();
            try {
                while (arrayResultSet.next()) {
                    // Column 1 of an array result set is the element index, column 2 the value.
                    Struct struct = (Struct)arrayResultSet.getObject(2);
                    if (struct == null) {
                        throw new EventCreationException("putArray error: null element in array field " + arrayFieldName);
                    }
                    GenericData.Record elemRecord = new GenericData.Record(elementSchema);
                    avroArray.add(elemRecord);
                    this.putOracleRecord(elemRecord, elementSchema, struct);
                }
            }
            finally {
                arrayResultSet.close();
            }
        }
        catch (SQLException e) {
            throw new EventCreationException("putArray error: " + e.getMessage(), e);
        }
        record.put(arrayFieldName, avroArray);
    }

    /**
     * Builds a nested record of {@code fieldSchema} from {@code dbFieldValue} and
     * stores it in {@code parentRecord} under {@code fieldName}.
     */
    private void addOracleRecordToParent(GenericRecord parentRecord, String fieldName, Schema fieldSchema, Struct dbFieldValue) throws EventCreationException {
        GenericData.Record fieldRecord = new GenericData.Record(fieldSchema);
        this.putOracleRecord(fieldRecord, fieldSchema, dbFieldValue);
        parentRecord.put(fieldName, fieldRecord);
    }

    /**
     * Copies one struct attribute into the matching field of {@code fieldRecord}.
     * The attribute index comes from the field's {@code dbFieldPosition} metadata
     * and defaults to 0 when that metadata is absent or empty.
     *
     * @throws EventCreationException if the position is not a valid index into
     *         {@code structAttribs}, a null is found for a non-nullable field, or
     *         the field type is not supported
     */
    private void processRecordField(GenericRecord fieldRecord, Schema.Field field, Object[] structAttribs) throws EventCreationException {
        String recordFieldName = field.name();
        String dbFieldPositionStr = SchemaHelper.getMetaField(field, "dbFieldPosition");
        int dbFieldPosition = 0;
        if (null != dbFieldPositionStr && !dbFieldPositionStr.isEmpty()) {
            try {
                dbFieldPosition = Integer.parseInt(dbFieldPositionStr);
            }
            catch (NumberFormatException e) {
                throw new EventCreationException("Invalid dbFieldPosition \"" + dbFieldPositionStr + "\" for field " + recordFieldName, e);
            }
        }
        if (dbFieldPosition < 0 || dbFieldPosition >= structAttribs.length) {
            throw new EventCreationException("dbFieldPosition " + dbFieldPosition + " for field " + recordFieldName + " is out of range; struct has " + structAttribs.length + " attributes");
        }

        Object structAttribValue = structAttribs[dbFieldPosition];
        if (structAttribValue == null) {
            // A null attribute leaves the field unset, which is only legal for nullable schemas.
            if (!SchemaHelper.isNullable(field.schema())) {
                throw new EventCreationException("Null value not allowed for field " + recordFieldName + ":" + field.schema());
            }
            return;
        }

        Schema recordSchema = SchemaHelper.unwindUnionSchema(field);
        Schema.Type recordFieldType = recordSchema.getType();
        switch (recordFieldType) {
            case BOOLEAN:
            case BYTES:
            case DOUBLE:
            case FLOAT:
            case INT:
            case LONG:
            case STRING:
            case NULL: {
                this.putSimpleValue(fieldRecord, recordFieldName, recordFieldType, structAttribValue);
                break;
            }
            case RECORD: {
                this.addOracleRecordToParent(fieldRecord, recordFieldName, recordSchema, (Struct)structAttribValue);
                break;
            }
            case ARRAY: {
                this.putArray(fieldRecord, recordFieldName, recordSchema, (Array)structAttribValue);
                break;
            }
            default: {
                throw new EventCreationException("unknown struct field type: " + recordFieldName + ":" + recordFieldType);
            }
        }
    }

    /**
     * Populates {@code fieldRecord} from the attributes of {@code dbFieldValue}.
     * The number of schema fields must equal the number of struct attributes.
     *
     * @throws EventCreationException on a field-count mismatch, on a failure to
     *         read the struct, or when a field cannot be converted
     */
    private void putOracleRecord(GenericRecord fieldRecord, Schema fieldSchema, Struct dbFieldValue) throws EventCreationException {
        assert (fieldSchema.getType() == Schema.Type.RECORD);
        assert (null != dbFieldValue);
        try {
            List<Schema.Field> fields = fieldSchema.getFields();
            Object[] structAttribs = dbFieldValue.getAttributes();
            if (fields.size() != structAttribs.length) {
                throw new EventCreationException("Avro field number mismatch: avro schema field# =" + fields.size() + " ; struct " + dbFieldValue.getSQLTypeName() + " field# = " + structAttribs.length);
            }
            for (Schema.Field field : fields) {
                this.processRecordField(fieldRecord, field, structAttribs);
            }
        }
        catch (SQLException e) {
            throw new EventCreationException("creation of field " + fieldSchema.getFullName(), e);
        }
    }

    /**
     * Converts a non-null database value to the Java type expected for the given
     * primitive Avro type and stores it in {@code record}.
     *
     * <p>Casts in this method may throw {@link ClassCastException} when the value
     * has an unexpected type; {@link #put} translates that for its callers.
     *
     * @throws EventCreationException if the value cannot be converted or the
     *         Avro type is not a supported simple type
     */
    private void putSimpleValue(GenericRecord record, String schemaFieldName, Schema.Type avroFieldType, Object databaseFieldValue) throws EventCreationException {
        assert (null != databaseFieldValue);
        switch (avroFieldType) {
            case BOOLEAN: {
                record.put(schemaFieldName, (Boolean)databaseFieldValue);
                break;
            }
            case BYTES: {
                if (databaseFieldValue instanceof byte[]) {
                    record.put(schemaFieldName, ByteBuffer.wrap((byte[])databaseFieldValue));
                } else {
                    record.put(schemaFieldName, OracleAvroGenericEventFactory.extractBlobBytes((Blob)databaseFieldValue, schemaFieldName));
                }
                break;
            }
            case DOUBLE: {
                record.put(schemaFieldName, Double.valueOf(((Number)databaseFieldValue).doubleValue()));
                break;
            }
            case FLOAT: {
                record.put(schemaFieldName, Float.valueOf(((Number)databaseFieldValue).floatValue()));
                break;
            }
            case INT: {
                record.put(schemaFieldName, Integer.valueOf(((Number)databaseFieldValue).intValue()));
                break;
            }
            case LONG: {
                record.put(schemaFieldName, Long.valueOf(OracleAvroGenericEventFactory.convertToLong(databaseFieldValue, schemaFieldName)));
                break;
            }
            case STRING: {
                record.put(schemaFieldName, OracleAvroGenericEventFactory.convertToString(databaseFieldValue, schemaFieldName));
                break;
            }
            case NULL: {
                record.put(schemaFieldName, null);
                break;
            }
            default: {
                throw new EventCreationException("unknown simple type " + avroFieldType.toString() + " for field " + schemaFieldName);
            }
        }
    }

    /**
     * Converts a database value to a long: epoch milliseconds for
     * {@link Timestamp}, {@link Date}, {@code oracle.sql.TIMESTAMP} and
     * {@code oracle.sql.DATE}; {@link Number#longValue()} for numbers.
     */
    private static long convertToLong(Object value, String fieldName) throws EventCreationException {
        // Standard JDBC/Java types first, so the Oracle driver classes are only
        // loaded when they are really needed.
        // NOTE(review): this assumes the oracle.sql temporal types are not
        // java.lang.Number subclasses - confirm against the driver in use.
        if (value instanceof Timestamp) {
            return ((Timestamp)value).getTime();
        }
        if (value instanceof Date) {
            return ((Date)value).getTime();
        }
        if (value instanceof Number) {
            return ((Number)value).longValue();
        }

        Class<?> timestampClass;
        Class<?> dateClass;
        try {
            timestampClass = OracleJarUtils.loadClass("oracle.sql.TIMESTAMP");
            dateClass = OracleJarUtils.loadClass("oracle.sql.DATE");
        }
        catch (Exception e) {
            String errMsg = "Cannot convert " + value.getClass() + " to long for field " + fieldName + " Unable to get oracle datatypes " + e.getMessage();
            throw new EventCreationException(errMsg, e);
        }
        if (timestampClass.isInstance(value)) {
            return OracleAvroGenericEventFactory.invokeTimestampValue(timestampClass, value, "oracle.sql.TIMESTAMP", fieldName);
        }
        if (dateClass.isInstance(value)) {
            return OracleAvroGenericEventFactory.invokeTimestampValue(dateClass, value, "oracle.sql.DATE", fieldName);
        }
        throw new EventCreationException("Cannot convert " + value.getClass() + " to long for field " + fieldName);
    }

    /**
     * Reflectively calls the no-arg {@code timestampValue()} method declared on
     * {@code oracleClass} and returns the resulting time in epoch milliseconds.
     * The method is resolved on the value's own class because a {@link Method}
     * can only be invoked on instances of its declaring class.
     */
    private static long invokeTimestampValue(Class<?> oracleClass, Object value, String oracleTypeName, String fieldName) throws EventCreationException {
        try {
            Method timestampValueMethod = oracleClass.getMethod("timestampValue");
            Timestamp tsValue = (Timestamp)timestampValueMethod.invoke(value);
            return tsValue.getTime();
        }
        catch (Exception ex) {
            throw new EventCreationException("SQLException reading " + oracleTypeName + " value for field " + fieldName, ex);
        }
    }

    /**
     * Converts a database value to a string: CLOB contents, SQLXML text, or
     * {@code toString()} for anything else.
     */
    private static String convertToString(Object value, String fieldName) throws EventCreationException {
        if (value instanceof Clob) {
            return OracleAvroGenericEventFactory.extractClobText((Clob)value, fieldName);
        }
        if (value instanceof SQLXML) {
            try {
                return ((SQLXML)value).getString();
            }
            catch (SQLException e) {
                throw new EventCreationException("Cannot convert " + value.getClass() + " to string field " + fieldName + " cause: " + e, e);
            }
        }
        return value.toString();
    }

    /**
     * Stores one database value in {@code record}, dispatching on the field's
     * (unwound) Avro type. A null value leaves the field unset and is only
     * accepted when the field's schema is nullable.
     *
     * @throws EventCreationException if null is not allowed, the Avro type is
     *         unsupported, or the value has an unexpected Java type
     */
    private void put(GenericRecord record, Schema.Field field, Object databaseFieldValue) throws EventCreationException {
        String schemaFieldName = field.name();
        Schema fieldSchema = SchemaHelper.unwindUnionSchema(field);
        Schema.Type avroFieldType = fieldSchema.getType();
        if (databaseFieldValue == null) {
            if (!SchemaHelper.isNullable(field)) {
                throw new EventCreationException("Null value not allowed for field " + schemaFieldName);
            }
            return;
        }

        if (this._log.isTraceEnabled()) {
            this._log.trace("record.put(\"" + schemaFieldName + "\", (" + avroFieldType + ") \"" + databaseFieldValue + "\"");
        }
        try {
            switch (avroFieldType) {
                case BOOLEAN:
                case BYTES:
                case DOUBLE:
                case FLOAT:
                case INT:
                case LONG:
                case STRING:
                case NULL: {
                    this.putSimpleValue(record, schemaFieldName, avroFieldType, databaseFieldValue);
                    break;
                }
                case RECORD: {
                    this.addOracleRecordToParent(record, schemaFieldName, fieldSchema, (Struct)databaseFieldValue);
                    break;
                }
                case ARRAY: {
                    this.putArray(record, schemaFieldName, fieldSchema, (Array)databaseFieldValue);
                    break;
                }
                default: {
                    throw new EventCreationException("Don't know how to populate this type of field: " + avroFieldType);
                }
            }
        }
        catch (ClassCastException ex) {
            throw new EventCreationException("Type conversion error for field name (" + field.name() + ") in source " + this._sourceId + ". Value was: " + databaseFieldValue + " avro field was: " + avroFieldType, ex);
        }
    }

    /**
     * Reads the full contents of a BLOB into a {@link ByteBuffer}.
     *
     * @param blob      the BLOB to read; may be null
     * @param fieldName only used in error messages
     * @return a buffer wrapping the BLOB bytes, or null if {@code blob} is null
     * @throws EventCreationException if the BLOB cannot be read or is longer
     *         than {@link Integer#MAX_VALUE} bytes (it would not fit in a byte array)
     */
    public static ByteBuffer extractBlobBytes(Blob blob, String fieldName) throws EventCreationException {
        if (blob == null) {
            return null;
        }
        try {
            long length = blob.length();
            if (length > Integer.MAX_VALUE) {
                // An int cast here would silently truncate or go negative.
                throw new EventCreationException("BLOB value for field " + fieldName + " is too large to read: " + length + " bytes");
            }
            byte[] bytes = blob.getBytes(1L, (int)length);
            return ByteBuffer.wrap(bytes);
        }
        catch (SQLException ex) {
            throw new EventCreationException("SQLException reading BLOB value for field " + fieldName, ex);
        }
    }

    /**
     * Reads the full contents of a CLOB as a string. CLOBs whose length fits in
     * an int are read with {@link Clob#getSubString(long, int)}; longer ones are
     * read through the character stream.
     *
     * @param clob      the CLOB to read; may be null
     * @param fieldName only used in error messages
     * @return the CLOB text, or null if {@code clob} is null
     * @throws EventCreationException if reading the CLOB fails
     */
    public static String extractClobText(Clob clob, String fieldName) throws EventCreationException {
        if (clob == null) {
            return null;
        }
        try {
            long length = clob.length();
            if (length <= Integer.MAX_VALUE) {
                return clob.getSubString(1L, (int)length);
            }
            Reader reader = null;
            try {
                reader = clob.getCharacterStream();
                StringWriter writer = new StringWriter();
                char[] buffer = new char[1024];
                int n;
                while ((n = reader.read(buffer)) != -1) {
                    writer.write(buffer, 0, n);
                }
                return writer.toString();
            }
            catch (IOException ex) {
                // Funnel into the outer SQLException handler so callers see one exception type.
                throw new SQLException("IOException reading from CLOB column.", ex);
            }
            finally {
                if (reader != null) {
                    try {
                        reader.close();
                    }
                    catch (IOException ignored) {
                        // Best-effort close; a failure here must not mask the read result.
                    }
                }
            }
        }
        catch (SQLException ex) {
            throw new EventCreationException("SQLException reading CLOB value for field " + fieldName, ex);
        }
    }
}

