package org.mule.runtime.core.internal.processor.strategy;

import java.util.Objects;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ReactorProcessingStrategyFactory.class */
/**
 * Factory for {@link ReactorProcessingStrategy} instances, which move event processing onto a
 * CPU-light {@link Scheduler} obtained from the {@link MuleContext}'s scheduler service.
 */
public class ReactorProcessingStrategyFactory extends AbstractProcessingStrategyFactory {

    /**
     * Processing strategy that publishes pipeline events on a single CPU-light scheduler.
     *
     * <p>The scheduler is not resolved at construction time: it is requested from the supplier in
     * {@link #start()} and stopped in {@link #stop()}.
     */
    static class ReactorProcessingStrategy extends AbstractProcessingStrategy implements Startable, Stoppable {
        /** Source of the CPU-light scheduler; consulted each time {@link #start()} is invoked. */
        private final Supplier<Scheduler> cpuLightSchedulerSupplier;

        /** Scheduler obtained by {@link #start()}; {@code null} until the strategy has been started. */
        private Scheduler cpuLightScheduler;

        /**
         * Creates the strategy without resolving the scheduler yet.
         *
         * @param supplier provider of the CPU-light scheduler, must not be {@code null}
         * @throws NullPointerException if {@code supplier} is {@code null}
         */
        public ReactorProcessingStrategy(Supplier<Scheduler> supplier) {
            // requireNonNull is generic and already yields Supplier<Scheduler>; no (raw) cast is needed.
            this.cpuLightSchedulerSupplier = Objects.requireNonNull(supplier, "supplier");
        }

        /**
         * Obtains the CPU-light scheduler from the supplier and keeps it for later use.
         *
         * @throws MuleException declared by the lifecycle contract
         */
        @Override
        public void start() throws MuleException {
            this.cpuLightScheduler = this.cpuLightSchedulerSupplier.get();
        }

        /**
         * Stops the CPU-light scheduler if one was obtained by {@link #start()}; otherwise does nothing.
         *
         * @throws MuleException declared by the lifecycle contract
         */
        @Override
        public void stop() throws MuleException {
            if (this.cpuLightScheduler != null) {
                this.cpuLightScheduler.stop();
            }
        }

        /**
         * Creates a {@link StreamPerEventSink} for the given pipeline function, using the consumer
         * returned by {@code createOnEventConsumer()}.
         *
         * @param flowConstruct the flow construct the sink is requested for (not used here)
         * @param reactiveProcessor the pipeline function handed to the sink
         * @return a new sink instance
         */
        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
            return new StreamPerEventSink(reactiveProcessor, createOnEventConsumer());
        }

        /**
         * Wraps the pipeline so that events are published on the (decorated) CPU-light scheduler
         * before the pipeline function is applied.
         *
         * @param reactiveProcessor the pipeline function to wrap
         * @return a processor that applies {@code publishOn} and then {@code reactiveProcessor}
         */
        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
            // The scheduler field is read when the returned function is applied, not when it is created.
            return publisher -> Flux.from(publisher)
                    .publishOn(Schedulers.fromExecutorService(decorateScheduler(this.cpuLightScheduler)))
                    .transform(reactiveProcessor);
        }

        /**
         * Wraps an individual processor. Processors whose processing type is {@code CPU_LITE_ASYNC}
         * have their output published on the (decorated) CPU-light scheduler; every other processing
         * type is delegated to the superclass implementation.
         *
         * @param reactiveProcessor the processor to wrap
         * @return the wrapped processor
         */
        @Override
        public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
            if (reactiveProcessor.getProcessingType() != ReactiveProcessor.ProcessingType.CPU_LITE_ASYNC) {
                return super.onProcessor(reactiveProcessor);
            }
            // The scheduler field is read when the returned function is applied, not when it is created.
            return publisher -> Flux.from(publisher)
                    .transform(reactiveProcessor)
                    .publishOn(Schedulers.fromExecutorService(decorateScheduler(this.cpuLightScheduler)));
        }

        /**
         * @return the scheduler obtained by {@link #start()}, or {@code null} if the strategy has not
         *         been started
         */
        protected Scheduler getCpuLightScheduler() {
            return this.cpuLightScheduler;
        }
    }

    /**
     * Creates a {@link ReactorProcessingStrategy} whose scheduler is requested lazily from the
     * context's scheduler service, configured through
     * {@code createSchedulerConfig(muleContext, str, CPU_LITE)}.
     *
     * @param muleContext context providing the scheduler service
     * @param str value forwarded to {@code createSchedulerConfig} together with the processing type
     * @return a new, not yet started, processing strategy
     */
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String str) {
        return new ReactorProcessingStrategy(() -> muleContext.getSchedulerService()
                .cpuLightScheduler(createSchedulerConfig(muleContext, str, ReactiveProcessor.ProcessingType.CPU_LITE)));
    }

    /**
     * @return the concrete strategy type produced by {@link #create(MuleContext, String)}
     */
    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return ReactorProcessingStrategy.class;
    }
}
