package com.linkedin.venice.ingestion.control;

import com.linkedin.venice.VeniceConstants;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.TopicDoesNotExistException;
import com.linkedin.venice.kafka.TopicException;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher.class */
public class RealTimeTopicSwitcher {
    private static final Logger LOGGER = LogManager.getLogger(RealTimeTopicSwitcher.class);
    private final TopicManager topicManager;
    private final String destKafkaBootstrapServers;
    private final VeniceWriterFactory veniceWriterFactory;
    private final Time timer = new SystemTime();
    private final Integer kafkaReplicationFactorForRTTopics;
    private final Optional<Integer> minSyncReplicasForRTTopics;
    private final PubSubTopicRepository pubSubTopicRepository;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/venice/ingestion/control/RealTimeTopicSwitcher$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$meta$BufferReplayPolicy = new int[BufferReplayPolicy.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$meta$BufferReplayPolicy[BufferReplayPolicy.REWIND_FROM_SOP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$meta$BufferReplayPolicy[BufferReplayPolicy.REWIND_FROM_EOP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public RealTimeTopicSwitcher(TopicManager topicManager, VeniceWriterFactory veniceWriterFactory, VeniceProperties veniceProperties, PubSubTopicRepository pubSubTopicRepository) {
        this.topicManager = topicManager;
        this.veniceWriterFactory = veniceWriterFactory;
        this.pubSubTopicRepository = pubSubTopicRepository;
        this.destKafkaBootstrapServers = veniceProperties.getBoolean("ssl.to.kakfa", false) ? veniceProperties.getString("ssl.kafka.bootstrap.servers") : veniceProperties.getString("kafka.bootstrap.servers");
        this.kafkaReplicationFactorForRTTopics = Integer.valueOf(veniceProperties.getInt("kafka.replication.factor.rt.topics", veniceProperties.getInt("kafka.replication.factor", 3)));
        this.minSyncReplicasForRTTopics = veniceProperties.getOptionalInt("kafka.min.in.sync.replicas.rt.topics");
    }

    void sendTopicSwitch(PubSubTopic pubSubTopic, PubSubTopic pubSubTopic2, long j, List<String> list) throws TopicException {
        String str = "Cannot send TopicSwitch into '" + pubSubTopic2 + "' instructing to switch to '" + pubSubTopic + "' because";
        if (pubSubTopic.equals(pubSubTopic2)) {
            throw new DuplicateTopicException(str + " they are the same topic.");
        }
        if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(pubSubTopic)) {
            throw new TopicDoesNotExistException(str + " topic " + pubSubTopic + " does not exist.");
        }
        if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(pubSubTopic2)) {
            throw new TopicDoesNotExistException(str + " topic " + pubSubTopic2 + " does not exist.");
        }
        int size = getTopicManager().partitionsFor(pubSubTopic2).size();
        ArrayList arrayList = new ArrayList();
        if (list.isEmpty()) {
            arrayList.add(this.destKafkaBootstrapServers);
        } else {
            arrayList.addAll(list);
        }
        VeniceWriter createVeniceWriter = getVeniceWriterFactory().createVeniceWriter(new VeniceWriterOptions.Builder(pubSubTopic2.getName()).setTime(getTimer()).setPartitionCount(Integer.valueOf(size)).build());
        try {
            createVeniceWriter.broadcastTopicSwitch(arrayList, pubSubTopic.getName(), Long.valueOf(j), Collections.emptyMap());
            if (createVeniceWriter != null) {
                createVeniceWriter.close();
            }
            LOGGER.info("Successfully sent TopicSwitch into '{}' instructing to switch to '{}' with a rewindStartTimestamp of {}.", pubSubTopic2, pubSubTopic, Long.valueOf(j));
        } catch (Throwable th) {
            if (createVeniceWriter != null) {
                try {
                    createVeniceWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    void ensurePreconditions(PubSubTopic pubSubTopic, PubSubTopic pubSubTopic2, Store store, Optional<HybridStoreConfig> optional) {
        if (!optional.isPresent()) {
            throw new VeniceException("Topic switching is only supported for Hybrid Stores.");
        }
        if (!getTopicManager().containsTopicAndAllPartitionsAreOnline(pubSubTopic)) {
            Optional version = store.getVersion(Version.parseVersionFromKafkaTopicName(pubSubTopic2.getName()));
            getTopicManager().createTopic(pubSubTopic, version.isPresent() ? ((Version) version.get()).getPartitionCount() : store.getPartitionCount(), pubSubTopic.isRealTime() ? this.kafkaReplicationFactorForRTTopics.intValue() : getTopicManager().getReplicationFactor(pubSubTopic2), TopicManager.getExpectedRetentionTimeInMs(store, optional.get()), false, pubSubTopic.isRealTime() ? this.minSyncReplicasForRTTopics : Optional.empty(), false);
            return;
        }
        long topicRetention = getTopicManager().getTopicRetention(pubSubTopic);
        long expectedRetentionTimeInMs = TopicManager.getExpectedRetentionTimeInMs(store, optional.get());
        if (topicRetention != expectedRetentionTimeInMs) {
            getTopicManager().updateTopicRetention(pubSubTopic, expectedRetentionTimeInMs);
        }
    }

    long getRewindStartTime(Version version, Optional<HybridStoreConfig> optional, long j) {
        if (version.isActiveActiveReplicationEnabled()) {
            return VeniceConstants.REWIND_TIME_DECIDED_BY_SERVER.longValue();
        }
        long rewindTimeInSeconds = optional.get().getRewindTimeInSeconds() * 1000;
        if (version.getDataRecoveryVersionConfig() != null) {
            rewindTimeInSeconds = Math.min(172800000L, rewindTimeInSeconds);
        }
        switch (AnonymousClass1.$SwitchMap$com$linkedin$venice$meta$BufferReplayPolicy[optional.get().getBufferReplayPolicy().ordinal()]) {
            case AdminTopicUtils.PARTITION_NUM_FOR_ADMIN_TOPIC /* 1 */:
                return j - rewindTimeInSeconds;
            case 2:
            default:
                return getTimer().getMilliseconds() - rewindTimeInSeconds;
        }
    }

    public void transmitVersionSwapMessage(Store store, int i, int i2) {
        if (i == 0 || i2 == 0) {
            return;
        }
        Version version = (Version) store.getVersion(i).orElseThrow(() -> {
            return new VeniceException("Corresponding version " + i + "does not exist for store: " + store.getName());
        });
        Version version2 = (Version) store.getVersion(i2).orElseThrow(() -> {
            return new VeniceException("Corresponding version " + i2 + "does not exist for store: " + store.getName());
        });
        if (hasViewConfigs(version2, version) && this.topicManager.containsTopic(this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(store.getName())))) {
            VeniceWriter createVeniceWriter = getVeniceWriterFactory().createVeniceWriter(new VeniceWriterOptions.Builder(Version.composeRealTimeTopic(store.getName())).setTime(getTimer()).setPartitionCount(Integer.valueOf(version.getPartitionCount())).build());
            try {
                createVeniceWriter.broadcastVersionSwap(version.kafkaTopicName(), version2.kafkaTopicName(), Collections.emptyMap());
                if (createVeniceWriter != null) {
                    createVeniceWriter.close();
                }
                LOGGER.info("Successfully sent VersionTopicSwitch for store {} from version {} to version {}", store.getName(), Integer.valueOf(i), Integer.valueOf(i2));
            } catch (Throwable th) {
                if (createVeniceWriter != null) {
                    try {
                        createVeniceWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    public boolean hasViewConfigs(Version version, Version version2) {
        return ((version2.getViewConfigs() == null || version2.getViewConfigs().isEmpty()) && (version.getViewConfigs() == null || version.getViewConfigs().isEmpty())) ? false : true;
    }

    public void switchToRealTimeTopic(String str, String str2, Store store, String str3, List<String> list) {
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(str);
        PubSubTopic topic2 = this.pubSubTopicRepository.getTopic(str2);
        if (!topic.isRealTime()) {
            throw new IllegalArgumentException("The realTimeTopicName param is invalid: " + topic);
        }
        Version version = (Version) store.getVersion(Version.parseVersionFromKafkaTopicName(topic2.getName())).orElseThrow(() -> {
            return new VeniceException("Corresponding version does not exist for topic: " + topic2 + " in store: " + store.getName());
        });
        Optional<HybridStoreConfig> ofNullable = version.isUseVersionLevelHybridConfig() ? Optional.ofNullable(version.getHybridStoreConfig()) : Optional.ofNullable(store.getHybridStoreConfig());
        ensurePreconditions(topic, topic2, store, ofNullable);
        long rewindStartTime = getRewindStartTime(version, ofNullable, version.getCreatedTime());
        PubSubTopic topic3 = version.getPushType().isStreamReprocessing() ? this.pubSubTopicRepository.getTopic(Version.composeStreamReprocessingTopic(store.getName(), version.getNumber())) : topic2;
        ArrayList arrayList = new ArrayList(Math.max(1, list.size()));
        if (version.isActiveActiveReplicationEnabled()) {
            arrayList.addAll(list);
        } else if (version.isNativeReplicationEnabled() && (isAggregate(store) || isIncrementalPush(version))) {
            arrayList.add(str3);
        }
        LOGGER.info("Will send TopicSwitch into '{}' instructing to switch to '{}' with a rewindStartTimestamp of {}.", topic2, topic, Long.valueOf(rewindStartTime));
        sendTopicSwitch(topic, topic3, rewindStartTime, arrayList);
    }

    private static boolean isAggregate(Store store) {
        return store.getHybridStoreConfig().getDataReplicationPolicy() == DataReplicationPolicy.AGGREGATE;
    }

    private static boolean isIncrementalPush(Version version) {
        return version.isIncrementalPushEnabled();
    }

    TopicManager getTopicManager() {
        return this.topicManager;
    }

    Time getTimer() {
        return this.timer;
    }

    VeniceWriterFactory getVeniceWriterFactory() {
        return this.veniceWriterFactory;
    }
}
