package org.apache.kafka.connect.runtime.rest.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.class
 */
@Produces({"application/json"})
@Path("/connectors")
@Consumes({"application/json"})
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.class */
public class ConnectorsResource {
    /** Logger used by the request handlers of this resource. */
    private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class);

    /** Jackson type token for a request body holding a list of task configurations. */
    private static final TypeReference<List<Map<String, String>>> TASK_CONFIGS_TYPE =
            new TypeReference<List<Map<String, String>>>() { };

    /** Default time, in milliseconds, to wait for the herder to answer a request. */
    public static final long REQUEST_TIMEOUT_MS = 90_000L;

    /** Timeout currently in effect; changed through the static setter and reset methods. */
    private static long requestTimeoutMs = REQUEST_TIMEOUT_MS;

    /** Herder that every endpoint delegates to. */
    private final Herder herder;

    /** Worker configuration; also handed to the REST client when a request is forwarded. */
    private final WorkerConfig config;

    /** Servlet context injected by the JAX-RS container. */
    @Context
    private ServletContext context;

    /** {@code true} when the worker configuration turns topic tracking off. */
    private final boolean isTopicTrackingDisabled;

    /** {@code true} when the worker configuration forbids resetting tracked topics. */
    private final boolean isTopicTrackingResetDisabled;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$CreatedConnectorInfoTranslator.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$CreatedConnectorInfoTranslator.class */
    private static class CreatedConnectorInfoTranslator implements Translator<Herder.Created<ConnectorInfo>, ConnectorInfo> {
        private CreatedConnectorInfoTranslator() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.Translator
        public Herder.Created<ConnectorInfo> translate(RestClient.HttpResponse<ConnectorInfo> httpResponse) {
            return new Herder.Created<>(httpResponse.status() == 201, httpResponse.body());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$IdentityTranslator.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$IdentityTranslator.class */
    /**
     * Translator that hands back the body of a forwarded response unchanged.
     *
     * @param <T> type of the response body, which is also the translated type
     */
    public static class IdentityTranslator<T> implements Translator<T, T> {
        private IdentityTranslator() {
        }

        /**
         * @param response the response returned by the worker the request was forwarded to
         * @return the body of {@code response}, as is
         */
        @Override
        public T translate(RestClient.HttpResponse<T> response) {
            final T payload = response.body();
            return payload;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$Translator.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource$Translator.class */
    /**
     * Strategy for turning the HTTP response of a forwarded request into the value the local
     * endpoint returns.
     *
     * @param <T> type produced for the local endpoint
     * @param <U> type of the body carried by the forwarded response
     */
    public interface Translator<T, U> {
        /**
         * @param response the response received from the worker the request was forwarded to
         * @return the value derived from {@code response}
         */
        T translate(RestClient.HttpResponse<U> response);
    }

    public ConnectorsResource(Herder herder, WorkerConfig workerConfig) {
        this.herder = herder;
        this.config = workerConfig;
        this.isTopicTrackingDisabled = !workerConfig.getBoolean(WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG).booleanValue();
        this.isTopicTrackingResetDisabled = !workerConfig.getBoolean(WorkerConfig.TOPIC_TRACKING_ALLOW_RESET_CONFIG).booleanValue();
    }

    /**
     * Overrides the time to wait for the herder to answer a request.
     *
     * @param j the new timeout in milliseconds
     */
    public static void setRequestTimeout(long j) {
        ConnectorsResource.requestTimeoutMs = j;
    }

    /** Restores the request timeout to {@link #REQUEST_TIMEOUT_MS}. */
    public static void resetRequestTimeout() {
        setRequestTimeout(REQUEST_TIMEOUT_MS);
    }

    /**
     * Lists the connectors known to this worker, optionally expanded with per-connector details.
     *
     * <p>NOTE(review): the decompiler could not recover this method and left a stub that always threw
     * {@code UnsupportedOperationException}. The body below is reconstructed from the decompiler's
     * residual hints (the "status" and "info" expansions, the "Ignoring unknown expansion type {}" log
     * line and the {@code NotFoundException} handler) together with the documented {@code expand}
     * query parameter of {@code GET /connectors}. Confirm it against the upstream connect-runtime
     * 2.7.0 source before relying on it.
     *
     * @param uriInfo request URI information; its {@code expand} query parameter selects the details
     *                to include for every connector
     * @param headers request headers (unused)
     * @return a 200 response holding the connector names, or, when {@code expand} is present, a map
     *         from connector name to the requested expansions
     */
    @GET
    @Path("/")
    public Response listConnectors(@Context javax.ws.rs.core.UriInfo uriInfo, @Context HttpHeaders headers) {
        if (!uriInfo.getQueryParameters().containsKey("expand")) {
            return Response.ok(this.herder.connectors()).build();
        }

        Map<String, Map<String, Object>> expanded = new java.util.HashMap<>();
        for (String connector : this.herder.connectors()) {
            try {
                Map<String, Object> expansions = new java.util.HashMap<>();
                for (String expansion : uriInfo.getQueryParameters().get("expand")) {
                    switch (expansion) {
                        case "status":
                            expansions.put("status", this.herder.connectorStatus(connector));
                            break;
                        case "info":
                            expansions.put("info", this.herder.connectorInfo(connector));
                            break;
                        default:
                            log.info("Ignoring unknown expansion type {}", expansion);
                    }
                }
                expanded.put(connector, expansions);
            } catch (org.apache.kafka.connect.errors.NotFoundException e) {
                // The connector vanished between listing it and looking it up (presumably a concurrent
                // delete); leave it out of the answer instead of failing the whole listing.
                log.debug("Unable to get connector info for {} on this worker", connector);
            }
        }
        return Response.ok(expanded).build();
    }

    /**
     * Creates a connector from the supplied request.
     *
     * @param bool                   value of the {@code forward} query parameter; passed through to the
     *                               forwarding logic
     * @param httpHeaders            headers of the incoming request, reused if the request is forwarded
     * @param createConnectorRequest connector name and configuration
     * @return a 201 response whose location is {@code /connectors/<name>} and whose entity is the
     *         result reported by the herder or by the worker the request was forwarded to
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @POST
    @Path("/")
    public Response createConnector(@QueryParam("forward") Boolean bool, @Context HttpHeaders httpHeaders, CreateConnectorRequest createConnectorRequest) throws Throwable {
        // A missing name becomes the empty string; surrounding whitespace is dropped.
        String name = createConnectorRequest.name() == null ? "" : createConnectorRequest.name().trim();
        Map<String, String> connectorConfig = createConnectorRequest.config();
        checkAndPutConnectorConfigName(name, connectorConfig);

        FutureCallback<Herder.Created<ConnectorInfo>> callback = new FutureCallback<>();
        this.herder.putConnectorConfig(name, connectorConfig, false, callback);

        Response.ResponseBuilder response = Response.created(UriBuilder.fromUri("/connectors").path(name).build());
        Herder.Created<ConnectorInfo> created = completeOrForwardRequest(callback, "/connectors", HttpMethod.POST, httpHeaders, createConnectorRequest, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), bool);
        return response.entity(created.result()).build();
    }

    /**
     * Returns the description of a single connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @return the connector information reported by the herder or by the forwarded request
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @GET
    @Path("/{connector}")
    public ConnectorInfo getConnector(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<ConnectorInfo> callback = new FutureCallback<>();
        this.herder.connectorInfo(str, callback);
        return completeOrForwardRequest(callback, "/connectors/" + str, HttpMethod.GET, httpHeaders, null, bool);
    }

    /**
     * Returns the configuration of a single connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @return the connector configuration reported by the herder or by the forwarded request
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @GET
    @Path("/{connector}/config")
    public Map<String, String> getConnectorConfig(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<Map<String, String>> callback = new FutureCallback<>();
        this.herder.connectorConfig(str, callback);
        return completeOrForwardRequest(callback, "/connectors/" + str + "/config", HttpMethod.GET, httpHeaders, null, bool);
    }

    /**
     * Returns the status the herder reports for a connector.
     *
     * @param str connector name taken from the URL
     * @return the state information returned by the herder
     */
    @GET
    @Path("/{connector}/status")
    public ConnectorStateInfo getConnectorStatus(@PathParam("connector") String str) {
        final ConnectorStateInfo status = this.herder.connectorStatus(str);
        return status;
    }

    /**
     * Returns the active topics the herder reports for a connector.
     *
     * @param str connector name taken from the URL
     * @return a 200 response whose entity maps the connector name to its active-topics information
     * @throws ConnectRestException with status 403 when topic tracking is disabled
     */
    @GET
    @Path("/{connector}/topics")
    public Response getConnectorActiveTopics(@PathParam("connector") String str) {
        if (!this.isTopicTrackingDisabled) {
            ActiveTopicsInfo activeTopics = this.herder.connectorActiveTopics(str);
            Map<String, ActiveTopicsInfo> entity = Collections.singletonMap(activeTopics.connector(), activeTopics);
            return Response.ok(entity).build();
        }
        throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), "Topic tracking is disabled.");
    }

    /**
     * Asks the herder to reset the active topics recorded for a connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders request headers (unused)
     * @return a 202 (Accepted) response
     * @throws ConnectRestException with status 403 when topic tracking, or resetting it, is disabled
     */
    @PUT
    @Path("/{connector}/topics/reset")
    public Response resetConnectorActiveTopics(@PathParam("connector") String str, @Context HttpHeaders httpHeaders) {
        final int forbidden = Response.Status.FORBIDDEN.getStatusCode();
        if (this.isTopicTrackingDisabled) {
            throw new ConnectRestException(forbidden, "Topic tracking is disabled.");
        }
        if (this.isTopicTrackingResetDisabled) {
            throw new ConnectRestException(forbidden, "Topic tracking reset is disabled.");
        }
        this.herder.resetConnectorActiveTopics(str);
        return Response.accepted().build();
    }

    /**
     * Creates a connector or replaces the configuration of an existing one.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @param map         the new connector configuration; its {@code name} entry, when present, must
     *                    equal {@code str}
     * @return a 201 response with location {@code /connectors/<name>} when the result is flagged as
     *         created, otherwise a 200 response; the entity is the result's connector information
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @PUT
    @Path("/{connector}/config")
    public Response putConnectorConfig(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool, Map<String, String> map) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> callback = new FutureCallback<>();
        checkAndPutConnectorConfigName(str, map);
        this.herder.putConnectorConfig(str, map, true, callback);
        Herder.Created<ConnectorInfo> outcome = completeOrForwardRequest(callback, "/connectors/" + str + "/config", HttpMethod.PUT, httpHeaders, map, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), bool);

        Response.ResponseBuilder response;
        if (outcome.created()) {
            response = Response.created(UriBuilder.fromUri("/connectors").path(str).build());
        } else {
            response = Response.ok();
        }
        return response.entity(outcome.result()).build();
    }

    /**
     * Asks the herder to restart a connector and waits for the outcome.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @POST
    @Path("/{connector}/restart")
    public void restartConnector(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<Void> callback = new FutureCallback<>();
        this.herder.restartConnector(str, callback);
        completeOrForwardRequest(callback, "/connectors/" + str + "/restart", HttpMethod.POST, httpHeaders, null, bool);
    }

    /**
     * Asks the herder to pause a connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders request headers (unused)
     * @return a 202 (Accepted) response
     */
    @PUT
    @Path("/{connector}/pause")
    public Response pauseConnector(@PathParam("connector") String str, @Context HttpHeaders httpHeaders) {
        this.herder.pauseConnector(str);
        final Response accepted = Response.accepted().build();
        return accepted;
    }

    /**
     * Asks the herder to resume a connector.
     *
     * @param str connector name taken from the URL
     * @return a 202 (Accepted) response
     */
    @PUT
    @Path("/{connector}/resume")
    public Response resumeConnector(@PathParam("connector") String str) {
        this.herder.resumeConnector(str);
        final Response accepted = Response.accepted().build();
        return accepted;
    }

    /**
     * Returns the tasks of a connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @return the task information reported by the herder or by the forwarded request
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @GET
    @Path("/{connector}/tasks")
    public List<TaskInfo> getTaskConfigs(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<List<TaskInfo>> callback = new FutureCallback<>();
        this.herder.taskConfigs(str, callback);
        return completeOrForwardRequest(callback, "/connectors/" + str + "/tasks", HttpMethod.GET, httpHeaders, null, new TypeReference<List<TaskInfo>>() { }, bool);
    }

    /**
     * Submits a new set of task configurations for a connector.
     *
     * <p>The body is received as raw bytes because the same bytes are used to build the
     * {@link InternalRequestSignature} passed to the herder.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request; also the source of the request signature
     * @param bool        value of the {@code forward} query parameter
     * @param bArr        raw request body, parsed as a JSON list of string-to-string maps
     * @throws Throwable any failure while parsing the body or completing/forwarding the request
     */
    @POST
    @Path("/{connector}/tasks")
    public void putTaskConfigs(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool, byte[] bArr) throws Throwable {
        List<Map<String, String>> taskConfigs = new ObjectMapper().readValue(bArr, TASK_CONFIGS_TYPE);
        FutureCallback<Void> callback = new FutureCallback<>();
        this.herder.putTaskConfigs(str, taskConfigs, callback, InternalRequestSignature.fromHeaders(bArr, httpHeaders));
        completeOrForwardRequest(callback, "/connectors/" + str + "/tasks", HttpMethod.POST, httpHeaders, taskConfigs, bool);
    }

    /**
     * Returns the status the herder reports for one task of a connector.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders request headers (unused)
     * @param num         task number taken from the URL
     * @return the task state returned by the herder
     */
    @GET
    @Path("/{connector}/tasks/{task}/status")
    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @PathParam("task") Integer num) {
        final ConnectorTaskId taskId = new ConnectorTaskId(str, num.intValue());
        return this.herder.taskStatus(taskId);
    }

    /**
     * Asks the herder to restart one task of a connector and waits for the outcome.
     *
     * @param str         connector name taken from the URL
     * @param num         task number taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @POST
    @Path("/{connector}/tasks/{task}/restart")
    public void restartTask(@PathParam("connector") String str, @PathParam("task") Integer num, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<Void> callback = new FutureCallback<>();
        ConnectorTaskId taskId = new ConnectorTaskId(str, num.intValue());
        this.herder.restartTask(taskId, callback);
        completeOrForwardRequest(callback, "/connectors/" + str + "/tasks/" + num + "/restart", HttpMethod.POST, httpHeaders, null, bool);
    }

    /**
     * Asks the herder to delete a connector's configuration and waits for the outcome.
     *
     * @param str         connector name taken from the URL
     * @param httpHeaders headers of the incoming request, reused if the request is forwarded
     * @param bool        value of the {@code forward} query parameter
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    @DELETE
    @Path("/{connector}")
    public void destroyConnector(@PathParam("connector") String str, @Context HttpHeaders httpHeaders, @QueryParam("forward") Boolean bool) throws Throwable {
        FutureCallback<Herder.Created<ConnectorInfo>> callback = new FutureCallback<>();
        this.herder.deleteConnectorConfig(str, callback);
        completeOrForwardRequest(callback, "/connectors/" + str, HttpMethod.DELETE, httpHeaders, null, bool);
    }

    /**
     * Makes sure the configuration's {@code name} entry agrees with the connector name of the request.
     *
     * <p>When the entry is absent it is added; when it is present it must equal {@code connectorName}.
     *
     * @param connectorName   connector name taken from the request
     * @param connectorConfig connector configuration; may be modified by adding the {@code name} entry
     * @throws BadRequestException if {@code connectorConfig} is {@code null} or names a different connector
     */
    private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
        // A request without a configuration is a client error; reject it explicitly instead of
        // failing below with a NullPointerException.
        if (connectorConfig == null) {
            throw new BadRequestException("Connector configuration must not be null");
        }
        String includedName = connectorConfig.get("name");
        if (includedName == null) {
            connectorConfig.put("name", connectorName);
        } else if (!includedName.equals(connectorName)) {
            throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")");
        }
    }

    /**
     * Waits for the herder's answer and, when the herder says another worker must handle the request,
     * forwards the request to that worker.
     *
     * @param cb         callback the herder completes with the result or failure
     * @param path       request path appended to the target worker's URL when forwarding
     * @param method     HTTP method used when forwarding
     * @param headers    headers of the incoming request, passed on when forwarding
     * @param body       request body passed on when forwarding; may be {@code null}
     * @param resultType type token for the forwarded response body; may be {@code null}
     * @param translator converts the forwarded response into the return value
     * @param forward    value of the {@code forward} query parameter: {@code null} allows forwarding
     *                   and lets the target forward once more, {@code true} allows forwarding but
     *                   tells the target not to forward again, {@code false} disallows forwarding
     * @return the herder's result, or the translated response of the forwarded request
     * @throws ConnectRestException with status 409 when the request cannot be forwarded or a rebalance
     *                              is needed, and with status 500 on timeout or interruption
     * @throws Throwable            any other failure cause reported through the callback
     */
    private <T, U> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<U> resultType, Translator<T, U> translator, Boolean forward) throws Throwable {
        try {
            return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RequestTargetException) {
                if (forward != null && !forward) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request because of a conflicting operation (e.g. worker rebalance)");
                }
                // Only a request that carried no forward flag may be forwarded again by its target.
                boolean recursiveForward = forward == null;
                String forwardUrl = ((RequestTargetException) cause).forwardUrl();
                if (forwardUrl == null) {
                    throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to no known leader URL, likely because a rebalance was underway.");
                }
                String target = UriBuilder.fromUri(forwardUrl).path(path).queryParam("forward", recursiveForward).build().toString();
                log.debug("Forwarding request {} {} {}", target, method, body);
                return translator.translate(RestClient.httpRequest(target, method, headers, body, resultType, this.config));
            }
            if (cause instanceof RebalanceNeededException) {
                throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(), "Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)");
            }
            throw cause;
        } catch (TimeoutException e) {
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out");
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt flag; set it again so code further
            // up the stack can still see that this thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted");
        }
    }

    /**
     * Variant of the eight-argument overload for requests whose forwarded response body already has
     * the type the endpoint returns; the body is passed through unchanged.
     *
     * @param cb         callback the herder completes with the result or failure
     * @param path       request path used when forwarding
     * @param method     HTTP method used when forwarding
     * @param headers    headers of the incoming request
     * @param body       request body passed on when forwarding; may be {@code null}
     * @param resultType type token for the forwarded response body
     * @param forward    value of the {@code forward} query parameter
     * @return the herder's result, or the body of the forwarded response
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, TypeReference<T> resultType, Boolean forward) throws Throwable {
        return completeOrForwardRequest(cb, path, method, headers, body, resultType, new IdentityTranslator<T>(), forward);
    }

    /**
     * Variant of the eight-argument overload that supplies no type token for the forwarded response
     * and passes the response body through unchanged.
     *
     * @param cb      callback the herder completes with the result or failure
     * @param path    request path used when forwarding
     * @param method  HTTP method used when forwarding
     * @param headers headers of the incoming request
     * @param body    request body passed on when forwarding; may be {@code null}
     * @param forward value of the {@code forward} query parameter
     * @return the herder's result, or the body of the forwarded response
     * @throws Throwable any failure reported while completing or forwarding the request
     */
    private <T> T completeOrForwardRequest(FutureCallback<T> cb, String path, String method, HttpHeaders headers, Object body, Boolean forward) throws Throwable {
        return completeOrForwardRequest(cb, path, method, headers, body, null, new IdentityTranslator<T>(), forward);
    }
}
