package org.apache.flink.table.runtime.operators.join.temporal;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.class */
/**
 * Two-input operator that buffers rows of both inputs in state and joins them when an
 * event-time timer fires.
 *
 * <p>Left rows are stored under a monotonically increasing index, right rows under their own
 * time attribute. When a timer fires, every buffered left row whose time attribute is not
 * greater than the timer service's current watermark is matched against the right row with the
 * greatest time attribute that does not exceed the left row's time; if such a row exists and the
 * generated join condition accepts the pair, the joined row is emitted. Processed left rows and
 * right rows that can no longer be the latest match are then dropped from state.
 *
 * <p>Rows flagged as retractions are rejected with an {@link IllegalStateException}.
 */
public class TemporalRowTimeJoinOperator extends BaseTwoInputStreamOperatorWithStateRetention {
    private static final long serialVersionUID = 6642514795175288193L;

    // Names under which the individual states and the timer service are registered.
    private static final String NEXT_LEFT_INDEX_STATE_NAME = "next-index";
    private static final String LEFT_STATE_NAME = "left";
    private static final String RIGHT_STATE_NAME = "right";
    private static final String REGISTERED_TIMER_STATE_NAME = "timer";
    private static final String TIMERS_STATE_NAME = "timers";

    // Serialized configuration. Field names are kept stable because they are part of the
    // default Java-serialized form of this class.
    private final BaseRowTypeInfo leftType;
    private final BaseRowTypeInfo rightType;
    private final GeneratedJoinCondition generatedJoinCondition;
    private final int leftTimeAttribute;
    private final int rightTimeAttribute;
    private final RowtimeComparator rightRowtimeComparator;

    // Runtime handles, all (re)created in open().
    private transient ValueState<Long> nextLeftIndex;
    private transient MapState<Long, BaseRow> leftState;
    private transient MapState<Long, BaseRow> rightState;
    private transient ValueState<Long> registeredTimer;
    private transient TimestampedCollector<BaseRow> collector;
    private transient InternalTimerService<VoidNamespace> timerService;
    private transient JoinCondition joinCondition;
    private transient JoinedRow outRow;

    /**
     * Creates the operator.
     *
     * @param baseRowTypeInfo type information of the left input rows (used for the left state)
     * @param baseRowTypeInfo2 type information of the right input rows (used for the right state)
     * @param generatedJoinCondition generated join condition, instantiated in {@link #open()}
     * @param i field index of the time attribute in left rows
     * @param i2 field index of the time attribute in right rows
     * @param j first value forwarded to the base-class constructor (presumably the minimum state
     *     retention time; confirm against the base class)
     * @param j2 second value forwarded to the base-class constructor (presumably the maximum
     *     state retention time; confirm against the base class)
     */
    public TemporalRowTimeJoinOperator(BaseRowTypeInfo baseRowTypeInfo, BaseRowTypeInfo baseRowTypeInfo2, GeneratedJoinCondition generatedJoinCondition, int i, int i2, long j, long j2) {
        super(j, j2);
        this.leftType = baseRowTypeInfo;
        this.rightType = baseRowTypeInfo2;
        this.generatedJoinCondition = generatedJoinCondition;
        this.leftTimeAttribute = i;
        this.rightTimeAttribute = i2;
        this.rightRowtimeComparator = new RowtimeComparator(i2);
    }

    /**
     * Instantiates and opens the join condition, obtains all state handles and the timer
     * service, and prepares the reusable output row.
     *
     * <p>NOTE(review): this override does not invoke {@code super.open()}; confirm against the
     * base class that this is intended.
     */
    @Override
    public void open() throws Exception {
        joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        joinCondition.setRuntimeContext(getRuntimeContext());
        joinCondition.open(new Configuration());

        ValueStateDescriptor<Long> nextIndexDescriptor =
                new ValueStateDescriptor<>(NEXT_LEFT_INDEX_STATE_NAME, Types.LONG);
        nextLeftIndex = getRuntimeContext().getState(nextIndexDescriptor);

        MapStateDescriptor<Long, BaseRow> leftDescriptor =
                new MapStateDescriptor<>(LEFT_STATE_NAME, Types.LONG, leftType);
        leftState = getRuntimeContext().getMapState(leftDescriptor);

        MapStateDescriptor<Long, BaseRow> rightDescriptor =
                new MapStateDescriptor<>(RIGHT_STATE_NAME, Types.LONG, rightType);
        rightState = getRuntimeContext().getMapState(rightDescriptor);

        ValueStateDescriptor<Long> timerDescriptor =
                new ValueStateDescriptor<>(REGISTERED_TIMER_STATE_NAME, Types.LONG);
        registeredTimer = getRuntimeContext().getState(timerDescriptor);

        timerService = getInternalTimerService(TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);
        collector = new TimestampedCollector<>(this.output);

        outRow = new JoinedRow();
        // Every emitted row carries header 0 (presumably the "accumulate" marker; confirm).
        outRow.setHeader((byte) 0);
    }

