package org.apache.kafka.streams.processor.internals.metrics;

import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.class */
/**
 * {@link StreamsMetrics} implementation backed by a {@link Metrics} registry.
 *
 * <p>Besides the public {@code StreamsMetrics} API, this class keeps track of the names of the
 * sensors it creates at thread, task and cache level so that each group can later be removed
 * from the registry in one call. Every sensor name created by the level-specific factory methods
 * is prefixed with the thread name supplied to the constructor.
 */
public class StreamsMetricsImpl implements StreamsMetrics {
    /** Entity tag value attached to the parent sensor that aggregates over all entities of a scope. */
    private static final String ALL_ENTITIES = "all";

    private final Metrics metrics;
    private final Map<String, String> tags;
    /** Child sensor -> parent sensor, for sensors created by the add*Sensor methods. */
    private final Map<Sensor, Sensor> parentSensors;
    private final Sensor skippedRecordsSensor;
    private final String threadName;
    private final Deque<String> threadLevelSensors = new LinkedList<>();
    private final Map<String, Deque<String>> taskLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> cacheLevelSensors = new HashMap<>();

    /**
     * Creates the metrics facade and registers the thread-level "skipped-records" sensor.
     *
     * @param metrics    registry in which all sensors and metrics are created; must not be null
     * @param threadName used as the prefix of sensor names and as the value of the
     *                   {@code client-id} tag
     * @throws NullPointerException if {@code metrics} is null
     */
    public StreamsMetricsImpl(Metrics metrics, String threadName) {
        Objects.requireNonNull(metrics, "Metrics cannot be null");
        this.threadName = threadName;
        this.metrics = metrics;

        Map<String, String> baseTags = new LinkedHashMap<>();
        baseTags.put("client-id", threadName);
        this.tags = Collections.unmodifiableMap(baseTags);
        this.parentSensors = new HashMap<>();

        this.skippedRecordsSensor = threadLevelSensor("skipped-records", Sensor.RecordingLevel.INFO);
        this.skippedRecordsSensor.add(
            metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", this.tags),
            new Rate(TimeUnit.SECONDS, new Count()));
        this.skippedRecordsSensor.add(
            metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", this.tags),
            new Total());
    }

    /**
     * Creates (or looks up) a sensor named {@code <threadName>.<sensorName>} and remembers its
     * name for {@link #removeAllThreadLevelSensors()}.
     *
     * @param sensorName     suffix of the sensor name
     * @param recordingLevel recording level passed to the registry
     * @param parents        optional parent sensors passed to the registry
     * @return the sensor returned by the registry
     */
    public final Sensor threadLevelSensor(String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        synchronized (this.threadLevelSensors) {
            String fullName = this.threadName + "." + sensorName;
            Sensor sensor = this.metrics.sensor(fullName, recordingLevel, parents);
            this.threadLevelSensors.push(fullName);
            return sensor;
        }
    }

    /** Removes every sensor previously created through {@link #threadLevelSensor} from the registry. */
    public final void removeAllThreadLevelSensors() {
        synchronized (this.threadLevelSensors) {
            while (!this.threadLevelSensors.isEmpty()) {
                this.metrics.removeSensor(this.threadLevelSensors.pop());
            }
        }
    }

    /**
     * Creates (or looks up) a sensor named {@code <threadName>.<taskName>.<sensorName>} and
     * remembers its name under the task for {@link #removeAllTaskLevelSensors(String)}.
     *
     * @param taskName       task segment of the sensor name
     * @param sensorName     suffix of the sensor name
     * @param recordingLevel recording level passed to the registry
     * @param parents        optional parent sensors passed to the registry
     * @return the sensor returned by the registry
     */
    public final Sensor taskLevelSensor(String taskName, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String key = this.threadName + "." + taskName;
        return registerSensor(this.taskLevelSensors, key, sensorName, recordingLevel, parents);
    }

    /**
     * Removes every sensor previously created through {@link #taskLevelSensor} for the given task.
     * Does nothing if no sensor was registered for that task.
     *
     * @param taskName task whose sensors are removed
     */
    public final void removeAllTaskLevelSensors(String taskName) {
        String key = this.threadName + "." + taskName;
        removeSensors(this.taskLevelSensors, key);
    }

    /**
     * Creates (or looks up) a sensor named {@code <threadName>.<taskName>.<cacheName>.<sensorName>}
     * and remembers its name under the task/cache pair for
     * {@link #removeAllCacheLevelSensors(String, String)}.
     *
     * @param taskName       task segment of the sensor name
     * @param cacheName      cache segment of the sensor name
     * @param sensorName     suffix of the sensor name
     * @param recordingLevel recording level passed to the registry
     * @param parents        optional parent sensors passed to the registry
     * @return the sensor returned by the registry
     */
    public final Sensor cacheLevelSensor(String taskName, String cacheName, String sensorName, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        String key = this.threadName + "." + taskName + "." + cacheName;
        return registerSensor(this.cacheLevelSensors, key, sensorName, recordingLevel, parents);
    }

