/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.jdbc;

import com.google.common.collect.Lists;
import java.beans.ConstructorProperties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.jdbc.JdbcSinkConfig;
import org.apache.pulsar.io.jdbc.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JdbcAbstractSink<T>
implements Sink<T> {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JdbcAbstractSink.class);
    // Sink configuration; loaded and validated in open().
    protected JdbcSinkConfig jdbcSinkConfig;
    // JDBC connection opened in open() and closed in close(). Auto-commit is the
    // inverse of the configured "use transactions" flag.
    private Connection connection;
    // JDBC URL taken from the configuration; kept for log messages.
    private String jdbcUrl;
    // Target table name from the configuration and the table id resolved from it in open().
    private String tableName;
    private JdbcUtils.TableId tableId;
    // Statements prepared once in initStatement(). The insert statement is always
    // prepared; update requires a non-empty 'nonKey' config, delete a non-empty 'key'
    // config, and upsert the UPSERT insert mode. The others stay null otherwise.
    private PreparedStatement insertStatement;
    private PreparedStatement updateStatement;
    private PreparedStatement upsertStatement;
    private PreparedStatement deleteStatement;
    // Not referenced in this class. NOTE(review): presumably the record property that
    // subclasses read to choose the mutation type - confirm against the subclasses.
    protected static final String ACTION_PROPERTY = "ACTION";
    // Table definition built in initStatement(); input to the SQL builders.
    protected JdbcUtils.TableDefinition tableDefinition;
    // Records accepted by write() and not yet flushed. write() and flush() synchronize
    // on this object when they add to or drain it.
    private Deque<Record<T>> incomingList;
    // Set via compare-and-set while a flush is in progress so only one flush runs at a time.
    private AtomicBoolean isFlushing;
    // Maximum number of records drained per flush; values <= 0 mean "drain everything".
    private int batchSize;
    // Single-thread scheduler that runs flush(), periodically and/or on demand from write().
    private ScheduledExecutorService flushExecutor;

    /**
     * Loads and validates the sink configuration, opens the JDBC connection, prepares
     * the SQL statements and starts the flush scheduler.
     *
     * @param config raw sink configuration
     * @param sinkContext context handed to the configuration loader
     * @throws IllegalArgumentException if no JDBC URL is configured
     * @throws Exception if the configuration, connection or statement setup fails
     */
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
        this.jdbcSinkConfig.validate();

        this.jdbcUrl = this.jdbcSinkConfig.getJdbcUrl();
        if (this.jdbcSinkConfig.getJdbcUrl() == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }

        // Credentials are optional; only the ones that are present are passed to the driver.
        final Properties credentials = new Properties();
        final String user = this.jdbcSinkConfig.getUserName();
        final String secret = this.jdbcSinkConfig.getPassword();
        if (user != null) {
            credentials.setProperty("user", user);
        }
        if (secret != null) {
            credentials.setProperty("password", secret);
        }

        this.connection = DriverManager.getConnection(this.jdbcSinkConfig.getJdbcUrl(), credentials);
        // Transactions are committed explicitly, so auto-commit is the inverse of the flag.
        final boolean autoCommit = !this.jdbcSinkConfig.isUseTransactions();
        this.connection.setAutoCommit(autoCommit);
        log.info("Opened jdbc connection: {}, autoCommit: {}", this.jdbcUrl, this.connection.getAutoCommit());

        this.tableName = this.jdbcSinkConfig.getTableName();
        this.tableId = JdbcUtils.getTableId(this.connection, this.tableName);
        this.initStatement();

        final int flushIntervalMs = this.jdbcSinkConfig.getTimeoutMs();
        this.batchSize = this.jdbcSinkConfig.getBatchSize();
        this.incomingList = new LinkedList<Record<T>>();
        this.isFlushing = new AtomicBoolean(false);
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        // A non-positive timeout disables the periodic flush.
        if (flushIntervalMs > 0) {
            this.flushExecutor.scheduleAtFixedRate(this::flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Builds the table definition from the configured key / non-key columns and prepares
     * the statements that {@code flush()} executes.
     *
     * <p>The insert statement is always prepared. The upsert statement is prepared only in
     * UPSERT insert mode, the update statement only when non-key columns are configured and
     * the delete statement only when key columns are configured.
     *
     * @throws IllegalStateException if UPSERT mode is selected without both 'key' and 'nonKey'
     * @throws Exception if the table definition lookup or a statement preparation fails
     */
    private void initStatement() throws Exception {
        final List<String> keys = JdbcAbstractSink.getListFromConfig(this.jdbcSinkConfig.getKey());
        final List<String> nonKeys = JdbcAbstractSink.getListFromConfig(this.jdbcSinkConfig.getNonKey());
        final boolean hasKeys = !keys.isEmpty();
        final boolean hasNonKeys = !nonKeys.isEmpty();

        this.tableDefinition = JdbcUtils.getTableDefinition(
                this.connection, this.tableId, keys, nonKeys, this.jdbcSinkConfig.isExcludeNonDeclaredFields());

        final String insertSql = this.generateInsertQueryStatement();
        this.insertStatement = this.connection.prepareStatement(insertSql);

        final boolean upsertMode = this.jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT;
        if (upsertMode) {
            if (!hasNonKeys || !hasKeys) {
                throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' config are not set.");
            }
            final String upsertSql = this.generateUpsertQueryStatement();
            this.upsertStatement = this.connection.prepareStatement(upsertSql);
        }
        if (hasNonKeys) {
            final String updateSql = this.generateUpdateQueryStatement();
            this.updateStatement = this.connection.prepareStatement(updateSql);
        }
        if (hasKeys) {
            final String deleteSql = this.generateDeleteQueryStatement();
            this.deleteStatement = this.connection.prepareStatement(deleteSql);
        }
    }

    /**
     * Splits a comma-separated configuration value into its entries.
     *
     * <p>Each entry is trimmed so that values written as {@code "id, name"} yield
     * {@code "name"} rather than {@code " name"}; entries that are empty after trimming
     * are dropped.
     *
     * @param jdbcSinkConfig the raw comma-separated value; may be {@code null} or empty
     * @return the entries in their original order; an empty list when the value is
     *         {@code null}, empty or contains only separators/whitespace
     */
    private static List<String> getListFromConfig(String jdbcSinkConfig) {
        // "".split(",") yields a single empty token, which the filter below removes,
        // so null and empty inputs both end up as an empty list.
        final String raw = jdbcSinkConfig == null ? "" : jdbcSinkConfig;
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .collect(Collectors.toList());
    }

    /**
     * Stops the flush scheduler and releases all JDBC resources.
     *
     * <p>Every cleanup step is attempted even if an earlier one fails, so that a failing
     * statement close or commit cannot leak the connection. The first failure is rethrown
     * once cleanup is complete, with later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure met while shutting down, if any; when the wait
     *         for the scheduler is interrupted, the interrupt flag is restored and the
     *         {@link InterruptedException} is rethrown after cleanup
     */
    public void close() throws Exception {
        Exception failure = null;

        if (this.flushExecutor != null) {
            int timeoutMs = this.jdbcSinkConfig.getTimeoutMs() * 2;
            this.flushExecutor.shutdown();
            try {
                this.flushExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller, but still release the JDBC resources.
                Thread.currentThread().interrupt();
                failure = e;
            }
            this.flushExecutor = null;
        }

        for (PreparedStatement statement : Arrays.asList(
                this.insertStatement, this.updateStatement, this.upsertStatement, this.deleteStatement)) {
            if (statement == null) {
                continue;
            }
            try {
                statement.close();
            } catch (Exception e) {
                failure = JdbcAbstractSink.appendFailure(failure, e);
            }
        }

        if (this.connection != null) {
            try {
                if (this.jdbcSinkConfig.isUseTransactions()) {
                    this.connection.commit();
                }
            } catch (Exception e) {
                failure = JdbcAbstractSink.appendFailure(failure, e);
            }
            try {
                this.connection.close();
            } catch (Exception e) {
                failure = JdbcAbstractSink.appendFailure(failure, e);
            }
            this.connection = null;
        }

        if (failure != null) {
            throw failure;
        }
        log.info("Closed jdbc connection: {}", this.jdbcUrl);
    }

    /** Returns {@code next} if there is no earlier failure, else attaches it to {@code first} as suppressed. */
    private static Exception appendFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Queues a record for the next flush.
     *
     * <p>When a positive batch size is configured and the queue has reached it, a flush is
     * scheduled immediately on the flush executor. Records are acknowledged or failed by
     * the flush, not by this method.
     *
     * @param record the record to queue
     * @throws Exception declared by the sink interface
     */
    public void write(Record<T> record) throws Exception {
        final int queued;
        synchronized (this.incomingList) {
            this.incomingList.add(record);
            queued = this.incomingList.size();
        }

        final boolean batchFull = this.batchSize > 0 && queued >= this.batchSize;
        if (!batchFull) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("flushing by batches, hit batch size {}", this.batchSize);
        }
        this.flushExecutor.schedule(this::flush, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds the INSERT SQL for the current table definition.
     *
     * @return the SQL text produced by {@code JdbcUtils.buildInsertSql}
     */
    public String generateInsertQueryStatement() {
        final String insertSql = JdbcUtils.buildInsertSql(this.tableDefinition);
        return insertSql;
    }

    /**
     * Builds the UPDATE SQL for the current table definition.
     *
     * @return the SQL text produced by {@code JdbcUtils.buildUpdateSql}
     */
    public String generateUpdateQueryStatement() {
        final String updateSql = JdbcUtils.buildUpdateSql(this.tableDefinition);
        return updateSql;
    }

    /**
     * Supplies the SQL text that is prepared as the upsert statement when the sink runs
     * in UPSERT insert mode.
     *
     * @return the upsert SQL
     */
    public abstract String generateUpsertQueryStatement();

    /**
     * Not called from within this class.
     * NOTE(review): presumably the columns, in bind order, of the upsert statement -
     * confirm against the subclasses' {@code bindValue} implementations.
     *
     * @return the columns used by the upsert statement
     */
    public abstract List<JdbcUtils.ColumnId> getColumnsForUpsert();

    /**
     * Builds the DELETE SQL for the current table definition.
     *
     * @return the SQL text produced by {@code JdbcUtils.buildDeleteSql}
     */
    public String generateDeleteQueryStatement() {
        final String deleteSql = JdbcUtils.buildDeleteSql(this.tableDefinition);
        return deleteSql;
    }

    /**
     * Binds the values of a mutation to a prepared statement. The flush calls this with
     * the statement selected for the mutation's type, right before the statement is
     * executed or added to the JDBC batch.
     *
     * @param var1 the prepared statement to bind parameters on
     * @param var2 the mutation whose values are bound
     * @throws Exception if binding fails; the flush then fails the pending records
     */
    public abstract void bindValue(PreparedStatement var1, Mutation var2) throws Exception;

    /**
     * Converts an incoming record into the mutation to apply. The flush calls this once
     * for every record it drains from the queue.
     *
     * @param var1 the record to convert
     * @return the mutation (type and value accessor) for the record
     */
    public abstract Mutation createMutation(Record<T> var1);

    /**
     * Drains queued records and writes them to the database, one batch per round.
     *
     * <p>Only one flush runs at a time: the {@code isFlushing} flag is taken with a
     * compare-and-set and is always released in a {@code finally} block, so a failing
     * rollback can no longer leave the sink permanently stuck in the "flushing" state.
     * Further rounds run in a loop (rather than recursively) while at least one more full
     * batch is queued.
     *
     * @throws RuntimeException if rolling back a failed transaction fails
     */
    private void flush() {
        boolean needAnotherRound;
        do {
            // The queue is a plain LinkedList, so even size() must be read under its lock.
            final int queued;
            synchronized (this.incomingList) {
                queued = this.incomingList.size();
            }
            if (queued == 0 || !this.isFlushing.compareAndSet(false, true)) {
                if (log.isDebugEnabled()) {
                    log.debug("Already in flushing state, will not flush, queue size: {}", queued);
                }
                return;
            }
            try {
                needAnotherRound = this.flushOneBatch();
            } finally {
                this.isFlushing.set(false);
            }
        } while (needAnotherRound);
    }

    /**
     * Moves up to {@code batchSize} records out of the queue and writes them. Records are
     * acknowledged once written (after the commit when transactions are enabled); on any
     * error the records still pending are failed and an open transaction is rolled back.
     *
     * @return {@code true} if at least one more full batch was queued when the records were drained
     */
    private boolean flushOneBatch() {
        final Deque<Record<T>> swapList = new LinkedList<>();
        final boolean moreBatchesQueued;
        synchronized (this.incomingList) {
            if (log.isDebugEnabled()) {
                log.debug("Starting flush, queue size: {}", this.incomingList.size());
            }
            final int actualBatchSize = this.batchSize > 0
                    ? Math.min(this.incomingList.size(), this.batchSize)
                    : this.incomingList.size();
            for (int i = 0; i < actualBatchSize; ++i) {
                swapList.add(this.incomingList.removeFirst());
            }
            moreBatchesQueued = this.batchSize > 0 && this.incomingList.size() >= this.batchSize;
        }

        final boolean useJdbcBatch = this.jdbcSinkConfig.isUseJdbcBatch();
        final boolean useTransactions = this.jdbcSinkConfig.isUseTransactions();
        long start = System.nanoTime();
        try {
            final List<Mutation> mutations = swapList.stream()
                    .map(this::createMutation)
                    .collect(Collectors.toList());
            PreparedStatement currentBatch = null;
            // Number of records added to currentBatch since it was last executed.
            int pendingInBatch = 0;
            for (Mutation mutation : mutations) {
                final PreparedStatement statement = this.statementFor(mutation);
                this.bindValue(statement, mutation);
                if (useJdbcBatch) {
                    // A JDBC batch belongs to one statement, so a change of statement
                    // forces the batch accumulated so far to be executed first.
                    if (currentBatch != null && statement != currentBatch) {
                        this.internalFlushBatch(swapList, currentBatch, pendingInBatch, start);
                        pendingInBatch = 0;
                        start = System.nanoTime();
                    }
                    statement.addBatch();
                    currentBatch = statement;
                    ++pendingInBatch;
                } else {
                    statement.execute();
                    // Without transactions each record is final as soon as it has executed.
                    if (!useTransactions) {
                        swapList.removeFirst().ack();
                    }
                }
            }
            if (useJdbcBatch) {
                if (currentBatch != null) {
                    this.internalFlushBatch(swapList, currentBatch, pendingInBatch, start);
                }
            } else {
                this.internalFlush(swapList);
            }
        } catch (Exception e) {
            log.error("Got exception {} after {} ms, failing {} messages",
                    e.getMessage(), (System.nanoTime() - start) / 1000L / 1000L, swapList.size(), e);
            swapList.forEach(Record::fail);
            try {
                if (useTransactions) {
                    this.connection.rollback();
                }
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        return moreBatchesQueued;
    }

    /**
     * Picks the prepared statement that implements the mutation's type.
     *
     * @throws IllegalArgumentException if the mutation type is not supported
     * @throws IllegalStateException if the statement for that type was never prepared
     */
    private PreparedStatement statementFor(Mutation mutation) {
        final PreparedStatement statement;
        switch (mutation.getType()) {
            case DELETE:
                statement = this.deleteStatement;
                break;
            case UPDATE:
                statement = this.updateStatement;
                break;
            case INSERT:
                statement = this.insertStatement;
                break;
            case UPSERT:
                statement = this.upsertStatement;
                break;
            default:
                throw new IllegalArgumentException(String.format(
                        "Unsupported action %s, can be one of %s, or not set which indicate %s",
                        mutation.getType(), Arrays.toString(MutationType.values()), MutationType.INSERT));
        }
        if (statement == null) {
            // Update/delete/upsert statements are only prepared when the matching
            // configuration is present; report that instead of a bare NullPointerException.
            throw new IllegalStateException("No prepared statement available for " + mutation.getType()
                    + " mutations; check the 'key', 'nonKey' and insert mode sink configuration");
        }
        return statement;
    }

    /**
     * Finishes a non-batched flush: when transactions are enabled, commits and then
     * acknowledges every record still in {@code swapList}. Does nothing otherwise.
     *
     * @param swapList the records written in this flush and not yet acknowledged
     * @throws SQLException if the commit fails
     */
    private void internalFlush(Deque<Record<T>> swapList) throws SQLException {
        if (!this.jdbcSinkConfig.isUseTransactions()) {
            return;
        }
        this.connection.commit();
        for (Record<T> pending : swapList) {
            pending.ack();
        }
    }

    /**
     * Executes the accumulated JDBC batch and, at debug level, logs how long it took.
     *
     * @param swapList the records of this flush; the executed ones are removed from its head
     * @param currentBatch the statement holding the batch
     * @param count the record count to report in the debug log
     * @param start {@code System.nanoTime()} value the elapsed time is measured from
     * @throws SQLException if the batch execution fails
     */
    private void internalFlushBatch(Deque<Record<T>> swapList, PreparedStatement currentBatch, int count, long start) throws SQLException {
        this.executeBatch(swapList, currentBatch);
        if (!log.isDebugEnabled()) {
            return;
        }
        final long elapsedMs = (System.nanoTime() - start) / 1000L / 1000L;
        log.debug("Flushed {} messages in {} ms", count, elapsedMs);
    }

    /**
     * Executes the statement's JDBC batch and settles the records it covers.
     *
     * <p>One record is taken from the head of {@code swapList} per batch result. If no
     * result is a failure (as decided by {@code isBatchItemFailed}) the transaction, if
     * any, is committed and those records are acknowledged. Otherwise the transaction, if
     * any, is rolled back, those records are failed and an exception is thrown.
     *
     * @param swapList the records of the current flush, in the order they were batched
     * @param statement the statement whose batch is executed
     * @throws SQLException if the driver rejects the batch, or if at least one batch item
     *         reported a failure; the message lists each error code with its occurrence count
     */
    private void executeBatch(Deque<Record<T>> swapList, PreparedStatement statement) throws SQLException {
        final int[] results = statement.executeBatch();
        final boolean useTransactions = this.jdbcSinkConfig.isUseTransactions();

        // error code -> number of batch items that returned it
        final Map<Integer, Integer> failuresMapping = new HashMap<>();
        for (int r : results) {
            if (JdbcAbstractSink.isBatchItemFailed(r)) {
                // merge() starts the count at 1; the previous computeIfAbsent + put
                // sequence started it at 2 and so over-reported every code by one.
                failuresMapping.merge(r, 1, Integer::sum);
            }
        }

        if (failuresMapping.isEmpty()) {
            if (useTransactions) {
                this.connection.commit();
            }
            for (int i = 0; i < results.length; ++i) {
                swapList.removeFirst().ack();
            }
            return;
        }

        if (useTransactions) {
            this.connection.rollback();
        }
        for (int i = 0; i < results.length; ++i) {
            swapList.removeFirst().fail();
        }
        throw new SQLException("Batch failed, got error results (error_code->count): " + failuresMapping);
    }

    /**
     * Tells whether a JDBC batch result code denotes a failed item.
     *
     * @param returnCode one element of the array returned by {@code executeBatch()}
     * @return {@code true} for any negative code other than
     *         {@code Statement.SUCCESS_NO_INFO} (-2)
     */
    private static boolean isBatchItemFailed(int returnCode) {
        // Non-negative codes are update counts.
        if (returnCode >= 0) {
            return false;
        }
        return returnCode != java.sql.Statement.SUCCESS_NO_INFO;
    }

    /**
     * Returns the JDBC connection used by this sink.
     *
     * @return the current connection; {@code null} before {@code open()} has created it
     *         and after {@code close()} has released it
     */
    @Generated
    public Connection getConnection() {
        final Connection current = this.connection;
        return current;
    }

    /**
     * A single change to apply to the table: the kind of statement to run plus an
     * accessor for the values to bind.
     * NOTE(review): {@code values} presumably maps a column name to its value - confirm
     * against the subclasses' {@code createMutation} / {@code bindValue}.
     */
    protected static class Mutation {
        private MutationType type;
        private Function<String, Object> values;

        @ConstructorProperties(value={"type", "values"})
        @Generated
        public Mutation(MutationType type, Function<String, Object> values) {
            this.type = type;
            this.values = values;
        }

        @Generated
        public MutationType getType() {
            return this.type;
        }

        @Generated
        public void setType(MutationType type) {
            this.type = type;
        }

        @Generated
        public Function<String, Object> getValues() {
            return this.values;
        }

        @Generated
        public void setValues(Function<String, Object> values) {
            this.values = values;
        }

        /** Two mutations are equal when both their type and their values accessor are equal. */
        @Generated
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Mutation)) {
                return false;
            }
            final Mutation that = (Mutation) o;
            // Gives subclasses a way to refuse equality with instances of this class.
            if (!that.canEqual(this)) {
                return false;
            }
            final MutationType ownType = this.getType();
            final MutationType otherType = that.getType();
            final boolean sameType = ownType == null ? otherType == null : ownType.equals(otherType);
            if (!sameType) {
                return false;
            }
            final Function<String, Object> ownValues = this.getValues();
            final Function<String, Object> otherValues = that.getValues();
            return ownValues == null ? otherValues == null : ownValues.equals(otherValues);
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof Mutation;
        }

        /** Hash consistent with {@link #equals(Object)}; a {@code null} field contributes 43. */
        @Generated
        public int hashCode() {
            final int prime = 59;
            final int nullHash = 43;
            final MutationType currentType = this.getType();
            int hash = prime + (currentType == null ? nullHash : currentType.hashCode());
            final Function<String, Object> currentValues = this.getValues();
            hash = hash * prime + (currentValues == null ? nullHash : currentValues.hashCode());
            return hash;
        }

        @Generated
        public String toString() {
            final StringBuilder text = new StringBuilder("JdbcAbstractSink.Mutation(type=");
            text.append(this.getType());
            text.append(", values=");
            text.append(this.getValues());
            text.append(")");
            return text.toString();
        }
    }

    protected static enum MutationType {
        INSERT,
        UPDATE,
        UPSERT,
        DELETE;

    }
}

