package com.linkedin.venice.utils;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ControllerClientFactory;
import com.linkedin.venice.controllerapi.NewStoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubAdminAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapterFactory;
import com.linkedin.venice.samza.VeniceObjectWithTimestamp;
import com.linkedin.venice.samza.VeniceSystemFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.testng.Assert;

/* loaded from: input_file:com/linkedin/venice/utils/IntegrationTestPushUtils.class */
public class IntegrationTestPushUtils {
    private static final Logger LOGGER = LogManager.getLogger(IntegrationTestPushUtils.class);

    public static Properties defaultVPJProps(VeniceClusterWrapper veniceClusterWrapper, String str, String str2) {
        return TestWriteUtils.defaultVPJPropsWithD2Routing((String) null, (String) null, Collections.singletonMap(veniceClusterWrapper.getRegionName(), veniceClusterWrapper.getZk().getAddress()), VeniceControllerWrapper.PARENT_D2_SERVICE_NAME, VeniceControllerWrapper.D2_SERVICE_NAME, str, str2);
    }

    public static Properties defaultVPJPropsWithoutD2Routing(VeniceClusterWrapper veniceClusterWrapper, String str, String str2) {
        return TestWriteUtils.defaultVPJProps(veniceClusterWrapper.getAllControllersURLs(), str, str2);
    }

    public static Properties defaultVPJProps(VeniceMultiClusterWrapper veniceMultiClusterWrapper, String str, String str2) {
        return TestWriteUtils.defaultVPJPropsWithD2Routing((String) null, (String) null, Collections.singletonMap(veniceMultiClusterWrapper.getRegionName(), veniceMultiClusterWrapper.getZkServerWrapper().getAddress()), VeniceControllerWrapper.PARENT_D2_SERVICE_NAME, VeniceControllerWrapper.D2_SERVICE_NAME, str, str2);
    }

    public static Properties defaultVPJProps(VeniceTwoLayerMultiRegionMultiClusterWrapper veniceTwoLayerMultiRegionMultiClusterWrapper, String str, String str2) {
        return TestWriteUtils.defaultVPJPropsWithD2Routing(veniceTwoLayerMultiRegionMultiClusterWrapper.getParentRegionName(), veniceTwoLayerMultiRegionMultiClusterWrapper.getZkServerWrapper().getAddress(), (Map) veniceTwoLayerMultiRegionMultiClusterWrapper.getChildRegions().stream().collect(Collectors.toMap(veniceMultiClusterWrapper -> {
            return veniceMultiClusterWrapper.getRegionName();
        }, veniceMultiClusterWrapper2 -> {
            return veniceMultiClusterWrapper2.getZkServerWrapper().getAddress();
        })), VeniceControllerWrapper.PARENT_D2_SERVICE_NAME, VeniceControllerWrapper.D2_SERVICE_NAME, str, str2);
    }

    public static Properties sslVPJProps(VeniceClusterWrapper veniceClusterWrapper, String str, String str2) {
        Properties defaultVPJProps = defaultVPJProps(veniceClusterWrapper, str, str2);
        defaultVPJProps.putAll(KafkaSSLUtils.getLocalKafkaClientSSLConfig());
        return defaultVPJProps;
    }

