package org.apache.kafka.connect.runtime.distributed;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.class */
public class WorkerCoordinator extends AbstractCoordinator implements Closeable {
    /** Sub-protocol name constant; not referenced elsewhere in this file. */
    public static final String DEFAULT_SUBPROTOCOL = "default";
    // Logger obtained from the LogContext passed to the constructor.
    private final Logger log;
    // REST URL of this worker; included in the worker state built by metadata().
    private final String restUrl;
    // Source of cluster configuration snapshots (see metadata() and configFreshSnapshot()).
    private final ConfigBackingStore configStorage;
    // Most recent assignment; null until a join completes, and reset to null by poll()
    // when the coordinator cannot be reached. Volatile: it is also read by the metric callbacks.
    private volatile ExtendedAssignment assignmentSnapshot;
    // Config snapshot captured by metadata() or installed through configSnapshot(ClusterConfigState).
    private ClusterConfigState configSnapshot;
    // Receives onRevoked/onAssigned callbacks.
    private final WorkerRebalanceListener listener;
    // Configured compatibility mode; selects the metadata format in metadata().
    private final ConnectProtocolCompatibility protocolCompatibility;
    // Set through leaderState(LeaderState); cleared at the start of every rebalance (onJoinPrepare).
    private LeaderState leaderState;
    // Set by requestRejoin(), cleared in onJoinComplete().
    private boolean rejoinRequested;
    // Starts as protocolCompatibility; replaced in onJoinComplete() with the protocol selected for the group.
    private volatile ConnectProtocolCompatibility currentConnectProtocol;
    // Generation id recorded by the last onJoinComplete(); starts at NO_GENERATION's id.
    private volatile int lastCompletedGenerationId;
    // Assignor used by performAssignment() when the selected protocol is EAGER.
    private final ConnectAssignor eagerAssignor;
    // Assignor used by performAssignment() for every other protocol.
    private final ConnectAssignor incrementalAssignor;
    // Timeout used by poll() for coordinator discovery; initialised from the heartbeat interval.
    private final int coordinatorDiscoveryTimeoutMs;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$ConnectorsAndTasks.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$ConnectorsAndTasks.class */
    /**
     * A pair of collections: connector names and task ids.
     *
     * <p>Instances are created through {@link Builder}. The collections handed out by
     * {@link #connectors()} and {@link #tasks()} are the ones held internally, not copies.
     */
    public static class ConnectorsAndTasks {
        /** Shared instance backed by two immutable empty lists. */
        public static final ConnectorsAndTasks EMPTY = new ConnectorsAndTasks(Collections.emptyList(), Collections.emptyList());
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        /**
         * Builder for {@link ConnectorsAndTasks}. Both {@code with} and {@code withCopies}
         * store fresh {@link ArrayList} copies of their arguments.
         */
        public static class Builder {
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * Stores copies of the given collections.
             *
             * @param collection  connector names; must not be null
             * @param collection2 task ids; must not be null
             * @return this builder
             */
            public Builder withCopies(Collection<String> collection, Collection<ConnectorTaskId> collection2) {
                this.withConnectors = new ArrayList<>(collection);
                this.withTasks = new ArrayList<>(collection2);
                return this;
            }

            /**
             * Stores copies of the given collections (same effect as {@link #withCopies}).
             *
             * @param collection  connector names; must not be null
             * @param collection2 task ids; must not be null
             * @return this builder
             */
            public Builder with(Collection<String> collection, Collection<ConnectorTaskId> collection2) {
                this.withConnectors = new ArrayList<>(collection);
                this.withTasks = new ArrayList<>(collection2);
                return this;
            }

            /**
             * Builds the pair, substituting a new empty list for any collection that was never set.
             *
             * @return a new {@link ConnectorsAndTasks}
             */
            public ConnectorsAndTasks build() {
                Collection<String> builtConnectors = this.withConnectors != null ? this.withConnectors : new ArrayList<>();
                Collection<ConnectorTaskId> builtTasks = this.withTasks != null ? this.withTasks : new ArrayList<>();
                return new ConnectorsAndTasks(builtConnectors, builtTasks);
            }
        }

        private ConnectorsAndTasks(Collection<String> collection, Collection<ConnectorTaskId> collection2) {
            this.connectors = collection;
            this.tasks = collection2;
        }

        /** @return the connector names held by this instance */
        public Collection<String> connectors() {
            return this.connectors;
        }

