package com.linkedin.venice.consumer;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceMessageException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/consumer/ConsumerIntegrationTest.class */
public abstract class ConsumerIntegrationTest {
    private static final String TEST_KEY = "key1";
    private static final String NEW_FIELD_NAME = "newField";
    public static final int NEW_PROTOCOL_VERSION = ((Integer) AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.currentProtocolVersion.get()).intValue() + 1;
    public static final Schema NEW_PROTOCOL_SCHEMA;
    private VeniceClusterWrapper cluster;
    private ControllerClient controllerClient;
    private String store;
    private int version;
    private String topicName;
    private AvroGenericStoreClient client;

    /* loaded from: input_file:com/linkedin/venice/consumer/ConsumerIntegrationTest$ApacheKafkaProducerWithNewerProtocolAdapter.class */
    private static class ApacheKafkaProducerWithNewerProtocolAdapter extends ApacheKafkaProducerAdapter {
        public ApacheKafkaProducerWithNewerProtocolAdapter(VeniceProperties veniceProperties) {
            super(new ApacheKafkaProducerConfig(veniceProperties, (String) null, (String) null, false));
        }
    }

    /* loaded from: input_file:com/linkedin/venice/consumer/ConsumerIntegrationTest$KafkaValueSerializerWithNewerProtocol.class */
    public static class KafkaValueSerializerWithNewerProtocol extends KafkaValueSerializer {
        public byte[] serialize(String str, KafkaMessageEnvelope kafkaMessageEnvelope) {
            return ConsumerIntegrationTest.serializeNewProtocol(kafkaMessageEnvelope);
        }

        public Schema getCompiledProtocol() {
            return ConsumerIntegrationTest.NEW_PROTOCOL_SCHEMA;
        }
    }

    /* loaded from: input_file:com/linkedin/venice/consumer/ConsumerIntegrationTest$NewKafkaMessageEnvelopeWithExtraField.class */
    static class NewKafkaMessageEnvelopeWithExtraField extends KafkaMessageEnvelope {
        public static final Schema SCHEMA$ = ConsumerIntegrationTest.NEW_PROTOCOL_SCHEMA;
        private static final int newFieldIndex = getNewFieldIndex();
        public Integer newField;

        public Schema getSchema() {
            return SCHEMA$;
        }

        public Object get(int i) {
            if (newFieldIndex == i) {
                return this.newField;
            }
            Schema.Field field = super.getSchema().getField(((Schema.Field) getSchema().getFields().get(i)).name());
            if (field == null) {
                throw new IllegalStateException();
            }
            return super.get(field.pos());
        }

        public void put(int i, Object obj) {
            if (newFieldIndex == i) {
                this.newField = (Integer) obj;
                return;
            }
            Schema.Field field = super.getSchema().getField(((Schema.Field) getSchema().getFields().get(i)).name());
            if (field == null) {
                throw new IllegalStateException();
            }
            super.put(field.pos(), obj);
        }

        private static int getNewFieldIndex() {
            for (Schema.Field field : SCHEMA$.getFields()) {
                if (field.name().equals(ConsumerIntegrationTest.NEW_FIELD_NAME)) {
                    return field.pos();
                }
            }
            throw new IllegalStateException("Missing a field called 'newField' in the schema!");
        }
    }

    @BeforeClass
    public void sharedSetUp() {
        this.cluster = ServiceFactory.getVeniceCluster();
        this.controllerClient = ControllerClient.constructClusterControllerClient(this.cluster.getClusterName(), this.cluster.getAllControllersURLs());
        extraBeforeClassSetUp(this.cluster, this.controllerClient);
    }

    void extraBeforeClassSetUp(VeniceClusterWrapper veniceClusterWrapper, ControllerClient controllerClient) {
    }

