package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.class */
public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>> implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>, KeyGroupCheckpointedOperator {
    private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10).getMillis();
    private static final long MAX_CACHE_SIZE = 100000;
    private transient LoadingCache<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> dedupingCache;
    private transient KeyedStateBackend<ByteBuffer> keyedStateBackend;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator$KeyGroupLoader.class */
    public static class KeyGroupLoader extends CacheLoader<Integer, LoadingCache<ByteBuffer, AtomicBoolean>> {
        private KeyGroupLoader() {
        }

        @Override // org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.cache.CacheLoader
        public LoadingCache<ByteBuffer, AtomicBoolean> load(Integer num) throws Exception {
            return CacheBuilder.newBuilder().expireAfterAccess(DedupingOperator.MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS).maximumSize(DedupingOperator.MAX_CACHE_SIZE).build(new TrueBooleanLoader());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator$TrueBooleanLoader.class */
    public static class TrueBooleanLoader extends CacheLoader<ByteBuffer, AtomicBoolean> {
        private TrueBooleanLoader() {
        }

        @Override // org.apache.beam.runners.flink.p0002.p00111.repackaged.com.google.common.cache.CacheLoader
        public AtomicBoolean load(ByteBuffer byteBuffer) throws Exception {
            return new AtomicBoolean(true);
        }
    }

    public void open() throws Exception {
        super.open();
        checkInitCache();
        this.keyedStateBackend = getKeyedStateBackend();
    }

    private void checkInitCache() {
        if (this.dedupingCache == null) {
            this.dedupingCache = CacheBuilder.newBuilder().build(new KeyGroupLoader());
        }
    }

    public void processElement(StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
        if (shouldOutput(this.keyedStateBackend.getCurrentKeyGroupIndex(), (ByteBuffer) this.keyedStateBackend.getCurrentKey())) {
            WindowedValue windowedValue = (WindowedValue) streamRecord.getValue();
            this.output.collect(streamRecord.replace(windowedValue.withValue(((ValueWithRecordId) windowedValue.getValue()).getValue())));
        }
    }

    private boolean shouldOutput(int i, ByteBuffer byteBuffer) throws ExecutionException {
        return this.dedupingCache.get(Integer.valueOf(i)).getUnchecked(byteBuffer).getAndSet(false);
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupRestoringOperator
    public void restoreKeyGroupState(int i, DataInputStream dataInputStream) throws Exception {
        checkInitCache();
        Integer num = (Integer) VarIntCoder.of().decode(dataInputStream, Coder.Context.NESTED);
        for (int i2 = 0; i2 < num.intValue(); i2++) {
            shouldOutput(i, ByteBuffer.wrap(ByteArrayCoder.of().decode(dataInputStream, Coder.Context.NESTED)));
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator
    public void snapshotKeyGroupState(int i, DataOutputStream dataOutputStream) throws Exception {
        Set<ByteBuffer> keySet = this.dedupingCache.get(Integer.valueOf(i)).asMap().keySet();
        VarIntCoder.of().encode(Integer.valueOf(keySet.size()), dataOutputStream, Coder.Context.NESTED);
        Iterator<ByteBuffer> it = keySet.iterator();
        while (it.hasNext()) {
            ByteArrayCoder.of().encode(it.next().array(), dataOutputStream, Coder.Context.NESTED);
        }
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (getKeyedStateBackend() != null) {
            try {
                KeyedStateCheckpointOutputStream rawKeyedOperatorStateOutput = stateSnapshotContext.getRawKeyedOperatorStateOutput();
                try {
                    try {
                        Iterator it = rawKeyedOperatorStateOutput.getKeyGroupList().iterator();
                        while (it.hasNext()) {
                            int intValue = ((Integer) it.next()).intValue();
                            rawKeyedOperatorStateOutput.startNewKeyGroup(intValue);
                            snapshotKeyGroupState(intValue, new DataOutputViewStreamWrapper(rawKeyedOperatorStateOutput));
                        }
                        try {
                            rawKeyedOperatorStateOutput.close();
                        } catch (Exception e) {
                            LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", getOperatorName(), e);
                        }
                    } catch (Throwable th) {
                        try {
                            rawKeyedOperatorStateOutput.close();
                        } catch (Exception e2) {
                            LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", getOperatorName(), e2);
                        }
                        throw th;
                    }
                } catch (Exception e3) {
                    throw new Exception("Could not write timer service of " + getOperatorName() + " to checkpoint state stream.", e3);
                }
            } catch (Exception e4) {
                throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', e4);
            }
        }
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (getKeyedStateBackend() != null) {
            KeyGroupRange keyGroupRange = getKeyedStateBackend().getKeyGroupRange();
            for (KeyGroupStatePartitionStreamProvider keyGroupStatePartitionStreamProvider : stateInitializationContext.getRawKeyedStateInputs()) {
                DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(keyGroupStatePartitionStreamProvider.getStream());
                int keyGroupId = keyGroupStatePartitionStreamProvider.getKeyGroupId();
                Preconditions.checkArgument(keyGroupRange.contains(keyGroupId), "Key Group " + keyGroupId + " does not belong to the local range.");
                restoreKeyGroupState(keyGroupId, dataInputViewStreamWrapper);
            }
        }
    }
}
