/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.mcp.server.test;

import io.quarkiverse.mcp.server.sse.client.SseClient;
import io.vertx.core.MultiMap;
import io.vertx.core.json.JsonObject;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Sends a single JSON message to an MCP endpoint via HTTP POST and dispatches whatever
 * comes back to the supplied callbacks.
 *
 * <p>The server may answer with a plain {@code application/json} body or with a
 * {@code text/event-stream}; both are handled. Every JSON message received is classified by
 * {@link #acceptMessage(JsonObject)} and handed to exactly one of the three consumers.
 */
class McpStremableRequest {
    private static final String JSON_MEDIA_TYPE = "application/json";
    private static final String SSE_MEDIA_TYPE = "text/event-stream";

    private final HttpClient httpClient;
    private final URI mcpEndpoint;
    private final MultiMap headers;
    private final Consumer<JsonObject> requests;
    private final Consumer<JsonObject> responses;
    private final Consumer<JsonObject> notifications;
    private final AtomicReference<HttpHeaders> responseHeaders;

    /**
     * @param httpClient client used to perform the POST
     * @param mcpEndpoint target URI of the POST
     * @param headers additional request headers, added on top of the {@code Accept} headers
     * @param responses receives messages that have an {@code id} and a {@code result} or {@code error}
     * @param notifications receives messages that have no {@code id}
     * @param requests receives messages that have an {@code id} but neither {@code result} nor {@code error}
     */
    McpStremableRequest(HttpClient httpClient, URI mcpEndpoint, MultiMap headers, Consumer<JsonObject> responses, Consumer<JsonObject> notifications, Consumer<JsonObject> requests) {
        this.httpClient = httpClient;
        this.mcpEndpoint = mcpEndpoint;
        this.headers = headers;
        this.requests = requests;
        this.responses = responses;
        this.notifications = notifications;
        this.responseHeaders = new AtomicReference<>();
    }

    /**
     * Routes a received message to the matching consumer based on which keys it contains.
     *
     * @param message the decoded JSON message
     */
    protected void acceptMessage(JsonObject message) {
        if (!message.containsKey("id")) {
            // No id: nobody is waiting for a reply to this message.
            this.notifications.accept(message);
            return;
        }
        if (message.containsKey("result") || message.containsKey("error")) {
            this.responses.accept(message);
        } else {
            this.requests.accept(message);
        }
    }

    /**
     * Handles one server-sent event. Only events named {@code message} are decoded and
     * dispatched; all other events are ignored.
     *
     * @param event the event read from the stream
     */
    protected void process(SseClient.SseEvent event) {
        if ("message".equals(event.name())) {
            this.acceptMessage(new JsonObject(event.data()));
        }
    }

    /**
     * Posts {@code body} to the endpoint asynchronously and returns immediately.
     *
     * <p>Messages from the response are delivered to the consumers later; a failed exchange is
     * ignored (no consumer is invoked).
     *
     * @param body the request payload
     */
    void send(String body) {
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(this.mcpEndpoint)
                .version(HttpClient.Version.HTTP_1_1)
                .header("Accept", SSE_MEDIA_TYPE)
                .header("Accept", JSON_MEDIA_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(body));
        this.headers.forEach(builder::header);
        HttpRequest request = builder.build();

        HttpResponse.BodyHandler<Object> handler = this::bodySubscriberFor;
        CompletableFuture<HttpResponse<Object>> cf = this.httpClient.sendAsync(request, handler);
        cf.whenComplete((response, failure) -> {
            if (failure != null) {
                return;
            }
            // Only a JSON response yields a String body. An event stream has already been
            // dispatched event by event and completes with a null body, as does a discarded body.
            Object payload = response.body();
            if (payload instanceof String) {
                this.acceptMessage(new JsonObject((String) payload));
            }
        });
    }

    /**
     * Records the response headers and selects how the response body is consumed, based on
     * the media type of the {@code Content-Type} header.
     *
     * @throws IllegalStateException if a {@code Content-Type} is present but is neither JSON nor
     *         an event stream
     */
    private HttpResponse.BodySubscriber<Object> bodySubscriberFor(HttpResponse.ResponseInfo responseInfo) {
        this.responseHeaders.set(responseInfo.headers());
        String contentType = responseInfo.headers().firstValue("Content-Type").orElse(null);
        if (contentType == null) {
            // Nothing declares how to read the body (e.g. a body-less acknowledgement);
            // drain it so the exchange completes normally and the headers stay available.
            return cast(HttpResponse.BodySubscribers.discarding());
        }
        String mediaType = mediaTypeOf(contentType);
        if (JSON_MEDIA_TYPE.equalsIgnoreCase(mediaType)) {
            return cast(HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8));
        }
        if (SSE_MEDIA_TYPE.equalsIgnoreCase(mediaType)) {
            Flow.Subscriber<? super String> lineSubscriber = new SseClient.SseEventSubscriber(this::process);
            return cast(HttpResponse.BodySubscribers.fromLineSubscriber(lineSubscriber));
        }
        throw new IllegalStateException("Unsupported content type: " + contentType);
    }

    /** Strips any parameters (such as {@code ; charset=UTF-8}) from a Content-Type value. */
    private static String mediaTypeOf(String contentType) {
        int paramStart = contentType.indexOf(';');
        String mediaType = paramStart < 0 ? contentType : contentType.substring(0, paramStart);
        return mediaType.trim();
    }

    /**
     * @return the headers of the most recent response whose headers have been received, or
     *         {@code null} if none has been received yet
     */
    HttpHeaders responseHeaders() {
        return this.responseHeaders.get();
    }

    /**
     * Performs an unchecked cast to the type expected by the caller.
     *
     * @param obj the object to cast
     * @return {@code obj}, typed as {@code T}
     */
    @SuppressWarnings("unchecked") // the caller is responsible for the target type being correct
    static <T> T cast(Object obj) {
        return (T) obj;
    }
}

