package io.debezium.pipeline.notification;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.Predicates;
import io.debezium.pipeline.notification.channels.ConnectChannel;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.SchemaFactory;
import java.time.Clock;
import java.util.List;
import java.util.function.Predicate;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-2.5.4.Final.jar:io/debezium/pipeline/notification/NotificationService.class */
/**
 * Dispatches {@link Notification}s to a list of {@link NotificationChannel}s.
 *
 * <p>A channel receives notifications (and is initialised / closed) only when its
 * {@link NotificationChannel#name() name} is contained in the list returned by
 * {@link CommonConnectorConfig#getEnabledNotificationChannels()}. Channels that are
 * {@link ConnectChannel}s additionally receive the schema factory and record consumer
 * at construction time and can be sent notifications together with offsets.
 *
 * @param <P> the partition type
 * @param <O> the offset context type
 */
public class NotificationService<P extends Partition, O extends OffsetContext> {
    private final List<NotificationChannel> notificationChannels;
    private final List<String> enabledChannels;
    private final IncrementalSnapshotNotificationService<P, O> incrementalSnapshotNotificationService;
    private final InitialSnapshotNotificationService<P, O> initialSnapshotNotificationService;

    /**
     * Creates the service and initialises the supplied channels.
     *
     * @param list the candidate notification channels; the list is stored as-is, not copied
     * @param commonConnectorConfig the connector configuration; supplies the enabled channel names
     *        and is handed to every enabled channel's {@code init}
     * @param schemaFactory passed to every {@link ConnectChannel} in {@code list}
     * @param blockingConsumer passed to every {@link ConnectChannel} in {@code list}
     */
    public NotificationService(List<NotificationChannel> list, CommonConnectorConfig commonConnectorConfig, SchemaFactory schemaFactory, BlockingConsumer<SourceRecord> blockingConsumer) {
        this.notificationChannels = list;
        this.enabledChannels = commonConnectorConfig.getEnabledNotificationChannels();

        // First pass: generic initialisation, enabled channels only.
        for (NotificationChannel channel : this.notificationChannels) {
            if (isEnabled(channel)) {
                channel.init(commonConnectorConfig);
            }
        }

        // Second pass: Connect-specific wiring. This pass is intentionally NOT restricted
        // to enabled channels; every ConnectChannel in the list is wired.
        for (NotificationChannel channel : this.notificationChannels) {
            if (channel instanceof ConnectChannel) {
                ((ConnectChannel) channel).initConnectChannel(schemaFactory, blockingConsumer);
            }
        }

        this.incrementalSnapshotNotificationService = new IncrementalSnapshotNotificationService<>(this, commonConnectorConfig, Clock.systemUTC());
        this.initialSnapshotNotificationService = new InitialSnapshotNotificationService<>(this, commonConnectorConfig, Clock.systemUTC());
    }

    /**
     * Sends the notification to every enabled channel, in list order.
     *
     * @param notification the notification to deliver
     */
    public void notify(Notification notification) {
        for (NotificationChannel channel : this.notificationChannels) {
            if (isEnabled(channel)) {
                channel.send(notification);
            }
        }
    }

    /**
     * Sends the notification to every enabled channel, handing the offsets to those
     * that are {@link ConnectChannel}s.
     *
     * <p>Delivery happens in two passes over the channel list: enabled channels that are
     * not Connect channels are served first, then enabled Connect channels.
     *
     * @param notification the notification to deliver
     * @param offsets the offsets forwarded to Connect channels only
     */
    public void notify(Notification notification, Offsets<P, ? extends OffsetContext> offsets) {
        for (NotificationChannel channel : this.notificationChannels) {
            if (isEnabled(channel) && !(channel instanceof ConnectChannel)) {
                channel.send(notification);
            }
        }
        for (NotificationChannel channel : this.notificationChannels) {
            if (isEnabled(channel) && channel instanceof ConnectChannel) {
                ((ConnectChannel) channel).send(notification, offsets);
            }
        }
    }

    /**
     * @return the incremental snapshot notification helper created by this service
     */
    public IncrementalSnapshotNotificationService<P, O> incrementalSnapshotNotificationService() {
        return this.incrementalSnapshotNotificationService;
    }

    /**
     * @return the initial snapshot notification helper created by this service
     */
    public InitialSnapshotNotificationService<P, O> initialSnapshotNotificationService() {
        return this.initialSnapshotNotificationService;
    }

    /**
     * Closes every enabled channel, in list order.
     */
    public void stop() {
        for (NotificationChannel channel : this.notificationChannels) {
            if (isEnabled(channel)) {
                channel.close();
            }
        }
    }

    /**
     * Tells whether the channel's name appears in the configured list of enabled channels.
     */
    private boolean isEnabled(NotificationChannel channel) {
        return this.enabledChannels.contains(channel.name());
    }
}
