package com.bazaarvoice.emodb.sor.db.astyanax;

import com.bazaarvoice.emodb.common.api.Ttls;
import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace;
import com.bazaarvoice.emodb.sor.api.Audit;
import com.bazaarvoice.emodb.sor.api.AuditBuilder;
import com.bazaarvoice.emodb.sor.api.AuditSizeLimitException;
import com.bazaarvoice.emodb.sor.api.Compaction;
import com.bazaarvoice.emodb.sor.api.DeltaSizeLimitException;
import com.bazaarvoice.emodb.sor.api.History;
import com.bazaarvoice.emodb.sor.api.ReadConsistency;
import com.bazaarvoice.emodb.sor.api.WriteConsistency;
import com.bazaarvoice.emodb.sor.core.AuditStore;
import com.bazaarvoice.emodb.sor.db.DataWriterDAO;
import com.bazaarvoice.emodb.sor.db.RecordUpdate;
import com.bazaarvoice.emodb.sor.delta.Delta;
import com.bazaarvoice.emodb.sor.delta.Literal;
import com.bazaarvoice.emodb.sor.delta.MapDelta;
import com.bazaarvoice.emodb.table.db.Table;
import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxStorage;
import com.bazaarvoice.emodb.table.db.astyanax.AstyanaxTable;
import com.bazaarvoice.emodb.table.db.astyanax.DataPurgeDAO;
import com.bazaarvoice.emodb.table.db.astyanax.FullConsistencyTimeProvider;
import com.bazaarvoice.emodb.table.db.consistency.HintsConsistencyTimeProvider;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.inject.Inject;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.Execution;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.AbstractThriftMutationBatchImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.cassandra.cql3.statements.CFPropDefs;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransportException;

/* loaded from: input_file:com/bazaarvoice/emodb/sor/db/astyanax/AstyanaxDataWriterDAO.class */
public class AstyanaxDataWriterDAO implements DataWriterDAO, DataPurgeDAO {
    private static final int MAX_BATCH_SIZE = 100;
    private static final int MAX_PENDING_SIZE = 200;
    private static final int MAX_THRIFT_FRAMED_TRANSPORT_SIZE = 15728640;
    private static final int MAX_DELTA_SIZE = 10485760;
    private static final int MAX_AUDIT_SIZE = 1048576;
    private final AstyanaxDataReaderDAO _readerDao;
    private final ChangeEncoder _changeEncoder;
    private final Meter _updateMeter;
    private final Meter _oversizeUpdateMeter;
    private final FullConsistencyTimeProvider _fullConsistencyTimeProvider;
    private final HintsConsistencyTimeProvider _rawConsistencyTimeProvider;
    private final AuditStore _auditStore;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/sor/db/astyanax/AstyanaxDataWriterDAO$BatchKey.class */
    public static class BatchKey {
        private final DeltaPlacement _placement;
        private final ConsistencyLevel _consistency;

        BatchKey(DeltaPlacement deltaPlacement, WriteConsistency writeConsistency) {
            this._placement = deltaPlacement;
            this._consistency = SorConsistencies.toAstyanax(writeConsistency);
        }

        DeltaPlacement getPlacement() {
            return this._placement;
        }

