package com.datastax.bdp.db.audit;

import com.datastax.bdp.db.upgrade.SchemaUpgrade;
import com.datastax.bdp.db.upgrade.VersionDependentFeature;
import com.datastax.bdp.db.util.ProductVersion;
import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import io.reactivex.Completable;
import io.reactivex.Single;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.concurrent.TPC;
import org.apache.cassandra.concurrent.TPCScheduler;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.BatchQueryOptions;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UnmodifiableArrayList;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.time.ApolloTime;
import org.jctools.queues.MpmcArrayQueue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/bdp/db/audit/CassandraAuditWriter.class */
public class CassandraAuditWriter implements IAuditWriter {
    // General-purpose logger for this writer.
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CassandraAuditWriter.class);
    // Name of the dedicated logger that receives the text of events that could not be written.
    private static final String DROPPED_EVENT_LOGGER = "DroppedAuditEventLogger";
    private static final Logger droppedEventLogger = LoggerFactory.getLogger(DROPPED_EVENT_LOGGER);
    // Used as the initial-capacity hint for the per-event bindings list (see addEventBindingsShared).
    static int NUM_FIELDS = 14;
    // Serialized broadcast address of this node; bound as the "node" column and part of the partition key.
    private static final ByteBuffer nodeIp = ByteBufferUtil.bytes(FBUtilities.getBroadcastAddress());
    // Wall-clock millis captured in setUp(); origin of the flush schedule computed by nextFlushAfter().
    private long startupTime;
    // Retention in seconds (converted from hours in the constructor); a value > 0 adds "USING TTL ?" to inserts.
    private final int retentionPeriod;
    // Pending events for asynchronous mode; null when the writer is synchronous.
    private final Queue<EventBindings> eventQueue;
    // Consistency level used for every audit write.
    private final ConsistencyLevel writeConsistency;

    @VisibleForTesting
    final BatchingOptions batchingOptions;
    // Lifecycle state; starts at NOT_STARTED and is advanced by setUp(), the shutdown hook and scheduleNextFlush().
    private final AtomicReference<State> state;
    // Partition of the most recently executed batch; read when choosing the scheduler for the next flush.
    private volatile EventPartition lastPartition;

    @VisibleForTesting
    final VersionDependentFeature<VersionDependent> feature;

    /**
     * Immutable settings controlling how audit events are written: either synchronously, one
     * statement per event, or asynchronously through a queue that is flushed in batches.
     */
    @VisibleForTesting
    public static class BatchingOptions {
        /** Preset for synchronous mode: batch size 1, no flush period, no queue. */
        static BatchingOptions SYNC = new BatchingOptions(true, 1, 0, 0);

        /** {@code true} when every event is written immediately instead of being queued. */
        public final boolean synchronous;
        /** Number of events per partition that triggers an immediate batch write during a flush. */
        public final int batchSize;
        /** Period, in milliseconds, between two flushes of the queue. */
        public final int flushPeriodInMillis;
        /** Capacity of the event queue; a value {@code <= 0} selects an unbounded queue. */
        public final int queueSize;

        /**
         * Creates options for asynchronous (batched) writing.
         *
         * @param batchSize           events per partition that trigger a batch write
         * @param flushPeriodInMillis period between flushes, in milliseconds
         * @param queueSize           queue capacity, {@code <= 0} for unbounded
         */
        public BatchingOptions(int batchSize, int flushPeriodInMillis, int queueSize) {
            this(false, batchSize, flushPeriodInMillis, queueSize);
        }

        private BatchingOptions(boolean synchronous, int batchSize, int flushPeriodInMillis, int queueSize) {
            this.synchronous = synchronous;
            this.batchSize = batchSize;
            this.flushPeriodInMillis = flushPeriodInMillis;
            this.queueSize = queueSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/bdp/db/audit/CassandraAuditWriter$EventBatch.class */
    public class EventBatch {
        final EventPartition partition;
        final List<ModificationStatement> modifications = new ArrayList();
        final List<List<ByteBuffer>> values = new ArrayList();
        final List<AuditableEvent> events = new ArrayList();

        public EventBatch(EventPartition eventPartition) {
            this.partition = eventPartition;
        }

        void addEvent(EventBindings eventBindings) {
            this.modifications.add(eventBindings.stmt);
            this.values.add(eventBindings.bindings);
            this.events.add(eventBindings.event);
        }

        int size() {
            return this.modifications.size();
        }

        void execute() {
            BatchStatement batchStatement = new BatchStatement(0, BatchStatement.Type.UNLOGGED, this.modifications, Attributes.none());
            CassandraAuditWriter.this.lastPartition = this.partition;
            try {
                executeBatch(batchStatement, CassandraAuditWriter.this.writeConsistency, this.values).doOnError(th -> {
                    logDroppedEvents(th);
                }).subscribe();
            } catch (RequestExecutionException | RequestValidationException e) {
                logDroppedEvents(e);
            }
        }

        private void logDroppedEvents(Throwable th) {
            CassandraAuditWriter.logger.error("Exception writing audit events to table", th);
            Iterator<AuditableEvent> it2 = this.events.iterator();
            while (it2.hasNext()) {
                CassandraAuditWriter.droppedEventLogger.warn(it2.next().toString());
            }
        }

        private Single<ResultMessage> executeBatch(BatchStatement batchStatement, ConsistencyLevel consistencyLevel, List<List<ByteBuffer>> list) {
            return QueryProcessor.instance.processBatch(batchStatement, QueryState.forInternalCalls(), BatchQueryOptions.withPerStatementVariables(QueryOptions.forInternalCalls(consistencyLevel, UnmodifiableArrayList.emptyList()), list, UnmodifiableArrayList.emptyList()), ApolloTime.approximateNanoTime());
        }
    }

    /**
     * Couples an audit event with the insert statement that will store it, the partition it
     * belongs to and the serialized values to bind to the statement.
     */
    public class EventBindings {
        private final ModificationStatement stmt;
        private final AuditableEvent event;
        private final EventPartition partition;
        private final List<ByteBuffer> bindings;

        /**
         * @param modificationStatement insert statement the bindings are meant for
         * @param auditableEvent        event to store; must not be {@code null}
         * @param versionDependent      implementation that serializes the event into bind values
         */
        private EventBindings(ModificationStatement modificationStatement, AuditableEvent auditableEvent, VersionDependent versionDependent) {
            // Plain assert instead of the decompiler-generated $assertionsDisabled field and static
            // initializer, which an inner class may not declare before Java 16.
            assert auditableEvent != null;
            this.stmt = modificationStatement;
            this.event = auditableEvent;
            this.partition = new EventPartition(auditableEvent);
            this.bindings = versionDependent.newEventBindings(auditableEvent, this.partition);
        }

        /** @return internal-call query options carrying the bind values at the writer's consistency level */
        public QueryOptions getBindings() {
            return QueryOptions.forInternalCalls(CassandraAuditWriter.this.writeConsistency, this.bindings);
        }
    }

    /**
     * Identifies the audit-log partition of an event: the UTC day of the event plus the
     * hour of that day expressed in seconds.
     */
    public static class EventPartition {
        // Midnight (UTC) of the day on which the event happened.
        private final Date date;
        // Hour of the day of the event, multiplied by 3600.
        private final int dayPartition;

        private EventPartition(AuditableEvent auditableEvent) {
            DateTime eventTimeUtc = new DateTime(auditableEvent.getTimestamp(), DateTimeZone.UTC);
            this.date = eventTimeUtc.toDateMidnight().toDate();
            this.dayPartition = eventTimeUtc.getHourOfDay() * 60 * 60;
        }

        /**
         * Builds the decorated partition key from the date, this node's address and the day partition.
         *
         * @param iPartitioner partitioner used to decorate the composite key
         */
        public DecoratedKey getPartitionKey(IPartitioner iPartitioner) {
            ByteBuffer dateBytes = TimestampType.instance.decompose(date);
            ByteBuffer dayPartitionBytes = ByteBufferUtil.bytes(dayPartition);
            return iPartitioner.decorateKey(CompositeType.build(dateBytes, nodeIp, dayPartitionBytes));
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            EventPartition other = (EventPartition) obj;
            return dayPartition == other.dayPartition && date.equals(other.date);
        }

        @Override
        public int hashCode() {
            int dateHash = date.hashCode();
            return (31 * dateHash) + dayPartition;
        }

        @Override
        public String toString() {
            return "EventPartition{date=" + date + ", dayPartition=" + dayPartition + '}';
        }
    }

    /**
     * Implementation that writes the twelve-column audit row, without the
     * {@code authenticated} and {@code consistency} columns.
     */
    private final class Legacy extends VersionDependent {
        private Legacy() {
            super();
        }

        /** @return the twelve-column INSERT for the audit log table, without any TTL clause */
        @Override
        String insertStatement() {
            return String.format(
                    "INSERT INTO %s.%s (date, node, day_partition, event_time, batch_id, category, keyspace_name, "
                            + "operation, source, table_name, type, username) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
                    CassandraAuditKeyspace.NAME,
                    CassandraAuditKeyspace.AUDIT_LOG);
        }

        /** Binds the shared columns, followed by the TTL value when a retention period is configured. */
        @Override
        List<ByteBuffer> newEventBindings(AuditableEvent auditableEvent, EventPartition eventPartition) {
            List<ByteBuffer> bindings = addEventBindingsShared(auditableEvent, eventPartition);
            int ttlSeconds = CassandraAuditWriter.this.retentionPeriod;
            if (ttlSeconds > 0) {
                bindings.add(ByteBufferUtil.bytes(ttlSeconds));
            }
            return bindings;
        }
    }

    /**
     * Implementation that writes the fourteen-column audit row, including the
     * {@code authenticated} and {@code consistency} columns.
     */
    private final class Native extends VersionDependent {
        private Native() {
            super();
        }

        /** @return the fourteen-column INSERT for the audit log table, without any TTL clause */
        @Override
        String insertStatement() {
            return String.format(
                    "INSERT INTO %s.%s (date, node, day_partition, event_time, batch_id, category, keyspace_name, "
                            + "operation, source, table_name, type, username,authenticated,consistency) "
                            + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
                    CassandraAuditKeyspace.NAME,
                    CassandraAuditKeyspace.AUDIT_LOG);
        }

        /**
         * Binds the shared columns, then {@code authenticated} and {@code consistency} (left unset
         * when the event does not carry them), then the TTL value when a retention period is configured.
         */
        @Override
        List<ByteBuffer> newEventBindings(AuditableEvent auditableEvent, EventPartition eventPartition) {
            List<ByteBuffer> bindings = addEventBindingsShared(auditableEvent, eventPartition);

            ByteBuffer authenticated = ByteBufferUtil.UNSET_BYTE_BUFFER;
            if (auditableEvent.getAuthenticated() != null) {
                authenticated = ByteBufferUtil.bytes(auditableEvent.getAuthenticated());
            }
            bindings.add(authenticated);

            ByteBuffer consistency = ByteBufferUtil.UNSET_BYTE_BUFFER;
            if (auditableEvent.getConsistencyLevel() != null) {
                consistency = ByteBufferUtil.bytes(auditableEvent.getConsistencyLevel().toString());
            }
            bindings.add(consistency);

            int ttlSeconds = CassandraAuditWriter.this.retentionPeriod;
            if (ttlSeconds > 0) {
                bindings.add(ByteBufferUtil.bytes(ttlSeconds));
            }
            return bindings;
        }
    }

    /** Lifecycle states of the writer, held in the {@code state} atomic reference. */
    public enum State {
        // Initial state assigned by the constructor.
        NOT_STARTED,
        // Entered by setUp() via compare-and-set from NOT_STARTED while set-up is running.
        STARTING,
        // Assigned at the end of setUp(); isSetUpComplete() reports true only in this state.
        READY,
        // Entered by the JVM shutdown hook via compare-and-set from READY.
        STOPPING,
        // Entered by scheduleNextFlush() when it observes STOPPING; no further flush is scheduled.
        STOPPED
    }

    /**
     * Base of the schema-version specific writers. A subclass supplies the INSERT text and the
     * trailing bind values; this class prepares the statement, serializes the columns common to
     * all versions and either executes the insert directly or queues it for the next flush.
     */
    public abstract class VersionDependent implements VersionDependentFeature.VersionDependent {
        /** Prepared insert statement; assigned by {@link #initialize()}. */
        protected ModificationStatement insertStatement;

        private VersionDependent() {
        }

        /**
         * Prepares the insert statement, appending {@code USING TTL ?} when a retention
         * period is configured.
         */
        @Override // com.datastax.bdp.db.upgrade.VersionDependentFeature.VersionDependent
        public void initialize() {
            String cql = insertStatement();
            if (CassandraAuditWriter.this.retentionPeriod > 0) {
                cql = cql + " USING TTL ?";
            }
            this.insertStatement = (ModificationStatement) CassandraAuditWriter.prepareStatement(cql);
        }

        /** @return the INSERT statement text for this schema version, without the TTL clause */
        abstract String insertStatement();

        /**
         * Records one event. In synchronous mode the insert is executed and the returned
         * {@code Completable} tracks it. Otherwise the event is offered to the queue and an
         * already-completed {@code Completable} is returned; when the queue rejects the event
         * it is written to the dropped-event logger instead.
         *
         * @param auditableEvent the event to record
         */
        final Completable recordEvent(AuditableEvent auditableEvent) {
            ModificationStatement statement = this.insertStatement;
            EventBindings eventBindings = new EventBindings(statement, auditableEvent, this);
            if (CassandraAuditWriter.this.isSynchronous()) {
                return statement.execute(QueryState.forInternalCalls(), eventBindings.getBindings(), ApolloTime.approximateNanoTime()).toCompletable();
            }
            // offer() reports a full bounded queue by returning false. add() would throw
            // IllegalStateException instead, so the dropped-event branch could never run.
            if (!CassandraAuditWriter.this.eventQueue.offer(eventBindings)) {
                CassandraAuditWriter.droppedEventLogger.warn(auditableEvent.toString());
            }
            return Completable.complete();
        }

        /**
         * @param auditableEvent the event to serialize
         * @param eventPartition partition the event belongs to
         * @return the complete, ordered bind values for {@link #insertStatement()}, including the
         *         TTL when one applies
         */
        abstract List<ByteBuffer> newEventBindings(AuditableEvent auditableEvent, EventPartition eventPartition);

        /**
         * Serializes the twelve columns shared by all schema versions, in statement order.
         * Optional values (batch id, keyspace, operation, source, table) are left unset when null.
         *
         * @return a mutable list so that subclasses can append their own values
         */
        List<ByteBuffer> addEventBindingsShared(AuditableEvent auditableEvent, EventPartition eventPartition) {
            int ttlSlots = CassandraAuditWriter.this.retentionPeriod > 0 ? 1 : 0;
            List<ByteBuffer> bindings = new ArrayList<>(CassandraAuditWriter.NUM_FIELDS + ttlSlots);
            bindings.add(TimestampType.instance.decompose(eventPartition.date));
            bindings.add(CassandraAuditWriter.nodeIp);
            bindings.add(ByteBufferUtil.bytes(eventPartition.dayPartition));
            bindings.add(ByteBufferUtil.bytes(auditableEvent.getUid()));
            UUID batchId = auditableEvent.getBatchId();
            bindings.add(batchId != null ? ByteBufferUtil.bytes(batchId) : ByteBufferUtil.UNSET_BYTE_BUFFER);
            bindings.add(ByteBufferUtil.bytes(auditableEvent.getType().getCategory().toString()));
            bindings.add(auditableEvent.getKeyspace() != null ? ByteBufferUtil.bytes(auditableEvent.getKeyspace()) : ByteBufferUtil.UNSET_BYTE_BUFFER);
            String operation = auditableEvent.getOperation();
            bindings.add(operation != null ? ByteBufferUtil.bytes(operation) : ByteBufferUtil.UNSET_BYTE_BUFFER);
            String source = auditableEvent.getSource();
            bindings.add(source != null ? ByteBufferUtil.bytes(source) : ByteBufferUtil.UNSET_BYTE_BUFFER);
            String columnFamily = auditableEvent.getColumnFamily();
            bindings.add(columnFamily != null ? ByteBufferUtil.bytes(columnFamily) : ByteBufferUtil.UNSET_BYTE_BUFFER);
            bindings.add(ByteBufferUtil.bytes(auditableEvent.getType().toString()));
            bindings.add(ByteBufferUtil.bytes(auditableEvent.getUser()));
            return bindings;
        }
    }

    /** @return {@code true} once {@link #setUp()} has finished and the writer is in the READY state */
    @Override
    public boolean isSetUpComplete() {
        State current = state.get();
        return current == State.READY;
    }

    /**
     * Starts the writer. Only the first call has any effect: it configures the audit keyspace,
     * sets up the version-dependent feature, records the start-up time, starts the periodic
     * flush in asynchronous mode, registers a JVM shutdown hook that moves a READY writer to
     * STOPPING, and finally marks the writer READY.
     */
    @Override
    public void setUp() {
        if (!state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            return;
        }
        CassandraAuditKeyspace.maybeConfigure();
        feature.setup(Gossiper.instance.clusterVersionBarrier);
        startupTime = ApolloTime.systemClockMillis();
        if (!batchingOptions.synchronous) {
            scheduleNextFlush();
        }
        WrappedRunnable requestStop = new WrappedRunnable() {
            @Override
            protected void runMayThrow() {
                CassandraAuditWriter.logger.debug("Shutting down CassandraAuditWritter");
                CassandraAuditWriter.this.state.compareAndSet(State.READY, State.STOPPING);
            }
        };
        Runtime.getRuntime().addShutdownHook(new Thread(requestStop, "Audit shutdown"));
        state.set(State.READY);
    }

    /**
     * Schedules one flush of the event queue at the next flush boundary and arranges for this
     * method to be called again once that flush terminates, whether it succeeds or fails.
     * When a stop has been requested the state moves to STOPPED and nothing is scheduled.
     */
    private void scheduleNextFlush() {
        boolean stopRequested = state.compareAndSet(State.STOPPING, State.STOPPED);
        if (stopRequested) {
            return;
        }
        long now = ApolloTime.systemClockMillis();
        Single<Boolean> flushThenReschedule = flushTask()
                .doOnError(failure -> logger.error("Audit logging batching task failed.", failure))
                .doFinally(this::scheduleNextFlush);
        long delayMillis = nextFlushAfter(now) - now;
        TPCScheduler scheduler = getTpcSchedulerForNextFlush();
        flushThenReschedule
                .delaySubscription(delayMillis, TimeUnit.MILLISECONDS, scheduler)
                .subscribe();
    }

    /**
     * Chooses the scheduler for the next flush: the scheduler mapped to the partition key of the
     * most recently executed batch, or the best available scheduler before any batch has run.
     */
    private TPCScheduler getTpcSchedulerForNextFlush() {
        EventPartition mostRecent = lastPartition;
        if (mostRecent == null) {
            return TPC.bestTPCScheduler();
        }
        return TPC.getForKey(
                CassandraAuditKeyspace.getKeyspace(),
                mostRecent.getPartitionKey(CassandraAuditKeyspace.getAuditLogPartitioner()));
    }

    /**
     * Computes the next flush boundary after the given instant. Boundaries lie at whole
     * multiples of the flush period counted from the writer's start-up time.
     *
     * @param j current time in milliseconds
     * @return the time, in milliseconds, at which the next flush is due
     */
    private long nextFlushAfter(long j) {
        int periodMillis = batchingOptions.flushPeriodInMillis;
        long currentBoundary = floor(j, startupTime, periodMillis);
        return currentBoundary + periodMillis;
    }

    /**
     * Rounds {@code j} down to the nearest value of the form {@code j2 + k * i}, for integer
     * {@code k}.
     *
     * <p>{@link Math#floorMod(long, long)} is used instead of {@code %} so that the result is
     * still at or below {@code j} when {@code j} precedes the origin {@code j2}, for example
     * after the wall clock has been stepped backwards. With {@code %} the remainder is negative
     * in that case and the result would land above {@code j}.
     *
     * @param j  value to round down
     * @param j2 origin of the grid
     * @param i  grid spacing; must not be zero
     * @return the largest grid point that is {@code <= j}
     * @throws ArithmeticException if {@code i} is zero
     */
    private static long floor(long j, long j2, int i) {
        return j - Math.floorMod(j - j2, (long) i);
    }

    /**
     * Creates the task that drains the event queue and writes the drained events in batches.
     *
     * <p>Events are grouped per {@link EventPartition}. A group is written as soon as it reaches
     * the configured batch size; whatever remains when draining stops is written at the end.
     * One run drains at most {@code queueSize} events. When {@code queueSize <= 0}, which is the
     * setting that selects the unbounded queue, the run drains until the queue is empty. The
     * previous {@code do/while (i < queueSize)} loop stopped after a single event in that case,
     * so the queue could only ever shrink by one event per flush period.
     *
     * @return a {@code Single} that emits {@code true} after all drained events have been submitted
     */
    private Single<Boolean> flushTask() {
        return Single.fromCallable(() -> {
            int configuredSize = batchingOptions.queueSize;
            int drainLimit = configuredSize > 0 ? configuredSize : Integer.MAX_VALUE;
            HashMap<EventPartition, EventBatch> pending = new HashMap<>();
            EventBindings next;
            for (int drained = 0; drained < drainLimit && (next = eventQueue.poll()) != null; drained++) {
                EventBatch batch = pending.get(next.partition);
                if (batch == null) {
                    batch = new EventBatch(next.partition);
                    pending.put(next.partition, batch);
                }
                batch.addEvent(next);
                if (batch.size() >= batchingOptions.batchSize) {
                    // Full batch: write it now and start a fresh one for this partition.
                    executeBatches(UnmodifiableArrayList.of(batch));
                    pending.remove(next.partition);
                }
            }
            // Write the partially filled batches that are left.
            executeBatches(pending.values());
            return Boolean.TRUE;
        });
    }

    /**
     * Executes every batch of the given collection, in iteration order.
     *
     * @param collection batches to write
     */
    @VisibleForTesting
    protected void executeBatches(Collection<EventBatch> collection) {
        for (EventBatch batch : collection) {
            batch.execute();
        }
    }

    /**
     * Creates a writer configured from {@link DatabaseDescriptor}: retention time, write
     * consistency level and, depending on the configured mode, synchronous or batched writing.
     */
    public CassandraAuditWriter() {
        this(
                DatabaseDescriptor.getAuditLoggingOptions().retention_time,
                DatabaseDescriptor.getRawConfig().getAuditCassConsistencyLevel(),
                configuredBatchingOptions());
    }

    /** Returns the synchronous preset when the configured mode is "sync", otherwise the configured batching settings. */
    private static BatchingOptions configuredBatchingOptions() {
        if (DatabaseDescriptor.getRawConfig().getAuditLoggerCassMode().equals("sync")) {
            return BatchingOptions.SYNC;
        }
        return new BatchingOptions(
                DatabaseDescriptor.getRawConfig().getAuditCassBatchSize(),
                DatabaseDescriptor.getRawConfig().getAuditCassFlushTime(),
                DatabaseDescriptor.getRawConfig().getAuditLoggerCassAsyncQueueSize());
    }

    @VisibleForTesting
    CassandraAuditWriter(int i, ConsistencyLevel consistencyLevel, BatchingOptions batchingOptions) {
        this.state = new AtomicReference<>(State.NOT_STARTED);
        this.feature = VersionDependentFeature.newSchemaUpgradeBuilder().withName("CassandraAuditWriter").withMinimumDseVersion(ProductVersion.DSE_VERSION_51).withRequireDSE(true).withLegacyImplementation(new Legacy()).withCurrentImplementation(new Native()).withSchemaUpgrade(new SchemaUpgrade(CassandraAuditKeyspace.metadata(), CassandraAuditKeyspace.tablesIfNotExist(), true)).withLogger(logger).withMessageActivating("Preparing upgrade to DSE 5.1/6.0 audit log entries.").withMessageActivated("Unlocking DSE 5.1/6.0 audit log entries.").withMessageDeactivated("Using DSE 5.0 compatible audit log entries - DSE 5.1 compatible audit log entries will be written when all nodes are on DSE 5.1 or newer and automatic schema upgrade has finished.").build();
        this.retentionPeriod = (int) TimeUnit.SECONDS.convert(i, TimeUnit.HOURS);
        this.writeConsistency = consistencyLevel;
        this.batchingOptions = batchingOptions;
        this.eventQueue = batchingOptions.synchronous ? null : batchingOptions.queueSize > 0 ? new MpmcArrayQueue<>(batchingOptions.queueSize) : new ConcurrentLinkedQueue<>();
    }

    /**
     * Parses and prepares a CQL statement as an internal call.
     *
     * @param str the CQL text
     * @return the prepared statement
     * @throws IllegalStateException if the statement cannot be prepared; the cause is preserved
     */
    public static CQLStatement prepareStatement(String str) {
        try {
            QueryState internalState = QueryState.forInternalCalls();
            return QueryProcessor.getStatement(str, internalState).statement;
        } catch (Exception cause) {
            throw new IllegalStateException("Error preparing audit writer", cause);
        }
    }

    /**
     * Records an audit event through the currently active version-dependent implementation.
     *
     * @param auditableEvent the event to record
     * @return the {@code Completable} returned by that implementation
     */
    @Override
    public Completable recordEvent(AuditableEvent auditableEvent) {
        VersionDependent active = feature.implementation();
        return active.recordEvent(auditableEvent);
    }

    /** @return {@code true} when events are written immediately rather than queued for batching */
    public boolean isSynchronous() {
        BatchingOptions options = batchingOptions;
        return options.synchronous;
    }
}
