package com.thinkaurelius.titan.diskstorage.log.kcvs;

import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.attribute.Duration;
import com.thinkaurelius.titan.diskstorage.BackendException;
import com.thinkaurelius.titan.diskstorage.Entry;
import com.thinkaurelius.titan.diskstorage.ReadBuffer;
import com.thinkaurelius.titan.diskstorage.ResourceUnavailableException;
import com.thinkaurelius.titan.diskstorage.StaticBuffer;
import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVSUtil;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeySliceQuery;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
import com.thinkaurelius.titan.diskstorage.log.Log;
import com.thinkaurelius.titan.diskstorage.log.Message;
import com.thinkaurelius.titan.diskstorage.log.MessageReader;
import com.thinkaurelius.titan.diskstorage.log.ReadMarker;
import com.thinkaurelius.titan.diskstorage.log.util.FutureMessage;
import com.thinkaurelius.titan.diskstorage.log.util.ProcessMessageJob;
import com.thinkaurelius.titan.diskstorage.util.BackendOperation;
import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
import com.thinkaurelius.titan.diskstorage.util.StandardBaseTransactionConfig;
import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
import com.thinkaurelius.titan.diskstorage.util.WriteByteBuffer;
import com.thinkaurelius.titan.diskstorage.util.time.StandardDuration;
import com.thinkaurelius.titan.diskstorage.util.time.StandardTimepoint;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.util.time.ZeroDuration;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
import com.thinkaurelius.titan.graphdb.database.serialize.DataOutput;
import com.thinkaurelius.titan.util.system.BackgroundThread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PreInitializeConfigOptions
/* loaded from: input_file:com/thinkaurelius/titan/diskstorage/log/kcvs/KCVSLog.class */
public class KCVSLog implements Log, BackendOperation.TransactionalProvider {
    // NOTE: every static final field below that has no initializer is assigned in a static
    // initializer block that lies outside this part of the file.
    private static final Logger log;
    // Read by the constructor into maxWriteTime, the time budget handed to BackendOperation.execute when sending.
    public static final ConfigOption<Duration> LOG_MAX_WRITE_TIME;
    // Read by the constructor into maxReadTime, the time budget handed to BackendOperation.execute when pulling.
    public static final ConfigOption<Duration> LOG_MAX_READ_TIME;
    // Read by the constructor; the configured value plus the send delay becomes readLagTime.
    public static final ConfigOption<Duration> LOG_READ_LAG_TIME;
    // When true, openTx() opens transactions with the store manager's key-consistent configuration.
    public static final ConfigOption<Boolean> LOG_KEY_CONSISTENT;
    // Width of one time slice; getTimeSlice divides a timestamp (in the timestamp provider's unit) by this value.
    public static final long TIMESLICE_INTERVAL = 100000000;
    // Batched, asynchronous sending is only set up when the configured send delay is at least this long.
    private static final Duration MIN_DELIVERY_DELAY;
    // The outgoing queue capacity is sendBatchSize times this multiplier.
    private static final int BATCH_SIZE_MULTIPLIER = 10;
    // Time allowed for the send thread to finish when the log is closed.
    private static final Duration CLOSE_DOWN_WAIT;
    // Initial delay used when the message pullers are scheduled.
    private static final Duration INITIAL_READER_DELAY;
    // Wait time used by the send thread while it has no pending messages.
    private static final Duration FOREVER;
    // First int of every settings row key (see getSettingKey).
    private static final int SYSTEM_PARTITION_ID = -1;
    // NOTE(review): presumably the prefix byte of MESSAGE_COUNTER_COLUMN - confirm in the static initializer.
    private static final byte MESSAGE_COUNTER = 1;
    // Same value as the first byte written by getMarkerColumn.
    private static final byte MARKER_PREFIX = 2;
    // Settings column under which the per-sender message counter is read (constructor) and written (close).
    private static final StaticBuffer MESSAGE_COUNTER_COLUMN;
    // Picks a default write partition in add(StaticBuffer).
    private static final Random random;
    // Amount by which a puller extends its read window after hitting the read batch limit.
    private static final Duration TWO_MICROSECONDS;
    private final KCVSLogManager manager;
    private final String name;
    private final KeyColumnValueStore store;
    // Set by the first reader registration; later registrations must supply a compatible marker.
    private ReadMarker readMarker;
    private final int numBuckets;
    private final boolean keyConsistentOperations;
    private final int sendBatchSize;
    private final Duration maxSendDelay;
    private final Duration maxWriteTime;
    // Both outgoingMsg and sendThread are null when messages are written synchronously (see constructor).
    private final ArrayBlockingQueue<MessageEnvelope> outgoingMsg;
    private final SendThread sendThread;
    private final int numReadThreads;
    private final int maxReadMsg;
    private final Duration readPollingInterval;
    private final Duration readLagTime;
    private final Duration maxReadTime;
    private final boolean allowReadMarkerRecovery = true;
    // readExecutor and msgPullers stay null until readers are registered.
    private ScheduledExecutorService readExecutor;
    private MessagePuller[] msgPullers;
    // Incremented per added message and reduced modulo numBuckets to choose the bucket.
    private final AtomicLong numBucketCounter;
    // Per-sender message sequence number; loaded from the store on construction and written back on close.
    private final AtomicLong numMsgCounter;
    private final List<MessageReader> readers;
    private volatile boolean isOpen;
    private final TimestampProvider times;
    // Decompiler rendering of the compiler-generated assertion switch.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/thinkaurelius/titan/diskstorage/log/kcvs/KCVSLog$MessageEnvelope.class */
    /**
     * Immutable bundle describing one message that is ready to be written:
     * the future handed back to the caller, the row key and the entry to store under it.
     */
    public static class MessageEnvelope {
        final FutureMessage<KCVSMessage> message;
        final StaticBuffer key;
        final Entry entry;

