package com.datastax.oss.dsbulk.workflow.load;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Snapshot;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.base.Stopwatch;
import com.datastax.oss.dsbulk.batcher.api.ReactiveStatementBatcher;
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
import com.datastax.oss.dsbulk.connectors.api.CommonConnectorFeature;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.result.EmptyWriteResult;
import com.datastax.oss.dsbulk.executor.api.result.WriteResult;
import com.datastax.oss.dsbulk.executor.api.writer.BulkWriter;
import com.datastax.oss.dsbulk.sampler.DataSizeSampler;
import com.datastax.oss.dsbulk.workflow.api.Workflow;
import com.datastax.oss.dsbulk.workflow.api.utils.DurationUtils;
import com.datastax.oss.dsbulk.workflow.api.utils.ThrowableUtils;
import com.datastax.oss.dsbulk.workflow.commons.log.LogManager;
import com.datastax.oss.dsbulk.workflow.commons.metrics.MetricsManager;
import com.datastax.oss.dsbulk.workflow.commons.schema.RecordMapper;
import com.datastax.oss.dsbulk.workflow.commons.settings.BatchSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.CodecSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ConnectorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.DriverSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.EngineSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.ExecutorSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.LogSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.MonitoringSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaGenerationStrategy;
import com.datastax.oss.dsbulk.workflow.commons.settings.SchemaSettings;
import com.datastax.oss.dsbulk.workflow.commons.settings.SettingsManager;
import com.datastax.oss.dsbulk.workflow.commons.utils.CloseableUtils;
import com.datastax.oss.dsbulk.workflow.commons.utils.ClusterInformationUtils;
import com.typesafe.config.Config;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:com/datastax/oss/dsbulk/workflow/load/LoadWorkflow.class */
/**
 * Workflow that loads data: records are read from a {@link Connector}, mapped to statements,
 * optionally grouped into batches, and finally written through a {@link BulkWriter}.
 *
 * <p>Expected usage is {@link #init()}, then {@link #execute()}, then {@link #close()}.
 */
