package com.linkedin.venice.hadoop.input.kafka.ttl;

import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.schema.HDFSRmdSchemaSource;
import com.linkedin.venice.schema.rmd.RmdUtils;
import com.linkedin.venice.schema.rmd.RmdVersionId;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/ttl/VeniceRmdTTLFilter.class */
public abstract class VeniceRmdTTLFilter<INPUT_VALUE> extends AbstractVeniceFilter<INPUT_VALUE> {
    private final TTLResolutionPolicy ttlPolicy;
    private final long ttlInMs;
    private final HDFSRmdSchemaSource schemaSource;
    protected final Map<RmdVersionId, Schema> rmdMapping;

    public VeniceRmdTTLFilter(VeniceProperties veniceProperties) throws IOException {
        super(veniceProperties);
        this.ttlPolicy = TTLResolutionPolicy.valueOf(veniceProperties.getInt(VenicePushJob.REPUSH_TTL_POLICY));
        this.ttlInMs = TimeUnit.SECONDS.toMillis(veniceProperties.getLong(VenicePushJob.REPUSH_TTL_IN_SECONDS));
        this.schemaSource = new HDFSRmdSchemaSource(veniceProperties.getString(VenicePushJob.RMD_SCHEMA_DIR));
        this.rmdMapping = this.schemaSource.fetchSchemas();
    }

    @Override // com.linkedin.venice.hadoop.AbstractVeniceFilter
    public boolean apply(INPUT_VALUE input_value) {
        if (skipRmdRecord(input_value)) {
            return false;
        }
        Instant now = Instant.now();
        switch (this.ttlPolicy) {
            case RT_WRITE_ONLY:
                return ChronoUnit.MILLIS.between(Instant.ofEpochMilli(getTimeStampFromRmdRecord(input_value)), now) > this.ttlInMs;
            default:
                throw new UnsupportedOperationException(this.ttlPolicy + " policy is not supported.");
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.schemaSource.close();
    }

    public long getTimeStampFromRmdRecord(INPUT_VALUE input_value) {
        ByteBuffer rmdPayload = getRmdPayload(input_value);
        if (rmdPayload == null || !rmdPayload.hasRemaining()) {
            throw new IllegalStateException("The record doesn't contain required RMD field. Please check if your store has A/A enabled");
        }
        int rmdId = getRmdId(input_value);
        Schema schema = this.rmdMapping.get(new RmdVersionId(getSchemaId(input_value), rmdId));
        return RmdUtils.extractTimestampFromRmd(RmdUtils.deserializeRmdBytes(schema, schema, rmdPayload)).stream().mapToLong(l -> {
            return l.longValue();
        }).max().orElseThrow(NoSuchElementException::new);
    }

    protected abstract int getSchemaId(INPUT_VALUE input_value);

    protected abstract int getRmdId(INPUT_VALUE input_value);

    protected abstract ByteBuffer getRmdPayload(INPUT_VALUE input_value);

    protected boolean skipRmdRecord(INPUT_VALUE input_value) {
        return false;
    }
}
