package org.apache.flink.table.runtime.operators.aggregate;

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.context.ExecutionContext;
import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.class */
/**
 * A {@link MapBundleFunction} that folds the rows of a bundle into per-key accumulators by
 * delegating to a code-generated {@link AggsHandleFunction}, and emits one joined
 * (key, accumulators) row per bundle entry when the bundle is finished.
 */
public class MiniBatchLocalGroupAggFunction extends MapBundleFunction<RowData, RowData, RowData, RowData> {
    private static final long serialVersionUID = 5417039295967495506L;

    /** Generated factory from which the aggregate handler is instantiated in {@link #open}. */
    private final GeneratedAggsHandleFunction genAggsHandler;

    /** Output row that is re-pointed at each (key, accumulators) pair and emitted again. */
    private transient JoinedRowData resultRow = new JoinedRowData();

    /** Aggregate handler; stays {@code null} until {@link #open} has instantiated it. */
    private transient AggsHandleFunction function = null;

    /**
     * Creates the function around a generated aggregate handler.
     *
     * @param generatedAggsHandleFunction generated handler that is instantiated in {@link #open}
     */
    public MiniBatchLocalGroupAggFunction(GeneratedAggsHandleFunction generatedAggsHandleFunction) {
        genAggsHandler = generatedAggsHandleFunction;
    }

    /**
     * Instantiates the generated aggregate handler with the user-code class loader, opens it
     * with a {@link PerKeyStateDataViewStore}, and allocates a fresh output row.
     *
     * @param executionContext context that supplies the runtime context
     * @throws Exception if the parent or the aggregate handler fails to open
     */
    @Override
    public void open(ExecutionContext executionContext) throws Exception {
        super.open(executionContext);

        ClassLoader userCodeClassLoader = executionContext.getRuntimeContext().getUserCodeClassLoader();
        // The field is assigned before the handler is opened, so close() can still reach the
        // handler if opening it throws.
        function = genAggsHandler.newInstance(userCodeClassLoader);

        PerKeyStateDataViewStore dataViewStore =
                new PerKeyStateDataViewStore(executionContext.getRuntimeContext());
        function.open(dataViewStore);

        resultRow = new JoinedRowData();
    }

    /**
     * Applies one input row to the accumulators of its key.
     *
     * @param rowData accumulators stored for the key so far, or {@code null} if there are none,
     *     in which case new accumulators are created by the handler
     * @param rowData2 the input row; retracted unless {@link RowDataUtil#isAccumulateMsg}
     *     reports it as an accumulate message
     * @return the accumulators reported by the handler after the row has been applied
     * @throws Exception if the aggregate handler fails
     */
    @Override
    public RowData addInput(@Nullable RowData rowData, RowData rowData2) throws Exception {
        RowData accumulators = rowData;
        if (accumulators == null) {
            accumulators = function.createAccumulators();
        }
        function.setAccumulators(accumulators);

        if (!RowDataUtil.isAccumulateMsg(rowData2)) {
            function.retract(rowData2);
        } else {
            function.accumulate(rowData2);
        }
        return function.getAccumulators();
    }

    /**
     * Emits one joined (key, accumulators) row for every entry of the bundle and then empties
     * the bundle.
     *
     * <p>The same {@link JoinedRowData} instance is handed to the collector for every entry.
     *
     * @param map bundle contents, keyed by group key with the accumulators as value; cleared
     *     after all entries have been emitted
     * @param collector receiver of the joined rows
     * @throws Exception declared by the overridden method
     */
    @Override
    public void finishBundle(Map<RowData, RowData> map, Collector<RowData> collector) throws Exception {
        for (Map.Entry<RowData, RowData> bundleEntry : map.entrySet()) {
            RowData groupKey = bundleEntry.getKey();
            RowData accumulators = bundleEntry.getValue();
            resultRow.replace(groupKey, accumulators);
            collector.collect(resultRow);
        }
        map.clear();
    }

    /**
     * Closes the aggregate handler if one has been instantiated.
     *
     * @throws Exception if the aggregate handler fails to close
     */
    @Override
    public void close() throws Exception {
        if (function == null) {
            // open() never got as far as instantiating the handler; nothing to release.
            return;
        }
        function.close();
    }
}
