package org.apache.pulsar.broker.service;

import com.google.gson.Gson;
import java.io.File;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.NetworkTopologyImpl;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.zookeeper.data.Stat;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"quarantine"})
/* loaded from: input_file:org/apache/pulsar/broker/service/RackAwareTest.class */
public class RackAwareTest extends BkEnsemblesTestBase {
    private static final int NUM_BOOKIES = 6;
    private final List<BookieServer> bookies;
    private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);

    public RackAwareTest() {
        super(0);
        this.bookies = new ArrayList();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "forceMinRackNumProvider")
    public Object[][] forceMinRackNumProvider() {
        return new Object[]{new Object[]{Boolean.TRUE}, new Object[]{Boolean.FALSE}};
    }

    @Override // org.apache.pulsar.broker.service.BkEnsemblesTestBase
    protected void configurePulsar(ServiceConfiguration serviceConfiguration) throws Exception {
        for (int i = 0; i < NUM_BOOKIES; i++) {
            File file = Files.createTempDirectory("bk" + Integer.toString(i) + "test", new FileAttribute[0]).toFile();
            ServerConfiguration serverConfiguration = new ServerConfiguration();
            serverConfiguration.setBookiePort(0);
            serverConfiguration.setZkServers("127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
            serverConfiguration.setJournalDirName(file.getPath());
            serverConfiguration.setLedgerDirNames(new String[]{file.getPath()});
            serverConfiguration.setAllowLoopback(true);
            serverConfiguration.setAdvertisedAddress(String.format("10.0.0.%d", Integer.valueOf(i + 1)));
            BookieServer bookieServer = new BookieServer(serverConfiguration, NullStatsLogger.INSTANCE, (Supplier) null);
            bookieServer.start();
            this.bookies.add(bookieServer);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.BkEnsemblesTestBase
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.cleanup();
        Iterator<BookieServer> it = this.bookies.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        this.bookies.clear();
    }

    @Test
    public void testPlacement() throws Exception {
        int i = 0;
        while (i < NUM_BOOKIES) {
            String bookieSocketAddress = this.bookies.get(i).getLocalAddress().toString();
            BookieInfo build = BookieInfo.builder().rack("rack-" + (i == 0 ? 1 : 2)).hostname("bookie-" + (i + 1)).build();
            log.info("setting rack for bookie at {} -- {}", bookieSocketAddress, build);
            this.admin.bookies().updateBookieRackInfo(bookieSocketAddress, "default", build);
            i++;
        }
        Awaitility.await().untilAsserted(() -> {
            TreeMap treeMap = (TreeMap) new Gson().fromJson(new String(this.bkEnsemble.getZkClient().getData("/bookies", false, (Stat) null)), TreeMap.class);
            Assert.assertEquals(((Map) treeMap.get("default")).size(), NUM_BOOKIES);
            Assert.assertTrue(((Set) treeMap.values().stream().map((v0) -> {
                return v0.values();
            }).flatMap(collection -> {
                return collection.stream().map(map -> {
                    return (String) map.get("rack");
                });
            }).collect(Collectors.toSet())).containsAll(Lists.newArrayList(new String[]{"rack-1", "rack-2"})));
        });
        BookKeeper bookKeeperClient = this.pulsar.getBookKeeperClient();
        BookieId bookieId = this.bookies.get(0).getBookieId();
        for (int i2 = 0; i2 < 100; i2++) {
            LedgerHandle createLedger = bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
            log.info("Ledger: {} -- Ensemble: {}", Integer.valueOf(i2), createLedger.getLedgerMetadata().getEnsembleAt(0L));
            Assert.assertTrue(createLedger.getLedgerMetadata().getEnsembleAt(0L).contains(bookieId), "first bookie in rack 0 not included in ensemble");
            createLedger.close();
        }
    }

    @Test(dataProvider = "forceMinRackNumProvider")
    public void testPlacementMinRackNumsPerWriteQuorum(boolean z) throws Exception {
        cleanup();
        this.config = new ServiceConfiguration();
        this.config.setBookkeeperClientMinNumRacksPerWriteQuorum(2);
        this.config.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(z);
        setup();
        int i = 0;
        while (i < NUM_BOOKIES) {
            String bookieSocketAddress = this.bookies.get(i).getLocalAddress().toString();
            char c = i == 0 ? (char) 1 : (char) 2;
            BookieInfo build = BookieInfo.builder().rack("rack-1").hostname("bookie-" + (i + 1)).build();
            log.info("setting rack for bookie at {} -- {}", bookieSocketAddress, build);
            this.admin.bookies().updateBookieRackInfo(bookieSocketAddress, "default", build);
            i++;
        }
        Awaitility.await().untilAsserted(() -> {
            TreeMap treeMap = (TreeMap) new Gson().fromJson(new String(this.bkEnsemble.getZkClient().getData("/bookies", false, (Stat) null)), TreeMap.class);
            Assert.assertEquals(((Map) treeMap.get("default")).size(), NUM_BOOKIES);
            Set set = (Set) treeMap.values().stream().map((v0) -> {
                return v0.values();
            }).flatMap(collection -> {
                return collection.stream().map(map -> {
                    return (String) map.get("rack");
                });
            }).collect(Collectors.toSet());
            Assert.assertEquals(set.size(), 1);
            Assert.assertTrue(set.contains("rack-1"));
        });
        BookKeeper bookKeeperClient = this.pulsar.getBookKeeperClient();
        if (z) {
            try {
                bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
                Assert.fail("Should be failed due to no enough rack can be found");
            } catch (BKException.BKNotEnoughBookiesException e) {
            }
        } else {
            for (int i2 = 0; i2 < 10; i2++) {
                LedgerHandle createLedger = bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
                log.info("Ledger: {} -- Ensemble: {}", Integer.valueOf(i2), createLedger.getLedgerMetadata().getEnsembleAt(0L));
                createLedger.close();
            }
        }
    }

    public void testRackUpdate() throws Exception {
        cleanup();
        this.config = new ServiceConfiguration();
        this.config.setBookkeeperClientMinNumRacksPerWriteQuorum(2);
        this.config.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(true);
        setup();
        for (int i = 0; i < 3; i++) {
            String bookieSocketAddress = this.bookies.get(i).getLocalAddress().toString();
            BookieInfo build = BookieInfo.builder().rack("rack-0").hostname("bookie-" + (i + 1)).build();
            log.info("setting rack for bookie at {} -- {}", bookieSocketAddress, build);
            this.admin.bookies().updateBookieRackInfo(bookieSocketAddress, "default", build);
        }
        Awaitility.await().untilAsserted(() -> {
            TreeMap treeMap = (TreeMap) new Gson().fromJson(new String(this.bkEnsemble.getZkClient().getData("/bookies", false, (Stat) null)), TreeMap.class);
            Assert.assertEquals(((Map) treeMap.get("default")).size(), 3);
            Set set = (Set) treeMap.values().stream().map((v0) -> {
                return v0.values();
            }).flatMap(collection -> {
                return collection.stream().map(map -> {
                    return (String) map.get("rack");
                });
            }).collect(Collectors.toSet());
            Assert.assertEquals(set.size(), 1);
            Assert.assertTrue(set.contains("rack-0"));
        });
        BookKeeper bookKeeperClient = this.pulsar.getBookKeeperClient();
        Field declaredField = bookKeeperClient.getClass().getDeclaredField("placementPolicy");
        declaredField.setAccessible(true);
        RackawareEnsemblePlacementPolicy rackawareEnsemblePlacementPolicy = (RackawareEnsemblePlacementPolicy) declaredField.get(bookKeeperClient);
        Field declaredField2 = rackawareEnsemblePlacementPolicy.getClass().getSuperclass().getSuperclass().getDeclaredField("topology");
        declaredField2.setAccessible(true);
        NetworkTopologyImpl networkTopologyImpl = (NetworkTopologyImpl) declaredField2.get(rackawareEnsemblePlacementPolicy);
        try {
            bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
            Assert.fail("Should be failed due to no enough rack can be found");
        } catch (BKException.BKNotEnoughBookiesException e) {
        }
        for (int i2 = 3; i2 < NUM_BOOKIES; i2++) {
            String bookieSocketAddress2 = this.bookies.get(i2).getLocalAddress().toString();
            BookieInfo build2 = BookieInfo.builder().rack("rack-1").hostname("bookie-" + (i2 + 1)).build();
            log.info("setting rack for bookie at {} -- {}", bookieSocketAddress2, build2);
            this.admin.bookies().updateBookieRackInfo(bookieSocketAddress2, "default", build2);
        }
        Awaitility.await().untilAsserted(() -> {
            TreeMap treeMap = (TreeMap) new Gson().fromJson(new String(this.bkEnsemble.getZkClient().getData("/bookies", false, (Stat) null)), TreeMap.class);
            Assert.assertEquals(((Map) treeMap.get("default")).size(), NUM_BOOKIES);
            Set set = (Set) treeMap.values().stream().map((v0) -> {
                return v0.values();
            }).flatMap(collection -> {
                return collection.stream().map(map -> {
                    return (String) map.get("rack");
                });
            }).collect(Collectors.toSet());
            Assert.assertEquals(set.size(), 2);
            Assert.assertTrue(set.containsAll(Lists.newArrayList(new String[]{"rack-0", "rack-1"})));
        });
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(networkTopologyImpl.getNumOfRacks(), 2);
        });
        for (int i3 = 0; i3 < 2; i3++) {
            LedgerHandle createLedger = bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
            log.info("Ledger: {} -- Ensemble: {}", Integer.valueOf(i3), createLedger.getLedgerMetadata().getEnsembleAt(0L));
            createLedger.close();
        }
        for (int i4 = 0; i4 < 3; i4++) {
            this.admin.bookies().deleteBookieRackInfo(this.bookies.get(i4).getLocalAddress().toString());
        }
        Awaitility.await().untilAsserted(() -> {
            TreeMap treeMap = (TreeMap) new Gson().fromJson(new String(this.bkEnsemble.getZkClient().getData("/bookies", false, (Stat) null)), TreeMap.class);
            Assert.assertEquals(((Map) treeMap.get("default")).size(), 3);
            Set set = (Set) treeMap.values().stream().map((v0) -> {
                return v0.values();
            }).flatMap(collection -> {
                return collection.stream().map(map -> {
                    return (String) map.get("rack");
                });
            }).collect(Collectors.toSet());
            Assert.assertEquals(set.size(), 1);
            Assert.assertTrue(set.contains("rack-1"));
        });
        try {
            bookKeeperClient.createLedger(2, 2, BookKeeper.DigestType.DUMMY, new byte[0]);
            Assert.fail("Should be failed due to no enough rack can be found");
        } catch (BKException.BKNotEnoughBookiesException e2) {
        }
    }
}
