package org.apache.pulsar.io.kinesis;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Recycler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "kinesis", type = IOType.SINK, help = "A sink connector that copies messages from Pulsar to Kinesis", configClass = KinesisSinkConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/kinesis/KinesisSink.class */
public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> {
    private KinesisProducer kinesisProducer;
    private KinesisSinkConfig kinesisSinkConfig;
    private String streamName;
    private static final String defaultPartitionedKey = "default";
    private static final int maxPartitionedKeyLength = 256;
    private SinkContext sinkContext;
    private ScheduledExecutorService scheduledExecutor;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private volatile int previousPublishFailed = 0;
    public static final String METRICS_TOTAL_INCOMING = "_kinesis_total_incoming_";
    public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_";
    public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_";
    public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KinesisSink.class);
    private static final AtomicIntegerFieldUpdater<KinesisSink> IS_PUBLISH_FAILED = AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class, "previousPublishFailed");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/io/kinesis/KinesisSink$ProducerSendCallback.class */
    public static final class ProducerSendCallback implements FutureCallback<UserRecordResult> {
        private Record<byte[]> resultContext;
        private long startTime;
        private final Recycler.Handle<ProducerSendCallback> recyclerHandle;
        private KinesisSink kinesisSink;
        private Backoff backoff;
        private String partitionedKey;
        private ByteBuffer data;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() { // from class: org.apache.pulsar.io.kinesis.KinesisSink.ProducerSendCallback.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.netty.util.Recycler
            public ProducerSendCallback newObject(Recycler.Handle<ProducerSendCallback> handle) {
                return new ProducerSendCallback(handle);
            }
        };

        private ProducerSendCallback(Recycler.Handle<ProducerSendCallback> handle) {
            this.startTime = 0L;
            this.recyclerHandle = handle;
        }

        static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> record, long j, String str, ByteBuffer byteBuffer) {
            ProducerSendCallback producerSendCallback = RECYCLER.get();
            producerSendCallback.resultContext = record;
            producerSendCallback.kinesisSink = kinesisSink;
            producerSendCallback.startTime = j;
            producerSendCallback.partitionedKey = str;
            producerSendCallback.data = byteBuffer;
            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && producerSendCallback.backoff == null) {
                producerSendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS, kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0L, TimeUnit.SECONDS);
            }
            return producerSendCallback;
        }

        private void recycle() {
            this.resultContext = null;
            this.kinesisSink = null;
            this.startTime = 0L;
            if (this.backoff != null) {
                this.backoff.reset();
            }
            this.partitionedKey = null;
            this.data = null;
            this.recyclerHandle.recycle(this);
        }

        @Override // com.google.common.util.concurrent.FutureCallback
        public void onSuccess(UserRecordResult userRecordResult) {
            if (KinesisSink.LOG.isDebugEnabled()) {
                KinesisSink.LOG.debug("Successfully published message for {}-{} with latency {}", this.kinesisSink.streamName, userRecordResult.getShardId(), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.startTime)));
            }
            if (this.kinesisSink.sinkContext != null) {
                this.kinesisSink.sinkContext.recordMetric(KinesisSink.METRICS_TOTAL_SUCCESS, 1.0d);
            }
            this.kinesisSink.previousPublishFailed = 0;
            this.resultContext.ack();
            recycle();
        }

        @Override // com.google.common.util.concurrent.FutureCallback
        public void onFailure(Throwable th) {
            if (th instanceof UserRecordFailedException) {
                StringBuffer stringBuffer = new StringBuffer();
                ((UserRecordFailedException) th).getResult().getAttempts().forEach(attempt -> {
                    stringBuffer.append(String.format("errorMessage:%s, errorCode:%s, delay:%d, duration:%d;", attempt.getErrorMessage(), attempt.getErrorCode(), Integer.valueOf(attempt.getDelay()), Integer.valueOf(attempt.getDuration())));
                });
                KinesisSink.LOG.error("[{}] Failed to published message for replicator of {}-{}: Attempts:{}", this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), stringBuffer.toString());
            } else if (StringUtils.isEmpty(th.getMessage())) {
                KinesisSink.LOG.error("[{}] Failed to published message for replicator of {}-{}", this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), th);
            } else {
                KinesisSink.LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), th.getMessage());
            }
            this.kinesisSink.previousPublishFailed = 1;
            if (this.kinesisSink.sinkContext != null) {
                this.kinesisSink.sinkContext.recordMetric(KinesisSink.METRICS_TOTAL_FAILURE, 1.0d);
            }
            if (this.backoff == null) {
                recycle();
                return;
            }
            long next = this.backoff.next();
            KinesisSink.LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", this.kinesisSink.streamName, this.resultContext.getPartitionId(), this.resultContext.getRecordSequence(), Long.valueOf(next));
            this.kinesisSink.scheduledExecutor.schedule(() -> {
                this.kinesisSink.sendUserRecord(this);
            }, next, TimeUnit.MICROSECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendUserRecord(ProducerSendCallback producerSendCallback) {
        Futures.addCallback(this.kinesisProducer.addUserRecord(this.streamName, producerSendCallback.partitionedKey, producerSendCallback.data), producerSendCallback, MoreExecutors.directExecutor());
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) throws Exception {
        if (this.kinesisSinkConfig.isRetainOrdering() && this.previousPublishFailed == 1) {
            LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName, record.getRecordSequence());
            throw new IllegalStateException("kinesis queue has publish failure");
        }
        String orElse = record.getKey().orElse(record.getTopicName().orElse("default"));
        sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), orElse.length() > 256 ? orElse.substring(0, 255) : orElse, createKinesisMessage(this.kinesisSinkConfig.getMessageFormat(), record)));
        if (this.sinkContext != null) {
            this.sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1.0d);
            this.sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, r0.array().length);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Published message to kinesis stream {} with size {}", this.streamName, Integer.valueOf(record.getValue().length));
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.kinesisProducer != null) {
            this.kinesisProducer.flush();
            this.kinesisProducer.destroy();
        }
        LOG.info("Kinesis sink stopped.");
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        this.kinesisSinkConfig = KinesisSinkConfig.load(map);
        this.sinkContext = sinkContext;
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSinkConfig.getAwsEndpoint()) || StringUtils.isNotBlank(this.kinesisSinkConfig.getAwsRegion()), "Either the aws-end-point or aws-region must be set");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSinkConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
        KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration();
        kinesisProducerConfiguration.setKinesisEndpoint(this.kinesisSinkConfig.getAwsEndpoint());
        if (this.kinesisSinkConfig.getAwsEndpointPort() != null) {
            kinesisProducerConfiguration.setKinesisPort(this.kinesisSinkConfig.getAwsEndpointPort().intValue());
        }
        kinesisProducerConfiguration.setRegion(this.kinesisSinkConfig.getAwsRegion());
        kinesisProducerConfiguration.setThreadingModel(KinesisProducerConfiguration.ThreadingModel.POOLED);
        kinesisProducerConfiguration.setThreadPoolSize(4);
        kinesisProducerConfiguration.setCollectionMaxCount(1L);
        if (this.kinesisSinkConfig.getSkipCertificateValidation() != null && this.kinesisSinkConfig.getSkipCertificateValidation().booleanValue()) {
            kinesisProducerConfiguration.setVerifyCertificate(false);
        }
        kinesisProducerConfiguration.setCredentialsProvider(createCredentialProvider(this.kinesisSinkConfig.getAwsCredentialPluginName(), this.kinesisSinkConfig.getAwsCredentialPluginParam()).getCredentialProvider());
        this.streamName = this.kinesisSinkConfig.getAwsKinesisStreamName();
        this.kinesisProducer = new KinesisProducer(kinesisProducerConfiguration);
        IS_PUBLISH_FAILED.set(this, 0);
        LOG.info("Kinesis sink started. {}", ReflectionToStringBuilder.toString(kinesisProducerConfiguration, ToStringStyle.SHORT_PREFIX_STYLE));
    }

    public static ByteBuffer createKinesisMessage(KinesisSinkConfig.MessageFormat messageFormat, Record<byte[]> record) {
        return KinesisSinkConfig.MessageFormat.FULL_MESSAGE_IN_JSON.equals(messageFormat) ? ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes()) : KinesisSinkConfig.MessageFormat.FULL_MESSAGE_IN_FB.equals(messageFormat) ? Utils.serializeRecordToFlatBuffer(record) : ByteBuffer.wrap(record.getValue());
    }
}
