package com.linkedin.venice.serialization.avro;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.SparseConcurrentListWithOffset;
import com.linkedin.venice.utils.Utils;
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.class */
public class InternalAvroSpecificSerializer<SPECIFIC_RECORD extends SpecificRecord> implements VeniceKafkaSerializer<SPECIFIC_RECORD> {

    @Deprecated
    public static final String VENICE_SCHEMA_READER_CONFIG = "venice.schema-reader";
    public static final int MAX_ATTEMPTS_FOR_SCHEMA_READER = 60;
    public static final int WAIT_TIME_BETWEEN_SCHEMA_READER_ATTEMPTS_IN_MS = 1000;
    public static final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNDETECTABLE_COMPILED_SCHEMA = -1;
    public static final int SENTINEL_PROTOCOL_VERSION_USED_FOR_UNVERSIONED_PROTOCOL = 0;
    private static final int MAGIC_BYTE_OFFSET = 0;
    private final int MAGIC_BYTE_LENGTH;
    private final byte magicByte;
    private final int PROTOCOL_VERSION_OFFSET;
    private final int PROTOCOL_VERSION_LENGTH;
    private final byte currentProtocolVersion;
    private final int PAYLOAD_OFFSET;
    private final SpecificDatumWriter writer;
    private final List<VeniceSpecificDatumReader<SPECIFIC_RECORD>> protocolVersionToReader;
    private final Schema compiledProtocol;
    private SchemaReader schemaReader;
    private final BiConsumer<Integer, Schema> newSchemaEncountered;
    private static final ThreadLocal<ReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(() -> {
        return new ReusableObjects();
    });
    private static final Logger LOGGER = LogManager.getLogger(InternalAvroSpecificSerializer.class);
    private static final DecoderFactory DECODER_FACTORY = new DecoderFactory();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer$ReusableObjects.class */
    public static class ReusableObjects {
        final BinaryDecoder binaryDecoder;
        final BinaryEncoder binaryEncoder;
        final ByteArrayOutputStream byteArrayOutputStream;

