package com.linkedin.venice.hadoop.input.kafka.ttl;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.FilterChain;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.schema.HDFSRmdSchemaSource;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/ttl/TestVeniceKafkaInputTTLFilter.class */
public class TestVeniceKafkaInputTTLFilter {
    private static final long TTL_IN_SECONDS_DEFAULT = 3600;
    private static final String TEST_STORE = "test_store";
    private static final String VALUE_RECORD_SCHEMA_STR = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"default\":\"venice\"}]}";
    private Schema valueSchema;
    private Schema rmdSchema;
    private HDFSRmdSchemaSource source;
    private VeniceKafkaInputTTLFilter filterWithSupportedPolicy;
    private FilterChain<KafkaInputMapperValue> filterChain;

    @BeforeClass
    public void setUp() throws IOException {
        this.valueSchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(VALUE_RECORD_SCHEMA_STR);
        this.rmdSchema = RmdSchemaGenerator.generateMetadataSchema(this.valueSchema, 1);
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        Properties properties = new Properties();
        properties.put("repush.ttl.seconds", Long.valueOf(TTL_IN_SECONDS_DEFAULT));
        properties.put("repush.ttl.policy", 0);
        properties.put("rmd.schema.dir", tempDataDirectory.getAbsolutePath());
        properties.put("venice.store.name", TEST_STORE);
        VeniceProperties veniceProperties = new VeniceProperties(properties);
        setupHDFS(veniceProperties);
        this.filterWithSupportedPolicy = new VeniceKafkaInputTTLFilter(veniceProperties);
        this.filterChain = new FilterChain<>(new AbstractVeniceFilter[]{this.filterWithSupportedPolicy});
    }

    private void setupHDFS(VeniceProperties veniceProperties) throws IOException {
        ControllerClient controllerClient = (ControllerClient) Mockito.mock(ControllerClient.class);
        MultiSchemaResponse.Schema[] generateMultiSchema = generateMultiSchema(1);
        MultiSchemaResponse multiSchemaResponse = new MultiSchemaResponse();
        multiSchemaResponse.setSchemas(generateMultiSchema);
        ((ControllerClient) Mockito.doReturn(multiSchemaResponse).when(controllerClient)).getAllReplicationMetadataSchemas(TEST_STORE);
        this.source = new HDFSRmdSchemaSource(veniceProperties.getString("rmd.schema.dir"), TEST_STORE);
        this.source.loadRmdSchemasOnDisk(controllerClient);
    }

    private MultiSchemaResponse.Schema[] generateMultiSchema(int i) {
        MultiSchemaResponse.Schema[] schemaArr = new MultiSchemaResponse.Schema[i];
        for (int i2 = 1; i2 <= i; i2++) {
            MultiSchemaResponse.Schema schema = new MultiSchemaResponse.Schema();
            schema.setRmdValueSchemaId(i2);
            schema.setDerivedSchemaId(i2);
            schema.setId(i2);
            schema.setSchemaStr(this.rmdSchema.toString());
            schemaArr[i2 - 1] = schema;
        }
        return schemaArr;
    }

    @Test
    public void testFilterChain() {
        Assert.assertFalse(this.filterChain.isEmpty());
    }

    @Test
    public void testFilterWithRTPolicyWithValidValues() {
        int i = 0;
        int i2 = 0;
        Iterator<KafkaInputMapperValue> it = generateRecord(4, 6, 4, Instant.now(), TTL_IN_SECONDS_DEFAULT).iterator();
        while (it.hasNext()) {
            if (this.filterWithSupportedPolicy.apply(it.next())) {
                i2++;
            } else {
                i++;
            }
        }
        Assert.assertEquals(i, 8);
        Assert.assertEquals(i2, 6);
    }

    @Test(expectedExceptions = {IllegalStateException.class})
    public void testFilterWithRTPolicyWithInValidValues() {
        Assert.assertFalse(this.filterWithSupportedPolicy.apply(new KafkaInputMapperValue()));
    }

    private List<KafkaInputMapperValue> generateRecord(int i, int i2, int i3, Instant instant, long j) {
        ArrayList arrayList = new ArrayList();
        for (int i4 = 0; i4 < i; i4++) {
            arrayList.add(generateKIMWithRmdTimeStamp(instant.toEpochMilli(), false));
        }
        Instant minus = instant.minus(j + 1, (TemporalUnit) ChronoUnit.SECONDS);
        for (int i5 = 0; i5 < i2; i5++) {
            arrayList.add(generateKIMWithRmdTimeStamp(minus.toEpochMilli(), false));
        }
        for (int i6 = 0; i6 < i3; i6++) {
            arrayList.add(generateKIMWithRmdTimeStamp(minus.toEpochMilli(), true));
        }
        return arrayList;
    }

    private KafkaInputMapperValue generateKIMWithRmdTimeStamp(long j, boolean z) {
        KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
        kafkaInputMapperValue.schemaId = z ? -10 : 1;
        kafkaInputMapperValue.replicationMetadataVersionId = 1;
        kafkaInputMapperValue.replicationMetadataPayload = RmdUtils.serializeRmdRecord(this.rmdSchema, generateRmdRecordWithValueLevelTimeStamp(j));
        return kafkaInputMapperValue;
    }

    private GenericRecord generateRmdRecordWithValueLevelTimeStamp(long j) {
        List asList = Arrays.asList(1L, 2L, 3L);
        GenericData.Record record = new GenericData.Record(this.rmdSchema);
        record.put("timestamp", Long.valueOf(j));
        record.put("replication_checkpoint_vector", asList);
        return record;
    }
}
