package org.apache.cassandra.db;

import com.datastax.dse.byos.shade.com.google.common.collect.Iterators;
import com.datastax.dse.byos.shade.com.google.common.collect.PeekingIterator;
import com.datastax.dse.byos.shade.com.google.common.util.concurrent.Striped;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Single;
import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.cassandra.concurrent.SchedulableMessage;
import org.apache.cassandra.concurrent.StagedScheduler;
import org.apache.cassandra.concurrent.TPC;
import org.apache.cassandra.concurrent.TPCTaskType;
import org.apache.cassandra.concurrent.TPCUtils;
import org.apache.cassandra.concurrent.TracingAwareExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.WriteVerbs;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.FlowablePartition;
import org.apache.cassandra.db.rows.FlowablePartitions;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.Serializer;
import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.concurrent.ExecutableLock;
import org.apache.cassandra.utils.flow.RxThreads;
import org.apache.cassandra.utils.time.ApolloTime;
import org.apache.cassandra.utils.versioning.VersionDependent;
import org.apache.cassandra.utils.versioning.Versioned;

/* loaded from: input_file:org/apache/cassandra/db/CounterMutation.class */
public class CounterMutation implements IMutation, SchedulableMessage {
    public static final Versioned<WriteVerbs.WriteVersion, Serializer<CounterMutation>> serializers = WriteVerbs.WriteVersion.versioned(writeVersion -> {
        return new CounterMutationSerializer(writeVersion);
    });
    private static final Striped<Semaphore> SEMAPHORES_STRIPED = Striped.semaphore(TPCUtils.getNumCores() * 1024, 1);
    private static final ConcurrentMap<Semaphore, Pair<Long, ExecutableLock>> LOCKS = new ConcurrentHashMap();
    private static final AtomicLong LOCK_ID_GEN = new AtomicLong();
    private final Mutation mutation;
    private final ConsistencyLevel consistency;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/cassandra/db/CounterMutation$CounterMutationSerializer.class */
    public static class CounterMutationSerializer extends VersionDependent<WriteVerbs.WriteVersion> implements Serializer<CounterMutation> {
        private final Serializer<Mutation> mutationSerializer;

        private CounterMutationSerializer(WriteVerbs.WriteVersion writeVersion) {
            super(writeVersion);
            this.mutationSerializer = (Serializer) Mutation.serializers.get(writeVersion);
        }

        @Override // org.apache.cassandra.utils.Serializer
        public void serialize(CounterMutation counterMutation, DataOutputPlus dataOutputPlus) throws IOException {
            this.mutationSerializer.serialize(counterMutation.mutation, dataOutputPlus);
            dataOutputPlus.writeUTF(counterMutation.consistency.name());
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.cassandra.utils.Serializer
        public CounterMutation deserialize(DataInputPlus dataInputPlus) throws IOException {
            return new CounterMutation(this.mutationSerializer.deserialize(dataInputPlus), (ConsistencyLevel) Enum.valueOf(ConsistencyLevel.class, dataInputPlus.readUTF()));
        }

        @Override // org.apache.cassandra.utils.Serializer
        public long serializedSize(CounterMutation counterMutation) {
            return this.mutationSerializer.serializedSize(counterMutation.mutation) + TypeSizes.sizeof(counterMutation.consistency.name());
        }
    }

    public CounterMutation(Mutation mutation, ConsistencyLevel consistencyLevel) {
        this.mutation = mutation;
        this.consistency = consistencyLevel;
    }

    @Override // org.apache.cassandra.db.IMutation
    public String getKeyspaceName() {
        return this.mutation.getKeyspaceName();
    }

    @Override // org.apache.cassandra.db.IMutation
    public Collection<TableId> getTableIds() {
        return this.mutation.getTableIds();
    }

    @Override // org.apache.cassandra.db.IMutation
    public Mutation add(PartitionUpdate partitionUpdate) {
        return this.mutation.add(partitionUpdate);
    }

    @Override // org.apache.cassandra.db.IMutation
    public PartitionUpdate get(TableMetadata tableMetadata) {
        return this.mutation.get(tableMetadata);
    }

    @Override // org.apache.cassandra.db.IMutation
    public Collection<PartitionUpdate> getPartitionUpdates() {
        return this.mutation.getPartitionUpdates();
    }

    public Mutation getMutation() {
        return this.mutation;
    }

    @Override // org.apache.cassandra.db.IMutation
    public DecoratedKey key() {
        return this.mutation.key();
    }

    public ConsistencyLevel consistency() {
        return this.consistency;
    }

    public CompletableFuture<Mutation> applyCounterMutation() {
        return applyCounterMutation(ApolloTime.systemClockMillis());
    }

