package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.JsonParseException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Produce messages to a specified topic")
/* loaded from: input_file:org/apache/pulsar/client/cli/CmdProduce.class */
public class CmdProduce {
    // Logger used by this command and by the nested ProducerSocket.
    // NOTE(review): registered under PulsarClientTool rather than CmdProduce - confirm this is intentional.
    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);
    // run() rejects an invocation whose (messages + files) * numTimesProduce exceeds this value.
    private static final int MAX_MESSAGES = 1000;

    // Positional arguments; run() requires exactly one element and uses it as the topic name.
    @Parameter(description = "TopicName", required = true)
    private List<String> mainOptions;

    // Message key applied by publish() when non-empty. The WebSocket path does not read it;
    // there the key is a numeric index generated in publishToWebSocket().
    @Parameter(names = {"-k", "--key"}, description = "message key to add ")
    private String key;
    // Connection settings; not command-line options, assigned through updateConfig().
    private ClientBuilder clientBuilder;
    private Authentication authentication;
    // run() chooses the WebSocket path when this starts with "ws", otherwise the native client path.
    private String serviceURL;

    // Raw -m value(s); run() splits them on 'separator' before sending.
    // NoSplitter is declared outside this file - presumably it keeps JCommander from splitting on commas; confirm.
    @Parameter(names = {"-m", "--messages"}, description = "Messages to send, either -m or -f must be specified. The default separator is comma", splitter = NoSplitter.class)
    private List<String> messages = Lists.newArrayList();

    // Paths whose full contents are each sent as one message body (see generateMessageBodies()).
    @Parameter(names = {"-f", "--files"}, description = "Comma separated file paths to send, either -m or -f must be specified.")
    private List<String> messageFileNames = Lists.newArrayList();

    // Number of passes over the message bodies; run() requires a value greater than zero.
    @Parameter(names = {"-n", "--num-produce"}, description = "Number of times to send message(s), the count of messages/files * num-produce should below than 1000.")
    private int numTimesProduce = 1;

    // A value above zero creates a Guava RateLimiter with that rate; otherwise sending is not throttled.
    @Parameter(names = {"-r", "--rate"}, description = "Rate (in msg/sec) at which to produce, value 0 means to produce messages as fast as possible.")
    private double publishRate = 0.0d;

    // When set, publish() enables chunking and disables batching on the producer builder.
    @Parameter(names = {"-c", "--chunking"}, description = "Should split the message and publish in chunks if message size is larger than allowed max size")
    private boolean chunkingAllowed = false;

    // Passed to String.split() in run(), so it is interpreted as a regular expression, not a literal.
    @Parameter(names = {"-s", "--separator"}, description = "Character to split messages string on default is comma")
    private String separator = ",";

    // key=value entries attached to every message by publish(); the WebSocket path does not read them.
    // Presumably split on commas by JCommander's default list handling - confirm.
    @Parameter(names = {"-p", "--properties"}, description = "Properties to add, Comma separated key=value string, like k1=v1,k2=v2.")
    private List<String> properties = Lists.newArrayList();

    // publish() configures encryption only when both encKeyName and encKeyValue are non-blank.
    @Parameter(names = {"-ekn", "--encryption-key-name"}, description = "The public key name to encrypt payload")
    private String encKeyName = null;

    @Parameter(names = {"-ekv", "--encryption-key-value"}, description = "The URI of public key to encrypt payload, for example file:///path/to/public.key or data:application/x-pem-file;base64,*****")
    private String encKeyValue = null;

    /**
     * Annotated Jetty WebSocket endpoint used to publish messages over the WebSocket producer API.
     *
     * <p>The endpoint supports one outstanding message at a time: {@link #send(int, byte[])} returns a
     * future that is completed by the next text frame received in {@link #onMessage(String)}.
     * Callbacks run on Jetty threads while {@code send}/{@code close} run on the caller's thread, so
     * the fields shared between them are volatile.
     */
    @WebSocket(maxTextMessageSize = 65536)
    public static class ProducerSocket {
        private final CountDownLatch closeLatch = new CountDownLatch(1);
        // Written by the Jetty callbacks, read by the sending thread.
        private volatile Session session;
        private final CompletableFuture<Void> connected;
        // Future for the message currently awaiting its acknowledgement, or null before the first send.
        private volatile CompletableFuture<Void> result;

        /**
         * @param completableFuture completed (with {@code null}) once the session is connected
         */
        public ProducerSocket(CompletableFuture<Void> completableFuture) {
            this.connected = completableFuture;
        }

        /**
         * Sends one message and returns a future that completes when the next frame from the server
         * arrives.
         *
         * @param i    index used as the message key
         * @param bArr raw message payload
         * @return future completed by {@link #onMessage(String)}
         * @throws IllegalStateException if the session is not (or no longer) connected
         * @throws Exception             if the payload cannot be serialized or written
         */
        public CompletableFuture<Void> send(int i, byte[] bArr) throws Exception {
            Session current = this.session;
            if (current == null) {
                throw new IllegalStateException("WebSocket session is not connected");
            }
            String payload = getTestJsonPayload(i, bArr);
            // Publish the future BEFORE writing to the wire: the acknowledgement is delivered on a
            // Jetty thread and may arrive before sendString() returns. Assigning afterwards would let
            // onMessage() complete the previous (or no) future and leave this one pending forever.
            CompletableFuture<Void> ack = new CompletableFuture<>();
            this.result = ack;
            current.getRemote().sendString(payload);
            return ack;
        }

        /** Builds the JSON frame: base64-encoded payload with the index as the message key. */
        private static String getTestJsonPayload(int i, byte[] bArr) throws JsonProcessingException {
            ProducerMessage producerMessage = new ProducerMessage();
            producerMessage.payload = Base64.getEncoder().encodeToString(bArr);
            producerMessage.key = Integer.toString(i);
            return ObjectMapperFactory.getThreadLocal().writeValueAsString(producerMessage);
        }

        /**
         * Waits for the close callback.
         *
         * @return {@code true} if the connection closed within the timeout
         */
        public boolean awaitClose(int i, TimeUnit timeUnit) throws InterruptedException {
            return this.closeLatch.await(i, timeUnit);
        }

        @OnWebSocketClose
        public void onClose(int i, String str) {
            CmdProduce.LOG.info("Connection closed: {} - {}", Integer.valueOf(i), str);
            this.session = null;
            this.closeLatch.countDown();
        }

        @OnWebSocketConnect
        public void onConnect(Session session) {
            CmdProduce.LOG.info("Got connect: {}", session);
            this.session = session;
            this.connected.complete(null);
        }

        /** Treats any incoming text frame as the acknowledgement of the message in flight. */
        @OnWebSocketMessage
        public synchronized void onMessage(String str) throws JsonParseException {
            CmdProduce.LOG.info("ack= {}", str);
            CompletableFuture<Void> pending = this.result;
            if (pending != null) {
                pending.complete(null);
            }
        }

        public RemoteEndpoint getRemote() {
            return this.session.getRemote();
        }

        public Session getSession() {
            return this.session;
        }

        /** Closes the session if one is open; a no-op when the peer has already closed it. */
        public void close() {
            Session current = this.session;
            if (current != null) {
                current.close();
            }
        }
    }

    /**
     * Records the connection settings this command uses when it is run.
     *
     * @param clientBuilder  builder used to create the native Pulsar client
     * @param authentication authentication plugin used for the WebSocket upgrade request; may be null
     * @param str            service URL; a value starting with "ws" selects the WebSocket path in run()
     */
    public void updateConfig(ClientBuilder clientBuilder, Authentication authentication, String str) {
        this.serviceURL = str;
        this.authentication = authentication;
        this.clientBuilder = clientBuilder;
    }

    /**
     * Builds the payloads to publish: first every inline message, then the contents of every file.
     *
     * <p>Inline messages are encoded as UTF-8 explicitly so the bytes on the wire do not depend on
     * the platform default charset of the machine running the tool.
     *
     * <p>File errors are logged, not propagated: the first file that cannot be read stops the
     * loading of the remaining files, and the bodies collected so far are returned.
     *
     * @param list  inline message strings
     * @param list2 paths of files whose full contents become one message body each
     * @return message bodies in send order
     */
    private List<byte[]> generateMessageBodies(List<String> list, List<String> list2) {
        List<byte[]> bodies = new ArrayList<>(list.size() + list2.size());
        for (String message : list) {
            bodies.add(message.getBytes(StandardCharsets.UTF_8));
        }
        try {
            for (String fileName : list2) {
                bodies.add(Files.readAllBytes(Paths.get(fileName)));
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        return bodies;
    }

    /**
     * Validates the parsed options and publishes the messages.
     *
     * <p>Every {@code -m} value is split on the configured separator (previously only the first
     * value was used and any further ones were silently dropped). The separator is handed to
     * {@link String#split(String)} and is therefore interpreted as a regular expression.
     *
     * @return 0 on success, -1 if publishing failed
     * @throws ParameterException    if the options are inconsistent or the message limit is exceeded
     * @throws PulsarClientException declared for callers; not thrown directly by this method
     */
    public int run() throws PulsarClientException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (this.numTimesProduce <= 0) {
            throw new ParameterException("Number of times need to be positive number.");
        }
        if (!this.messages.isEmpty()) {
            List<String> splitMessages = new ArrayList<>();
            for (String rawValue : this.messages) {
                splitMessages.addAll(Arrays.asList(rawValue.split(this.separator)));
            }
            this.messages = Collections.unmodifiableList(splitMessages);
        }
        if (this.messages.isEmpty() && this.messageFileNames.isEmpty()) {
            throw new ParameterException("Please supply message content with either --messages or --files");
        }
        // Compute in long: with int arithmetic a large --num-produce overflows to a negative value
        // and slips past the limit check below.
        long totalMessages = ((long) this.messages.size() + this.messageFileNames.size()) * this.numTimesProduce;
        if (totalMessages > MAX_MESSAGES) {
            throw new ParameterException("Attempting to send " + totalMessages + " messages. Please do not send more than " + MAX_MESSAGES + " messages");
        }
        String topic = this.mainOptions.get(0);
        return this.serviceURL.startsWith("ws") ? publishToWebSocket(topic) : publish(topic);
    }

    /**
     * Publishes the configured messages with the native Pulsar client.
     *
     * <p>The client is opened in a try-with-resources block so it is closed on the failure path as
     * well; previously it was closed only after a fully successful run.
     *
     * @param str topic to publish to
     * @return 0 if every message was sent and the client closed cleanly, -1 otherwise
     */
    private int publish(String str) {
        int sentCount = 0;
        int returnCode = 0;
        try (PulsarClient client = this.clientBuilder.build()) {
            ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(str);
            if (this.chunkingAllowed) {
                // Chunking is enabled together with batching disabled.
                producerBuilder.enableChunking(true);
                producerBuilder.enableBatching(false);
            }
            if (StringUtils.isNotBlank(this.encKeyName) && StringUtils.isNotBlank(this.encKeyValue)) {
                producerBuilder.addEncryptionKey(this.encKeyName);
                producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
            }
            Producer<byte[]> producer = producerBuilder.create();
            List<byte[]> bodies = generateMessageBodies(this.messages, this.messageFileNames);
            RateLimiter limiter = this.publishRate > 0.0d ? RateLimiter.create(this.publishRate) : null;

            Map<String, String> messageProperties = new HashMap<>();
            for (String property : this.properties) {
                // Limit 2 keeps '=' characters inside the value (e.g. base64 padding) intact.
                String[] keyValue = property.split("=", 2);
                if (keyValue.length != 2) {
                    throw new ParameterException("Invalid property '" + property + "', expected key=value");
                }
                messageProperties.put(keyValue[0], keyValue[1]);
            }

            for (int round = 0; round < this.numTimesProduce; round++) {
                for (byte[] body : bodies) {
                    if (limiter != null) {
                        limiter.acquire();
                    }
                    TypedMessageBuilder<byte[]> message = producer.newMessage();
                    if (!messageProperties.isEmpty()) {
                        message.properties(messageProperties);
                    }
                    if (this.key != null && !this.key.isEmpty()) {
                        message.key(this.key);
                    }
                    message.value(body).send();
                    sentCount++;
                }
            }
        } catch (Exception e) {
            LOG.error("Error while producing messages");
            LOG.error(e.getMessage(), e);
            returnCode = -1;
        } finally {
            // Reported on every path, including partial failure.
            LOG.info("{} messages successfully produced", sentCount);
        }
        return returnCode;
    }

    /**
     * Publishes the configured messages through the WebSocket producer endpoint.
     *
     * <p>Steps: build the producer URI from the topic name, copy HTTP auth headers (if any) onto the
     * upgrade request, start the Jetty client, connect, then send each body and wait up to 30
     * seconds for its acknowledgement. The Jetty client is stopped before returning once it has been
     * started; previously it was left running.
     *
     * <p>The message key ({@code --key}) and properties ({@code --properties}) are not used on this
     * path; each message is keyed by a generated index.
     *
     * @param str topic to publish to
     * @return 0 if every message was acknowledged, -1 on any failure
     */
    private int publishToWebSocket(String str) {
        int sentCount = 0;
        int returnCode = 0;
        TopicName topicName = TopicName.get(str);
        String baseUrl = this.serviceURL + (this.serviceURL.endsWith("/") ? "" : "/");
        String clusterSegment = StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/";
        String topicPath = String.format("%s/%s/" + clusterSegment + "%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName());
        URI produceUri = URI.create(baseUrl + "ws/producer/" + topicPath);

        // NOTE(review): SslContextFactory(true) looks like the trust-all constructor, i.e. server
        // certificates are not validated on wss:// connections - confirm this is acceptable.
        WebSocketClient webSocketClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData();
                if (authData.hasDataForHttp()) {
                    for (Map.Entry<String, String> header : authData.getHttpHeaders()) {
                        upgradeRequest.setHeader(header.getKey(), header.getValue());
                    }
                }
            }
        } catch (Exception e) {
            // Pass the exception along so the cause is not lost.
            LOG.error("Authentication plugin error: " + e.getMessage(), e);
            return -1;
        }

        CompletableFuture<Void> connected = new CompletableFuture<>();
        ProducerSocket producerSocket = new ProducerSocket(connected);
        try {
            webSocketClient.start();
        } catch (Exception e) {
            LOG.error("Failed to start websocket-client", e);
            return -1;
        }

        try {
            try {
                LOG.info("Trying to create websocket session.. on {},{}", produceUri, upgradeRequest);
                webSocketClient.connect(producerSocket, produceUri, upgradeRequest);
                connected.get();
            } catch (Exception e) {
                LOG.error("Failed to create web-socket session", e);
                return -1;
            }

            try {
                List<byte[]> bodies = generateMessageBodies(this.messages, this.messageFileNames);
                RateLimiter limiter = this.publishRate > 0.0d ? RateLimiter.create(this.publishRate) : null;
                for (int round = 0; round < this.numTimesProduce; round++) {
                    // NOTE(review): keys start at round * 10, so they overlap between rounds when a
                    // round holds more than 10 bodies - kept as-is, confirm whether uniqueness matters.
                    int keyIndex = round * 10;
                    for (byte[] body : bodies) {
                        if (limiter != null) {
                            limiter.acquire();
                        }
                        producerSocket.send(keyIndex++, body).get(30L, TimeUnit.SECONDS);
                        sentCount++;
                    }
                }
                producerSocket.close();
            } catch (Exception e) {
                LOG.error("Error while producing messages");
                LOG.error(e.getMessage(), e);
                returnCode = -1;
            } finally {
                LOG.info("{} messages successfully produced", sentCount);
            }
            return returnCode;
        } finally {
            // Release the client's resources on every path once it has been started.
            try {
                webSocketClient.stop();
            } catch (Exception e) {
                LOG.warn("Failed to stop websocket-client", e);
            }
        }
    }
}
