package software.amazon.kinesis.retrieval.fanout;

import java.util.concurrent.ExecutionException;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.ConsumerRegistration;

@KinesisClientInternalApi
/* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.8.jar:software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration.class */
public class FanOutConsumerRegistration implements ConsumerRegistration {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FanOutConsumerRegistration.class);

    @NonNull
    private final KinesisAsyncClient kinesisClient;
    private final String streamName;

    @NonNull
    private final String streamConsumerName;
    private final int maxDescribeStreamSummaryRetries;
    private final int maxDescribeStreamConsumerRetries;
    private final int registerStreamConsumerRetries;
    private final long retryBackoffMillis;
    private String streamArn;
    private String streamConsumerArn;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.8.jar:software/amazon/kinesis/retrieval/fanout/FanOutConsumerRegistration$ServiceCallerSupplier.class */
    public interface ServiceCallerSupplier<T> {
        T get() throws ExecutionException, InterruptedException;
    }

    @Override // software.amazon.kinesis.retrieval.ConsumerRegistration
    public String getOrCreateStreamConsumerArn() throws DependencyException {
        if (StringUtils.isEmpty(this.streamConsumerArn)) {
            DescribeStreamConsumerResponse describeStreamConsumerResponse = null;
            try {
                describeStreamConsumerResponse = describeStreamConsumer();
            } catch (ResourceNotFoundException e) {
                log.info("StreamConsumer not found, need to create it.");
            }
            if (describeStreamConsumerResponse == null) {
                LimitExceededException limitExceededException = null;
                for (int i = this.registerStreamConsumerRetries; i > 0; i--) {
                    limitExceededException = null;
                    try {
                        try {
                            streamConsumerArn(registerStreamConsumer().consumer().consumerARN());
                            break;
                        } catch (LimitExceededException e2) {
                            log.debug("RegisterStreamConsumer call got throttled will retry.");
                            limitExceededException = e2;
                        }
                    } catch (ResourceInUseException e3) {
                        log.debug("Got ResourceInUseException consumer exists, will call DescribeStreamConsumer again.");
                        describeStreamConsumerResponse = describeStreamConsumer();
                    }
                }
                if (limitExceededException != null) {
                    throw new DependencyException(limitExceededException);
                }
            }
            if (describeStreamConsumerResponse != null) {
                streamConsumerArn(describeStreamConsumerResponse.consumerDescription().consumerARN());
            }
            waitForActive();
        }
        return this.streamConsumerArn;
    }

    private RegisterStreamConsumerResponse registerStreamConsumer() throws DependencyException {
        AWSExceptionManager createExceptionManager = createExceptionManager();
        try {
            return this.kinesisClient.registerStreamConsumer((RegisterStreamConsumerRequest) KinesisRequestsBuilder.registerStreamConsumerRequestBuilder().streamARN(streamArn()).consumerName(this.streamConsumerName).mo2756build()).get();
        } catch (InterruptedException e) {
            throw new DependencyException(e);
        } catch (ExecutionException e2) {
            throw createExceptionManager.apply(e2.getCause());
        }
    }

    private DescribeStreamConsumerResponse describeStreamConsumer() throws DependencyException {
        DescribeStreamConsumerRequest.Builder describeStreamConsumerRequestBuilder = KinesisRequestsBuilder.describeStreamConsumerRequestBuilder();
        DescribeStreamConsumerRequest describeStreamConsumerRequest = StringUtils.isEmpty(this.streamConsumerArn) ? (DescribeStreamConsumerRequest) describeStreamConsumerRequestBuilder.streamARN(streamArn()).consumerName(this.streamConsumerName).mo2756build() : (DescribeStreamConsumerRequest) describeStreamConsumerRequestBuilder.consumerARN(this.streamConsumerArn).mo2756build();
        return (DescribeStreamConsumerResponse) retryWhenThrottled(() -> {
            return this.kinesisClient.describeStreamConsumer(describeStreamConsumerRequest).get();
        }, this.maxDescribeStreamConsumerRetries, "DescribeStreamConsumer");
    }

    private void waitForActive() throws DependencyException {
        ConsumerStatus consumerStatus = null;
        int i = this.maxDescribeStreamConsumerRetries;
        while (!ConsumerStatus.ACTIVE.equals(consumerStatus) && i > 0) {
            try {
                consumerStatus = describeStreamConsumer().consumerDescription().consumerStatus();
                i--;
                log.info(String.format("Waiting for StreamConsumer %s to have ACTIVE status...", this.streamConsumerName));
                Thread.sleep(this.retryBackoffMillis);
            } catch (InterruptedException e) {
                log.debug("Thread was interrupted while fetching StreamConsumer status, moving on.");
            }
        }
        if (ConsumerStatus.ACTIVE.equals(consumerStatus)) {
            return;
        }
        String format = String.format("Status of StreamConsumer %s, was not ACTIVE after all retries. Was instead %s.", this.streamConsumerName, consumerStatus);
        log.error(format);
        throw new IllegalStateException(format);
    }

    private String streamArn() throws DependencyException {
        if (StringUtils.isEmpty(this.streamArn)) {
            DescribeStreamSummaryRequest describeStreamSummaryRequest = (DescribeStreamSummaryRequest) KinesisRequestsBuilder.describeStreamSummaryRequestBuilder().streamName(this.streamName).mo2756build();
            this.streamArn = (String) retryWhenThrottled(() -> {
                return this.kinesisClient.describeStreamSummary(describeStreamSummaryRequest).get().streamDescriptionSummary().streamARN();
            }, this.maxDescribeStreamSummaryRetries, "DescribeStreamSummary");
        }
        return this.streamArn;
    }

    private <T> T retryWhenThrottled(@NonNull ServiceCallerSupplier<T> serviceCallerSupplier, int i, @NonNull String str) throws DependencyException {
        if (serviceCallerSupplier == null) {
            throw new NullPointerException("retriever");
        }
        if (str == null) {
            throw new NullPointerException("apiName");
        }
        AWSExceptionManager createExceptionManager = createExceptionManager();
        LimitExceededException limitExceededException = null;
        for (int i2 = i; i2 > 0; i2--) {
            try {
                try {
                    return serviceCallerSupplier.get();
                } catch (InterruptedException e) {
                    throw new DependencyException(e);
                } catch (ExecutionException e2) {
                    throw createExceptionManager.apply(e2.getCause());
                }
            } catch (LimitExceededException e3) {
                log.info("Throttled while calling {} API, will backoff.", str);
                try {
                    Thread.sleep(this.retryBackoffMillis + ((long) (Math.random() * 100.0d)));
                } catch (InterruptedException e4) {
                    log.debug("Sleep interrupted, shutdown invoked.");
                }
                limitExceededException = e3;
            }
        }
        if (limitExceededException == null) {
            throw new IllegalStateException(String.format("Finished all retries and no exception was caught while calling %s", str));
        }
        throw limitExceededException;
    }

    private AWSExceptionManager createExceptionManager() {
        AWSExceptionManager aWSExceptionManager = new AWSExceptionManager();
        aWSExceptionManager.add(LimitExceededException.class, limitExceededException -> {
            return limitExceededException;
        });
        aWSExceptionManager.add(ResourceInUseException.class, resourceInUseException -> {
            return resourceInUseException;
        });
        aWSExceptionManager.add(ResourceNotFoundException.class, resourceNotFoundException -> {
            return resourceNotFoundException;
        });
        aWSExceptionManager.add(KinesisException.class, kinesisException -> {
            return kinesisException;
        });
        return aWSExceptionManager;
    }

    public FanOutConsumerRegistration(@NonNull KinesisAsyncClient kinesisAsyncClient, String str, @NonNull String str2, int i, int i2, int i3, long j) {
        if (kinesisAsyncClient == null) {
            throw new NullPointerException("kinesisClient");
        }
        if (str2 == null) {
            throw new NullPointerException("streamConsumerName");
        }
        this.kinesisClient = kinesisAsyncClient;
        this.streamName = str;
        this.streamConsumerName = str2;
        this.maxDescribeStreamSummaryRetries = i;
        this.maxDescribeStreamConsumerRetries = i2;
        this.registerStreamConsumerRetries = i3;
        this.retryBackoffMillis = j;
    }

    private FanOutConsumerRegistration streamConsumerArn(String str) {
        this.streamConsumerArn = str;
        return this;
    }
}