    public static void runVPJ(Properties properties, int i, ControllerClient controllerClient) {
        long currentTimeMillis = System.currentTimeMillis();
        VenicePushJob venicePushJob = new VenicePushJob(Utils.getUniqueString("hybrid-job-" + i), properties);
        try {
            venicePushJob.run();
            TestUtils.waitForNonDeterministicCompletion(5L, TimeUnit.SECONDS, () -> {
                return controllerClient.getStore((String) properties.get("venice.store.name")).getStore().getCurrentVersion() == i;
            });
            LOGGER.info("**TIME** VPJ #{} takes {} ms", Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            venicePushJob.close();
        } catch (Throwable th) {
            try {
                venicePushJob.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public static ControllerClient createStoreForJob(VeniceClusterWrapper veniceClusterWrapper, String str, String str2, Properties properties) {
        return createStoreForJob(veniceClusterWrapper.getClusterName(), str, str2, properties, CompressionStrategy.NO_OP, false, false);
    }

    public static void makeStoreHybrid(VeniceClusterWrapper veniceClusterWrapper, String str, long j, long j2) {
        ControllerClient constructClusterControllerClient = ControllerClient.constructClusterControllerClient(veniceClusterWrapper.getClusterName(), veniceClusterWrapper.getRandomRouterURL());
        try {
            ControllerResponse updateStore = constructClusterControllerClient.updateStore(str, new UpdateStoreQueryParams().setHybridRewindSeconds(j).setHybridOffsetLagThreshold(j2));
            if (updateStore.isError()) {
                throw new VeniceException(updateStore.getError());
            }
            if (constructClusterControllerClient != null) {
                constructClusterControllerClient.close();
            }
        } catch (Throwable th) {
            if (constructClusterControllerClient != null) {
                try {
                    constructClusterControllerClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public static Map<String, String> getSamzaProducerConfig(VeniceClusterWrapper veniceClusterWrapper, String str, Version.PushType pushType) {
        HashMap hashMap = new HashMap();
        hashMap.put("systems.venice.push.type", pushType.toString());
        hashMap.put("systems.venice.store", str);
        hashMap.put("venice.child.d2.zk.hosts", veniceClusterWrapper.getZk().getAddress());
        hashMap.put("venice.child.controller.d2.service", VeniceControllerWrapper.D2_SERVICE_NAME);
        hashMap.put("venice.parent.d2.zk.hosts", "invalid_parent_zk_address");
        hashMap.put("venice.parent.controller.d2.service", "invalid_parent_d2_service");
        hashMap.put("deployment.id", Utils.getUniqueString("venice-push-id"));
        hashMap.put("ssl.enabled", "false");
        return hashMap;
    }

    @SafeVarargs
    public static SystemProducer getSamzaProducer(VeniceClusterWrapper veniceClusterWrapper, String str, Version.PushType pushType, Pair<String, String>... pairArr) {
        Map<String, String> samzaProducerConfig = getSamzaProducerConfig(veniceClusterWrapper, str, pushType);
        for (Pair<String, String> pair : pairArr) {
            samzaProducerConfig.put((String) pair.getFirst(), (String) pair.getSecond());
        }
        SystemProducer producer = new VeniceSystemFactory().getProducer("venice", new MapConfig(samzaProducerConfig), (MetricsRegistry) null);
        producer.start();
        return producer;
    }

    public static PubSubConsumerAdapterFactory getVeniceConsumerFactory() {
        return new ApacheKafkaConsumerAdapterFactory();
    }

    public static PubSubAdminAdapterFactory getVeniceAdminFactory() {
        return new ApacheKafkaAdminAdapterFactory();
    }

    public static ControllerClient createStoreForJob(String str, Schema schema, Properties properties) {
        return createStoreForJob(str, schema.getField(properties.getProperty("key.field", "key")).schema().toString(), schema.getField(properties.getProperty("value.field", "value")).schema().toString(), properties, CompressionStrategy.NO_OP, false, false);
    }

    public static ControllerClient createStoreForJob(String str, String str2, String str3, Properties properties, CompressionStrategy compressionStrategy, boolean z, boolean z2) {
        return createStoreForJob(str, str2, str3, properties, new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setCompressionStrategy(compressionStrategy).setBatchGetLimit(2000).setReadQuotaInCU(20000000L).setChunkingEnabled(z).setIncrementalPushEnabled(z2));
    }

    public static ControllerClient createStoreForJob(String str, String str2, String str3, Properties properties, UpdateStoreQueryParams updateStoreQueryParams) {
        ControllerClient controllerClient = getControllerClient(str, properties);
        NewStoreResponse createNewStore = controllerClient.createNewStore(properties.getProperty("venice.store.name"), "test@linkedin.com", str2, str3);
        Assert.assertFalse(createNewStore.isError(), "The NewStoreResponse returned an error: " + createNewStore.getError());
        updateStore(str, properties, updateStoreQueryParams.setStorageQuotaInByte(-1L));
        return controllerClient;
    }

    public static void updateStore(String str, Properties properties, UpdateStoreQueryParams updateStoreQueryParams) {
        ControllerClient controllerClient = getControllerClient(str, properties);
        try {
            ControllerResponse retryableRequest = controllerClient.retryableRequest(5, controllerClient2 -> {
                return controllerClient2.updateStore(properties.getProperty("venice.store.name"), updateStoreQueryParams);
            });
            Assert.assertFalse(retryableRequest.isError(), "The UpdateStore response returned an error: " + retryableRequest.getError());
            if (controllerClient != null) {
                controllerClient.close();
            }
        } catch (Throwable th) {
            if (controllerClient != null) {
                try {
                    controllerClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static ControllerClient getControllerClient(String str, Properties properties) {
        String property;
        String property2 = properties.getProperty("venice.discover.urls");
        if (!property2.startsWith("d2://")) {
            return ControllerClient.constructClusterControllerClient(str, property2);
        }
        String substring = property2.substring("d2://".length());
        if (Boolean.parseBoolean(properties.get("multi.region").toString())) {
            property = properties.getProperty("d2.zk.hosts." + properties.getProperty("parent.controller.region.name"));
        } else {
            property = properties.getProperty("d2.zk.hosts." + properties.getProperty("source.grid.fabric"));
        }
        return D2ControllerClientFactory.getControllerClient(substring, str, property, Optional.empty());
    }

    public static void sendStreamingRecord(SystemProducer systemProducer, String str, int i) {
        sendStreamingRecord(systemProducer, str, Integer.toString(i), "stream_" + i);
    }

    public static void sendStreamingRecordWithKeyPrefix(SystemProducer systemProducer, String str, String str2, int i) {
        sendStreamingRecord(systemProducer, str, str2 + i, "stream_" + i);
    }

    public static void sendStreamingDeleteRecord(SystemProducer systemProducer, String str, String str2) {
        sendStreamingRecord(systemProducer, str, str2, null, null);
    }

    public static void sendStreamingDeleteRecord(SystemProducer systemProducer, String str, String str2, Long l) {
        sendStreamingRecord(systemProducer, str, str2, null, l);
    }

    public static void sendStreamingRecord(SystemProducer systemProducer, String str, Object obj, Object obj2) {
        sendStreamingRecord(systemProducer, str, obj, obj2, null);
    }

    public static void sendStreamingRecord(SystemProducer systemProducer, String str, Object obj, Object obj2, Long l) {
        systemProducer.send(str, l == null ? new OutgoingMessageEnvelope(new SystemStream("venice", str), obj, obj2) : new OutgoingMessageEnvelope(new SystemStream("venice", str), obj, new VeniceObjectWithTimestamp(obj2, l.longValue())));
        systemProducer.flush(str);
    }

    public static void sendCustomSizeStreamingRecord(SystemProducer systemProducer, String str, int i, int i2) {
        char[] cArr = new char[i2];
        Arrays.fill(cArr, Integer.toString(i).charAt(0));
        sendStreamingRecord(systemProducer, str, Integer.toString(i), new String(cArr));
    }

    public static TopicManagerRepository getTopicManagerRepo(long j, long j2, long j3, String str, PubSubTopicRepository pubSubTopicRepository) {
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", str);
        return TopicManagerRepository.builder().setPubSubProperties(str2 -> {
            return new VeniceProperties(properties);
        }).setPubSubTopicRepository(pubSubTopicRepository).setLocalKafkaBootstrapServers(str).setPubSubConsumerAdapterFactory(getVeniceConsumerFactory()).setPubSubAdminAdapterFactory(getVeniceAdminFactory()).setKafkaOperationTimeoutMs(j).setTopicDeletionStatusPollIntervalMs(j2).setTopicMinLogCompactionLagMs(j3).build();
    }
}