public class LoadWorkflow implements Workflow {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadWorkflow.class);

    /** Row-size thresholds, in bytes, used to pick a write concurrency. */
    private static final int _512_BYTES = 512;

    private static final int _1_KB = 1024;
    private static final int _10_KB = 10240;

    /** Number of statements sampled when estimating the mean row size. */
    private static final long SAMPLE_SIZE = 1000L;

    private final SettingsManager settingsManager;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private String executionId;
    private Connector connector;
    private MetricsManager metricsManager;
    private LogManager logManager;
    private EngineSettings engineSettings;
    private CqlSession session;
    private BulkWriter executor;
    private boolean batchingEnabled;
    private boolean dryRun;
    private int batchBufferSize;
    private Scheduler scheduler;
    private int numCores;
    private int readConcurrency;
    private int writeConcurrency;
    private boolean hasManyReaders;

    // Pipeline stages; all of them are created in init() and assembled in execute().
    private Function<Record, BatchableStatement<?>> mapper;
    private Function<Publisher<BatchableStatement<?>>, Publisher<Statement<?>>> batcher;
    private Function<Flux<Record>, Flux<Record>> totalItemsMonitor;
    private Function<Flux<Record>, Flux<Record>> totalItemsCounter;
    private Function<Flux<Record>, Flux<Record>> failedRecordsMonitor;
    private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>> failedStatementsMonitor;
    private Function<Flux<Record>, Flux<Record>> failedRecordsHandler;
    private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>> unmappableStatementsHandler;
    private Function<Flux<Statement<?>>, Flux<Statement<?>>> batcherMonitor;
    private Function<Flux<Void>, Flux<Void>> terminationHandler;
    private Function<Flux<WriteResult>, Flux<WriteResult>> failedWritesHandler;
    private Function<Flux<WriteResult>, Flux<Void>> resultPositionsHandler;
    private Function<Flux<WriteResult>, Flux<WriteResult>> queryWarningsHandler;

    /**
     * Creates a new load workflow.
     *
     * @param config the configuration handed to the {@link SettingsManager}.
     */
    public LoadWorkflow(Config config) {
        this.settingsManager = new SettingsManager(config);
    }

    /**
     * Initializes settings, the connector, the driver session, the log and metrics managers, the
     * write executor and every pipeline stage, then computes the read and write concurrency.
     *
     * @throws IllegalArgumentException if the connector reports a read concurrency lower than 1.
     * @throws Exception if any component fails to initialize.
     */
    public void init() throws Exception {
        this.settingsManager.init("LOAD", true, SchemaGenerationStrategy.MAP_AND_WRITE);
        this.executionId = this.settingsManager.getExecutionId();
        LogSettings logSettings = this.settingsManager.getLogSettings();
        logSettings.init();
        ConnectorSettings connectorSettings = this.settingsManager.getConnectorSettings();
        connectorSettings.init(logSettings.isSources());
        this.connector = connectorSettings.getConnector();
        this.connector.init();
        DriverSettings driverSettings = this.settingsManager.getDriverSettings();
        SchemaSettings schemaSettings = this.settingsManager.getSchemaSettings();
        BatchSettings batchSettings = this.settingsManager.getBatchSettings();
        ExecutorSettings executorSettings = this.settingsManager.getExecutorSettings();
        CodecSettings codecSettings = this.settingsManager.getCodecSettings();
        MonitoringSettings monitoringSettings = this.settingsManager.getMonitoringSettings();
        this.engineSettings = this.settingsManager.getEngineSettings();
        driverSettings.init(true);
        logSettings.logEffectiveSettings(
                this.settingsManager.getEffectiveBulkLoaderConfig(), driverSettings.getDriverConfig());
        monitoringSettings.init();
        codecSettings.init();
        executorSettings.init();
        this.engineSettings.init();
        ConvertingCodecFactory codecFactory =
                codecSettings.createCodecFactory(
                        schemaSettings.isAllowExtraFields(), schemaSettings.isAllowMissingFields());
        this.session = driverSettings.newSession(this.executionId, codecFactory.getCodecRegistry());
        ClusterInformationUtils.printDebugInfoAboutCluster(this.session);
        schemaSettings.init(
                this.session,
                this.connector.supports(CommonConnectorFeature.INDEXED_RECORDS),
                this.connector.supports(CommonConnectorFeature.MAPPED_RECORDS));
        this.logManager = logSettings.newLogManager(this.session, true);
        this.logManager.init();
        RecordMapper recordMapper =
                schemaSettings.createRecordMapper(
                        this.session, this.connector.getRecordMetadata(), codecFactory);
        this.mapper = recordMapper::map;
        batchSettings.init(schemaSettings.isBatchQuery());
        this.batchingEnabled = batchSettings.isBatchingEnabled();
        this.batchBufferSize = batchSettings.getBufferSize();
        if (this.batchingEnabled) {
            ReactiveStatementBatcher statementBatcher = batchSettings.newStatementBatcher(this.session);
            this.batcher = statementBatcher::batchByGroupingKey;
        }
        this.metricsManager =
                monitoringSettings.newMetricsManager(
                        true,
                        this.batchingEnabled,
                        this.logManager.getOperationDirectory(),
                        logSettings.getVerbosity(),
                        // Fall back to a fresh registry only when the session exposes no metrics.
                        this.session
                                .getMetrics()
                                .map(metrics -> metrics.getRegistry())
                                .orElseGet(MetricRegistry::new),
                        this.session.getContext().getProtocolVersion(),
                        this.session.getContext().getCodecRegistry(),
                        schemaSettings.getRowType());
        this.metricsManager.init();
        this.executor =
                executorSettings.newWriteExecutor(this.session, this.metricsManager.getExecutionListener());
        this.dryRun = this.engineSettings.isDryRun();
        if (this.dryRun) {
            LOGGER.info("Dry-run mode enabled.");
        }
        this.closed.set(false);
        this.totalItemsMonitor = this.metricsManager.newTotalItemsMonitor();
        this.failedRecordsMonitor = this.metricsManager.newFailedItemsMonitor();
        this.failedStatementsMonitor = this.metricsManager.newFailedItemsMonitor();
        this.batcherMonitor = this.metricsManager.newBatcherMonitor();
        this.totalItemsCounter = this.logManager.newTotalItemsCounter();
        this.failedRecordsHandler = this.logManager.newFailedRecordsHandler();
        this.unmappableStatementsHandler = this.logManager.newUnmappableStatementsHandler();
        this.queryWarningsHandler = this.logManager.newQueryWarningsHandler();
        this.failedWritesHandler = this.logManager.newFailedWritesHandler();
        this.resultPositionsHandler = this.logManager.newResultPositionsHandler();
        this.terminationHandler = this.logManager.newTerminationHandler();
        this.numCores = Runtime.getRuntime().availableProcessors();
        // Query the connector once so that the validated value is the one actually stored.
        int connectorReadConcurrency = this.connector.readConcurrency();
        if (connectorReadConcurrency < 1) {
            throw new IllegalArgumentException("Invalid read concurrency: " + connectorReadConcurrency);
        }
        this.readConcurrency = connectorReadConcurrency;
        this.hasManyReaders = this.readConcurrency >= Math.max(4, this.numCores / 4);
        LOGGER.debug("Using read concurrency: {}", this.readConcurrency);
        this.writeConcurrency =
                this.engineSettings.getMaxConcurrentQueries().orElseGet(this::determineWriteConcurrency);
        LOGGER.debug(
                "Using write concurrency: {} (user-supplied: {})",
                this.writeConcurrency,
                this.engineSettings.getMaxConcurrentQueries().isPresent());
    }

    /**
     * Runs the load pipeline and blocks until it terminates, then logs a completion message with
     * the elapsed time.
     *
     * @return {@code true} if the log manager reports zero errors, {@code false} otherwise.
     */
    public boolean execute() {
        LOGGER.debug("{} started.", this);
        this.metricsManager.start();
        Stopwatch timer = Stopwatch.createStarted();
        Flux<Statement<?>> statements = this.hasManyReaders ? manyReaders() : fewReaders();
        statements
                .transform(this::executeStatements)
                .transform(this.queryWarningsHandler)
                .transform(this.failedWritesHandler)
                .transform(this.resultPositionsHandler)
                .transform(this.terminationHandler)
                .blockLast();
        timer.stop();
        this.metricsManager.stop();
        Duration elapsed = DurationUtils.round(timer.elapsed(), TimeUnit.SECONDS);
        String elapsedText =
                elapsed.isZero() ? "less than one second" : DurationUtils.formatDuration(elapsed);
        int totalErrors = this.logManager.getTotalErrors();
        if (totalErrors == 0) {
            LOGGER.info("{} completed successfully in {}.", this, elapsedText);
        } else {
            LOGGER.warn("{} completed with {} errors in {}.", this, totalErrors, elapsedText);
        }
        return totalErrors == 0;
    }

    /**
     * Builds the read pipeline used when {@code hasManyReaders} is true: each publisher emitted by
     * the connector is processed end-to-end (mapping and batching included) on the workflow
     * scheduler, with at most {@code readConcurrency} publishers subscribed at once.
     */
    private Flux<Statement<?>> manyReaders() {
        int numThreads = Math.min(this.readConcurrency, this.numCores);
        this.scheduler = Schedulers.newParallel(numThreads, new DefaultThreadFactory("workflow"));
        return Flux.defer(() -> this.connector.read())
                .flatMap(
                        records ->
                                Flux.from(records)
                                        .transform(this.totalItemsMonitor)
                                        .transform(this.totalItemsCounter)
                                        .transform(this.failedRecordsMonitor)
                                        .transform(this.failedRecordsHandler)
                                        .map(this.mapper)
                                        .transform(this.failedStatementsMonitor)
                                        .transform(this.unmappableStatementsHandler)
                                        .transform(this::bufferAndBatch)
                                        .subscribeOn(this.scheduler),
                        this.readConcurrency);
    }

    /**
     * Builds the read pipeline used when {@code hasManyReaders} is false: records are first split
     * into windows, and the windows are then processed on the workflow scheduler with a concurrency
     * of {@code numCores}.
     */
    private Flux<Statement<?>> fewReaders() {
        this.scheduler = Schedulers.newParallel(this.numCores, new DefaultThreadFactory("workflow"));
        return Flux.defer(() -> this.connector.read())
                .flatMap(
                        records ->
                                Flux.from(records)
                                        .window(this.batchingEnabled ? this.batchBufferSize : Queues.SMALL_BUFFER_SIZE),
                        this.readConcurrency)
                .flatMap(
                        window ->
                                window
                                        .transform(this.totalItemsMonitor)
                                        .transform(this.totalItemsCounter)
                                        .transform(this.failedRecordsMonitor)
                                        .transform(this.failedRecordsHandler)
                                        .map(this.mapper)
                                        .transform(this.failedStatementsMonitor)
                                        .transform(this.unmappableStatementsHandler)
                                        .transform(this::batchBuffered)
                                        .subscribeOn(this.scheduler),
                        this.numCores);
    }

    /**
     * Splits the statements into windows of {@code batchBufferSize} and batches each window; returns
     * the statements untouched when batching is disabled.
     */
    private Flux<? extends Statement<?>> bufferAndBatch(Flux<BatchableStatement<?>> flux) {
        if (!this.batchingEnabled) {
            return flux;
        }
        return flux.window(this.batchBufferSize).flatMap(this.batcher).transform(this.batcherMonitor);
    }

    /**
     * Batches statements that were already windowed upstream; returns the statements untouched when
     * batching is disabled.
     */
    private Flux<? extends Statement<?>> batchBuffered(Flux<BatchableStatement<?>> flux) {
        if (!this.batchingEnabled) {
            return flux;
        }
        return flux.transform(this.batcher).transform(this.batcherMonitor);
    }

    /**
     * Writes the statements through the executor with at most {@code writeConcurrency} in-flight
     * writes; in dry-run mode nothing is written and each statement yields an
     * {@link EmptyWriteResult}.
     */
    private Flux<WriteResult> executeStatements(Flux<? extends Statement<?>> flux) {
        if (this.dryRun) {
            return flux.map(EmptyWriteResult::new);
        }
        return flux.flatMap(this.executor::writeReactive, this.writeConcurrency);
    }

    /**
     * Closes every resource held by this workflow, then reports final metrics and last locations.
     * Only the first call after {@link #init()} has an effect.
     *
     * @throws Exception the exception returned by the chain of
     *     {@code CloseableUtils.closeQuietly} calls, if any; it is rethrown only after all
     *     resources were processed and the final reports were emitted.
     */
    public void close() throws Exception {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        LOGGER.debug("{} closing.", this);
        Exception failure = null;
        failure = CloseableUtils.closeQuietly(this.metricsManager, failure);
        failure = CloseableUtils.closeQuietly(this.logManager, failure);
        failure = CloseableUtils.closeQuietly(this.connector, failure);
        failure = CloseableUtils.closeQuietly(this.scheduler, failure);
        failure = CloseableUtils.closeQuietly(this.executor, failure);
        failure = CloseableUtils.closeQuietly(this.session, failure);
        if (this.metricsManager != null) {
            this.metricsManager.reportFinalMetrics();
        }
        if (this.logManager != null) {
            this.logManager.reportLastLocations();
        }
        LOGGER.debug("{} closed.", this);
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code "Operation"}, followed by the execution id once it is known. */
    @Override
    public String toString() {
        return this.executionId == null ? "Operation" : "Operation " + this.executionId;
    }

    /**
     * Computes the write concurrency to use when the user did not supply one. The value is a
     * multiple of the number of cores that shrinks as the mean row size grows.
     */
    private int determineWriteConcurrency() {
        if (this.dryRun) {
            return this.numCores;
        }
        double meanSize;
        if (this.engineSettings.isDataSizeSamplingEnabled()
                && this.connector.supports(CommonConnectorFeature.DATA_SIZE_SAMPLING)) {
            meanSize = getMeanRowSize();
        } else {
            // No sampling possible: assume 1 KB rows.
            meanSize = _1_KB;
        }
        int concurrency;
        if (meanSize <= _512_BYTES) {
            concurrency = this.hasManyReaders ? this.numCores * 64 : this.numCores * 16;
        } else if (meanSize <= _1_KB) {
            concurrency = this.hasManyReaders ? this.numCores * 32 : this.numCores * 8;
        } else if (meanSize <= _10_KB) {
            concurrency = this.numCores * 4;
        } else {
            concurrency = this.numCores;
        }
        if (!this.batchingEnabled && meanSize <= _1_KB) {
            concurrency *= 4;
        }
        return concurrency;
    }

    /**
     * Estimates the mean row size in bytes by sampling up to {@link #SAMPLE_SIZE} bound statements
     * read from the connector.
     *
     * @return the sampled mean, or {@link #_1_KB} when the sample holds fewer than
     *     {@link #SAMPLE_SIZE} entries, when its coefficient of variation is 1 or more, or when
     *     sampling throws.
     */
    private double getMeanRowSize() {
        try {
            LOGGER.debug("Sampling data...");
            DriverContext context = this.session.getContext();
            Flux<BatchableStatement<?>> statements = Flux.merge(this.connector.read()).map(this.mapper);
            Histogram sample =
                    DataSizeSampler.sampleWrites(
                            context,
                            statements.filter(BoundStatement.class::isInstance).take(SAMPLE_SIZE).toIterable());
            if (sample.getCount() < SAMPLE_SIZE) {
                LOGGER.debug("Data sample is too small: {}, discarding", sample.getCount());
                return _1_KB;
            }
            Snapshot snapshot = sample.getSnapshot();
            double mean = snapshot.getMean();
            double stdDev = snapshot.getStdDev();
            double coefficientOfVariation = stdDev / mean;
            LOGGER.debug(
                    "Average record size in bytes: {}, std dev: {}, coefficientOfVariation: {}",
                    mean,
                    stdDev,
                    coefficientOfVariation);
            if (coefficientOfVariation >= 1.0d) {
                LOGGER.debug("Data sample is too spread out, discarding");
                return _1_KB;
            }
            return mean;
        } catch (Exception e) {
            // Sampling is best-effort: fall back to the default size instead of failing the load.
            LOGGER.debug("Sampling failed: {}", ThrowableUtils.getSanitizedErrorMessage(e));
            return _1_KB;
        }
    }
}
