package io.debezium.transforms;

import io.debezium.Module;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.pipeline.notification.IncrementalSnapshotNotificationService;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.schema.SchemaChangeEvent;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-2.6.1.Final.jar:io/debezium/transforms/SchemaChangeEventFilter.class */
/**
 * Kafka Connect transformation that drops schema change records whose event type is listed in the
 * {@code schema.change.event.exclude.list} option.
 *
 * <p>Records with a {@code null} value, and records that {@link SmtManager#isValidSchemaChange} does
 * not accept, are passed through untouched. For all other records the event type is taken from the
 * {@code type} field of the first entry of the table-changes array; an empty array is treated as a
 * {@link SchemaChangeEvent.SchemaChangeEventType#DATABASE} event.
 *
 * @param <R> the concrete record type handled by this transformation
 */
public class SchemaChangeEventFilter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class);

    /** Name of the option holding the delimited list of event types to drop. */
    private static final String EXCLUDE_LIST_OPTION = "schema.change.event.exclude.list";

    private static final Field SCHEMA_CHANGE_EVENT_EXCLUDE_LIST = Field.create(EXCLUDE_LIST_OPTION)
            .withDisplayName("Schema change event exclude list")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.HIGH)
            .withDescription("Support filtering during DDL synchronization")
            .required();

    /** Event types to drop; populated by {@link #configure(Map)}. */
    private Set<SchemaChangeEvent.SchemaChangeEventType> excludeSchemaChangeEvents;

    /** Helper used to validate the configuration and to recognise schema change records. */
    private SmtManager<R> smtManager;

    /**
     * Validates the supplied settings and parses the exclude list into a set of event types.
     *
     * <p>Each entry of the list is trimmed before it is resolved to a
     * {@link SchemaChangeEvent.SchemaChangeEventType} constant.
     *
     * @param map the raw transformation settings
     * @throws IllegalArgumentException if an entry of the exclude list is not a known event type name
     */
    @Override
    public void configure(Map<String, ?> map) {
        final Configuration config = Configuration.from(map);
        this.smtManager = new SmtManager<>(config);
        this.smtManager.validate(config, Field.setOf(SCHEMA_CHANGE_EVENT_EXCLUDE_LIST));

        final String excludeList = config.getString(SCHEMA_CHANGE_EVENT_EXCLUDE_LIST);
        // NOTE(review): the delimiter is borrowed from the incremental snapshot notification service;
        // presumably a comma - confirm against that constant.
        this.excludeSchemaChangeEvents = Arrays.stream(excludeList.split(IncrementalSnapshotNotificationService.LIST_DELIMITER))
                .map(SchemaChangeEventFilter::parseEventType)
                .collect(Collectors.toSet());
    }

    /**
     * Filters a single record.
     *
     * @param r the record to inspect
     * @return {@code r} unchanged when it is kept, or {@code null} when it must be dropped: either its
     *         table-changes array is {@code null} or its event type is in the exclude list
     */
    @Override
    public R apply(R r) {
        if (r.value() == null || !this.smtManager.isValidSchemaChange(r)) {
            return r;
        }

        final Struct value = Requirements.requireStruct(r.value(), "Read schema change event to filter");
        final List<Struct> tableChanges = value.getArray(HistoryRecord.Fields.TABLE_CHANGES);
        if (tableChanges == null) {
            LOGGER.debug("Table changes field is null, excluding it");
            return null;
        }

        final SchemaChangeEvent.SchemaChangeEventType eventType;
        if (tableChanges.isEmpty()) {
            LOGGER.debug("Table changes field is empty, expecting {} event type", SchemaChangeEvent.SchemaChangeEventType.DATABASE);
            eventType = SchemaChangeEvent.SchemaChangeEventType.DATABASE;
        } else {
            // Only the first table change is consulted to classify the whole record.
            eventType = SchemaChangeEvent.SchemaChangeEventType.valueOf((String) tableChanges.get(0).get("type"));
        }

        return this.excludeSchemaChangeEvents.contains(eventType) ? null : r;
    }

    /**
     * Describes the options understood by this transformation.
     *
     * @return a definition containing the exclude-list option
     */
    @Override
    public ConfigDef config() {
        final ConfigDef configDef = new ConfigDef();
        Field.group(configDef, null, SCHEMA_CHANGE_EVENT_EXCLUDE_LIST);
        return configDef;
    }

    /** No-op: this class does not release anything on close. */
    @Override
    public void close() {
    }

    /**
     * Reports the version of this transformation.
     *
     * @return the value of {@link Module#version()}
     */
    @Override
    public String version() {
        return Module.version();
    }

    /**
     * Resolves one entry of the exclude list to its event type constant.
     *
     * @param typeName the raw entry; surrounding whitespace is ignored
     * @return the matching event type
     * @throws IllegalArgumentException if the trimmed entry is not a known event type name; the
     *         message names the option, the offending entry and the accepted values
     */
    private static SchemaChangeEvent.SchemaChangeEventType parseEventType(String typeName) {
        final String trimmed = typeName.trim();
        try {
            return SchemaChangeEvent.SchemaChangeEventType.valueOf(trimmed);
        } catch (IllegalArgumentException e) {
            // Same exception type as before, but with enough context to fix the configuration.
            throw new IllegalArgumentException(
                    "Invalid value '" + trimmed + "' for option '" + EXCLUDE_LIST_OPTION + "'; expected one of "
                            + Arrays.toString(SchemaChangeEvent.SchemaChangeEventType.values()),
                    e);
        }
    }
}
