package org.apache.pulsar.websocket;

import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.pulsar.shade.com.google.common.base.Enums;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Splitter;
import org.apache.pulsar.shade.javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.shade.org.asynchttpclient.uri.Uri;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.Session;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback;
import org.apache.pulsar.shade.org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/websocket/ConsumerHandler.class */
public class ConsumerHandler extends AbstractWebSocketHandler {
    private String subscription;
    private SubscriptionType subscriptionType;
    private SubscriptionMode subscriptionMode;
    private Consumer<byte[]> consumer;
    private int maxPendingMessages;
    private final AtomicInteger pendingMessages;
    private final boolean pullMode;
    private final LongAdder numMsgsDelivered;
    private final LongAdder numBytesDelivered;
    private final LongAdder numMsgsAcked;
    private volatile long msgDeliveredCounter;
    private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");
    private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

    public ConsumerHandler(WebSocketService webSocketService, HttpServletRequest httpServletRequest, ServletUpgradeResponse servletUpgradeResponse) {
        super(webSocketService, httpServletRequest, servletUpgradeResponse);
        this.subscription = null;
        this.maxPendingMessages = 0;
        this.pendingMessages = new AtomicInteger();
        this.msgDeliveredCounter = 0L;
        this.numMsgsDelivered = new LongAdder();
        this.numBytesDelivered = new LongAdder();
        this.numMsgsAcked = new LongAdder();
        this.pullMode = Boolean.valueOf(this.queryParams.get("pullMode")).booleanValue();
        try {
            this.subscription = extractSubscription(httpServletRequest);
            ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) getConsumerConfiguration(webSocketService.getPulsarClient());
            if (!this.pullMode) {
                this.maxPendingMessages = consumerBuilderImpl.getConf().getReceiverQueueSize() == 0 ? 1 : consumerBuilderImpl.getConf().getReceiverQueueSize();
            }
            this.subscriptionType = consumerBuilderImpl.getConf().getSubscriptionType();
            this.subscriptionMode = consumerBuilderImpl.getConf().getSubscriptionMode();
            if (checkAuth(servletUpgradeResponse)) {
                this.consumer = consumerBuilderImpl.topic(this.topic.toString()).subscriptionName(this.subscription).subscribe();
                if (!this.service.addConsumer(this)) {
                    log.warn("[{}:{}] Failed to add consumer handler for topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.topic});
                }
            }
        } catch (Exception e) {
            log.warn("[{}:{}] Failed in creating subscription {} on topic {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), this.subscription, this.topic, e});
            try {
                servletUpgradeResponse.sendError(getErrorCode(e), getErrorMessage(e));
            } catch (IOException e2) {
                log.warn("[{}:{}] Failed to send error: {}", new Object[]{httpServletRequest.getRemoteAddr(), Integer.valueOf(httpServletRequest.getRemotePort()), e2.getMessage(), e2});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void receiveMessage() {
        if (log.isDebugEnabled()) {
            log.debug("[{}:{}] [{}] [{}] Receive next message", new Object[]{this.request.getRemoteAddr(), Integer.valueOf(this.request.getRemotePort()), this.topic, this.subscription});
        }
        this.consumer.receiveAsync().thenAccept(message -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Got message {}", new Object[]{getSession().getRemoteAddress(), this.topic, this.subscription, message.getMessageId()});
            }
            ConsumerMessage consumerMessage = new ConsumerMessage();
            consumerMessage.messageId = Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
            consumerMessage.payload = Base64.getEncoder().encodeToString(message.getData());
            consumerMessage.properties = message.getProperties();
            consumerMessage.publishTime = DateFormatter.format(message.getPublishTime());
            consumerMessage.redeliveryCount = message.getRedeliveryCount();
            if (message.getEventTime() != 0) {
                consumerMessage.eventTime = DateFormatter.format(message.getEventTime());
            }
            if (message.hasKey()) {
                consumerMessage.key = message.getKey();
            }
            final long length = message.getData().length;
            try {
                getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(consumerMessage), new WriteCallback() { // from class: org.apache.pulsar.websocket.ConsumerHandler.1
                    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                    public void writeFailed(Throwable th) {
                        ConsumerHandler.log.warn("[{}/{}] Failed to deliver msg to {} {}", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                        ConsumerHandler.this.pendingMessages.decrementAndGet();
                        ConsumerHandler.this.service.getExecutor().execute(() -> {
                            ConsumerHandler.this.receiveMessage();
                        });
                    }

                    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                    public void writeSuccess() {
                        if (ConsumerHandler.log.isDebugEnabled()) {
                            ConsumerHandler.log.debug("[{}/{}] message is delivered successfully to {} ", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString()});
                        }
                        ConsumerHandler.this.updateDeliverMsgStat(length);
                    }
                });
            } catch (JsonProcessingException e) {
                close(WebSocketError.FailedToSerializeToJSON);
            }
            if (this.pendingMessages.incrementAndGet() < this.maxPendingMessages) {
                this.service.getExecutor().execute(this::receiveMessage);
            }
        }).exceptionally(th -> {
            if (th.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                log.info("[{}/{}] Consumer was closed while receiving msg from broker", this.consumer.getTopic(), this.subscription);
                return null;
            }
            log.warn("[{}/{}] Error occurred while consumer handler was delivering msg to {}: {}", new Object[]{this.consumer.getTopic(), this.subscription, getRemote().getInetSocketAddress().toString(), th.getMessage()});
            return null;
        });
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketAdapter, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketConnectionListener
    public void onWebSocketConnect(Session session) {
        super.onWebSocketConnect(session);
        if (this.pullMode) {
            return;
        }
        receiveMessage();
    }

    @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketAdapter, org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WebSocketListener
    public void onWebSocketText(String str) {
        super.onWebSocketText(str);
        try {
            ConsumerCommand consumerCommand = (ConsumerCommand) ObjectMapperFactory.getThreadLocal().readValue(str, ConsumerCommand.class);
            if ("permit".equals(consumerCommand.type)) {
                handlePermit(consumerCommand);
            } else if ("unsubscribe".equals(consumerCommand.type)) {
                handleUnsubscribe(consumerCommand);
            } else if ("negativeAcknowledge".equals(consumerCommand.type)) {
                handleNack(consumerCommand);
            } else if ("isEndOfTopic".equals(consumerCommand.type)) {
                handleEndOfTopic();
            } else {
                handleAck(consumerCommand);
            }
        } catch (IOException e) {
            log.warn("Failed to deserialize message id: {}", str, e);
            close(WebSocketError.FailedToDeserializeFromJSON);
        }
    }

    private void handleEndOfTopic() {
        try {
            getSession().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(new EndOfTopicResponse(this.consumer.hasReachedEndOfTopic())), new WriteCallback() { // from class: org.apache.pulsar.websocket.ConsumerHandler.2
                @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                public void writeFailed(Throwable th) {
                    ConsumerHandler.log.warn("[{}/{}] Failed to send end of topic msg to {} due to {}", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString(), th.getMessage()});
                }

                @Override // org.apache.pulsar.shade.org.eclipse.jetty.websocket.api.WriteCallback
                public void writeSuccess() {
                    if (ConsumerHandler.log.isDebugEnabled()) {
                        ConsumerHandler.log.debug("[{}/{}] End of topic message is delivered successfully to {} ", new Object[]{ConsumerHandler.this.consumer.getTopic(), ConsumerHandler.this.subscription, ConsumerHandler.this.getRemote().getInetSocketAddress().toString()});
                    }
                }
            });
        } catch (JsonProcessingException e) {
            log.warn("[{}] Failed to generate end of topic response: {}", this.consumer.getTopic(), e.getMessage());
        } catch (Exception e2) {
            log.warn("[{}] Failed to send end of topic response: {}", this.consumer.getTopic(), e2.getMessage());
        }
    }

