package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.OperatorBackendSerializationProxy;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend.class */
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    /** State name constant {@code "_default_"}; not referenced anywhere else in this class. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /** Class loader supplied at construction; installed as the context class loader while restoring. */
    private final ClassLoader userClassloader;

    /** Registry of the streams that are currently open for a snapshot or restore; closed by {@link #close()}. */
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();

    /** Serializer handed to the descriptors built by the {@code get*SerializableListState} methods. */
    private final JavaSerializer<Serializable> javaSerializer = new JavaSerializer<>();

    /** Every list state registered with, or restored into, this backend, keyed by state name. */
    private final Map<String, PartitionableListState<?>> registeredStates = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend$PartitionableListState.class */
    /**
     * List state that keeps its elements in an in-memory {@link ArrayList} and can write them,
     * element by element, to a checkpoint stream.
     *
     * @param <S> type of the elements held by the state
     */
    public static final class PartitionableListState<S> implements ListState<S> {
        /** Name under which the state is registered. */
        private final String name;
        /** Serializer used to write and read the individual elements. */
        private final TypeSerializer<S> partitionStateSerializer;
        /** Assignment mode recorded alongside the state in snapshots. */
        private final OperatorStateHandle.Mode assignmentMode;
        /** Backing storage; exposed directly by {@link #getInternalList()} and {@link #get()}. */
        private final List<S> internalList = new ArrayList<>();

        /**
         * Creates an empty state.
         *
         * @param str name of the state, must not be null
         * @param typeSerializer serializer for the elements, must not be null
         * @param mode assignment mode of the state, must not be null
         */
        public PartitionableListState(String str, TypeSerializer<S> typeSerializer, OperatorStateHandle.Mode mode) {
            this.name = Preconditions.checkNotNull(str);
            this.partitionStateSerializer = Preconditions.checkNotNull(typeSerializer);
            this.assignmentMode = Preconditions.checkNotNull(mode);
        }

        /** @return the name given at construction */
        public String getName() {
            return name;
        }

        /** @return the assignment mode given at construction */
        public OperatorStateHandle.Mode getAssignmentMode() {
            return assignmentMode;
        }

        /** @return the element serializer given at construction */
        public TypeSerializer<S> getPartitionStateSerializer() {
            return partitionStateSerializer;
        }

        /** @return the live backing list (not a copy) */
        public List<S> getInternalList() {
            return internalList;
        }

        /** Removes all elements from the state. */
        public void clear() {
            internalList.clear();
        }

        /**
         * Returns the elements of the state.
         *
         * <p>The decompiler had renamed this method to {@code m611get}; it is restored under its
         * original name so that the class provides the {@code get()} accessor of the list state
         * interface again.
         *
         * @return the live backing list, viewed as an {@link Iterable}
         */
        public Iterable<S> get() {
            return internalList;
        }

        /**
         * Alias of {@link #get()} kept only because the decompiler renamed the method, so other
         * decompiled code may still call it under this name.
         *
         * @return the same value as {@link #get()}
         */
        public Iterable<S> m611get() {
            return get();
        }

        /**
         * Appends an element to the state.
         *
         * @param s element to append
         */
        public void add(S s) {
            internalList.add(s);
        }

        @Override
        public String toString() {
            return "PartitionableListState{name='" + name + "', assignmentMode=" + assignmentMode + ", internalList=" + internalList + '}';
        }

        /**
         * Serializes every element to the given stream and records where each one starts.
         *
         * @param fSDataOutputStream stream to write to
         * @return for each element, in list order, the stream position reported right before
         *     the element was written
         * @throws IOException if querying the position or serializing an element fails
         */
        public long[] write(FSDataOutputStream fSDataOutputStream) throws IOException {
            final int elementCount = internalList.size();
            final long[] startOffsets = new long[elementCount];
            final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(fSDataOutputStream);
            for (int index = 0; index < elementCount; index++) {
                // The position has to be captured before the element is written.
                startOffsets[index] = fSDataOutputStream.getPos();
                partitionStateSerializer.serialize(internalList.get(index), outputView);
            }
            return startOffsets;
        }
    }

    /**
     * Creates a backend without any registered state.
     *
     * @param classLoader user class loader to remember; must not be null
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    public DefaultOperatorStateBackend(ClassLoader classLoader) throws IOException {
        userClassloader = Preconditions.checkNotNull(classLoader);
    }

    /**
     * Returns the names of all states currently registered with this backend.
     *
     * @return the key set of the internal state map; this is the map's own view, not a copy
     */
    public Set<String> getRegisteredStateNames() {
        final Set<String> stateNames = registeredStates.keySet();
        return stateNames;
    }

    /**
     * Closes the registry that tracks the streams opened by {@code snapshot} and {@code restore}.
     *
     * @throws IOException if closing the registry fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    /** Forgets every registered state by emptying the internal state map. */
    @Override // org.apache.flink.runtime.state.OperatorStateBackend
    public void dispose() {
        registeredStates.clear();
    }

    /**
     * Returns the list state registered under the given name, creating it if necessary, using
     * this backend's Java serializer for the elements. The state is requested in
     * {@code SPLIT_DISTRIBUTE} mode.
     *
     * @param str name of the state
     * @param <T> element type of the state
     * @return the list state registered under {@code str}
     * @throws Exception if the state cannot be obtained
     */
    @SuppressWarnings("unchecked")
    public <T extends Serializable> ListState<T> getSerializableListState(String str) throws Exception {
        // The descriptor is typed by the shared JavaSerializer<Serializable>; narrowing the
        // result to the caller's T is an unchecked cast because generics are erased.
        return (ListState<T>) getOperatorState(new ListStateDescriptor<>(str, javaSerializer));
    }

    /**
     * Returns the list state described by the descriptor, creating it if necessary, in
     * {@code SPLIT_DISTRIBUTE} mode.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer
     * @param <S> element type of the state
     * @return the list state registered under the descriptor's name
     * @throws IOException declared by the delegate this method forwards to
     */
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws IOException {
        final OperatorStateHandle.Mode splitMode = OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
        return getOperatorState(listStateDescriptor, splitMode);
    }

    /**
     * Returns the list state registered under the given name, creating it if necessary, using
     * this backend's Java serializer for the elements. The state is requested in
     * {@code BROADCAST} mode.
     *
     * @param str name of the state
     * @param <T> element type of the state
     * @return the list state registered under {@code str}
     * @throws Exception if the state cannot be obtained
     */
    @SuppressWarnings("unchecked")
    public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String str) throws Exception {
        // The descriptor is typed by the shared JavaSerializer<Serializable>; narrowing the
        // result to the caller's T is an unchecked cast because generics are erased.
        return (ListState<T>) getBroadcastOperatorState(new ListStateDescriptor<>(str, javaSerializer));
    }

    /**
     * Returns the list state described by the descriptor, creating it if necessary, in
     * {@code BROADCAST} mode.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer
     * @param <S> element type of the state
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state cannot be obtained
     */
    public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final OperatorStateHandle.Mode broadcastMode = OperatorStateHandle.Mode.BROADCAST;
        return getOperatorState(listStateDescriptor, broadcastMode);
    }

    /**
     * Looks up the list state registered under the descriptor's name, or registers a new one.
     *
     * <p>If a state with that name already exists, it must have been registered with the same
     * assignment mode and with a serializer that reports itself compatible with the
     * descriptor's serializer; otherwise the state checks fail.
     *
     * @param listStateDescriptor descriptor carrying the state name and element serializer;
     *     the descriptor, its name and its serializer must all be non-null
     * @param mode assignment mode the caller expects for the state
     * @param <S> element type of the state
     * @return the existing or newly registered state
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    private <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor, OperatorStateHandle.Mode mode) throws IOException {
        Preconditions.checkNotNull(listStateDescriptor);
        final String stateName = Preconditions.checkNotNull(listStateDescriptor.getName());
        final TypeSerializer<S> stateSerializer = Preconditions.checkNotNull(listStateDescriptor.getSerializer());

        // The map stores states of arbitrary element types. The element type registered under
        // this name cannot be verified at runtime (erasure); the serializer compatibility
        // check below is the only guard.
        @SuppressWarnings("unchecked")
        final PartitionableListState<S> existingState = (PartitionableListState<S>) registeredStates.get(stateName);

        if (existingState == null) {
            final PartitionableListState<S> newState = new PartitionableListState<>(stateName, stateSerializer, mode);
            registeredStates.put(stateName, newState);
            return newState;
        }

        Preconditions.checkState(existingState.getAssignmentMode().equals(mode), "Incompatible assignment mode. Provided: " + mode + ", expected: " + existingState.getAssignmentMode());
        Preconditions.checkState(existingState.getPartitionStateSerializer().isCompatibleWith(listStateDescriptor.getSerializer()), "Incompatible type serializers. Provided: " + listStateDescriptor.getSerializer() + ", found: " + existingState.getPartitionStateSerializer());
        return existingState;
    }

    /**
     * Reads the elements of one state from a snapshot stream and appends them to the state.
     *
     * <p>For every recorded offset the stream is positioned at that offset and exactly one
     * element is deserialized with the state's own serializer. Nothing happens when the meta
     * info or its offsets are null.
     *
     * @param partitionableListState state that receives the elements
     * @param fSDataInputStream stream to read from; it is repositioned via {@code seek}
     * @param stateMetaInfo meta info holding the element offsets; may be null
     * @param <S> element type of the state
     * @throws IOException if seeking or deserializing fails
     */
    private static <S> void deserializeStateValues(PartitionableListState<S> partitionableListState, FSDataInputStream fSDataInputStream, OperatorStateHandle.StateMetaInfo stateMetaInfo) throws IOException {
        if (stateMetaInfo == null) {
            return;
        }
        final long[] offsets = stateMetaInfo.getOffsets();
        if (offsets == null) {
            return;
        }

        final DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(fSDataInputStream);
        final TypeSerializer<S> elementSerializer = partitionableListState.getPartitionStateSerializer();
        for (long offset : offsets) {
            fSDataInputStream.seek(offset);
            partitionableListState.add(elementSerializer.deserialize(inputView));
        }
    }

    /* JADX WARN: Type inference failed for: r0v12, types: [java.io.OutputStream, org.apache.flink.core.fs.FSDataOutputStream, java.io.Closeable, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<OperatorStateHandle> snapshot(long j, long j2, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        if (this.registeredStates.isEmpty()) {
            return DoneFuture.nullValue();
        }
        ArrayList arrayList = new ArrayList(this.registeredStates.size());
        Iterator<Map.Entry<String, PartitionableListState<?>>> it = this.registeredStates.entrySet().iterator();
        while (it.hasNext()) {
            PartitionableListState<?> value = it.next().getValue();
            arrayList.add(new OperatorBackendSerializationProxy.StateMetaInfo(value.getName(), value.getPartitionStateSerializer(), value.getAssignmentMode()));
        }
        HashMap hashMap = new HashMap(this.registeredStates.size());
        ?? createCheckpointStateOutputStream = checkpointStreamFactory.createCheckpointStateOutputStream(j, j2);
        try {
            this.closeStreamOnCancelRegistry.registerClosable((Closeable) createCheckpointStateOutputStream);
            DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream) createCheckpointStateOutputStream);
            new OperatorBackendSerializationProxy(arrayList).write(dataOutputViewStreamWrapper);
            dataOutputViewStreamWrapper.writeInt(this.registeredStates.size());
            for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
                PartitionableListState<?> value2 = entry.getValue();
                hashMap.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(value2.write(createCheckpointStateOutputStream), value2.getAssignmentMode()));
            }
            DoneFuture doneFuture = new DoneFuture(new OperatorStateHandle(hashMap, createCheckpointStateOutputStream.closeAndGetHandle()));
            this.closeStreamOnCancelRegistry.unregisterClosable((Closeable) createCheckpointStateOutputStream);
            createCheckpointStateOutputStream.close();
            return doneFuture;
        } catch (Throwable th) {
            this.closeStreamOnCancelRegistry.unregisterClosable((Closeable) createCheckpointStateOutputStream);
            createCheckpointStateOutputStream.close();
            throw th;
        }
    }

    /**
     * Restores list states from the given snapshot handles.
     *
     * <p>For each non-null handle the serialization proxy is read first. States named in it
     * that are not registered yet are created from the stored meta info; states that already
     * exist must have a serializer that reports itself compatible with the stored one. After
     * that, the elements recorded in the handle are deserialized and appended to the matching
     * states. While a handle is processed, the user class loader is installed as the thread's
     * context class loader; the previous one is put back afterwards.
     *
     * @param collection handles to restore from; may be null and may contain null entries,
     *     both of which are skipped
     * @throws Exception if reading a handle fails or one of the state checks is violated
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<OperatorStateHandle> collection) throws Exception {
        if (collection == null) {
            return;
        }

        for (OperatorStateHandle stateHandle : collection) {
            if (stateHandle == null) {
                continue;
            }

            final FSDataInputStream in = stateHandle.openInputStream();
            final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                // Registered inside the try (as snapshot() does) so the stream is closed by
                // the finally block even if the registration itself fails.
                closeStreamOnCancelRegistry.registerClosable(in);
                Thread.currentThread().setContextClassLoader(userClassloader);

                final OperatorBackendSerializationProxy serializationProxy = new OperatorBackendSerializationProxy(userClassloader);
                serializationProxy.read(new DataInputViewStreamWrapper(in));

                // Step 1: make sure a state exists for every meta info entry in the snapshot.
                for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : serializationProxy.getNamedStateSerializationProxies()) {
                    final PartitionableListState<?> registered = registeredStates.get(stateMetaInfo.getName());
                    if (registered == null) {
                        final PartitionableListState<?> created = new PartitionableListState<>(stateMetaInfo.getName(), stateMetaInfo.getStateSerializer(), stateMetaInfo.getMode());
                        registeredStates.put(created.getName(), created);
                    } else {
                        Preconditions.checkState(registered.getPartitionStateSerializer().isCompatibleWith(stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " + registered.getPartitionStateSerializer() + " is not compatible with " + stateMetaInfo.getStateSerializer());
                    }
                }

                // Step 2: read the elements of every state recorded in the handle.
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    final PartitionableListState<?> target = registeredStates.get(nameToOffsets.getKey());
                    Preconditions.checkState(null != target, "Found state without corresponding meta info: " + nameToOffsets.getKey());
                    deserializeStateValues(target, in, nameToOffsets.getValue());
                }
            } finally {
                // A single finally block: the cleanup runs exactly once per handle on both the
                // success and the failure path.
                Thread.currentThread().setContextClassLoader(previousClassLoader);
                closeStreamOnCancelRegistry.unregisterClosable(in);
                IOUtils.closeQuietly(in);
            }
        }
    }
}
