package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.participant.protocol.KillPushJob;
import com.linkedin.venice.participant.protocol.ParticipantMessageKey;
import com.linkedin.venice.participant.protocol.ParticipantMessageValue;
import com.linkedin.venice.participant.protocol.enums.ParticipantMessageType;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/ParticipantStoreConsumptionTask.class */
public class ParticipantStoreConsumptionTask implements Runnable, Closeable {
    private static final String CLIENT_STATS_PREFIX = "venice-client";
    private final ParticipantStoreConsumptionStats stats;
    private final StoreIngestionService storeIngestionService;
    private final long participantMessageConsumptionDelayMs;
    private final ClusterInfoProvider clusterInfoProvider;
    private final ClientConfig<ParticipantMessageValue> clientConfig;
    private final ICProvider icProvider;
    private static final Logger LOGGER = LogManager.getLogger(ParticipantStoreConsumptionTask.class);
    private static final RedundantExceptionFilter EXCEPTION_FILTER = RedundantExceptionFilter.getRedundantExceptionFilter();
    private final AtomicBoolean isClosing = new AtomicBoolean();
    private final Map<String, AvroSpecificStoreClient<ParticipantMessageKey, ParticipantMessageValue>> clientMap = new VeniceConcurrentHashMap();

    public ParticipantStoreConsumptionTask(StoreIngestionService storeIngestionService, ClusterInfoProvider clusterInfoProvider, ParticipantStoreConsumptionStats participantStoreConsumptionStats, ClientConfig<ParticipantMessageValue> clientConfig, long j, ICProvider iCProvider) {
        this.stats = (ParticipantStoreConsumptionStats) Validate.notNull(participantStoreConsumptionStats);
        this.storeIngestionService = (StoreIngestionService) Validate.notNull(storeIngestionService);
        this.clusterInfoProvider = (ClusterInfoProvider) Validate.notNull(clusterInfoProvider);
        this.clientConfig = (ClientConfig) Validate.notNull(clientConfig);
        this.participantMessageConsumptionDelayMs = j;
        this.icProvider = iCProvider;
    }

    @Override // java.lang.Runnable
    public void run() {
        ParticipantMessageKey participantMessageKey;
        String veniceCluster;
        LOGGER.info("Started running {}", getClass().getSimpleName());
        while (!this.isClosing.get() && !Thread.currentThread().isInterrupted()) {
            this.stats.recordHeartbeat();
            try {
                Thread.sleep(this.participantMessageConsumptionDelayMs);
                for (String str : this.storeIngestionService.getIngestingTopicsWithVersionStatusNotOnline()) {
                    try {
                        participantMessageKey = new ParticipantMessageKey();
                        participantMessageKey.messageType = ParticipantMessageType.KILL_PUSH_JOB.getValue();
                        participantMessageKey.resourceName = str;
                        veniceCluster = this.clusterInfoProvider.getVeniceCluster(Version.parseStoreFromKafkaTopicName(str));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        LOGGER.info("Got an InterruptedException while killing consumption task for topic: {}", str, e);
                        throw e;
                        break;
                    } catch (Exception e2) {
                        String str2 = "Unexpected exception while trying to check or kill ingestion topic: " + str + ". ExMsg: " + e2.getMessage();
                        if (!EXCEPTION_FILTER.isRedundantException(str2)) {
                            LOGGER.error(str2, e2);
                        }
                        this.stats.recordKillPushJobFailedConsumption();
                    }
                    if (veniceCluster != null) {
                        ParticipantMessageValue participantMessageValue = this.icProvider != null ? (ParticipantMessageValue) ((CompletableFuture) this.icProvider.call(getClass().getCanonicalName(), () -> {
                            return getParticipantStoreClient(veniceCluster).get(participantMessageKey);
                        })).get() : (ParticipantMessageValue) getParticipantStoreClient(veniceCluster).get(participantMessageKey).get();
                        if (participantMessageValue != null && participantMessageValue.messageType == ParticipantMessageType.KILL_PUSH_JOB.getValue()) {
                            KillPushJob killPushJob = (KillPushJob) participantMessageValue.messageUnion;
                            if (this.storeIngestionService.killConsumptionTask(str)) {
                                this.stats.recordKilledPushJobs();
                                this.stats.recordKillPushJobLatency(Long.max(0L, System.currentTimeMillis() - killPushJob.timestamp));
                            }
                        }
                    }
                }
            } catch (InterruptedException e3) {
                LOGGER.info("ParticipantStoreConsumptionTask was interrupted and hence exiting now...", e3);
            } catch (Exception e4) {
                String str3 = "Exception thrown while running " + getClass().getSimpleName() + " thread. ExMsg: " + ExceptionUtils.compactExceptionDescription(e4);
                if (!EXCEPTION_FILTER.isRedundantException(str3)) {
                    LOGGER.error(str3, e4);
                }
                this.stats.recordKillPushJobFailedConsumption();
            } catch (Throwable th) {
                LOGGER.error("Throwable thrown while running {} thread", getClass().getSimpleName(), th);
            }
        }
        LOGGER.info("Stopped running {}", getClass().getSimpleName());
    }

    private AvroSpecificStoreClient<ParticipantMessageKey, ParticipantMessageValue> getParticipantStoreClient(String str) {
        try {
            this.clientMap.computeIfAbsent(str, str2 -> {
                return ClientFactory.getAndStartSpecificAvroClient(ClientConfig.cloneConfig(this.clientConfig).setStoreName(VeniceSystemStoreUtils.getParticipantStoreNameForCluster(str)).setSpecificValueClass(ParticipantMessageValue.class).setStatsPrefix(CLIENT_STATS_PREFIX));
            });
        } catch (Exception e) {
            this.stats.recordFailedInitialization();
            LOGGER.error("Failed to get participant client for cluster: {}", str, e);
        }
        return this.clientMap.get(str);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isClosing.set(true);
        this.clientMap.values().forEach((v0) -> {
            v0.close();
        });
    }
}