    private void handleUnsubscribe(ConsumerCommand consumerCommand) throws PulsarClientException {
        this.consumer.unsubscribe();
    }

    private void checkResumeReceive() {
        if (this.pullMode || this.pendingMessages.getAndDecrement() < this.maxPendingMessages) {
            return;
        }
        receiveMessage();
    }

    private void handleAck(ConsumerCommand consumerCommand) throws IOException {
        this.consumer.acknowledgeAsync(MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(consumerCommand.messageId), this.topic.toString())).thenAccept(r3 -> {
            this.numMsgsAcked.increment();
        });
        checkResumeReceive();
    }

    private void handleNack(ConsumerCommand consumerCommand) throws IOException {
        MessageId fromByteArrayWithTopic = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(consumerCommand.messageId), this.topic.toString());
        System.out.println(fromByteArrayWithTopic);
        this.consumer.negativeAcknowledge(fromByteArrayWithTopic);
        checkResumeReceive();
    }

    private void handlePermit(ConsumerCommand consumerCommand) throws IOException {
        if (consumerCommand.permitMessages == null) {
            throw new IOException("Missing required permitMessages field for 'permit' command");
        }
        if (!this.pullMode || this.pendingMessages.getAndAdd(-consumerCommand.permitMessages.intValue()) < 0) {
            return;
        }
        receiveMessage();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.consumer != null) {
            if (!this.service.removeConsumer(this)) {
                log.warn("[{}] Failed to remove consumer handler", this.consumer.getTopic());
            }
            this.consumer.closeAsync().thenAccept(r5 -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Closed consumer asynchronously", this.consumer.getTopic());
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to close consumer", this.consumer.getTopic(), th);
                return null;
            });
        }
    }

    public Consumer<byte[]> getConsumer() {
        return this.consumer;
    }

    public String getSubscription() {
        return this.subscription;
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }

    public SubscriptionMode getSubscriptionMode() {
        return this.subscriptionMode;
    }

    public long getAndResetNumMsgsDelivered() {
        return this.numMsgsDelivered.sumThenReset();
    }

    public long getAndResetNumBytesDelivered() {
        return this.numBytesDelivered.sumThenReset();
    }

    public long getAndResetNumMsgsAcked() {
        return this.numMsgsAcked.sumThenReset();
    }

    public long getMsgDeliveredCounter() {
        return this.msgDeliveredCounter;
    }

    protected void updateDeliverMsgStat(long j) {
        this.numMsgsDelivered.increment();
        MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
        this.numBytesDelivered.add(j);
    }

    protected ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient pulsarClient) {
        ConsumerBuilder<byte[]> newConsumer = pulsarClient.newConsumer();
        if (this.queryParams.containsKey("ackTimeoutMillis")) {
            newConsumer.ackTimeout(Integer.parseInt(this.queryParams.get("ackTimeoutMillis")), TimeUnit.MILLISECONDS);
        }
        if (this.queryParams.containsKey("subscriptionType")) {
            Preconditions.checkArgument(Enums.getIfPresent(SubscriptionType.class, this.queryParams.get("subscriptionType")).isPresent(), "Invalid subscriptionType %s", this.queryParams.get("subscriptionType"));
            newConsumer.subscriptionType(SubscriptionType.valueOf(this.queryParams.get("subscriptionType")));
        }
        if (this.queryParams.containsKey("subscriptionMode")) {
            Preconditions.checkArgument(Enums.getIfPresent(SubscriptionMode.class, this.queryParams.get("subscriptionMode")).isPresent(), "Invalid subscriptionMode %s", this.queryParams.get("subscriptionMode"));
            newConsumer.subscriptionMode(SubscriptionMode.valueOf(this.queryParams.get("subscriptionMode")));
        }
        if (this.queryParams.containsKey("receiverQueueSize")) {
            newConsumer.receiverQueueSize(Math.min(Integer.parseInt(this.queryParams.get("receiverQueueSize")), 1000));
        }
        if (this.queryParams.containsKey("consumerName")) {
            newConsumer.consumerName(this.queryParams.get("consumerName"));
        }
        if (this.queryParams.containsKey("priorityLevel")) {
            newConsumer.priorityLevel(Integer.parseInt(this.queryParams.get("priorityLevel")));
        }
        if (this.queryParams.containsKey("maxRedeliverCount") || this.queryParams.containsKey("deadLetterTopic")) {
            DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder();
            if (this.queryParams.containsKey("maxRedeliverCount")) {
                builder.maxRedeliverCount(Integer.parseInt(this.queryParams.get("maxRedeliverCount"))).deadLetterTopic(String.format("%s-%s-DLQ", this.topic, this.subscription));
            }
            if (this.queryParams.containsKey("deadLetterTopic")) {
                builder.deadLetterTopic(this.queryParams.get("deadLetterTopic"));
            }
            if (this.queryParams.containsKey("negativeAckRedeliveryDelay")) {
                newConsumer.negativeAckRedeliveryDelay(Integer.parseInt(this.queryParams.get("negativeAckRedeliveryDelay")), TimeUnit.MILLISECONDS);
            }
            newConsumer.deadLetterPolicy(builder.build());
        }
        if (this.queryParams.containsKey("cryptoFailureAction")) {
            String str = this.queryParams.get("cryptoFailureAction");
            try {
                newConsumer.cryptoFailureAction(ConsumerCryptoFailureAction.valueOf(str));
            } catch (Exception e) {
                log.warn("Failed to configure cryptoFailureAction {} , {}", str, e.getMessage());
            }
        }
        return newConsumer;
    }

    @Override // org.apache.pulsar.websocket.AbstractWebSocketHandler
    protected Boolean isAuthorized(String str, AuthenticationDataSource authenticationDataSource) throws Exception {
        return Boolean.valueOf(this.service.getAuthorizationService().canConsume(this.topic, str, authenticationDataSource, this.subscription));
    }

    public static String extractSubscription(HttpServletRequest httpServletRequest) {
        List<String> splitToList = Splitter.on("/").splitToList(httpServletRequest.getRequestURI());
        Preconditions.checkArgument(splitToList.size() == 9, "Invalid topic name format");
        Preconditions.checkArgument(splitToList.get(1).equals(Uri.WS));
        int i = splitToList.get(2).equals("v2") ? 4 : 3;
        Preconditions.checkArgument(splitToList.get(i).equals("persistent") || splitToList.get(i).equals("non-persistent"));
        Preconditions.checkArgument(splitToList.get(8).length() > 0, "Empty subscription name");
        return Codec.decode(splitToList.get(8));
    }
}
