package org.apache.flink.runtime.state.heap.async;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/async/AsyncHeapKeyedStateBackend.class */
public class AsyncHeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
    private final HashMap<String, StateTable<K, ?, ?>> stateTables;

    public AsyncHeapKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange) {
        super(taskKvStateRegistry, typeSerializer, classLoader, i, keyGroupRange);
        this.stateTables = new HashMap<>();
        LOG.info("Initializing heap keyed state backend with stream factory.");
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> typeSerializer, StateDescriptor<?, V> stateDescriptor) {
        return tryRegisterStateTable(stateDescriptor.getName(), stateDescriptor.getType(), typeSerializer, stateDescriptor.getSerializer());
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(String str, StateDescriptor.Type type, TypeSerializer<N> typeSerializer, TypeSerializer<V> typeSerializer2) {
        RegisteredBackendStateMetaInfo<N, V> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(type, str, typeSerializer, typeSerializer2);
        StateTable<K, N, V> stateTable = this.stateTables.get(str);
        if (stateTable == null) {
            stateTable = newStateTable(registeredBackendStateMetaInfo);
            this.stateTables.put(str, stateTable);
        } else {
            if (!registeredBackendStateMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
                throw new RuntimeException("Trying to access state using incompatible meta info, was " + stateTable.getMetaInfo() + " trying access with " + registeredBackendStateMetaInfo);
            }
            stateTable.setMetaInfo(registeredBackendStateMetaInfo);
        }
        return stateTable;
    }

    private boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, V> ValueState<V> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        return new HeapValueState(valueStateDescriptor, tryRegisterStateTable(typeSerializer, valueStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        return new HeapListState(listStateDescriptor, tryRegisterStateTable(listStateDescriptor.getName(), listStateDescriptor.getType(), typeSerializer, new ArrayListSerializer(listStateDescriptor.getSerializer())), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
        return new HeapReducingState(reducingStateDescriptor, tryRegisterStateTable(typeSerializer, reducingStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.AbstractKeyedStateBackend
    public <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
        return new HeapFoldingState(foldingStateDescriptor, tryRegisterStateTable(typeSerializer, foldingStateDescriptor), this.keySerializer, typeSerializer);
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<KeyGroupsStateHandle> snapshot(final long j, final long j2, final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        if (!hasRegisteredState()) {
            return DoneFuture.nullValue();
        }
        long currentTimeMillis = System.currentTimeMillis();
        Preconditions.checkState(this.stateTables.size() <= 32767, "Too many KV-States: " + this.stateTables.size() + ". Currently at most 32767 states are supported");
        ArrayList arrayList = new ArrayList(this.stateTables.size());
        final HashMap hashMap = new HashMap(this.stateTables.size());
        final HashedMap hashedMap = new HashedMap(this.stateTables.size());
        for (Map.Entry<String, StateTable<K, ?, ?>> entry : this.stateTables.entrySet()) {
            RegisteredBackendStateMetaInfo<?, ?> metaInfo = entry.getValue().getMetaInfo();
            arrayList.add(new KeyedBackendSerializationProxy.StateMetaInfo(metaInfo.getStateType(), metaInfo.getName(), metaInfo.getNamespaceSerializer(), metaInfo.getStateSerializer()));
            hashMap.put(entry.getKey(), Integer.valueOf(hashMap.size()));
            StateTable<K, ?, ?> value = entry.getValue();
            if (null != value) {
                hashedMap.put(value, value.createSnapshot());
            }
        }
        final KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, arrayList);
        AsyncStoppableTaskWithCallback from = AsyncStoppableTaskWithCallback.from(new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() { // from class: org.apache.flink.runtime.state.heap.async.AsyncHeapKeyedStateBackend.1
            AtomicBoolean open = new AtomicBoolean(false);

            /* JADX WARN: Type inference failed for: r0v6, types: [java.io.Closeable, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncIOCallable
            public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
                if (!this.open.compareAndSet(false, true)) {
                    throw new IOException("Operation already opened.");
                }
                ?? createCheckpointStateOutputStream = checkpointStreamFactory.createCheckpointStateOutputStream(j, j2);
                try {
                    AsyncHeapKeyedStateBackend.this.cancelStreamRegistry.registerClosable((Closeable) createCheckpointStateOutputStream);
                    return createCheckpointStateOutputStream;
                } catch (Exception e) {
                    this.open.set(false);
                    throw e;
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Type inference failed for: r0v3, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
            @Override // org.apache.flink.runtime.io.async.AbstractAsyncIOCallable
            public KeyGroupsStateHandle performOperation() throws Exception {
                long currentTimeMillis2 = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream ioHandle = getIoHandle();
                DataOutputView dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(ioHandle);
                keyedBackendSerializationProxy.write(dataOutputViewStreamWrapper);
                long[] jArr = new long[AsyncHeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                for (int i = 0; i < AsyncHeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); i++) {
                    int keyGroupId = AsyncHeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(i);
                    jArr[i] = ioHandle.getPos();
                    dataOutputViewStreamWrapper.writeInt(keyGroupId);
                    for (Map.Entry entry2 : AsyncHeapKeyedStateBackend.this.stateTables.entrySet()) {
                        dataOutputViewStreamWrapper.writeShort(((Integer) hashMap.get(entry2.getKey())).intValue());
                        ((StateTableSnapshot) hashedMap.get(entry2.getValue())).writeMappingsInKeyGroup(dataOutputViewStreamWrapper, keyGroupId);
                    }
                }
                if (!this.open.compareAndSet(true, false)) {
                    throw new IOException("Checkpoint stream already closed.");
                }
                KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(new KeyGroupRangeOffsets(AsyncHeapKeyedStateBackend.this.keyGroupRange, jArr), ioHandle.closeAndGetHandle());
                AsyncHeapKeyedStateBackend.LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{checkpointStreamFactory, Thread.currentThread(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2)});
                return keyGroupsStateHandle;
            }

            @Override // org.apache.flink.runtime.io.async.AbstractAsyncIOCallable, org.apache.flink.runtime.io.async.AsyncDoneCallback
            public void done(boolean z) {
                FSDataOutputStream fSDataOutputStream;
                if (this.open.compareAndSet(true, false) && null != (fSDataOutputStream = (CheckpointStreamFactory.CheckpointStateOutputStream) getIoHandle())) {
                    AsyncHeapKeyedStateBackend.this.cancelStreamRegistry.unregisterClosable(fSDataOutputStream);
                    IOUtils.closeQuietly(fSDataOutputStream);
                }
                Iterator it = hashedMap.values().iterator();
                while (it.hasNext()) {
                    ((StateTableSnapshot) it.next()).release();
                }
            }
        });
        LOG.info("Heap backend snapshot (" + checkpointStreamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - currentTimeMillis) + " ms.");
        return from;
    }

    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<KeyGroupsStateHandle> collection) throws Exception {
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", collection);
        }
        if (MigrationUtil.isOldSavepointKeyedState(collection)) {
            throw new UnsupportedOperationException("This async.HeapKeyedStateBackend does not support restore from old savepoints.");
        }
        restorePartitionedState(collection);
    }

    private void restorePartitionedState(Collection<KeyGroupsStateHandle> collection) throws Exception {
        HashMap hashMap = new HashMap();
        int i = 0;
        this.stateTables.clear();
        for (KeyGroupsStateHandle keyGroupsStateHandle : collection) {
            if (keyGroupsStateHandle != null) {
                FSDataInputStream openInputStream = keyGroupsStateHandle.openInputStream();
                this.cancelStreamRegistry.registerClosable(openInputStream);
                try {
                    DataInputView dataInputViewStreamWrapper = new DataInputViewStreamWrapper(openInputStream);
                    KeyedBackendSerializationProxy keyedBackendSerializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                    keyedBackendSerializationProxy.read(dataInputViewStreamWrapper);
                    List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> namedStateSerializationProxies = keyedBackendSerializationProxy.getNamedStateSerializationProxies();
                    for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfo : namedStateSerializationProxies) {
                        if (null == this.stateTables.get(stateMetaInfo.getStateName())) {
                            this.stateTables.put(stateMetaInfo.getStateName(), newStateTable(new RegisteredBackendStateMetaInfo<>(stateMetaInfo)));
                            hashMap.put(Integer.valueOf(i), stateMetaInfo.getStateName());
                            i++;
                        }
                    }
                    Iterator<Tuple2<Integer, Long>> it = keyGroupsStateHandle.getGroupRangeOffsets().iterator();
                    while (it.hasNext()) {
                        Tuple2<Integer, Long> next = it.next();
                        int intValue = ((Integer) next.f0).intValue();
                        openInputStream.seek(((Long) next.f1).longValue());
                        Preconditions.checkState(dataInputViewStreamWrapper.readInt() == intValue, "Unexpected key-group in restore.");
                        for (int i2 = 0; i2 < namedStateSerializationProxies.size(); i2++) {
                            StateTableByKeyGroupReaders.readerForVersion(this.stateTables.get(hashMap.get(Integer.valueOf(dataInputViewStreamWrapper.readShort()))), 2).readMappingsInKeyGroup(dataInputViewStreamWrapper, intValue);
                        }
                    }
                } finally {
                    this.cancelStreamRegistry.unregisterClosable(openInputStream);
                    IOUtils.closeQuietly(openInputStream);
                }
            }
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @VisibleForTesting
    public int numStateEntries() {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    @VisibleForTesting
    public int numStateEntries(Object obj) {
        int i = 0;
        Iterator<StateTable<K, ?, ?>> it = this.stateTables.values().iterator();
        while (it.hasNext()) {
            i += it.next().sizeOfNamespace(obj);
        }
        return i;
    }

    private <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> registeredBackendStateMetaInfo) {
        return new CopyOnWriteStateTable(this, registeredBackendStateMetaInfo);
    }
}
