package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.memory.MigrationRestoreSnapshot;
import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
import org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.HashMapSerializer;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.class */
public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    private final HashMap<String, StateTable<K, ?, ?>> stateTables;
    private final boolean asynchronousSnapshots;

    public HeapKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange, boolean z, ExecutionConfig executionConfig) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange, executionConfig);
        this.stateTables = new HashMap<>();
        this.asynchronousSnapshots = z;
        LOG.info("Initializing heap keyed state backend with stream factory.");
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> typeSerializer, StateDescriptor<?, V> stateDescriptor) {
        return tryRegisterStateTable(stateDescriptor.getName(), stateDescriptor.getType(), typeSerializer, stateDescriptor.getSerializer());
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(String str, StateDescriptor.Type type, TypeSerializer<N> typeSerializer, TypeSerializer<V> typeSerializer2) {
        RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(type, str, typeSerializer, typeSerializer2);
        StateTable<K, N, V> stateTable = this.stateTables.get(str);
        if (stateTable == null) {
            stateTable = newStateTable(registeredKeyedBackendStateMetaInfo);
            this.stateTables.put(str, stateTable);
        } else {
            Preconditions.checkState(str.equals(stateTable.getMetaInfo().getName()), "Incompatible state names. Was [" + stateTable.getMetaInfo().getName() + "], registered with [" + registeredKeyedBackendStateMetaInfo.getName() + "].");
            if (!registeredKeyedBackendStateMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN) && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
                Preconditions.checkState(registeredKeyedBackendStateMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()), "Incompatible state types. Was [" + stateTable.getMetaInfo().getStateType() + "], registered with [" + registeredKeyedBackendStateMetaInfo.getStateType() + "].");
            }
            stateTable.setMetaInfo(registeredKeyedBackendStateMetaInfo);
        }
        return stateTable;
    }

    private boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, V> InternalValueState<N, V> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        return new HeapValueState(valueStateDescriptor, tryRegisterStateTable(typeSerializer, valueStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> InternalListState<N, T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        return new HeapListState(listStateDescriptor, tryRegisterStateTable(listStateDescriptor.getName(), listStateDescriptor.getType(), typeSerializer, new ArrayListSerializer(listStateDescriptor.getElementSerializer())), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> InternalReducingState<N, T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        return new HeapReducingState(reducingStateDescriptor, tryRegisterStateTable(typeSerializer, reducingStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(TypeSerializer<N> typeSerializer, AggregatingStateDescriptor<T, ACC, R> aggregatingStateDescriptor) throws Exception {
        return new HeapAggregatingState(aggregatingStateDescriptor, tryRegisterStateTable(typeSerializer, aggregatingStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        return new HeapFoldingState(foldingStateDescriptor, tryRegisterStateTable(typeSerializer, foldingStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> typeSerializer, MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception {
        return new HeapMapState(mapStateDescriptor, tryRegisterStateTable(mapStateDescriptor.getName(), mapStateDescriptor.getType(), typeSerializer, new HashMapSerializer(mapStateDescriptor.getKeySerializer(), mapStateDescriptor.getValueSerializer())), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<KeyedStateHandle> snapshot(long j, long j2, CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
        if (!hasRegisteredState()) {
            return DoneFuture.nullValue();
        }
        long currentTimeMillis = System.currentTimeMillis();
        Preconditions.checkState(this.stateTables.size() <= 32767, "Too many KV-States: " + this.stateTables.size() + ". Currently at most 32767 states are supported");
        ArrayList arrayList = new ArrayList(this.stateTables.size());
        final HashMap hashMap = new HashMap(this.stateTables.size());
        final HashedMap hashedMap = new HashedMap(this.stateTables.size());
        for (Map.Entry<String, StateTable<K, ?, ?>> entry : this.stateTables.entrySet()) {
            arrayList.add(entry.getValue().getMetaInfo().snapshot());
            hashMap.put(entry.getKey(), Integer.valueOf(hashMap.size()));
            StateTable<K, ?, ?> value = entry.getValue();
            if (null != value) {
                hashedMap.put(value, value.createSnapshot());
            }
        }
        final KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, arrayList);
        AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncSnapshotIOCallable<KeyedStateHandle>(j, j2, checkpointStreamFactory, this.cancelStreamRegistry) { // from class: org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.1
            /* JADX WARN: Type inference failed for: r0v3, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncIOCallable
            public KeyGroupsStateHandle performOperation() throws Exception {
                long currentTimeMillis2 = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream ioHandle = getIoHandle();
                DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(ioHandle);
                keyedBackendSerializationProxy.write(dataOutputViewStreamWrapper);
                long[] jArr = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                for (int i = 0; i < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); i++) {
                    int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(i);
                    jArr[i] = ioHandle.getPos();
                    dataOutputViewStreamWrapper.writeInt(keyGroupId);
                    for (Map.Entry entry2 : HeapKeyedStateBackend.this.stateTables.entrySet()) {
                        dataOutputViewStreamWrapper.writeShort(((Integer) hashMap.get(entry2.getKey())).intValue());
                        ((StateTableSnapshot) hashedMap.get(entry2.getValue())).writeMappingsInKeyGroup(dataOutputViewStreamWrapper, keyGroupId);
                    }
                }
                StreamStateHandle closeStreamAndGetStateHandle = closeStreamAndGetStateHandle();
                if (HeapKeyedStateBackend.this.asynchronousSnapshots) {
                    HeapKeyedStateBackend.LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{this.streamFactory, Thread.currentThread(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2)});
                }
                if (closeStreamAndGetStateHandle == null) {
                    return null;
                }
                return new KeyGroupsStateHandle(new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, jArr), closeStreamAndGetStateHandle);
            }
        });
        if (!this.asynchronousSnapshots) {
            from.run();
        }
        LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
        return from;
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", collection);
        }
        if (MigrationUtil.isOldSavepointKeyedState(collection)) {
            restoreOldSavepointKeyedState(collection);
        } else {
            restorePartitionedState(collection);
        }
    }

    private void restorePartitionedState(Collection<KeyedStateHandle> collection) throws Exception {
        HashMap hashMap = new HashMap();
        int i = 0;
        this.stateTables.clear();
        boolean z = false;
        for (KeyedStateHandle keyedStateHandle : collection) {
            if (keyedStateHandle != null) {
                if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                    throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
                }
                KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
                FSDataInputStream openInputStream = keyGroupsStateHandle.openInputStream();
                this.cancelStreamRegistry.registerClosable(openInputStream);
                try {
                    DataInputView dataInputViewStreamWrapper = new DataInputViewStreamWrapper(openInputStream);
                    KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                    keyedBackendSerializationProxy.read(dataInputViewStreamWrapper);
                    if (!z) {
                        if (CompatibilityUtil.resolveCompatibilityResult(keyedBackendSerializationProxy.getKeySerializer(), UnloadableDummyTypeSerializer.class, keyedBackendSerializationProxy.getKeySerializerConfigSnapshot(), this.keySerializer).isRequiresMigration()) {
                            throw new IllegalStateException("The new key serializer is not compatible to read previous keys. Aborting now since state migration is currently not available");
                        }
                        z = true;
                    }
                    List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = keyedBackendSerializationProxy.getStateMetaInfoSnapshots();
                    for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> snapshot : stateMetaInfoSnapshots) {
                        if (snapshot.getStateSerializer() == null || (snapshot.getStateSerializer() instanceof UnloadableDummyTypeSerializer)) {
                            throw new IOException("Unable to restore keyed state [" + snapshot.getName() + "]. For memory-backed keyed state, the previous serializer of the keyed state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                        }
                        if (null == this.stateTables.get(snapshot.getName())) {
                            this.stateTables.put(snapshot.getName(), newStateTable(new RegisteredKeyedBackendStateMetaInfo<>(snapshot.getStateType(), snapshot.getName(), snapshot.getNamespaceSerializer(), snapshot.getStateSerializer())));
                            hashMap.put(Integer.valueOf(i), snapshot.getName());
                            i++;
                        }
                    }
                    Iterator<Tuple2<Integer, Long>> it = keyGroupsStateHandle.getGroupRangeOffsets().iterator();
                    while (it.hasNext()) {
                        Tuple2<Integer, Long> next = it.next();
                        int intValue = ((Integer) next.f0).intValue();
                        long longValue = ((Long) next.f1).longValue();
                        Preconditions.checkState(this.keyGroupRange.contains(intValue), "The key group must belong to the backend.");
                        openInputStream.seek(longValue);
                        Preconditions.checkState(dataInputViewStreamWrapper.readInt() == intValue, "Unexpected key-group in restore.");
                        for (int i2 = 0; i2 < stateMetaInfoSnapshots.size(); i2++) {
                            StateTableByKeyGroupReaders.readerForVersion(this.stateTables.get(hashMap.get(Integer.valueOf(dataInputViewStreamWrapper.readShort()))), keyedBackendSerializationProxy.getReadVersion()).readMappingsInKeyGroup(dataInputViewStreamWrapper, intValue);
                        }
                    }
                } finally {
                    this.cancelStreamRegistry.unregisterClosable(openInputStream);
                    IOUtils.closeQuietly(openInputStream);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.state.CheckpointListener
    public void notifyCheckpointComplete(long j) {
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Deprecated
    private void restoreOldSavepointKeyedState(Collection<KeyedStateHandle> collection) throws IOException, ClassNotFoundException {
        if (collection.isEmpty()) {
            return;
        }
        Preconditions.checkState(1 == collection.size(), "Only one element expected here.");
        KeyedStateHandle next = collection.iterator().next();
        if (!(next instanceof MigrationKeyGroupStateHandle)) {
            throw new IllegalStateException("Unexpected state handle type, expected: " + MigrationKeyGroupStateHandle.class + ", but found " + next.getClass());
        }
        FSDataInputStream openInputStream = ((MigrationKeyGroupStateHandle) next).openInputStream();
        Throwable th = null;
        try {
            try {
                HashMap hashMap = (HashMap) InstantiationUtil.deserializeObject(openInputStream, this.userCodeClassLoader);
                if (openInputStream != null) {
                    if (0 != 0) {
                        try {
                            openInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        openInputStream.close();
                    }
                }
                for (Map.Entry entry : hashMap.entrySet()) {
                    String str = (String) entry.getKey();
                    KvStateSnapshot kvStateSnapshot = (KvStateSnapshot) entry.getValue();
                    if (!(kvStateSnapshot instanceof MigrationRestoreSnapshot)) {
                        throw new IllegalStateException("Unknown state: " + kvStateSnapshot);
                    }
                    this.stateTables.put(str, ((MigrationRestoreSnapshot) kvStateSnapshot).deserialize(str, this));
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (openInputStream != null) {
                if (th != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th3;
        }
    }

    @VisibleForTesting
    public int numStateEntries() {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @VisibleForTesting
    public int numStateEntries(Object obj) {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().sizeOfNamespace(obj);
        }
        return i;
    }

    public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> registeredKeyedBackendStateMetaInfo) {
        return this.asynchronousSnapshots ? new CopyOnWriteStateTable(this, registeredKeyedBackendStateMetaInfo) : new NestedMapsStateTable(this, registeredKeyedBackendStateMetaInfo);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public boolean supportsAsynchronousSnapshots() {
        return this.asynchronousSnapshots;
    }
}