    /**
     * Removes every sensor previously created through {@link #cacheLevelSensor} for the given
     * task/cache pair. Does nothing if no sensor was registered for that pair.
     *
     * @param taskName  task segment of the key
     * @param cacheName cache segment of the key
     */
    public final void removeAllCacheLevelSensors(String taskName, String cacheName) {
        String key = this.threadName + "." + taskName + "." + cacheName;
        removeSensors(this.cacheLevelSensors, key);
    }

    /** Creates the sensor {@code <key>.<sensorName>} and records its name under {@code key}, holding the registry's lock. */
    private Sensor registerSensor(Map<String, Deque<String>> registry, String key, String sensorName,
                                  Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        synchronized (registry) {
            Deque<String> sensorNames = registry.get(key);
            if (sensorNames == null) {
                sensorNames = new LinkedList<>();
                registry.put(key, sensorNames);
            }
            String fullName = key + "." + sensorName;
            Sensor sensor = this.metrics.sensor(fullName, recordingLevel, parents);
            sensorNames.push(fullName);
            return sensor;
        }
    }

    /** Removes all sensors recorded under {@code key} from the metrics registry and then drops the key, holding the registry's lock. */
    private void removeSensors(Map<String, Deque<String>> registry, String key) {
        synchronized (registry) {
            Deque<String> sensorNames = registry.get(key);
            if (sensorNames != null) {
                while (!sensorNames.isEmpty()) {
                    this.metrics.removeSensor(sensorNames.pop());
                }
                // Drop the entry only after all of its sensors were removed.
                registry.remove(key);
            }
        }
    }

    /**
     * Returns the unmodifiable base tags (the {@code client-id} tag set in the constructor).
     * Kept public: the decompiler reports that the access modifier was widened from protected,
     * so narrowing it again could break existing callers.
     */
    public final Map<String, String> tags() {
        return this.tags;
    }

