package com.linkedin.venice.hadoop;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.exceptions.VeniceSchemaFieldNotFoundException;
import com.linkedin.venice.hadoop.partitioner.BuggyOffsettingMapReduceShufflePartitioner;
import com.linkedin.venice.hadoop.partitioner.BuggySprayingMapReduceShufflePartitioner;
import com.linkedin.venice.hadoop.partitioner.NonDeterministicVenicePartitioner;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/TestVenicePushJob.class */
public class TestVenicePushJob {
    // Timeout for the tests in this class, in milliseconds (the unit of TestNG's timeOut attribute);
    // same value as the timeOut declared on the @Test annotations below.
    private static final int TEST_TIMEOUT = 60000;
    // Shared cluster fixture: created once in setUp() and closed in cleanUp().
    private VeniceClusterWrapper veniceCluster;
    // Shared controller client for veniceCluster: created in setUp() and closed in cleanUp().
    private ControllerClient controllerClient;

    /**
     * Writes an Avro file named {@code simple_user.avro} into {@code file} containing 100 {@code User}
     * records whose {@code value} field is a nested {@code ValueRecord}.
     *
     * <p>Record {@code i} (1..100) has {@code id = "i"} and {@code value.favorite_number = i}. When
     * {@code z} is true, the nested record schema additionally declares a {@code favorite_color}
     * string field with default {@code "blue"}, and every written record sets it to {@code "red"}.
     *
     * @param file directory in which the data file is created
     * @param z whether to add the defaulted {@code favorite_color} field to the nested value schema
     * @return the parsed top-level {@code User} schema
     * @throws IOException if the data file cannot be created or written
     */
    protected static Schema writeComplicatedAvroFileWithUserSchema(File file, boolean z) throws IOException {
        // The schema is assembled in three parts so the optional field can be spliced into the
        // nested record's field list before the closing brackets.
        StringBuilder schemaJson = new StringBuilder(
                "{\"namespace\": \"example.avro\",\n"
                        + " \"type\": \"record\",\n"
                        + " \"name\": \"User\",\n"
                        + " \"fields\": [\n"
                        + "      { \"name\": \"id\", \"type\": \"string\"},\n"
                        + "      {\n"
                        + "       \"name\": \"value\",\n"
                        + "       \"type\": {\n"
                        + "           \"type\": \"record\",\n"
                        + "           \"name\": \"ValueRecord\",\n"
                        + "           \"fields\" : [\n"
                        + "              {\"name\": \"favorite_number\", \"type\": \"int\"}\n");
        if (z) {
            schemaJson.append(",{\"name\": \"favorite_color\", \"type\": \"string\", \"default\": \"blue\"}\n");
        }
        schemaJson.append("           ]\n        }\n      }\n ]\n}");

        Schema userSchema = Schema.parse(schemaJson.toString());
        Schema valueSchema = userSchema.getField("value").schema();
        File dataFile = new File(file, "simple_user.avro");

        // try-with-resources closes the writer exactly once, on both the success and failure paths.
        try (DataFileWriter<GenericData.Record> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(userSchema))) {
            writer.create(userSchema, dataFile);
            for (int i = 1; i <= 100; i++) {
                GenericData.Record value = new GenericData.Record(valueSchema);
                value.put("favorite_number", i);
                if (z) {
                    value.put("favorite_color", "red");
                }
                GenericData.Record user = new GenericData.Record(userSchema);
                user.put("id", Integer.toString(i));
                user.put("value", value);
                writer.append(user);
            }
        }
        return userSchema;
    }

    /**
     * Writes an Avro file named {@code simple_user_with_different_schema.avro} into {@code file}
     * containing 100 flat {@code User} records with the fields {@code key}, {@code value},
     * {@code age} and {@code company}.
     *
     * <p>Record {@code i} (1..100) has {@code key = "i"}, {@code value = "test_name_i"},
     * {@code age = i} and {@code company = "company_i"}.
     *
     * @param file directory in which the data file is created
     * @return the parsed {@code User} schema used for the file
     * @throws IOException if the data file cannot be created or written
     */
    protected static Schema writeSimpleAvroFileWithDifferentUserSchema(File file) throws IOException {
        Schema userSchema = Schema.parse("{  \"namespace\" : \"example.avro\",    \"type\": \"record\",     \"name\": \"User\",       \"fields\": [                  { \"name\": \"key\", \"type\": \"string\" },         { \"name\": \"value\", \"type\": \"string\" },         { \"name\": \"age\", \"type\": \"int\" },         { \"name\": \"company\", \"type\": \"string\" }    ]  } ");
        File dataFile = new File(file, "simple_user_with_different_schema.avro");

        // try-with-resources closes the writer exactly once, on both the success and failure paths.
        try (DataFileWriter<GenericData.Record> writer =
                new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(userSchema))) {
            writer.create(userSchema, dataFile);
            for (int i = 1; i <= 100; i++) {
                GenericData.Record user = new GenericData.Record(userSchema);
                user.put("key", Integer.toString(i));
                user.put("value", "test_name_" + i);
                user.put("age", i);
                user.put("company", "company_" + i);
                writer.append(user);
            }
        }
        return userSchema;
    }

    /**
     * Creates the shared cluster (with SSL to storage nodes enabled) and a controller client
     * pointed at one of its router URLs. Runs once before the tests in this class.
     */
    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        VeniceClusterCreateOptions clusterOptions =
                new VeniceClusterCreateOptions.Builder().sslToStorageNodes(true).build();
        veniceCluster = ServiceFactory.getVeniceCluster(clusterOptions);
        controllerClient = new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getRandomRouterURL());
    }

    /**
     * Releases the shared fixtures after all tests have run: the controller client first, then the
     * cluster it talks to.
     */
    @AfterClass
    public void cleanUp() {
        Closeable[] clientToClose = {controllerClient};
        Utils.closeQuietlyWithErrorLogged(clientToClose);
        Closeable[] clusterToClose = {veniceCluster};
        Utils.closeQuietlyWithErrorLogged(clusterToClose);
    }

    /**
     * Places two Avro files with different schemas in the same input directory and expects the
     * push job to fail with a {@link VeniceException} reporting an inconsistent file schema.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Inconsistent file.* schema found.*")
    public void testRunJobWithInputHavingDifferentSchema() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        writeSimpleAvroFileWithDifferentUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Configures the push job with a key field ({@code id1}) that the input schema does not
     * contain and expects a {@link VeniceSchemaFieldNotFoundException}.
     *
     * <p>Carries the same timeout as the other tests in this class so that a stuck push job
     * cannot hang the suite.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceSchemaFieldNotFoundException.class, expectedExceptionsMessageRegExp = ".*Could not find field: id1.*")
    public void testRunJobWithInvalidKeyField() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);
        pushJobProps.setProperty("key.field", "id1");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Configures the push job with a value field ({@code name1}) that the Avro input schema does
     * not contain and expects a {@link VeniceSchemaFieldNotFoundException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceSchemaFieldNotFoundException.class, expectedExceptionsMessageRegExp = ".*Could not find field: name1.*")
    public void testRunJobWithInvalidValueField() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        pushJobProps.setProperty("value.field", "name1");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Vson counterpart of the invalid-value-field test: with an empty key field and a value field
     * ({@code name1}) that is not in the input, the push job is expected to throw a
     * {@link VeniceSchemaFieldNotFoundException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceSchemaFieldNotFoundException.class, expectedExceptionsMessageRegExp = ".*Could not find field: name1.*")
    public void testRunJobWithInvalidValueFieldVson() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleVsonFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        pushJobProps.setProperty("key.field", "");
        pushJobProps.setProperty("value.field", "name1");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Adds a sub-directory next to the Avro input file and expects the push job to reject the
     * input directory with a {@link VeniceException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*should not have sub directory.*")
    public void testRunJobWithSubDirInInputDir() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        File subDir = new File(inputDir, "sub-dir");
        subDir.mkdir();

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Checks how {@code VenicePushJob.getLatestPathOfInputDirectory} resolves the {@code #LATEST}
     * placeholder against this layout:
     *
     * <pre>
     * root/v1/
     * root/v2/v1/
     * root/v2/v2/
     * root/v2/v3.avro
     * </pre>
     *
     * The assertions expect {@code #LATEST} to resolve to {@code v2} at the top level and to the
     * directory {@code v2/v2} (not the file {@code v3.avro}) at the second level.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testRunJobByPickingUpLatestFolder() throws Exception {
        File root = TestWriteUtils.getTempDataDirectory();
        new File(root, "v1").mkdir();
        File latestTopLevel = new File(root, "v2");
        latestTopLevel.mkdir();
        File nestedV1 = new File(latestTopLevel, "v1");
        nestedV1.mkdir();
        File nestedV2 = new File(latestTopLevel, "v2");
        nestedV2.mkdir();
        new File(latestTopLevel, "v3.avro").createNewFile();

        FileSystem fs = FileSystem.get(new Configuration());
        String rootUri = "file:" + root.getAbsolutePath();

        String resolvedLastLevel = VenicePushJob.getLatestPathOfInputDirectory(rootUri + "/#LATEST", fs).toString();
        Assert.assertEquals(
                resolvedLastLevel,
                "file:" + latestTopLevel.getAbsolutePath(),
                "VenicePushJob should parse #LATEST to latest directory when it is in the last level in the input path");

        String resolvedIntermediate = VenicePushJob.getLatestPathOfInputDirectory(rootUri + "/#LATEST/v1", fs).toString();
        Assert.assertEquals(
                resolvedIntermediate,
                "file:" + nestedV1.getAbsolutePath(),
                "VenicePushJob should parse #LATEST to latest directory when it is only in an intermediate level in the input path");

        String resolvedTwice = VenicePushJob.getLatestPathOfInputDirectory(rootUri + "/#LATEST/#LATEST", fs).toString();
        Assert.assertEquals(
                resolvedTwice,
                "file:" + nestedV2.getAbsolutePath(),
                "VenicePushJob should parse all occurrences of #LATEST to respective latest directories");

        String resolvedTrailingSlash = VenicePushJob.getLatestPathOfInputDirectory(rootUri + "/#LATEST/#LATEST/", fs).toString();
        Assert.assertEquals(
                resolvedTrailingSlash,
                "file:" + nestedV2.getAbsolutePath(),
                "VenicePushJob should parse #LATEST to latest directory to respective latest directories");
    }

    /**
     * Creates the store from the default job properties, then switches the job's key field to
     * {@code age} and expects the push to fail with a key schema mismatch.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Key schema mis-match for store.*")
    public void testRunJobWithDifferentKeySchemaConfig() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String storeName = Utils.getUniqueString("store");
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);

        // The store is created before the key field is overridden below.
        IntegrationTestPushUtils.createStoreForJob(veniceCluster.getClusterName(), recordSchema, pushJobProps).close();

        pushJobProps.setProperty("key.field", "age");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Creates the store from the default job properties, then switches the job's value field to
     * {@code age} and expects the push to fail value schema validation.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Failed to validate value schema.*")
    public void testRunJobMultipleTimesWithInCompatibleValueSchemaConfig() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        String storeName = Utils.getUniqueString("store");
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);

        // The store is created before the value field is overridden below.
        IntegrationTestPushUtils.createStoreForJob(veniceCluster.getClusterName(), recordSchema, pushJobProps).close();

        pushJobProps.setProperty("value.field", "age");
        pushJobProps.setProperty("controller.request.retry.attempts", "2");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Runs a push with {@code suppress.end.of.push.message=true}, then waits for exactly one
     * future version to be reported, sends the end-of-push for version 1 through the controller
     * client, and finally waits for version 1 to become the store's current version.
     *
     * <p>The dedicated controller client and the one returned by {@code createStoreForJob} are
     * both closed so the test does not leak clients.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testRunJobWithEOPSuppressed() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String storeName = Utils.getUniqueString("store");
        String clusterName = veniceCluster.getClusterName();

        try (ControllerClient client = new ControllerClient(clusterName, veniceCluster.getRandomRouterURL())) {
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, false);
            Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                    veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
            pushJobProps.setProperty("suppress.end.of.push.message", "true");
            IntegrationTestPushUtils.createStoreForJob(clusterName, recordSchema, pushJobProps).close();

            TestWriteUtils.runPushJob("Test push job", pushJobProps);

            // The end-of-push is only written once the assertion in the same retry block passes.
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(client.getFutureVersions(clusterName, storeName).getStoreStatusMap().size(), 1);
                client.writeEndOfPush(storeName, 1);
            });
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(client.getStore(storeName).getStore().getCurrentVersion(), 1);
            });
        }
    }

    /**
     * Runs a push with {@code defer.version.swap=true}, then waits for exactly one future version
     * to be reported, explicitly activates version 1 through the controller client, and finally
     * waits for version 1 to become the store's current version.
     *
     * <p>The dedicated controller client and the one returned by {@code createStoreForJob} are
     * both closed so the test does not leak clients.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testRunJobWithDeferredVersionSwap() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String storeName = Utils.getUniqueString("store");
        String clusterName = veniceCluster.getClusterName();

        try (ControllerClient client = new ControllerClient(clusterName, veniceCluster.getRandomRouterURL())) {
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithStringToStringSchema(inputDir, false);
            Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                    veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
            pushJobProps.setProperty("defer.version.swap", "true");
            IntegrationTestPushUtils.createStoreForJob(clusterName, recordSchema, pushJobProps).close();

            TestWriteUtils.runPushJob("Test push job", pushJobProps);

            // The version override is only issued once the assertion in the same retry block passes.
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(client.getFutureVersions(clusterName, storeName).getStoreStatusMap().size(), 1);
                client.overrideSetActiveVersion(storeName, 1);
            });
            TestUtils.waitForNonDeterministicAssertion(20L, TimeUnit.SECONDS, () -> {
                Assert.assertEquals(client.getStore(storeName).getStore().getCurrentVersion(), 1);
            });
        }
    }

    /**
     * Creates a store from the nested-record schema without the optional field, then registers the
     * variant that adds a defaulted {@code favorite_color} field as an additional value schema.
     *
     * <p>The controller client is closed when the test finishes so it is not leaked.
     */
    @Test(timeOut = TEST_TIMEOUT)
    public void testRunJobMultipleTimesWithCompatibleValueSchemaConfig() throws Exception {
        Schema baseRecordSchema = writeComplicatedAvroFileWithUserSchema(TestWriteUtils.getTempDataDirectory(), false);
        Schema keySchema = baseRecordSchema.getField("id").schema();
        Schema valueSchema = baseRecordSchema.getField("value").schema();
        String storeName = Utils.getUniqueString("store");

        try (ControllerClient client =
                new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getRandomRouterURL())) {
            // NOTE(review): neither controller response is inspected, so this test passes even if
            // the controller rejects a request — consider asserting on the responses.
            client.createNewStore(storeName, "owner", keySchema.toString(), valueSchema.toString());

            Schema evolvedRecordSchema =
                    writeComplicatedAvroFileWithUserSchema(TestWriteUtils.getTempDataDirectory(), true);
            client.addValueSchema(storeName, evolvedRecordSchema.getField("value").schema().toString());
        }
    }

    /**
     * Requests a write-compute incremental push against a store created with write computation
     * disabled and expects the job to fail with a {@link VeniceException}.
     *
     * <p>The job flags are stored as strings via {@code setProperty}, like the other tests in this
     * class; non-String values placed in a {@link Properties} with {@code put} are not visible to
     * {@code getProperty}. The controller client is closed when the test finishes.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Store does not have write compute enabled.*")
    public void testWCJobWithStoreNotWCEnabled() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String storeName = Utils.getUniqueString("store");

        try (ControllerClient client =
                new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getRandomRouterURL())) {
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams();
            storeParams.setWriteComputationEnabled(false);
            storeParams.setIncrementalPushEnabled(true);
            client.createNewStoreWithParameters(storeName, "owner", "\"string\"", "\"string\"", storeParams);

            Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                    veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
            pushJobProps.setProperty("venice.write.compute.enable", "true");
            pushJobProps.setProperty("incremental.push", "true");
            TestWriteUtils.runPushJob("Test push job", pushJobProps);
        }
    }

    /**
     * Requests write compute on a non-incremental (batch) push and expects the job to fail with a
     * {@link VeniceException}. The store itself is created with write computation enabled and
     * incremental push disabled.
     *
     * <p>The job flags are stored as strings via {@code setProperty}, like the other tests in this
     * class; non-String values placed in a {@link Properties} with {@code put} are not visible to
     * {@code getProperty}. The controller client is closed when the test finishes.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Write compute is only available for incremental push jobs.*")
    public void testWCBatchJob() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        String storeName = Utils.getUniqueString("store");

        try (ControllerClient client =
                new ControllerClient(veniceCluster.getClusterName(), veniceCluster.getRandomRouterURL())) {
            UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams();
            storeParams.setWriteComputationEnabled(true);
            storeParams.setIncrementalPushEnabled(false);
            client.createNewStoreWithParameters(storeName, "owner", "\"string\"", "\"string\"", storeParams);

            Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                    veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
            pushJobProps.setProperty("venice.write.compute.enable", "true");
            pushJobProps.setProperty("incremental.push", "false");
            TestWriteUtils.runPushJob("Test push job", pushJobProps);
        }
    }

    /**
     * Runs a push against a 3-partition store with the shuffle partitioner replaced by
     * {@link BuggySprayingMapReduceShufflePartitioner} and expects the job to fail with a
     * {@link VeniceException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Exception or error caught during VenicePushJob.*")
    public void testRunJobWithBuggySprayingMapReduceShufflePartitioner() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);
        UpdateStoreQueryParams storeParams =
                new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(3);
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, storeParams));

        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        TestWriteUtils.runPushJob(
                "Test push job",
                pushJobProps,
                job -> job.setMapRedPartitionerClass(BuggySprayingMapReduceShufflePartitioner.class));
    }

    /**
     * Runs a push against a 3-partition store with the shuffle partitioner replaced by
     * {@link BuggyOffsettingMapReduceShufflePartitioner} and expects the job to fail with a
     * {@link VeniceException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Exception or error caught during VenicePushJob.*")
    public void testRunJobWithBuggyOffsettingMapReduceShufflePartitioner() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);
        UpdateStoreQueryParams storeParams =
                new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(3);
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, storeParams));

        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        TestWriteUtils.runPushJob(
                "Test push job",
                pushJobProps,
                job -> job.setMapRedPartitionerClass(BuggyOffsettingMapReduceShufflePartitioner.class));
    }

    /**
     * Configures both the store and the push job to use {@link NonDeterministicVenicePartitioner}
     * on a 3-partition store and expects the job to fail with a {@link VeniceException}.
     */
    @Test(timeOut = TEST_TIMEOUT, expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = ".*Exception or error caught during VenicePushJob.*")
    public void testRunJobWithNonDeterministicPartitioner() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);

        String partitionerClassName = NonDeterministicVenicePartitioner.class.getName();
        UpdateStoreQueryParams storeParams = new UpdateStoreQueryParams()
                .setStorageQuotaInByte(-1L)
                .setPartitionCount(3)
                .setPartitionerClass(partitionerClassName);
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, storeParams));

        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(
                veniceCluster, "file://" + inputDir.getAbsolutePath(), storeName);
        pushJobProps.setProperty("venice.partitioners", partitionerClassName);
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
    }

    /**
     * Exercises a Kafka-input (KIF) repush on an incremental-push store:
     * <ol>
     *   <li>batch push, then wait for current version 1;</li>
     *   <li>incremental push of a second data file, current version expected to stay 1;</li>
     *   <li>verify the data through a store client;</li>
     *   <li>repush with {@code source.kafka=true} from the version-1 topic, wait for current
     *       version 2, and verify the same data again.</li>
     * </ol>
     */
    @Test(timeOut = TEST_TIMEOUT, description = "KIF repush should copy all data including recent incPush2RT to new VT")
    public void testKIFRepushForIncrementalPushStores() throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(2).setIncrementalPushEnabled(true)));
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);

        // Initial batch push.
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1);
        });

        // Incremental push of the second data file; the current version is expected to remain 1.
        TestWriteUtils.writeSimpleAvroFileWithUserSchema2(inputDir);
        pushJobProps.setProperty("incremental.push", "true");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 1);
        });
        pushJobProps.setProperty("incremental.push", "false");
        assertBatchAndIncrementalDataReadable(storeName);

        // Repush from the version-1 Kafka topic.
        pushJobProps.setProperty("source.kafka", "true");
        pushJobProps.setProperty("kafka.input.topic", Version.composeKafkaTopic(storeName, 1));
        pushJobProps.setProperty("kafka.input.broker.url", veniceCluster.getKafka().getAddress());
        pushJobProps.setProperty("kafka.input.max.records.per.mapper", "5");
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
        TestUtils.waitForNonDeterministicAssertion(30L, TimeUnit.SECONDS, () -> {
            Assert.assertEquals(controllerClient.getStore(storeName).getStore().getCurrentVersion(), 2);
        });
        assertBatchAndIncrementalDataReadable(storeName);
    }

    /**
     * Reads keys 1..150 of {@code storeName} through a fresh store client and asserts that keys
     * 1..50 map to {@code "test_name_<key>"} and keys 51..150 map to {@code "test_name_<2 * key>"}.
     * The client is closed even when an assertion fails.
     */
    private void assertBatchAndIncrementalDataReadable(String storeName) throws Exception {
        try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) {
            for (int key = 1; key <= 50; key++) {
                Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
            }
            for (int key = 51; key <= 150; key++) {
                Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + (key * 2));
            }
        }
    }

    /**
     * Runs a batch push, verifies the data, then reconfigures the store (hybrid settings plus the
     * chunking flag under test) and runs a Kafka-input repush with {@code kafka.input.fabric=dc-0},
     * verifying the same data afterwards.
     *
     * @param z value passed to {@code setChunkingEnabled} before the repush
     */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testKIFRepushFetch(boolean z) throws Exception {
        File inputDir = TestWriteUtils.getTempDataDirectory();
        TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);
        String inputDirPath = "file://" + inputDir.getAbsolutePath();
        String storeName = Utils.getUniqueString("store");
        veniceCluster.getNewStore(storeName);
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setPartitionCount(2).setIncrementalPushEnabled(true).setWriteComputationEnabled(true)));
        Properties pushJobProps = IntegrationTestPushUtils.defaultVPJProps(veniceCluster, inputDirPath, storeName);
        pushJobProps.setProperty("send.control.messages.directly", "true");

        // Initial batch push.
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
        assertBatchPushDataReadable(storeName);

        // Repush with Kafka as the input source after updating the store configuration.
        pushJobProps.setProperty("source.kafka", "true");
        pushJobProps.setProperty("kafka.input.fabric", "dc-0");
        pushJobProps.setProperty("kafka.input.max.records.per.mapper", "5");
        TestUtils.assertCommand(veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setHybridOffsetLagThreshold(1L).setHybridRewindSeconds(0L).setChunkingEnabled(z)));
        TestWriteUtils.runPushJob("Test push job", pushJobProps);
        assertBatchPushDataReadable(storeName);
    }

    /**
     * Reads keys 1..100 of {@code storeName} through a fresh store client and asserts that each
     * maps to {@code "test_name_<key>"}. The client is closed even when an assertion fails.
     */
    private void assertBatchPushDataReadable(String storeName) throws Exception {
        try (AvroGenericStoreClient<String, Object> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceCluster.getRandomRouterURL()))) {
            for (int key = 1; key <= 100; key++) {
                Assert.assertEquals(client.get(Integer.toString(key)).get().toString(), "test_name_" + key);
            }
        }
    }
}