        ConsistencyLevel getConsistency() {
            return this._consistency;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof BatchKey)) {
                return false;
            }
            BatchKey batchKey = (BatchKey) obj;
            return this._consistency == batchKey.getConsistency() && this._placement.equals(batchKey.getPlacement());
        }

        public int hashCode() {
            return Objects.hashCode(this._placement, this._consistency);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/sor/db/astyanax/AstyanaxDataWriterDAO$BatchUpdate.class */
    public static class BatchUpdate {
        private final AstyanaxStorage _storage;
        private final RecordUpdate _update;

        BatchUpdate(AstyanaxStorage astyanaxStorage, RecordUpdate recordUpdate) {
            this._storage = astyanaxStorage;
            this._update = recordUpdate;
        }

        AstyanaxStorage getStorage() {
            return this._storage;
        }

        RecordUpdate getUpdate() {
            return this._update;
        }
    }

    @Inject
    public AstyanaxDataWriterDAO(AstyanaxDataReaderDAO astyanaxDataReaderDAO, FullConsistencyTimeProvider fullConsistencyTimeProvider, AuditStore auditStore, HintsConsistencyTimeProvider hintsConsistencyTimeProvider, ChangeEncoder changeEncoder, MetricRegistry metricRegistry) {
        this._readerDao = (AstyanaxDataReaderDAO) Preconditions.checkNotNull(astyanaxDataReaderDAO, "readerDao");
        this._fullConsistencyTimeProvider = (FullConsistencyTimeProvider) Preconditions.checkNotNull(fullConsistencyTimeProvider, "fullConsistencyTimeProvider");
        this._rawConsistencyTimeProvider = (HintsConsistencyTimeProvider) Preconditions.checkNotNull(hintsConsistencyTimeProvider, "rawConsistencyTimeProvider");
        this._auditStore = (AuditStore) Preconditions.checkNotNull(auditStore, "auditStore");
        this._changeEncoder = (ChangeEncoder) Preconditions.checkNotNull(changeEncoder, "changeEncoder");
        this._updateMeter = metricRegistry.meter(getMetricName("updates"));
        this._oversizeUpdateMeter = metricRegistry.meter(getMetricName("oversizeUpdates"));
    }

    private String getMetricName(String str) {
        return MetricRegistry.name("bv.emodb.sor", "AstyanaxDataWriterDAO", str);
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    public long getFullConsistencyTimestamp(Table table) {
        return getFullConsistencyTimestamp((AstyanaxTable) table, this._fullConsistencyTimeProvider);
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    public long getRawConsistencyTimestamp(Table table) {
        return getFullConsistencyTimestamp((AstyanaxTable) table, this._rawConsistencyTimeProvider);
    }

    private long getFullConsistencyTimestamp(AstyanaxTable astyanaxTable, FullConsistencyTimeProvider fullConsistencyTimeProvider) {
        return fullConsistencyTimeProvider.getMaxTimeStamp(((DeltaPlacement) astyanaxTable.getReadStorage().getPlacement()).getKeyspace().getClusterName());
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    @Timed(name = "bv.emodb.sor.AstyanaxDataWriterDAO.updateAll", absolute = true)
    public void updateAll(Iterator<RecordUpdate> it2, DataWriterDAO.UpdateListener updateListener) {
        LinkedHashMap newLinkedHashMap = Maps.newLinkedHashMap();
        int i = 0;
        while (it2.hasNext()) {
            RecordUpdate next = it2.next();
            for (AstyanaxStorage astyanaxStorage : ((AstyanaxTable) next.getTable()).getWriteStorage()) {
                BatchKey batchKey = new BatchKey((DeltaPlacement) astyanaxStorage.getPlacement(), next.getConsistency());
                List<BatchUpdate> list = newLinkedHashMap.get(batchKey);
                if (list == null) {
                    ArrayList newArrayList = Lists.newArrayList();
                    list = newArrayList;
                    newLinkedHashMap.put(batchKey, newArrayList);
                }
                list.add(new BatchUpdate(astyanaxStorage, next));
                i++;
                if (list.size() >= 100 || i >= 200) {
                    writeAll(newLinkedHashMap, updateListener);
                    newLinkedHashMap.clear();
                    i = 0;
                }
            }
        }
        writeAll(newLinkedHashMap, updateListener);
    }

    private void writeAll(Map<BatchKey, List<BatchUpdate>> map, DataWriterDAO.UpdateListener updateListener) {
        for (Map.Entry<BatchKey, List<BatchUpdate>> entry : map.entrySet()) {
            write(entry.getKey(), entry.getValue(), updateListener);
        }
    }

    private void write(BatchKey batchKey, List<BatchUpdate> list, DataWriterDAO.UpdateListener updateListener) {
        updateListener.beforeWrite(Collections2.transform(list, new Function<BatchUpdate, RecordUpdate>() { // from class: com.bazaarvoice.emodb.sor.db.astyanax.AstyanaxDataWriterDAO.1
            @Override // com.google.common.base.Function
            public RecordUpdate apply(BatchUpdate batchUpdate) {
                return batchUpdate.getUpdate();
            }
        }));
        DeltaPlacement placement = batchKey.getPlacement();
        MutationBatch prepareMutationBatch = placement.getKeyspace().prepareMutationBatch(batchKey.getConsistency());
        int i = 0;
        int i2 = 0;
        for (BatchUpdate batchUpdate : list) {
            AstyanaxStorage storage = batchUpdate.getStorage();
            RecordUpdate update = batchUpdate.getUpdate();
            ByteBuffer rowKey = storage.getRowKey(update.getKey());
            Delta delta = update.getDelta();
            String delta2 = delta.toString();
            Set<String> tags = update.getTags();
            EnumSet<ChangeFlag> noneOf = EnumSet.noneOf(ChangeFlag.class);
            if (delta.isConstant()) {
                noneOf.add(ChangeFlag.CONSTANT_DELTA);
            }
            if ((delta instanceof MapDelta) || ((delta instanceof Literal) && (((Literal) delta).getValue() instanceof Map))) {
                noneOf.add(ChangeFlag.MAP_DELTA);
            }
            Audit build = AuditBuilder.from(update.getAudit()).set("~sha1", Hashing.sha1().hashUnencodedChars(delta2).toString()).set("~tags", tags).build();
            ByteBuffer stringToByteBuffer = stringToByteBuffer(this._changeEncoder.encodeDelta(delta2, noneOf, tags));
            ByteBuffer stringToByteBuffer2 = stringToByteBuffer(this._changeEncoder.encodeAudit(build));
            int remaining = stringToByteBuffer.remaining();
            int remaining2 = stringToByteBuffer2.remaining();
            UUID changeId = update.getChangeId();
            if (remaining > 10485760) {
                this._oversizeUpdateMeter.mark();
                throw new DeltaSizeLimitException("Delta exceeds size limit of 10485760: " + remaining, remaining);
            }
            if (remaining2 > 1048576) {
                this._oversizeUpdateMeter.mark();
                throw new AuditSizeLimitException("Audit exceeds size limit of 1048576: " + remaining2, remaining2);
            }
            if (!prepareMutationBatch.isEmpty() && i + remaining + remaining2 > 11534336) {
                MutationBatch prepareMutationBatch2 = placement.getKeyspace().prepareMutationBatch(batchKey.getConsistency());
                prepareMutationBatch2.mergeShallow(prepareMutationBatch);
                prepareMutationBatch2.withRow(placement.getDeltaColumnFamily(), rowKey).putColumn((ColumnListMutation) changeId, stringToByteBuffer, (Integer) null);
                prepareMutationBatch2.withRow(placement.getAuditColumnFamily(), rowKey).putColumn((ColumnListMutation) changeId, stringToByteBuffer2, (Integer) null);
                if (getMutationBatchSize(prepareMutationBatch2) >= 15728640) {
                    execute(prepareMutationBatch, "batch update %d records in placement %s", Integer.valueOf(i2), placement.getName());
                    i = 0;
                    i2 = 0;
                }
            }
            prepareMutationBatch.withRow(placement.getDeltaColumnFamily(), rowKey).putColumn((ColumnListMutation) changeId, stringToByteBuffer, (Integer) null);
            prepareMutationBatch.withRow(placement.getAuditColumnFamily(), rowKey).putColumn((ColumnListMutation) changeId, stringToByteBuffer2, (Integer) null);
            i += remaining + remaining2;
            i2++;
        }
        execute(prepareMutationBatch, "batch update %d records in placement %s", Integer.valueOf(i2), placement.getName());
        this._updateMeter.mark(list.size());
    }

    private ByteBuffer stringToByteBuffer(String str) {
        return StringSerializer.get().toByteBuffer(str);
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    @Timed(name = "bv.emodb.sor.AstyanaxDataWriterDAO.compact", absolute = true)
    public void compact(Table table, String str, UUID uuid, Compaction compaction, UUID uuid2, Delta delta, Collection<UUID> collection, List<History> list, WriteConsistency writeConsistency) {
        Preconditions.checkNotNull(table, "table");
        Preconditions.checkNotNull(str, "key");
        Preconditions.checkNotNull(uuid, "compactionKey");
        Preconditions.checkNotNull(compaction, CFPropDefs.KW_COMPACTION);
        Preconditions.checkNotNull(uuid2, "changeId");
        Preconditions.checkNotNull(delta, "delta");
        Preconditions.checkNotNull(collection, "changesToDelete");
        Preconditions.checkNotNull(writeConsistency, "consistency");
        for (AstyanaxStorage astyanaxStorage : ((AstyanaxTable) table).getWriteStorage()) {
            DeltaPlacement deltaPlacement = (DeltaPlacement) astyanaxStorage.getPlacement();
            CassandraKeyspace keyspace = deltaPlacement.getKeyspace();
            ByteBuffer rowKey = astyanaxStorage.getRowKey(str);
            writeCompaction(rowKey, uuid, compaction, writeConsistency, deltaPlacement, keyspace, table, str);
            deleteCompactedDeltas(rowKey, writeConsistency, deltaPlacement, keyspace, collection, list, table, str);
        }
    }

    private void writeCompaction(ByteBuffer byteBuffer, UUID uuid, Compaction compaction, WriteConsistency writeConsistency, DeltaPlacement deltaPlacement, CassandraKeyspace cassandraKeyspace, Table table, String str) {
        MutationBatch prepareMutationBatch = cassandraKeyspace.prepareMutationBatch(SorConsistencies.toAstyanax(writeConsistency));
        prepareMutationBatch.withRow(deltaPlacement.getDeltaColumnFamily(), byteBuffer).putColumn((ColumnListMutation) uuid, this._changeEncoder.encodeCompaction(compaction), (Integer) null);
        execute(prepareMutationBatch, "compact placement %s, table %s, key %s", deltaPlacement.getName(), table.getName(), str);
    }

    private void deleteCompactedDeltas(ByteBuffer byteBuffer, WriteConsistency writeConsistency, DeltaPlacement deltaPlacement, CassandraKeyspace cassandraKeyspace, Collection<UUID> collection, List<History> list, Table table, String str) {
        MutationBatch prepareMutationBatch = cassandraKeyspace.prepareMutationBatch(SorConsistencies.toAstyanax(writeConsistency));
        ColumnListMutation withRow = prepareMutationBatch.withRow(deltaPlacement.getDeltaColumnFamily(), byteBuffer);
        Iterator<UUID> it2 = collection.iterator();
        while (it2.hasNext()) {
            withRow.deleteColumn(it2.next());
        }
        if (list != null && !list.isEmpty()) {
            this._auditStore.putDeltaAudits(byteBuffer, list, AstyanaxAuditBatchPersister.build(prepareMutationBatch, deltaPlacement.getDeltaHistoryColumnFamily(), this._changeEncoder, this._auditStore));
        }
        execute(prepareMutationBatch, "compact placement %s, table %s, key %s", deltaPlacement.getName(), table.getName(), str);
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    @Timed(name = "bv.emodb.sorAstyanaxDataWriterDAO.storeCompactedDeltas", absolute = true)
    public void storeCompactedDeltas(Table table, String str, List<History> list, WriteConsistency writeConsistency) {
        Preconditions.checkNotNull(table, "table");
        Preconditions.checkNotNull(str, "key");
        Preconditions.checkNotNull(list, "audits");
        Preconditions.checkNotNull(writeConsistency, "consistency");
        AstyanaxTable astyanaxTable = (AstyanaxTable) table;
        for (AstyanaxStorage astyanaxStorage : astyanaxTable.getWriteStorage()) {
            DeltaPlacement deltaPlacement = (DeltaPlacement) astyanaxStorage.getPlacement();
            CassandraKeyspace keyspace = deltaPlacement.getKeyspace();
            ByteBuffer rowKey = astyanaxStorage.getRowKey(str);
            MutationBatch prepareMutationBatch = keyspace.prepareMutationBatch(SorConsistencies.toAstyanax(writeConsistency));
            ColumnListMutation withRow = prepareMutationBatch.withRow(deltaPlacement.getDeltaHistoryColumnFamily(), rowKey);
            for (History history : list) {
                withRow.putColumn((ColumnListMutation) history.getChangeId(), this._changeEncoder.encodeHistory(history), Ttls.toSeconds(this._auditStore.getHistoryTtl(), 1, null));
            }
            execute(prepareMutationBatch, "store %d compacted deltas for placement %s, table %s, key %s", Integer.valueOf(list.size()), deltaPlacement.getName(), astyanaxTable.getName(), str);
        }
    }

    @Override // com.bazaarvoice.emodb.sor.db.DataWriterDAO
    @Timed(name = "bv.emodb.sor.AstyanaxDataWriterDAO.purgeUnsafe", absolute = true)
    public void purgeUnsafe(Table table) {
        Preconditions.checkNotNull(table, "table");
        Iterator<AstyanaxStorage> it2 = ((AstyanaxTable) table).getWriteStorage().iterator();
        while (it2.hasNext()) {
            purge(it2.next(), noop());
        }
    }

    @Override // com.bazaarvoice.emodb.table.db.astyanax.DataPurgeDAO
    public void purge(AstyanaxStorage astyanaxStorage, Runnable runnable) {
        DeltaPlacement deltaPlacement = (DeltaPlacement) astyanaxStorage.getPlacement();
        MutationBatch prepareMutationBatch = deltaPlacement.getKeyspace().prepareMutationBatch(SorConsistencies.toAstyanax(WriteConsistency.STRONG));
        Iterator<String> scanKeys = this._readerDao.scanKeys(astyanaxStorage, ReadConsistency.STRONG);
        while (scanKeys.hasNext()) {
            ByteBuffer rowKey = astyanaxStorage.getRowKey(scanKeys.next());
            prepareMutationBatch.withRow(deltaPlacement.getDeltaColumnFamily(), rowKey).delete();
            prepareMutationBatch.withRow(deltaPlacement.getAuditColumnFamily(), rowKey).delete();
            if (prepareMutationBatch.getRowCount() >= 100) {
                runnable.run();
                execute(prepareMutationBatch, "purge %d records from placement %s", Integer.valueOf(prepareMutationBatch.getRowCount()), deltaPlacement.getName());
                prepareMutationBatch.discardMutations();
            }
        }
        if (prepareMutationBatch.isEmpty()) {
            return;
        }
        runnable.run();
        execute(prepareMutationBatch, "purge %d records from placement %s", Integer.valueOf(prepareMutationBatch.getRowCount()), deltaPlacement.getName());
    }

    private <R> R execute(Execution<R> execution, String str, Object... objArr) {
        try {
            return execution.execute().getResult();
        } catch (ConnectionException e) {
            String format = String.format(str, objArr);
            if (isThriftFramedTransportSizeOverrun(execution, e)) {
                throw new ThriftFramedTransportSizeException("Thrift request to large to " + format, e);
            }
            throw new RuntimeException("Failed to " + format, e);
        }
    }

    private boolean isThriftFramedTransportSizeOverrun(Execution<?> execution, ConnectionException connectionException) {
        Optional tryFind = Iterables.tryFind(Throwables.getCausalChain(connectionException), Predicates.instanceOf(TTransportException.class));
        return tryFind.isPresent() && ((TTransportException) tryFind.get()).getType() == 0 && (execution instanceof MutationBatch) && getMutationBatchSize((MutationBatch) execution) >= 15728640;
    }

    private int getMutationBatchSize(MutationBatch mutationBatch) {
        if (!$assertionsDisabled && !(mutationBatch instanceof AbstractThriftMutationBatchImpl)) {
            throw new AssertionError("MutationBatch is not an instance of AbstractThriftMutationBatchImpl");
        }
        try {
            CountingOutputStream countingOutputStream = new CountingOutputStream(ByteStreams.nullOutputStream());
            Throwable th = null;
            try {
                try {
                    TIOStreamTransport tIOStreamTransport = new TIOStreamTransport(countingOutputStream);
                    Cassandra.batch_mutate_args batch_mutate_argsVar = new Cassandra.batch_mutate_args();
                    batch_mutate_argsVar.setMutation_map(((AbstractThriftMutationBatchImpl) mutationBatch).getMutationMap());
                    batch_mutate_argsVar.write(new TBinaryProtocol(tIOStreamTransport));
                    int count = (int) countingOutputStream.getCount();
                    if (countingOutputStream != null) {
                        if (0 != 0) {
                            try {
                                countingOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            countingOutputStream.close();
                        }
                    }
                    return count;
                } finally {
                }
            } catch (Throwable th3) {
                if (countingOutputStream != null) {
                    if (th != null) {
                        try {
                            countingOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        countingOutputStream.close();
                    }
                }
                throw th3;
            }
        } catch (IOException | TException e) {
            throw Throwables.propagate(e);
        }
    }

    private Runnable noop() {
        return new Runnable() { // from class: com.bazaarvoice.emodb.sor.db.astyanax.AstyanaxDataWriterDAO.2
            @Override // java.lang.Runnable
            public void run() {
            }
        };
    }

    static {
        $assertionsDisabled = !AstyanaxDataWriterDAO.class.desiredAssertionStatus();
    }
}
