package com.datastax.bdp.gcore.netmsg;

import com.datastax.bdp.graph.api.exception.DsegException;
import com.datastax.dse.byos.shade.com.google.common.base.MoreObjects;
import com.datastax.dse.byos.shade.com.google.common.base.Objects;
import com.datastax.dse.byos.shade.com.google.common.base.Preconditions;
import com.datastax.dse.byos.shade.com.google.common.collect.AbstractIterator;
import com.datastax.dse.byos.shade.com.google.common.collect.ImmutableList;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.AbstractFuture;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import org.apache.cassandra.dht.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/bdp/gcore/netmsg/RetryingResultSet.class */
public class RetryingResultSet<Q, A, R, X> extends AbstractFuture<Collection<R>> implements ResultSet<R> {
    private static final Logger logger = LoggerFactory.getLogger(RetryingResultSet.class);
    private final BasicMessenger basicMessenger;
    private final String graphName;
    private final ConversationType<Q, A, R> conversationType;
    private final Function<X, Token> segmentToToken;
    private final Function<Collection<X>, Q> requestBuilder;
    private final CountDownLatch completionLatch;
    private final int retryCount;
    private final boolean failFast;
    private final Set<X> pendingSegments;
    private final ConcurrentLinkedQueue<R> results;
    private final Set<WeakReference<BlockingQueue<RetryingResultSet<Q, A, R, X>.Event>>> consumers;
    private final ReentrantReadWriteLock productionLock;
    private final Queue<Throwable> exceptions;
    private final ExecutorService executorService;

    /* loaded from: input_file:com/datastax/bdp/gcore/netmsg/RetryingResultSet$BlockingIterator.class */
    private class BlockingIterator extends AbstractIterator<R> {
        BlockingQueue<RetryingResultSet<Q, A, R, X>.Event> queue = new LinkedBlockingQueue();
        private boolean completed;

        public BlockingIterator() {
            RetryingResultSet.this.registerConsumer(this.queue);
            this.completed = false;
        }

