package org.apache.flink.runtime.state.memory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;

/* loaded from: input_file:org/apache/flink/runtime/state/memory/MemReducingState.class */
/**
 * Heap-backed {@link ReducingState} that keeps a single value per key and namespace.
 *
 * <p>Every value passed to {@link #add(Object)} is combined with the value already stored for
 * the current key using the {@link ReduceFunction} obtained from the
 * {@link ReducingStateDescriptor}.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <V> type of the values being reduced
 */
public class MemReducingState<K, N, V> extends AbstractMemState<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> implements ReducingState<V> {
    /** Function used to combine the stored value with each newly added value. */
    private final ReduceFunction<V> reduceFunction;

    /**
     * Snapshot of a {@link MemReducingState} that can rebuild the state from a restored map.
     *
     * @param <K> type of the key
     * @param <N> type of the namespace
     * @param <V> type of the values being reduced
     */
    public static class Snapshot<K, N, V> extends AbstractMemStateSnapshot<K, N, V, ReducingState<V>, ReducingStateDescriptor<V>> {
        private static final long serialVersionUID = 1;

        /**
         * Creates a snapshot; all arguments are handed to the superclass constructor unchanged.
         *
         * @param keySerializer serializer for the keys
         * @param namespaceSerializer serializer for the namespaces
         * @param stateSerializer serializer for the reduced values
         * @param stateDesc descriptor of the state this snapshot belongs to
         * @param data byte array passed through to the superclass
         */
        public Snapshot(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> stateSerializer, ReducingStateDescriptor<V> stateDesc, byte[] data) {
            super(keySerializer, namespaceSerializer, stateSerializer, stateDesc, data);
        }

        /**
         * Builds a {@link MemReducingState} around the given namespace-to-values map, reusing
         * this snapshot's serializers and state descriptor.
         *
         * @param stateMap map of namespace to per-key values that backs the new state
         * @return a new state instance backed by {@code stateMap}
         */
        @Override // org.apache.flink.runtime.state.memory.AbstractMemStateSnapshot
        public KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createMemState(HashMap<N, Map<K, V>> stateMap) {
            return new MemReducingState<>(this.keySerializer, this.namespaceSerializer, this.stateDesc, stateMap);
        }
    }

    /**
     * Creates a reducing state without supplying an initial state map.
     *
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     * @param stateDesc descriptor supplying the value serializer and the reduce function
     */
    public MemReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc) {
        super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc);
        this.reduceFunction = stateDesc.getReduceFunction();
    }

    /**
     * Creates a reducing state backed by an existing namespace-to-values map.
     *
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     * @param stateDesc descriptor supplying the value serializer and the reduce function
     * @param state map of namespace to per-key values passed to the superclass
     */
    public MemReducingState(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<V> stateDesc, HashMap<N, Map<K, V>> state) {
        super(keySerializer, namespaceSerializer, stateDesc.getSerializer(), stateDesc, state);
        this.reduceFunction = stateDesc.getReduceFunction();
    }

    /**
     * Returns the reduced value stored for the current key in the current namespace.
     *
     * @return the stored value, or {@code null} if the namespace has no map or the map has no
     *     entry for the current key
     */
    @Override
    public V get() {
        if (this.currentNSState == null) {
            // The cached per-namespace map may be unset; resolve it from the backing map.
            this.currentNSState = this.state.get(this.currentNamespace);
        }
        if (this.currentNSState != null) {
            return this.currentNSState.get(this.currentKey);
        }
        return null;
    }

    /**
     * Adds a value to the state of the current key. The first value for a key is stored as is;
     * each later value is merged with the stored one through the reduce function.
     *
     * @param value the value to add
     * @throws IOException declared by the signature; this implementation itself does not throw it
     * @throws RuntimeException if no key is set, or if the reduce function throws (the original
     *     exception is preserved as the cause)
     */
    @Override
    public void add(V value) throws IOException {
        if (this.currentKey == null) {
            throw new RuntimeException("No key available.");
        }
        if (this.currentNSState == null) {
            // Same lookup as get(): reuse a map already registered for this namespace
            // instead of replacing it with an empty one and losing other keys' values.
            this.currentNSState = this.state.get(this.currentNamespace);
        }
        if (this.currentNSState == null) {
            this.currentNSState = new HashMap<>();
            this.state.put(this.currentNamespace, this.currentNSState);
        }
        final V previous = this.currentNSState.get(this.currentKey);
        if (previous == null) {
            // Nothing stored yet for this key, so there is nothing to reduce against.
            this.currentNSState.put(this.currentKey, value);
            return;
        }
        try {
            this.currentNSState.put(this.currentKey, this.reduceFunction.reduce(previous, value));
        } catch (Exception e) {
            throw new RuntimeException("Could not add value to reducing state.", e);
        }
    }

    /**
     * Creates a {@link Snapshot} carrying this state's serializers, descriptor and the given
     * bytes.
     *
     * @param bytes byte array handed to the snapshot constructor
     * @return the snapshot for this state
     */
    @Override // org.apache.flink.runtime.state.memory.AbstractMemState
    public KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, MemoryStateBackend> createHeapSnapshot(byte[] bytes) {
        return new Snapshot<>(getKeySerializer(), getNamespaceSerializer(), this.stateSerializer, this.stateDesc, bytes);
    }
}
