package org.infinispan.persistence.manager;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.functions.Function;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.jcip.annotations.GuardedBy;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.io.ByteBufferFactory;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.ByRef;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.expiration.impl.InternalExpirationManager;
import org.infinispan.factories.InterceptorChainFactory;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.impl.CacheLoaderInterceptor;
import org.infinispan.interceptors.impl.CacheWriterInterceptor;
import org.infinispan.interceptors.impl.TransactionalStoreInterceptor;
import org.infinispan.marshall.persistence.PersistenceMarshaller;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.InitializationContextImpl;
import org.infinispan.persistence.async.AsyncNonBlockingStore;
import org.infinispan.persistence.internal.PersistenceUtil;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.LocalOnlyCacheLoader;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.spi.StoreUnavailableException;
import org.infinispan.persistence.support.DelegatingNonBlockingStore;
import org.infinispan.persistence.support.NonBlockingStoreAdapter;
import org.infinispan.persistence.support.SegmentPublisherWrapper;
import org.infinispan.persistence.support.SingleSegmentPublisher;
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.NonBlockingManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

@Scope(Scopes.NAMED_CACHE)
/* loaded from: input_file:org/infinispan/persistence/manager/PersistenceManagerImpl.class */
public class PersistenceManagerImpl implements PersistenceManager {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

    @Inject
    Configuration configuration;

    @Inject
    GlobalConfiguration globalConfiguration;

    @Inject
    ComponentRef<AdvancedCache<Object, Object>> cache;

    @Inject
    KeyPartitioner keyPartitioner;

    @Inject
    TimeService timeService;

    @ComponentName(KnownComponentNames.PERSISTENCE_MARSHALLER)
    @Inject
    PersistenceMarshaller persistenceMarshaller;

    @Inject
    ByteBufferFactory byteBufferFactory;

    @Inject
    CacheNotifier<Object, Object> cacheNotifier;

    @Inject
    InternalEntryFactory internalEntryFactory;

    @Inject
    MarshallableEntryFactory<?, ?> marshallableEntryFactory;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    Executor nonBlockingExecutor;

    @Inject
    BlockingManager blockingManager;

    @Inject
    NonBlockingManager nonBlockingManager;

    @Inject
    ComponentRef<InternalExpirationManager<Object, Object>> expirationManager;

    @Inject
    DistributionManager distributionManager;

    @Inject
    InterceptorChainFactory interceptorChainFactory;
    private volatile boolean enabled;
    private volatile boolean clearOnStop;
    private volatile AutoCloseable availabilityTask;
    private volatile String unavailableExceptionMessage;
    private boolean isInvalidationCache;
    private boolean allSegmentedOrShared;
    private int segmentCount;
    private final StampedLock lock = new StampedLock();