        private MessageEnvelope(FutureMessage<KCVSMessage> pendingMessage, StaticBuffer rowKey, Entry storedEntry) {
            key = rowKey;
            entry = storedEntry;
            message = pendingMessage;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("MessageEnvelope[message=");
            text.append(message);
            text.append(",key=").append(key);
            text.append(",entry=").append(entry);
            return text.append("]").toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/thinkaurelius/titan/diskstorage/log/kcvs/KCVSLog$MessagePuller.class */
    /**
     * Scheduled task that pulls messages for a single (partition, bucket) pair and hands
     * every parsed message to each registered reader via the read executor.
     *
     * <p>Each run reads the window {@code [messageTimeStart, windowEnd)} and, on success,
     * advances {@code messageTimeStart} to the window end. Failures are logged and the same
     * window is retried on the next run.
     */
    public class MessagePuller implements Runnable {
        private final int bucketId;
        private final int partitionId;
        // Inclusive lower bound of the next read window.
        private Timepoint messageTimeStart;

        /**
         * @param i  partition id this puller reads from
         * @param i2 bucket id this puller reads from
         */
        private MessagePuller(int i, int i2) {
            this.bucketId = i2;
            this.partitionId = i;
            initializeTimepoint();
        }

        /**
         * Performs one pull. Never throws: every Throwable is logged, so the scheduled
         * task keeps being re-run and the failed window is read again.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                setReadMarker();
                int timeSlice = KCVSLog.this.getTimeSlice(this.messageTimeStart);
                Timepoint currentTime = KCVSLog.this.times.getTime();
                // Messages younger than the read lag are not read yet.
                Timepoint maxSafeMessageTime = currentTime.sub(KCVSLog.this.readLagTime);
                // A single query never crosses a time slice boundary because the slice is part of the row key.
                StandardTimepoint timesliceEnd = new StandardTimepoint((timeSlice + 1) * KCVSLog.TIMESLICE_INTERVAL, KCVSLog.this.times);
                Timepoint timepoint = 0 > maxSafeMessageTime.compareTo(timesliceEnd) ? maxSafeMessageTime : timesliceEnd;
                if (0 <= this.messageTimeStart.compareTo(timepoint)) {
                    // Empty window: the start is at or beyond the end. Warn only when the gap is large relative to the read lag.
                    if ((this.messageTimeStart.getNativeTimestamp() - timepoint.getNativeTimestamp()) / 3 > KCVSLog.this.readLagTime.getLength(KCVSLog.this.times.getUnit())) {
                        KCVSLog.log.warn("MessagePuller configured with ReadMarker timestamp in the improbably distant future: {} (current time is {})", this.messageTimeStart, currentTime);
                        return;
                    } else {
                        KCVSLog.log.debug("MessagePuller configured with ReadMarker timestamp slightly ahead of read lag time; waiting for the clock to catch up");
                        return;
                    }
                }
                KCVSLog.log.trace("MessagePuller time window: [{}, {})", this.messageTimeStart, timepoint);
                Preconditions.checkState(this.messageTimeStart.compareTo(timepoint) < 0);
                // Fixed: the template previously contained a bare '%' so the window end was never substituted.
                Preconditions.checkState(timepoint.compareTo(currentTime) <= 0, "Attempting to read messages from the future: messageTimeEnd=%s vs currentTime=%s", new Object[]{timepoint, currentTime});
                Preconditions.checkState(KCVSLog.this.times.getUnit().equals(this.messageTimeStart.getNativeUnit()), "Expected TimestampProvider TimeUnit %s to match nextTimepoint unit %s", new Object[]{KCVSLog.this.times.getUnit(), this.messageTimeStart.getNativeUnit()});
                // Fixed: report the unit of the window end, which is the value actually being checked.
                Preconditions.checkState(KCVSLog.this.times.getUnit().equals(timepoint.getNativeUnit()), "Expected Timestampprovider TimeUnit %s to match timeWindowEnd unit %s", new Object[]{KCVSLog.this.times.getUnit(), timepoint.getNativeUnit()});
                StaticBuffer logKey = KCVSLog.this.getLogKey(this.partitionId, this.bucketId, timeSlice);
                KeySliceQuery keySliceQuery = new KeySliceQuery(logKey, BufferUtil.getLongBuffer(this.messageTimeStart.getNativeTimestamp()), BufferUtil.getLongBuffer(timepoint.getNativeTimestamp()));
                keySliceQuery.setLimit(KCVSLog.this.maxReadMsg);
                KCVSLog.log.trace("Converted MessagePuller time window to {}", keySliceQuery);
                List<Entry> list = (List) BackendOperation.execute(getOperation(keySliceQuery), KCVSLog.this, KCVSLog.this.times, KCVSLog.this.maxReadTime);
                prepareMessageProcessing(list);
                if (list.size() >= KCVSLog.this.maxReadMsg) {
                    // The limit was hit, so the window may hold more messages: read the rest,
                    // starting right after the last returned column, up to a slightly extended end.
                    Entry lastEntry = list.get(list.size() - 1);
                    timepoint = timepoint.add(KCVSLog.TWO_MICROSECONDS);
                    KCVSLog.log.debug("Extended time window to {}", timepoint);
                    KeySliceQuery keySliceQuery2 = new KeySliceQuery(logKey, BufferUtil.nextBiggerBuffer(lastEntry.getColumn()), BufferUtil.getLongBuffer(timepoint.getNativeTimestamp()));
                    KCVSLog.log.debug("Converted extended MessagePuller time window to {}", keySliceQuery2);
                    prepareMessageProcessing((List) BackendOperation.execute(getOperation(keySliceQuery2), KCVSLog.this, KCVSLog.this.times, KCVSLog.this.maxReadTime));
                }
                // Only reached when every read above succeeded.
                this.messageTimeStart = timepoint;
            } catch (Throwable th) {
                KCVSLog.log.warn("Could not read messages for timestamp [" + this.messageTimeStart + "] (this read will be retried)", th);
            }
        }

        /**
         * Sets the initial read position: the persisted marker value when the read marker
         * has an identifier (falling back to the marker's start time), otherwise the
         * marker's start time.
         */
        private void initializeTimepoint() {
            Preconditions.checkState(null == this.messageTimeStart);
            if (KCVSLog.this.readMarker.hasIdentifier()) {
                this.messageTimeStart = new StandardTimepoint(KCVSLog.this.readSetting(KCVSLog.this.readMarker.getIdentifier(), KCVSLog.this.getMarkerColumn(this.partitionId, this.bucketId), KCVSLog.this.readMarker.getStartTime(KCVSLog.this.times).getNativeTimestamp()), KCVSLog.this.times);
                KCVSLog.log.info("Loaded indentified ReadMarker start time {} into {}", this.messageTimeStart, this);
            } else {
                this.messageTimeStart = KCVSLog.this.readMarker.getStartTime(KCVSLog.this.times);
                KCVSLog.log.info("Loaded unidentified ReadMarker start time {} into {}", this.messageTimeStart, this);
            }
        }

        /** Parses every entry and submits one processing job per (message, reader) pair. */
        private void prepareMessageProcessing(List<Entry> list) {
            for (Entry entry : list) {
                KCVSMessage parsedMessage = KCVSLog.this.parseMessage(entry);
                KCVSLog.log.debug("Parsed message {}, about to submit this message to the reader executor", parsedMessage);
                for (MessageReader reader : KCVSLog.this.readers) {
                    KCVSLog.this.readExecutor.submit(new ProcessMessageJob(parsedMessage, reader));
                }
            }
        }

        /**
         * Persists the current read position when the read marker has an identifier.
         * Best effort: failures are logged and not propagated.
         */
        private void setReadMarker() {
            if (KCVSLog.this.readMarker.hasIdentifier()) {
                try {
                    KCVSLog.log.debug("Attempting to persist read marker with identifier {}", KCVSLog.this.readMarker.getIdentifier());
                    KCVSLog.this.writeSetting(KCVSLog.this.readMarker.getIdentifier(), KCVSLog.this.getMarkerColumn(this.partitionId, this.bucketId), this.messageTimeStart.getNativeTimestamp());
                    KCVSLog.log.debug("Persisted read marker: identifier={} partitionId={} buckedId={} nextTimepoint={}", new Object[]{KCVSLog.this.readMarker.getIdentifier(), Integer.valueOf(this.partitionId), Integer.valueOf(this.bucketId), this.messageTimeStart});
                } catch (Throwable th) {
                    KCVSLog.log.error("Could not persist read marker [" + KCVSLog.this.readMarker.getIdentifier() + "] on bucket [" + this.bucketId + "] + partition [" + this.partitionId + "]", th);
                }
            }
        }

        /** Persists the final read position; called by the enclosing log when it shuts down. */
        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            setReadMarker();
        }

        /** Wraps a slice query against the log store as a transactional backend operation. */
        private BackendOperation.Transactional<List<Entry>> getOperation(final KeySliceQuery keySliceQuery) {
            return new BackendOperation.Transactional<List<Entry>>() { // from class: com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog.MessagePuller.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // com.thinkaurelius.titan.diskstorage.util.BackendOperation.Transactional
                public List<Entry> call(StoreTransaction storeTransaction) throws BackendException {
                    return KCVSLog.this.store.getSlice(keySliceQuery, storeTransaction);
                }

                public String toString() {
                    return "messageReading@" + MessagePuller.this.partitionId + ":" + MessagePuller.this.bucketId;
                }
            };
        }
    }

