package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStore.class */
public class FileCheckpointStateStore implements CheckpointStateStore {
    public static final String STATE_STORE_NAME = "file";
    private final Vertx vertx;
    private final File stateDir;
    private final String consumerGroupId;
    private final ProcessingStateCodec codec;

    @Identifier(FileCheckpointStateStore.STATE_STORE_NAME)
    @ApplicationScoped
    /* loaded from: input_file:io/smallrye/reactive/messaging/kafka/commit/FileCheckpointStateStore$Factory.class */
    public static class Factory implements CheckpointStateStore.Factory {
        private final Instance<ProcessingStateCodec.Factory> stateCodecFactory;

        @Inject
        public Factory(@Any Instance<ProcessingStateCodec.Factory> instance) {
            this.stateCodecFactory = instance;
        }

        @Override // io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore.Factory
        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, Class<?> cls) {
            return new FileCheckpointStateStore(vertx, (String) kafkaConsumer.configuration().get("group.id"), (File) kafkaConnectorIncomingConfiguration.config().getOptionalValue("checkpoint.file.state-dir", String.class).map(File::new).orElseGet(() -> {
                try {
                    return Files.createTempDirectory("io.smallrye.reactive.messaging.kafka", new FileAttribute[0]).toFile();
                } catch (IOException e) {
                    throw new IllegalStateException(e);
                }
            }), ((ProcessingStateCodec.Factory) CDIUtils.getInstanceById(this.stateCodecFactory, kafkaConnectorIncomingConfiguration.getChannel(), () -> {
                return this.stateCodecFactory.isUnsatisfied() ? VertxJsonProcessingStateCodec.FACTORY : (ProcessingStateCodec.Factory) this.stateCodecFactory.get();
            })).create(cls));
        }
    }

    public FileCheckpointStateStore(Vertx vertx, String str, File file, ProcessingStateCodec processingStateCodec) {
        this.vertx = vertx;
        this.consumerGroupId = str;
        this.stateDir = file;
        this.codec = processingStateCodec;
    }

    public File getStateDir() {
        return this.stateDir;
    }

    private String getStatePath(TopicPartition topicPartition) {
        return this.stateDir.toPath().resolve(this.consumerGroupId + ":" + topicPartition.topic() + ":" + topicPartition.partition()).toString();
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore
    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> collection) {
        return Multi.createFrom().iterable(collection).onItem().transformToUniAndConcatenate(topicPartition -> {
            return fetchProcessingState(topicPartition).map(processingState -> {
                return Tuple2.of(topicPartition, processingState);
            });
        }).filter(tuple2 -> {
            return tuple2.getItem2() != null;
        }).collect().asMap((v0) -> {
            return v0.getItem1();
        }, (v0) -> {
            return v0.getItem2();
        });
    }

    protected Uni<ProcessingState<?>> fetchProcessingState(TopicPartition topicPartition) {
        String statePath = getStatePath(topicPartition);
        return this.vertx.fileSystem().exists(statePath).chain(bool -> {
            return bool.booleanValue() ? this.vertx.fileSystem().readFile(statePath).map(this::deserializeState).onFailure().invoke(th -> {
                KafkaLogging.log.errorf(th, "Error fetching processing state for partition %s", topicPartition);
            }).onItem().invoke(processingState -> {
                KafkaLogging.log.debugf("Fetched state for partition %s : %s", topicPartition, processingState);
            }) : Uni.createFrom().item(() -> {
                return null;
            });
        });
    }

    private ProcessingState<?> deserializeState(Buffer buffer) {
        return this.codec.decode(buffer.getBytes());
    }

    @Override // io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore
    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> map) {
        return Multi.createFrom().iterable(map.entrySet()).onItem().transformToUniAndConcatenate(entry -> {
            return persistProcessingState((TopicPartition) entry.getKey(), (ProcessingState) entry.getValue());
        }).collect().asList().replaceWithVoid();
    }

    protected Uni<Void> persistProcessingState(TopicPartition topicPartition, ProcessingState<?> processingState) {
        String statePath = getStatePath(topicPartition);
        return processingState != null ? this.vertx.fileSystem().exists(statePath).chain(bool -> {
            return bool.booleanValue() ? fetchProcessingState(topicPartition).onFailure().recoverWithNull() : this.vertx.fileSystem().createFile(statePath).onItem().transform(r2 -> {
                return (ProcessingState) null;
            }).onFailure(th -> {
                return Optional.ofNullable(th.getCause()).map((v0) -> {
                    return v0.getClass();
                }).orElse(null) == FileAlreadyExistsException.class;
            }).recoverWithNull();
        }).chain(processingState2 -> {
            if (processingState2 == null || processingState2.getOffset().longValue() <= processingState.getOffset().longValue()) {
                return this.vertx.fileSystem().writeFile(statePath, serializeState(processingState));
            }
            KafkaLogging.log.warnf("Skipping persist operation : higher offset found on store %d > %d", processingState2.getOffset(), processingState.getOffset());
            return Uni.createFrom().voidItem();
        }).onFailure().invoke(th -> {
            KafkaLogging.log.errorf(th, "Error persisting processing state `%s` for partition %s", processingState, topicPartition);
        }).onItem().invoke(() -> {
            KafkaLogging.log.debugf("Persisted state for partition %s : %s", topicPartition, processingState);
        }) : Uni.createFrom().voidItem();
    }

    private Buffer serializeState(ProcessingState<?> processingState) {
        return Buffer.buffer(this.codec.encode(processingState));
    }
}