    @GuardedBy("lock")
    private List<StoreStatus> stores = null;
    private final List<PersistenceManager.StoreChangeListener> listeners = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/persistence/manager/PersistenceManagerImpl$HandleFlowables.class */
    public interface HandleFlowables<K, V> {
        CompletionStage<Void> handleFlowables(NonBlockingStore<K, V> nonBlockingStore, int i, Flowable<NonBlockingStore.SegmentedPublisher<Object>> flowable, Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> flowable2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/persistence/manager/PersistenceManagerImpl$StoreStatus.class */
    public static class StoreStatus {
        final NonBlockingStore<?, ?> store;
        final StoreConfiguration config;
        final Set<NonBlockingStore.Characteristic> characteristics;
        boolean availability = true;

        StoreStatus(NonBlockingStore<?, ?> nonBlockingStore, StoreConfiguration storeConfiguration, Set<NonBlockingStore.Characteristic> set) {
            this.store = nonBlockingStore;
            this.config = storeConfiguration;
            this.characteristics = set;
        }

        <K, V> NonBlockingStore<K, V> store() {
            return (NonBlockingStore<K, V>) this.store;
        }

        private boolean hasCharacteristic(NonBlockingStore.Characteristic characteristic) {
            return this.characteristics.contains(characteristic);
        }
    }

    private <K, V> NonBlockingStore<K, V> getStore(Predicate<StoreStatus> predicate) {
        long tryOptimisticRead = this.lock.tryOptimisticRead();
        NonBlockingStore<K, V> storeLocked = getStoreLocked(predicate);
        if (!this.lock.validate(tryOptimisticRead)) {
            long acquireReadLock = acquireReadLock();
            try {
                storeLocked = getStoreLocked(predicate);
                releaseReadLock(acquireReadLock);
            } catch (Throwable th) {
                releaseReadLock(acquireReadLock);
                throw th;
            }
        }
        return storeLocked;
    }

    @GuardedBy("lock#readLock")
    private <K, V> NonBlockingStore<K, V> getStoreLocked(Predicate<StoreStatus> predicate) {
        if (this.stores == null) {
            return null;
        }
        for (StoreStatus storeStatus : this.stores) {
            if (predicate.test(storeStatus)) {
                return storeStatus.store();
            }
        }
        return null;
    }

    @GuardedBy("lock#readLock")
    private StoreStatus getStoreStatusLocked(Predicate<? super StoreStatus> predicate) {
        for (StoreStatus storeStatus : this.stores) {
            if (predicate.test(storeStatus)) {
                return storeStatus;
            }
        }
        return null;
    }

    @Start
    public void start() {
        this.enabled = this.configuration.persistence().usingStores();
        this.segmentCount = this.configuration.clustering().hash().numSegments();
        this.isInvalidationCache = this.configuration.clustering().cacheMode().isInvalidation();
        if (this.enabled) {
            Completable.using(this::acquireWriteLock, l -> {
                return startManagerAndStores(this.configuration.persistence().stores());
            }, (v1) -> {
                releaseWriteLock(v1);
            }).blockingAwait();
        }
    }

    @GuardedBy("lock#writeLock")
    private Completable startManagerAndStores(Collection<StoreConfiguration> collection) {
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("Store configurations require at least one configuration");
        }
        this.enabled = true;
        if (this.stores == null) {
            this.stores = new ArrayList(collection.size());
        }
        Completable startStoresOnly = startStoresOnly(collection);
        long availabilityInterval = this.configuration.persistence().availabilityInterval();
        if (availabilityInterval > 0 && this.availabilityTask == null) {
            startStoresOnly = startStoresOnly.doOnComplete(() -> {
                this.availabilityTask = this.nonBlockingManager.scheduleWithFixedDelay(this::pollStoreAvailability, availabilityInterval, availabilityInterval, TimeUnit.MILLISECONDS, th -> {
                    return !(th instanceof Error);
                });
            });
        }
        return startStoresOnly.doOnComplete(() -> {
            boolean z = this.configuration.expiration().maxIdle() > 0;
            if ((this.configuration.expiration().lifespan() > 0) || z) {
                this.stores.stream().forEach(storeStatus -> {
                    if (storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY)) {
                        return;
                    }
                    if (z) {
                        if (!this.configuration.persistence().passivation()) {
                            throw Log.CONFIG.maxIdleNotAllowedWithoutPassivation();
                        }
                        Log.CONFIG.maxIdleNotTestedWithPassivation();
                    }
                    if (!storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.EXPIRATION)) {
                        throw Log.CONFIG.expirationNotAllowedWhenStoreDoesNotSupport(storeStatus.store.getClass().getName());
                    }
                });
            }
            this.allSegmentedOrShared = allStoresSegmentedOrShared();
        });
    }

    private Completable startStoresOnly(Iterable<StoreConfiguration> iterable) {
        Flowable concatMapSingle = Flowable.fromIterable(iterable).concatMapSingle(storeConfiguration -> {
            NonBlockingStore storeFromConfiguration = PersistenceUtil.storeFromConfiguration(storeConfiguration);
            NonBlockingStore asyncNonBlockingStore = storeConfiguration.async().enabled() ? new AsyncNonBlockingStore(storeFromConfiguration) : storeFromConfiguration;
            NonBlockingStore nonBlockingStore = asyncNonBlockingStore;
            Completable fromCompletionStage = Completable.fromCompletionStage(asyncNonBlockingStore.start(new InitializationContextImpl(storeConfiguration, this.cache.wired(), this.keyPartitioner, this.persistenceMarshaller, this.timeService, this.byteBufferFactory, this.marshallableEntryFactory, this.nonBlockingExecutor, this.globalConfiguration, this.blockingManager, this.nonBlockingManager)).whenComplete((r9, th) -> {
                if (th != null) {
                    this.stores.add(new StoreStatus(nonBlockingStore, null, null));
                }
            }));
            NonBlockingStore nonBlockingStore2 = asyncNonBlockingStore;
            return fromCompletionStage.toSingle(() -> {
                return new StoreStatus(nonBlockingStore2, storeConfiguration, updateCharacteristics(nonBlockingStore2, nonBlockingStore2.characteristics(), storeConfiguration));
            });
        });
        List<StoreStatus> list = this.stores;
        Objects.requireNonNull(list);
        return concatMapSingle.doOnNext((v1) -> {
            r1.add(v1);
        }).delay(storeStatus -> {
            return storeStatus.config.purgeOnStartup() ? Flowable.fromCompletable(Completable.fromCompletionStage(storeStatus.store.clear())) : Flowable.empty();
        }).ignoreElements();
    }

    @GuardedBy("lock")
    private boolean allStoresSegmentedOrShared() {
        return getStoreLocked(storeStatus -> {
            return (storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.SEGMENTABLE) && storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.SHAREABLE)) ? false : true;
        }) != null;
    }

    private Set<NonBlockingStore.Characteristic> updateCharacteristics(NonBlockingStore<?, ?> nonBlockingStore, Set<NonBlockingStore.Characteristic> set, StoreConfiguration storeConfiguration) {
        if (storeConfiguration.ignoreModifications()) {
            if (set.contains(NonBlockingStore.Characteristic.WRITE_ONLY)) {
                throw log.storeConfiguredHasBothReadAndWriteOnly(nonBlockingStore.getClass().getName(), NonBlockingStore.Characteristic.WRITE_ONLY, NonBlockingStore.Characteristic.READ_ONLY);
            }
            set.add(NonBlockingStore.Characteristic.READ_ONLY);
            set.remove(NonBlockingStore.Characteristic.TRANSACTIONAL);
        }
        if (storeConfiguration.writeOnly()) {
            if (set.contains(NonBlockingStore.Characteristic.READ_ONLY)) {
                throw log.storeConfiguredHasBothReadAndWriteOnly(nonBlockingStore.getClass().getName(), NonBlockingStore.Characteristic.READ_ONLY, NonBlockingStore.Characteristic.WRITE_ONLY);
            }
            set.add(NonBlockingStore.Characteristic.WRITE_ONLY);
            set.remove(NonBlockingStore.Characteristic.BULK_READ);
        }
        if (!storeConfiguration.segmented()) {
            set.remove(NonBlockingStore.Characteristic.SEGMENTABLE);
        } else if (!set.contains(NonBlockingStore.Characteristic.SEGMENTABLE)) {
            throw log.storeConfiguredSegmentedButCharacteristicNotPresent(nonBlockingStore.getClass().getName());
        }
        if (!storeConfiguration.transactional()) {
            set.remove(NonBlockingStore.Characteristic.TRANSACTIONAL);
        } else if (!set.contains(NonBlockingStore.Characteristic.TRANSACTIONAL)) {
            throw log.storeConfiguredTransactionalButCharacteristicNotPresent(nonBlockingStore.getClass().getName());
        }
        if (!storeConfiguration.shared()) {
            set.remove(NonBlockingStore.Characteristic.SHAREABLE);
        } else if (!set.contains(NonBlockingStore.Characteristic.SHAREABLE)) {
            throw log.storeConfiguredSharedButCharacteristicNotPresent(nonBlockingStore.getClass().getName());
        }
        return set;
    }

    protected CompletionStage<Void> pollStoreAvailability() {
        CompletionStage<Boolean> booleanStage;
        if (log.isTraceEnabled()) {
            log.trace("Polling Store availability");
        }
        AtomicReference atomicReference = new AtomicReference();
        long acquireReadLock = acquireReadLock();
        try {
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                try {
                    booleanStage = storeStatus.store.isAvailable();
                } catch (Throwable th) {
                    log.storeIsAvailableCheckThrewException(th, storeStatus.store.getClass().getName());
                    booleanStage = CompletableFutures.booleanStage(false);
                }
                aggregateCompletionStage.dependsOn(booleanStage.exceptionally(th2 -> {
                    log.storeIsAvailableCompletedExceptionally(th2, storeStatus.store.getClass().getName());
                    return false;
                }).thenCompose(bool -> {
                    storeStatus.availability = bool.booleanValue();
                    if (bool.booleanValue()) {
                        return CompletableFutures.completedNull();
                    }
                    atomicReference.compareAndSet(null, storeStatus.store);
                    return updatePersistenceAvailability(storeStatus.store);
                }));
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                CompletionStage<Void> updatePersistenceAvailability = updatePersistenceAvailability((NonBlockingStore) atomicReference.get());
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return updatePersistenceAvailability;
            }
            CompletionStage<Void> whenComplete = freeze.thenCompose(r5 -> {
                return updatePersistenceAvailability((NonBlockingStore) atomicReference.get());
            }).whenComplete((r7, th3) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } catch (Throwable th4) {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
            throw th4;
        }
    }

    private CompletionStage<Void> updatePersistenceAvailability(NonBlockingStore<?, ?> nonBlockingStore) {
        if (nonBlockingStore != null) {
            if (this.unavailableExceptionMessage == null) {
                log.persistenceUnavailable(nonBlockingStore.getClass().getName());
                this.unavailableExceptionMessage = "Store " + nonBlockingStore + " is unavailable";
                return this.cacheNotifier.notifyPersistenceAvailabilityChanged(false);
            }
        } else if (this.unavailableExceptionMessage != null) {
            log.persistenceAvailable();
            this.unavailableExceptionMessage = null;
            return this.cacheNotifier.notifyPersistenceAvailabilityChanged(true);
        }
        return CompletableFutures.completedNull();
    }

    @Stop
    public void stop() {
        AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        long acquireWriteLock = acquireWriteLock();
        try {
            stopAvailabilityTask();
            if (this.stores == null) {
                return;
            }
            for (StoreStatus storeStatus : this.stores) {
                NonBlockingStore store = storeStatus.store();
                aggregateCompletionStage.dependsOn((!this.clearOnStop || storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY)) ? store.stop() : store.clear().thenCompose(r3 -> {
                    return store.stop();
                }));
            }
            this.stores = null;
            releaseWriteLock(acquireWriteLock);
            CompletionStages.join(aggregateCompletionStage.freeze());
        } finally {
            releaseWriteLock(acquireWriteLock);
        }
    }

    private void stopAvailabilityTask() {
        AutoCloseable autoCloseable = this.availabilityTask;
        if (autoCloseable != null) {
            try {
                autoCloseable.close();
            } catch (Exception e) {
                log.warn("There was a problem stopping availability task", e);
            }
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public boolean isEnabled() {
        return this.enabled;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public boolean isReadOnly() {
        return getStore(storeStatus -> {
            return !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY);
        }) == null;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public boolean hasWriter() {
        return getStore(storeStatus -> {
            return !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY);
        }) != null;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public boolean hasStore(Predicate<StoreConfiguration> predicate) {
        return getStore(storeStatus -> {
            return predicate.test(storeStatus.config);
        }) != null;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public Flowable<MarshallableEntry<Object, Object>> preloadPublisher() {
        long acquireReadLock = acquireReadLock();
        NonBlockingStore storeLocked = getStoreLocked(storeStatus -> {
            return storeStatus.config.preload();
        });
        if (storeLocked != null) {
            return Flowable.fromPublisher(storeLocked.publishEntries(IntSets.immutableRangeSet(this.segmentCount), null, true)).doFinally(() -> {
                releaseReadLock(acquireReadLock);
            });
        }
        releaseReadLock(acquireReadLock);
        return Flowable.empty();
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public void addStoreListener(PersistenceManager.StoreChangeListener storeChangeListener) {
        this.listeners.add(storeChangeListener);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public void removeStoreListener(PersistenceManager.StoreChangeListener storeChangeListener) {
        this.listeners.remove(storeChangeListener);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> addStore(StoreConfiguration storeConfiguration) {
        return Single.fromCompletionStage(this.cache.wired().sizeAsync()).doOnSuccess(l -> {
            if (l.longValue() > 0) {
                throw log.cannotAddStore(this.cache.wired().getName());
            }
        }).concatMapCompletable(l2 -> {
            return Completable.using(this::acquireWriteLock, l2 -> {
                return startManagerAndStores(Collections.singletonList(storeConfiguration)).doOnComplete(() -> {
                    this.interceptorChainFactory.addPersistenceInterceptors(this.cache.wired().getAsyncInterceptorChain(), this.configuration, Collections.singletonList(storeConfiguration));
                    this.listeners.forEach(storeChangeListener -> {
                        storeChangeListener.storeChanged(createStatus());
                    });
                });
            }, (v1) -> {
                releaseWriteLock(v1);
            });
        }).toCompletionStage((Object) null);
    }

    private PersistenceStatus createStatus() {
        boolean z = false;
        boolean z2 = false;
        boolean z3 = false;
        boolean z4 = false;
        boolean z5 = false;
        for (StoreStatus storeStatus : this.stores) {
            if (storeStatus.config.async().enabled()) {
                z |= storeStatus.config.shared();
                z3 = true;
            }
            if (storeStatus.config.segmented()) {
                z2 = true;
            }
            if (storeStatus.config.ignoreModifications()) {
                z4 = true;
            }
            if (storeStatus.config.transactional()) {
                z5 = true;
            }
        }
        return new PersistenceStatus(this.enabled, z2, z3, z, z4, z5);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> disableStore(String str) {
        boolean z = false;
        AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        long writeLock = this.lock.writeLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                this.lock.unlockWrite(writeLock);
                return completedNull;
            }
            boolean z2 = true;
            Iterator<StoreStatus> it = this.stores.iterator();
            while (it.hasNext()) {
                StoreStatus next = it.next();
                NonBlockingStore<?, ?> unwrapStore = unwrapStore(next.store());
                if (unwrapStore.getClass().getName().equals(str) || containedInAdapter(unwrapStore, str)) {
                    it.remove();
                    aggregateCompletionStage.dependsOn(unwrapStore.stop().whenComplete((r4, th) -> {
                        if (th != null) {
                            log.warn("There was an error stopping the store", th);
                        }
                    }));
                } else {
                    z = true;
                    z2 = z2 && next.availability;
                }
            }
            if (!z) {
                this.unavailableExceptionMessage = null;
                this.enabled = false;
                stopAvailabilityTask();
            } else if (z2) {
                this.unavailableExceptionMessage = null;
            }
            this.allSegmentedOrShared = allStoresSegmentedOrShared();
            this.listeners.forEach(storeChangeListener -> {
                storeChangeListener.storeChanged(createStatus());
            });
            if (!z) {
                AsyncInterceptorChain asyncInterceptorChain = this.cache.wired().getAsyncInterceptorChain();
                AsyncInterceptor findInterceptorExtending = asyncInterceptorChain.findInterceptorExtending(CacheLoaderInterceptor.class);
                if (findInterceptorExtending == null) {
                    Log.PERSISTENCE.persistenceWithoutCacheLoaderInterceptor();
                } else {
                    asyncInterceptorChain.removeInterceptor((Class<? extends AsyncInterceptor>) findInterceptorExtending.getClass());
                }
                AsyncInterceptor findInterceptorExtending2 = asyncInterceptorChain.findInterceptorExtending(CacheWriterInterceptor.class);
                if (findInterceptorExtending2 == null) {
                    AsyncInterceptor findInterceptorWithClass = asyncInterceptorChain.findInterceptorWithClass(TransactionalStoreInterceptor.class);
                    if (findInterceptorWithClass == null) {
                        Log.PERSISTENCE.persistenceWithoutCacheWriteInterceptor();
                    } else {
                        asyncInterceptorChain.removeInterceptor((Class<? extends AsyncInterceptor>) findInterceptorWithClass.getClass());
                    }
                } else {
                    asyncInterceptorChain.removeInterceptor((Class<? extends AsyncInterceptor>) findInterceptorExtending2.getClass());
                }
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            this.lock.unlockWrite(writeLock);
            return freeze;
        } catch (Throwable th2) {
            this.lock.unlockWrite(writeLock);
            throw th2;
        }
    }

    private <K, V> NonBlockingStore<K, V> unwrapStore(NonBlockingStore<K, V> nonBlockingStore) {
        return nonBlockingStore instanceof DelegatingNonBlockingStore ? ((DelegatingNonBlockingStore) nonBlockingStore).delegate() : nonBlockingStore;
    }

    private Object unwrapOldSPI(NonBlockingStore<?, ?> nonBlockingStore) {
        return nonBlockingStore instanceof NonBlockingStoreAdapter ? ((NonBlockingStoreAdapter) nonBlockingStore).getActualStore() : nonBlockingStore;
    }

    private boolean containedInAdapter(NonBlockingStore<?, ?> nonBlockingStore, String str) {
        return (nonBlockingStore instanceof NonBlockingStoreAdapter) && ((NonBlockingStoreAdapter) nonBlockingStore).getActualStore().getClass().getName().equals(str);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <T> Set<T> getStores(Class<T> cls) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                Set<T> emptySet = Collections.emptySet();
                releaseReadLock(acquireReadLock);
                return emptySet;
            }
            Stream map = this.stores.stream().map((v0) -> {
                return v0.store();
            }).map(this::unwrapStore).map(this::unwrapOldSPI);
            Objects.requireNonNull(cls);
            Stream<T> filter = map.filter(cls::isInstance);
            Objects.requireNonNull(cls);
            Set<T> set = (Set) filter.map(cls::cast).collect(Collectors.toCollection(HashSet::new));
            releaseReadLock(acquireReadLock);
            return set;
        } catch (Throwable th) {
            releaseReadLock(acquireReadLock);
            throw th;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public Collection<String> getStoresAsString() {
        long acquireReadLock = acquireReadLock();
        try {
            if (checkStoreAvailability()) {
                Collection<String> collection = (Collection) this.stores.stream().map((v0) -> {
                    return v0.store();
                }).map(this::unwrapStore).map(this::unwrapOldSPI).map(obj -> {
                    return obj.getClass().getName();
                }).collect(Collectors.toCollection(ArrayList::new));
                releaseReadLock(acquireReadLock);
                return collection;
            }
            List emptyList = Collections.emptyList();
            releaseReadLock(acquireReadLock);
            return emptyList;
        } catch (Throwable th) {
            releaseReadLock(acquireReadLock);
            throw th;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> purgeExpired() {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                releaseReadLock(acquireReadLock);
                return CompletableFutures.completedNull();
            }
            if (log.isTraceEnabled()) {
                log.tracef("Purging entries from stores on cache %s", this.cache.getName());
            }
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                if (storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.EXPIRATION)) {
                    aggregateCompletionStage.dependsOn(Flowable.fromPublisher(storeStatus.store().purgeExpired()).concatMapCompletable(marshallableEntry -> {
                        return Completable.fromCompletionStage(this.expirationManager.running().handleInStoreExpirationInternal((MarshallableEntry<Object, Object>) marshallableEntry));
                    }).toCompletionStage((Object) null));
                }
            }
            return aggregateCompletionStage.freeze().whenComplete((r7, th) -> {
                releaseReadLock(acquireReadLock);
            });
        } catch (Throwable th2) {
            releaseReadLock(acquireReadLock);
            throw th2;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> clearAllStores(Predicate<? super StoreConfiguration> predicate) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedNull;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Clearing all stores", new Object[0]);
            }
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                if (!storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY) && predicate.test(storeStatus.config)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.clear());
                }
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Void> whenComplete = freeze.whenComplete((r7, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Boolean> deleteFromAllStores(Object obj, int i, Predicate<? super StoreConfiguration> predicate) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedFalse = CompletableFutures.completedFalse();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedFalse;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Deleting entry for key %s from stores", obj);
            }
            if (this.stores.isEmpty()) {
                CompletableFuture completedFalse2 = CompletableFutures.completedFalse();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedFalse2;
            }
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage(atomicBoolean);
            for (StoreStatus storeStatus : this.stores) {
                if (!storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY) && predicate.test(storeStatus.config)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.delete(i, obj).thenAccept(bool -> {
                        if (bool == null || bool.booleanValue()) {
                            atomicBoolean.set(true);
                        }
                    }));
                }
            }
            CompletionStage freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                CompletionStage<Boolean> booleanStage = CompletableFutures.booleanStage(atomicBoolean.get());
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return booleanStage;
            }
            CompletionStage<Boolean> handle = freeze.handle((atomicBoolean2, th) -> {
                releaseReadLock(acquireReadLock);
                if (th != null) {
                    throw CompletableFutures.asCompletionException(th);
                }
                return Boolean.valueOf(atomicBoolean2.get());
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return handle;
        } catch (Throwable th2) {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
            throw th2;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> Publisher<MarshallableEntry<K, V>> publishEntries(boolean z, boolean z2) {
        return publishEntries(obj -> {
            return true;
        }, z, z2, storeConfiguration -> {
            return true;
        });
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> Publisher<MarshallableEntry<K, V>> publishEntries(Predicate<? super K> predicate, boolean z, boolean z2, Predicate<? super StoreConfiguration> predicate2) {
        return publishEntries(IntSets.immutableRangeSet(this.segmentCount), predicate, z, z2, predicate2);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> Publisher<MarshallableEntry<K, V>> publishEntries(IntSet intSet, Predicate<? super K> predicate, boolean z, boolean z2, Predicate<? super StoreConfiguration> predicate2) {
        return Flowable.using(this::acquireReadLock, l -> {
            if (!checkStoreAvailability()) {
                return Flowable.empty();
            }
            if (log.isTraceEnabled()) {
                log.tracef("Publishing entries for segments %s", intSet);
            }
            for (StoreStatus storeStatus : this.stores) {
                Set<NonBlockingStore.Characteristic> set = storeStatus.characteristics;
                if (set.contains(NonBlockingStore.Characteristic.BULK_READ) && predicate2.test(storeStatus.config)) {
                    return storeStatus.store().publishEntries(intSet, (set.contains(NonBlockingStore.Characteristic.SEGMENTABLE) || intSet.containsAll(IntSets.immutableRangeSet(this.segmentCount))) ? predicate : PersistenceUtil.combinePredicate(intSet, this.keyPartitioner, predicate), z);
                }
            }
            return Flowable.empty();
        }, (v1) -> {
            releaseReadLock(v1);
        });
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K> Publisher<K> publishKeys(Predicate<? super K> predicate, Predicate<? super StoreConfiguration> predicate2) {
        return publishKeys(IntSets.immutableRangeSet(this.segmentCount), predicate, predicate2);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K> Publisher<K> publishKeys(IntSet intSet, Predicate<? super K> predicate, Predicate<? super StoreConfiguration> predicate2) {
        return Flowable.using(this::acquireReadLock, l -> {
            if (!checkStoreAvailability()) {
                return Flowable.empty();
            }
            if (log.isTraceEnabled()) {
                log.tracef("Publishing keys for segments %s", intSet);
            }
            for (StoreStatus storeStatus : this.stores) {
                Set<NonBlockingStore.Characteristic> set = storeStatus.characteristics;
                if (set.contains(NonBlockingStore.Characteristic.BULK_READ) && predicate2.test(storeStatus.config)) {
                    return storeStatus.store().publishKeys(intSet, (set.contains(NonBlockingStore.Characteristic.SEGMENTABLE) || intSet.containsAll(IntSets.immutableRangeSet(this.segmentCount))) ? predicate : PersistenceUtil.combinePredicate(intSet, this.keyPartitioner, predicate));
                }
            }
            return Flowable.empty();
        }, (v1) -> {
            releaseReadLock(v1);
        });
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> CompletionStage<MarshallableEntry<K, V>> loadFromAllStores(Object obj, boolean z, boolean z2) {
        return loadFromAllStores(obj, this.keyPartitioner.getSegment(obj), z, z2);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> CompletionStage<MarshallableEntry<K, V>> loadFromAllStores(Object obj, int i, boolean z, boolean z2) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedNull;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Loading entry for key %s with segment %d", obj, Integer.valueOf(i));
            }
            CompletionStage<MarshallableEntry<K, V>> loadFromStoresIterator = loadFromStoresIterator(obj, i, this.stores.iterator(), z, z2);
            if (CompletionStages.isCompletedSuccessfully(loadFromStoresIterator)) {
                return loadFromStoresIterator;
            }
            CompletionStage<MarshallableEntry<K, V>> whenComplete = loadFromStoresIterator.whenComplete((marshallableEntry, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    private <K, V> CompletionStage<MarshallableEntry<K, V>> loadFromStoresIterator(Object obj, int i, Iterator<StoreStatus> it, boolean z, boolean z2) {
        while (it.hasNext()) {
            StoreStatus next = it.next();
            NonBlockingStore<K, V> store = next.store();
            if (allowLoad(next, z, z2)) {
                return (CompletionStage<MarshallableEntry<K, V>>) store.load(segmentOrZero(next, i), obj).thenCompose(marshallableEntry -> {
                    return marshallableEntry != null ? CompletableFuture.completedFuture(marshallableEntry) : loadFromStoresIterator(obj, i, it, z, z2);
                });
            }
        }
        return CompletableFutures.completedNull();
    }

    private boolean allowLoad(StoreStatus storeStatus, boolean z, boolean z2) {
        return !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.WRITE_ONLY) && (z || !isLocalOnlyLoader(storeStatus.store)) && (z2 || storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY) || storeStatus.config.ignoreModifications());
    }

    private boolean isLocalOnlyLoader(NonBlockingStore<?, ?> nonBlockingStore) {
        if (nonBlockingStore instanceof LocalOnlyCacheLoader) {
            return true;
        }
        NonBlockingStore<?, ?> delegate = nonBlockingStore instanceof DelegatingNonBlockingStore ? ((DelegatingNonBlockingStore) nonBlockingStore).delegate() : nonBlockingStore;
        if (delegate instanceof LocalOnlyCacheLoader) {
            return true;
        }
        if (delegate instanceof NonBlockingStoreAdapter) {
            return ((NonBlockingStoreAdapter) delegate).getActualStore() instanceof LocalOnlyCacheLoader;
        }
        return false;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Long> approximateSize(Predicate<? super StoreConfiguration> predicate, IntSet intSet) {
        if (!isEnabled()) {
            return NonBlockingStore.SIZE_UNAVAILABLE_FUTURE;
        }
        long acquireReadLock = acquireReadLock();
        try {
            if (!isAvailable()) {
                releaseReadLock(acquireReadLock);
                return NonBlockingStore.SIZE_UNAVAILABLE_FUTURE;
            }
            if (this.stores == null) {
                throw new IllegalLifecycleStateException();
            }
            StoreStatus storeStatusLocked = getStoreStatusLocked(storeStatus -> {
                return storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.BULK_READ) && predicate.test(storeStatus.config);
            });
            if (storeStatusLocked == null) {
                releaseReadLock(acquireReadLock);
                return NonBlockingStore.SIZE_UNAVAILABLE_FUTURE;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Obtaining approximate size from store %s", storeStatusLocked.store);
            }
            return (storeStatusLocked.hasCharacteristic(NonBlockingStore.Characteristic.SEGMENTABLE) ? storeStatusLocked.store.approximateSize(intSet) : storeStatusLocked.store.approximateSize(IntSets.immutableRangeSet(this.segmentCount)).thenApply(l -> {
                int localWriteSegmentsCount = storeStatusLocked.hasCharacteristic(NonBlockingStore.Characteristic.SHAREABLE) ? this.segmentCount : this.distributionManager.getCacheTopology().getLocalWriteSegmentsCount();
                return Long.valueOf(localWriteSegmentsCount > 0 ? (l.longValue() * intSet.size()) / localWriteSegmentsCount : l.longValue());
            })).whenComplete((l2, th) -> {
                releaseReadLock(acquireReadLock);
            });
        } catch (Throwable th2) {
            releaseReadLock(acquireReadLock);
            throw th2;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Long> size(Predicate<? super StoreConfiguration> predicate, IntSet intSet) {
        long acquireReadLock = acquireReadLock();
        try {
            checkStoreAvailability();
            if (log.isTraceEnabled()) {
                log.tracef("Obtaining size from stores", new Object[0]);
            }
            NonBlockingStore storeLocked = getStoreLocked(storeStatus -> {
                return storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.BULK_READ) && predicate.test(storeStatus.config);
            });
            if (storeLocked == null) {
                releaseReadLock(acquireReadLock);
                return NonBlockingStore.SIZE_UNAVAILABLE_FUTURE;
            }
            if (intSet == null) {
                intSet = IntSets.immutableRangeSet(this.segmentCount);
            }
            return storeLocked.size(intSet).whenComplete((l, th) -> {
                releaseReadLock(acquireReadLock);
            });
        } catch (Throwable th2) {
            releaseReadLock(acquireReadLock);
            throw th2;
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Long> size(Predicate<? super StoreConfiguration> predicate) {
        return size(predicate, null);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public void setClearOnStop(boolean z) {
        this.clearOnStop = z;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> writeToAllNonTxStores(MarshallableEntry marshallableEntry, int i, Predicate<? super StoreConfiguration> predicate, long j) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedNull;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Writing entry %s for with segment: %d", marshallableEntry, Integer.valueOf(i));
            }
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                if (shouldWrite(storeStatus, predicate, j)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.write(i, marshallableEntry));
                }
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Void> whenComplete = freeze.whenComplete((r7, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    private int segmentOrZero(StoreStatus storeStatus, int i) {
        if (storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.SEGMENTABLE)) {
            return i;
        }
        return 0;
    }

    private boolean shouldWrite(StoreStatus storeStatus, Predicate<? super StoreConfiguration> predicate) {
        return !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY) && predicate.test(storeStatus.config);
    }

    private boolean shouldWrite(StoreStatus storeStatus, Predicate<? super StoreConfiguration> predicate, long j) {
        return shouldWrite(storeStatus, predicate) && !storeStatus.store.ignoreCommandWithFlags(j);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> prepareAllTxStores(TxInvocationContext<AbstractCacheTransaction> txInvocationContext, Predicate<? super StoreConfiguration> predicate) throws PersistenceException {
        return batchOperation(toMvccEntryFlowable(txInvocationContext, null), txInvocationContext, (nonBlockingStore, i, flowable, flowable2) -> {
            return nonBlockingStore.prepareWithModifications(txInvocationContext.getTransaction(), i, flowable, flowable2);
        }).thenApply(CompletableFutures.toNullFunction());
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> commitAllTxStores(TxInvocationContext<AbstractCacheTransaction> txInvocationContext, Predicate<? super StoreConfiguration> predicate) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedNull;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Committing transaction %s to stores", txInvocationContext);
            }
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                if (shouldPerformTransactionOperation(storeStatus, predicate)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.commit(txInvocationContext.getTransaction()));
                }
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Void> whenComplete = freeze.whenComplete((r7, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Void> rollbackAllTxStores(TxInvocationContext<AbstractCacheTransaction> txInvocationContext, Predicate<? super StoreConfiguration> predicate) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedNull = CompletableFutures.completedNull();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedNull;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Rolling back transaction %s for stores", txInvocationContext);
            }
            AggregateCompletionStage<Void> aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
            for (StoreStatus storeStatus : this.stores) {
                if (shouldPerformTransactionOperation(storeStatus, predicate)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.rollback(txInvocationContext.getTransaction()));
                }
            }
            CompletionStage<Void> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Void> whenComplete = freeze.whenComplete((r7, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    private boolean shouldPerformTransactionOperation(StoreStatus storeStatus, Predicate<? super StoreConfiguration> predicate) {
        return storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.TRANSACTIONAL) && predicate.test(storeStatus.config);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public <K, V> CompletionStage<Void> writeEntries(Iterable<MarshallableEntry<K, V>> iterable, Predicate<? super StoreConfiguration> predicate) {
        return Completable.using(this::acquireReadLock, l -> {
            if (!checkStoreAvailability()) {
                return Completable.complete();
            }
            if (log.isTraceEnabled()) {
                log.trace("Writing entries to stores");
            }
            return Flowable.fromIterable(this.stores).filter(storeStatus -> {
                return shouldWrite(storeStatus, predicate) && !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.TRANSACTIONAL);
            }).flatMapCompletable(storeStatus2 -> {
                boolean hasCharacteristic = storeStatus2.hasCharacteristic(NonBlockingStore.Characteristic.SEGMENTABLE);
                return Completable.fromCompletionStage(storeStatus2.store().batch(segmentCount(hasCharacteristic), Flowable.empty(), hasCharacteristic ? Flowable.fromIterable(iterable).groupBy(groupingFunction((v0) -> {
                    return v0.getKey();
                })).map(SegmentPublisherWrapper::wrap) : Flowable.just(SingleSegmentPublisher.singleSegment(Flowable.fromIterable(iterable)))));
            });
        }, (v1) -> {
            releaseReadLock(v1);
        }).toCompletionStage((Object) null);
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Long> writeMapCommand(PutMapCommand putMapCommand, InvocationContext invocationContext, BiPredicate<? super PutMapCommand, Object> biPredicate) {
        return batchOperation(entriesFromCommand(putMapCommand, invocationContext, biPredicate), invocationContext, (v0, v1, v2, v3) -> {
            return v0.batch(v1, v2, v3);
        });
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Long> performBatch(TxInvocationContext<AbstractCacheTransaction> txInvocationContext, BiPredicate<? super WriteCommand, Object> biPredicate) {
        return batchOperation(toMvccEntryFlowable(txInvocationContext, biPredicate), txInvocationContext, (v0, v1, v2, v3) -> {
            return v0.batch(v1, v2, v3);
        });
    }

    private <K, V> CompletionStage<Long> batchOperation(Flowable<MVCCEntry<K, V>> flowable, InvocationContext invocationContext, HandleFlowables<K, V> handleFlowables) {
        return Single.using(this::acquireReadLock, l -> {
            if (!checkStoreAvailability()) {
                return Single.just(0L);
            }
            if (log.isTraceEnabled()) {
                log.trace("Writing batch to stores");
            }
            return Flowable.fromIterable(this.stores).filter(storeStatus -> {
                return !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.READ_ONLY);
            }).flatMapSingle(storeStatus2 -> {
                Flowable flowable2;
                boolean shared = storeStatus2.config.shared();
                if (shared) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Store %s is shared, checking skip shared stores and ignoring entries not primarily owned by this node", storeStatus2.store);
                    }
                    flowable2 = flowable.filter(mVCCEntry -> {
                        return !mVCCEntry.isSkipSharedStore();
                    });
                } else {
                    flowable2 = flowable;
                }
                boolean segmented = storeStatus2.config.segmented();
                Flowable autoConnect = flowable2.publish().autoConnect(2);
                Flowable<NonBlockingStore.SegmentedPublisher<Object>> createRemoveFlowable = createRemoveFlowable(autoConnect, shared, segmented, storeStatus2);
                ByRef.Long r0 = new ByRef.Long(0L);
                return Single.fromCompletionStage(handleFlowables.handleFlowables(storeStatus2.store(), segmentCount(segmented), createRemoveFlowable, createWriteFlowable(autoConnect, invocationContext, shared, segmented, r0, storeStatus2)).thenApply(r4 -> {
                    return Long.valueOf(r0.get());
                }));
            }).last(0L);
        }, (v1) -> {
            releaseReadLock(v1);
        }).toCompletionStage();
    }

    private <K, V> Flowable<NonBlockingStore.SegmentedPublisher<Object>> createRemoveFlowable(Flowable<MVCCEntry<K, V>> flowable, boolean z, boolean z2, StoreStatus storeStatus) {
        Flowable<NonBlockingStore.SegmentedPublisher<Object>> just;
        Flowable map = flowable.filter((v0) -> {
            return v0.isRemoved();
        }).map((v0) -> {
            return v0.getKey();
        });
        if (z2) {
            KeyPartitioner keyPartitioner = this.keyPartitioner;
            Objects.requireNonNull(keyPartitioner);
            just = filterSharedSegments(map.groupBy(keyPartitioner::getSegment).map(SegmentPublisherWrapper::wrap), null, z);
        } else {
            if (z && !this.isInvalidationCache) {
                map = map.filter(obj -> {
                    return this.distributionManager.getCacheTopology().getDistribution(obj).isPrimary();
                });
            }
            just = Flowable.just(SingleSegmentPublisher.singleSegment(map));
        }
        if (log.isTraceEnabled()) {
            just = just.doOnSubscribe(subscription -> {
                log.tracef("Store %s has subscribed to remove batch", storeStatus.store);
            }).map(segmentedPublisher -> {
                int segment = segmentedPublisher.getSegment();
                return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(segmentedPublisher).doOnNext(obj2 -> {
                    log.tracef("Emitting key %s for removal from segment %s", obj2, Integer.valueOf(segment));
                }));
            });
        }
        return just;
    }

    private <K, V> Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> createWriteFlowable(Flowable<MVCCEntry<K, V>> flowable, InvocationContext invocationContext, boolean z, boolean z2, ByRef.Long r10, StoreStatus storeStatus) {
        Flowable<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> just;
        Flowable map = flowable.filter(mVCCEntry -> {
            return !mVCCEntry.isRemoved();
        }).map(mVCCEntry2 -> {
            K key = mVCCEntry2.getKey();
            return this.marshallableEntryFactory.create((Object) key, this.internalEntryFactory.getValueFromCtx(key, invocationContext));
        });
        if (z2) {
            just = filterSharedSegments(map.doOnNext(marshallableEntry -> {
                r10.inc();
            }).groupBy(marshallableEntry2 -> {
                return Integer.valueOf(this.keyPartitioner.getSegment(marshallableEntry2.getKey()));
            }).map(SegmentPublisherWrapper::wrap), r10, z);
        } else {
            if (z && !this.isInvalidationCache) {
                map = map.filter(marshallableEntry3 -> {
                    return this.distributionManager.getCacheTopology().getDistribution(marshallableEntry3.getKey()).isPrimary();
                });
            }
            just = Flowable.just(SingleSegmentPublisher.singleSegment(map.doOnNext(marshallableEntry4 -> {
                r10.inc();
            })));
        }
        if (log.isTraceEnabled()) {
            just = just.doOnSubscribe(subscription -> {
                log.tracef("Store %s has subscribed to write batch", storeStatus.store);
            }).map(segmentedPublisher -> {
                int segment = segmentedPublisher.getSegment();
                return SingleSegmentPublisher.singleSegment(segment, Flowable.fromPublisher(segmentedPublisher).doOnNext(marshallableEntry5 -> {
                    log.tracef("Emitting entry %s for write to segment %s", marshallableEntry5.getKey(), Integer.valueOf(segment));
                }));
            });
        }
        return just;
    }

    private <I> Flowable<NonBlockingStore.SegmentedPublisher<I>> filterSharedSegments(Flowable<NonBlockingStore.SegmentedPublisher<I>> flowable, ByRef.Long r6, boolean z) {
        return (!z || this.isInvalidationCache) ? flowable : flowable.map(segmentedPublisher -> {
            if (this.distributionManager.getCacheTopology().getSegmentDistribution(segmentedPublisher.getSegment()).isPrimary()) {
                return segmentedPublisher;
            }
            Flowable fromPublisher = Flowable.fromPublisher(segmentedPublisher);
            return SingleSegmentPublisher.singleSegment(segmentedPublisher.getSegment(), r6 != null ? fromPublisher.doOnNext(obj -> {
                r6.dec();
            }).ignoreElements().toFlowable() : fromPublisher.take(0L));
        });
    }

    private <K, V> Flowable<MVCCEntry<K, V>> toMvccEntryFlowable(TxInvocationContext<AbstractCacheTransaction> txInvocationContext, BiPredicate<? super WriteCommand, Object> biPredicate) {
        return Flowable.fromIterable(txInvocationContext.getCacheTransaction().getAllModifications()).filter(writeCommand -> {
            return !writeCommand.hasAnyFlag(FlagBitSets.SKIP_CACHE_STORE | FlagBitSets.ROLLING_UPGRADE);
        }).concatMap(writeCommand2 -> {
            return entriesFromCommand(writeCommand2, txInvocationContext, biPredicate);
        });
    }

    private <K, V, WCT extends WriteCommand> Flowable<MVCCEntry<K, V>> entriesFromCommand(WCT wct, InvocationContext invocationContext, BiPredicate<? super WCT, Object> biPredicate) {
        if (!(wct instanceof DataWriteCommand)) {
            return wct instanceof InvalidateCommand ? Flowable.empty() : Flowable.fromIterable(wct.getAffectedKeys()).concatMapMaybe(obj -> {
                MVCCEntry acquireKeyFromContext = acquireKeyFromContext(invocationContext, wct, obj, biPredicate);
                return acquireKeyFromContext != null ? Maybe.just(acquireKeyFromContext) : Maybe.empty();
            });
        }
        MVCCEntry<K, V> acquireKeyFromContext = acquireKeyFromContext(invocationContext, wct, ((DataWriteCommand) wct).getKey(), biPredicate);
        return acquireKeyFromContext != null ? Flowable.just(acquireKeyFromContext) : Flowable.empty();
    }

    private <K, V, WCT extends WriteCommand> MVCCEntry<K, V> acquireKeyFromContext(InvocationContext invocationContext, WCT wct, Object obj, BiPredicate<? super WCT, Object> biPredicate) {
        if (biPredicate != null && !biPredicate.test(wct, obj)) {
            return null;
        }
        MVCCEntry<K, V> mVCCEntry = (MVCCEntry) invocationContext.lookupEntry(obj);
        if (mVCCEntry.isChanged()) {
            return mVCCEntry;
        }
        return null;
    }

    private <E> Function<E, Integer> groupingFunction(Function<E, Object> function) {
        return obj -> {
            return Integer.valueOf(this.keyPartitioner.getSegment(function.apply(obj)));
        };
    }

    private int segmentCount(boolean z) {
        if (z) {
            return this.segmentCount;
        }
        return 1;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public boolean isAvailable() {
        return this.unavailableExceptionMessage == null;
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Boolean> addSegments(IntSet intSet) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedFalse = CompletableFutures.completedFalse();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedFalse;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Adding segments %s to stores", intSet);
            }
            AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage(Boolean.valueOf(this.allSegmentedOrShared));
            for (StoreStatus storeStatus : this.stores) {
                if (shouldInvokeSegmentMethods(storeStatus)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.addSegments(intSet));
                }
            }
            CompletionStage<Boolean> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Boolean> whenComplete = freeze.whenComplete((bool, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    @Override // org.infinispan.persistence.manager.PersistenceManager
    public CompletionStage<Boolean> removeSegments(IntSet intSet) {
        long acquireReadLock = acquireReadLock();
        try {
            if (!checkStoreAvailability()) {
                CompletableFuture completedFalse = CompletableFutures.completedFalse();
                if (1 != 0) {
                    releaseReadLock(acquireReadLock);
                }
                return completedFalse;
            }
            if (log.isTraceEnabled()) {
                log.tracef("Removing segments %s from stores", intSet);
            }
            AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage(Boolean.valueOf(this.allSegmentedOrShared));
            for (StoreStatus storeStatus : this.stores) {
                if (shouldInvokeSegmentMethods(storeStatus)) {
                    aggregateCompletionStage.dependsOn(storeStatus.store.removeSegments(intSet));
                }
            }
            CompletionStage<Boolean> freeze = aggregateCompletionStage.freeze();
            if (CompletionStages.isCompletedSuccessfully(freeze)) {
                return freeze;
            }
            CompletionStage<Boolean> whenComplete = freeze.whenComplete((bool, th) -> {
                releaseReadLock(acquireReadLock);
            });
            if (0 != 0) {
                releaseReadLock(acquireReadLock);
            }
            return whenComplete;
        } finally {
            if (1 != 0) {
                releaseReadLock(acquireReadLock);
            }
        }
    }

    private static boolean shouldInvokeSegmentMethods(StoreStatus storeStatus) {
        return storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.SEGMENTABLE) && !storeStatus.hasCharacteristic(NonBlockingStore.Characteristic.SHAREABLE);
    }

    public <K, V> List<NonBlockingStore<K, V>> getAllStores(Predicate<Set<NonBlockingStore.Characteristic>> predicate) {
        long acquireReadLock = acquireReadLock();
        try {
            if (checkStoreAvailability()) {
                List<NonBlockingStore<K, V>> list = (List) this.stores.stream().filter(storeStatus -> {
                    return predicate.test(storeStatus.characteristics);
                }).map((v0) -> {
                    return v0.store();
                }).collect(Collectors.toCollection(ArrayList::new));
                releaseReadLock(acquireReadLock);
                return list;
            }
            List<NonBlockingStore<K, V>> emptyList = Collections.emptyList();
            releaseReadLock(acquireReadLock);
            return emptyList;
        } catch (Throwable th) {
            releaseReadLock(acquireReadLock);
            throw th;
        }
    }

    private long acquireReadLock() {
        return this.lock.readLock();
    }

    private long acquireWriteLock() {
        return this.lock.writeLock();
    }

    private void releaseReadLock(long j) {
        this.lock.unlockRead(j);
    }

    private void releaseWriteLock(long j) {
        this.lock.unlockWrite(j);
    }

    private boolean checkStoreAvailability() {
        if (!this.enabled) {
            return false;
        }
        String str = this.unavailableExceptionMessage;
        if (str != null) {
            throw new StoreUnavailableException(str);
        }
        if (this.stores == null) {
            throw new IllegalLifecycleStateException();
        }
        return true;
    }

    boolean anyLocksHeld() {
        return this.lock.isReadLocked() || this.lock.isWriteLocked();
    }
}