    @Override // org.apache.cassandra.concurrent.SchedulableMessage
    public StagedScheduler getScheduler() {
        return this.mutation.getScheduler();
    }

    @Override // org.apache.cassandra.concurrent.SchedulableMessage
    public TracingAwareExecutor getRequestExecutor() {
        return this.mutation.getRequestExecutor();
    }

    @Override // org.apache.cassandra.concurrent.SchedulableMessage
    public TracingAwareExecutor getResponseExecutor() {
        return this.mutation.getResponseExecutor();
    }

    private Single<Mutation> applyCounterMutationInternal() {
        Mutation mutation = new Mutation(getKeyspaceName(), key());
        return RxThreads.subscribeOn(Completable.concat((Iterable<? extends CompletableSource>) getPartitionUpdates().stream().map(this::processModifications).map(single -> {
            return single.flatMapCompletable(partitionUpdate -> {
                return Completable.fromRunnable(() -> {
                    mutation.add(partitionUpdate);
                });
            });
        }).collect(Collectors.toList())).andThen(mutation.applyAsync()).toSingleDefault(mutation), getScheduler(), TPCTaskType.COUNTER_ACQUIRE_LOCK);
    }

    @Override // org.apache.cassandra.db.IMutation
    public void apply() {
        TPCUtils.blockingGet(applyCounterMutation());
    }

    @Override // org.apache.cassandra.db.IMutation
    public Completable applyAsync() {
        return TPCUtils.toCompletable(applyCounterMutation().thenAccept(mutation -> {
        }));
    }

    private CompletableFuture<Mutation> applyCounterMutation(long j) {
        Tracing.trace("Acquiring counter locks");
        SortedMap<Long, ExecutableLock> locks = getLocks();
        return locks.isEmpty() ? TPCUtils.toFuture(applyCounterMutationInternal()) : TPC.withLocks(locks, j, DatabaseDescriptor.getCounterWriteRpcTimeout(), () -> {
            return TPCUtils.toFuture(applyCounterMutationInternal());
        }, timeoutException -> {
            Tracing.trace("Failed to acquire locks for counter mutation for longer than {} millis, giving up", Long.valueOf(DatabaseDescriptor.getCounterWriteRpcTimeout()));
            return new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(Keyspace.open(getKeyspaceName())));
        });
    }

    private SortedMap<Long, ExecutableLock> getLocks() {
        TreeMap treeMap = new TreeMap(Comparator.naturalOrder());
        for (PartitionUpdate partitionUpdate : getPartitionUpdates()) {
            Iterator<Row> it2 = partitionUpdate.iterator();
            while (it2.hasNext()) {
                Row next = it2.next();
                Iterator<ColumnData> it3 = next.iterator();
                while (it3.hasNext()) {
                    Pair<Long, ExecutableLock> computeIfAbsent = LOCKS.computeIfAbsent(SEMAPHORES_STRIPED.get(Integer.valueOf(Objects.hash(partitionUpdate.metadata().id, key(), next.clustering(), it3.next().column()))), semaphore -> {
                        return Pair.create(Long.valueOf(LOCK_ID_GEN.incrementAndGet()), new ExecutableLock(semaphore));
                    });
                    treeMap.put(computeIfAbsent.left, computeIfAbsent.right);
                }
            }
        }
        return treeMap;
    }

    private Single<PartitionUpdate> processModifications(PartitionUpdate partitionUpdate) {
        ColumnFamilyStore columnFamilyStore = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(partitionUpdate.metadata().id);
        List<PartitionUpdate.CounterMark> collectCounterMarks = partitionUpdate.collectCounterMarks();
        if (CacheService.instance.counterCache.getCapacity() != 0) {
            Tracing.trace("Fetching {} counter values from cache", Integer.valueOf(collectCounterMarks.size()));
            updateWithCurrentValuesFromCache(collectCounterMarks, columnFamilyStore);
            if (collectCounterMarks.isEmpty()) {
                return Single.just(partitionUpdate);
            }
        }
        Tracing.trace("Reading {} counter values from the CF", Integer.valueOf(collectCounterMarks.size()));
        return updateWithCurrentValuesFromCFS(collectCounterMarks, columnFamilyStore).andThen(Single.fromCallable(() -> {
            Iterator it2 = collectCounterMarks.iterator();
            while (it2.hasNext()) {
                updateWithCurrentValue((PartitionUpdate.CounterMark) it2.next(), ClockAndCount.BLANK, columnFamilyStore);
            }
            return partitionUpdate;
        }));
    }

