package com.linkedin.davinci.helix;

import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.ingestion.VeniceIngestionBackend;
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import com.linkedin.venice.helix.HelixState;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.Utils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/helix/AbstractPartitionStateModel.class */
/**
 * Base class for Helix partition state models of a single store-version partition.
 *
 * <p>It bundles the operations shared by concrete state models: wrapping a state transition with
 * entry/completion logging and thread naming, starting and stopping consumption through the
 * {@link VeniceIngestionBackend}, and maintaining the replica status of the partition through a
 * {@link HelixPartitionStatusAccessor} that is obtained lazily from a future.
 */
public abstract class AbstractPartitionStateModel extends StateModel {
    protected final Logger logger = LogManager.getLogger(getClass());
    private static final String STORE_PARTITION_DESCRIPTION_FORMAT = "%s-%d";
    private static final String TRANSITION_LOG_FORMAT = "{} {} transition from {} to {} for resource: {}, partition: {} invoked with message {} and context {}";
    private static final String INACTIVE_THREAD_NAME = "Inactive ST thread.";
    /** Number of retries after the first failed attempt to delete the replica status. */
    private static final int RETRY_COUNT = 5;
    /** Pause between two attempts to delete the replica status. */
    private static final long RETRY_DURATION_MS = 1000L;
    private static final int WAIT_PARTITION_ACCESSOR_TIME_OUT_MS = (int) TimeUnit.MINUTES.toMillis(5);
    /** Timeout used when the store's bootstrap-to-online timeout cannot be fetched. */
    private static final int DEFAULT_BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS = 24;

    private final VeniceIngestionBackend ingestionBackend;
    private final ReadOnlyStoreRepository storeRepository;
    private final VeniceStoreVersionConfig storeConfig;
    private final int partition;
    private final String storePartitionDescription;
    private final CompletableFuture<HelixPartitionStatusAccessor> partitionStatusAccessorFuture;
    private final String instanceName;
    /** Resolved lazily from {@link #partitionStatusAccessorFuture}; {@code null} until then. */
    private HelixPartitionStatusAccessor partitionPushStatusAccessor;

    /**
     * @param veniceIngestionBackend backend used to start/stop consumption and drop partitions
     * @param readOnlyStoreRepository repository used to look up store level settings
     * @param veniceStoreVersionConfig configuration of the store version this partition belongs to
     * @param i partition id
     * @param completableFuture future that eventually yields the partition status accessor
     * @param str name of the local instance, used in log and error messages
     */
    public AbstractPartitionStateModel(VeniceIngestionBackend veniceIngestionBackend, ReadOnlyStoreRepository readOnlyStoreRepository, VeniceStoreVersionConfig veniceStoreVersionConfig, int i, CompletableFuture<HelixPartitionStatusAccessor> completableFuture, String str) {
        this.ingestionBackend = veniceIngestionBackend;
        this.storeRepository = readOnlyStoreRepository;
        this.storeConfig = veniceStoreVersionConfig;
        this.partition = i;
        this.storePartitionDescription = String.format(STORE_PARTITION_DESCRIPTION_FORMAT, veniceStoreVersionConfig.getStoreVersionName(), i);
        this.partitionStatusAccessorFuture = completableFuture;
        this.instanceName = str;
    }

    /**
     * Runs a regular (non-rollback) state transition.
     *
     * @see #executeStateTransition(Message, NotificationContext, Runnable, boolean)
     */
    public void executeStateTransition(Message message, NotificationContext notificationContext, Runnable runnable) {
        executeStateTransition(message, notificationContext, runnable, false);
    }

    /**
     * Runs {@code runnable} as the body of a state transition.
     *
     * <p>The transition is logged on entry and on successful completion. While the body runs the
     * current thread is renamed after the transition; afterwards the name is always set to
     * {@code "Inactive ST thread."}, whether the body succeeded or threw.
     *
     * @param message Helix message describing the transition (from/to state, resource name)
     * @param notificationContext Helix context, only used for logging
     * @param runnable the transition body; anything it throws propagates to the caller
     * @param z {@code true} when this invocation is a rollback, which only changes the log wording
     */
    protected void executeStateTransition(Message message, NotificationContext notificationContext, Runnable runnable, boolean z) {
        String fromState = message.getFromState();
        String toState = message.getToState();
        logTransition(z ? "rolling back" : "initiating", fromState, toState, message, notificationContext);
        Thread.currentThread().setName("Helix-ST-" + message.getResourceName() + "-" + this.partition + "-" + fromState + "->" + toState);
        try {
            runnable.run();
            logTransition(z ? "rolled back" : "completed", fromState, toState, message, notificationContext);
        } finally {
            Thread.currentThread().setName(INACTIVE_THREAD_NAME);
        }
    }

    /** Emits the shared transition log line; {@code phase} is the verb describing the progress. */
    private void logTransition(String phase, String fromState, String toState, Message message, NotificationContext notificationContext) {
        this.logger.info(TRANSITION_LOG_FORMAT, getStorePartitionDescription(), phase, fromState, toState, getStoreConfig().getStoreVersionName(), this.partition, message, notificationContext);
    }