    /** Returns the thread-level "skipped-records" sensor created in the constructor. */
    public final Sensor skippedRecordsSensor() {
        return this.skippedRecordsSensor;
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel) {
        return this.metrics.sensor(name, recordingLevel);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addSensor(String name, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
        return this.metrics.sensor(name, recordingLevel, parents);
    }

    /** Returns a read-only view of the metrics held by the underlying registry. */
    @Override // org.apache.kafka.streams.StreamsMetrics
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    /** Records {@code endNs - startNs} on the given sensor. */
    @Override // org.apache.kafka.streams.StreamsMetrics
    public void recordLatency(Sensor sensor, long startNs, long endNs) {
        sensor.record(endNs - startNs);
    }

    /** Records {@code value} on the given sensor. */
    @Override // org.apache.kafka.streams.StreamsMetrics
    public void recordThroughput(Sensor sensor, long value) {
        sensor.record(value);
    }

    private String groupNameFromScope(String scopeName) {
        return "stream-" + scopeName + "-metrics";
    }

    private String sensorName(String operationName, String entityName) {
        if (entityName == null) {
            return operationName;
        }
        return entityName + "-" + operationName;
    }

    /**
     * Builds a mutable tag map consisting of the base tags plus the given key/value pairs.
     *
     * @param tags alternating keys and values; may be null or empty
     * @return a new map containing the base tags overlaid with the given pairs
     * @throws IllegalArgumentException if an odd number of strings is supplied
     */
    public Map<String, String> tagMap(String... tags) {
        Map<String, String> result = new HashMap<>(this.tags);
        if (tags != null) {
            if (tags.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            for (int i = 0; i < tags.length; i += 2) {
                result.put(tags[i], tags[i + 1]);
            }
        }
        return result;
    }

    /** Appends the {@code <scopeName>-id -> entityName} pair to the given tags and builds the tag map. */
    private Map<String, String> constructTags(String scopeName, String entityName, String... tags) {
        // tagMap tolerates a null array, so treat null as "no extra tags" here as well.
        String[] base = tags == null ? new String[0] : tags;
        String[] extended = Arrays.copyOf(base, base.length + 2);
        extended[base.length] = scopeName + "-id";
        extended[base.length + 1] = entityName;
        return tagMap(extended);
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        return addLatencyAndThroughputSensor(null, scopeName, entityName, operationName, recordingLevel, tags);
    }

    /**
     * Creates a parent sensor (tagged with the entity value "all") and a child sensor (tagged with
     * {@code entityName}), both carrying latency and throughput metrics for the operation.
     * The child is remembered so that {@link #removeSensor(Sensor)} also removes its parent.
     *
     * @param namePrefix     optional extra segment inserted after the thread name in the sensor
     *                       names; may be null
     * @param scopeName      used for the metric group name and the {@code <scopeName>-id} tag key
     * @param entityName     value of the {@code <scopeName>-id} tag on the child sensor
     * @param operationName  prefix of the metric names
     * @param recordingLevel recording level of both sensors
     * @param tags           additional alternating tag keys and values
     * @return the child sensor
     * @throws IllegalArgumentException if an odd number of tag strings is supplied
     */
    public Sensor addLatencyAndThroughputSensor(String namePrefix, String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        Map<String, String> entityTags = constructTags(scopeName, entityName, tags);
        Map<String, String> allTags = constructTags(scopeName, ALL_ENTITIES, tags);

        String uniqueName = buildUniqueSensorName(operationName, namePrefix);

        Sensor parent = this.metrics.sensor(sensorName(uniqueName, null), recordingLevel);
        addLatencyMetrics(scopeName, parent, operationName, allTags);
        addThroughputMetrics(scopeName, parent, operationName, allTags);

        Sensor child = this.metrics.sensor(sensorName(uniqueName, entityName), recordingLevel, parent);
        addLatencyMetrics(scopeName, child, operationName, entityTags);
        addThroughputMetrics(scopeName, child, operationName, entityTags);

        this.parentSensors.put(child, parent);
        return child;
    }

    @Override // org.apache.kafka.streams.StreamsMetrics
    public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        return addThroughputSensor(null, scopeName, entityName, operationName, recordingLevel, tags);
    }

    /**
     * Creates a parent sensor (tagged with the entity value "all") and a child sensor (tagged with
     * {@code entityName}), both carrying throughput metrics for the operation.
     * The child is remembered so that {@link #removeSensor(Sensor)} also removes its parent.
     *
     * @param namePrefix     optional extra segment inserted after the thread name in the sensor
     *                       names; may be null
     * @param scopeName      used for the metric group name and the {@code <scopeName>-id} tag key
     * @param entityName     value of the {@code <scopeName>-id} tag on the child sensor
     * @param operationName  prefix of the metric names
     * @param recordingLevel recording level of both sensors
     * @param tags           additional alternating tag keys and values
     * @return the child sensor
     * @throws IllegalArgumentException if an odd number of tag strings is supplied
     */
    public Sensor addThroughputSensor(String namePrefix, String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
        Map<String, String> entityTags = constructTags(scopeName, entityName, tags);
        Map<String, String> allTags = constructTags(scopeName, ALL_ENTITIES, tags);

        String uniqueName = buildUniqueSensorName(operationName, namePrefix);

        Sensor parent = this.metrics.sensor(sensorName(uniqueName, null), recordingLevel);
        addThroughputMetrics(scopeName, parent, operationName, allTags);

        Sensor child = this.metrics.sensor(sensorName(uniqueName, entityName), recordingLevel, parent);
        addThroughputMetrics(scopeName, child, operationName, entityTags);

        this.parentSensors.put(child, parent);
        return child;
    }

    private String buildUniqueSensorName(String operationName, String namePrefix) {
        String middle = namePrefix == null ? "" : namePrefix + ".";
        return this.threadName + "." + middle + operationName;
    }

    private void addLatencyMetrics(String scopeName, Sensor sensor, String operationName, Map<String, String> tags) {
        String group = groupNameFromScope(scopeName);
        sensor.add(
            this.metrics.metricName(operationName + "-latency-avg", group, "The average latency of " + operationName + " operation.", tags),
            new Avg());
        sensor.add(
            this.metrics.metricName(operationName + "-latency-max", group, "The max latency of " + operationName + " operation.", tags),
            new Max());
    }

    private void addThroughputMetrics(String scopeName, Sensor sensor, String operationName, Map<String, String> tags) {
        String group = groupNameFromScope(scopeName);
        sensor.add(
            this.metrics.metricName(operationName + "-rate", group, "The average number of occurrence of " + operationName + " operation per second.", tags),
            new Rate(TimeUnit.SECONDS, new Count()));
        // NOTE(review): the constructor backs its "-total" metric with Total, whereas this one
        // uses Count - confirm that Count yields the intended cumulative value.
        sensor.add(
            this.metrics.metricName(operationName + "-total", group, "The total number of occurrence of " + operationName + " operations.", tags),
            new Count());
    }

    /**
     * Removes the sensor from the registry and, if it was created by one of the add*Sensor
     * methods, its parent sensor as well.
     *
     * @param sensor sensor to remove; must not be null
     * @throws NullPointerException if {@code sensor} is null
     */
    @Override // org.apache.kafka.streams.StreamsMetrics
    public void removeSensor(Sensor sensor) {
        Objects.requireNonNull(sensor, "Sensor is null");
        this.metrics.removeSensor(sensor.name());

        // remove (not get): otherwise the child -> parent entry stays in the map forever and
        // keeps both sensors reachable after they were removed from the registry.
        Sensor parent = this.parentSensors.remove(sensor);
        if (parent != null) {
            this.metrics.removeSensor(parent.name());
        }
    }
}