        private ReusableObjects() {
            this.binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(new byte[16]);
            this.binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, (BinaryEncoder) null);
            this.byteArrayOutputStream = new ByteArrayOutputStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InternalAvroSpecificSerializer(AvroProtocolDefinition avroProtocolDefinition) {
        this(avroProtocolDefinition, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InternalAvroSpecificSerializer(AvroProtocolDefinition avroProtocolDefinition, Integer num) {
        this(avroProtocolDefinition, num, (num2, schema) -> {
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InternalAvroSpecificSerializer(AvroProtocolDefinition avroProtocolDefinition, Integer num, BiConsumer<Integer, Schema> biConsumer) {
        this.schemaReader = null;
        if (avroProtocolDefinition.getMagicByte().isPresent()) {
            this.magicByte = avroProtocolDefinition.getMagicByte().get().byteValue();
            this.MAGIC_BYTE_LENGTH = 1;
        } else {
            this.magicByte = (byte) 0;
            this.MAGIC_BYTE_LENGTH = 0;
        }
        this.PROTOCOL_VERSION_OFFSET = 0 + this.MAGIC_BYTE_LENGTH;
        if (avroProtocolDefinition.protocolVersionStoredInHeader) {
            this.PROTOCOL_VERSION_LENGTH = 1;
        } else {
            this.PROTOCOL_VERSION_LENGTH = 0;
        }
        if (avroProtocolDefinition.currentProtocolVersion.isPresent()) {
            int intValue = avroProtocolDefinition.currentProtocolVersion.get().intValue();
            if (intValue == -1 || intValue == 0 || intValue > 127) {
                throw new IllegalArgumentException("Improperly defined protocol! Invalid currentProtocolVersion: " + intValue);
            }
            this.currentProtocolVersion = (byte) intValue;
        } else {
            this.currentProtocolVersion = (byte) 0;
        }
        if (num == null) {
            this.PAYLOAD_OFFSET = this.PROTOCOL_VERSION_OFFSET + this.PROTOCOL_VERSION_LENGTH;
        } else {
            if (avroProtocolDefinition.magicByte.isPresent() || avroProtocolDefinition.protocolVersionStoredInHeader) {
                throw new VeniceMessageException("The payload offset override is not intended to be used for protocols which have explicitly defined magic bytes or which have protocol versions stored in their header.");
            }
            this.PAYLOAD_OFFSET = num.intValue();
        }
        this.compiledProtocol = avroProtocolDefinition.getCurrentProtocolVersionSchema();
        Map<Integer, Schema> allSchemasFromResources = Utils.getAllSchemasFromResources(avroProtocolDefinition);
        this.protocolVersionToReader = new SparseConcurrentListWithOffset(Math.abs(allSchemasFromResources.keySet().stream().min((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElseThrow(() -> {
            return new VeniceException("There must be at least one schema for: " + avroProtocolDefinition);
        }).intValue()));
        allSchemasFromResources.forEach((num2, schema) -> {
            cacheDatumReader(num2.intValue(), schema);
        });
        this.writer = new SpecificDatumWriter(avroProtocolDefinition.schema);
        this.newSchemaEncountered = biConsumer;
    }

    @Override // com.linkedin.venice.serialization.VeniceKafkaSerializer
    public void close() {
    }

    public IntSet knownProtocols() {
        IntLinkedOpenHashSet intLinkedOpenHashSet = new IntLinkedOpenHashSet(this.protocolVersionToReader.size());
        for (int i = 0; i < this.protocolVersionToReader.size(); i++) {
            if (this.protocolVersionToReader.get(i) != null) {
                intLinkedOpenHashSet.add(i);
            }
        }
        return intLinkedOpenHashSet;
    }

    public Schema getCompiledProtocol() {
        return this.compiledProtocol;
    }

    @Override // com.linkedin.venice.serialization.VeniceKafkaSerializer
    public void configure(Map<String, ?> map, boolean z) {
        if (z) {
            throw new VeniceException("Cannot use " + getClass().getSimpleName() + " for key data.");
        }
        if (!map.containsKey(VENICE_SCHEMA_READER_CONFIG)) {
            LOGGER.info("Serializer doesn't have schemaReader");
        } else {
            this.schemaReader = (SchemaReader) map.get(VENICE_SCHEMA_READER_CONFIG);
            LOGGER.info("Serializer has schemaReader: " + this.schemaReader);
        }
    }

    public void setSchemaReader(SchemaReader schemaReader) {
        this.schemaReader = schemaReader;
    }

    @Override // com.linkedin.venice.serialization.VeniceKafkaSerializer
    public byte[] serialize(String str, SPECIFIC_RECORD specific_record) {
        try {
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();
            ByteArrayOutputStream byteArrayOutputStream = reusableObjects.byteArrayOutputStream;
            byteArrayOutputStream.reset();
            BinaryEncoder newBinaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(byteArrayOutputStream, true, reusableObjects.binaryEncoder);
            if (this.MAGIC_BYTE_LENGTH == 1) {
                byteArrayOutputStream.write(this.magicByte);
            }
            if (this.PROTOCOL_VERSION_LENGTH == 1) {
                byteArrayOutputStream.write(this.currentProtocolVersion);
            }
            if (byteArrayOutputStream.size() < this.PAYLOAD_OFFSET) {
                int size = this.PAYLOAD_OFFSET - byteArrayOutputStream.size();
                for (int i = 0; i < size; i++) {
                    byteArrayOutputStream.write(0);
                }
            }
            this.writer.write(specific_record, newBinaryEncoder);
            newBinaryEncoder.flush();
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new VeniceMessageException(getClass().getSimpleName() + " failed to encode message: " + specific_record.toString(), e);
        }
    }

    public ByteBuffer serialize(SPECIFIC_RECORD specific_record) {
        return ByteBuffer.wrap(serialize((String) null, (String) specific_record));
    }

    @Override // com.linkedin.venice.serialization.VeniceKafkaSerializer
    public SPECIFIC_RECORD deserialize(String str, byte[] bArr) {
        return deserialize(bArr, (byte[]) null);
    }

    public SPECIFIC_RECORD deserialize(byte[] bArr, SPECIFIC_RECORD specific_record) {
        return deserialize(bArr, (int) getProtocolVersion(bArr), (byte) specific_record);
    }

    public SPECIFIC_RECORD deserialize(byte[] bArr, int i) {
        return deserialize(bArr, i, (int) null);
    }

    public SPECIFIC_RECORD deserialize(byte[] bArr, int i, SPECIFIC_RECORD specific_record) {
        if (bArr == null || bArr.length < this.PAYLOAD_OFFSET) {
            throw new IllegalArgumentException("Invalid byte array for serialization - no bytes to read");
        }
        VeniceSpecificDatumReader<SPECIFIC_RECORD> veniceSpecificDatumReader = this.protocolVersionToReader.get(i);
        if (veniceSpecificDatumReader == null) {
            if (this.schemaReader != null) {
                int i2 = 1;
                while (true) {
                    if (i2 > 60) {
                        break;
                    }
                    try {
                        Schema valueSchema = this.schemaReader.getValueSchema(i);
                        if (valueSchema == null) {
                            throw new VeniceMessageException("Received Protocol Version '" + i + "' which is not currently known by " + getClass().getSimpleName() + ". A remote fetch was attempted, but the " + SchemaReader.class.getSimpleName() + " returned null. The currently known Protocol Versions are: " + getCurrentlyLoadedProtocolVersions() + ".");
                        }
                        veniceSpecificDatumReader = cacheDatumReader(i, valueSchema);
                        LOGGER.info("Discovered new protocol version '" + i + "', and successfully retrieved it. Schema:\n" + valueSchema.toString(true));
                    } catch (Exception e) {
                        if (i2 == 60) {
                            throw new VeniceException("Failed to retrieve new protocol schema version (" + i + ") after 60 attempts.", e);
                        }
                        LOGGER.error("Caught an exception while trying to fetch a new protocol schema version (" + i + "). Attempt #" + i2 + "/60. Will sleep 1000 ms and try again.", e);
                        Utils.sleep(1000L);
                        i2++;
                    }
                }
            } else {
                throw new VeniceMessageException("Received Protocol Version '" + i + "' which is not supported by " + getClass().getSimpleName() + ". Protocol forward compatibility is not enabled. The only supported Protocol Versions are: " + getCurrentlyLoadedProtocolVersions() + ".");
            }
        }
        return deserialize(bArr, (VeniceSpecificDatumReader<VeniceSpecificDatumReader<SPECIFIC_RECORD>>) veniceSpecificDatumReader, (VeniceSpecificDatumReader<SPECIFIC_RECORD>) specific_record);
    }

    public SPECIFIC_RECORD deserialize(byte[] bArr, Schema schema, SPECIFIC_RECORD specific_record) {
        byte protocolVersion = getProtocolVersion(bArr);
        VeniceSpecificDatumReader<SPECIFIC_RECORD> veniceSpecificDatumReader = this.protocolVersionToReader.get(protocolVersion);
        if (veniceSpecificDatumReader == null) {
            veniceSpecificDatumReader = cacheDatumReader(protocolVersion, schema);
            this.newSchemaEncountered.accept(Integer.valueOf(protocolVersion), schema);
        }
        return deserialize(bArr, (VeniceSpecificDatumReader<VeniceSpecificDatumReader<SPECIFIC_RECORD>>) veniceSpecificDatumReader, (VeniceSpecificDatumReader<SPECIFIC_RECORD>) specific_record);
    }

    private byte getProtocolVersion(byte[] bArr) {
        if (bArr == null || bArr.length < this.PAYLOAD_OFFSET) {
            throw new IllegalArgumentException("Invalid byte array for serialization - no bytes to read");
        }
        if (this.magicByte == 0) {
            throw new VeniceMessageException("This protocol cannot be used as a Kafka deserializer: " + getClass().getSimpleName());
        }
        if (bArr[0] != this.magicByte) {
            throw new VeniceMessageException("Received Magic Byte '" + new String(bArr, 0, this.MAGIC_BYTE_LENGTH) + "' which is not supported by " + getClass().getSimpleName() + ". The only supported Magic Byte for this implementation is '" + ((int) this.magicByte) + "'.");
        }
        if (this.PROTOCOL_VERSION_LENGTH == 0) {
            throw new VeniceMessageException("This protocol cannot be used as a Kafka deserializer: " + getClass().getSimpleName());
        }
        return bArr[this.PROTOCOL_VERSION_OFFSET];
    }

    private SPECIFIC_RECORD deserialize(byte[] bArr, VeniceSpecificDatumReader<SPECIFIC_RECORD> veniceSpecificDatumReader, SPECIFIC_RECORD specific_record) {
        try {
            return (SPECIFIC_RECORD) veniceSpecificDatumReader.read(specific_record, createBinaryDecoder(bArr, this.PAYLOAD_OFFSET, bArr.length - this.PAYLOAD_OFFSET, threadLocalReusableObjects.get().binaryDecoder));
        } catch (IOException e) {
            throw new VeniceMessageException(getClass().getSimpleName() + " failed to decode message from: " + ByteUtils.toHexString(bArr), e);
        }
    }

    protected BinaryDecoder createBinaryDecoder(byte[] bArr, int i, int i2, BinaryDecoder binaryDecoder) {
        return DECODER_FACTORY.createBinaryDecoder(bArr, i, i2, binaryDecoder);
    }

    private VeniceSpecificDatumReader<SPECIFIC_RECORD> cacheDatumReader(int i, Schema schema) {
        VeniceSpecificDatumReader<SPECIFIC_RECORD> veniceSpecificDatumReader = new VeniceSpecificDatumReader<>(schema, this.compiledProtocol);
        this.protocolVersionToReader.set(i, veniceSpecificDatumReader);
        return veniceSpecificDatumReader;
    }

    private String getCurrentlyLoadedProtocolVersions() {
        return knownProtocols().toString();
    }
}
