package io.envoyproxy.controlplane.server;

import com.google.common.base.Strings;
import io.envoyproxy.controlplane.cache.Resources;
import io.envoyproxy.controlplane.cache.Response;
import io.envoyproxy.controlplane.cache.Watch;
import io.envoyproxy.controlplane.server.exception.RequestException;
import io.envoyproxy.envoy.api.v2.DiscoveryRequest;
import io.envoyproxy.envoy.api.v2.DiscoveryResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/envoyproxy/controlplane/server/DiscoveryRequestStreamObserver.class */
public abstract class DiscoveryRequestStreamObserver implements StreamObserver<DiscoveryRequest> {
    private static final AtomicLongFieldUpdater<DiscoveryRequestStreamObserver> streamNonceUpdater = AtomicLongFieldUpdater.newUpdater(DiscoveryRequestStreamObserver.class, "streamNonce");
    private static final Logger LOGGER = LoggerFactory.getLogger(DiscoveryServer.class);
    final long streamId;
    private final String defaultTypeUrl;
    private final StreamObserver<DiscoveryResponse> responseObserver;
    private final Executor executor;
    private final DiscoveryServer discoverySever;
    private volatile long streamNonce = 0;
    private volatile boolean isClosing;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DiscoveryRequestStreamObserver(String str, StreamObserver<DiscoveryResponse> streamObserver, long j, Executor executor, DiscoveryServer discoveryServer) {
        this.defaultTypeUrl = str;
        this.responseObserver = streamObserver;
        this.streamId = j;
        this.executor = executor;
        this.discoverySever = discoveryServer;
    }

    @Override // 
    public void onNext(DiscoveryRequest discoveryRequest) {
        String typeUrl = discoveryRequest.getTypeUrl().isEmpty() ? this.defaultTypeUrl : discoveryRequest.getTypeUrl();
        String responseNonce = discoveryRequest.getResponseNonce();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[{}] request {}[{}] with nonce {} from version {}", new Object[]{Long.valueOf(this.streamId), typeUrl, String.join(", ", (Iterable<? extends CharSequence>) discoveryRequest.getResourceNamesList()), responseNonce, discoveryRequest.getVersionInfo()});
        }
        try {
            this.discoverySever.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamRequest(this.streamId, discoveryRequest);
            });
            DiscoveryResponse latestResponse = latestResponse(typeUrl);
            String nonce = latestResponse == null ? null : latestResponse.getNonce();
            if (Strings.isNullOrEmpty(nonce) || nonce.equals(responseNonce)) {
                if (!discoveryRequest.hasErrorDetail() && latestResponse != null) {
                    setAckedResources(typeUrl, (Set) latestResponse.getResourcesList().stream().map(Resources::getResourceName).collect(Collectors.toSet()));
                }
                computeWatch(typeUrl, () -> {
                    return this.discoverySever.configWatcher.createWatch(ads(), discoveryRequest, ackedResources(typeUrl), response -> {
                        this.executor.execute(() -> {
                            send(response, typeUrl);
                        });
                    });
                });
            }
        } catch (RequestException e) {
            closeWithError(e);
        }
    }

    public void onError(Throwable th) {
        if (!Status.fromThrowable(th).getCode().equals(Status.CANCELLED.getCode())) {
            LOGGER.error("[{}] stream closed with error", Long.valueOf(this.streamId), th);
        }
        try {
            this.discoverySever.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamCloseWithError(this.streamId, this.defaultTypeUrl, th);
            });
            closeWithError(Status.fromThrowable(th).asException());
        } finally {
            cancel();
        }
    }

    public void onCompleted() {
        LOGGER.debug("[{}] stream closed", Long.valueOf(this.streamId));
        try {
            this.discoverySever.callbacks.forEach(discoveryServerCallbacks -> {
                discoveryServerCallbacks.onStreamClose(this.streamId, this.defaultTypeUrl);
            });
            synchronized (this.responseObserver) {
                if (!this.isClosing) {
                    this.isClosing = true;
                    this.responseObserver.onCompleted();
                }
            }
        } finally {
            cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onCancelled() {
        LOGGER.info("[{}] stream cancelled", Long.valueOf(this.streamId));
        cancel();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void closeWithError(Throwable th) {
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                this.isClosing = true;
                this.responseObserver.onError(th);
            }
        }
        cancel();
    }

    private void send(Response response, String str) {
        String l = Long.toString(streamNonceUpdater.getAndIncrement(this));
        DiscoveryResponse build = DiscoveryResponse.newBuilder().setVersionInfo(response.version()).addAllResources(this.discoverySever.protoResourcesSerializer.serialize(response.resources())).setTypeUrl(str).setNonce(l).build();
        LOGGER.debug("[{}] response {} with nonce {} version {}", new Object[]{Long.valueOf(this.streamId), str, l, response.version()});
        this.discoverySever.callbacks.forEach(discoveryServerCallbacks -> {
            discoveryServerCallbacks.onStreamResponse(this.streamId, response.request(), build);
        });
        setLatestResponse(str, build);
        synchronized (this.responseObserver) {
            if (!this.isClosing) {
                try {
                    this.responseObserver.onNext(build);
                } catch (StatusRuntimeException e) {
                    if (!Status.CANCELLED.getCode().equals(e.getStatus().getCode())) {
                        throw e;
                    }
                }
            }
        }
    }

    abstract void cancel();

    abstract boolean ads();

    abstract DiscoveryResponse latestResponse(String str);

    abstract void setLatestResponse(String str, DiscoveryResponse discoveryResponse);

    abstract Set<String> ackedResources(String str);

    abstract void setAckedResources(String str, Set<String> set);

    abstract void computeWatch(String str, Supplier<Watch> supplier);
}
