package storm.kafka;

import backtype.storm.utils.Utils;
import java.io.UnsupportedEncodingException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;

/* loaded from: input_file:storm/kafka/DynamicBrokersReader.class */
/**
 * Reads broker and partition-leader information for a single topic from ZooKeeper.
 *
 * <p>All paths are built relative to the ZooKeeper root path given to the constructor:
 * <ul>
 *   <li>{@code <root>/topics/<topic>/partitions} - its children are counted to obtain the
 *       number of partitions;</li>
 *   <li>{@code <root>/topics/<topic>/partitions/<n>/state} - a JSON object whose
 *       {@code "leader"} entry is the id of the broker leading partition {@code n};</li>
 *   <li>{@code <root>/ids/<brokerId>} - a JSON object with {@code "host"} and {@code "port"}
 *       entries describing that broker.</li>
 * </ul>
 *
 * <p>If the ZooKeeper client cannot be created or started, the constructor only logs the
 * failure. The instance is then unusable for reads, but {@link #close()} is still safe to call.
 */
public class DynamicBrokersReader {
    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);

    /** Charset name used to decode the JSON payloads stored in ZooKeeper nodes. */
    private static final String UTF_8 = "UTF-8";

    /** Connection timeout, in milliseconds, handed to the Curator client. */
    private static final int CONNECTION_TIMEOUT_MS = 15000;

    /** ZooKeeper client; stays {@code null} when construction failed (see constructor). */
    private CuratorFramework _curator;
    private final String _zkPath;
    private final String _topic;

    /**
     * Creates the reader and starts a Curator client against the given ZooKeeper ensemble.
     *
     * <p>Any exception raised while reading the configuration or while creating/starting the
     * client is logged and swallowed; in that case no client is available afterwards.
     *
     * @param map  configuration map; the keys {@code storm.zookeeper.session.timeout},
     *             {@code storm.zookeeper.retry.times} and {@code storm.zookeeper.retry.interval}
     *             are read from it
     * @param str  ZooKeeper connection string passed to Curator
     * @param str2 ZooKeeper root path under which the {@code topics} and {@code ids} nodes live
     * @param str3 name of the topic to read partition information for
     */
    public DynamicBrokersReader(Map map, String str, String str2, String str3) {
        this._zkPath = str2;
        this._topic = str3;
        try {
            int sessionTimeoutMs = Utils.getInt(map.get("storm.zookeeper.session.timeout"));
            int retryTimes = Utils.getInt(map.get("storm.zookeeper.retry.times"));
            int retryIntervalMs = Utils.getInt(map.get("storm.zookeeper.retry.interval"));
            this._curator = CuratorFrameworkFactory.newClient(
                    str,
                    sessionTimeoutMs,
                    CONNECTION_TIMEOUT_MS,
                    new RetryNTimes(retryTimes, retryIntervalMs));
            this._curator.start();
        } catch (Exception e) {
            LOG.error("Couldn't connect to zookeeper", e);
        }
    }

    /**
     * Builds the partition-to-leader-broker mapping for the topic.
     *
     * <p>Partitions are numbered {@code 0 .. n-1}, where {@code n} is the number of children of
     * {@link #partitionPath()}. A partition whose leader has no node under {@link #brokerPath()}
     * is logged and left out of the result.
     *
     * @return the partition information read from ZooKeeper
     * @throws SocketTimeoutException if one is raised while reading broker data
     * @throws RuntimeException       on any other failure; checked exceptions are wrapped,
     *                                runtime exceptions are propagated as they are
     */
    public GlobalPartitionInformation getBrokerInfo() throws SocketTimeoutException {
        GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
        try {
            int numPartitions = getNumPartitions();
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitions; partition++) {
                String leaderPath = brokerInfoPath + "/" + getLeaderFor(partition);
                try {
                    byte[] brokerData = this._curator.getData().forPath(leaderPath);
                    partitionInfo.addPartition(partition, getBrokerHost(brokerData));
                } catch (KeeperException.NoNodeException e) {
                    // The leader's broker node is missing; skip this partition.
                    LOG.error("Node {} does not exist ", leaderPath);
                }
            }
            LOG.info("Read partition info from zookeeper: {}", partitionInfo);
            return partitionInfo;
        } catch (SocketTimeoutException e) {
            throw e;
        } catch (RuntimeException e) {
            // Already unchecked (e.g. wrapped by a helper); do not wrap it a second time.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Counts the children of {@link #partitionPath()}.
     *
     * @return the number of partition nodes found for the topic
     * @throws RuntimeException wrapping any failure while querying ZooKeeper
     */
    private int getNumPartitions() {
        try {
            return this._curator.getChildren().forPath(partitionPath()).size();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the ZooKeeper path holding the topic's partition nodes:
     *         {@code <root>/topics/<topic>/partitions}
     */
    public String partitionPath() {
        return this._zkPath + "/topics/" + this._topic + "/partitions";
    }

    /**
     * @return the ZooKeeper path holding the broker nodes: {@code <root>/ids}
     */
    public String brokerPath() {
        return this._zkPath + "/ids";
    }

    /**
     * Reads the leader broker id from the partition's {@code state} node.
     *
     * @param j partition number
     * @return the value of the {@code "leader"} entry of the state JSON, as an int
     * @throws RuntimeException wrapping any failure (ZooKeeper error, missing or non-numeric
     *                          {@code "leader"} entry, payload that is not a JSON object)
     */
    private int getLeaderFor(long j) {
        try {
            String statePath = partitionPath() + "/" + j + "/state";
            byte[] stateData = this._curator.getData().forPath(statePath);
            Map<?, ?> state = (Map<?, ?>) JSONValue.parse(new String(stateData, UTF_8));
            return ((Number) state.get("leader")).intValue();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the ZooKeeper client, if one was created.
     */
    public void close() {
        // The constructor may have failed before a client existed; nothing to release then.
        if (this._curator != null) {
            this._curator.close();
        }
    }

    /**
     * Decodes a broker node payload into a {@link Broker}.
     *
     * @param bArr UTF-8 encoded JSON object with {@code "host"} and {@code "port"} entries
     * @return the broker described by the payload
     * @throws RuntimeException if the UTF-8 charset is reported as unsupported
     */
    private Broker getBrokerHost(byte[] bArr) {
        try {
            Map<?, ?> brokerInfo = (Map<?, ?>) JSONValue.parse(new String(bArr, UTF_8));
            String host = (String) brokerInfo.get("host");
            // Accept any numeric type rather than insisting on Long, matching getLeaderFor.
            int port = ((Number) brokerInfo.get("port")).intValue();
            return new Broker(host, port);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}
