package io.nosqlbench.engine.core;

import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityController;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.ProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/nosqlbench/engine/core/ActivityExecutor.class */
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressMeter {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ActivityExecutor.class);
    private static final Logger activitylogger = LoggerFactory.getLogger("ACTIVITY");
    private final List<Motor<?>> motors = new ArrayList();
    private final Activity activity;
    private final ActivityDef activityDef;
    private ExecutorService executorService;
    private RuntimeException stoppingException;
    private static final int waitTime = 10000;

    public ActivityExecutor(Activity activity) {
        this.activity = activity;
        this.activityDef = activity.getActivityDef();
        this.executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue(), new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)));
        activity.getActivityDef().getParams().addListener(this);
        activity.setActivityController(this);
    }

    public synchronized void startActivity() {
        logger.info("starting activity " + this.activity.getAlias() + " for cycles " + this.activity.getCycleSummary());
        activitylogger.debug("START/before alias=(" + this.activity.getAlias() + ")");
        try {
            this.activity.setRunState(RunState.Starting);
            this.activity.initActivity();
            adjustToActivityDef(this.activity.getActivityDef());
            this.activity.setRunState(RunState.Running);
            activitylogger.debug("START/after alias=(" + this.activity.getAlias() + ")");
        } catch (Exception e) {
            this.stoppingException = new RuntimeException("Error initializing activity '" + this.activity.getAlias() + "': " + e.getMessage(), e);
            activitylogger.error("error initializing activity '" + this.activity.getAlias() + "': " + this.stoppingException);
            throw this.stoppingException;
        }
    }

    public synchronized void stopActivity() {
        activitylogger.debug("STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopped);
        logger.info("stopping activity in progress: " + getActivityDef().getAlias());
        this.motors.forEach((v0) -> {
            v0.requestStop();
        });
        this.motors.forEach(motor -> {
            awaitRequiredMotorState(motor, 30000, 50, RunState.Stopped, RunState.Finished);
        });
        this.activity.shutdownActivity();
        this.activity.closeAutoCloseables();
        logger.info("stopped: " + getActivityDef().getAlias() + " with " + this.motors.size() + " slots");
        activitylogger.debug("STOP/after alias=(" + this.activity.getAlias() + ")");
    }

    public synchronized void forceStopExecutor(int i) {
        activitylogger.debug("FORCE STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopped);
        this.executorService.shutdown();
        requestStopMotors();
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
        logger.info("stopping activity forcibly " + this.activity.getAlias());
        List<Runnable> shutdownNow = this.executorService.shutdownNow();
        this.activity.shutdownActivity();
        this.activity.closeAutoCloseables();
        logger.debug(shutdownNow.size() + " threads never started.");
        if (this.stoppingException != null) {
            activitylogger.debug("FORCE STOP/exception alias=(" + this.activity.getAlias() + ")");
            throw this.stoppingException;
        }
        activitylogger.debug("FORCE STOP/after alias=(" + this.activity.getAlias() + ")");
    }

    public boolean requestStopExecutor(int i) {
        boolean z;
        activitylogger.debug("REQUEST STOP/before alias=(" + this.activity.getAlias() + ")");
        this.activity.setRunState(RunState.Stopped);
        logger.info("Stopping executor for " + this.activity.getAlias() + " when work completes.");
        this.executorService.shutdown();
        try {
            try {
                logger.trace("awaiting termination with timeout of " + i + " seconds");
                z = this.executorService.awaitTermination(i, TimeUnit.SECONDS);
                logger.trace("finally shutting down activity " + getActivity().getAlias());
                this.activity.shutdownActivity();
                logger.trace("closing auto-closeables");
                this.activity.closeAutoCloseables();
            } catch (InterruptedException e) {
                logger.trace("interrupted while awaiting termination");
                z = false;
                logger.warn("while waiting termination of activity " + this.activity.getAlias() + ", " + e.getMessage());
                activitylogger.debug("REQUEST STOP/exception alias=(" + this.activity.getAlias() + ") wasstopped=" + 0);
                logger.trace("finally shutting down activity " + getActivity().getAlias());
                this.activity.shutdownActivity();
                logger.trace("closing auto-closeables");
                this.activity.closeAutoCloseables();
            }
            if (this.stoppingException != null) {
                logger.trace("an exception caused the activity to stop:" + this.stoppingException.getMessage());
                throw this.stoppingException;
            }
            activitylogger.debug("REQUEST STOP/after alias=(" + this.activity.getAlias() + ") wasstopped=" + z);
            return z;
        } catch (Throwable th) {
            logger.trace("finally shutting down activity " + getActivity().getAlias());
            this.activity.shutdownActivity();
            logger.trace("closing auto-closeables");
            this.activity.closeAutoCloseables();
            throw th;
        }
    }

    @Override // io.nosqlbench.engine.api.activityimpl.ParameterMap.Listener
    public synchronized void handleParameterMapUpdate(ParameterMap parameterMap) {
        if (this.activity instanceof ActivityDefObserver) {
            this.activity.onActivityDefUpdate(this.activityDef);
        }
        if (this.activity.getRunState() != RunState.Uninitialized) {
            if (this.activity.getRunState() == RunState.Running) {
                adjustToActivityDef(this.activity.getActivityDef());
            }
            this.motors.stream().filter(motor -> {
                return motor instanceof ActivityDefObserver;
            }).forEach(motor2 -> {
                ((ActivityDefObserver) motor2).onActivityDefUpdate(this.activityDef);
            });
        }
    }

    public ActivityDef getActivityDef() {
        return this.activityDef;
    }

    public boolean awaitCompletion(int i) {
        return requestStopExecutor(i);
    }

    public boolean awaitFinish(int i) {
        activitylogger.debug("AWAIT-FINISH/before alias=(" + this.activity.getAlias() + ")");
        boolean awaitAllRequiredMotorState = awaitAllRequiredMotorState(i, 50, RunState.Finished, RunState.Stopped);
        if (awaitAllRequiredMotorState) {
            awaitAllRequiredMotorState = awaitCompletion(i);
        }
        if (this.stoppingException != null) {
            activitylogger.debug("AWAIT-FINISH/exception alias=(" + this.activity.getAlias() + ")");
            throw this.stoppingException;
        }
        activitylogger.debug("AWAIT-FINISH/afte alias=(" + this.activity.getAlias() + ")");
        return awaitAllRequiredMotorState;
    }

    public String toString() {
        return getClass().getSimpleName() + "~" + this.activityDef.getAlias();
    }

    private String getSlotStatus() {
        return (String) this.motors.stream().map(motor -> {
            return motor.getSlotStateTracker().getSlotState().getCode();
        }).collect(Collectors.joining(",", "[", "]"));
    }

    private synchronized void adjustToActivityDef(ActivityDef activityDef) {
        logger.trace(">-pre-adjust->" + getSlotStatus());
        while (this.motors.size() > activityDef.getThreads()) {
            Motor<?> motor = this.motors.get(this.motors.size() - 1);
            logger.trace("Stopping cycle motor thread:" + motor);
            motor.requestStop();
            this.motors.remove(this.motors.size() - 1);
        }
        while (this.motors.size() < activityDef.getThreads()) {
            Motor<?> motor2 = this.activity.getMotorDispenserDelegate().getMotor(activityDef, this.motors.size());
            logger.trace("Starting cycle motor thread:" + motor2);
            this.motors.add(motor2);
        }
        adjustToIntendedActivityState();
        awaitActivityAndMotorStateAlignment();
        logger.trace(">post-adjust->" + getSlotStatus());
    }

    private void adjustToIntendedActivityState() {
        logger.trace("ADJUSTING to INTENDED " + this.activity.getRunState());
        switch (this.activity.getRunState()) {
            case Uninitialized:
                return;
            case Starting:
            case Running:
                this.motors.stream().filter(motor -> {
                    return motor.getSlotStateTracker().getSlotState() != RunState.Running;
                }).filter(motor2 -> {
                    return motor2.getSlotStateTracker().getSlotState() != RunState.Finished;
                }).forEach(motor3 -> {
                    motor3.getSlotStateTracker().enterState(RunState.Starting);
                    this.executorService.execute(motor3);
                });
                return;
            case Stopped:
                this.motors.stream().filter(motor4 -> {
                    return motor4.getSlotStateTracker().getSlotState() != RunState.Stopped;
                }).forEach((v0) -> {
                    v0.requestStop();
                });
                return;
            case Finished:
            case Stopping:
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());
            default:
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
        }
    }

    private void awaitActivityAndMotorStateAlignment() {
        switch (this.activity.getRunState()) {
            case Uninitialized:
                break;
            case Starting:
            case Running:
                this.motors.forEach(motor -> {
                    awaitRequiredMotorState(motor, 10000, 50, RunState.Running, RunState.Finished);
                });
                break;
            case Stopped:
                this.motors.forEach(motor2 -> {
                    awaitRequiredMotorState(motor2, 10000, 50, RunState.Stopped, RunState.Finished);
                });
                break;
            case Finished:
                this.motors.forEach(motor3 -> {
                    awaitRequiredMotorState(motor3, 10000, 50, RunState.Finished);
                });
                break;
            case Stopping:
                throw new RuntimeException("Invalid requested state in activity executor:" + this.activity.getRunState());
            default:
                throw new RuntimeException("Unmatched run state:" + this.activity.getRunState());
        }
        logger.debug("activity and threads are aligned to state " + this.activity.getRunState() + " for " + getActivity().getAlias());
    }

    private boolean awaitMotorState(Motor motor, int i, int i2, RunState... runStateArr) {
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() < currentTimeMillis + i) {
            for (RunState runState : runStateArr) {
                if (motor.getSlotStateTracker().getSlotState() == runState) {
                    Logger logger2 = logger;
                    String alias = this.activityDef.getAlias();
                    long slotId = motor.getSlotId();
                    motor.getSlotStateTracker().getSlotState();
                    logger2.trace(alias + "/Motor[" + slotId + "] is now in state " + logger2);
                    return true;
                }
            }
            try {
                Thread.sleep(i2);
            } catch (InterruptedException e) {
            }
        }
        Logger logger3 = logger;
        String alias2 = this.activityDef.getAlias();
        long slotId2 = motor.getSlotId();
        motor.getSlotStateTracker().getSlotState();
        logger3.trace(alias2 + "/Motor[" + slotId2 + "] is now in state " + logger3);
        return false;
    }

    private boolean awaitAllRequiredMotorState(int i, int i2, RunState... runStateArr) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        while (!z && System.currentTimeMillis() < currentTimeMillis + i) {
            z = true;
            Iterator<Motor<?>> it = this.motors.iterator();
            while (true) {
                if (it.hasNext()) {
                    Motor<?> next = it.next();
                    z = awaitMotorState(next, i, i2, runStateArr);
                    if (!z) {
                        Logger logger2 = logger;
                        long slotId = next.getSlotId();
                        Arrays.asList(runStateArr);
                        logger2.trace("failed awaiting motor " + slotId + " for state in " + logger2);
                        break;
                    }
                }
            }
        }
        return z;
    }

    private boolean awaitAnyRequiredMotorState(int i, int i2, RunState... runStateArr) {
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() < currentTimeMillis + i) {
            for (Motor<?> motor : this.motors) {
                for (RunState runState : runStateArr) {
                    if (motor.getSlotStateTracker().getSlotState() == runState) {
                        Logger logger2 = logger;
                        String alias = this.activityDef.getAlias();
                        long slotId = motor.getSlotId();
                        motor.getSlotStateTracker().getSlotState();
                        logger2.trace("at least one 'any' of " + alias + "/Motor[" + slotId + "] is now in state " + logger2);
                        return true;
                    }
                }
            }
            try {
                Thread.sleep(i2);
            } catch (InterruptedException e) {
            }
        }
        logger.trace("none of " + this.activityDef.getAlias() + "/Motor [" + this.motors.size() + "] is in states in " + Arrays.asList(runStateArr));
        return false;
    }

    private void awaitRequiredMotorState(Motor motor, int i, int i2, RunState... runStateArr) {
        RunState slotState = motor.getSlotStateTracker().getSlotState();
        if (awaitMotorState(motor, i, i2, runStateArr)) {
            logger.trace("motor " + motor + " entered awaited state: " + Arrays.asList(runStateArr));
            return;
        }
        String alias = this.activityDef.getAlias();
        String str = "Unable to await " + alias + "/Motor[" + motor.getSlotId() + "]: from state " + alias + " to " + slotState + " after waiting for " + motor.getSlotStateTracker().getSlotState() + "ms";
        RuntimeException runtimeException = new RuntimeException(str);
        logger.error(str);
        throw runtimeException;
    }

    private synchronized void requestStopMotors() {
        logger.info("stopping activity " + this.activity);
        this.activity.setRunState(RunState.Stopped);
        this.motors.forEach((v0) -> {
            v0.requestStop();
        });
    }

    public boolean isRunning() {
        return this.motors.stream().anyMatch(motor -> {
            return motor.getSlotStateTracker().getSlotState() == RunState.Running;
        });
    }

    public Activity getActivity() {
        return this.activity;
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ProgressMeter
    public synchronized double getProgress() {
        double endCycle = getActivityDef().getEndCycle() - getActivityDef().getStartCycle();
        double d = 0.0d;
        double d2 = 0.0d;
        Iterator it = ((ArrayList) this.motors.stream().map((v0) -> {
            return v0.getInput();
        }).distinct().collect(Collectors.toCollection(ArrayList::new))).iterator();
        while (it.hasNext()) {
            Input input = (Input) it.next();
            if (!(input instanceof ProgressCapable)) {
                logger.warn("input does not support activity progress: " + input);
                return Double.NaN;
            }
            ProgressCapable progressCapable = (ProgressCapable) input;
            d += progressCapable.getTotal();
            d2 += progressCapable.getProgress();
        }
        return d2 / d;
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ProgressMeter
    public String getProgressDetails() {
        return (String) this.motors.stream().map((v0) -> {
            return v0.getInput();
        }).distinct().findFirst().filter(input -> {
            return input instanceof ProgressCapable;
        }).map(input2 -> {
            return ((ProgressCapable) input2).getProgressDetails();
        }).orElse("");
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ProgressMeter
    public String getProgressName() {
        return this.activityDef.getAlias();
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ProgressMeter
    public RunState getProgressState() {
        return (RunState) this.motors.stream().map((v0) -> {
            return v0.getSlotStateTracker();
        }).map((v0) -> {
            return v0.getSlotState();
        }).distinct().sorted().findFirst().orElse(RunState.Uninitialized);
    }

    public synchronized void notifyException(Thread thread, Throwable th) {
        this.stoppingException = new RuntimeException("Error in activity thread " + thread.getName(), th);
        forceStopExecutor(10000);
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ActivityController
    public synchronized void stopActivityWithReasonAsync(String str) {
        logger.info("Stopping activity " + this.activityDef.getAlias() + ": " + str);
        this.stoppingException = new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + str);
        logger.error("stopping with reason: " + this.stoppingException);
        requestStopMotors();
    }

    @Override // io.nosqlbench.engine.api.activityapi.core.ActivityController
    public synchronized void stopActivityWithErrorAsync(Throwable th) {
        if (this.stoppingException == null) {
            this.stoppingException = new RuntimeException(th);
            logger.error("stopping on error: " + th.toString(), th);
        } else if (this.activityDef.getParams().getOptionalBoolean("fullerrors").orElse(false).booleanValue()) {
            logger.error("additional error: " + th.toString(), th);
        } else {
            logger.warn("summarized error (fullerrors=false): " + th.toString());
        }
        requestStopMotors();
    }
}
