package org.infinispan.client.hotrod.impl.iteration;

import io.netty.channel.Channel;
import io.reactivex.rxjava3.core.Flowable;
import java.lang.invoke.MethodHandles;
import java.net.SocketAddress;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.operations.IterationNextResponse;
import org.infinispan.client.hotrod.impl.operations.IterationStartResponse;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:org/infinispan/client/hotrod/impl/iteration/RemotePublisher.class */
public class RemotePublisher<K, E> implements Publisher<Map.Entry<K, E>> {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final OperationsFactory operationsFactory;
    private final String filterConverterFactory;
    private final byte[][] filterParams;
    private final IntSet segments;
    private final int batchSize;
    private final boolean metadata;
    private final DataFormat dataFormat;
    private final KeyTracker segmentKeyTracker;
    private final Set<SocketAddress> failedServers = ConcurrentHashMap.newKeySet();

    public RemotePublisher(OperationsFactory operationsFactory, String str, byte[][] bArr, Set<Integer> set, int i, boolean z, DataFormat dataFormat) {
        this.operationsFactory = operationsFactory;
        this.filterConverterFactory = str;
        this.filterParams = bArr;
        SegmentConsistentHash segmentConsistentHash = (SegmentConsistentHash) operationsFactory.getConsistentHash();
        if (set != null) {
            this.segments = IntSets.concurrentCopyFrom(IntSets.from(set), ((Integer) Collections.max(set)).intValue() + 1);
        } else if (segmentConsistentHash != null) {
            int numSegments = segmentConsistentHash.getNumSegments();
            this.segments = IntSets.concurrentSet(numSegments);
            for (int i2 = 0; i2 < numSegments; i2++) {
                this.segments.set(i2);
            }
        } else {
            this.segments = null;
        }
        this.batchSize = i;
        this.metadata = z;
        this.dataFormat = dataFormat;
        this.segmentKeyTracker = KeyTrackerFactory.create(dataFormat, segmentConsistentHash, operationsFactory.getTopologyId(), set);
    }

    public void subscribe(Subscriber<? super Map.Entry<K, E>> subscriber) {
        if (this.segments != null) {
            Flowable.just(this.segments).map(intSet -> {
                Map<SocketAddress, Set<Integer>> primarySegmentsByAddress = this.operationsFactory.getPrimarySegmentsByAddress();
                HashMap hashMap = new HashMap(primarySegmentsByAddress.size());
                for (Map.Entry<SocketAddress, Set<Integer>> entry : primarySegmentsByAddress.entrySet()) {
                    K key = entry.getKey();
                    if (this.failedServers.contains(key)) {
                        key = null;
                    }
                    IntSet intSet = null;
                    Iterator<Integer> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        int intValue = it.next().intValue();
                        if (intSet.contains(intValue)) {
                            if (intSet == null) {
                                intSet = IntSets.mutableEmptySet();
                            }
                            intSet.set(intValue);
                        }
                    }
                    if (intSet != null) {
                        hashMap.put(key, intSet);
                    }
                }
                if (hashMap.isEmpty()) {
                    hashMap.put(null, intSet);
                }
                return hashMap;
            }).flatMap(map -> {
                int size = (this.batchSize / map.size()) + 1;
                return Flowable.fromIterable(map.entrySet()).map(entry -> {
                    return new RemoteInnerPublisherHandler(this, size, () -> {
                        return null;
                    }, entry).startPublisher();
                }).flatMap(RxJavaInterop.identityFunction(), map.size());
            }).repeatUntil(() -> {
                log.tracef("Segments left to process are %s", this.segments);
                return this.segments.isEmpty();
            }).subscribe(subscriber);
        } else {
            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            new RemoteInnerPublisherHandler<K, E>(this, this.batchSize, () -> {
                if (atomicBoolean.getAndSet(false)) {
                    return new AbstractMap.SimpleImmutableEntry(null, null);
                }
                return null;
            }, null) { // from class: org.infinispan.client.hotrod.impl.iteration.RemotePublisher.1
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.infinispan.client.hotrod.impl.iteration.RemoteInnerPublisherHandler
                public void handleThrowableInResponse(Throwable th, Map.Entry<SocketAddress, IntSet> entry) {
                    atomicBoolean.set(true);
                    super.handleThrowableInResponse(th, entry);
                }
            }.startPublisher().subscribe(subscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void erroredServer(SocketAddress socketAddress) {
        this.failedServers.add(socketAddress);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<Void> sendCancel(byte[] bArr, Channel channel) {
        return this.operationsFactory.newIterationEndOperation(bArr, channel).execute().handle((iterationEndResponse, th) -> {
            if (th != null) {
                Log.HOTROD.ignoringErrorDuringIterationClose(iterationId(bArr), th);
                return null;
            }
            short status = iterationEndResponse.getStatus();
            if (HotRodConstants.isSuccess(status) && Log.HOTROD.isDebugEnabled()) {
                Log.HOTROD.iterationClosed(iterationId(bArr));
            }
            if (HotRodConstants.isInvalidIteration(status)) {
                throw Log.HOTROD.errorClosingIteration(iterationId(bArr));
            }
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String iterationId(byte[] bArr) {
        return new String(bArr, HotRodConstants.HOTROD_STRING_CHARSET);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeSegments(IntSet intSet) {
        if (this.segments != null) {
            this.segments.removeAll(intSet);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<IterationStartResponse> newIteratorStartOperation(SocketAddress socketAddress, IntSet intSet, int i) {
        return this.operationsFactory.newIterationStartOperation(this.filterConverterFactory, this.filterParams, intSet, i, this.metadata, this.dataFormat, socketAddress).execute();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<IterationNextResponse<K, E>> newIteratorNextOperation(byte[] bArr, Channel channel) {
        return this.operationsFactory.newIterationNextOperation(bArr, channel, this.segmentKeyTracker, this.dataFormat).execute();
    }
}
