package com.alibaba.otter.canal.client.kafka;

import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.logging.log4j.core.appender.mom.kafka.KafkaManager;

/* loaded from: input_file:META-INF/bundled-dependencies/canal.client-1.1.5.jar:com/alibaba/otter/canal/client/kafka/KafkaCanalConnector.class */
public class KafkaCanalConnector implements CanalMQConnector {
    protected KafkaConsumer<String, Message> kafkaConsumer;
    protected KafkaConsumer<String, String> kafkaConsumer2;
    protected String topic;
    protected Integer partition;
    protected boolean flatMessage;
    protected volatile boolean connected = false;
    protected volatile boolean running = false;
    private Map<Integer, Long> currentOffsets = new ConcurrentHashMap();
    protected Properties properties = new Properties();

    public KafkaCanalConnector(String str, String str2, Integer num, String str3, Integer num2, boolean z) {
        this.topic = str2;
        this.partition = num;
        this.flatMessage = z;
        this.properties.put("bootstrap.servers", str);
        this.properties.put("group.id", str3);
        this.properties.put("enable.auto.commit", false);
        this.properties.put("auto.commit.interval.ms", "1000");
        this.properties.put("auto.offset.reset", "latest");
        this.properties.put("request.timeout.ms", "40000");
        this.properties.put("session.timeout.ms", KafkaManager.DEFAULT_TIMEOUT_MILLIS);
        this.properties.put("isolation.level", "read_committed");
        this.properties.put("max.poll.records", (num2 == null ? 100 : num2).toString());
        this.properties.put("key.deserializer", StringDeserializer.class.getName());
        if (z) {
            this.properties.put("value.deserializer", StringDeserializer.class.getName());
        } else {
            this.properties.put("value.deserializer", MessageDeserializer.class.getName());
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void connect() {
        if (this.connected) {
            return;
        }
        this.connected = true;
        if (this.kafkaConsumer == null && !this.flatMessage) {
            this.kafkaConsumer = new KafkaConsumer<>(this.properties);
        }
        if (this.kafkaConsumer2 == null && this.flatMessage) {
            this.kafkaConsumer2 = new KafkaConsumer<>(this.properties);
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void disconnect() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
            this.kafkaConsumer = null;
        }
        if (this.kafkaConsumer2 != null) {
            this.kafkaConsumer2.close();
            this.kafkaConsumer2 = null;
        }
        this.connected = false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitClientRunning() {
        this.running = true;
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public boolean checkValid() {
        return true;
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void subscribe() {
        waitClientRunning();
        if (this.running) {
            if (this.partition == null) {
                if (this.kafkaConsumer != null) {
                    this.kafkaConsumer.subscribe(Collections.singletonList(this.topic));
                }
                if (this.kafkaConsumer2 != null) {
                    this.kafkaConsumer2.subscribe(Collections.singletonList(this.topic));
                    return;
                }
                return;
            }
            TopicPartition topicPartition = new TopicPartition(this.topic, this.partition.intValue());
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.assign(Collections.singletonList(topicPartition));
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.assign(Collections.singletonList(topicPartition));
            }
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void unsubscribe() {
        waitClientRunning();
        if (this.running) {
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.unsubscribe();
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.unsubscribe();
            }
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector
    public List<Message> getList(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        List<Message> listWithoutAck = getListWithoutAck(l, timeUnit);
        if (listWithoutAck != null && !listWithoutAck.isEmpty()) {
            ack();
        }
        return listWithoutAck;
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector
    public List<Message> getListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        ConsumerRecords poll = this.kafkaConsumer.poll(timeUnit.toMillis(l.longValue()));
        this.currentOffsets.clear();
        for (TopicPartition topicPartition : poll.partitions()) {
            this.currentOffsets.put(Integer.valueOf(topicPartition.partition()), Long.valueOf(this.kafkaConsumer.position(topicPartition)));
        }
        if (poll.isEmpty()) {
            return Lists.newArrayList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            arrayList.add(((ConsumerRecord) it.next()).value());
        }
        return arrayList;
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector
    public List<FlatMessage> getFlatList(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        List<FlatMessage> flatListWithoutAck = getFlatListWithoutAck(l, timeUnit);
        if (flatListWithoutAck != null && !flatListWithoutAck.isEmpty()) {
            ack();
        }
        return flatListWithoutAck;
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector
    public List<FlatMessage> getFlatListWithoutAck(Long l, TimeUnit timeUnit) throws CanalClientException {
        waitClientRunning();
        if (!this.running) {
            return Lists.newArrayList();
        }
        ConsumerRecords poll = this.kafkaConsumer2.poll(timeUnit.toMillis(l.longValue()));
        this.currentOffsets.clear();
        for (TopicPartition topicPartition : poll.partitions()) {
            this.currentOffsets.put(Integer.valueOf(topicPartition.partition()), Long.valueOf(this.kafkaConsumer2.position(topicPartition)));
        }
        if (poll.isEmpty()) {
            return Lists.newArrayList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            arrayList.add((FlatMessage) JSON.parseObject((String) ((ConsumerRecord) it.next()).value(), FlatMessage.class));
        }
        return arrayList;
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector, com.alibaba.otter.canal.client.CanalConnector
    public void rollback() {
        waitClientRunning();
        if (this.running) {
            if (this.kafkaConsumer != null) {
                for (Map.Entry<Integer, Long> entry : this.currentOffsets.entrySet()) {
                    this.kafkaConsumer.seek(new TopicPartition(this.topic, entry.getKey().intValue()), entry.getValue().longValue() - 1);
                }
            }
            if (this.kafkaConsumer2 != null) {
                for (Map.Entry<Integer, Long> entry2 : this.currentOffsets.entrySet()) {
                    this.kafkaConsumer2.seek(new TopicPartition(this.topic, entry2.getKey().intValue()), entry2.getValue().longValue() - 1);
                }
            }
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalMQConnector
    public void ack() {
        waitClientRunning();
        if (this.running) {
            if (this.kafkaConsumer != null) {
                this.kafkaConsumer.commitSync();
            }
            if (this.kafkaConsumer2 != null) {
                this.kafkaConsumer2.commitSync();
            }
        }
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void subscribe(String str) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public Message get(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public Message get(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public Message getWithoutAck(int i) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public Message getWithoutAck(int i, Long l, TimeUnit timeUnit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void ack(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    @Override // com.alibaba.otter.canal.client.CanalConnector
    public void rollback(long j) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    public void setSessionTimeout(Long l, TimeUnit timeUnit) {
        long millis = timeUnit.toMillis(l.longValue());
        this.properties.put("request.timeout.ms", String.valueOf(millis + 60000));
        this.properties.put("session.timeout.ms", String.valueOf(millis));
    }
}
