package org.apache.flink.runtime.operators.coordination;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.class */
/**
 * An {@link OperatorCoordinator} that forwards every call to an internal coordinator obtained
 * from a {@link Provider}.
 *
 * <p>On {@link #resetToCheckpoint(byte[])} the current internal coordinator is closed and replaced
 * by a newly obtained one, which is then restored from the given checkpoint data and, if this
 * wrapper had already been started, started as well.
 */
public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator {
    /** Source of the internal coordinator; consulted at construction and again on every reset. */
    private final Provider provider;

    /** Context handed to the current internal coordinator; replaced by a new wrapper on reset. */
    private QuiesceableContext quiesceableContext;

    /** The coordinator that all calls are currently delegated to. */
    private OperatorCoordinator coordinator;

    /** Becomes {@code true} once {@link #start()} has returned normally; {@code false} before. */
    private boolean started;

    /**
     * Base class for providers that create {@link RecreateOnResetOperatorCoordinator} instances.
     * Subclasses supply the actual coordinator through
     * {@link #getCoordinator(OperatorCoordinator.Context)}.
     */
    public abstract static class Provider implements OperatorCoordinator.Provider {
        private static final long serialVersionUID = 3002837631612629071L;
        private final OperatorID operatorID;

        /**
         * @param operatorID the ID reported by {@link #getOperatorId()}
         */
        public Provider(OperatorID operatorID) {
            this.operatorID = operatorID;
        }

        /** Returns the operator ID this provider was constructed with. */
        @Override
        public OperatorID getOperatorId() {
            return operatorID;
        }

        /**
         * Wraps the given context in a {@link QuiesceableContext} and builds a
         * {@link RecreateOnResetOperatorCoordinator} around it and this provider.
         *
         * @param context the context to wrap
         * @return the newly created wrapping coordinator
         */
        @Override
        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            QuiesceableContext wrappedContext = new QuiesceableContext(context);
            return new RecreateOnResetOperatorCoordinator(wrappedContext, this);
        }

        /**
         * Supplies the internal coordinator. Called once when the wrapping coordinator is
         * constructed and once more on each reset.
         *
         * @param context the context the returned coordinator should use
         * @return the coordinator to delegate to
         */
        protected abstract OperatorCoordinator getCoordinator(OperatorCoordinator.Context context);
    }

    /**
     * A context wrapper that can be switched off. Until {@link #quiesce()} is called,
     * {@link #sendEvent(OperatorEvent, int)} and {@link #failJob(Throwable)} are forwarded to the
     * wrapped context; afterwards {@code sendEvent} returns an already-completed acknowledgement
     * and {@code failJob} does nothing. {@link #getOperatorId()} and
     * {@link #currentParallelism()} are always forwarded.
     */
    @VisibleForTesting
    public static class QuiesceableContext implements OperatorCoordinator.Context {
        private final OperatorCoordinator.Context context;
        private volatile boolean quiesced = false;

        /**
         * @param wrapped the context that calls are forwarded to while not quiesced
         */
        QuiesceableContext(OperatorCoordinator.Context wrapped) {
            this.context = wrapped;
        }

        /** Returns the operator ID reported by the wrapped context. */
        @Override
        public OperatorID getOperatorId() {
            return context.getOperatorId();
        }

        /**
         * Forwards the event to the wrapped context unless this context has been quiesced, in
         * which case nothing is sent and a completed acknowledgement is returned.
         */
        @Override
        public synchronized CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent, int i) throws TaskNotRunningException {
            if (quiesced) {
                return CompletableFuture.completedFuture(Acknowledge.get());
            }
            return context.sendEvent(operatorEvent, i);
        }

        /** Forwards the failure to the wrapped context unless this context has been quiesced. */
        @Override
        public synchronized void failJob(Throwable th) {
            if (!quiesced) {
                context.failJob(th);
            }
        }

        /** Returns the parallelism reported by the wrapped context. */
        @Override
        public int currentParallelism() {
            return context.currentParallelism();
        }

        /**
         * Switches this context off. Shares the monitor with {@code sendEvent} and
         * {@code failJob}, so it does not interleave with a call to either of them.
         */
        @VisibleForTesting
        synchronized void quiesce() {
            quiesced = true;
        }

        /** Returns whether {@link #quiesce()} has been called. */
        @VisibleForTesting
        boolean isQuiesced() {
            return quiesced;
        }

        /** Returns the wrapped context, so it can be re-wrapped after a reset. */
        public OperatorCoordinator.Context getContext() {
            return context;
        }
    }

    /**
     * Creates the wrapper and immediately obtains the first internal coordinator from the
     * provider. The {@code started} flag keeps its default value of {@code false}.
     */
    private RecreateOnResetOperatorCoordinator(QuiesceableContext initialContext, Provider coordinatorProvider) {
        this.quiesceableContext = initialContext;
        this.provider = coordinatorProvider;
        this.coordinator = coordinatorProvider.getCoordinator(initialContext);
    }

    /** Starts the internal coordinator and, if that succeeds, records that start has happened. */
    @Override
    public void start() throws Exception {
        coordinator.start();
        started = true;
    }

    /** Closes the internal coordinator. */
    @Override
    public void close() throws Exception {
        coordinator.close();
    }

    /** Delegates to the internal coordinator. */
    @Override
    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) throws Exception {
        coordinator.handleEventFromOperator(i, operatorEvent);
    }

    /** Delegates to the internal coordinator. */
    @Override
    public void subtaskFailed(int i, @Nullable Throwable th) {
        coordinator.subtaskFailed(i, th);
    }

    /** Delegates to the internal coordinator. */
    @Override
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        coordinator.checkpointCoordinator(j, completableFuture);
    }

    /** Delegates to the internal coordinator. */
    @Override
    public void checkpointComplete(long j) {
        coordinator.checkpointComplete(j);
    }

    /**
     * Discards the current internal coordinator and replaces it with a new one restored from the
     * given checkpoint data.
     *
     * @param bArr checkpoint data passed to the new coordinator's {@code resetToCheckpoint}
     * @throws Exception if closing the old coordinator, or restoring or starting the new one, fails
     */
    @Override
    public void resetToCheckpoint(byte[] bArr) throws Exception {
        // Switch the old context off first so the outgoing coordinator's sendEvent/failJob
        // calls are no longer forwarded, then shut that coordinator down.
        quiesceableContext.quiesce();
        coordinator.close();

        // Re-wrap the same underlying context in a new, non-quiesced wrapper for the replacement.
        final QuiesceableContext freshContext = new QuiesceableContext(quiesceableContext.getContext());
        quiesceableContext = freshContext;
        coordinator = provider.getCoordinator(freshContext);
        coordinator.resetToCheckpoint(bArr);

        // Only start the replacement if this wrapper itself had already been started.
        if (started) {
            coordinator.start();
        }
    }

    /** Returns the coordinator currently being delegated to. */
    @VisibleForTesting
    public OperatorCoordinator getInternalCoordinator() {
        return coordinator;
    }

    /** Returns the context wrapper currently in use. */
    @VisibleForTesting
    QuiesceableContext getQuiesceableContext() {
        return quiesceableContext;
    }
}
