package io.nosqlbench.activitytype.diag;

import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.CompletedOp;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.StartedOp;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.op_output.StrideOutputConsumer;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import java.lang.Thread;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongFunction;
import java.util.function.LongToIntFunction;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/nosqlbench/activitytype/diag/AsyncDiagAction.class */
public class AsyncDiagAction extends BaseAsyncAction<DiagOpData, DiagActivity> implements Thread.UncaughtExceptionHandler, StrideOutputConsumer<DiagOpData> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncDiagAction.class);
    private long lastUpdate;
    private long quantizedInterval;
    private long reportModulo;
    private int phasesPerCycle;
    private int completedPhase;
    private long erroroncycle;
    private long throwoncycle;
    private boolean logcycle;
    private RateLimiter diagRateLimiter;
    private LongToIntFunction resultFunc;
    private LongUnaryOperator delayFunc;
    private LinkedBlockingDeque<StartedOp<DiagOpData>> opQueue;
    private OpFinisher finisher;
    private boolean enableOutputProcessing;

    /* loaded from: input_file:io/nosqlbench/activitytype/diag/AsyncDiagAction$OpFinisher.class */
    private static class OpFinisher implements Runnable {
        final BlockingQueue<StartedOp<DiagOpData>> queue;
        private final AsyncDiagAction action;
        AsyncDiagAction mainContext;
        private volatile boolean running = true;
        private Thread thread = new Thread(this);
        private String name;

        public OpFinisher(String str, BlockingQueue<StartedOp<DiagOpData>> blockingQueue, AsyncDiagAction asyncDiagAction) {
            this.queue = blockingQueue;
            this.action = asyncDiagAction;
            this.name = str;
            this.thread.setName(str);
            this.thread.setUncaughtExceptionHandler(asyncDiagAction);
            this.thread.start();
        }

        public void requestStop() {
            this.running = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            AsyncDiagAction.logger.debug("stopping finisher thread for diagnostic action " + this.name);
            while (this.running) {
                try {
                    StartedOp<DiagOpData> take = this.queue.take();
                    DiagOpData diagOpData = (DiagOpData) take.getData();
                    long max = Math.max(0L, (take.getStartedAtNanos() + diagOpData.getSimulatedDelayNanos()) - System.nanoTime());
                    if (max >= 1000) {
                        LockSupport.parkNanos(max);
                    }
                    int backendExecuteOp = this.action.backendExecuteOp(take);
                    if (backendExecuteOp == 0) {
                        take.succeed(backendExecuteOp);
                    } else {
                        take.fail(backendExecuteOp);
                    }
                } catch (InterruptedException e) {
                }
            }
            AsyncDiagAction.logger.debug("stopping finisher thread for diagnostic action " + this.name);
        }
    }

    public AsyncDiagAction(DiagActivity diagActivity, int i) {
        super(diagActivity, i);
        this.erroroncycle = Long.MIN_VALUE;
        this.throwoncycle = Long.MIN_VALUE;
        this.diagRateLimiter = null;
        onActivityDefUpdate(diagActivity.getActivityDef());
    }

    public void requestStop() {
        super.requestStop();
    }

    private void updateReportTime() {
        ParameterMap params = ((DiagActivity) this.activity).getActivityDef().getParams();
        this.reportModulo = ((Long) params.getOptionalLong("modulo").orElse(10000000L)).longValue();
        this.lastUpdate = System.currentTimeMillis() - calculateOffset(this.slot, params);
        this.quantizedInterval = calculateInterval(params, ((DiagActivity) this.activity).getActivityDef().getThreads());
        Logger logger2 = logger;
        int i = this.slot;
        long j = this.quantizedInterval;
        long j2 = this.reportModulo;
        logger2.trace("updating report time for slot:" + i + ", def:" + params + " to " + j + ", and modulo " + logger2);
    }

    private long calculateOffset(long j, ParameterMap parameterMap) {
        return calculateInterval(parameterMap, ((DiagActivity) this.activity).getActivityDef().getThreads()) - (((Long) parameterMap.getOptionalLong("interval").orElse(1000L)).longValue() * j);
    }

    private long calculateInterval(ParameterMap parameterMap, int i) {
        long longValue = ((Long) parameterMap.getOptionalLong("interval").orElse(1000L)).longValue();
        if (longValue == 0) {
            return Long.MAX_VALUE;
        }
        return longValue * i;
    }

    public void init() {
        this.opQueue = new LinkedBlockingDeque<>();
        this.finisher = new OpFinisher(((DiagActivity) this.activity).getAlias() + "_finisher_" + this.slot, this.opQueue, this);
    }

    public void onActivityDefUpdate(ActivityDef activityDef) {
        super.onActivityDefUpdate(activityDef);
        ParameterMap params = activityDef.getParams();
        updateReportTime();
        this.delayFunc = ((DiagActivity) this.activity).getDelayFunc();
        this.resultFunc = ((DiagActivity) this.activity).getResultFunc();
        this.erroroncycle = ((Long) params.getOptionalLong("erroroncycle").orElse(Long.MIN_VALUE)).longValue();
        this.throwoncycle = ((Long) params.getOptionalLong("throwoncycle").orElse(Long.MIN_VALUE)).longValue();
        this.logcycle = ((Boolean) params.getOptionalBoolean("logcycle").orElse(false)).booleanValue();
        this.diagRateLimiter = ((DiagActivity) this.activity).getDiagRateLimiter();
        this.enableOutputProcessing = ((Boolean) params.getOptionalBoolean("enable_output_processing").orElse(false)).booleanValue();
    }

    public LongFunction<DiagOpData> getOpInitFunction() {
        return j -> {
            return new DiagOpData("a diag op");
        };
    }

    public void startOpCycle(TrackedOp<DiagOpData> trackedOp) {
        ((DiagOpData) trackedOp.getData()).log("starting at " + System.nanoTime());
        ((DiagOpData) trackedOp.getData()).setSimulatedDelayNanos(this.delayFunc.applyAsLong(trackedOp.getCycle()));
        this.opQueue.add(trackedOp.start());
    }

    private int backendExecuteOp(StartedOp<DiagOpData> startedOp) {
        long cycle = startedOp.getCycle();
        if (this.logcycle) {
            logger.trace("cycle " + cycle);
        }
        if (this.diagRateLimiter != null) {
            this.diagRateLimiter.maybeWaitForOp();
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (this.completedPhase >= this.phasesPerCycle) {
            this.completedPhase = 0;
        }
        if (currentTimeMillis - this.lastUpdate > this.quantizedInterval) {
            long j = (currentTimeMillis - this.lastUpdate) - this.quantizedInterval;
            Logger logger2 = logger;
            logger2.info("diag action interval, input=" + cycle + ", phase=" + logger2 + ", report delay=" + this.completedPhase + "ms");
            this.lastUpdate += this.quantizedInterval;
            ((DiagActivity) this.activity).delayHistogram.update(j);
        }
        if (cycle % this.reportModulo == 0) {
            Logger logger3 = logger;
            int i = this.completedPhase;
            logger3.info("diag action   modulo, input=" + cycle + ", phase=" + logger3);
        }
        this.completedPhase++;
        int applyAsInt = this.resultFunc.applyAsInt(cycle);
        if (this.erroroncycle == cycle) {
            ((DiagActivity) this.activity).getActivityController().stopActivityWithReasonAsync("Diag was requested to stop on cycle " + this.erroroncycle);
        }
        if (this.throwoncycle == cycle) {
            throw new RuntimeException("Diag was asked to throw an error on cycle " + this.throwoncycle);
        }
        return applyAsInt;
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        logger.error("Error on finisher thread: " + thread.getName() + ", error: " + th.getMessage());
        requestStop();
    }

    public void onStrideOutput(List<CompletedOp<DiagOpData>> list) {
        if (this.enableOutputProcessing) {
            logger.info("processing stride output for " + list.get(0).getCycle());
            long cycle = list.get(0).getCycle();
            long cycle2 = list.get(list.size() - 1).getCycle() + 1;
            String str = (String) ((DiagOpData) list.get(0).getData()).getDiagLog().stream().collect(Collectors.joining("\n"));
            ((DiagActivity) this.activity).getSequenceBlocker().awaitAndRun(cycle, cycle2, () -> {
                Logger logger2 = logger;
                logger2.info(" => " + cycle + " -> " + logger2 + ": " + cycle2);
            });
        }
    }
}