        /** @return the task ids held by this instance */
        public Collection<ConnectorTaskId> tasks() {
            return this.tasks;
        }

        /** @return number of connectors plus number of tasks */
        public int size() {
            return this.connectors.size() + this.tasks.size();
        }

        /** @return true when there are neither connectors nor tasks */
        public boolean isEmpty() {
            return this.connectors.isEmpty() && this.tasks.isEmpty();
        }

        @Override
        public String toString() {
            return "{ connectorIds=" + this.connectors + ", taskIds=" + this.tasks + '}';
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$LeaderState.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$LeaderState.class */
    /**
     * Lookup tables built from a group assignment: the members by id, and for each
     * connector / task the id of the member it is assigned to.
     */
    public static class LeaderState {
        private final Map<String, ExtendedWorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        /**
         * @param map  member id to worker state
         * @param map2 member id to the connectors assigned to it
         * @param map3 member id to the tasks assigned to it
         */
        public LeaderState(Map<String, ExtendedWorkerState> map, Map<String, Collection<String>> map2, Map<String, Collection<ConnectorTaskId>> map3) {
            allMembers = map;
            // Flip "member -> items" into "item -> member" for direct owner lookups.
            connectorOwners = WorkerCoordinator.invertAssignment(map2);
            taskOwners = WorkerCoordinator.invertAssignment(map3);
        }

        /**
         * @param connectorTaskId task to look up
         * @return the URL of the member owning the task, or null if the task has no recorded owner
         */
        public String ownerUrl(ConnectorTaskId connectorTaskId) {
            String ownerId = taskOwners.get(connectorTaskId);
            return ownerId == null ? null : allMembers.get(ownerId).url();
        }

        /**
         * @param str connector name to look up
         * @return the URL of the member owning the connector, or null if it has no recorded owner
         */
        public String ownerUrl(String str) {
            String ownerId = connectorOwners.get(str);
            return ownerId == null ? null : allMembers.get(ownerId).url();
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$WorkerCoordinatorMetrics.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$WorkerCoordinatorMetrics.class */
    /**
     * Registers two gauges that report how many connectors and tasks are in the
     * enclosing coordinator's current assignment snapshot.
     */
    private class WorkerCoordinatorMetrics {
        public final String metricGrpName;

        /**
         * @param metrics registry the gauges are added to
         * @param str     prefix of the metric group name; "-coordinator-metrics" is appended
         */
        public WorkerCoordinatorMetrics(Metrics metrics, String str) {
            this.metricGrpName = str + "-coordinator-metrics";
            Measurable numConnectors = new Measurable() {
                @Override
                public double measure(MetricConfig metricConfig, long j) {
                    // Read the volatile field once so the null check and the use see the same object.
                    ExtendedAssignment snapshot = WorkerCoordinator.this.assignmentSnapshot;
                    if (snapshot == null) {
                        return 0.0d;
                    }
                    return snapshot.connectors().size();
                }
            };
            Measurable numTasks = new Measurable() {
                @Override
                public double measure(MetricConfig metricConfig, long j) {
                    // Read the volatile field once so the null check and the use see the same object.
                    ExtendedAssignment snapshot = WorkerCoordinator.this.assignmentSnapshot;
                    if (snapshot == null) {
                        return 0.0d;
                    }
                    return snapshot.tasks().size();
                }
            };
            metrics.addMetric(metrics.metricName("assigned-connectors", this.metricGrpName, "The number of connector instances currently assigned to this consumer"), numConnectors);
            metrics.addMetric(metrics.metricName("assigned-tasks", this.metricGrpName, "The number of tasks currently assigned to this consumer"), numTasks);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$WorkerLoad.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/distributed/WorkerCoordinator$WorkerLoad.class */
    /**
     * The connectors and tasks attributed to a single worker.
     *
     * <p>The collections are mutable through {@link #assign(String)} and
     * {@link #assign(ConnectorTaskId)}; when built via {@link Builder#with} they are the
     * caller's own collections, so such calls modify the caller's data as well.
     */
    public static class WorkerLoad {
        private final String worker;
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        /** Builder for {@link WorkerLoad}. */
        public static class Builder {
            private String withWorker;
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * @param str worker name
             * @throws NullPointerException if {@code str} is null
             */
            public Builder(String str) {
                this.withWorker = Objects.requireNonNull(str, "worker cannot be null");
            }

            /**
             * Stores fresh {@link ArrayList} copies of the given collections.
             *
             * @return this builder
             * @throws NullPointerException if either argument is null
             */
            public Builder withCopies(Collection<String> collection, Collection<ConnectorTaskId> collection2) {
                this.withConnectors = new ArrayList<>(Objects.requireNonNull(collection, "connectors may be empty but not null"));
                this.withTasks = new ArrayList<>(Objects.requireNonNull(collection2, "tasks may be empty but not null"));
                return this;
            }

            /**
             * Stores the given collections by reference (no copy is made).
             *
             * @return this builder
             * @throws NullPointerException if either argument is null
             */
            public Builder with(Collection<String> collection, Collection<ConnectorTaskId> collection2) {
                this.withConnectors = Objects.requireNonNull(collection, "connectors may be empty but not null");
                this.withTasks = Objects.requireNonNull(collection2, "tasks may be empty but not null");
                return this;
            }

            /**
             * Builds the load, substituting a new empty list for any collection that was never set.
             *
             * @return a new {@link WorkerLoad}
             */
            public WorkerLoad build() {
                Collection<String> builtConnectors = this.withConnectors != null ? this.withConnectors : new ArrayList<>();
                Collection<ConnectorTaskId> builtTasks = this.withTasks != null ? this.withTasks : new ArrayList<>();
                return new WorkerLoad(this.withWorker, builtConnectors, builtTasks);
            }
        }

        private WorkerLoad(String str, Collection<String> collection, Collection<ConnectorTaskId> collection2) {
            this.worker = str;
            this.connectors = collection;
            this.tasks = collection2;
        }

        /** @return the worker name */
        public String worker() {
            return this.worker;
        }

        /** @return the connectors collection held by this load (not a copy) */
        public Collection<String> connectors() {
            return this.connectors;
        }

        /** @return the tasks collection held by this load (not a copy) */
        public Collection<ConnectorTaskId> tasks() {
            return this.tasks;
        }

        /** @return number of connectors */
        public int connectorsSize() {
            return this.connectors.size();
        }

        /** @return number of tasks */
        public int tasksSize() {
            return this.tasks.size();
        }

        /** Adds a connector to this load. */
        public void assign(String str) {
            this.connectors.add(str);
        }

        /** Adds a task to this load. */
        public void assign(ConnectorTaskId connectorTaskId) {
            this.tasks.add(connectorTaskId);
        }

        /** @return number of connectors plus number of tasks */
        public int size() {
            return this.connectors.size() + this.tasks.size();
        }

        /** @return true when there are neither connectors nor tasks */
        public boolean isEmpty() {
            return this.connectors.isEmpty() && this.tasks.isEmpty();
        }

        /**
         * @return a comparator ordering loads by connector count, ties broken by worker name
         *         (a null worker name sorts first)
         */
        public static Comparator<WorkerLoad> connectorComparator() {
            return (left, right) -> {
                int bySize = Integer.compare(left.connectors.size(), right.connectors.size());
                return bySize != 0 ? bySize : compareWorkers(left.worker, right.worker);
            };
        }

        /**
         * @return a comparator ordering loads by task count, ties broken by worker name
         *         (a null worker name sorts first)
         */
        public static Comparator<WorkerLoad> taskComparator() {
            return (left, right) -> {
                int bySize = Integer.compare(left.tasks.size(), right.tasks.size());
                return bySize != 0 ? bySize : compareWorkers(left.worker, right.worker);
            };
        }

        /** Null-tolerant, symmetric comparison of worker names; null sorts before any non-null name. */
        private static int compareWorkers(String left, String right) {
            if (left == null) {
                return right == null ? 0 : -1;
            }
            if (right == null) {
                return 1;
            }
            return left.compareTo(right);
        }

        @Override
        public String toString() {
            return "{ worker=" + this.worker + ", connectorIds=" + this.connectors + ", taskIds=" + this.tasks + '}';
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof WorkerLoad)) {
                return false;
            }
            WorkerLoad other = (WorkerLoad) obj;
            return this.worker.equals(other.worker) && this.connectors.equals(other.connectors) && this.tasks.equals(other.tasks);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.worker, this.connectors, this.tasks);
        }
    }

    /**
     * Creates the coordinator and registers its assignment metrics.
     *
     * @param groupRebalanceConfig         passed to the superclass; its heartbeat interval is reused as the coordinator discovery timeout
     * @param logContext                   source of this class's logger, also handed to both assignors
     * @param consumerNetworkClient        passed to the superclass
     * @param metrics                      registry for superclass and assignment metrics
     * @param str                          metric group prefix
     * @param time                         clock, passed to the superclass and the incremental assignor
     * @param str2                         REST URL of this worker
     * @param configBackingStore           store from which config snapshots are taken
     * @param workerRebalanceListener      callback target for assignment and revocation events
     * @param connectProtocolCompatibility configured protocol compatibility mode; also the initial current protocol
     * @param i                            third constructor argument of the incremental assignor (meaning not visible here)
     */
    public WorkerCoordinator(GroupRebalanceConfig groupRebalanceConfig, LogContext logContext, ConsumerNetworkClient consumerNetworkClient, Metrics metrics, String str, Time time, String str2, ConfigBackingStore configBackingStore, WorkerRebalanceListener workerRebalanceListener, ConnectProtocolCompatibility connectProtocolCompatibility, int i) {
        super(groupRebalanceConfig, logContext, consumerNetworkClient, metrics, str, time);
        log = logContext.logger(WorkerCoordinator.class);
        restUrl = str2;
        configStorage = configBackingStore;
        assignmentSnapshot = null;
        // Constructed only for its side effect of registering the gauges.
        new WorkerCoordinatorMetrics(metrics, str);
        listener = workerRebalanceListener;
        rejoinRequested = false;
        protocolCompatibility = connectProtocolCompatibility;
        incrementalAssignor = new IncrementalCooperativeAssignor(logContext, time, i);
        eagerAssignor = new EagerAssignor(logContext);
        currentConnectProtocol = connectProtocolCompatibility;
        coordinatorDiscoveryTimeoutMs = groupRebalanceConfig.heartbeatIntervalMs;
        lastCompletedGenerationId = AbstractCoordinator.Generation.NO_GENERATION.generationId;
    }

    /** Flags that a rejoin is wanted; the flag is consulted by {@link #rejoinNeededOrPending()}. */
    @Override
    public void requestRejoin() {
        rejoinRequested = true;
    }

    /** @return the group protocol type name, always {@code "connect"} */
    @Override
    public String protocolType() {
        final String type = "connect";
        return type;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Delegates to the superclass implementation while holding this object's monitor.
     *
     * @param timer bounds how long the call may take
     * @return the superclass result
     */
    @Override
    public synchronized boolean ensureCoordinatorReady(Timer timer) {
        boolean ready = super.ensureCoordinatorReady(timer);
        return ready;
    }

    /**
     * Drives coordinator discovery, group (re)joining, heartbeats and network I/O
     * until {@code j} milliseconds have elapsed. The loop body always runs at least once.
     *
     * <p>If the coordinator cannot be reached within the discovery timeout and a
     * non-failed assignment is held, that assignment is revoked through the listener
     * and the snapshot is cleared.
     *
     * @param j total time budget in milliseconds
     */
    public void poll(long j) {
        final long start = time.milliseconds();
        long now = start;
        long remaining;
        do {
            if (coordinatorUnknown()) {
                log.debug("Broker coordinator is marked unknown. Attempting discovery with a timeout of {}ms", coordinatorDiscoveryTimeoutMs);
                boolean ready = ensureCoordinatorReady(time.timer(coordinatorDiscoveryTimeoutMs));
                if (!ready) {
                    log.debug("Can not connect to broker coordinator");
                    // Work on a local copy of the volatile snapshot.
                    ExtendedAssignment snapshot = assignmentSnapshot;
                    if (snapshot != null && !snapshot.failed()) {
                        log.info("Broker coordinator was unreachable for {}ms. Revoking previous assignment {} to avoid running tasks while not being a member the group", coordinatorDiscoveryTimeoutMs, snapshot);
                        listener.onRevoked(snapshot.leader(), snapshot.connectors(), snapshot.tasks());
                        assignmentSnapshot = null;
                    }
                } else {
                    log.debug("Broker coordinator is ready");
                }
                now = time.milliseconds();
            }

            if (rejoinNeededOrPending()) {
                ensureActiveGroup();
                now = time.milliseconds();
            }

            pollHeartbeat(now);

            // Never block in the network poll past the remaining budget or the next heartbeat.
            remaining = j - (now - start);
            long pollTimeout = Math.min(Math.max(0L, remaining), timeToNextHeartbeat(now));
            client.poll(time.timer(pollTimeout));

            now = time.milliseconds();
            remaining = j - (now - start);
        } while (remaining > 0);
    }

    /**
     * Refreshes {@code configSnapshot} from the config store and builds the join-group
     * protocol metadata for this worker, in the format matching the configured
     * compatibility mode.
     *
     * @return the protocol collection to send with the join request
     * @throws IllegalStateException if the compatibility mode is not one of the handled values
     */
    @Override
    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        this.configSnapshot = this.configStorage.snapshot();
        final ExtendedWorkerState workerState =
                new ExtendedWorkerState(this.restUrl, this.configSnapshot.offset(), this.assignmentSnapshot);
        switch (this.protocolCompatibility) {
            case EAGER: {
                return ConnectProtocol.metadataRequest(workerState);
            }
            case COMPATIBLE: {
                return IncrementalCooperativeConnectProtocol.metadataRequest(workerState, false);
            }
            case SESSIONED: {
                return IncrementalCooperativeConnectProtocol.metadataRequest(workerState, true);
            }
            default: {
                throw new IllegalStateException("Unknown Connect protocol compatibility mode " + this.protocolCompatibility);
            }
        }
    }

    /**
     * Handles a completed join: deserializes the new assignment, records the selected
     * protocol, and, unless that protocol is EAGER, reports revocations and folds the
     * still-held part of the previous assignment into the new one. Finally stores the
     * assignment and generation id and notifies the listener.
     *
     * @param i          generation id of the completed join
     * @param str        not used by this implementation
     * @param str2       name of the selected protocol
     * @param byteBuffer serialized assignment
     */
    @Override
    protected void onJoinComplete(int i, String str, String str2, ByteBuffer byteBuffer) {
        ExtendedAssignment newAssignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(byteBuffer);
        log.debug("Deserialized new assignment: {}", newAssignment);
        currentConnectProtocol = ConnectProtocolCompatibility.fromProtocol(str2);
        rejoinRequested = false;

        if (currentConnectProtocol != ConnectProtocolCompatibility.EAGER) {
            boolean nothingRevoked = newAssignment.revokedConnectors().isEmpty() && newAssignment.revokedTasks().isEmpty();
            if (!nothingRevoked) {
                listener.onRevoked(newAssignment.leader(), newAssignment.revokedConnectors(), newAssignment.revokedTasks());
            }

            ExtendedAssignment previous = assignmentSnapshot;
            if (previous != null) {
                // Drop what was just revoked from the previous assignment (mutated in place) ...
                previous.connectors().removeAll(newAssignment.revokedConnectors());
                previous.tasks().removeAll(newAssignment.revokedTasks());
                log.debug("After revocations snapshot of assignment: {}", previous);
                // ... and carry the remainder over into the new assignment.
                newAssignment.connectors().addAll(previous.connectors());
                newAssignment.tasks().addAll(previous.tasks());
            }
            log.debug("Augmented new assignment: {}", newAssignment);
        }

        assignmentSnapshot = newAssignment;
        lastCompletedGenerationId = i;
        listener.onAssigned(newAssignment, i);
    }

    /**
     * Computes the group assignment by delegating to the eager assignor when the selected
     * protocol is EAGER and to the incremental assignor otherwise.
     *
     * @param str  forwarded to the assignor as its first argument
     * @param str2 name of the selected protocol
     * @param list the group members as returned in the join response
     * @return the assignor's result
     */
    @Override
    protected Map<String, ByteBuffer> performAssignment(String str, String str2, List<JoinGroupResponseData.JoinGroupResponseMember> list) {
        ConnectAssignor assignor;
        if (ConnectProtocolCompatibility.fromProtocol(str2) == ConnectProtocolCompatibility.EAGER) {
            assignor = eagerAssignor;
        } else {
            assignor = incrementalAssignor;
        }
        return assignor.performAssignment(str, str2, list, this);
    }

    /**
     * Called before a rebalance: clears the leader state and, only when the current
     * protocol is EAGER, revokes the whole previous assignment (if one exists and has not failed).
     * Under any other protocol the assignment is kept.
     *
     * @param i   not used by this implementation
     * @param str not used by this implementation
     */
    @Override
    protected void onJoinPrepare(int i, String str) {
        log.info("Rebalance started");
        leaderState(null);
        ExtendedAssignment previous = assignmentSnapshot;
        if (currentConnectProtocol == ConnectProtocolCompatibility.EAGER) {
            log.debug("Revoking previous assignment {}", previous);
            if (previous != null && !previous.failed()) {
                listener.onRevoked(previous.leader(), previous.connectors(), previous.tasks());
            }
        } else {
            log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's explicitly revoked.", previous);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return true if the superclass reports a rejoin is needed or pending, if there is no
     *         assignment or it has failed, or if a rejoin was explicitly requested
     */
    @Override
    public boolean rejoinNeededOrPending() {
        ExtendedAssignment snapshot = assignmentSnapshot;
        if (super.rejoinNeededOrPending()) {
            return true;
        }
        if (snapshot == null || snapshot.failed()) {
            return true;
        }
        return rejoinRequested;
    }

    /** @return the member id of the stable generation, or the empty string when there is none */
    @Override
    public String memberId() {
        AbstractCoordinator.Generation stable = generationIfStable();
        if (stable == null) {
            return "";
        }
        return stable.memberId;
    }

    /** @return the generation id of the generation reported by the superclass */
    public int generationId() {
        AbstractCoordinator.Generation current = super.generation();
        return current.generationId;
    }

    /** @return the generation id recorded by the most recent completed join */
    public int lastCompletedGenerationId() {
        return lastCompletedGenerationId;
    }

    /**
     * Reports the given assignment's leader, connectors and tasks to the listener as revoked.
     *
     * @param extendedAssignment the assignment to revoke; must not be null
     */
    public void revokeAssignment(ExtendedAssignment extendedAssignment) {
        listener.onRevoked(
                extendedAssignment.leader(),
                extendedAssignment.connectors(),
                extendedAssignment.tasks());
    }

    /** @return true when an assignment exists and names this member as leader */
    private boolean isLeader() {
        ExtendedAssignment snapshot = assignmentSnapshot;
        if (snapshot == null) {
            return false;
        }
        return memberId().equals(snapshot.leader());
    }

    /**
     * @param str connector name
     * @return the owner URL recorded in the leader state, or null when a rejoin is needed
     *         or pending or this worker is not the leader
     */
    public String ownerUrl(String str) {
        if (!rejoinNeededOrPending() && isLeader()) {
            return leaderState().ownerUrl(str);
        }
        return null;
    }

    /**
     * @param connectorTaskId task id
     * @return the owner URL recorded in the leader state, or null when a rejoin is needed
     *         or pending or this worker is not the leader
     */
    public String ownerUrl(ConnectorTaskId connectorTaskId) {
        if (!rejoinNeededOrPending() && isLeader()) {
            return leaderState().ownerUrl(connectorTaskId);
        }
        return null;
    }

    /** @return a snapshot taken from the config store at call time; the cached snapshot is not updated */
    public ClusterConfigState configFreshSnapshot() {
        ClusterConfigState fresh = configStorage.snapshot();
        return fresh;
    }

    /** @return the cached config snapshot (may be null if none has been captured yet) */
    public ClusterConfigState configSnapshot() {
        return configSnapshot;
    }

    /**
     * Replaces the cached config snapshot.
     *
     * @param clusterConfigState the snapshot to cache
     */
    public void configSnapshot(ClusterConfigState clusterConfigState) {
        configSnapshot = clusterConfigState;
    }

    /** @return the current leader state, or null if none is set */
    private LeaderState leaderState() {
        return leaderState;
    }

    /**
     * Installs (or, with null, clears) the leader state.
     *
     * @param leaderState the new leader state
     */
    public void leaderState(LeaderState leaderState) {
        // The parameter shadows the field, so the qualifier is required here.
        this.leaderState = leaderState;
    }

    /** @return the protocol version reported by the currently selected Connect protocol */
    public short currentProtocolVersion() {
        ConnectProtocolCompatibility protocol = currentConnectProtocol;
        return protocol.protocolVersion();
    }

    /**
     * Inverts a one-to-many assignment into a value-to-key lookup.
     *
     * <p>If the same value appears under more than one key, the key visited last
     * during iteration of {@code map} wins.
     *
     * @param map key to the collection of values assigned to it; neither the map nor
     *            any of its collections may be null
     * @param <K> key type of the input (value type of the result)
     * @param <V> element type of the input collections (key type of the result)
     * @return a new mutable map from each value to the key it was listed under
     */
    public static <K, V> Map<V, K> invertAssignment(Map<K, Collection<V>> map) {
        Map<V, K> inverted = new HashMap<>();
        for (Map.Entry<K, Collection<V>> entry : map.entrySet()) {
            K key = entry.getKey();
            for (V value : entry.getValue()) {
                inverted.put(value, key);
            }
        }
        return inverted;
    }
}
