package com.linkedin.venice.samza;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;

/* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemFactory.class */
public class VeniceSystemFactory implements SystemFactory, Serializable {
    private static final long serialVersionUID = 1;
    public static final String LEGACY_CHILD_D2_ZK_HOSTS_PROPERTY = "__r2d2DefaultClient__.r2d2Client.zkHosts";
    public static final String SYSTEMS_PREFIX = "systems.";
    public static final String DOT = ".";
    public static final String DEPLOYMENT_ID = "deployment.id";
    public static final String VENICE_PUSH_TYPE = "push.type";
    public static final String VENICE_STORE = "store";
    public static final String VENICE_AGGREGATE = "aggregate";
    public static final String VENICE_CHILD_D2_ZK_HOSTS = "venice.child.d2.zk.hosts";
    public static final String VENICE_CONTROLLER_DISCOVERY_URL = "venice.controller.discovery.url";
    public static final String VENICE_ROUTER_URL = "venice.router.url";
    public static final String VENICE_PARENT_D2_ZK_HOSTS = "venice.parent.d2.zk.hosts";
    public static final String VENICE_CHILD_CONTROLLER_D2_SERVICE = "venice.child.controller.d2.service";
    public static final String VENICE_PARENT_CONTROLLER_D2_SERVICE = "venice.parent.controller.d2.service";
    public static final String LEGACY_VENICE_CHILD_CONTROLLER_D2_SERVICE = "VeniceController";
    public static final String LEGACY_VENICE_PARENT_CONTROLLER_D2_SERVICE = "VeniceParentController";
    private final Map<SystemProducer, Pair<Boolean, Boolean>> systemProducerStatues = new VeniceConcurrentHashMap();
    private static final Logger LOGGER = LogManager.getLogger(VeniceSystemFactory.class);
    private static final AtomicInteger FACTORY_INSTANCE_NUMBER = new AtomicInteger(0);
    private static final List<String> SSL_MANDATORY_CONFIGS = Arrays.asList("ssl.keystore.type", "ssl.keystore.location", "ssl.key.password", "ssl.truststore.location", "ssl.truststore.password");

    public VeniceSystemFactory() {
        int incrementAndGet = FACTORY_INSTANCE_NUMBER.incrementAndGet();
        if (incrementAndGet > 1) {
            LOGGER.warn("There are {} VeniceSystemProducer factory instances in one process.", Integer.valueOf(incrementAndGet));
        }
    }

    public SystemAdmin getAdmin(String str, Config config) {
        return new SinglePartitionWithoutOffsetsSystemAdmin();
    }

    public SystemConsumer getConsumer(String str, Config config, MetricsRegistry metricsRegistry) {
        throw new SamzaException("There is no Venice Consumer");
    }

    public Optional<String> getControllerDiscoveryUrl(Config config) {
        String str = (String) config.get(VENICE_CONTROLLER_DISCOVERY_URL);
        return isEmpty(str) ? Optional.empty() : Optional.of(str);
    }

    @Deprecated
    protected SystemProducer createSystemProducer(String str, String str2, String str3, Version.PushType pushType, String str4, String str5, Config config, Optional<SSLFactory> optional, Optional<String> optional2) {
        return new VeniceSystemProducer(str, str, str2, str3, pushType, str4, str5, config.getBoolean("validate.venice.internal.schema.version", true), this, optional, optional2);
    }

    @Deprecated
    protected SystemProducer createSystemProducer(String str, String str2, String str3, Version.PushType pushType, String str4, String str5, boolean z, Config config, Optional<SSLFactory> optional, Optional<String> optional2) {
        return createSystemProducer(str, str, str2, str3, pushType, str4, str5, z, config, optional, optional2);
    }

    protected VeniceSystemProducer createSystemProducer(String str, String str2, String str3, String str4, Version.PushType pushType, String str5, String str6, boolean z, Config config, Optional<SSLFactory> optional, Optional<String> optional2) {
        return createSystemProducer(str, str2, str3, str4, pushType, str5, str6, z, optional, optional2);
    }

    protected VeniceSystemProducer createSystemProducer(String str, String str2, String str3, String str4, Version.PushType pushType, String str5, String str6, boolean z, Optional<SSLFactory> optional, Optional<String> optional2) {
        return new VeniceSystemProducer(str, str2, str3, str4, pushType, str5, str6, z, this, optional, optional2);
    }

