package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AssignedTasks.class */
/**
 * Book-keeping for a group of assigned tasks of a single type, organised by lifecycle state.
 *
 * <p>A task is first registered as <em>created</em> ({@link #addNewTask}). Initialising it
 * ({@link #initializeNewTasks}) moves it either straight to <em>running</em> or to
 * <em>restoring</em>; restoring tasks become running once all of their changelog partitions
 * have been reported via {@link #updateRestored}. {@link #suspend} moves running tasks to
 * <em>suspended</em> and closes the created and restoring ones; a suspended task can be put
 * back to running with {@link #maybeResumeSuspendedTask}.
 *
 * @param <T> concrete task type managed by this instance
 */
public class AssignedTasks<T extends AbstractTask> {
    private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class);
    private final String logPrefix;
    private final String taskTypeName;
    private final Time time;
    /** Tasks that were added but not yet successfully initialised. */
    private final Map<TaskId, T> created = new HashMap<TaskId, T>();
    /** Tasks that were running when {@link #suspend()} was last called and have not been resumed. */
    private final Map<TaskId, T> suspended = new HashMap<TaskId, T>();
    /** Initialised tasks still waiting for their changelog partitions to be restored. */
    private final Map<TaskId, T> restoring = new HashMap<TaskId, T>();
    /** Changelog partitions reported as restored so far; reset once every task is running. */
    private final Set<TopicPartition> restoredPartitions = new HashSet<TopicPartition>();
    /** Ids of the tasks that were running at the time of the last {@link #suspend()}. */
    private final Set<TaskId> previousActiveTasks = new HashSet<TaskId>();
    private final Map<TaskId, T> running = new ConcurrentHashMap<TaskId, T>();
    /** Lookup from both input and changelog partitions to the running task owning them. */
    private final Map<TopicPartition, T> runningByPartition = new HashMap<TopicPartition, T>();

    /** Stopwatch that measures the time elapsed between consecutive {@link #compute()} calls. */
    class Latency {
        private long startTime;

        Latency(long startTime) {
            this.startTime = startTime;
        }

        /**
         * Returns the milliseconds elapsed since the previous start time (never negative) and
         * resets the start time to the current time.
         */
        public long compute() {
            long previousStart = this.startTime;
            this.startTime = AssignedTasks.this.time.milliseconds();
            return Math.max(this.startTime - previousStart, 0L);
        }
    }

    /**
     * An operation applied to every running task.
     *
     * @param <T> task type the action operates on
     */
    public interface TaskAction<T extends AbstractTask> {
        /** Name of the action, used in log messages. */
        String name();

        /** Performs the action on the given task. */
        void apply(T t);
    }

    /**
     * @param logPrefix    prefix prepended to every log message
     * @param taskTypeName human-readable task type name used in log messages
     * @param time         time source used for latency measurements
     */
    public AssignedTasks(String logPrefix, String taskTypeName, Time time) {
        this.logPrefix = logPrefix;
        this.taskTypeName = taskTypeName;
        this.time = time;
    }

    /** Registers a newly created task; it stays in the created state until initialised. */
    public void addNewTask(T t) {
        log.trace("{} Add newly created {} {} with assigned partitions {}", logPrefix, taskTypeName, t.id(), t.partitions());
        created.put(t.id(), t);
    }

    /**
     * @return the partitions of all created (not yet initialised) tasks that have state stores;
     *         empty if there are no created tasks
     */
    public Set<TopicPartition> uninitializedPartitions() {
        if (created.isEmpty()) {
            return Collections.emptySet();
        }
        Set<TopicPartition> partitions = new HashSet<TopicPartition>();
        for (T task : created.values()) {
            if (task.hasStateStores()) {
                partitions.addAll(task.partitions());
            }
        }
        return partitions;
    }

    /**
     * Tries to initialise the state stores of every created task. A task whose initialisation
     * returns {@code true} transitions to running, otherwise to restoring. A task that fails
     * with a {@link LockException} stays in the created state so that a later call can retry.
     */
    public void initializeNewTasks() {
        if (!created.isEmpty()) {
            log.trace("{} Initializing {}s {}", logPrefix, taskTypeName, created.keySet());
        }
        Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, T> entry = it.next();
            T task = entry.getValue();
            try {
                if (task.initializeStateStores()) {
                    transitionToRunning(task);
                } else {
                    log.debug("{} transitioning {} {} to restoring", logPrefix, taskTypeName, entry.getKey());
                    restoring.put(entry.getKey(), task);
                }
                it.remove();
            } catch (LockException e) {
                // Deliberately best-effort: the task remains in 'created' and is retried later.
                log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
            }
        }
    }

    /**
     * Records the given changelog partitions as restored and moves every restoring task whose
     * changelog partitions are now all restored to running.
     *
     * @param collection changelog partitions whose restoration has completed
     * @return the (input) partitions of the tasks that transitioned to running
     */
    public Set<TopicPartition> updateRestored(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            return Collections.emptySet();
        }
        Set<TopicPartition> resumed = new HashSet<TopicPartition>();
        restoredPartitions.addAll(collection);
        Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator();
        while (it.hasNext()) {
            T task = it.next().getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions())) {
                transitionToRunning(task);
                resumed.addAll(task.partitions());
                it.remove();
                log.trace("{} {} {} completed restoration as all its changelog partitions {} have been applied to restore state", logPrefix, taskTypeName, task.id(), task.changelogPartitions());
            } else if (log.isTraceEnabled()) {
                // Only report the partitions that are still outstanding, not the full changelog set.
                Set<TopicPartition> outstanding = new HashSet<TopicPartition>(task.changelogPartitions());
                outstanding.removeAll(restoredPartitions);
                log.trace("{} partition restoration not complete for {} {} partitions: {}", logPrefix, taskTypeName, task.id(), outstanding);
            }
        }
        if (allTasksRunning()) {
            restoredPartitions.clear();
        }
        return resumed;
    }

    /** @return {@code true} if no task is in the created, suspended or restoring state */
    public boolean allTasksRunning() {
        return created.isEmpty() && suspended.isEmpty() && restoring.isEmpty();
    }

    /** @return a live view of the running tasks (removals through it affect this instance) */
    public Collection<T> runningTasks() {
        return running.values();
    }

    /**
     * Suspends all running tasks and uncleanly closes all restoring and created tasks. The ids
     * of the tasks still registered as running afterwards are remembered as the previous active
     * tasks, and the running, restoring and created state is then emptied.
     *
     * @return the first exception encountered, or {@code null} if everything succeeded
     */
    public RuntimeException suspend() {
        log.trace("{} Suspending running {} {}", logPrefix, taskTypeName, runningTaskIds());
        RuntimeException firstException = suspendTasks(running.values());
        log.trace("{} Close restoring {} {}", logPrefix, taskTypeName, restoring.keySet());
        // Every step runs regardless of earlier failures; only the first failure is reported.
        firstException = firstNonNull(firstException, closeTasksUnclean(restoring.values()));
        firstException = firstNonNull(firstException, closeTasksUnclean(created.values()));
        previousActiveTasks.clear();
        previousActiveTasks.addAll(running.keySet());
        running.clear();
        restoring.clear();
        created.clear();
        runningByPartition.clear();
        return firstException;
    }

    /** Returns {@code first} unless it is {@code null}, in which case {@code second} is returned. */
    private static RuntimeException firstNonNull(RuntimeException first, RuntimeException second) {
        return first != null ? first : second;
    }

    /** Closes every given task uncleanly, returning the first failure (or {@code null}). */
    private RuntimeException closeTasksUnclean(Collection<T> tasks) {
        RuntimeException firstException = null;
        for (T task : tasks) {
            try {
                task.close(false, false);
            } catch (RuntimeException e) {
                log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id(), e);
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        return firstException;
    }

    /**
     * Suspends every given task and registers it as suspended. A task whose producer was fenced
     * is closed as a zombie and removed from the given collection; a task failing with a
     * {@link CommitFailedException} is still registered as suspended; any other failure closes
     * the task uncleanly.
     *
     * @return the first unexpected failure, or {@code null}
     */
    private RuntimeException suspendTasks(Collection<T> tasks) {
        RuntimeException firstException = null;
        Iterator<T> it = tasks.iterator();
        while (it.hasNext()) {
            T task = it.next();
            try {
                task.suspend();
                suspended.put(task.id(), task);
            } catch (ProducerFencedException e) {
                closeZombieTask(task);
                it.remove();
            } catch (CommitFailedException e) {
                suspended.put(task.id(), task);
                log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", logPrefix, taskTypeName, task.id());
            } catch (RuntimeException e) {
                log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id(), e);
                try {
                    task.close(false, false);
                } catch (Exception closeFailure) {
                    log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id(), closeFailure);
                }
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        return firstException;
    }

    /** Closes a task whose producer has been fenced; failures while closing are only logged. */
    private void closeZombieTask(T task) {
        log.warn("{} Producer of {} {} fenced; closing zombie task", logPrefix, taskTypeName, task.id());
        try {
            task.close(false, true);
        } catch (Exception e) {
            // Best-effort close: the task is discarded either way.
            log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", logPrefix, taskTypeName, e);
        }
    }

    /** @return {@code true} if at least one task is running */
    public boolean hasRunningTasks() {
        return !running.isEmpty();
    }

    /**
     * Resumes the suspended task with the given id if it exists and its partitions equal the
     * given set.
     *
     * @param taskId id of the task to resume
     * @param set    partitions now assigned to that task
     * @return {@code true} if the task was resumed and is running again, {@code false} if no
     *         such suspended task exists or its partitions differ
     * @throws ProducerFencedException if the transition to running fails because the producer
     *         was fenced; the task is closed as a zombie before the exception is rethrown
     */
    public boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> set) {
        T task = suspended.get(taskId);
        if (task == null) {
            return false;
        }
        if (!task.partitions().equals(set)) {
            log.trace("{} couldn't resume task {} assigned partitions {}, task partitions {}", logPrefix, taskId, set, task.partitions());
            return false;
        }
        suspended.remove(taskId);
        log.trace("{} Resuming suspended {} {} with assigned partitions {}", logPrefix, taskTypeName, taskId, set);
        task.resume();
        try {
            transitionToRunning(task);
            return true;
        } catch (ProducerFencedException e) {
            closeZombieTask(task);
            // transitionToRunning registers the task as running before it can fail; undo that.
            running.remove(task.id());
            throw e;
        }
    }

    /**
     * Registers the task as running, initialises its topology and indexes it by its input and
     * changelog partitions.
     */
    private void transitionToRunning(T task) {
        log.debug("{} transitioning {} {} to running", logPrefix, taskTypeName, task.id());
        running.put(task.id(), task);
        task.initializeTopology();
        for (TopicPartition partition : task.partitions()) {
            runningByPartition.put(partition, task);
        }
        for (TopicPartition changelogPartition : task.changelogPartitions()) {
            runningByPartition.put(changelogPartition, task);
        }
    }

    /** @return the running task owning the given input or changelog partition, or {@code null} */
    public T runningTaskFor(TopicPartition topicPartition) {
        return runningByPartition.get(topicPartition);
    }

    /** @return a live view of the ids of the running tasks */
    public Set<TaskId> runningTaskIds() {
        return running.keySet();
    }

    /** @return an unmodifiable view of the running tasks keyed by id */
    public Map<TaskId, T> runningTaskMap() {
        return Collections.unmodifiableMap(running);
    }

    /**
     * Renders the running, suspended, restoring and new tasks, one section per state.
     *
     * @param indent prefix prepended to every section heading and task description
     */
    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        describe(builder, running.values(), indent, "Running:");
        describe(builder, suspended.values(), indent, "Suspended:");
        describe(builder, restoring.values(), indent, "Restoring:");
        describe(builder, created.values(), indent, "New:");
        return builder.toString();
    }

    /** Appends one section (heading, the description of each task, then a newline). */
    private void describe(StringBuilder builder, Collection<T> tasks, String indent, String heading) {
        builder.append(indent).append(heading);
        for (T task : tasks) {
            builder.append(indent).append(task.toString(indent + "\t\t"));
        }
        builder.append("\n");
    }

    /** @return a new list holding all running, suspended and restoring tasks */
    public List<AbstractTask> allInitializedTasks() {
        List<AbstractTask> tasks = new ArrayList<AbstractTask>();
        tasks.addAll(running.values());
        tasks.addAll(suspended.values());
        tasks.addAll(restoring.values());
        return tasks;
    }

    /** @return a live view of the suspended tasks */
    public Collection<T> suspendedTasks() {
        return suspended.values();
    }

    /** @return an unmodifiable view of the restoring tasks */
    Collection<T> restoringTasks() {
        return Collections.unmodifiableCollection(restoring.values());
    }

    /** @return a new collection holding the ids of all tasks in any state */
    public Collection<TaskId> allAssignedTaskIds() {
        List<TaskId> taskIds = new ArrayList<TaskId>();
        taskIds.addAll(running.keySet());
        taskIds.addAll(suspended.keySet());
        taskIds.addAll(restoring.keySet());
        taskIds.addAll(created.keySet());
        return taskIds;
    }

    /**
     * Forgets all tasks in every state as well as the restored-partition tracking. The tasks
     * themselves are not closed here, and the previous active task ids are kept.
     */
    public void clear() {
        runningByPartition.clear();
        running.clear();
        created.clear();
        suspended.clear();
        // Restoring tasks must be dropped too, otherwise stale tasks survive a clear().
        restoring.clear();
        restoredPartitions.clear();
    }

    /** @return the ids of the tasks that were running at the last {@link #suspend()} */
    public Set<TaskId> previousTaskIds() {
        return previousActiveTasks;
    }

    /**
     * Commits every running task.
     *
     * @throws RuntimeException the first unexpected failure, thrown after all tasks were tried
     */
    public void commit() {
        RuntimeException failure = applyToRunningTasks(new TaskAction<T>() {
            @Override
            public String name() {
                return "commit";
            }

            @Override
            public void apply(T t) {
                t.commit();
            }
        }, false);
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Calls {@code process()} on every running task.
     *
     * @return the number of tasks whose {@code process()} call returned {@code true}
     * @throws RuntimeException immediately on the first unexpected failure
     */
    public int process() {
        final AtomicInteger processed = new AtomicInteger(0);
        applyToRunningTasks(new TaskAction<T>() {
            @Override
            public String name() {
                return "process";
            }

            @Override
            public void apply(T t) {
                if (t.process()) {
                    processed.incrementAndGet();
                }
            }
        }, true);
        return processed.get();
    }

    /**
     * For every running task: punctuates it if due and, if the task then needs a commit,
     * commits it. Latencies are recorded to the given sensors.
     *
     * @param commitSensor    sensor receiving the latency of each commit
     * @param punctuateSensor sensor receiving the latency of each punctuation that took place
     * @throws RuntimeException the first unexpected failure, thrown after all tasks were tried
     */
    public void punctuateAndCommit(final Sensor commitSensor, final Sensor punctuateSensor) {
        final Latency latency = new Latency(time.milliseconds());
        RuntimeException failure = applyToRunningTasks(new TaskAction<T>() {
            // Tracks which phase is in progress so that failures are logged against it.
            String name;

            @Override
            public String name() {
                return this.name;
            }

            @Override
            public void apply(T t) {
                this.name = "punctuate";
                if (t.maybePunctuate()) {
                    punctuateSensor.record(latency.compute(), latency.startTime);
                }
                if (t.commitNeeded()) {
                    this.name = "commit";
                    long commitStart = time.milliseconds();
                    t.commit();
                    commitSensor.record(latency.compute(), latency.startTime);
                    if (log.isDebugEnabled()) {
                        log.debug("{} Committed active task {} per user request in {} ms", logPrefix, t.id(), latency.startTime - commitStart);
                    }
                }
            }
        }, false);
        if (failure != null) {
            throw failure;
        }
    }

    /** @return a live view of the ids of the suspended tasks */
    public Collection<TaskId> suspendedTaskIds() {
        return suspended.keySet();
    }

    /**
     * Applies the action to every running task.
     *
     * <p>A {@link CommitFailedException} is logged and skipped. A {@link ProducerFencedException}
     * closes the task as a zombie and removes it from the running tasks. Any other
     * {@link RuntimeException} is logged and either rethrown immediately or remembered,
     * depending on {@code throwImmediately}.
     *
     * @param taskAction       the action to apply
     * @param throwImmediately whether to rethrow the first unexpected failure right away
     * @return the first unexpected failure, or {@code null} if there was none
     */
    private RuntimeException applyToRunningTasks(TaskAction<T> taskAction, boolean throwImmediately) {
        RuntimeException firstException = null;
        Iterator<T> it = runningTasks().iterator();
        while (it.hasNext()) {
            T task = it.next();
            try {
                taskAction.apply(task);
            } catch (CommitFailedException e) {
                log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, taskTypeName, task.id(), taskAction.name());
            } catch (ProducerFencedException e) {
                // Must precede the RuntimeException handler: it is a RuntimeException subtype.
                closeZombieTask(task);
                it.remove();
            } catch (RuntimeException e) {
                log.error("{} Failed to {} {} {} due to the following error:", logPrefix, taskAction.name(), taskTypeName, task.id(), e);
                if (throwImmediately) {
                    throw e;
                }
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        return firstException;
    }
}