    /* loaded from: input_file:com/thinkaurelius/titan/diskstorage/log/kcvs/KCVSLog$SendThread.class */
    /**
     * Background thread that drains the outgoing queue and writes messages in batches.
     * A batch is written once it reaches {@code sendBatchSize} or once its oldest message
     * has waited at least {@code maxSendDelay}.
     */
    private class SendThread extends BackgroundThread {
        // Messages taken from the queue but not yet written.
        private final List<MessageEnvelope> toSend;

        public SendThread() {
            super("KCVSLogSend" + KCVSLog.this.name, false);
            this.toSend = new ArrayList<>((KCVSLog.this.sendBatchSize * 3) / 2);
        }

        /**
         * @return how long the oldest pending message has been waiting, or zero when
         *         nothing is pending or that message's timestamp is not before the current time
         */
        private Duration timeSinceFirstMsg() {
            Duration elapsed = ZeroDuration.INSTANCE;
            if (!this.toSend.isEmpty()) {
                Timepoint firstTimestamp = this.toSend.get(0).message.getMessage().getTimestampMicro();
                Timepoint now = KCVSLog.this.times.getTime();
                if (firstTimestamp.compareTo(now) < 0) {
                    long firstValue = firstTimestamp.getTimestamp(KCVSLog.this.times.getUnit());
                    long nowValue = now.getTimestamp(KCVSLog.this.times.getUnit());
                    assert firstValue < nowValue;
                    elapsed = new StandardDuration(nowValue - firstValue, KCVSLog.this.times.getUnit());
                }
            }
            return elapsed;
        }

        /** @return remaining send delay for the pending batch, or FOREVER when nothing is pending */
        private Duration maxWaitTime() {
            return !this.toSend.isEmpty() ? KCVSLog.this.maxSendDelay.sub(timeSinceFirstMsg()) : KCVSLog.FOREVER;
        }

