package org.apache.flink.runtime.state.heap;

import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.internal.InternalMergingState;

/**
 * Base class for heap-backed states whose per-namespace values can be merged.
 *
 * <p>Subclasses supply the actual merge logic through {@link #mergeState(Object, Object)};
 * this class implements {@link #mergeNamespaces(Object, Collection)} on top of it, using the
 * {@code stateTable} inherited from {@link AbstractHeapState}.
 *
 * @param <K> type parameter shared with the backing {@link StateTable} and the first
 *     serializer (presumably the key type; confirm against {@link AbstractHeapState})
 * @param <N> type of the namespaces that can be merged
 * @param <IN> input type forwarded to {@link InternalMergingState}
 * @param <OUT> output type forwarded to {@link InternalMergingState}
 * @param <SV> type of the values held in the state table and combined by {@code mergeState}
 * @param <S> the {@link State} type described by the state descriptor
 * @param <SD> the {@link StateDescriptor} type handed to the super class
 */
public abstract class AbstractHeapMergingState<K, N, IN, OUT, SV, S extends State, SD extends StateDescriptor<S, ?>> extends AbstractHeapState<K, N, SV, S, SD> implements InternalMergingState<N, IN, OUT> {

    /** Single reusable transformation instance passed to the state table on every merge. */
    private final MergeTransformation mergeTransformation;

    /**
     * Forwards all arguments to {@link AbstractHeapState} and creates the merge transformation.
     *
     * <p>The constructor is {@code public} in this file; the decompiler reported that it was
     * originally {@code protected}. The wider visibility is kept so that no existing caller
     * breaks.
     *
     * @param stateDesc descriptor passed through to the super class
     * @param stateTable table passed through to the super class; {@code mergeNamespaces}
     *     reads the inherited {@code stateTable} field
     * @param keySerializer serializer for {@code K}, passed through to the super class
     * @param namespaceSerializer serializer for {@code N}, passed through to the super class
     */
    public AbstractHeapMergingState(SD stateDesc, StateTable<K, N, SV> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
        super(stateDesc, stateTable, keySerializer, namespaceSerializer);
        this.mergeTransformation = new MergeTransformation();
    }

    /**
     * Merges the values stored under the given source namespaces into the target namespace.
     *
     * <p>Each source namespace is passed to {@code stateTable.removeAndGetOld}; the non-null
     * values it returns are combined pairwise, in iteration order, with
     * {@link #mergeState(Object, Object)}. If at least one source yielded a value, the combined
     * value is handed to {@code stateTable.transform} for the target namespace together with
     * the {@link MergeTransformation}.
     *
     * @param target namespace that receives the merged value
     * @param sources namespaces to merge; a {@code null} or empty collection is a no-op
     * @throws Exception if the state table or {@code mergeState} fails
     */
    @Override // org.apache.flink.runtime.state.internal.InternalMergingState
    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
        if (sources == null || sources.isEmpty()) {
            // Nothing to merge.
            return;
        }

        final StateTable<K, N, SV> table = this.stateTable;
        SV merged = null;

        for (N source : sources) {
            // The source entry is taken out of the table while its value is collected.
            final SV sourceState = table.removeAndGetOld(source);
            if (sourceState == null) {
                // No value for this namespace: the accumulator stays as it is.
                continue;
            }
            // The first non-null value seeds the accumulator, later ones are merged into it.
            merged = (merged == null) ? sourceState : mergeState(merged, sourceState);
        }

        // Only touch the target when at least one source actually contributed a value.
        if (merged != null) {
            table.transform(target, merged, this.mergeTransformation);
        }
    }

    /**
     * Combines two state values into one.
     *
     * <p>Within this class the method is only ever called with two non-null arguments.
     *
     * @param a the value accumulated so far, or the value already present for the target
     * @param b the value to merge into {@code a}
     * @return the merged value
     * @throws Exception if the values cannot be merged
     */
    protected abstract SV mergeState(SV a, SV b) throws Exception;

    /**
     * Transformation handed to the state table when the merged value is written to the target
     * namespace. It is a non-static inner class because it delegates to the enclosing
     * instance's {@link #mergeState(Object, Object)}.
     */
    final class MergeTransformation implements StateTransformationFunction<SV, SV> {

        /**
         * Returns the value to store for the target namespace.
         *
         * @param targetState first argument supplied by the state table (presumably the value
         *     currently stored for the target; confirm against {@code StateTable.transform})
         * @param merged second argument supplied by the state table (presumably the value
         *     passed to {@code transform})
         * @return {@code merged} if {@code targetState} is {@code null}, otherwise the result
         *     of merging {@code targetState} with {@code merged}
         * @throws Exception if {@code mergeState} fails
         */
        @Override // org.apache.flink.runtime.state.StateTransformationFunction
        public SV apply(SV targetState, SV merged) throws Exception {
            if (targetState == null) {
                return merged;
            }
            return mergeState(targetState, merged);
        }
    }
}
