package org.apache.giraph.master;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.giraph.aggregators.Aggregator;
import org.apache.giraph.aggregators.AggregatorWrapper;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.aggregators.AggregatorUtils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.utils.MasterLoggingAggregator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/master/MasterAggregatorHandler.class */
public class MasterAggregatorHandler implements MasterAggregatorUsage, Writable {
    private static final Logger LOG = Logger.getLogger(MasterAggregatorHandler.class);
    private final Map<String, AggregatorWrapper<Writable>> aggregatorMap = Maps.newHashMap();
    private final AggregatorWriter aggregatorWriter;
    private final Progressable progressable;
    private final ImmutableClassesGiraphConfiguration<?, ?, ?, ?> conf;

    public MasterAggregatorHandler(ImmutableClassesGiraphConfiguration<?, ?, ?, ?> immutableClassesGiraphConfiguration, Progressable progressable) {
        this.conf = immutableClassesGiraphConfiguration;
        this.progressable = progressable;
        this.aggregatorWriter = immutableClassesGiraphConfiguration.createAggregatorWriter();
        MasterLoggingAggregator.registerAggregator(this, immutableClassesGiraphConfiguration);
    }

    @Override // org.apache.giraph.aggregators.AggregatorUsage
    public <A extends Writable> A getAggregatedValue(String str) {
        AggregatorWrapper<Writable> aggregatorWrapper = this.aggregatorMap.get(str);
        if (aggregatorWrapper != null) {
            return (A) aggregatorWrapper.getPreviousAggregatedValue();
        }
        LOG.warn("getAggregatedValue: " + AggregatorUtils.getUnregisteredAggregatorMessage(str, this.aggregatorMap.size() != 0, this.conf));
        return null;
    }

    @Override // org.apache.giraph.master.MasterAggregatorUsage
    public <A extends Writable> void setAggregatedValue(String str, A a) {
        AggregatorWrapper<Writable> aggregatorWrapper = this.aggregatorMap.get(str);
        if (aggregatorWrapper == null) {
            throw new IllegalStateException("setAggregatedValue: " + AggregatorUtils.getUnregisteredAggregatorMessage(str, this.aggregatorMap.size() != 0, this.conf));
        }
        aggregatorWrapper.setCurrentAggregatedValue(a);
    }

    @Override // org.apache.giraph.master.MasterAggregatorUsage
    public <A extends Writable> boolean registerAggregator(String str, Class<? extends Aggregator<A>> cls) throws InstantiationException, IllegalAccessException {
        checkAggregatorName(str);
        return registerAggregator(str, cls, false) != null;
    }

    @Override // org.apache.giraph.master.MasterAggregatorUsage
    public <A extends Writable> boolean registerPersistentAggregator(String str, Class<? extends Aggregator<A>> cls) throws InstantiationException, IllegalAccessException {
        checkAggregatorName(str);
        return registerAggregator(str, cls, true) != null;
    }

    private void checkAggregatorName(String str) {
        if (str.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
            throw new IllegalStateException("checkAggregatorName: __aggregatorRequestCount is not allowed for the name of aggregator");
        }
    }

    private <A extends Writable> AggregatorWrapper<A> registerAggregator(String str, Class<? extends Aggregator<A>> cls, boolean z) throws InstantiationException, IllegalAccessException {
        AggregatorWrapper<Writable> aggregatorWrapper = this.aggregatorMap.get(str);
        if (aggregatorWrapper == null) {
            aggregatorWrapper = new AggregatorWrapper<>(cls, z);
            this.aggregatorMap.put(str, aggregatorWrapper);
        }
        return (AggregatorWrapper<A>) aggregatorWrapper;
    }

