package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorStateManager.class */
/**
 * Tracks the state stores registered by a single task, restores them from their changelog
 * topics, and writes an offset checkpoint file into the task's state directory on close.
 *
 * <p>The constructor acquires the state-directory lock for the task; {@link #close(Map)}
 * releases it. If construction fails after the lock was obtained, the lock is released before
 * the exception propagates.
 */
public class ProcessorStateManager implements StateManager {
    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    /** Pause between two attempts to look up the changelog partition during registration. */
    private static final long PARTITION_LOOKUP_BACKOFF_MS = 50L;
    /** Total time budget for finding the changelog partition during registration. */
    private static final long PARTITION_LOOKUP_TIMEOUT_MS = 5000L;
    /** Timeout handed to each restore-consumer poll while restoring an active store. */
    private static final long RESTORE_POLL_TIMEOUT_MS = 100L;

    private final File baseDir;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final StateDirectory stateDirectory;
    // Insertion-ordered, so flush/close visit stores in registration order.
    private final Map<String, StateStore> stores;
    private final Map<String, StateStore> globalStores;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> offsetLimits;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    // Keyed by changelog topic; only allocated for standby managers, null otherwise.
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, String> storeToChangelogTopic;
    private final Map<String, TopicPartition> partitionForTopic = new HashMap<>();