    /**
     * Buffers a left row under the next free index and makes sure an event-time timer exists
     * that fires no later than the row's time attribute.
     *
     * @param streamRecord record carrying the left row
     * @throws IllegalStateException if the row is a retraction
     */
    public void processElement1(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow leftRow = streamRecord.getValue();
        checkNotRetraction(leftRow);

        long index = getNextLeftIndex();
        leftState.put(index, leftRow);

        registerSmallestTimer(getLeftTime(leftRow));
        registerProcessingCleanupTimer();
    }

    /**
     * Buffers a right row keyed by its time attribute (a later row with the same time replaces
     * the earlier one) and makes sure an event-time timer exists that fires no later than that
     * time.
     *
     * @param streamRecord record carrying the right row
     * @throws IllegalStateException if the row is a retraction
     */
    public void processElement2(StreamRecord<BaseRow> streamRecord) throws Exception {
        BaseRow rightRow = streamRecord.getValue();
        checkNotRetraction(rightRow);

        long time = getRightTime(rightRow);
        rightState.put(time, rightRow);

        registerSmallestTimer(time);
        registerProcessingCleanupTimer();
    }

    /**
     * Handles a fired event-time timer: emits all joinable results up to the current watermark,
     * re-registers a timer for the earliest left row that is still pending, and, when state
     * cleaning is enabled, either refreshes or removes the cleanup timer.
     *
     * @param internalTimer the fired timer; its timestamp is not consulted, the timer service's
     *     current watermark is used instead
     */
    public void onEventTime(InternalTimer<Object, VoidNamespace> internalTimer) throws Exception {
        registeredTimer.clear();

        long earliestPending = emitResultAndCleanUpState(timerService.currentWatermark());
        boolean leftRowsPending = earliestPending < Long.MAX_VALUE;
        if (leftRowsPending) {
            registerTimer(earliestPending);
        }

        if (!this.stateCleaningEnabled) {
            return;
        }
        // Keep the cleanup timer alive while either side still holds rows.
        if (leftRowsPending || !rightState.isEmpty()) {
            registerProcessingCleanupTimer();
        } else {
            cleanupLastTimer();
        }
    }

    /**
     * Closes the join condition if it was created.
     *
     * <p>NOTE(review): {@code super.close()} is not invoked here; confirm this is intended.
     */
    public void close() throws Exception {
        if (joinCondition == null) {
            return;
        }
        joinCondition.close();
    }

    /**
     * Joins and removes every buffered left row whose time is not greater than
     * {@code timestamp}, then prunes right rows that are no longer needed.
     *
     * @param timestamp upper bound (inclusive) for left rows to process
     * @return the smallest time attribute among the left rows that stay buffered, or
     *     {@link Long#MAX_VALUE} if none remain
     */
    private long emitResultAndCleanUpState(long timestamp) throws Exception {
        List<BaseRow> sortedRightRows = getRightRowSorted(rightRowtimeComparator);
        long earliestPending = Long.MAX_VALUE;

        Iterator<Map.Entry<Long, BaseRow>> leftEntries = leftState.entries().iterator();
        while (leftEntries.hasNext()) {
            BaseRow leftRow = leftEntries.next().getValue();
            long leftTime = getLeftTime(leftRow);

            if (leftTime > timestamp) {
                // Not yet due; remember it so a timer can be registered for it.
                earliestPending = Math.min(earliestPending, leftTime);
                continue;
            }

            Optional<BaseRow> match = latestRightRowToJoin(sortedRightRows, leftTime);
            if (match.isPresent()) {
                BaseRow rightRow = match.get();
                if (joinCondition.apply(leftRow, rightRow)) {
                    outRow.replace(leftRow, rightRow);
                    collector.collect(outRow);
                }
            }
            // A due left row is dropped whether or not it produced output.
            leftEntries.remove();
        }

        cleanupState(timestamp, sortedRightRows);
        return earliestPending;
    }

    /**
     * Removes from the right state every row that precedes the first index to keep.
     *
     * @param timestamp time up to which left rows have been processed
     * @param sortedRightRows right rows sorted ascending by time attribute
     */
    private void cleanupState(long timestamp, List<BaseRow> sortedRightRows) throws Exception {
        int keepFrom = firstIndexToKeep(timestamp, sortedRightRows);
        int position = 0;
        // keepFrom may be -1, in which case nothing is removed.
        while (position < keepFrom) {
            long key = getRightTime(sortedRightRows.get(position));
            rightState.remove(key);
            position++;
        }
    }

    /**
     * Clears both row buffers. The timestamp argument is not used.
     *
     * <p>NOTE(review): the left index counter and the registered-timer state are left untouched
     * here; confirm this is intended.
     *
     * @param j timestamp supplied by the caller (ignored)
     */
    @Override
    public void cleanupState(long j) {
        leftState.clear();
        rightState.clear();
    }