        /** Waits for the next queued message for at most {@link #maxWaitTime()} and collects it if one arrives. */
        @Override // com.thinkaurelius.titan.util.system.BackgroundThread
        protected void waitCondition() throws InterruptedException {
            TimeUnit timeUnit = TimeUnit.MICROSECONDS;
            MessageEnvelope next = KCVSLog.this.outgoingMsg.poll(maxWaitTime().getLength(timeUnit), timeUnit);
            if (next != null) {
                this.toSend.add(next);
            }
        }

        /** Tops up the pending batch without blocking and writes it when it is full or old enough. */
        @Override // com.thinkaurelius.titan.util.system.BackgroundThread
        protected void action() {
            MessageEnvelope next;
            while (this.toSend.size() < KCVSLog.this.sendBatchSize && (next = KCVSLog.this.outgoingMsg.poll()) != null) {
                this.toSend.add(next);
            }
            if (this.toSend.isEmpty()) {
                return;
            }
            if (KCVSLog.this.maxSendDelay.compareTo(timeSinceFirstMsg()) <= 0 || this.toSend.size() >= KCVSLog.this.sendBatchSize) {
                try {
                    KCVSLog.this.sendMessages(this.toSend);
                } finally {
                    // The batch is dropped whether or not the write succeeded.
                    this.toSend.clear();
                }
            }
        }

        /**
         * Writes everything still pending or queued, batch by batch. If a batch fails with a
         * RuntimeException, all messages in later batches are marked as failed and the
         * exception is rethrown.
         */
        @Override // com.thinkaurelius.titan.util.system.BackgroundThread
        protected void cleanup() {
            if (this.toSend.isEmpty() && KCVSLog.this.outgoingMsg.isEmpty()) {
                return;
            }
            this.toSend.addAll(KCVSLog.this.outgoingMsg);
            for (int start = 0; start < this.toSend.size(); start += KCVSLog.this.sendBatchSize) {
                try {
                    KCVSLog.this.sendMessages(this.toSend.subList(start, Math.min(this.toSend.size(), start + KCVSLog.this.sendBatchSize)));
                } catch (RuntimeException e) {
                    for (int rest = start + KCVSLog.this.sendBatchSize; rest < this.toSend.size(); rest++) {
                        this.toSend.get(rest).message.failed(e);
                    }
                    throw e;
                }
            }
        }
    }

    public KCVSLog(String str, KCVSLogManager kCVSLogManager, KeyColumnValueStore keyColumnValueStore, Configuration configuration) {
        Preconditions.checkArgument((kCVSLogManager == null || str == null || keyColumnValueStore == null || configuration == null) ? false : true);
        this.name = str;
        this.manager = kCVSLogManager;
        this.store = keyColumnValueStore;
        this.times = (TimestampProvider) configuration.get(GraphDatabaseConfiguration.TIMESTAMP_PROVIDER, new String[0]);
        this.keyConsistentOperations = ((Boolean) configuration.get(LOG_KEY_CONSISTENT, new String[0])).booleanValue();
        this.numBuckets = ((Integer) configuration.get(GraphDatabaseConfiguration.LOG_NUM_BUCKETS, new String[0])).intValue();
        Preconditions.checkArgument(this.numBuckets >= 1 && this.numBuckets <= Integer.MAX_VALUE);
        this.sendBatchSize = ((Integer) configuration.get(GraphDatabaseConfiguration.LOG_SEND_BATCH_SIZE, new String[0])).intValue();
        this.maxSendDelay = (Duration) configuration.get(GraphDatabaseConfiguration.LOG_SEND_DELAY, new String[0]);
        this.maxWriteTime = (Duration) configuration.get(LOG_MAX_WRITE_TIME, new String[0]);
        this.numReadThreads = ((Integer) configuration.get(GraphDatabaseConfiguration.LOG_READ_THREADS, new String[0])).intValue();
        this.maxReadMsg = ((Integer) configuration.get(GraphDatabaseConfiguration.LOG_READ_BATCH_SIZE, new String[0])).intValue();
        this.readPollingInterval = (Duration) configuration.get(GraphDatabaseConfiguration.LOG_READ_INTERVAL, new String[0]);
        this.readLagTime = ((Duration) configuration.get(LOG_READ_LAG_TIME, new String[0])).add(this.maxSendDelay);
        this.maxReadTime = (Duration) configuration.get(LOG_MAX_READ_TIME, new String[0]);
        if (MIN_DELIVERY_DELAY.compareTo(this.maxSendDelay) <= 0) {
            this.outgoingMsg = new ArrayBlockingQueue<>(this.sendBatchSize * BATCH_SIZE_MULTIPLIER);
            this.sendThread = new SendThread();
            this.sendThread.start();
        } else {
            this.outgoingMsg = null;
            this.sendThread = null;
        }
        this.readExecutor = null;
        this.msgPullers = null;
        this.numMsgCounter = new AtomicLong(readSetting(kCVSLogManager.senderId, MESSAGE_COUNTER_COLUMN, 0L));
        this.numBucketCounter = new AtomicLong(0L);
        this.readers = new ArrayList();
        this.isOpen = true;
    }

    /** @return the name this log was created with */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public String getName() {
        return name;
    }