    /**
     * Stops the running consumption of this partition after a failed state transition.
     */
    @Override
    public void rollbackOnError(Message message, NotificationContext notificationContext, StateTransitionError stateTransitionError) {
        executeStateTransition(message, notificationContext, () -> {
            this.logger.info("{} met an error during state transition. Stop the running consumption.", getStorePartitionDescription(), stateTransitionError.getException());
            stopConsumption();
        }, true);
    }

    /**
     * Handles the ERROR to DROPPED transition by removing the partition from the store.
     *
     * <p>Failures are logged and deliberately not rethrown, so the transition itself completes.
     */
    @Override
    @Transition(to = HelixState.DROPPED_STATE, from = "ERROR")
    public void onBecomeDroppedFromError(Message message, NotificationContext notificationContext) {
        executeStateTransition(message, notificationContext, () -> {
            try {
                removePartitionFromStoreGracefully();
            } catch (Exception e) {
                this.logger.error("Encountered exception during the transition from ERROR to DROPPED.", e);
            }
        });
    }

    /**
     * Stops any ongoing consumption (best effort) and, when the status accessor is available,
     * sets the replica status to {@code STARTED} and the hybrid quota status to
     * {@code QUOTA_NOT_VIOLATED}.
     *
     * @throws VeniceException if waiting for the accessor or updating the statuses fails
     */
    @Override
    public void reset() {
        try {
            stopConsumption();
        } catch (Exception e) {
            // Best effort: a failure to stop consumption must not prevent the status reset below.
            this.logger.error("Error when trying to stop any ongoing consumption during reset for: {}", this.storePartitionDescription, e);
        }
        try {
            waitPartitionPushStatusAccessor();
            HelixPartitionStatusAccessor accessor = this.partitionPushStatusAccessor;
            if (accessor != null) {
                String storeVersionName = this.storeConfig.getStoreVersionName();
                accessor.updateReplicaStatus(storeVersionName, this.partition, ExecutionStatus.STARTED);
                accessor.updateHybridQuotaReplicaStatus(storeVersionName, this.partition, HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED);
            }
        } catch (Exception e) {
            throw new VeniceException("Error when initializing partition push status accessor, reset failed. ", e);
        }
    }

    /**
     * Waits for the partition status accessor, marks the replica as {@code NOT_STARTED} and then
     * starts consumption for this partition.
     *
     * <p>Each of the two phases is wrapped in its own {@link Timer}; the elapsed time is logged
     * when debug logging is enabled for the store. Each timer is closed exactly once, also when
     * the phase it measures fails.
     *
     * @throws VeniceException if any of the steps throws an {@link Exception}
     */
    public void setupNewStorePartition() {
        try {
            try (Timer ignored = Timer.run(elapsedTimeInMs -> {
                if (this.storeConfig.isDebugLoggingEnabled()) {
                    this.logger.info("Completed waiting for partition push status accessor for resource {}, partition {}. Total elapsed time: {} ms", this.storeConfig.getStoreVersionName(), this.partition, elapsedTimeInMs);
                }
            })) {
                waitPartitionPushStatusAccessor();
                initializePartitionPushStatus();
            }
            try (Timer ignored = Timer.run(elapsedTimeInMs -> {
                if (this.storeConfig.isDebugLoggingEnabled()) {
                    this.logger.info("Completed starting the consumption for resource {} partition {}. Total elapsed time: {} ms", this.storeConfig.getStoreVersionName(), this.partition, elapsedTimeInMs);
                }
            })) {
                this.ingestionBackend.startConsumption(this.storeConfig, this.partition);
            }
        } catch (Exception e) {
            throw new VeniceException("Error when initializing partition push status accessor, will not start ingestion for store partition. ", e);
        }
    }

    /**
     * Drops the storage partition using the configured graceful drop delay and then removes the
     * customized state of this replica.
     */
    public void removePartitionFromStoreGracefully() {
        this.ingestionBackend.dropStoragePartitionGracefully(this.storeConfig, this.partition, getStoreConfig().getPartitionGracefulDropDelaySeconds());
        removeCustomizedState();
    }