    /**
     * Computes the index of the right row that must be retained: the last row whose time is not
     * greater than {@code timestamp}. Returns -1 when the list is empty or every row is newer.
     */
    private int firstIndexToKeep(long timestamp, List<BaseRow> sortedRightRows) {
        int firstNewer = indexOfFirstElementNewerThanTimer(timestamp, sortedRightRows);
        if (firstNewer < 0) {
            return sortedRightRows.size() - 1;
        }
        return firstNewer - 1;
    }

    /**
     * Finds the position of the first row whose time attribute is strictly greater than
     * {@code timestamp}, or -1 if there is none.
     */
    private int indexOfFirstElementNewerThanTimer(long timestamp, List<BaseRow> sortedRightRows) {
        for (int index = 0; index < sortedRightRows.size(); index++) {
            if (getRightTime(sortedRightRows.get(index)) > timestamp) {
                return index;
            }
        }
        return -1;
    }

    /**
     * Looks up, in the whole sorted list, the right row with the greatest time attribute that is
     * not greater than {@code leftTime}.
     */
    private Optional<BaseRow> latestRightRowToJoin(List<BaseRow> sortedRightRows, long leftTime) {
        return latestRightRowToJoin(sortedRightRows, 0, sortedRightRows.size() - 1, leftTime);
    }

    /**
     * Binary search over {@code sortedRightRows[low..high]} for the row with the greatest time
     * attribute that is not greater than {@code leftTime}.
     *
     * @return the exact match if one exists; otherwise the row immediately before the insertion
     *     point, or an empty optional if the insertion point is index 0
     */
    private Optional<BaseRow> latestRightRowToJoin(List<BaseRow> sortedRightRows, int low, int high, long leftTime) {
        int lo = low;
        int hi = high;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            BaseRow candidate = sortedRightRows.get(mid);
            int order = Long.compare(getRightTime(candidate), leftTime);
            if (order == 0) {
                return Optional.of(candidate);
            }
            if (order < 0) {
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        // lo is now the insertion point; the row just before it (if any) is the latest match.
        int predecessor = lo - 1;
        if (predecessor < 0) {
            return Optional.empty();
        }
        return Optional.of(sortedRightRows.get(predecessor));
    }

    /**
     * Ensures the single tracked event-time timer fires no later than {@code timestamp}: if no
     * timer is tracked one is registered, and a later tracked timer is replaced.
     */
    private void registerSmallestTimer(long timestamp) throws IOException {
        Long current = registeredTimer.value();
        if (current == null) {
            registerTimer(timestamp);
            return;
        }
        if (current > timestamp) {
            timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, current);
            registerTimer(timestamp);
        }
    }

    /** Records {@code timestamp} as the tracked timer and registers it with the timer service. */
    private void registerTimer(long timestamp) throws IOException {
        registeredTimer.update(timestamp);
        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
    }

    /** Copies all buffered right rows into a new list sorted with the given comparator. */
    private List<BaseRow> getRightRowSorted(RowtimeComparator comparator) throws Exception {
        List<BaseRow> rows = new ArrayList<>();
        for (BaseRow row : rightState.values()) {
            rows.add(row);
        }
        rows.sort(comparator);
        return rows;
    }

    /**
     * Returns the current left index (0 if none has been stored yet) and stores its successor.
     */
    private long getNextLeftIndex() throws IOException {
        Long stored = nextLeftIndex.value();
        long index = stored == null ? 0L : stored;
        nextLeftIndex.update(index + 1);
        return index;
    }

    /** Reads the time attribute of a left row. */
    private long getLeftTime(BaseRow leftRow) {
        return leftRow.getLong(leftTimeAttribute);
    }

    /** Reads the time attribute of a right row. */
    private long getRightTime(BaseRow rightRow) {
        return rightRow.getLong(rightTimeAttribute);
    }

    /**
     * Rejects retraction messages.
     *
     * @throws IllegalStateException if {@code row} is a retraction message
     */
    private void checkNotRetraction(BaseRow row) {
        if (!BaseRowUtil.isRetractMsg(row)) {
            return;
        }
        throw new IllegalStateException("Retractions are not supported by " + getClass().getSimpleName() + ". If this can happen it should be validated during planning!");
    }

    /**
     * Orders rows ascending by the long value stored at a fixed field index.
     */
    public static class RowtimeComparator implements Comparator<BaseRow>, Serializable {
        private static final long serialVersionUID = 8160134014590716914L;
        private final int timeAttribute;

        private RowtimeComparator(int i) {
            this.timeAttribute = i;
        }

        @Override
        public int compare(BaseRow baseRow, BaseRow baseRow2) {
            long firstTime = baseRow.getLong(timeAttribute);
            long secondTime = baseRow2.getLong(timeAttribute);
            return Long.compare(firstTime, secondTime);
        }
    }
}
