package com.datastax.bdp.cassandra.audit;

import com.datastax.bdp.cassandra.cql3.StatementUtils;
import com.datastax.bdp.config.DseConfig;
import com.datastax.bdp.ioc.DseInjector;
import com.datastax.bdp.system.CassandraAuditKeyspace;
import com.datastax.bdp.system.TimeSource;
import com.datastax.bdp.util.Addresses;
import com.datastax.bdp.util.QueryProcessorUtil;
import com.datastax.dse.byos.shade.com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.BatchStatement;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.thrift.ThriftServer;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter.class */
public class CassandraAuditWriter implements IAuditWriter {
    /** General-purpose logger for this writer. */
    private static final Logger logger = LoggerFactory.getLogger(CassandraAuditWriter.class);
    /** Name of the logger that receives the events of a batch whose write failed. */
    private static final String DROPPED_EVENT_LOGGER = "DroppedAuditEventLogger";
    private static final Logger droppedEventLogger = LoggerFactory.getLogger(DROPPED_EVENT_LOGGER);
    /**
     * Number of values bound per audit row, not counting the optional TTL marker. Must match the
     * number of columns in the insert statement (14); it is only used to presize the bindings list.
     */
    private static final int NUM_FIELDS = 14;
    /** Serialized broadcast address of this node, bound to the {@code node} column of every row. */
    private static final ByteBuffer nodeIp = ByteBufferUtil.bytes(Addresses.Internode.getBroadcastAddress());
    /** Row TTL in seconds (converted from hours by the constructor); a TTL is only bound when positive. */
    private final int retentionPeriod;
    /** Prepared INSERT into the audit log table, shared by the synchronous and batched paths. */
    private final ModificationStatement insertStatement;
    /** Pool running the batching writers; {@code null} in synchronous mode. */
    private final ExecutorService executorService;
    /** Hand-off queue between {@code recordEvent} and the batching writers; {@code null} in synchronous mode. */
    private final BlockingQueue<EventBindings> eventQueue;
    /** Consistency level used for every audit write. */
    private final ConsistencyLevel writeConsistency;
    /** Selects synchronous vs. batched operation and carries the batching parameters. */
    private final BatchingOptions batchingOptions;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decides when the batch currently being accumulated by a writer thread must be flushed.
     * The batching loop in this file drives it as follows: {@link #reset()} when a batch starts
     * (and again while the batch is still empty), {@link #flushPeriodExceeded()} before each poll
     * once the batch holds at least one event, {@link #getNextPollPeriod()} as the poll timeout,
     * and {@link #getBatchSize()} as the event count that forces a flush.
     */
    @VisibleForTesting
    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$BatchController.class */
    public interface BatchController {
        /** Starts a new batching window. */
        void reset();

        /**
         * @return {@code true} when the current batching window has lasted long enough that the
         *         accumulated events should be flushed
         */
        boolean flushPeriodExceeded();

        /**
         * @return how long the next queue poll may wait; the batching loop passes this value to
         *         {@code BlockingQueue.poll} with {@code TimeUnit.MILLISECONDS}
         */
        long getNextPollPeriod();

        /** @return the number of accumulated events at which the batching loop flushes */
        int getBatchSize();
    }

    /**
     * Supplies the {@link BatchController} instances used by the batching writers. Keeping this
     * behind an interface lets callers of the package-private constructor plug in their own
     * controller implementation.
     */
    @VisibleForTesting
    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$BatchControllerFactory.class */
    interface BatchControllerFactory {
        /** @return a controller for use by a batching writer */
        BatchController newController();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Immutable description of how audit events are written: either synchronously on the calling
     * thread ({@link #SYNC}) or asynchronously through a queue drained by batching writers.
     */
    @VisibleForTesting
    public static class BatchingOptions {
        /** Shared instance selecting synchronous writes; it carries no queue, writers or controller. */
        static final BatchingOptions SYNC = new BatchingOptions();
        /** Creates the batch controllers for the writers; {@code null} in synchronous mode. */
        final BatchControllerFactory controllerFactory;
        /** Capacity of the event queue; a value {@code <= 0} selects an unbounded queue. */
        final int queueSize;
        /** Number of writer threads draining the event queue. */
        final int concurrentWriters;
        /** {@code true} when events are written on the caller's thread without batching. */
        final boolean synchronous;

        /** Builds the synchronous configuration; only used for {@link #SYNC}. */
        private BatchingOptions() {
            this.queueSize = 0;
            this.concurrentWriters = 0;
            this.controllerFactory = null;
            this.synchronous = true;
        }

        /**
         * Builds an asynchronous (batched) configuration.
         *
         * @param queueSize         capacity of the event queue; {@code <= 0} means unbounded
         * @param concurrentWriters number of writer threads to start
         * @param controllerFactory source of the {@link BatchController} instances for the writers
         */
        BatchingOptions(int queueSize, int concurrentWriters, BatchControllerFactory controllerFactory) {
            this.queueSize = queueSize;
            this.concurrentWriters = concurrentWriters;
            this.controllerFactory = controllerFactory;
            this.synchronous = false;
        }
    }

    /**
     * {@link BatchController} driven by a {@link TimeSource}: a batching window starts at
     * {@link #reset()} and is considered exceeded once more than {@code flushPeriod} milliseconds
     * have elapsed since then. Instances hold unsynchronized mutable state.
     */
    @VisibleForTesting
    static class DefaultBatchController implements BatchController {
        private final TimeSource timeSource;
        /** Maximum age of a batching window, in milliseconds. */
        final int flushPeriod;
        /** Number of events at which a batch is flushed. */
        final int batchSize;
        /** Time at which the current window started. */
        private long start;
        /** Time of the most recent {@link #reset()} or {@link #flushPeriodExceeded()} call. */
        private long lastCheckpoint;

        /** Creates a controller using the {@link TimeSource} obtained from the injector. */
        DefaultBatchController(int flushPeriod, int batchSize) {
            this(flushPeriod, batchSize, (TimeSource) DseInjector.get().getInstance(TimeSource.class));
        }

        /**
         * @param flushPeriod maximum window age in milliseconds
         * @param batchSize   number of events that triggers a flush
         * @param timeSource  clock used to measure the window
         */
        DefaultBatchController(int flushPeriod, int batchSize, TimeSource timeSource) {
            this.timeSource = timeSource;
            this.batchSize = batchSize;
            this.flushPeriod = flushPeriod;
            reset();
        }

        /** Restarts the window at the current time. */
        @Override
        public void reset() {
            long now = timeSource.currentTimeMillis();
            start = now;
            lastCheckpoint = now;
        }

        /** Records the current time as the latest checkpoint and compares the window age to the flush period. */
        @Override
        public boolean flushPeriodExceeded() {
            lastCheckpoint = timeSource.currentTimeMillis();
            long elapsed = lastCheckpoint - start;
            return elapsed > (long) flushPeriod;
        }

        /** @return the milliseconds left in the window as of the latest checkpoint */
        @Override
        public long getNextPollPeriod() {
            long elapsed = lastCheckpoint - start;
            return flushPeriod - elapsed;
        }

        @Override
        public int getBatchSize() {
            return batchSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$EventBatch.class */
    /**
     * Accumulates the audit events that belong to one {@link EventPartition} and writes them to
     * the audit table as a single unlogged batch. The three lists are kept index-aligned: entry
     * {@code n} of {@code modifications}, {@code values} and {@code events} describe the same event.
     */
    public class EventBatch {
        final EventPartition partition;
        final List<ModificationStatement> modifications = new ArrayList<>();
        final List<List<ByteBuffer>> values = new ArrayList<>();
        final List<AuditableEvent> events = new ArrayList<>();

        /**
         * @param eventPartition the partition every event added to this batch belongs to
         */
        public EventBatch(EventPartition eventPartition) {
            this.partition = eventPartition;
        }

        /**
         * Appends one event to the batch.
         *
         * @param eventBindings the event together with its already-serialized bound values
         */
        void addEvent(EventBindings eventBindings) {
            // Every row uses the same prepared insert; only the bound values differ.
            this.modifications.add(CassandraAuditWriter.this.insertStatement);
            this.values.add(eventBindings.bindings);
            this.events.add(eventBindings.event);
        }

        /**
         * Writes the batch at the writer's consistency level. A failed write is not rethrown:
         * the error is logged and each event of the batch is reported to the dropped-event logger.
         */
        void execute() {
            try {
                BatchStatement batch = new BatchStatement(0, BatchStatement.Type.UNLOGGED, this.modifications, Attributes.none());
                QueryProcessorUtil.processBatch(batch, CassandraAuditWriter.this.writeConsistency, this.values);
            } catch (RequestExecutionException | RequestValidationException e) {
                CassandraAuditWriter.logger.error("Exception writing audit events to table", e);
                for (AuditableEvent dropped : this.events) {
                    CassandraAuditWriter.droppedEventLogger.warn(dropped.toString());
                }
            }
        }
    }

    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$EventBatcher.class */
    /**
     * Worker that drains the event queue, groups events by {@link EventPartition} and flushes the
     * groups as batches whenever the controller's batch size or flush period is reached.
     */
    private class EventBatcher implements Runnable {
        private final BatchController controller;

        /**
         * @param batchController decides when the batch being accumulated is flushed
         */
        private EventBatcher(BatchController batchController) {
            this.controller = batchController;
        }

        /**
         * Accumulates and flushes batches until the thread is interrupted. Events already
         * collected when the interrupt is observed are flushed before returning, and the
         * thread's interrupt status is set again on exit.
         */
        private void consume() {
            while (!Thread.interrupted()) {
                boolean interrupted = false;
                HashMap<EventPartition, EventBatch> batches = new HashMap<>();
                int batched = 0;
                this.controller.reset();
                while (true) {
                    if (Thread.interrupted()) {
                        interrupted = true;
                        break;
                    }
                    if (batched <= 0) {
                        // Nothing collected yet: the flush window only starts with the first event.
                        this.controller.reset();
                    } else if (this.controller.flushPeriodExceeded()) {
                        break;
                    }
                    try {
                        EventBindings eventBindings = CassandraAuditWriter.this.eventQueue.poll(this.controller.getNextPollPeriod(), TimeUnit.MILLISECONDS);
                        if (eventBindings == null) {
                            continue;
                        }
                        CassandraAuditWriter.logger.debug("batching event {} into partition {}", eventBindings, eventBindings.partition);
                        EventBatch eventBatch = batches.get(eventBindings.partition);
                        if (eventBatch == null) {
                            eventBatch = new EventBatch(eventBindings.partition);
                            batches.put(eventBindings.partition, eventBatch);
                        }
                        eventBatch.addEvent(eventBindings);
                        batched++;
                        if (batched >= this.controller.getBatchSize()) {
                            break;
                        }
                    } catch (InterruptedException e) {
                        // The exception clears the interrupt flag; leave the loop now, otherwise a
                        // worker with an empty batch would keep polling and never terminate.
                        interrupted = true;
                        break;
                    }
                }
                if (batched > 0) {
                    CassandraAuditWriter.this.executeBatches(batches.values());
                }
                if (interrupted) {
                    break;
                }
            }
            // Thread.interrupted() and InterruptedException both clear the flag; restore it.
            Thread.currentThread().interrupt();
        }

        /**
         * Runs {@link #consume()} until it returns normally. Anything it throws is logged and the
         * consume loop is restarted, so an unexpected failure does not silently stop the worker.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    consume();
                    return;
                } catch (Throwable th) {
                    CassandraAuditWriter.logger.error(th.getMessage(), th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$EventBindings.class */
    /**
     * An audit event paired with its partition and with the serialized values to bind to the
     * insert statement. Values are added in the statement's column order: date, node,
     * day_partition, event_time, batch_id, category, keyspace_name, operation, source, table_name,
     * type, username, authenticated, consistency, followed by the TTL when a retention period is
     * configured.
     */
    public class EventBindings {
        private final AuditableEvent event;
        private final EventPartition partition;
        private final List<ByteBuffer> bindings;

        /**
         * Serializes the event's fields. Optional fields (batch id, keyspace, operation, source,
         * table, authenticated, consistency level) are bound as {@code null} when absent.
         *
         * @param auditableEvent the event to serialize; must not be {@code null}
         */
        private EventBindings(AuditableEvent auditableEvent) {
            assert auditableEvent != null;
            this.event = auditableEvent;
            this.partition = new EventPartition(auditableEvent);
            boolean withTtl = CassandraAuditWriter.this.retentionPeriod > 0;
            this.bindings = new ArrayList<>(CassandraAuditWriter.NUM_FIELDS + (withTtl ? 1 : 0));
            this.bindings.add(TimestampType.instance.decompose(this.partition.date));
            this.bindings.add(CassandraAuditWriter.nodeIp);
            this.bindings.add(ByteBufferUtil.bytes(this.partition.dayPartition));
            this.bindings.add(ByteBufferUtil.bytes(auditableEvent.getUid()));
            UUID batchId = auditableEvent.getBatchId();
            this.bindings.add(batchId != null ? ByteBufferUtil.bytes(batchId) : null);
            this.bindings.add(ByteBufferUtil.bytes(auditableEvent.getType().getCategory().toString()));
            this.bindings.add(auditableEvent.getKeyspace() != null ? ByteBufferUtil.bytes(auditableEvent.getKeyspace()) : null);
            String operation = auditableEvent.getOperation();
            this.bindings.add(operation != null ? ByteBufferUtil.bytes(operation) : null);
            String source = auditableEvent.getSource();
            this.bindings.add(source != null ? ByteBufferUtil.bytes(source) : null);
            String columnFamily = auditableEvent.getColumnFamily();
            this.bindings.add(columnFamily != null ? ByteBufferUtil.bytes(columnFamily) : null);
            this.bindings.add(ByteBufferUtil.bytes(auditableEvent.getType().toString()));
            this.bindings.add(ByteBufferUtil.bytes(auditableEvent.getUser()));
            this.bindings.add(auditableEvent.getAuthenticated() == null ? null : ByteBufferUtil.bytes(auditableEvent.getAuthenticated()));
            this.bindings.add(auditableEvent.getConsistencyLevel() == null ? null : ByteBufferUtil.bytes(auditableEvent.getConsistencyLevel().toString()));
            if (withTtl) {
                // Matches the trailing "USING TTL ?" marker of the insert statement.
                this.bindings.add(ByteBufferUtil.bytes(CassandraAuditWriter.this.retentionPeriod));
            }
        }

        /**
         * @return query options carrying the bound values at the writer's consistency level,
         *         for executing the insert statement directly
         */
        public QueryOptions getBindings() {
            return QueryOptions.forInternalCalls(CassandraAuditWriter.this.writeConsistency, this.bindings);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/bdp/cassandra/audit/CassandraAuditWriter$EventPartition.class */
    /**
     * Identifies the audit-table partition an event falls into: the UTC day of the event plus the
     * UTC hour of that day expressed in seconds. Used as a map key when grouping events.
     */
    public static class EventPartition {
        /** Midnight (UTC) of the day the event occurred. */
        private final Date date;
        /** Hour of the day (UTC) at which the event occurred, in seconds since midnight. */
        private final int dayPartition;

        /**
         * @param auditableEvent event whose timestamp determines the partition
         */
        private EventPartition(AuditableEvent auditableEvent) {
            DateTime eventTime = new DateTime(auditableEvent.getTimestamp(), DateTimeZone.UTC);
            int hour = eventTime.getHourOfDay();
            this.dayPartition = hour * 60 * 60;
            this.date = eventTime.toDateMidnight().toDate();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            EventPartition other = (EventPartition) obj;
            return dayPartition == other.dayPartition && date.equals(other.date);
        }

        @Override
        public int hashCode() {
            int hash = date.hashCode();
            hash = 31 * hash + dayPartition;
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("EventPartition{date=");
            text.append(date);
            text.append(", dayPartition=");
            text.append(dayPartition);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Executes every batch in the given collection, in iteration order.
     *
     * @param collection the batches to write
     */
    @VisibleForTesting
    void executeBatches(Collection<EventBatch> collection) {
        for (EventBatch batch : collection) {
            batch.execute();
        }
    }

    /**
     * Creates a writer configured entirely from {@link DseConfig}: retention time, write
     * consistency level, and either synchronous mode or batching with the configured queue size,
     * writer count, flush time and batch size.
     */
    public CassandraAuditWriter() {
        this(DseConfig.getAuditLoggerRetentionTime(), DseConfig.getAuditCassConsistencyLevel(), configuredBatchingOptions());
    }

    /** Reads the write mode from {@link DseConfig} and builds the matching batching options. */
    private static BatchingOptions configuredBatchingOptions() {
        if (DseConfig.getAuditLoggerCassMode().equals(ThriftServer.ThriftServerType.SYNC)) {
            return BatchingOptions.SYNC;
        }
        return new BatchingOptions(
                DseConfig.getAuditLoggerCassAsyncQueueSize(),
                DseConfig.getAuditLoggerNumCassLoggers(),
                new BatchControllerFactory() {
                    @Override
                    public BatchController newController() {
                        // Flush time and batch size are read each time a controller is created.
                        return new DefaultBatchController(DseConfig.getAuditCassFlushTime(), DseConfig.getAuditCassBatchSize());
                    }
                });
    }

    /**
     * Creates a writer with explicit settings.
     *
     * <p>In synchronous mode no queue or threads are created. Otherwise an event queue (bounded
     * when {@code batchingOptions.queueSize > 0}, unbounded if not) and a fixed pool of
     * {@code batchingOptions.concurrentWriters} batching workers are started, and a JVM shutdown
     * hook is registered that shuts the pool down.
     *
     * @param retentionHours   how long audit rows are retained, in hours; values {@code <= 0}
     *                         disable the TTL
     * @param consistencyLevel consistency level for audit writes
     * @param batchingOptions  synchronous vs. batched mode and the batching parameters
     */
    @VisibleForTesting
    CassandraAuditWriter(int retentionHours, ConsistencyLevel consistencyLevel, BatchingOptions batchingOptions) {
        this.retentionPeriod = (int) TimeUnit.SECONDS.convert(retentionHours, TimeUnit.HOURS);
        this.writeConsistency = consistencyLevel;
        this.batchingOptions = batchingOptions;
        CassandraAuditKeyspace.maybeConfigure();
        // Prepare the statement before any worker thread exists, and decide on the TTL marker
        // with the same condition the bindings use so that markers and bound values always agree.
        this.insertStatement = prepareInsert(this.retentionPeriod > 0);
        if (isSynchronous()) {
            logger.info("{} starting in synchronous mode", CassandraAuditWriter.class.getSimpleName());
            this.executorService = null;
            this.eventQueue = null;
        } else {
            if (batchingOptions.queueSize > 0) {
                this.eventQueue = new ArrayBlockingQueue<>(batchingOptions.queueSize);
            } else {
                this.eventQueue = new LinkedBlockingQueue<>();
            }
            this.executorService = Executors.newFixedThreadPool(batchingOptions.concurrentWriters, new BasicThreadFactory.Builder().namingPattern("CassandraAuditWriter-%d").build());
            for (int writer = 0; writer < batchingOptions.concurrentWriters; writer++) {
                // Each worker gets its own controller: controllers keep per-batch timing state
                // and must not be shared between threads.
                this.executorService.submit(new EventBatcher(batchingOptions.controllerFactory.newController()));
            }
            Runtime.getRuntime().addShutdownHook(new Thread(new WrappedRunnable() {
                @Override
                protected void runMayThrow() throws Exception {
                    CassandraAuditWriter.logger.debug("Shutting down executor service");
                    CassandraAuditWriter.this.executorService.shutdown();
                    CassandraAuditWriter.logger.debug("Executor service shutdown complete");
                }
            }, "Audit shutdown"));
        }
    }

    /**
     * Prepares the INSERT used for every audit row.
     *
     * @param z whether to append a {@code USING TTL ?} clause, adding one more bind marker
     * @return the prepared statement targeting the audit log table
     */
    private static ModificationStatement prepareInsert(boolean z) {
        String ttlClause = z ? " USING TTL ?" : "";
        String cql = String.format("INSERT INTO %s.%s (date, node, day_partition, event_time, batch_id, category, keyspace_name, operation, source, table_name, type, username,authenticated,consistency)VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)", CassandraAuditKeyspace.NAME, CassandraAuditKeyspace.AUDIT_LOG) + ttlClause;
        QueryState internalState = QueryState.forInternalCalls();
        return (ModificationStatement) StatementUtils.prepareStatement(cql, internalState, "Error preparing audit writer");
    }

    /**
     * Records one audit event. In synchronous mode the row is written on the calling thread;
     * otherwise the event is placed on the queue for the batching workers, blocking while a
     * bounded queue is full.
     *
     * @param auditableEvent the event to record
     * @throws RuntimeException if the synchronous write fails, or if the calling thread is
     *                          interrupted while waiting for queue space (the thread's interrupt
     *                          status is restored before throwing)
     */
    @Override // com.datastax.bdp.cassandra.audit.IAuditWriter
    public void recordEvent(AuditableEvent auditableEvent) {
        EventBindings eventBindings = new EventBindings(auditableEvent);
        if (isSynchronous()) {
            try {
                this.insertStatement.execute(QueryState.forInternalCalls(), eventBindings.getBindings(), System.nanoTime());
            } catch (RequestExecutionException | RequestValidationException e) {
                throw new RuntimeException(e);
            }
        } else {
            try {
                this.eventQueue.put(eventBindings);
            } catch (InterruptedException e2) {
                // Keep the interruption visible to the caller's thread; wrapping alone would lose it.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to enqueue audit event", e2);
            }
        }
    }

    /**
     * Reports whether this writer records events.
     *
     * @return always {@code true}
     */
    @Override // com.datastax.bdp.cassandra.audit.IAuditWriter
    public boolean isLoggingEnabled() {
        return true;
    }

    /**
     * @return {@code true} when the configured batching options select synchronous writes,
     *         i.e. events are written on the caller's thread instead of being queued
     */
    private boolean isSynchronous() {
        return this.batchingOptions.synchronous;
    }
}