    public SystemProducer getProducer(String str, String str2, boolean z, String str3, Config config) {
        String str4;
        String str5;
        if (isEmpty(str2)) {
            throw new SamzaException("store should not be null for system " + str);
        }
        String str6 = (String) config.get(DEPLOYMENT_ID);
        Optional<String> controllerDiscoveryUrl = getControllerDiscoveryUrl(config);
        String str7 = SYSTEMS_PREFIX + str + DOT;
        try {
            Version.PushType valueOf = Version.PushType.valueOf(str3);
            String str8 = (String) config.get("com.linkedin.app.env");
            LOGGER.info("Running Fabric from config: {}", str8);
            if (str8 == null) {
                str8 = System.getProperty("com.linkedin.app.env");
                LOGGER.info("Running Fabric from environment: {}", str8);
                if (str8 != null) {
                    str8 = str8.toLowerCase();
                }
            }
            if (str8 != null && str8.contains("corp")) {
                str8 = "prod-lva1";
            }
            LOGGER.info("Final Running Fabric: {}", str8);
            boolean z2 = config.getBoolean("validate.venice.internal.schema.version", true);
            Optional<SSLFactory> empty = Optional.empty();
            if (config.getBoolean("ssl.enabled", true)) {
                LOGGER.info("Controller ACL is enabled.");
                empty = Optional.of(SslUtils.getSSLFactory(getSslProperties(config), config.get("ssl.factory.class.name", "com.linkedin.venice.security.DefaultSSLFactory")));
            }
            Optional<String> ofNullable = Optional.ofNullable((String) config.get("venice.partitioners"));
            if (controllerDiscoveryUrl.isPresent()) {
                String str9 = (String) config.get(VENICE_ROUTER_URL);
                LOGGER.info("Configs for {} producer: ", str);
                LOGGER.info("{}{}: {}", str7, VENICE_STORE, str2);
                LOGGER.info("{}{}: {}", str7, VENICE_AGGREGATE, Boolean.valueOf(z));
                LOGGER.info("{}{}: {}", str7, VENICE_PUSH_TYPE, valueOf);
                LOGGER.info("{}: {}", VENICE_CONTROLLER_DISCOVERY_URL, controllerDiscoveryUrl.get());
                LOGGER.info("{}: {}", VENICE_ROUTER_URL, str9);
                VeniceSystemProducer veniceSystemProducer = new VeniceSystemProducer(controllerDiscoveryUrl.get(), str2, valueOf, str6, str8, z2, this, empty, ofNullable, (Time) SystemTime.INSTANCE);
                veniceSystemProducer.setRouterUrl(str9);
                veniceSystemProducer.applyAdditionalWriterConfigs(config);
                return veniceSystemProducer;
            }
            String str10 = (String) config.get(VENICE_PARENT_D2_ZK_HOSTS);
            if (isEmpty(str10)) {
                throw new SamzaException("venice.parent.d2.zk.hosts should not be null, please put this property in your app-def.xml");
            }
            String str11 = (String) config.get(VENICE_CHILD_D2_ZK_HOSTS);
            String str12 = (String) config.get(LEGACY_CHILD_D2_ZK_HOSTS_PROPERTY);
            if (isEmpty(str11)) {
                if (isEmpty(str12)) {
                    throw new SamzaException("Either venice.child.d2.zk.hosts or __r2d2DefaultClient__.r2d2Client.zkHosts should be defined");
                }
                str11 = str12;
            }
            String str13 = (String) config.get(VENICE_CHILD_CONTROLLER_D2_SERVICE);
            if (isEmpty(str13)) {
                LOGGER.info("venice.child.controller.d2.service is not defined. Using VeniceController");
                str13 = LEGACY_VENICE_CHILD_CONTROLLER_D2_SERVICE;
            }
            String str14 = (String) config.get(VENICE_PARENT_CONTROLLER_D2_SERVICE);
            if (isEmpty(str14)) {
                LOGGER.info("venice.parent.controller.d2.service is not defined. Using VeniceParentController");
                str14 = LEGACY_VENICE_PARENT_CONTROLLER_D2_SERVICE;
            }
            LOGGER.info("Configs for {} producer: ", str);
            LOGGER.info("{}{}: {}", str7, VENICE_STORE, str2);
            LOGGER.info("{}{}: {}", str7, VENICE_AGGREGATE, Boolean.valueOf(z));
            LOGGER.info("{}{}: {}", str7, VENICE_PUSH_TYPE, valueOf);
            LOGGER.info("{}: {}", VENICE_PARENT_D2_ZK_HOSTS, str10);
            LOGGER.info("{}: {}", VENICE_CHILD_D2_ZK_HOSTS, str11);
            LOGGER.info("{}: {}", VENICE_PARENT_CONTROLLER_D2_SERVICE, str14);
            LOGGER.info("{}: {}", VENICE_CHILD_CONTROLLER_D2_SERVICE, str13);
            if (z) {
                str4 = str10;
                str5 = str14;
            } else {
                str4 = str11;
                str5 = str13;
            }
            LOGGER.info("Will use the following primary controller D2 ZK hosts: {} and D2 Service: {}", str4, str5);
            VeniceSystemProducer createSystemProducer = createSystemProducer(str11, str4, str5, str2, valueOf, str6, str8, z2, config, empty, ofNullable);
            createSystemProducer.applyAdditionalWriterConfigs(config);
            this.systemProducerStatues.computeIfAbsent(createSystemProducer, systemProducer -> {
                return Pair.create(true, false);
            });
            return createSystemProducer;
        } catch (Exception e) {
            throw new SamzaException("Cannot parse venice push type: " + str3 + ".  Must be one of: " + ((String) Arrays.stream(Version.PushType.values()).map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(","))));
        }
    }

    public SystemProducer getProducer(String str, Config config, MetricsRegistry metricsRegistry) {
        String str2 = SYSTEMS_PREFIX + str + DOT;
        return getProducer(str, (String) config.get(str2 + VENICE_STORE), config.getBoolean(str2 + VENICE_AGGREGATE, false), (String) config.get(str2 + VENICE_PUSH_TYPE), config);
    }

    public VeniceSystemProducer getClosableProducer(String str, Config config, MetricsRegistry metricsRegistry) {
        return (VeniceSystemProducer) getProducer(str, config, metricsRegistry);
    }

    public int getNumberOfActiveSystemProducers() {
        int i = 0;
        Iterator<Map.Entry<SystemProducer, Pair<Boolean, Boolean>>> it = this.systemProducerStatues.entrySet().iterator();
        while (it.hasNext()) {
            i += ((Boolean) it.next().getValue().getFirst()).booleanValue() ? 1 : 0;
        }
        return i;
    }

    public boolean getOverallExecutionStatus() {
        Iterator<Map.Entry<SystemProducer, Pair<Boolean, Boolean>>> it = this.systemProducerStatues.entrySet().iterator();
        while (it.hasNext()) {
            if (!((Boolean) it.next().getValue().getSecond()).booleanValue()) {
                return false;
            }
        }
        return true;
    }

    public void endStreamReprocessingSystemProducer(SystemProducer systemProducer, boolean z) {
        this.systemProducerStatues.put(systemProducer, Pair.create(false, Boolean.valueOf(z)));
    }

    private Properties getSslProperties(Config config) {
        SSL_MANDATORY_CONFIGS.forEach(str -> {
            if (!config.containsKey(str)) {
                throw new VeniceException("Missing a mandatory SSL config: " + str);
            }
        });
        Properties properties = new Properties();
        properties.setProperty("ssl.enabled", "true");
        properties.setProperty("ssl.keystore.type", (String) config.get("ssl.keystore.type"));
        properties.setProperty("ssl.keystore.location", (String) config.get("ssl.keystore.location"));
        properties.setProperty("ssl.keystore.password", (String) config.get("ssl.key.password"));
        properties.setProperty("ssl.truststore.location", (String) config.get("ssl.truststore.location"));
        properties.setProperty("ssl.truststore.password", (String) config.get("ssl.truststore.password"));
        return properties;
    }

    private static boolean isEmpty(String str) {
        return str == null || str.isEmpty() || str.equals("null");
    }
}
