package org.apache.kafka.connect.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.15.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/tools/VerifiableSourceTask.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/tools/VerifiableSourceTask.class */
/**
 * Source task that emits one record per {@link #poll()} call, carrying a monotonically
 * increasing sequence number, and prints a JSON line to standard output for every record it
 * produces and for every record it is told has been committed.
 *
 * <p>The record key is the task id ({@code INT32}) and the record value is the sequence number
 * ({@code INT64}). The source partition is {@code {"id": <task id>}} and the source offset is
 * {@code {"seqno": <sequence number>}}, so a restarted task resumes one past the last stored
 * sequence number.
 *
 * <p>NOTE(review): the JSON lines on stdout are presumably consumed by an external test
 * harness; keep their keys and format stable.
 */
public class VerifiableSourceTask extends SourceTask {
    public static final String NAME_CONFIG = "name";
    public static final String ID_CONFIG = "id";
    public static final String TOPIC_CONFIG = "topic";
    public static final String THROUGHPUT_CONFIG = "throughput";

    private static final String ID_FIELD = "id";
    private static final String SEQNO_FIELD = "seqno";

    private static final Logger log = LoggerFactory.getLogger(VerifiableSourceTask.class);
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    private String name;
    private int id;
    private String topic;
    private Map<String, Integer> partition;
    private long startingSeqno;
    private long seqno;
    // Remains null until start() completes its configuration parsing; stop() must tolerate that.
    private ThroughputThrottler throttler;

    /**
     * Returns the version reported by {@link VerifiableSourceConnector}.
     */
    @Override
    public String version() {
        return new VerifiableSourceConnector().version();
    }

    /**
     * Reads the task configuration, restores the sequence number from the stored offset (if any)
     * and creates the throttler.
     *
     * @param map task configuration; must contain a parseable integer under {@link #ID_CONFIG}
     *            and a parseable long under {@link #THROUGHPUT_CONFIG}
     * @throws ConnectException if the id or throughput setting is missing or not numeric, or if
     *                          a stored offset exists but does not hold a numeric sequence number
     */
    @Override
    public void start(Map<String, String> map) {
        final long throughput;
        try {
            this.name = map.get(NAME_CONFIG);
            this.id = Integer.parseInt(map.get(ID_CONFIG));
            this.topic = map.get(TOPIC_CONFIG);
            throughput = Long.parseLong(map.get(THROUGHPUT_CONFIG));
        } catch (NumberFormatException e) {
            throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
        }

        this.partition = Collections.singletonMap(ID_FIELD, this.id);
        this.seqno = nextSeqno(this.context.offsetStorageReader().offset(this.partition));
        this.startingSeqno = this.seqno;
        this.throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());

        log.info("Started VerifiableSourceTask {}-{} producing to topic {} resuming from seqno {}",
                this.name, this.id, this.topic, this.startingSeqno);
    }

    /**
     * Produces exactly one record for the current sequence number, after throttling if the
     * throttler says the configured throughput has been exceeded, and prints a JSON description
     * of that record to standard output.
     *
     * @return a single-element list holding the new record
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // The throttler is given the number of records produced since this task started,
        // not the absolute sequence number.
        long sentSoFar = this.seqno - this.startingSeqno;
        if (this.throttler.shouldThrottle(sentSoFar, System.currentTimeMillis())) {
            this.throttler.throttle();
        }

        long nowMs = System.currentTimeMillis();
        Map<String, Object> data = new HashMap<>();
        data.put("name", this.name);
        data.put("task", this.id);
        data.put("topic", this.topic);
        data.put("time_ms", nowMs);
        data.put(SEQNO_FIELD, this.seqno);
        printJson(data);

        Map<String, Long> sourceOffset = Collections.singletonMap(SEQNO_FIELD, this.seqno);
        SourceRecord record = new SourceRecord(
                this.partition, sourceOffset, this.topic,
                Schema.INT32_SCHEMA, this.id,
                Schema.INT64_SCHEMA, this.seqno);
        List<SourceRecord> result = Collections.singletonList(record);
        this.seqno++;
        return result;
    }

    /**
     * Prints a JSON line to standard output marking the given record as committed. The
     * {@code seqno} entry is taken from the record's value.
     *
     * @param sourceRecord   the record that was committed
     * @param recordMetadata producer metadata for the record; not used here
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        Map<String, Object> data = new HashMap<>();
        data.put("name", this.name);
        data.put("task", this.id);
        data.put("topic", this.topic);
        data.put("time_ms", System.currentTimeMillis());
        data.put(SEQNO_FIELD, sourceRecord.value());
        data.put("committed", true);
        printJson(data);
    }

    /**
     * Wakes up the throttler. Does nothing if the throttler was never created, e.g. because
     * {@link #start(Map)} was not called or failed while parsing the configuration.
     */
    @Override
    public void stop() {
        if (this.throttler != null) {
            this.throttler.wakeup();
        }
    }

    /**
     * Computes the sequence number to resume from: 0 when there is no stored offset, otherwise
     * one past the stored sequence number.
     */
    private static long nextSeqno(Map<String, Object> offset) {
        if (offset == null) {
            return 0L;
        }
        Object last = offset.get(SEQNO_FIELD);
        // Accept any Number rather than casting to Long, and fail with a descriptive error
        // instead of a bare NullPointerException/ClassCastException.
        if (!(last instanceof Number)) {
            throw new ConnectException("Stored offset for VerifiableSourceTask has no numeric '"
                    + SEQNO_FIELD + "' field: " + offset);
        }
        return ((Number) last).longValue() + 1;
    }

    /**
     * Serializes the map as JSON and prints it on one line of standard output; if serialization
     * fails, a plain-text error line is printed instead.
     */
    private static void printJson(Map<String, Object> data) {
        String line;
        try {
            line = JSON_SERDE.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            line = "Bad data can't be written as json: " + e.getMessage();
        }
        System.out.println(line);
    }
}
