package com.linkedin.venice.helix;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.listener.ListenerManager;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.OfflinePushAccessor;
import com.linkedin.venice.pushmonitor.OfflinePushStatus;
import com.linkedin.venice.pushmonitor.PartitionStatus;
import com.linkedin.venice.pushmonitor.PartitionStatusListener;
import com.linkedin.venice.pushmonitor.ReadOnlyPartitionStatus;
import com.linkedin.venice.utils.HelixUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:com/linkedin/venice/helix/VeniceOfflinePushMonitorAccessor.class */
public class VeniceOfflinePushMonitorAccessor implements OfflinePushAccessor {
    public static final String OFFLINE_PUSH_SUB_PATH = "OfflinePushes";
    private static final int DEFAULT_ZK_REFRESH_ATTEMPTS = 3;
    private static final long DEFAULT_ZK_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(10);
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) VeniceOfflinePushMonitorAccessor.class);
    private final String clusterName;
    private final ZkBaseDataAccessor<OfflinePushStatus> offlinePushStatusAccessor;
    private final ZkBaseDataAccessor<PartitionStatus> partitionStatusAccessor;
    private final String offlinePushStatusParentPath;
    private final ZkClient zkClient;
    private final ListenerManager<PartitionStatusListener> listenerManager;
    private final PartitionStatusZkListener partitionStatusZkListener;
    private final int refreshAttemptsForZkReconnect;
    private final long refreshIntervalForZkReconnectInMs;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/helix/VeniceOfflinePushMonitorAccessor$PartitionStatusZkListener.class */
    public class PartitionStatusZkListener implements IZkDataListener {
        private PartitionStatusZkListener() {
        }

        @Override // org.apache.helix.zookeeper.zkclient.IZkDataListener
        public void handleDataChange(String str, Object obj) throws Exception {
            if (!(obj instanceof PartitionStatus)) {
                throw new VeniceException("Invalid notification, changed data is not:" + PartitionStatus.class.getName());
            }
            String parseTopicFromPartitionStatusPath = VeniceOfflinePushMonitorAccessor.this.parseTopicFromPartitionStatusPath(str);
            ReadOnlyPartitionStatus fromPartitionStatus = ReadOnlyPartitionStatus.fromPartitionStatus((PartitionStatus) obj);
            VeniceOfflinePushMonitorAccessor.this.listenerManager.trigger(parseTopicFromPartitionStatusPath, partitionStatusListener -> {
                try {
                    partitionStatusListener.onPartitionStatusChange(parseTopicFromPartitionStatusPath, fromPartitionStatus);
                } catch (Exception e) {
                    VeniceOfflinePushMonitorAccessor.LOGGER.error("Error when invoking callback function for partition status change", (Throwable) e);
                }
            });
        }

        @Override // org.apache.helix.zookeeper.zkclient.IZkDataListener
        public void handleDataDeleted(String str) {
            VeniceOfflinePushMonitorAccessor.LOGGER.error("Partition status should not be deleted while monitoring the push status. Path: {}.", str);
        }
    }

    public VeniceOfflinePushMonitorAccessor(String str, ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer) {
        this(str, zkClient, helixAdapterSerializer, 3, DEFAULT_ZK_REFRESH_INTERVAL);
    }

    public VeniceOfflinePushMonitorAccessor(String str, ZkClient zkClient, HelixAdapterSerializer helixAdapterSerializer, int i, long j) {
        this.clusterName = str;
        this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
        this.zkClient = zkClient;
        registerSerializers(helixAdapterSerializer);
        this.zkClient.setZkSerializer(helixAdapterSerializer);
        this.offlinePushStatusAccessor = new ZkBaseDataAccessor<>(zkClient);
        this.partitionStatusAccessor = new ZkBaseDataAccessor<>(zkClient);
        this.listenerManager = new ListenerManager<>();
        this.partitionStatusZkListener = new PartitionStatusZkListener();
        this.refreshAttemptsForZkReconnect = i;
        this.refreshIntervalForZkReconnectInMs = j;
    }

    public VeniceOfflinePushMonitorAccessor(String str, ZkBaseDataAccessor<OfflinePushStatus> zkBaseDataAccessor, ZkBaseDataAccessor<PartitionStatus> zkBaseDataAccessor2) {
        this.clusterName = str;
        this.offlinePushStatusAccessor = zkBaseDataAccessor;
        this.partitionStatusAccessor = zkBaseDataAccessor2;
        this.offlinePushStatusParentPath = getOfflinePushStatuesParentPath();
        this.zkClient = null;
        this.listenerManager = new ListenerManager<>();
        this.partitionStatusZkListener = new PartitionStatusZkListener();
        this.refreshAttemptsForZkReconnect = 3;
        this.refreshIntervalForZkReconnectInMs = DEFAULT_ZK_REFRESH_INTERVAL;
    }

    private void registerSerializers(HelixAdapterSerializer helixAdapterSerializer) {
        String str = this.offlinePushStatusParentPath + "/*";
        helixAdapterSerializer.registerSerializer(str, new OfflinePushStatusJSONSerializer());
        helixAdapterSerializer.registerSerializer(str + "/*", new PartitionStatusJSONSerializer());
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public List<OfflinePushStatus> loadOfflinePushStatusesAndPartitionStatuses() {
        LOGGER.info("Start loading all offline pushes statuses from ZK in cluster: {}.", this.clusterName);
        List<OfflinePushStatus> children = HelixUtils.getChildren(this.offlinePushStatusAccessor, this.offlinePushStatusParentPath, this.refreshAttemptsForZkReconnect, this.refreshIntervalForZkReconnectInMs);
        Iterator<OfflinePushStatus> it2 = children.iterator();
        while (it2.hasNext()) {
            OfflinePushStatus next = it2.next();
            if (next == null) {
                LOGGER.warn("Found null push status in cluster: {}.", this.clusterName);
                it2.remove();
            } else if (next.getCurrentStatus().isTaskStatus()) {
                next.setPartitionStatuses(getPartitionStatuses(next.getKafkaTopic(), next.getNumberOfPartition()));
            } else {
                LOGGER.info("Found invalid push statues: {} for topic: {} in cluster: {}. Will delete it from ZK.", next.getCurrentStatus(), next.getKafkaTopic(), this.clusterName);
                HelixUtils.remove(this.offlinePushStatusAccessor, getOfflinePushStatusPath(next.getKafkaTopic()));
                it2.remove();
            }
        }
        LOGGER.info("Loaded {} offline pushes statuses from ZK in cluster: {}.", Integer.valueOf(children.size()), this.clusterName);
        return children;
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public List<String> loadOfflinePushStatusPaths() {
        LOGGER.info("Start listing all offline pushes paths from ZK in cluster (only list path names): {}.", this.clusterName);
        List<String> listPathContents = HelixUtils.listPathContents(this.offlinePushStatusAccessor, this.offlinePushStatusParentPath);
        LOGGER.info("Listed {} offline pushes statuses path names from ZK in cluster: {}.", Integer.valueOf(listPathContents.size()), this.clusterName);
        return listPathContents;
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public OfflinePushStatus getOfflinePushStatusAndItsPartitionStatuses(String str) {
        OfflinePushStatus offlinePushStatus = this.offlinePushStatusAccessor.get(getOfflinePushStatusPath(str), (Stat) null, AccessOption.PERSISTENT);
        if (offlinePushStatus == null) {
            throw new VeniceException("Can not find offline push status in ZK from path:" + getOfflinePushStatusPath(str));
        }
        offlinePushStatus.setPartitionStatuses(getPartitionStatuses(str, offlinePushStatus.getNumberOfPartition()));
        return offlinePushStatus;
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public Optional<Long> getOfflinePushStatusCreationTime(String str) {
        try {
            Stat stat = this.offlinePushStatusAccessor.getStat(getOfflinePushStatusPath(str), AccessOption.PERSISTENT);
            if (stat != null) {
                return Optional.of(Long.valueOf(stat.getCtime()));
            }
            LOGGER.warn("Failed to get offline push status creation time for topic: {} in cluster: {}.", str, this.clusterName);
            return Optional.empty();
        } catch (Exception e) {
            LOGGER.warn("Failed to get offline push status creation time for topic: {} in cluster: {}.", str, this.clusterName, e);
            return Optional.empty();
        }
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void updateOfflinePushStatus(OfflinePushStatus offlinePushStatus) {
        HelixUtils.update(this.offlinePushStatusAccessor, getOfflinePushStatusPath(offlinePushStatus.getKafkaTopic()), offlinePushStatus);
        LOGGER.info("Updated push status for topic {} in cluster: {} to status: {}.", offlinePushStatus.getKafkaTopic(), this.clusterName, offlinePushStatus.getCurrentStatus());
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public synchronized void createOfflinePushStatusAndItsPartitionStatuses(OfflinePushStatus offlinePushStatus) {
        LOGGER.info("Start creating offline push status for topic: {} in cluster: {}.", offlinePushStatus.getKafkaTopic(), this.clusterName);
        HelixUtils.create(this.offlinePushStatusAccessor, getOfflinePushStatusPath(offlinePushStatus.getKafkaTopic()), offlinePushStatus);
        LOGGER.info("Created offline push status ZNode. Start creating partition statuses.");
        ArrayList arrayList = new ArrayList(offlinePushStatus.getNumberOfPartition());
        ArrayList arrayList2 = new ArrayList(offlinePushStatus.getNumberOfPartition());
        for (int i = 0; i < offlinePushStatus.getNumberOfPartition(); i++) {
            arrayList.add(getPartitionStatusPath(offlinePushStatus.getKafkaTopic(), i));
            arrayList2.add(new PartitionStatus(i));
        }
        HelixUtils.updateChildren(this.partitionStatusAccessor, arrayList, arrayList2);
        LOGGER.info("Created {} partition status Znodes for topic: {}.", Integer.valueOf(offlinePushStatus.getNumberOfPartition()), offlinePushStatus.getKafkaTopic());
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void deleteOfflinePushStatusAndItsPartitionStatuses(String str) {
        LOGGER.info("Start deleting offline push status for topic: {} in cluster: {}.", str, this.clusterName);
        HelixUtils.remove(this.offlinePushStatusAccessor, getOfflinePushStatusPath(str));
        LOGGER.info("Deleted offline push status for topic: {} in cluster: {}.", str, this.clusterName);
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void updateReplicaStatus(String str, int i, String str2, ExecutionStatus executionStatus, long j, String str3) {
        compareAndUpdateReplicaStatus(str, i, str2, executionStatus, j, str3);
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void updateReplicaStatus(String str, int i, String str2, ExecutionStatus executionStatus, String str3) {
        compareAndUpdateReplicaStatus(str, i, str2, executionStatus, -2147483648L, str3);
    }

    private void compareAndUpdateReplicaStatus(String str, int i, String str2, ExecutionStatus executionStatus, long j, String str3) {
        if (pushStatusExists(str)) {
            LOGGER.info("Start update replica status for topic: {}, partition: {} in cluster: {}.", str, Integer.valueOf(i), this.clusterName);
            HelixUtils.compareAndUpdate(this.partitionStatusAccessor, getPartitionStatusPath(str, i), partitionStatus -> {
                if (partitionStatus == null) {
                    partitionStatus = new PartitionStatus(i);
                }
                partitionStatus.updateReplicaStatus(str2, executionStatus, str3);
                if (j != -2147483648L) {
                    partitionStatus.updateProgress(str2, j);
                }
                if (!StringUtils.isEmpty(str3)) {
                    partitionStatus.updateIncrementalPushVersion(str2, str3);
                }
                return partitionStatus;
            });
            LOGGER.info("Updated replica status for topic: {} partition: {} status: {} in cluster: {}.", str, Integer.valueOf(i), executionStatus, this.clusterName);
        }
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void subscribePartitionStatusChange(OfflinePushStatus offlinePushStatus, PartitionStatusListener partitionStatusListener) {
        this.listenerManager.subscribe(offlinePushStatus.getKafkaTopic(), partitionStatusListener);
        for (int i = 0; i < offlinePushStatus.getNumberOfPartition(); i++) {
            this.partitionStatusAccessor.subscribeDataChanges(getPartitionStatusPath(offlinePushStatus.getKafkaTopic(), i), this.partitionStatusZkListener);
        }
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void unsubscribePartitionsStatusChange(OfflinePushStatus offlinePushStatus, PartitionStatusListener partitionStatusListener) {
        unsubscribePartitionsStatusChange(offlinePushStatus.getKafkaTopic(), offlinePushStatus.getNumberOfPartition(), partitionStatusListener);
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void unsubscribePartitionsStatusChange(String str, int i, PartitionStatusListener partitionStatusListener) {
        this.listenerManager.unsubscribe(str, partitionStatusListener);
        for (int i2 = 0; i2 < i; i2++) {
            this.partitionStatusAccessor.unsubscribeDataChanges(getPartitionStatusPath(str, i2), this.partitionStatusZkListener);
        }
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void subscribePushStatusCreationChange(IZkChildListener iZkChildListener) {
        this.offlinePushStatusAccessor.subscribeChildChanges(getOfflinePushStatuesParentPath(), iZkChildListener);
    }

    @Override // com.linkedin.venice.pushmonitor.OfflinePushAccessor
    public void unsubscribePushStatusCreationChange(IZkChildListener iZkChildListener) {
        this.offlinePushStatusAccessor.unsubscribeChildChanges(getOfflinePushStatuesParentPath(), iZkChildListener);
    }

    protected PartitionStatus getPartitionStatus(String str, int i) {
        PartitionStatus partitionStatus = this.partitionStatusAccessor.get(getPartitionStatusPath(str, i), (Stat) null, AccessOption.PERSISTENT);
        LOGGER.debug("Read partition status for topic: {} in partition: {} in cluster: {}.", str, Integer.valueOf(i), this.clusterName);
        return partitionStatus;
    }

    protected List<PartitionStatus> getPartitionStatuses(String str, int i) {
        LOGGER.debug("Start reading partition status from ZK for topic: {} in cluster: {}.", str, this.clusterName);
        List<PartitionStatus> children = HelixUtils.getChildren(this.partitionStatusAccessor, getOfflinePushStatusPath(str), this.refreshAttemptsForZkReconnect, this.refreshIntervalForZkReconnectInMs);
        LOGGER.debug("Read {} partition status from ZK for topic: {} in cluster: {}.", Integer.valueOf(children.size()), str, this.clusterName);
        if (children.isEmpty()) {
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(new PartitionStatus(i2));
            }
            return arrayList;
        }
        Collections.sort(children);
        if (children.size() == i) {
            return children;
        }
        ArrayList arrayList2 = new ArrayList(i);
        int i3 = 0;
        for (int i4 = 0; i4 < i; i4++) {
            if (i3 >= children.size() || i4 != children.get(i3).getPartitionId()) {
                arrayList2.add(new PartitionStatus(i4));
            } else {
                int i5 = i3;
                i3++;
                arrayList2.add(children.get(i5));
            }
        }
        return arrayList2;
    }

    public final String getOfflinePushStatuesParentPath() {
        return HelixUtils.getHelixClusterZkPath(this.clusterName) + "/" + OFFLINE_PUSH_SUB_PATH;
    }

    private String getOfflinePushStatusPath(String str) {
        return this.offlinePushStatusParentPath + "/" + str;
    }

    private String getPartitionStatusPath(String str, int i) {
        return getOfflinePushStatusPath(str) + "/" + i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String parseTopicFromPartitionStatusPath(String str) {
        int lastIndexOf = str.lastIndexOf(47);
        return str.substring(str.lastIndexOf(47, lastIndexOf - 1) + 1, lastIndexOf);
    }

    private boolean pushStatusExists(String str) {
        if (this.partitionStatusAccessor.exists(getOfflinePushStatusPath(str), AccessOption.PERSISTENT)) {
            return true;
        }
        LOGGER.warn("Push status does not exist, ignore the subsequent operation. Topic: {}.", str);
        return false;
    }

    public ZkBaseDataAccessor<OfflinePushStatus> getOfflinePushStatusAccessor() {
        return this.offlinePushStatusAccessor;
    }
}
