package com.linkedin.venice.endToEnd;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.helix.HelixParticipationService;
import com.linkedin.davinci.notifier.LeaderErrorNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.VeniceUserStoreType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.util.BytewiseComparator;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/linkedin/venice/endToEnd/TestLeaderReplicaFailover.class */
public class TestLeaderReplicaFailover {
    private static final int TEST_TIMEOUT = 360000;
    String stringSchemaStr = "\"string\"";
    String valueSchemaStr = "{\"type\":\"record\",\"name\":\"KeyRecord\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
    AvroSerializer serializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{this.stringSchemaStr}));
    AvroSerializer valueSerializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{this.valueSchemaStr}));
    private String clusterName;
    private VeniceClusterWrapper clusterWrapper;

    @BeforeClass
    public void setUp() {
        this.serializer = new AvroSerializer(AvroCompatibilityHelper.parse(new String[]{"\"string\""}));
        Properties properties = new Properties();
        properties.setProperty("server.promotion.to.leader.replica.delay.seconds", Long.toString(1L));
        properties.put("rocksdb.plain.table.format.enabled", false);
        properties.put("default.offline.push.strategy", OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION);
        new Properties().put("default.partition.size", 10000);
        this.clusterWrapper = ServiceFactory.getVeniceCluster(1, 3, 1, 3, 1, false, false, properties);
        this.clusterName = this.clusterWrapper.getClusterName();
    }

    @AfterClass
    public void cleanUp() {
        this.clusterWrapper.close();
    }

    @Test(timeOut = 360000)
    public void testLeaderReplicaFailover() throws Exception {
        ControllerClient controllerClient = new ControllerClient(this.clusterWrapper.getClusterName(), this.clusterWrapper.getAllControllersURLs());
        TestUtils.assertCommand(controllerClient.configureActiveActiveReplicationForCluster(true, VeniceUserStoreType.BATCH_ONLY.toString(), Optional.empty()));
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        Schema writeSimpleAvroFileWithUserSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(tempDataDirectory);
        String str = "file:" + tempDataDirectory.getAbsolutePath();
        String uniqueString = Utils.getUniqueString("store");
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.clusterWrapper, str, uniqueString);
        defaultVPJProps.setProperty("default.offline.push.strategy", OfflinePushStrategy.WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION.toString());
        IntegrationTestPushUtils.createStoreForJob(this.clusterName, writeSimpleAvroFileWithUserSchema.getField("key").schema().toString(), this.valueSchemaStr, defaultVPJProps, new UpdateStoreQueryParams().setPartitionCount(3)).close();
        VersionCreationResponse assertCommand = TestUtils.assertCommand(controllerClient.requestTopicForWrites(uniqueString, 10240L, Version.PushType.BATCH, Version.guidBasedDummyPushId(), true, true, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
        String kafkaTopic = assertCommand.getKafkaTopic();
        VeniceWriterFactory veniceWriterFactory = TestUtils.getVeniceWriterFactory(assertCommand.getKafkaBootstrapServers());
        HelixExternalViewRepository routingDataRepository = this.clusterWrapper.getLeaderVeniceController().getVeniceHelixAdmin().getHelixVeniceClusterResources(this.clusterWrapper.getClusterName()).getRoutingDataRepository();
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
            Assert.assertNotNull(routingDataRepository.getLeaderInstance(kafkaTopic, 0));
        });
        Instance leaderInstance = routingDataRepository.getLeaderInstance(kafkaTopic, 0);
        VeniceNotifier veniceNotifier = null;
        for (VeniceServerWrapper veniceServerWrapper : this.clusterWrapper.getVeniceServers()) {
            Assert.assertNotNull(veniceServerWrapper);
            if (veniceServerWrapper.getPort() == leaderInstance.getPort()) {
                HelixParticipationService helixParticipationService = veniceServerWrapper.getVeniceServer().getHelixParticipationService();
                veniceNotifier = new LeaderErrorNotifier(helixParticipationService.getVeniceOfflinePushMonitorAccessor(), helixParticipationService.getStatusStoreWriter(), helixParticipationService.getHelixReadOnlyStoreRepository(), helixParticipationService.getInstance().getNodeId());
                helixParticipationService.replaceAndAddTestIngestionNotifier(veniceNotifier);
            }
        }
        Assert.assertNotNull(veniceNotifier);
        VeniceWriter createVeniceWriter = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(kafkaTopic).build());
        try {
            createVeniceWriter.broadcastStartOfPush(true, Collections.emptyMap());
            for (Map.Entry<byte[], byte[]> entry : generateData(1000, true, 0, this.serializer).entrySet()) {
                createVeniceWriter.put(entry.getKey(), entry.getValue(), 1, (PubSubProducerCallback) null);
            }
            createVeniceWriter.broadcastEndOfPush(Collections.emptyMap());
            if (createVeniceWriter != null) {
                createVeniceWriter.close();
            }
            TestUtils.waitForNonDeterministicCompletion(30L, TimeUnit.SECONDS, () -> {
                return this.clusterWrapper.getLeaderVeniceController().getVeniceAdmin().getCurrentVersion(this.clusterWrapper.getClusterName(), uniqueString) == 1;
            });
            HelixAdmin helixAdmin = null;
            try {
                helixAdmin = new ZKHelixAdmin(this.clusterWrapper.getZk().getAddress());
                VeniceNotifier veniceNotifier2 = veniceNotifier;
                TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, true, () -> {
                    Assert.assertTrue(veniceNotifier2.hasReportedError());
                    Assert.assertEquals(helixAdmin.getInstanceConfig(this.clusterName, leaderInstance.getNodeId()).getDisabledPartitionsMap().size(), 1);
                });
                this.clusterWrapper.stopVeniceServer(leaderInstance.getPort());
                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(this.clusterWrapper.getLeaderVeniceController().getVeniceAdmin().getReplicas(this.clusterName, kafkaTopic).size() == 6);
                });
                this.clusterWrapper.restartVeniceServer(leaderInstance.getPort());
                TestUtils.waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, true, () -> {
                    Assert.assertTrue(this.clusterWrapper.getLeaderVeniceController().getVeniceAdmin().getReplicas(this.clusterName, kafkaTopic).size() == 9);
                    Assert.assertEquals(helixAdmin.getInstanceConfig(this.clusterName, leaderInstance.getNodeId()).getDisabledPartitionsMap().size(), 0);
                });
                if (helixAdmin != null) {
                    helixAdmin.close();
                }
            } catch (Throwable th) {
                if (helixAdmin != null) {
                    helixAdmin.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private Map<byte[], byte[]> generateData(int i, boolean z, int i2, AvroSerializer avroSerializer) {
        AbstractMap hashMap;
        if (z) {
            BytewiseComparator bytewiseComparator = new BytewiseComparator(new ComparatorOptions());
            hashMap = new TreeMap((bArr, bArr2) -> {
                return bytewiseComparator.compare(ByteBuffer.wrap(bArr), ByteBuffer.wrap(bArr2));
            });
        } else {
            hashMap = new HashMap();
        }
        for (int i3 = i2; i3 < i + i2; i3++) {
            GenericData.Record record = new GenericData.Record(Schema.parse(this.valueSchemaStr));
            record.put("name", "value" + i3);
            hashMap.put(avroSerializer.serialize("key" + i3), this.valueSerializer.serialize(record));
        }
        return hashMap;
    }
}
