package org.apache.kafka.streams;

import java.io.File;
import java.lang.Thread;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point that owns a fixed set of {@link StreamThread}s built from a
 * {@link TopologyBuilder} and a {@link StreamsConfig}, together with the
 * {@link Metrics} registry those threads share.
 *
 * <p>An instance moves through three states: created, running and stopped.
 * {@link #start()} is only legal in the created state, and once
 * {@link #close()} has run the instance cannot be started again.
 *
 * <p>{@link #start()}, {@link #close()} and {@link #cleanUp()} are
 * {@code synchronized} on this instance, so the lifecycle state is always read
 * and written under the same monitor.
 */
@InterfaceStability.Unstable
public class KafkaStreams {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);

    private static final String JMX_PREFIX = "kafka.streams";

    // Lifecycle states held in {@link #state}.
    private static final int CREATED = 0;
    private static final int RUNNING = 1;
    private static final int STOPPED = 2;

    // Source of the numeric suffix used when no client id is configured; the first suffix handed out is 1.
    private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    // Current lifecycle state; after construction it is only touched inside synchronized methods.
    private int state;
    private final StreamThread[] threads;
    private final Metrics metrics;
    private final UUID processId;
    private final StreamsConfig config;

    /**
     * Creates an instance from raw properties, using a {@link DefaultKafkaClientSupplier}.
     *
     * @param topologyBuilder topology handed to every stream thread
     * @param properties      settings wrapped into a new {@link StreamsConfig}
     */
    public KafkaStreams(TopologyBuilder topologyBuilder, Properties properties) {
        this(topologyBuilder, new StreamsConfig(properties), new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance from an existing configuration, using a {@link DefaultKafkaClientSupplier}.
     *
     * @param topologyBuilder topology handed to every stream thread
     * @param streamsConfig   configuration for this instance
     */
    public KafkaStreams(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig) {
        this(topologyBuilder, streamsConfig, new DefaultKafkaClientSupplier());
    }

    /**
     * Creates an instance, its metrics registry and its stream threads. No thread is started here.
     *
     * <p>When the configured client id is empty, one is derived as
     * {@code <application id>-<sequence number>}.
     *
     * @param topologyBuilder     topology handed to every stream thread
     * @param streamsConfig       configuration for this instance
     * @param kafkaClientSupplier supplier handed to every stream thread
     */
    public KafkaStreams(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) {
        this.state = CREATED;
        SystemTime time = new SystemTime();
        this.processId = UUID.randomUUID();
        this.config = streamsConfig;

        String applicationId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
        if (clientId.isEmpty()) {
            clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
        }

        // The JMX reporter is always registered in addition to any configured reporters.
        List<MetricsReporter> reporters =
                streamsConfig.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));

        MetricConfig metricConfig = new MetricConfig()
                .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
        this.metrics = new Metrics(metricConfig, reporters, time);

        this.threads = new StreamThread[streamsConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
        for (int i = 0; i < this.threads.length; i++) {
            this.threads[i] = new StreamThread(topologyBuilder, streamsConfig, kafkaClientSupplier,
                    applicationId, clientId, this.processId, this.metrics, time);
        }
    }

    /**
     * Starts every stream thread and marks this instance as running.
     *
     * @throws IllegalStateException if this instance is already running or has been closed
     */
    public synchronized void start() {
        log.debug("Starting Kafka Stream process");
        if (this.state == RUNNING) {
            throw new IllegalStateException("This process was already started.");
        }
        if (this.state != CREATED) {
            throw new IllegalStateException("Cannot restart after closing.");
        }
        for (StreamThread thread : this.threads) {
            thread.start();
        }
        this.state = RUNNING;
        log.info("Started Kafka Stream process");
    }

    /**
     * Stops this instance. If it is running, every stream thread is asked to close and is then
     * joined; unless it is already stopped, the metrics registry is closed and the state becomes
     * stopped. Calling this on an already stopped instance does nothing beyond the debug log.
     *
     * <p>If the calling thread is interrupted while joining, the wait for that stream thread is
     * abandoned, shutdown continues with the remaining threads, and the caller's interrupt status
     * is restored before this method returns.
     */
    public synchronized void close() {
        log.debug("Stopping Kafka Stream process");
        boolean interrupted = false;
        try {
            if (this.state == RUNNING) {
                // Signal all threads first so they shut down in parallel, then wait for each.
                for (StreamThread thread : this.threads) {
                    thread.close();
                }
                for (StreamThread thread : this.threads) {
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                        // join() cleared the flag when it threw; remember it so later joins can still block.
                        interrupted = true;
                    }
                }
            }
            if (this.state != STOPPED) {
                this.metrics.close();
                this.state = STOPPED;
                log.info("Stopped Kafka Stream process");
            }
        } finally {
            if (interrupted) {
                // Re-assert the interrupt so the caller can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Deletes the local directory {@code <state dir>/<application id>} derived from the
     * configuration. Allowed before {@link #start()} and after {@link #close()}.
     *
     * <p>Synchronized so the running check uses the same monitor as {@link #start()} and
     * {@link #close()} and cannot interleave with them.
     *
     * @throws IllegalStateException if this instance is currently running
     */
    public synchronized void cleanUp() {
        if (this.state == RUNNING) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        String applicationId = this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        String localApplicationDir = this.config.getString(StreamsConfig.STATE_DIR_CONFIG) + File.separator + applicationId;
        log.debug("Clean up local Kafka Streams data in {}", localApplicationDir);
        log.debug("Removing local Kafka Streams application data in {} for application {}", localApplicationDir, applicationId);
        Utils.delete(new File(localApplicationDir));
    }

    /**
     * Installs the given handler on every stream thread owned by this instance.
     *
     * @param uncaughtExceptionHandler handler passed to each thread's
     *                                 {@code setUncaughtExceptionHandler}
     */
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        for (StreamThread thread : this.threads) {
            thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        }
    }
}