    /**
     * Deletes the replica status of this partition through the status accessor.
     *
     * <p>The deletion is tried once and then retried up to {@code RETRY_COUNT} more times, with a
     * pause of {@code RETRY_DURATION_MS} before each retry. Nothing happens if the accessor
     * resolved to {@code null}.
     *
     * @throws VeniceException if the accessor cannot be obtained, or if every attempt failed
     */
    protected void removeCustomizedState() {
        try {
            waitPartitionPushStatusAccessor();
            if (this.partitionPushStatusAccessor == null) {
                return;
            }
            String storeVersionName = getStoreConfig().getStoreVersionName();
            boolean removed = false;
            int attempt = 0;
            while (!removed && attempt <= RETRY_COUNT) {
                attempt++;
                if (attempt > 1) {
                    this.logger.info("Wait {} ms to retry.", RETRY_DURATION_MS);
                    Utils.sleep(RETRY_DURATION_MS);
                    this.logger.info("Attempt #{} in removing customized state for store: {}, partition: {}, on instance: {}", attempt, storeVersionName, this.partition, this.instanceName);
                }
                try {
                    this.partitionPushStatusAccessor.deleteReplicaStatus(storeVersionName, this.partition);
                    removed = true;
                } catch (Exception e) {
                    // Logged only; the loop decides whether another attempt is made.
                    this.logger.error("Error in removing customized state for store: {}, partition: {}, on instance: {}", storeVersionName, this.partition, this.instanceName, e);
                }
            }
            if (!removed) {
                String errorMessage = String.format("Error: After attempting %s times, removing customized state for store: %s, partition: %s, on instance: %s", attempt, storeVersionName, this.partition, this.instanceName);
                this.logger.error(errorMessage);
                throw new VeniceException(errorMessage);
            }
        } catch (VeniceException e) {
            // Already carries an accurate message (e.g. retries exhausted); do not re-wrap it as
            // an accessor initialization failure.
            throw e;
        } catch (Exception e) {
            throw new VeniceException("Error when initializing partition push status accessor, failed to remove customized state. ", e);
        }
    }

    /**
     * Blocks until the notifier reports that consumption of this partition has completed.
     *
     * <p>The timeout is the store's bootstrap-to-online timeout; if it cannot be fetched, a
     * default of 24 hours is used.
     *
     * @param str resource (Kafka topic) name the partition belongs to
     * @param stateModelIngestionProgressNotifier notifier to wait on
     * @throws VeniceException if the wait is interrupted; the thread's interrupt flag is restored
     */
    public void waitConsumptionCompleted(String str, StateModelIngestionProgressNotifier stateModelIngestionProgressNotifier) {
        int timeoutInHours;
        try {
            timeoutInHours = getStoreRepo().getStoreOrThrow(Version.parseStoreFromKafkaTopicName(str)).getBootstrapToOnlineTimeoutInHours();
        } catch (Exception e) {
            this.logger.warn("Failed to fetch bootstrapToOnlineTimeoutInHours from store config for resource {}, using the default value of {} hours instead", str, DEFAULT_BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS, e);
            timeoutInHours = DEFAULT_BOOTSTRAP_TO_ONLINE_TIMEOUT_IN_HOURS;
        }
        try {
            stateModelIngestionProgressNotifier.waitConsumptionCompleted(str, this.partition, timeoutInHours, getStoreIngestionService());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            String errorMessage = "Can not complete consumption for resource:" + str + " partition:" + this.partition;
            this.logger.error(errorMessage, e);
            throw new VeniceException(errorMessage, e);
        }
    }

    /**
     * Resolves the partition status accessor from its future if that has not happened yet,
     * waiting at most {@code WAIT_PARTITION_ACCESSOR_TIME_OUT_MS} milliseconds.
     *
     * @throws Exception whatever {@link CompletableFuture#get(long, TimeUnit)} throws; on
     *     interruption the thread's interrupt flag is restored before rethrowing
     */
    private void waitPartitionPushStatusAccessor() throws Exception {
        if (this.partitionPushStatusAccessor != null) {
            return;
        }
        try {
            this.partitionPushStatusAccessor = this.partitionStatusAccessorFuture.get(WAIT_PARTITION_ACCESSOR_TIME_OUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Callers wrap this exception, so the interrupt status must be preserved here.
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Marks the replica of this partition as {@code NOT_STARTED}.
     *
     * @throws VeniceException if the status accessor has not been resolved
     */
    private void initializePartitionPushStatus() {
        if (this.partitionPushStatusAccessor == null) {
            throw new VeniceException("HelixPartitionStatusAccessor is expected not to be null.");
        }
        this.partitionPushStatusAccessor.updateReplicaStatus(this.storeConfig.getStoreVersionName(), getPartition(), ExecutionStatus.NOT_STARTED);
    }

    /** Asks the ingestion backend to stop consumption for this partition. */
    public void stopConsumption() {
        this.ingestionBackend.stopConsumption(this.storeConfig, this.partition);
    }

    /** @return the ingestion backend this state model delegates to */
    public VeniceIngestionBackend getIngestionBackend() {
        return this.ingestionBackend;
    }

    /** @return the store ingestion service exposed by the ingestion backend */
    protected StoreIngestionService getStoreIngestionService() {
        return this.ingestionBackend.getStoreIngestionService();
    }

    /** @return the read-only store repository passed at construction */
    public ReadOnlyStoreRepository getStoreRepo() {
        return this.storeRepository;
    }

    /** @return the storage service exposed by the ingestion backend */
    protected StorageService getStorageService() {
        return this.ingestionBackend.getStorageService();
    }

    /** @return the store version configuration passed at construction */
    public VeniceStoreVersionConfig getStoreConfig() {
        return this.storeConfig;
    }

    /** @return the partition id handled by this state model */
    public int getPartition() {
        return this.partition;
    }

    /** @return the description {@code "<storeVersionName>-<partition>"} used in log messages */
    public String getStorePartitionDescription() {
        return this.storePartitionDescription;
    }
}