    /**
     * Locks the task's state directory and loads (then deletes) its checkpoint file.
     *
     * @param taskId         the task whose state is managed
     * @param collection     partitions whose topic-to-partition mapping is remembered for
     *                       {@code getPartition} lookups
     * @param consumer       the consumer used to read changelog topics during restoration
     * @param z              {@code true} if this manager serves a standby replica
     * @param stateDirectory provides the task directory and its lock
     * @param map            store name to changelog topic; stores absent from this map are
     *                       registered without any restoration
     * @throws LockException if the directory lock cannot be obtained or the task directory
     *                       cannot be resolved
     * @throws IOException   declared by the original signature; presumably raised while reading
     *                       the checkpoint file - confirm against OffsetCheckpoint
     */
    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> collection, Consumer<byte[], byte[]> consumer, boolean z, StateDirectory stateDirectory, Map<String, String> map) throws LockException, IOException {
        this.taskId = taskId;
        this.stateDirectory = stateDirectory;
        this.logPrefix = String.format("task [%s]", taskId);
        for (TopicPartition topicPartition : collection) {
            this.partitionForTopic.put(topicPartition.topic(), topicPartition);
        }
        this.stores = new LinkedHashMap<>();
        this.globalStores = new HashMap<>();
        this.restoreConsumer = consumer;
        this.offsetLimits = new HashMap<>();
        this.restoredOffsets = new HashMap<>();
        this.isStandby = z;
        this.restoreCallbacks = z ? new HashMap<String, StateRestoreCallback>() : null;
        this.storeToChangelogTopic = map;

        // NOTE(review): the second argument is presumably a retry count - confirm against StateDirectory.
        if (!stateDirectory.lock(taskId, 5)) {
            throw new LockException(String.format("%s Failed to lock the state directory for task %s", this.logPrefix, taskId));
        }

        boolean initialized = false;
        try {
            this.baseDir = stateDirectory.directoryForTask(taskId);
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
            this.checkpointedOffsets = new HashMap<TopicPartition, Long>(checkpoint.read());
            // The file is removed once loaded; a fresh one is only written by close(Map).
            checkpoint.delete();
            initialized = true;
        } catch (ProcessorStateException e) {
            throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", this.logPrefix, taskId, e));
        } finally {
            if (!initialized) {
                // close(Map) can never run on a half-constructed manager, so give the lock back here.
                try {
                    stateDirectory.unlock(taskId);
                } catch (Exception unlockFailure) {
                    // Logged rather than thrown so the original failure is the one that propagates.
                    log.warn(String.format("%s Failed to release the state directory lock after a failed initialization", this.logPrefix), unlockFailure);
                }
            }
        }
    }

    /**
     * Builds a changelog topic name as {@code str + "-" + str2 + "-changelog"}.
     *
     * @param str  the prefix part of the topic name
     * @param str2 the middle part of the topic name
     * @return the concatenated changelog topic name
     */
    public static String storeChangelogTopic(String str, String str2) {
        return str + "-" + str2 + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /** @return the task directory resolved during construction */
    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /**
     * Registers a store and, when it has a changelog topic, prepares its restoration.
     *
     * <p>A store without a changelog topic is simply recorded. Otherwise the changelog partition
     * must become visible to the restore consumer within the lookup timeout; an active manager
     * then restores the store immediately, while a standby manager only remembers the restore
     * callback of persistent stores.
     *
     * @param stateStore           the store to register
     * @param z                    not consulted by this implementation
     * @param stateRestoreCallback callback that applies changelog records to the store
     * @throws IllegalArgumentException if the store is named like the checkpoint file or a store
     *                                  with the same name is already registered
     * @throws StreamsException         if partition metadata cannot be fetched or the expected
     *                                  changelog partition does not show up in time
     */
    @Override
    public void register(StateStore stateStore, boolean z, StateRestoreCallback stateRestoreCallback) {
        final String storeName = stateStore.name();
        log.debug("{} Registering state store {} to its state manager", this.logPrefix, storeName);

        if (storeName.equals(CHECKPOINT_FILE_NAME)) {
            throw new IllegalArgumentException(String.format("%s Illegal store name: %s", this.logPrefix, CHECKPOINT_FILE_NAME));
        }
        if (this.stores.containsKey(storeName)) {
            throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", this.logPrefix, storeName));
        }

        final String changelogTopic = this.storeToChangelogTopic.get(storeName);
        if (changelogTopic == null) {
            // No changelog configured for this store: nothing to restore.
            this.stores.put(storeName, stateStore);
            return;
        }

        final int partition = getPartition(changelogTopic);
        if (!awaitChangelogPartition(changelogTopic, partition)) {
            throw new StreamsException(String.format("%s Store %s's change log (%s) does not contain partition %s", this.logPrefix, storeName, changelogTopic, partition));
        }

        if (!this.isStandby) {
            log.trace("{} Restoring state store {} from changelog topic {}", new Object[]{this.logPrefix, storeName, changelogTopic});
            restoreActiveState(changelogTopic, stateRestoreCallback);
        } else if (stateStore.persistent()) {
            log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", new Object[]{this.logPrefix, storeName, changelogTopic});
            this.restoreCallbacks.put(changelogTopic, stateRestoreCallback);
        }
        this.stores.put(storeName, stateStore);
    }

    /**
     * Polls the restore consumer's metadata until the given changelog partition is listed or the
     * lookup timeout elapses. Each attempt is preceded by a short pause, including the first.
     *
     * @return {@code true} if the partition was found, {@code false} on timeout
     * @throws StreamsException if the consumer reports no partition info at all for the topic or
     *                          the metadata request times out
     */
    private boolean awaitChangelogPartition(String changelogTopic, int partition) {
        final long deadline = System.currentTimeMillis() + PARTITION_LOOKUP_TIMEOUT_MS;
        do {
            try {
                Thread.sleep(PARTITION_LOOKUP_BACKOFF_MS);
            } catch (InterruptedException ignored) {
                // Deliberately best-effort: an interrupt only shortens this pause and the lookup
                // is still attempted. NOTE(review): the interrupt flag is not restored here;
                // restoring it could alter the behaviour of the consumer calls that follow.
            }

            final List<PartitionInfo> partitionInfos;
            try {
                partitionInfos = this.restoreConsumer.partitionsFor(changelogTopic);
            } catch (TimeoutException e) {
                throw new StreamsException(String.format("%s Could not fetch partition info for topic: %s before expiration of the configured request timeout", this.logPrefix, changelogTopic));
            }
            if (partitionInfos == null) {
                throw new StreamsException(String.format("%s Could not find partition info for topic: %s", this.logPrefix, changelogTopic));
            }
            for (PartitionInfo partitionInfo : partitionInfos) {
                if (partitionInfo.partition() == partition) {
                    return true;
                }
            }
        } while (System.currentTimeMillis() < deadline);
        return false;
    }

    /**
     * Replays the changelog partition of a store through its restore callback.
     *
     * <p>Reading starts at the checkpointed offset when one was loaded, otherwise at the
     * beginning, and continues until either the offset limit for the partition is reached or the
     * consumer position equals the end offset captured before restoration started. The restore
     * consumer is always un-assigned again on exit.
     *
     * @param str                  the changelog topic to restore from
     * @param stateRestoreCallback receives every restored key/value pair
     * @throws IllegalStateException if the restore consumer already has a subscription, or if the
     *                               consumer position moves past the captured end offset
     */
    private void restoreActiveState(String str, StateRestoreCallback stateRestoreCallback) {
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions (%s) beforehand", this.logPrefix, this.restoreConsumer.subscription()));
        }

        final TopicPartition storePartition = new TopicPartition(str, getPartition(str));
        this.restoreConsumer.assign(Collections.singletonList(storePartition));
        try {
            // Capture the end offset first; it is the target the restoration has to reach.
            this.restoreConsumer.seekToEnd(Collections.singleton(storePartition));
            final long endOffset = this.restoreConsumer.position(storePartition);

            if (this.checkpointedOffsets.containsKey(storePartition)) {
                this.restoreConsumer.seek(storePartition, this.checkpointedOffsets.get(storePartition).longValue());
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(storePartition));
            }
            log.debug("restoring partition {} from offset {} to endOffset {}", new Object[]{storePartition, Long.valueOf(this.restoreConsumer.position(storePartition)), Long.valueOf(endOffset)});

            final long limit = offsetLimit(storePartition);
            while (true) {
                long offset = 0L;
                for (ConsumerRecord<byte[], byte[]> record : this.restoreConsumer.poll(RESTORE_POLL_TIMEOUT_MS).records(storePartition)) {
                    offset = record.offset();
                    if (offset >= limit) {
                        break;
                    }
                    stateRestoreCallback.restore(record.key(), record.value());
                }

                if (offset >= limit) {
                    // Records at or beyond the limit must not be applied.
                    break;
                }
                final long position = this.restoreConsumer.position(storePartition);
                if (position == endOffset) {
                    break;
                }
                if (position > endOffset) {
                    throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d", this.logPrefix, storePartition, endOffset, position));
                }
                // Otherwise the end has not been reached yet: keep polling.
            }

            this.restoredOffsets.put(storePartition, Math.min(limit, this.restoreConsumer.position(storePartition)));
        } finally {
            this.restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
    }

    /**
     * Returns, for every changelog topic with a registered standby restore callback, the offset
     * loaded from the checkpoint file, or {@code -1} when none was checkpointed.
     *
     * @return a new map of changelog partition to offset; empty when this manager keeps no
     *         standby restore callbacks
     */
    @Override
    public Map<TopicPartition, Long> checkpointedOffsets() {
        final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
        if (this.restoreCallbacks == null) {
            // Callbacks are only tracked for standby managers; nothing to report otherwise.
            return partitionsAndOffsets;
        }
        for (String changelogTopic : this.restoreCallbacks.keySet()) {
            final TopicPartition storePartition = new TopicPartition(changelogTopic, getPartition(changelogTopic));
            if (this.checkpointedOffsets.containsKey(storePartition)) {
                partitionsAndOffsets.put(storePartition, this.checkpointedOffsets.get(storePartition));
            } else {
                partitionsAndOffsets.put(storePartition, -1L);
            }
        }
        return partitionsAndOffsets;
    }

    /**
     * Applies changelog records to a standby store through the callback registered for the
     * partition's topic, up to (excluding) the partition's offset limit.
     *
     * <p>The restored offset of the partition is advanced to one past the last applied record;
     * it is left untouched when no record of the batch was applied.
     *
     * @param topicPartition the changelog partition the records belong to
     * @param list           the records to apply
     * @return the records at or beyond the offset limit, in their original order, or
     *         {@code null} when every record was below the limit
     * @throws ProcessorStateException if applying a record fails
     */
    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list) {
        final long limit = offsetLimit(topicPartition);
        final StateRestoreCallback callback = this.restoreCallbacks.get(topicPartition.topic());
        List<ConsumerRecord<byte[], byte[]>> remaining = null;
        long lastOffset = -1L;
        int visited = 0;
        for (ConsumerRecord<byte[], byte[]> record : list) {
            if (record.offset() < limit) {
                try {
                    callback.restore(record.key(), record.value());
                    lastOffset = record.offset();
                } catch (Exception e) {
                    throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", this.logPrefix, topicPartition), e);
                }
            } else {
                if (remaining == null) {
                    // Sized for the worst case that every record from here on is held back.
                    remaining = new ArrayList<>(list.size() - visited);
                }
                remaining.add(record);
            }
            visited++;
        }
        if (lastOffset >= 0L) {
            // Only move the restored offset when something was applied; otherwise a batch with
            // no applicable record would reset the previously recorded progress to 0.
            this.restoredOffsets.put(topicPartition, lastOffset + 1L);
        }
        return remaining;
    }

    /**
     * Sets the exclusive upper bound of offsets that may be restored for a partition.
     *
     * @param topicPartition the partition to limit
     * @param j              the first offset that must not be applied
     */
    public void putOffsetLimit(TopicPartition topicPartition, long j) {
        this.offsetLimits.put(topicPartition, Long.valueOf(j));
    }

    /** Returns the configured offset limit of a partition, or {@code Long.MAX_VALUE} if none is set. */
    private long offsetLimit(TopicPartition topicPartition) {
        final Long limit = this.offsetLimits.get(topicPartition);
        return limit != null ? limit.longValue() : Long.MAX_VALUE;
    }

    /**
     * @param str the store name
     * @return the registered store with that name, or {@code null} if there is none
     */
    @Override
    public StateStore getStore(String str) {
        return this.stores.get(str);
    }

    /**
     * Flushes every registered store in registration order.
     *
     * @param internalProcessorContext not consulted by this implementation
     * @throws ProcessorStateException if a store fails to flush; later stores are not flushed
     */
    @Override
    public void flush(InternalProcessorContext internalProcessorContext) {
        if (this.stores.isEmpty()) {
            return;
        }
        log.debug("{} Flushing all stores registered in the state manager", this.logPrefix);
        for (StateStore stateStore : this.stores.values()) {
            try {
                log.trace("{} Flushing store={}", this.logPrefix, stateStore.name());
                stateStore.flush();
            } catch (Exception e) {
                throw new ProcessorStateException(String.format("%s Failed to flush state store %s", this.logPrefix, stateStore.name()), e);
            }
        }
    }

    /**
     * Closes all registered stores, optionally writes the checkpoint file, and always releases
     * the state-directory lock.
     *
     * <p>A checkpoint is written only when {@code map} is non-null and at least one store is
     * registered. It covers persistent stores that have a changelog topic: the offset from
     * {@code map} plus one when present, otherwise the offset recorded during restoration.
     *
     * @param map offsets per changelog partition as supplied by the caller, or {@code null} to
     *            skip checkpointing
     * @throws ProcessorStateException if a store fails to close; later stores are not closed and
     *                                 no checkpoint is written
     * @throws IOException             declared by the original signature; presumably raised
     *                                 while writing the checkpoint or unlocking - confirm
     */
    @Override
    public void close(Map<TopicPartition, Long> map) throws IOException {
        try {
            if (!this.stores.isEmpty()) {
                log.debug("{} Closing its state manager and all the registered state stores", this.logPrefix);
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    log.debug("{} Closing storage engine {}", this.logPrefix, entry.getKey());
                    try {
                        entry.getValue().close();
                    } catch (Exception e) {
                        throw new ProcessorStateException(String.format("%s Failed to close state store %s", this.logPrefix, entry.getKey()), e);
                    }
                }

                if (map != null) {
                    final Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
                    for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                        final String storeName = entry.getKey();
                        if (entry.getValue().persistent() && this.storeToChangelogTopic.containsKey(storeName)) {
                            final String changelogTopic = this.storeToChangelogTopic.get(storeName);
                            // Resolve the partition from the changelog topic, exactly as register()
                            // and restoreActiveState() do, so the restoredOffsets lookup below uses
                            // the same key it was stored under.
                            final TopicPartition storePartition = new TopicPartition(changelogTopic, getPartition(changelogTopic));
                            final Long suppliedOffset = map.get(storePartition);
                            if (suppliedOffset != null) {
                                // The checkpoint holds the next offset to read, hence the +1.
                                checkpointOffsets.put(storePartition, Long.valueOf(suppliedOffset.longValue() + 1L));
                            } else {
                                final Long restoredOffset = this.restoredOffsets.get(storePartition);
                                if (restoredOffset != null) {
                                    checkpointOffsets.put(storePartition, restoredOffset);
                                }
                            }
                        }
                    }
                    new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)).write(checkpointOffsets);
                }
            }
        } finally {
            this.stateDirectory.unlock(this.taskId);
        }
    }

    /**
     * Returns the partition number remembered for a topic at construction time, falling back to
     * the task's own partition when the topic is unknown.
     */
    private int getPartition(String str) {
        final TopicPartition known = this.partitionForTopic.get(str);
        return known == null ? this.taskId.partition : known.partition();
    }

    /**
     * Records the given stores under their names so {@link #getGlobalStore(String)} can return
     * them. A store with an already known name replaces the earlier one.
     *
     * @param list the global stores to remember
     */
    public void registerGlobalStateStores(List<StateStore> list) {
        for (StateStore stateStore : list) {
            this.globalStores.put(stateStore.name(), stateStore);
        }
    }

    /**
     * @param str the store name
     * @return the global store with that name, or {@code null} if there is none
     */
    @Override
    public StateStore getGlobalStore(String str) {
        return this.globalStores.get(str);
    }
}