    private void updateWithCurrentValue(PartitionUpdate.CounterMark counterMark, ClockAndCount clockAndCount, ColumnFamilyStore columnFamilyStore) {
        long max = Math.max(ApolloTime.systemClockMicros(), clockAndCount.clock + 1);
        long j = clockAndCount.count + CounterContext.instance().total(counterMark.value());
        counterMark.setValue(CounterContext.instance().createGlobal(CounterId.getLocalId(), max, j));
        columnFamilyStore.putCachedCounter(key().getKey(), counterMark.clustering(), counterMark.column(), counterMark.path(), ClockAndCount.create(max, j));
    }

    private void updateWithCurrentValuesFromCache(List<PartitionUpdate.CounterMark> list, ColumnFamilyStore columnFamilyStore) {
        Iterator<PartitionUpdate.CounterMark> it2 = list.iterator();
        while (it2.hasNext()) {
            PartitionUpdate.CounterMark next = it2.next();
            ClockAndCount cachedCounter = columnFamilyStore.getCachedCounter(key().getKey(), next.clustering(), next.column(), next.path());
            if (cachedCounter != null) {
                updateWithCurrentValue(next, cachedCounter, columnFamilyStore);
                it2.remove();
            }
        }
    }

    private Completable updateWithCurrentValuesFromCFS(List<PartitionUpdate.CounterMark> list, ColumnFamilyStore columnFamilyStore) {
        ColumnFilter.Builder selectionBuilder = ColumnFilter.selectionBuilder();
        BTreeSet.Builder builder = BTreeSet.builder(columnFamilyStore.metadata().comparator);
        for (PartitionUpdate.CounterMark counterMark : list) {
            if (counterMark.clustering() != Clustering.STATIC_CLUSTERING) {
                builder.add(counterMark.clustering());
            }
            if (counterMark.path() == null) {
                selectionBuilder.add(counterMark.column());
            } else {
                selectionBuilder.select(counterMark.column(), counterMark.path());
            }
        }
        int systemClockSecondsAsInt = ApolloTime.systemClockSecondsAsInt();
        SinglePartitionReadCommand create = SinglePartitionReadCommand.create(columnFamilyStore.metadata(), systemClockSecondsAsInt, key(), selectionBuilder.build(), new ClusteringIndexNamesFilter(builder.build(), false));
        PeekingIterator peekingIterator = Iterators.peekingIterator(list.iterator());
        return Completable.using(() -> {
            return create.executionController();
        }, readExecutionController -> {
            return create.deferredQuery(columnFamilyStore, readExecutionController).flatMapCompletable(flowableUnfilteredPartition -> {
                FlowablePartition filter = FlowablePartitions.filter(flowableUnfilteredPartition, systemClockSecondsAsInt);
                updateForRow(peekingIterator, filter.staticRow(), columnFamilyStore);
                return filter.content().takeWhile(row -> {
                    return peekingIterator.hasNext();
                }).processToRxCompletable(row2 -> {
                    updateForRow(peekingIterator, row2, columnFamilyStore);
                });
            });
        }, readExecutionController2 -> {
            readExecutionController2.close();
        });
    }

    private int compare(Clustering clustering, Clustering clustering2, ColumnFamilyStore columnFamilyStore) {
        if (clustering == Clustering.STATIC_CLUSTERING) {
            return clustering2 == Clustering.STATIC_CLUSTERING ? 0 : -1;
        }
        if (clustering2 == Clustering.STATIC_CLUSTERING) {
            return 1;
        }
        return columnFamilyStore.getComparator().compare(clustering, clustering2);
    }

    private void updateForRow(PeekingIterator<PartitionUpdate.CounterMark> peekingIterator, Row row, ColumnFamilyStore columnFamilyStore) {
        int i = 0;
        while (peekingIterator.hasNext()) {
            int compare = compare(peekingIterator.peek().clustering(), row.clustering(), columnFamilyStore);
            i = compare;
            if (compare >= 0) {
                break;
            } else {
                peekingIterator.next();
            }
        }
        if (peekingIterator.hasNext()) {
            while (i == 0) {
                PartitionUpdate.CounterMark next = peekingIterator.next();
                Cell cell = next.path() == null ? row.getCell(next.column()) : row.getCell(next.column(), next.path());
                if (cell != null) {
                    updateWithCurrentValue(next, CounterContext.instance().getLocalClockAndCount(cell.value()), columnFamilyStore);
                    peekingIterator.remove();
                }
                if (!peekingIterator.hasNext()) {
                    return;
                } else {
                    i = compare(peekingIterator.peek().clustering(), row.clustering(), columnFamilyStore);
                }
            }
        }
    }

    @Override // org.apache.cassandra.db.IMutation
    public long getTimeout() {
        return DatabaseDescriptor.getCounterWriteRpcTimeout();
    }

    public String toString() {
        return toString(false);
    }

    @Override // org.apache.cassandra.db.IMutation
    public String toString(boolean z) {
        return String.format("CounterMutation(%s, %s)", this.mutation.toString(z), this.consistency);
    }
}