    @BeforeMethod
    public void testSetUp() {
        this.store = Utils.getUniqueString("consumer_integ_test");
        this.version = 1;
        this.topicName = Version.composeRealTimeTopic(this.store);
        this.cluster.getNewStore(this.store);
        this.controllerClient.updateStore(this.store, new UpdateStoreQueryParams().setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(2L));
        this.controllerClient.emptyPush(this.store, "test_push", 1L);
        TestUtils.waitForNonDeterministicAssertion(15L, TimeUnit.SECONDS, () -> {
            StoreResponse store = this.controllerClient.getStore(this.store);
            Assert.assertFalse(store.isError());
            Assert.assertEquals(store.getStore().getCurrentVersion(), this.version, "The empty push has not activated the store.");
        });
        this.client = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(this.store).setVeniceURL(this.cluster.getRandomRouterURL()));
    }

    @AfterMethod
    public void testCleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.client});
    }

    @AfterClass
    public void sharedCleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.cluster});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.controllerClient});
    }

    @Test(timeOut = 60000)
    public void testForwardCompatibility() throws ExecutionException, InterruptedException {
        VeniceWriter<String, String, byte[]> veniceWriter = this.cluster.getVeniceWriter(this.topicName);
        try {
            writeAndVerifyRecord(veniceWriter, this.client, "value1");
            if (veniceWriter != null) {
                veniceWriter.close();
            }
            Properties properties = new Properties();
            properties.put("kafka.value.serializer", KafkaValueSerializerWithNewerProtocol.class.getName());
            properties.put("kafka.bootstrap.servers", this.cluster.getKafka().getAddress());
            VeniceProperties veniceProperties = new VeniceProperties(properties);
            VeniceAvroKafkaSerializer veniceAvroKafkaSerializer = new VeniceAvroKafkaSerializer("\"string\"");
            VeniceAvroKafkaSerializer veniceAvroKafkaSerializer2 = new VeniceAvroKafkaSerializer("\"string\"");
            VeniceWriterWithNewerProtocol veniceWriter2 = getVeniceWriter(new VeniceWriterOptions.Builder(this.topicName).setKeySerializer(veniceAvroKafkaSerializer).setValueSerializer(veniceAvroKafkaSerializer2).setWriteComputeSerializer(new DefaultSerializer()).setTime(new SystemTime()).setPartitioner(new DefaultVenicePartitioner(veniceProperties)).build(), veniceProperties, new ApacheKafkaProducerWithNewerProtocolAdapter(veniceProperties), NEW_PROTOCOL_SCHEMA);
            try {
                writeAndVerifyRecord(veniceWriter2, this.client, "value2");
                if (veniceWriter2 != null) {
                    veniceWriter2.close();
                }
            } catch (Throwable th) {
                if (veniceWriter2 != null) {
                    try {
                        veniceWriter2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (veniceWriter != null) {
                try {
                    veniceWriter.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    abstract VeniceWriterWithNewerProtocol getVeniceWriter(VeniceWriterOptions veniceWriterOptions, VeniceProperties veniceProperties, PubSubProducerAdapter pubSubProducerAdapter, Schema schema);

    private void writeAndVerifyRecord(VeniceWriter<String, String, byte[]> veniceWriter, AvroGenericStoreClient avroGenericStoreClient, String str) throws ExecutionException, InterruptedException {
        veniceWriter.put(TEST_KEY, str, 1).get();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            try {
                Object obj = avroGenericStoreClient.get(TEST_KEY).get();
                Assert.assertNotNull(obj, "The key written by the " + veniceWriter.getClass().getSimpleName() + " is not in the store yet.");
                Assert.assertEquals(obj.toString(), str, "The key written by the " + veniceWriter.getClass().getSimpleName() + " is not valid.");
            } catch (ExecutionException e) {
                Assert.fail("Caught exception: " + e.getMessage());
            }
        });
    }

    public static byte[] serializeNewProtocol(GenericRecord genericRecord) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder newBinaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(byteArrayOutputStream);
            byteArrayOutputStream.write(((Byte) AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getMagicByte().get()).byteValue());
            byteArrayOutputStream.write((byte) NEW_PROTOCOL_VERSION);
            new GenericDatumWriter(genericRecord.getSchema()).write(genericRecord, newBinaryEncoder);
            newBinaryEncoder.flush();
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new VeniceMessageException("Failed to encode message: " + genericRecord.toString(), e);
        }
    }

    static {
        Schema currentProtocolVersionSchema = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersionSchema();
        List list = (List) currentProtocolVersionSchema.getFields().stream().map(field -> {
            return AvroCompatibilityHelper.newField(field).build();
        }).collect(Collectors.toList());
        list.add(0, AvroCompatibilityHelper.newField((Schema.Field) null).setName(NEW_FIELD_NAME).setSchema(Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)))).setOrder(Schema.Field.Order.ASCENDING).setDefault((Object) null).build());
        NEW_PROTOCOL_SCHEMA = Schema.createRecord(currentProtocolVersionSchema.getName(), currentProtocolVersionSchema.getDoc(), currentProtocolVersionSchema.getNamespace(), false);
        NEW_PROTOCOL_SCHEMA.setFields(list);
    }
}