    public void prepareSuperstep(MasterClient masterClient) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("prepareSuperstep: Start preparing aggregators");
        }
        for (AggregatorWrapper<Writable> aggregatorWrapper : this.aggregatorMap.values()) {
            if (aggregatorWrapper.isPersistent()) {
                aggregatorWrapper.aggregateCurrent(aggregatorWrapper.getPreviousAggregatedValue());
            }
            aggregatorWrapper.setPreviousAggregatedValue(aggregatorWrapper.getCurrentAggregatedValue());
            aggregatorWrapper.resetCurrentAggregator();
            this.progressable.progress();
        }
        MasterLoggingAggregator.logAggregatedValue(this, this.conf);
        if (LOG.isDebugEnabled()) {
            LOG.debug("prepareSuperstep: Aggregators prepared");
        }
    }

    public void finishSuperstep(MasterClient masterClient) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("finishSuperstep: Start finishing aggregators");
        }
        for (AggregatorWrapper<Writable> aggregatorWrapper : this.aggregatorMap.values()) {
            if (aggregatorWrapper.isChanged()) {
                aggregatorWrapper.setPreviousAggregatedValue(aggregatorWrapper.getCurrentAggregatedValue());
                aggregatorWrapper.resetCurrentAggregator();
            }
            this.progressable.progress();
        }
        try {
            for (Map.Entry<String, AggregatorWrapper<Writable>> entry : this.aggregatorMap.entrySet()) {
                masterClient.sendAggregator(entry.getKey(), entry.getValue().getAggregatorClass(), entry.getValue().getPreviousAggregatedValue());
                this.progressable.progress();
            }
            masterClient.finishSendingAggregatedValues();
            if (LOG.isDebugEnabled()) {
                LOG.debug("finishSuperstep: Aggregators finished");
            }
        } catch (IOException e) {
            throw new IllegalStateException("finishSuperstep: IOException occurred while sending aggregators", e);
        }
    }

    public void acceptAggregatedValues(DataInput dataInput) throws IOException {
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            String readUTF = dataInput.readUTF();
            AggregatorWrapper<Writable> aggregatorWrapper = this.aggregatorMap.get(readUTF);
            if (aggregatorWrapper == null) {
                throw new IllegalStateException("acceptAggregatedValues: Master received aggregator which isn't registered: " + readUTF);
            }
            Writable createInitialValue = aggregatorWrapper.createInitialValue();
            createInitialValue.readFields(dataInput);
            aggregatorWrapper.setCurrentAggregatedValue(createInitialValue);
            this.progressable.progress();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("acceptAggregatedValues: Accepted one set with " + readInt + " aggregated values");
        }
    }

    public void writeAggregators(long j, SuperstepState superstepState) {
        try {
            this.aggregatorWriter.writeAggregator(Iterables.transform(this.aggregatorMap.entrySet(), new Function<Map.Entry<String, AggregatorWrapper<Writable>>, Map.Entry<String, Writable>>() { // from class: org.apache.giraph.master.MasterAggregatorHandler.1
                public Map.Entry<String, Writable> apply(Map.Entry<String, AggregatorWrapper<Writable>> entry) {
                    MasterAggregatorHandler.this.progressable.progress();
                    return new AbstractMap.SimpleEntry(entry.getKey(), entry.getValue().getPreviousAggregatedValue());
                }
            }), superstepState == SuperstepState.ALL_SUPERSTEPS_DONE ? -1L : j);
        } catch (IOException e) {
            throw new IllegalStateException("coordinateSuperstep: IOException while writing aggregators data", e);
        }
    }

    public void initialize(BspService bspService) {
        try {
            this.aggregatorWriter.initialize(bspService.getContext(), bspService.getApplicationAttempt());
        } catch (IOException e) {
            throw new IllegalStateException("initialize: Couldn't initialize aggregatorWriter", e);
        }
    }

    public void close() throws IOException {
        this.aggregatorWriter.close();
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.aggregatorMap.size());
        for (Map.Entry<String, AggregatorWrapper<Writable>> entry : this.aggregatorMap.entrySet()) {
            dataOutput.writeUTF(entry.getKey());
            dataOutput.writeUTF(entry.getValue().getAggregatorClass().getName());
            dataOutput.writeBoolean(entry.getValue().isPersistent());
            entry.getValue().getPreviousAggregatedValue().write(dataOutput);
            this.progressable.progress();
        }
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.aggregatorMap.clear();
        int readInt = dataInput.readInt();
        for (int i = 0; i < readInt; i++) {
            try {
                AggregatorWrapper registerAggregator = registerAggregator(dataInput.readUTF(), AggregatorUtils.getAggregatorClass(dataInput.readUTF()), dataInput.readBoolean());
                Writable createInitialValue = registerAggregator.createInitialValue();
                createInitialValue.readFields(dataInput);
                registerAggregator.setPreviousAggregatedValue(createInitialValue);
                this.progressable.progress();
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("readFields: IllegalAccessException occurred", e);
            } catch (InstantiationException e2) {
                throw new IllegalStateException("readFields: InstantiationException occurred", e2);
            }
        }
    }
}
