package org.apache.pulsar.io.hbase.sink;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/hbase/sink/HbaseAbstractSink.class */
public abstract class HbaseAbstractSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HbaseAbstractSink.class);
    private HbaseSinkConfig hbaseSinkConfig;
    private Configuration configuration;
    private Connection connection;
    private Admin admin;
    private TableName tableName;
    private Table table;
    protected TableDefinition tableDefinition;
    private long batchTimeMs;
    private int batchSize;
    private List<Record<T>> incomingList;
    private ScheduledExecutorService flushExecutor;

    /* loaded from: input_file:org/apache/pulsar/io/hbase/sink/HbaseAbstractSink$TableDefinition.class */
    public static class TableDefinition {
        private final String rowKeyName;
        private final String familyName;
        private final List<String> qualifierNames;

        private TableDefinition(String str, String str2, List<String> list) {
            this.rowKeyName = str;
            this.familyName = str2;
            this.qualifierNames = list;
        }

        public static TableDefinition of(String str, String str2, List<String> list) {
            return new TableDefinition(str, str2, list);
        }

        public String getRowKeyName() {
            return this.rowKeyName;
        }

        public String getFamilyName() {
            return this.familyName;
        }

        public List<String> getQualifierNames() {
            return this.qualifierNames;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof TableDefinition)) {
                return false;
            }
            TableDefinition tableDefinition = (TableDefinition) obj;
            if (!tableDefinition.canEqual(this)) {
                return false;
            }
            String rowKeyName = getRowKeyName();
            String rowKeyName2 = tableDefinition.getRowKeyName();
            if (rowKeyName == null) {
                if (rowKeyName2 != null) {
                    return false;
                }
            } else if (!rowKeyName.equals(rowKeyName2)) {
                return false;
            }
            String familyName = getFamilyName();
            String familyName2 = tableDefinition.getFamilyName();
            if (familyName == null) {
                if (familyName2 != null) {
                    return false;
                }
            } else if (!familyName.equals(familyName2)) {
                return false;
            }
            List<String> qualifierNames = getQualifierNames();
            List<String> qualifierNames2 = tableDefinition.getQualifierNames();
            return qualifierNames == null ? qualifierNames2 == null : qualifierNames.equals(qualifierNames2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof TableDefinition;
        }

        public int hashCode() {
            String rowKeyName = getRowKeyName();
            int hashCode = (1 * 59) + (rowKeyName == null ? 43 : rowKeyName.hashCode());
            String familyName = getFamilyName();
            int hashCode2 = (hashCode * 59) + (familyName == null ? 43 : familyName.hashCode());
            List<String> qualifierNames = getQualifierNames();
            return (hashCode2 * 59) + (qualifierNames == null ? 43 : qualifierNames.hashCode());
        }

        public String toString() {
            return "HbaseAbstractSink.TableDefinition(rowKeyName=" + getRowKeyName() + ", familyName=" + getFamilyName() + ", qualifierNames=" + getQualifierNames() + ")";
        }
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.hbaseSinkConfig = HbaseSinkConfig.load(map);
        Preconditions.checkNotNull(this.hbaseSinkConfig.getZookeeperQuorum(), "zookeeperQuorum property not set.");
        Preconditions.checkNotNull(this.hbaseSinkConfig.getZookeeperClientPort(), "zookeeperClientPort property not set.");
        Preconditions.checkNotNull(this.hbaseSinkConfig.getZookeeperZnodeParent(), "zookeeperZnodeParent property not set.");
        Preconditions.checkNotNull(this.hbaseSinkConfig.getTableName(), "hbase tableName property not set.");
        getTable(this.hbaseSinkConfig);
        this.tableDefinition = getTableDefinition(this.hbaseSinkConfig);
        this.batchTimeMs = this.hbaseSinkConfig.getBatchTimeMs();
        this.batchSize = this.hbaseSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(() -> {
            flush();
        }, this.batchTimeMs, this.batchTimeMs, TimeUnit.MILLISECONDS);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (null != this.table) {
            this.table.close();
        }
        if (null != this.admin) {
            this.admin.close();
        }
        if (null != this.connection) {
            this.connection.close();
        }
        if (null != this.flushExecutor) {
            this.flushExecutor.shutdown();
        }
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) throws Exception {
        int size;
        synchronized (this) {
            if (null != record) {
                this.incomingList.add(record);
            }
            size = this.incomingList.size();
        }
        if (size == this.batchSize) {
            this.flushExecutor.execute(() -> {
                flush();
            });
        }
    }

    private void flush() {
        ArrayList arrayList = new ArrayList();
        synchronized (this) {
            if (this.incomingList.isEmpty()) {
                return;
            }
            List<Record<T>> list = this.incomingList;
            this.incomingList = Lists.newArrayList();
            if (CollectionUtils.isNotEmpty(list)) {
                for (Record<T> record : list) {
                    try {
                        bindValue(record, arrayList);
                    } catch (Exception e) {
                        record.fail();
                        list.remove(record);
                        log.warn("Record flush thread was exception ", (Throwable) e);
                    }
                }
            }
            try {
                if (CollectionUtils.isNotEmpty(arrayList)) {
                    this.table.batch(arrayList, new Object[arrayList.size()]);
                }
                list.forEach(record2 -> {
                    record2.ack();
                });
                arrayList.clear();
                list.clear();
            } catch (Exception e2) {
                list.forEach(record3 -> {
                    record3.fail();
                });
                log.error("Hbase table put data exception ", (Throwable) e2);
            }
        }
    }

    public abstract void bindValue(Record<T> record, List<Put> list) throws Exception;

    private void getTable(HbaseSinkConfig hbaseSinkConfig) throws IOException {
        this.configuration = HBaseConfiguration.create();
        String hbaseConfigResources = hbaseSinkConfig.getHbaseConfigResources();
        if (StringUtils.isNotBlank(hbaseConfigResources)) {
            this.configuration.addResource(hbaseConfigResources);
        }
        this.configuration.set(HConstants.ZOOKEEPER_QUORUM, hbaseSinkConfig.getZookeeperQuorum());
        this.configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, hbaseSinkConfig.getZookeeperClientPort());
        this.configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseSinkConfig.getZookeeperZnodeParent());
        this.connection = ConnectionFactory.createConnection(this.configuration);
        this.admin = this.connection.getAdmin();
        this.tableName = TableName.valueOf(hbaseSinkConfig.getTableName());
        if (!this.admin.tableExists(this.tableName)) {
            throw new IllegalArgumentException(this.tableName + " table does not exist.");
        }
        this.table = this.connection.getTable(this.tableName);
    }

    private TableDefinition getTableDefinition(HbaseSinkConfig hbaseSinkConfig) {
        Preconditions.checkNotNull(hbaseSinkConfig.getRowKeyName(), "rowKeyName property not set.");
        Preconditions.checkNotNull(hbaseSinkConfig.getFamilyName(), "familyName property not set.");
        Preconditions.checkNotNull(hbaseSinkConfig.getQualifierNames(), "qualifierNames property not set.");
        return TableDefinition.of(hbaseSinkConfig.getRowKeyName(), hbaseSinkConfig.getFamilyName(), hbaseSinkConfig.getQualifierNames());
    }
}