        @Override // com.datastax.dse.byos.shade.com.google.common.collect.AbstractIterator
        protected R computeNext() {
            try {
                if (this.completed) {
                    return endOfData();
                }
                RetryingResultSet<Q, A, R, X>.Event take = this.queue.take();
                if (null == ((Event) take).throwable && null == ((Event) take).result) {
                    this.completed = true;
                    return endOfData();
                }
                if (null == ((Event) take).throwable) {
                    RetryingResultSet.logger.debug("{}: computed next result: {}", this, ((Event) take).result);
                    return (R) ((Event) take).result;
                }
                if (((Event) take).throwable instanceof RuntimeException) {
                    throw ((RuntimeException) ((Event) take).throwable);
                }
                throw new DsegException(((Event) take).throwable);
            } catch (InterruptedException e) {
                throw new DsegException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/bdp/gcore/netmsg/RetryingResultSet$Event.class */
    public class Event {
        private final Throwable throwable;
        private final R result;

        public Event() {
            this.throwable = null;
            this.result = null;
        }

        public Event(Throwable th) {
            this.throwable = th;
            this.result = null;
        }

        public Event(R r) {
            this.throwable = null;
            this.result = r;
        }
    }

    /* loaded from: input_file:com/datastax/bdp/gcore/netmsg/RetryingResultSet$ExceptionIterator.class */
    private class ExceptionIterator implements Iterator<R> {
        private Throwable t;

        public ExceptionIterator(Throwable th) {
            this.t = th;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return null != this.t;
        }

        @Override // java.util.Iterator
        public R next() {
            if (null == this.t) {
                throw new NoSuchElementException();
            }
            if (this.t instanceof RuntimeException) {
                throw ((RuntimeException) this.t);
            }
            throw new DsegException(this.t);
        }
    }

    private RetryingResultSet(BasicMessenger basicMessenger, String str, ConversationType<Q, A, R> conversationType, Iterable<X> iterable, Function<X, Token> function, Function<Collection<X>, Q> function2, int i, boolean z) {
        this.basicMessenger = basicMessenger;
        this.graphName = str;
        this.conversationType = conversationType;
        Set<X> newSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap());
        Iterator<X> it2 = iterable.iterator();
        while (it2.hasNext()) {
            newSetFromMap.add(it2.next());
        }
        this.results = new ConcurrentLinkedQueue<>();
        this.exceptions = new ConcurrentLinkedQueue();
        this.segmentToToken = function;
        this.requestBuilder = function2;
        this.completionLatch = new CountDownLatch(newSetFromMap.size());
        this.consumers = Collections.newSetFromMap(new ConcurrentHashMap());
        this.productionLock = new ReentrantReadWriteLock();
        this.pendingSegments = newSetFromMap;
        this.retryCount = i;
        this.failFast = z;
        this.executorService = Executors.newSingleThreadExecutor();
        logger.debug("Constructed {}", this);
    }

    public static <Q, A, R, X> RetryingResultSet<Q, A, R, X> of(BasicMessenger basicMessenger, List<AnnotatedFuture<X, A>> list, String str, ConversationType<Q, A, R> conversationType, Iterable<X> iterable, Function<X, Token> function, Function<Collection<X>, Q> function2, int i, boolean z) {
        RetryingResultSet<Q, A, R, X> retryingResultSet = new RetryingResultSet<>(basicMessenger, str, conversationType, iterable, function, function2, i, z);
        retryingResultSet.prepareRetries(list, 1);
        return retryingResultSet;
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("graphName", this.graphName).add("retryCount", this.retryCount).add("failFast", this.failFast).add("pendingSegmentsCount", this.pendingSegments.size()).toString();
    }

    private void prepareRetries(List<AnnotatedFuture<X, A>> list, int i) {
        for (AnnotatedFuture<X, A> annotatedFuture : list) {
            annotatedFuture.whenCompleteAsync((obj, th) -> {
                RemoteQueryException remoteQueryException = null;
                if (null == th) {
                    try {
                        completeSegmentResults(annotatedFuture.getSegments(), this.conversationType.getResultBuilder().apply(obj));
                        return;
                    } catch (RemoteQueryException e) {
                        remoteQueryException = e;
                    }
                }
                if (i < this.retryCount) {
                    retry(annotatedFuture.getSegments(), i + 1);
                } else {
                    Preconditions.checkState((null == th && null == remoteQueryException) ? false : true);
                    failSegments(annotatedFuture.getSegments(), (Throwable) Objects.firstNonNull(th, remoteQueryException));
                }
            }, (Executor) this.executorService);
        }
    }

    private void failSegments(Iterable<X> iterable, Throwable th) {
        produceThrowable(iterable, th);
    }

    private void completeSegmentResults(Iterable<X> iterable, Collection<R> collection) {
        produceResults(iterable, collection);
    }

    private void produceThrowable(Iterable<X> iterable, Throwable th) {
        this.productionLock.writeLock().lock();
        try {
            this.exceptions.add(th);
            Iterator<WeakReference<BlockingQueue<RetryingResultSet<Q, A, R, X>.Event>>> it2 = this.consumers.iterator();
            while (it2.hasNext()) {
                BlockingQueue<RetryingResultSet<Q, A, R, X>.Event> blockingQueue = it2.next().get();
                if (null != blockingQueue) {
                    blockingQueue.add(new Event(th));
                }
            }
            retireSegments(iterable);
            this.productionLock.writeLock().unlock();
        } catch (Throwable th2) {
            this.productionLock.writeLock().unlock();
            throw th2;
        }
    }

    private void produceResults(Iterable<X> iterable, Collection<R> collection) {
        this.productionLock.writeLock().lock();
        if (null != collection) {
            try {
                this.results.addAll(collection);
                Iterator<R> it2 = collection.iterator();
                while (it2.hasNext()) {
                    RetryingResultSet<Q, A, R, X>.Event event = new Event(it2.next());
                    Iterator<WeakReference<BlockingQueue<RetryingResultSet<Q, A, R, X>.Event>>> it3 = this.consumers.iterator();
                    while (it3.hasNext()) {
                        BlockingQueue<RetryingResultSet<Q, A, R, X>.Event> blockingQueue = it3.next().get();
                        if (null != blockingQueue) {
                            blockingQueue.add(event);
                        }
                    }
                }
            } catch (Throwable th) {
                this.productionLock.writeLock().unlock();
                throw th;
            }
        }
        retireSegments(iterable);
        this.productionLock.writeLock().unlock();
    }

    private void retireSegments(Iterable<X> iterable) {
        for (X x : iterable) {
            logger.debug("{}: retiring segment: {}", this, x);
            Preconditions.checkState(this.pendingSegments.contains(x));
            this.pendingSegments.remove(x);
            this.completionLatch.countDown();
            if (0 == this.completionLatch.getCount()) {
                logger.debug("{}: all segments retired, completing internal future", this);
                RetryingResultSet<Q, A, R, X>.Event event = new Event();
                Iterator<WeakReference<BlockingQueue<RetryingResultSet<Q, A, R, X>.Event>>> it2 = this.consumers.iterator();
                while (it2.hasNext()) {
                    BlockingQueue<RetryingResultSet<Q, A, R, X>.Event> blockingQueue = it2.next().get();
                    if (null != blockingQueue) {
                        blockingQueue.add(event);
                    }
                }
                this.consumers.clear();
                if (this.exceptions.isEmpty()) {
                    Preconditions.checkState(set(ImmutableList.copyOf((Collection) this.results)));
                    logger.debug("{}: set results ({} elements)", this, Integer.valueOf(this.results.size()));
                } else {
                    Preconditions.checkState(setException(this.exceptions.peek()));
                    logger.debug("{}: set exception ({})", this, this.exceptions.peek().getMessage());
                }
            }
        }
    }

    private void retry(Iterable<X> iterable, int i) {
        prepareRetries(this.basicMessenger.sendTargeted(this.graphName, this.conversationType, iterable, this.segmentToToken, this.requestBuilder), i);
    }

    @Override // com.datastax.dse.byos.shade.com.google.common.util.concurrent.AbstractFuture, java.util.concurrent.Future, com.datastax.bdp.gcore.netmsg.ResultSet
    public boolean isDone() {
        return 0 == this.completionLatch.getCount();
    }

    @Override // com.datastax.bdp.gcore.netmsg.ResultSet
    public void await() throws InterruptedException {
        this.completionLatch.await();
    }

    @Override // com.datastax.bdp.gcore.netmsg.ResultSet
    public void await(long j, TimeUnit timeUnit) throws InterruptedException {
        this.completionLatch.await(j, timeUnit);
    }

    @Override // java.lang.Iterable
    public Iterator<R> iterator() {
        if (this.exceptions.isEmpty() || !this.failFast) {
            logger.debug("{}: returning BlockingIterator", this);
            return new BlockingIterator();
        }
        logger.debug("{}: returning ExceptionIterator ({})", this, this.exceptions.element().getMessage());
        return new ExceptionIterator(this.exceptions.element());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerConsumer(BlockingQueue<RetryingResultSet<Q, A, R, X>.Event> blockingQueue) {
        this.productionLock.readLock().lock();
        try {
            if (this.failFast) {
                Iterator<Throwable> it2 = this.exceptions.iterator();
                while (it2.hasNext()) {
                    blockingQueue.add(new Event(it2.next()));
                }
            }
            Iterator<R> it3 = this.results.iterator();
            while (it3.hasNext()) {
                blockingQueue.add(new Event(it3.next()));
            }
            if (!this.failFast) {
                Iterator<Throwable> it4 = this.exceptions.iterator();
                while (it4.hasNext()) {
                    blockingQueue.add(new Event(it4.next()));
                }
            }
            if (isDone()) {
                blockingQueue.add(new Event());
            }
            this.consumers.add(new WeakReference<>(blockingQueue));
            this.productionLock.readLock().unlock();
        } catch (Throwable th) {
            this.productionLock.readLock().unlock();
            throw th;
        }
    }
}
