package com.linkedin.venice.controller.kafka.consumer;

import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceControllerCreateOptions;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(priority = -5)
/* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskIntegrationTest.class */
public class AdminConsumptionTaskIntegrationTest {
    private static final int TIMEOUT = 60000;
    private static final String owner = "test_owner";
    private static final String keySchema = "\"string\"";
    private static final String valueSchema = "\"string\"";
    private String clusterName = Utils.getUniqueString("test-cluster");
    private final AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer();
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

    @Test(timeOut = 60000)
    public void testSkipMessageEndToEnd() throws ExecutionException, InterruptedException, IOException {
        ZkServerWrapper zkServer = ServiceFactory.getZkServer();
        try {
            PubSubBrokerWrapper pubSubBroker = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setZkWrapper(zkServer).build());
            try {
                TopicManager topicManager = IntegrationTestPushUtils.getTopicManagerRepo(30000L, 100L, 0L, pubSubBroker.getAddress(), this.pubSubTopicRepository).getTopicManager();
                try {
                    PubSubTopic topic = this.pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(this.clusterName));
                    topicManager.createTopic(topic, 1, 1, true);
                    String str = "test-store";
                    VeniceControllerWrapper veniceController = ServiceFactory.getVeniceController(new VeniceControllerCreateOptions.Builder(this.clusterName, zkServer, pubSubBroker).build());
                    try {
                        VeniceWriter createVeniceWriter = TestUtils.getVeniceWriterFactory(pubSubBroker.getAddress()).createVeniceWriter(new VeniceWriterOptions.Builder(topic.getName()).build());
                        try {
                            long offset = ((PubSubProduceResult) createVeniceWriter.put(new byte[0], getStoreCreationMessage(this.clusterName, "test-store", owner, "invalid_key_schema", "\"string\"", 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get()).getOffset();
                            createVeniceWriter.put(new byte[0], getStoreCreationMessage(this.clusterName, "test-store", owner, "\"string\"", "\"string\"", 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
                            Thread.sleep(5000L);
                            Assert.assertFalse(veniceController.getVeniceAdmin().hasStore(this.clusterName, "test-store"));
                            ControllerClient controllerClient = new ControllerClient(this.clusterName, veniceController.getControllerUrl());
                            try {
                                controllerClient.skipAdminMessage(Long.toString(offset), false);
                                controllerClient.close();
                                TestUtils.waitForNonDeterministicAssertion(180000L, TimeUnit.MILLISECONDS, () -> {
                                    Assert.assertTrue(veniceController.getVeniceAdmin().hasStore(this.clusterName, str));
                                });
                                if (createVeniceWriter != null) {
                                    createVeniceWriter.close();
                                }
                                if (veniceController != null) {
                                    veniceController.close();
                                }
                                if (topicManager != null) {
                                    topicManager.close();
                                }
                                if (pubSubBroker != null) {
                                    pubSubBroker.close();
                                }
                                if (zkServer != null) {
                                    zkServer.close();
                                }
                            } catch (Throwable th) {
                                try {
                                    controllerClient.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                                throw th;
                            }
                        } catch (Throwable th3) {
                            if (createVeniceWriter != null) {
                                try {
                                    createVeniceWriter.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        if (veniceController != null) {
                            try {
                                veniceController.close();
                            } catch (Throwable th6) {
                                th5.addSuppressed(th6);
                            }
                        }
                        throw th5;
                    }
                } catch (Throwable th7) {
                    if (topicManager != null) {
                        try {
                            topicManager.close();
                        } catch (Throwable th8) {
                            th7.addSuppressed(th8);
                        }
                    }
                    throw th7;
                }
            } catch (Throwable th9) {
                if (pubSubBroker != null) {
                    try {
                        pubSubBroker.close();
                    } catch (Throwable th10) {
                        th9.addSuppressed(th10);
                    }
                }
                throw th9;
            }
        } catch (Throwable th11) {
            if (zkServer != null) {
                try {
                    zkServer.close();
                } catch (Throwable th12) {
                    th11.addSuppressed(th12);
                }
            }
            throw th11;
        }
    }

    private byte[] getStoreCreationMessage(String str, String str2, String str3, String str4, String str5, long j) {
        StoreCreation storeCreation = (StoreCreation) AdminMessageType.STORE_CREATION.getNewInstance();
        storeCreation.clusterName = str;
        storeCreation.storeName = str2;
        storeCreation.owner = str3;
        storeCreation.keySchema = new SchemaMeta();
        storeCreation.keySchema.definition = str4;
        storeCreation.keySchema.schemaType = SchemaType.AVRO_1_4.getValue();
        storeCreation.valueSchema = new SchemaMeta();
        storeCreation.valueSchema.definition = str5;
        storeCreation.valueSchema.schemaType = SchemaType.AVRO_1_4.getValue();
        AdminOperation adminOperation = new AdminOperation();
        adminOperation.operationType = AdminMessageType.STORE_CREATION.getValue();
        adminOperation.payloadUnion = storeCreation;
        adminOperation.executionId = j;
        return this.adminOperationSerializer.serialize(adminOperation);
    }
}
