package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractTask.class */
public abstract class AbstractTask implements Task {
    final TaskId id;
    final String applicationId;
    final ProcessorTopology topology;
    final ProcessorStateManager stateMgr;
    final Set<TopicPartition> partitions;
    final Consumer<byte[], byte[]> consumer;
    final String logPrefix;
    final boolean eosEnabled;
    final Logger log;
    final LogContext logContext;
    boolean taskInitialized;
    private final StateDirectory stateDirectory;
    InternalProcessorContext processorContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractTask(TaskId taskId, String str, Collection<TopicPartition> collection, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, boolean z, StateDirectory stateDirectory, StreamsConfig streamsConfig) {
        this.id = taskId;
        this.applicationId = str;
        this.partitions = new HashSet(collection);
        this.topology = processorTopology;
        this.consumer = consumer;
        this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
        this.stateDirectory = stateDirectory;
        Object[] objArr = new Object[2];
        objArr[0] = z ? "standby-task" : "task";
        objArr[1] = id();
        this.logPrefix = String.format("%s [%s] ", objArr);
        this.logContext = new LogContext(this.logPrefix);
        this.log = this.logContext.logger(getClass());
        try {
            this.stateMgr = new ProcessorStateManager(taskId, collection, z, stateDirectory, processorTopology.storeToChangelogTopic(), changelogReader, this.eosEnabled, this.logContext);
        } catch (IOException e) {
            throw new ProcessorStateException(String.format("%sError while creating the state manager", this.logPrefix), e);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public TaskId id() {
        return this.id;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public final String applicationId() {
        return this.applicationId;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public final Set<TopicPartition> partitions() {
        return this.partitions;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public final ProcessorTopology topology() {
        return this.topology;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public final ProcessorContext context() {
        return this.processorContext;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public StateStore getStore(String str) {
        return this.stateMgr.getStore(str);
    }

    public String toString() {
        return toString("");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append(str);
        sb.append("StreamsTask taskId: ");
        sb.append(this.id);
        sb.append("\n");
        if (this.topology != null) {
            sb.append(str).append(this.topology.toString(str + "\t"));
        }
        if (this.partitions != null && !this.partitions.isEmpty()) {
            sb.append(str).append("Partitions [");
            Iterator<TopicPartition> it = this.partitions.iterator();
            while (it.hasNext()) {
                sb.append(it.next().toString()).append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateOffsetLimits() {
        for (TopicPartition topicPartition : this.partitions) {
            try {
                OffsetAndMetadata committed = this.consumer.committed(topicPartition);
                long offset = committed != null ? committed.offset() : 0L;
                this.stateMgr.putOffsetLimit(topicPartition, offset);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Updating store offset limits {} for changelog {}", Long.valueOf(offset), topicPartition);
                }
            } catch (KafkaException e) {
                throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, topicPartition), e);
            } catch (AuthorizationException e2) {
                throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, topicPartition), e2);
            } catch (WakeupException e3) {
                throw e3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flushState() {
        this.stateMgr.flush();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerStateStores() {
        if (this.topology.stateStores().isEmpty()) {
            return;
        }
        try {
            if (!this.stateDirectory.lock(this.id, 5)) {
                throw new LockException(String.format("%sFailed to lock the state directory for task %s", this.logPrefix, this.id));
            }
            this.log.trace("Initializing state stores");
            updateOffsetLimits();
            for (StateStore stateStore : this.topology.stateStores()) {
                this.log.trace("Initializing store {}", stateStore.name());
                stateStore.init(this.processorContext, stateStore);
            }
        } catch (IOException e) {
            throw new StreamsException(String.format("%sFatal error while trying to lock the state directory for task %s", this.logPrefix, this.id));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.lang.Throwable] */
    public void closeStateManager(boolean z) throws ProcessorStateException {
        ProcessorStateException processorStateException = null;
        this.log.trace("Closing state manager");
        try {
            this.stateMgr.close(z ? activeTaskCheckpointableOffsets() : null);
            try {
                this.stateDirectory.unlock(this.id);
            } catch (IOException e) {
                if (0 == 0) {
                    processorStateException = new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e);
                }
            }
        } catch (ProcessorStateException e2) {
            processorStateException = e2;
            try {
                this.stateDirectory.unlock(this.id);
            } catch (IOException e3) {
                if (processorStateException == null) {
                    processorStateException = new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e3);
                }
            }
        } catch (Throwable th) {
            try {
                this.stateDirectory.unlock(this.id);
            } catch (IOException e4) {
                if (0 == 0) {
                    new ProcessorStateException(String.format("%sFailed to release state dir lock", this.logPrefix), e4);
                }
            }
            throw th;
        }
        if (processorStateException != null) {
            throw processorStateException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean hasStateStores() {
        return !this.topology.stateStores().isEmpty();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Collection<TopicPartition> changelogPartitions() {
        return this.stateMgr.changelogPartitions();
    }
}