    /**
     * Closes this log; a second call is a no-op. Shuts down the read executor, closes the
     * send thread, persists the read markers of all pullers (only if the executor
     * terminated in time), writes the message counter back to the store, closes the store
     * and notifies the manager.
     *
     * <p>If the calling thread is interrupted while waiting for the read executor, the
     * shutdown still runs to completion and the interrupt status is restored afterwards.
     *
     * @throws BackendException declared by the implemented close contract
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log, com.thinkaurelius.titan.diskstorage.util.BackendOperation.TransactionalProvider
    public synchronized void close() throws BackendException {
        if (!this.isOpen) {
            return;
        }
        this.isOpen = false;
        boolean interrupted = false;
        try {
            if (this.readExecutor != null) {
                this.readExecutor.shutdown();
            }
            if (this.sendThread != null) {
                this.sendThread.close(CLOSE_DOWN_WAIT.getLength(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
            }
            if (this.readExecutor != null) {
                try {
                    this.readExecutor.awaitTermination(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is restored in the finally block so the
                    // remaining shutdown I/O is not affected by a set interrupt flag.
                    interrupted = true;
                    log.error("Could not terminate reader thread pool for KCVSLog " + this.name + " due to interruption", e);
                }
                if (this.readExecutor.isTerminated()) {
                    // No puller is running any more, so their read positions can be persisted safely.
                    for (MessagePuller messagePuller : this.msgPullers) {
                        messagePuller.close();
                    }
                } else {
                    this.readExecutor.shutdownNow();
                    log.error("Reader thread pool for KCVSLog " + this.name + " did not shut down in time - could not clean up or set read markers");
                }
            }
            writeSetting(this.manager.senderId, MESSAGE_COUNTER_COLUMN, this.numMsgCounter.get());
            this.store.close();
            this.manager.closedLog(this);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Opens a new store transaction, using the store manager's key-consistent transaction
     * configuration when key-consistent operations are enabled for this log.
     */
    @Override // com.thinkaurelius.titan.diskstorage.util.BackendOperation.TransactionalProvider
    public StoreTransaction openTx() throws BackendException {
        if (keyConsistentOperations) {
            return manager.storeManager.beginTransaction(
                    StandardBaseTransactionConfig.of(times, manager.storeManager.getFeatures().getKeyConsistentTxConfig()));
        }
        return manager.storeManager.beginTransaction(StandardBaseTransactionConfig.of(times));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Maps a timepoint to the index of the time slice containing it: the timestamp in the
     * timestamp provider's unit divided by TIMESLICE_INTERVAL.
     *
     * @throws IllegalArgumentException if the index is negative or does not fit into an int
     */
    public int getTimeSlice(Timepoint timepoint) {
        final long sliceIndex = timepoint.getTimestamp(times.getUnit()) / TIMESLICE_INTERVAL;
        if (sliceIndex >= 0 && sliceIndex <= Integer.MAX_VALUE) {
            return (int) sliceIndex;
        }
        throw new IllegalArgumentException("Timestamp overflow detected: " + timepoint);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the row key for a log row from three ints: the partition id shifted into the
     * upper partitionBitWidth bits, the bucket id and the time slice.
     *
     * @param i  partition id, must be in [0, 2^partitionBitWidth)
     * @param i2 bucket id, must be in [0, numBuckets)
     * @param i3 time slice index
     */
    public StaticBuffer getLogKey(int i, int i2, int i3) {
        final boolean partitionInRange = i >= 0 && i < (1 << manager.partitionBitWidth);
        Preconditions.checkArgument(partitionInRange);
        final boolean bucketInRange = i2 >= 0 && i2 < numBuckets;
        Preconditions.checkArgument(bucketInRange);
        final DataOutput keyOut = manager.serializer.getDataOutput(12);
        final int shiftedPartition = i << (32 - manager.partitionBitWidth);
        keyOut.putInt(shiftedPartition);
        keyOut.putInt(i2);
        keyOut.putInt(i3);
        return keyOut.getStaticBuffer();
    }

    /**
     * Serializes a message into a store entry. The timestamp, the sender id and a freshly
     * incremented message counter are written first; the buffer position reached at that
     * point is passed to the entry as the offset before the content bytes.
     *
     * @throws IllegalArgumentException if the message timestamp is not positive
     */
    private Entry writeMessage(KCVSMessage kCVSMessage) {
        final StaticBuffer payload = kCVSMessage.getContent();
        final int capacityHint = 16 + manager.senderId.length() + 2 + payload.length();
        final DataOutput out = manager.serializer.getDataOutput(capacityHint);
        final long sentAt = kCVSMessage.getTimestamp(times.getUnit());
        Preconditions.checkArgument(sentAt > 0);
        out.putLong(sentAt);
        out.writeObjectNotNull(manager.senderId);
        out.putLong(numMsgCounter.incrementAndGet());
        final int payloadOffset = out.getPosition();
        out.putBytes(payload);
        return new StaticArrayEntry(out.getStaticBuffer(), payloadOffset);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Rebuilds a message from a stored entry: the content is the entry value, while the
     * timestamp and the sender id are read, in that order, from the entry's read buffer.
     */
    public KCVSMessage parseMessage(Entry entry) {
        final ReadBuffer reader = entry.asReadBuffer();
        final StaticBuffer payload = entry.getValue();
        final Timepoint sentAt = new StandardTimepoint(reader.getLong(), times);
        final String sender = (String) manager.serializer.readObjectNotNull(reader, String.class);
        return new KCVSMessage(payload, sentAt, sender);
    }

    /**
     * Adds a message to a randomly chosen partition out of the manager's default write
     * partitions.
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public Future<Message> add(StaticBuffer staticBuffer) {
        final int choice = random.nextInt(manager.defaultWritePartitionIds.length);
        return add(staticBuffer, manager.defaultWritePartitionIds[choice]);
    }

    /**
     * Adds a message whose partition is derived from the given key, without an external
     * persistor.
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public Future<Message> add(StaticBuffer staticBuffer, StaticBuffer staticBuffer2) {
        final ExternalPersistor noPersistor = null;
        return add(staticBuffer, staticBuffer2, noPersistor);
    }

    /**
     * Adds a message to the partition selected by the key: the first four key bytes
     * (zero-padded when the key is shorter) form a 32-bit value whose top
     * partitionBitWidth bits are the partition id.
     *
     * @param staticBuffer      message content
     * @param staticBuffer2     non-empty partitioning key
     * @param externalPersistor optional persistor, may be null
     */
    public Future<Message> add(StaticBuffer staticBuffer, StaticBuffer staticBuffer2, ExternalPersistor externalPersistor) {
        Preconditions.checkArgument(staticBuffer2 != null && staticBuffer2.length() > 0, "Invalid key provided: %s", new Object[]{staticBuffer2});
        int keyPrefix = 0;
        for (int pos = 0; pos < 4; pos++) {
            final int unsignedByte = pos < staticBuffer2.length() ? staticBuffer2.getByte(pos) & 255 : 0;
            keyPrefix = (keyPrefix << 8) + unsignedByte;
        }
        // Decompiled form of an assert on the configured bit width.
        if (!$assertionsDisabled && (this.manager.partitionBitWidth < 0 || this.manager.partitionBitWidth > 32)) {
            throw new AssertionError();
        }
        final int partitionId;
        if (this.manager.partitionBitWidth == 0) {
            partitionId = 0;
        } else {
            partitionId = keyPrefix >>> (32 - this.manager.partitionBitWidth);
        }
        return add(staticBuffer, partitionId, externalPersistor);
    }

    /** Adds a message to the given partition without an external persistor. */
    private Future<Message> add(StaticBuffer staticBuffer, int i) {
        final ExternalPersistor noPersistor = null;
        return add(staticBuffer, i, noPersistor);
    }

    /* JADX WARN: Type inference failed for: r0v12, types: [java.util.concurrent.Future<com.thinkaurelius.titan.diskstorage.log.Message>, com.thinkaurelius.titan.diskstorage.log.util.FutureMessage] */
    private Future<Message> add(StaticBuffer staticBuffer, int i, ExternalPersistor externalPersistor) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        Preconditions.checkArgument(staticBuffer != null && staticBuffer.length() > 0, "Content is empty");
        Preconditions.checkArgument(i >= 0 && i < (1 << this.manager.partitionBitWidth), "Invalid partition id: %s", new Object[]{Integer.valueOf(i)});
        Timepoint time = this.times.getTime();
        KCVSMessage kCVSMessage = new KCVSMessage(staticBuffer, time, this.manager.senderId);
        ?? futureMessage = new FutureMessage(kCVSMessage);
        MessageEnvelope messageEnvelope = new MessageEnvelope(futureMessage, getLogKey(i, (int) (this.numBucketCounter.incrementAndGet() % this.numBuckets), getTimeSlice(time)), writeMessage(kCVSMessage));
        if (externalPersistor != null) {
            try {
                externalPersistor.add(messageEnvelope.key, messageEnvelope.entry);
                messageEnvelope.message.delivered();
            } catch (TitanException e) {
                messageEnvelope.message.failed(e);
                throw e;
            }
        } else if (this.outgoingMsg == null) {
            sendMessages(ImmutableList.of(messageEnvelope));
        } else {
            try {
                this.outgoingMsg.put(messageEnvelope);
                log.debug("Enqueued {} for partition {}", messageEnvelope, Integer.valueOf(i));
            } catch (InterruptedException e2) {
                throw new TitanException("Got interrupted waiting to send message", e2);
            }
        }
        return futureMessage;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes all given envelopes to the store in a single backend operation, grouped by
     * row key. On success every message future is marked delivered; on a TitanException
     * every future is marked failed and the exception is rethrown.
     */
    public void sendMessages(final List<MessageEnvelope> list) {
        final BackendOperation.Transactional<Boolean> writeBatch = new BackendOperation.Transactional<Boolean>() { // from class: com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.thinkaurelius.titan.diskstorage.util.BackendOperation.Transactional
            public Boolean call(StoreTransaction storeTransaction) throws BackendException {
                // Collect the entries of all envelopes per row key.
                final ArrayListMultimap<StaticBuffer, Entry> entriesByKey = ArrayListMultimap.create();
                for (MessageEnvelope envelope : list) {
                    entriesByKey.put(envelope.key, envelope.entry);
                    KCVSLog.log.debug("Preparing to write {} to storage with column/timestamp {}", envelope, new StandardTimepoint(envelope.entry.getColumn().getLong(0), KCVSLog.this.times));
                }
                // One addition-only mutation per row key.
                final java.util.Map<StaticBuffer, KCVMutation> mutationsByKey = new HashMap<>(entriesByKey.keySet().size());
                for (StaticBuffer rowKey : entriesByKey.keySet()) {
                    final List<Entry> additions = entriesByKey.get(rowKey);
                    mutationsByKey.put(rowKey, new KCVMutation(additions, KeyColumnValueStore.NO_DELETIONS));
                    KCVSLog.log.debug("Built mutation on key {} with {} additions", rowKey, Integer.valueOf(additions.size()));
                }
                KCVSLog.this.manager.storeManager.mutateMany(ImmutableMap.of(KCVSLog.this.store.getName(), mutationsByKey), storeTransaction);
                KCVSLog.log.debug("Wrote {} total envelopes with operation timestamp {}", Integer.valueOf(list.size()), storeTransaction.getConfiguration().getCommitTime());
                return Boolean.TRUE;
            }

            public String toString() {
                return "messageSending";
            }
        };
        try {
            final Boolean written = (Boolean) BackendOperation.execute(writeBatch, this, this.times, this.maxWriteTime);
            Preconditions.checkState(written.booleanValue());
            log.debug("Wrote {} messages to backend", Integer.valueOf(list.size()));
            for (MessageEnvelope envelope : list) {
                envelope.message.delivered();
            }
        } catch (TitanException e) {
            for (MessageEnvelope envelope : list) {
                envelope.message.failed(e);
            }
            throw e;
        }
    }

    /**
     * Varargs convenience form of {@code registerReaders}.
     *
     * @throws IllegalArgumentException if no reader is given
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public synchronized void registerReader(ReadMarker readMarker, MessageReader... messageReaderArr) {
        final boolean hasReaders = messageReaderArr != null && messageReaderArr.length > 0;
        Preconditions.checkArgument(hasReaders, "Must specify at least one reader");
        final List<MessageReader> readerList = Arrays.asList(messageReaderArr);
        registerReaders(readMarker, readerList);
    }

    /**
     * Registers the given readers (duplicates are ignored). On the first registration this
     * also creates the read executor and schedules one {@link MessagePuller} per
     * (read partition, bucket) pair.
     *
     * <p>The pull infrastructure is created at most once per log. Previously it was
     * re-created whenever the reader list had become empty through
     * {@code unregisterReader}, which left the old executor and its pullers running next
     * to the new ones.
     *
     * @param readMarker start position for reading; must be compatible with the marker of
     *                   earlier registrations
     * @param iterable   readers to register; must contain at least one non-null reader
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public synchronized void registerReaders(ReadMarker readMarker, Iterable<MessageReader> iterable) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        Preconditions.checkArgument(!Iterables.isEmpty(iterable), "Must specify at least one reader");
        Preconditions.checkArgument(readMarker != null, "Read marker cannot be null");
        Preconditions.checkArgument(this.readMarker == null || this.readMarker.isCompatible(readMarker), "Provided read marker is not compatible with existing read marker for previously registered readers");
        if (this.readMarker == null) {
            this.readMarker = readMarker;
        }
        for (MessageReader messageReader : iterable) {
            Preconditions.checkNotNull(messageReader);
            if (!this.readers.contains(messageReader)) {
                this.readers.add(messageReader);
            }
        }
        // Pullers from an earlier registration keep running and read the shared reader
        // list, so nothing more is needed once the executor exists.
        if (this.readExecutor != null || this.readers.isEmpty()) {
            return;
        }
        this.readExecutor = new ScheduledThreadPoolExecutor(this.numReadThreads, new RejectedExecutionHandler() { // from class: com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog.2
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                // Rejected jobs are run directly on the submitting thread.
                runnable.run();
            }
        });
        this.msgPullers = new MessagePuller[this.manager.readPartitionIds.length * this.numBuckets];
        int pullerIndex = 0;
        for (int partitionId : this.manager.readPartitionIds) {
            for (int bucketId = 0; bucketId < this.numBuckets; bucketId++) {
                this.msgPullers[pullerIndex] = new MessagePuller(partitionId, bucketId);
                TimeUnit timeUnit = TimeUnit.MICROSECONDS;
                log.debug("Creating log read executor: initialDelay={} delay={} unit={}", new Object[]{Long.valueOf(INITIAL_READER_DELAY.getLength(timeUnit)), Long.valueOf(this.readPollingInterval.getLength(timeUnit)), timeUnit});
                this.readExecutor.scheduleWithFixedDelay(this.msgPullers[pullerIndex], INITIAL_READER_DELAY.getLength(timeUnit), this.readPollingInterval.getLength(timeUnit), timeUnit);
                pullerIndex++;
            }
        }
    }

    /**
     * Removes a reader from this log's reader collection.
     *
     * <p>The log's open state is verified first via
     * {@code ResourceUnavailableException.verifyOpen}.
     *
     * @param messageReader the reader to remove
     * @return the value reported by the reader collection's {@code remove} call
     */
    @Override // com.thinkaurelius.titan.diskstorage.log.Log
    public synchronized boolean unregisterReader(MessageReader messageReader) {
        ResourceUnavailableException.verifyOpen(this.isOpen, "Log", this.name);
        final boolean removed = this.readers.remove(messageReader);
        return removed;
    }

    /**
     * Builds the column buffer that identifies a marker for the given pair of ids.
     *
     * <p>Layout of the returned buffer: a single prefix byte with value 2, followed by
     * {@code i} and then {@code i2} written as ints.
     *
     * <p>Decompiler note (JADX): access modifier was changed from {@code private}.
     *
     * @param i  first id written after the prefix byte (presumably the partition id; confirm against callers)
     * @param i2 second id written (presumably the bucket id; confirm against callers)
     * @return the serialized column
     */
    public StaticBuffer getMarkerColumn(int i, int i2) {
        // Capacity hint: 1 prefix byte + two 4-byte ints.
        final DataOutput columnOut = this.manager.serializer.getDataOutput(1 + 4 + 4);
        columnOut.putByte((byte) 2);
        columnOut.putInt(i);
        columnOut.putInt(i2);
        return columnOut.getStaticBuffer();
    }

    /**
     * Builds the row key under which a named setting is stored.
     *
     * <p>The key consists of {@code SYSTEM_PARTITION_ID} written as an int, followed by the
     * setting name written with {@code writeObjectNotNull}.
     *
     * @param str name of the setting; must not be null
     * @return the serialized key
     */
    private StaticBuffer getSettingKey(String str) {
        // Capacity hint: 4 bytes for the partition int plus 2 extra bytes and the name length
        // (the 2 bytes are presumably serializer overhead for the string; confirm).
        final int capacityHint = 4 + 2 + str.length();
        final DataOutput keyOut = this.manager.serializer.getDataOutput(capacityHint);
        keyOut.putInt(SYSTEM_PARTITION_ID);
        keyOut.writeObjectNotNull(str);
        return keyOut.getStaticBuffer();
    }

    /**
     * Reads a {@code long} setting from the store, falling back to a default when absent.
     *
     * <p>The row key is derived from the setting name via {@code getSettingKey}; the lookup
     * runs through {@code BackendOperation.execute} with this log as the transaction
     * provider and {@code maxReadTime} as the time limit.
     *
     * <p>Decompiler note (JADX): access modifier was changed from {@code private}.
     *
     * @param str          name of the setting
     * @param staticBuffer column to look up under the setting's key
     * @param j            value returned when nothing is stored for that column
     * @return the stored value, or {@code j} if the lookup yields {@code null}
     * @throws IllegalArgumentException if a stored value exists but is not exactly 8 bytes long
     */
    public long readSetting(String str, final StaticBuffer staticBuffer, long j) {
        final StaticBuffer key = getSettingKey(str);
        final BackendOperation.Transactional<StaticBuffer> lookup = new BackendOperation.Transactional<StaticBuffer>() { // from class: com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog.3
            @Override // com.thinkaurelius.titan.diskstorage.util.BackendOperation.Transactional
            public StaticBuffer call(StoreTransaction txh) throws BackendException {
                return KCVSUtil.get(KCVSLog.this.store, key, staticBuffer, txh);
            }

            @Override
            public String toString() {
                return "readingLogSetting";
            }
        };
        final StaticBuffer stored = (StaticBuffer) BackendOperation.execute(lookup, this, this.times, this.maxReadTime);
        if (stored != null) {
            // A long occupies exactly 8 bytes; anything else is not a value this method wrote.
            Preconditions.checkArgument(stored.length() == 8);
            return stored.getLong(0);
        }
        return j;
    }

    /**
     * Persists a {@code long} setting in the store.
     *
     * <p>The row key is derived from the setting name via {@code getSettingKey}; the value is
     * written as a single addition (no deletions) through {@code BackendOperation.execute}
     * with this log as the transaction provider and {@code maxWriteTime} as the time limit.
     *
     * <p>Decompiler note (JADX): access modifier was changed from {@code private}.
     *
     * @param str          name of the setting
     * @param staticBuffer column under which the value is written
     * @param j            value to store
     * @throws IllegalStateException if the backend operation does not report success
     */
    public void writeSetting(String str, StaticBuffer staticBuffer, long j) {
        final StaticBuffer key = getSettingKey(str);
        final Entry setting = StaticArrayEntry.of(staticBuffer, BufferUtil.getLongBuffer(j));
        final BackendOperation.Transactional<Boolean> write = new BackendOperation.Transactional<Boolean>() { // from class: com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog.4
            @Override // com.thinkaurelius.titan.diskstorage.util.BackendOperation.Transactional
            public Boolean call(StoreTransaction txh) throws BackendException {
                KCVSLog.this.store.mutate(key, ImmutableList.of(setting), KeyColumnValueStore.NO_DELETIONS, txh);
                return Boolean.TRUE;
            }

            @Override
            public String toString() {
                return "writingLogSetting";
            }
        };
        final Boolean status = (Boolean) BackendOperation.execute(write, this, this.times, this.maxWriteTime);
        Preconditions.checkState(status.booleanValue());
    }

    // Static initializer: assigns the class-level logger, configuration options and
    // duration/column constants used by this log implementation.
    static {
        // Decompiler artifact: appears to mirror the compiler-generated assertion-status flag.
        $assertionsDisabled = !KCVSLog.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(KCVSLog.class);
        // Configuration options in the log namespace; the last constructor argument is the default value.
        // Default write time limit: 10000 ms.
        LOG_MAX_WRITE_TIME = new ConfigOption<>(GraphDatabaseConfiguration.LOG_NS, "max-write-time", "Maximum time in ms to try persisting log messages against the backend before failing.", ConfigOption.Type.MASKABLE, new StandardDuration(10000L, TimeUnit.MILLISECONDS));
        // Default read time limit: 4000 ms.
        LOG_MAX_READ_TIME = new ConfigOption<>(GraphDatabaseConfiguration.LOG_NS, "max-read-time", "Maximum time in ms to try reading log messages from the backend before failing.", ConfigOption.Type.MASKABLE, new StandardDuration(4000L, TimeUnit.MILLISECONDS));
        // Default read lag: 500 ms.
        // NOTE(review): the description string below contains the typo "becomevisible"; it is a
        // runtime string and is intentionally left untouched here.
        LOG_READ_LAG_TIME = new ConfigOption<>(GraphDatabaseConfiguration.LOG_NS, "read-lag-time", "Maximum time in ms that it may take for reads to appear in the backend. If a write does not becomevisible in the storage backend in this amount of time, a log reader might miss the message.", ConfigOption.Type.MASKABLE, new StandardDuration(500L, TimeUnit.MILLISECONDS));
        // Key-consistent reads/writes are off by default.
        LOG_KEY_CONSISTENT = new ConfigOption<>(GraphDatabaseConfiguration.LOG_NS, "key-consistent", "Whether to require consistency for log reading and writing messages to the storage backend", ConfigOption.Type.MASKABLE, false);
        // Fixed durations used elsewhere in the class.
        MIN_DELIVERY_DELAY = new StandardDuration(10L, TimeUnit.MILLISECONDS);
        CLOSE_DOWN_WAIT = new StandardDuration(10L, TimeUnit.SECONDS);
        // Initial delay before the scheduled message pullers run for the first time.
        INITIAL_READER_DELAY = new StandardDuration(100L, TimeUnit.MILLISECONDS);
        // Largest representable duration: Long.MAX_VALUE nanoseconds.
        FOREVER = new StandardDuration(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        // Single-byte column with value 1 (compare the marker columns, which use prefix byte 2).
        MESSAGE_COUNTER_COLUMN = new WriteByteBuffer(1).putByte((byte) 1).getStaticBuffer();
        random = new Random();
        TWO_MICROSECONDS = new StandardDuration(2L, TimeUnit.MICROSECONDS);
    }
}
