package com.datastax.oss.dsbulk.workflow.commons.log;

import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.pattern.ThrowableProxyConverter;
import ch.qos.logback.classic.spi.ThrowableProxy;
import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException;
import com.datastax.oss.driver.api.core.servererrors.ServerError;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.base.Joiner;
import com.datastax.oss.dsbulk.connectors.api.ErrorRecord;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.connectors.api.Resource;
import com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException;
import com.datastax.oss.dsbulk.executor.api.result.ReadResult;
import com.datastax.oss.dsbulk.executor.api.result.Result;
import com.datastax.oss.dsbulk.executor.api.result.WriteResult;
import com.datastax.oss.dsbulk.format.row.RowFormatter;
import com.datastax.oss.dsbulk.format.statement.StatementFormatVerbosity;
import com.datastax.oss.dsbulk.format.statement.StatementFormatter;
import com.datastax.oss.dsbulk.workflow.api.error.ErrorThreshold;
import com.datastax.oss.dsbulk.workflow.api.error.TooManyErrorsException;
import com.datastax.oss.dsbulk.workflow.commons.log.checkpoint.Checkpoint;
import com.datastax.oss.dsbulk.workflow.commons.log.checkpoint.CheckpointManager;
import com.datastax.oss.dsbulk.workflow.commons.log.checkpoint.ReplayStrategy;
import com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException;
import com.datastax.oss.dsbulk.workflow.commons.settings.LogSettings;
import com.datastax.oss.dsbulk.workflow.commons.statement.MappedStatement;
import com.datastax.oss.dsbulk.workflow.commons.statement.UnmappableStatement;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:com/datastax/oss/dsbulk/workflow/commons/log/LogManager.class */
public class LogManager implements AutoCloseable {
    // Class logger. Declared blank here; presumably assigned in a static initializer that is
    // outside the visible part of this file - TODO confirm.
    private static final Logger LOGGER;
    // Names of the debug ("*-errors.log") files, resolved against the operation directory.
    private static final String MAPPING_ERRORS_FILE = "mapping-errors.log";
    private static final String CONNECTOR_ERRORS_FILE = "connector-errors.log";
    private static final String UNLOAD_ERRORS_FILE = "unload-errors.log";
    private static final String LOAD_ERRORS_FILE = "load-errors.log";
    private static final String CAS_ERRORS_FILE = "paxos-errors.log";
    // Names of the "bad" files that receive the raw source of rejected records.
    // reportAvailableFiles() tells them apart from debug files with the glob "*.bad".
    private static final String CONNECTOR_BAD_FILE = "connector.bad";
    private static final String MAPPING_BAD_FILE = "mapping.bad";
    private static final String LOAD_BAD_FILE = "load.bad";
    private static final String CAS_BAD_FILE = "paxos.bad";
    // Checkpoint file name mentioned in the log messages emitted by reportAvailableFiles().
    private static final String CHECKPOINT_CSV = "checkpoint.csv";
    // Collaborators and settings supplied through the constructor.
    private final CqlSession session;
    private final Path operationDirectory;
    private final ErrorThreshold errorThreshold;
    private final ErrorThreshold queryWarningsThreshold;
    private final StatementFormatter statementFormatter;
    private final StatementFormatVerbosity statementFormatVerbosity;
    private final RowFormatter rowFormatter;
    private final boolean checkpointEnabled;
    // Assigned by init() from the session context; null until init() has run.
    private CodecRegistry codecRegistry;
    private ProtocolVersion protocolVersion;
    // Created and started by init(); stopped by close().
    private StackTracePrinter stackTracePrinter;
    // Checkpoint state supplied at construction; consulted by init() and by the checkpoint handlers.
    private final CheckpointManager initialCheckpointManager;
    private final ReplayStrategy replayStrategy;
    // Sinks created by init(); each one feeds a pipeline that writes to debug and/or bad files.
    private FluxSink<ErrorRecord> failedRecordSink;
    private FluxSink<ErrorRecord> unmappableRecordSink;
    private FluxSink<UnmappableStatement> unmappableStatementSink;
    private FluxSink<WriteResult> failedWriteSink;
    private FluxSink<WriteResult> failedCASWriteSink;
    private FluxSink<ReadResult> failedReadSink;
    // Processor and sink that receive dropped Reactor errors and uncaught thread exceptions
    // (both hooks are installed by init()).
    private UnicastProcessor<Void> uncaughtExceptionProcessor;
    private FluxSink<Void> uncaughtExceptionSink;
    // Created by init() with an initial value of false.
    private AtomicBoolean invalidMappingWarningDone;
    // Flag that guards the desugared assertion checks in this class. It is marked synthetic,
    // i.e. it looks like the compiler-generated field behind `assert`, made explicit by the
    // decompiler - NOTE(review): confirm before editing by hand.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Running error count; init() seeds it from the initial checkpoint manager.
    private final AtomicInteger errors = new AtomicInteger(0);
    // Running item count; init() seeds it from the initial checkpoint manager.
    private final LongAdder totalItems = new LongAdder();
    private final AtomicInteger queryWarnings = new AtomicInteger(0);
    // While true, newQueryWarningsHandler() forwards execution infos to maybeLogQueryWarnings.
    private final AtomicBoolean queryWarningsEnabled = new AtomicBoolean(true);
    // One UTF-8 PrintWriter per output path, opened on first access. CREATE_NEW makes the open
    // fail if the file already exists. All cached writers are flushed and closed by close().
    private final LoadingCache<Path, PrintWriter> openFiles = Caffeine.newBuilder().build(path -> {
        return new PrintWriter(Files.newBufferedWriter(path, StandardCharsets.UTF_8, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE));
    });
    // Checkpoint managers registered by recordCheckpoint() and readResultCheckpoint(), one per
    // invocation of their context-writing function.
    private final Queue<CheckpointManager> checkpointManagers = new ConcurrentLinkedQueue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/oss/dsbulk/workflow/commons/log/LogManager$StackTracePrinter.class */
    public static class StackTracePrinter extends ThrowableProxyConverter {
        private StackTracePrinter() {
        }

        public void start() {
            setContext(new LoggerContext());
            super.start();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void printStackTrace(Throwable th, PrintWriter printWriter) {
            printWriter.print(throwableProxyToString(new ThrowableProxy(th)));
            printWriter.flush();
        }
    }

    /**
     * Stores the collaborators and settings of this log manager. No sink, file or hook is created
     * here; that work is done by {@link #init()}.
     *
     * @param cqlSession session whose context provides the codec registry and protocol version
     * @param path the operation directory against which all output file names are resolved
     * @param errorThreshold stored as the error threshold
     * @param errorThreshold2 stored as the query warnings threshold
     * @param statementFormatter formatter used when statements are written to debug files
     * @param statementFormatVerbosity verbosity passed to the statement formatter
     * @param rowFormatter formatter used when rows are written to debug files
     * @param z whether checkpointing is enabled
     * @param checkpointManager the initial checkpoint manager
     * @param replayStrategy the replay strategy applied to checkpoints
     */
    public LogManager(CqlSession cqlSession, Path path, ErrorThreshold errorThreshold, ErrorThreshold errorThreshold2, StatementFormatter statementFormatter, StatementFormatVerbosity statementFormatVerbosity, RowFormatter rowFormatter, boolean z, @NonNull CheckpointManager checkpointManager, ReplayStrategy replayStrategy) {
        // Session and output location.
        this.session = cqlSession;
        this.operationDirectory = path;

        // Thresholds.
        this.errorThreshold = errorThreshold;
        this.queryWarningsThreshold = errorThreshold2;

        // Formatting.
        this.statementFormatter = statementFormatter;
        this.statementFormatVerbosity = statementFormatVerbosity;
        this.rowFormatter = rowFormatter;

        // Checkpointing.
        this.checkpointEnabled = z;
        this.initialCheckpointManager = checkpointManager;
        this.replayStrategy = replayStrategy;
    }

    /**
     * Prepares this manager for use: reads the codec registry and protocol version from the
     * session context, starts the stack trace printer, creates every error sink, installs the
     * dropped-error and uncaught-exception hooks, and seeds the item and error counters from the
     * initial checkpoint manager.
     */
    public void init() {
        this.codecRegistry = this.session.getContext().getCodecRegistry();
        this.protocolVersion = this.session.getContext().getProtocolVersion();

        StackTracePrinter printer = new StackTracePrinter();
        printer.setOptionList(LogSettings.STACK_TRACE_PRINTER_OPTIONS);
        printer.start();
        this.stackTracePrinter = printer;

        this.failedRecordSink = newFailedRecordSink();
        this.unmappableRecordSink = newUnmappableRecordSink();
        this.unmappableStatementSink = newUnmappableStatementSink();
        this.failedWriteSink = newFailedWriteResultSink();
        this.failedCASWriteSink = newFailedCASWriteSink();
        this.failedReadSink = newFailedReadResultSink();

        UnicastProcessor<Void> processor = UnicastProcessor.create();
        this.uncaughtExceptionProcessor = processor;
        this.uncaughtExceptionSink = processor.sink();
        this.invalidMappingWarningDone = new AtomicBoolean(false);

        // Both hooks are process-wide: errors dropped by Reactor and exceptions that escape any
        // thread are redirected to the uncaught exception sink.
        Hooks.onErrorDropped(dropped -> this.uncaughtExceptionSink.error(dropped));
        Thread.setDefaultUncaughtExceptionHandler((thread, uncaught) -> this.uncaughtExceptionSink.error(uncaught));

        // Start counting from whatever the initial checkpoint manager reports.
        this.totalItems.add(this.initialCheckpointManager.getTotalItems(this.replayStrategy));
        // The rejected-items count is narrowed to int because the error counter is an AtomicInteger.
        int rejectedSoFar = (int) this.initialCheckpointManager.getRejectedItems(this.replayStrategy);
        this.errors.set(rejectedSoFar);
    }

    /**
     * @return the directory against which all debug, bad and checkpoint file names are resolved
     */
    public Path getOperationDirectory() {
        return operationDirectory;
    }

    /**
     * @return the current sum of the total items counter
     */
    public long getTotalItems() {
        final long itemsSoFar = totalItems.sum();
        return itemsSoFar;
    }

    /**
     * @return the current value of the error counter
     */
    public int getTotalErrors() {
        final int errorsSoFar = errors.get();
        return errorsSoFar;
    }

    /**
     * Completes every sink, stops the stack trace printer, then flushes and closes each writer
     * held in the open files cache.
     *
     * @throws IOException declared by the signature; nothing in this body throws it directly
     */
    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        this.failedRecordSink.complete();
        this.unmappableRecordSink.complete();
        this.unmappableStatementSink.complete();
        this.failedWriteSink.complete();
        this.failedCASWriteSink.complete();
        this.failedReadSink.complete();
        this.uncaughtExceptionSink.complete();
        this.stackTracePrinter.stop();
        for (PrintWriter writer : this.openFiles.asMap().values()) {
            writer.flush();
            writer.close();
        }
    }

    /**
     * Logs which output files were produced by the current operation.
     *
     * <p>Files whose name matches {@code *.bad} are reported as containing rejected records; all
     * other open files are reported as containing error details. When checkpointing is enabled
     * and the merged checkpoint manager is not empty, the checkpoint file is written and
     * instructions for resuming the operation are logged.
     *
     * @throws IOException declared by the signature; presumably raised while writing the
     *     checkpoint file - TODO confirm against writeCheckpointFile
     */
    public void reportAvailableFiles() throws IOException {
        PathMatcher badFileMatcher = FileSystems.getDefault().getPathMatcher("glob:*.bad");
        // Typed view of the cache keys: with raw types the lambdas below cannot be type-checked.
        Set<Path> openPaths = this.openFiles.asMap().keySet();

        List<Path> badFiles = openPaths.stream()
                .map(Path::getFileName)
                .filter(badFileMatcher::matches)
                .collect(Collectors.toList());
        if (!badFiles.isEmpty()) {
            LOGGER.info("Rejected records can be found in the following file(s): {}", Joiner.on(", ").join(badFiles));
        }

        List<Path> debugFiles = openPaths.stream()
                .map(Path::getFileName)
                .filter(fileName -> !badFileMatcher.matches(fileName))
                .collect(Collectors.toList());
        if (!debugFiles.isEmpty()) {
            LOGGER.info("Errors are detailed in the following file(s): {}", Joiner.on(", ").join(debugFiles));
        }

        if (this.checkpointEnabled) {
            CheckpointManager merged = mergeCheckpointManagers();
            if (merged.isEmpty()) {
                // Nothing to checkpoint: no file is written and nothing is logged.
                return;
            }
            writeCheckpointFile(merged);
            LOGGER.info("Checkpoints for the current operation were written to {}.", CHECKPOINT_CSV);
            LOGGER.info("To resume the current operation, re-run it with the same settings, and add the following command line flag:");
            LOGGER.info("--dsbulk.log.checkpoint.file={}/{}", this.operationDirectory, CHECKPOINT_CSV);
        }
    }

    /**
     * Builds a handler that completes every sink of this manager when the upstream flux
     * terminates, and merges the result with the uncaught exception processor.
     *
     * @return a function to apply to the main flux of the operation
     */
    @NonNull
    public Function<Flux<Void>, Flux<Void>> newTerminationHandler() {
        // Each bound method reference reads its sink field (and fails on null) when the function
        // is applied, in the order listed here.
        return upstream -> upstream
                .doOnTerminate(this.failedRecordSink::complete)
                .doOnTerminate(this.unmappableRecordSink::complete)
                .doOnTerminate(this.unmappableStatementSink::complete)
                .doOnTerminate(this.failedWriteSink::complete)
                .doOnTerminate(this.failedReadSink::complete)
                .doOnTerminate(this.failedCASWriteSink::complete)
                .doOnTerminate(this.uncaughtExceptionSink::complete)
                .mergeWith(this.uncaughtExceptionProcessor);
    }

    /**
     * Builds a handler that removes unmappable statements from the stream. Each
     * {@link UnmappableStatement} is pushed to the unmappable statement sink and counted as one
     * error; every other statement is passed through unchanged.
     *
     * @return a function to apply to a flux of statements
     */
    @NonNull
    public Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>> newUnmappableStatementsHandler() {
        return upstream -> upstream.flatMap(statement -> {
            if (statement instanceof UnmappableStatement) {
                try {
                    this.unmappableStatementSink.next((UnmappableStatement) statement);
                    return maybeTriggerOnError(null, this.errors.incrementAndGet());
                } catch (Exception e) {
                    // Any failure while reporting becomes an error signal of the stream.
                    return Flux.error(e);
                }
            }
            return Flux.just(statement);
        }, 1, 1);
    }

    /**
     * Builds a handler that removes failed records from the stream. Each {@link ErrorRecord} is
     * pushed to the failed record sink and counted as one error; every other record is passed
     * through unchanged.
     *
     * @return a function to apply to a flux of records
     */
    @NonNull
    public Function<Flux<Record>, Flux<Record>> newFailedRecordsHandler() {
        return upstream -> upstream.flatMap(candidate -> {
            if (candidate instanceof ErrorRecord) {
                try {
                    this.failedRecordSink.next((ErrorRecord) candidate);
                    return maybeTriggerOnError(null, this.errors.incrementAndGet());
                } catch (Exception e) {
                    // Any failure while reporting becomes an error signal of the stream.
                    return Flux.error(e);
                }
            }
            return Flux.just(candidate);
        }, 1, 1);
    }

    /**
     * Builds a handler that removes unmappable records from the stream. Each {@link ErrorRecord}
     * is pushed to the unmappable record sink and counted as one error; every other record is
     * passed through unchanged.
     *
     * @return a function to apply to a flux of records
     */
    @NonNull
    public Function<Flux<Record>, Flux<Record>> newUnmappableRecordsHandler() {
        return upstream -> upstream.flatMap(candidate -> {
            if (candidate instanceof ErrorRecord) {
                try {
                    this.unmappableRecordSink.next((ErrorRecord) candidate);
                    return maybeTriggerOnError(null, this.errors.incrementAndGet());
                } catch (Exception e) {
                    // Any failure while reporting becomes an error signal of the stream.
                    return Flux.error(e);
                }
            }
            return Flux.just(candidate);
        }, 1, 1);
    }

    /**
     * Builds a handler that removes failed writes from the stream.
     *
     * <ul>
     *   <li>Successful and applied results are passed through unchanged.
     *   <li>Successful but not applied results go to the failed CAS write sink.
     *   <li>Unsuccessful results go to the failed write sink, and the cause of their error is
     *       handed to {@code maybeTriggerOnError}.
     * </ul>
     *
     * In both failure cases the error counter grows by the batch size of the result.
     *
     * @return a function to apply to a flux of write results
     */
    @NonNull
    public Function<Flux<WriteResult>, Flux<WriteResult>> newFailedWritesHandler() {
        return upstream -> upstream.flatMap(result -> {
            try {
                if (!result.isSuccess()) {
                    this.failedWriteSink.next(result);
                    // Desugared form of: assert result.getError().isPresent();
                    if (!$assertionsDisabled && !result.getError().isPresent()) {
                        throw new AssertionError();
                    }
                    Throwable cause = ((BulkExecutionException) result.getError().get()).getCause();
                    return maybeTriggerOnError(cause, this.errors.addAndGet(result.getBatchSize()));
                }
                if (!result.wasApplied()) {
                    this.failedCASWriteSink.next(result);
                    return maybeTriggerOnError(null, this.errors.addAndGet(result.getBatchSize()));
                }
                return Flux.just(result);
            } catch (Exception e) {
                // Any failure while reporting becomes an error signal of the stream.
                return Flux.error(e);
            }
        }, 1, 1);
    }

    /**
     * Builds a handler that removes failed reads from the stream. Successful results are passed
     * through unchanged; unsuccessful ones are pushed to the failed read sink, counted as one
     * error, and the cause of their error is handed to {@code maybeTriggerOnError}.
     *
     * @return a function to apply to a flux of read results
     */
    @NonNull
    public Function<Flux<ReadResult>, Flux<ReadResult>> newFailedReadsHandler() {
        return upstream -> upstream.flatMap(result -> {
            if (result.isSuccess()) {
                return Flux.just(result);
            }
            try {
                this.failedReadSink.next(result);
                // Desugared form of: assert result.getError().isPresent();
                if (!$assertionsDisabled && !result.getError().isPresent()) {
                    throw new AssertionError();
                }
                Throwable cause = ((BulkExecutionException) result.getError().get()).getCause();
                return maybeTriggerOnError(cause, this.errors.incrementAndGet());
            } catch (Exception e) {
                // Any failure while reporting becomes an error signal of the stream.
                return Flux.error(e);
            }
        }, 1, 1);
    }

    /**
     * Builds a handler that inspects each result passing through. While query warnings are
     * enabled, the execution info of the result, when present, is handed to
     * {@code maybeLogQueryWarnings}. Results are never modified or dropped.
     *
     * @param <T> the concrete result type
     * @return a function to apply to a flux of results
     */
    public <T extends Result> Function<Flux<T>, Flux<T>> newQueryWarningsHandler() {
        return upstream -> upstream.doOnNext(result -> {
            if (!this.queryWarningsEnabled.get()) {
                return;
            }
            result.getExecutionInfo().ifPresent(this::maybeLogQueryWarnings);
        });
    }

    /**
     * Builds a handler that consumes successful write results. The records behind each written
     * statement are extracted and, when checkpointing is enabled, recorded as successful in a
     * checkpoint manager. The returned flux emits no items and only relays termination.
     *
     * @return a function to apply to a flux of write results
     */
    public Function<Flux<WriteResult>, Flux<Void>> newSuccessfulWritesHandler() {
        return upstream -> {
            Flux<Record> records = upstream
                    .map(WriteResult::getStatement)
                    .transform(this::extractRecordFromMappedStatement);
            if (this.checkpointEnabled) {
                // Distinct parameter name: a lambda parameter may not redeclare the enclosing one.
                records = records.transform(written -> recordCheckpoint(written, true));
            }
            return records.then().flux();
        };
    }

    /**
     * Builds a handler that consumes read results. When checkpointing is enabled, each result is
     * recorded in a checkpoint manager. The returned flux emits no items and only relays
     * termination.
     *
     * @return a function to apply to a flux of read results
     */
    public Function<Flux<ReadResult>, Flux<Void>> newSuccessfulReadsHandler() {
        return upstream -> {
            // The local must not reuse the lambda parameter's name, and keeps its element type.
            Flux<ReadResult> results = upstream;
            if (this.checkpointEnabled) {
                results = results.transform(this::readResultCheckpoint);
            }
            return results.then().flux();
        };
    }

    /**
     * Builds a handler that consumes successful records. When checkpointing is enabled, each
     * record is recorded as successful in a checkpoint manager. The returned flux emits no items
     * and only relays termination.
     *
     * @return a function to apply to a flux of records
     */
    public Function<Flux<Record>, Flux<Void>> newSuccessfulRecordsHandler() {
        return upstream -> {
            // The local must not reuse the lambda parameter's name, and keeps its element type.
            Flux<Record> records = upstream;
            if (this.checkpointEnabled) {
                records = records.transform(processed -> recordCheckpoint(processed, true));
            }
            return records.then().flux();
        };
    }

    /**
     * Builds a handler that increments the total items counter once for every element passing
     * through, without altering the stream.
     *
     * @param <T> the element type
     * @return a function to apply to the flux whose elements should be counted
     */
    public <T> Function<Flux<T>, Flux<T>> newTotalItemsCounter() {
        return upstream -> upstream.doOnNext(item -> this.totalItems.increment());
    }

    /**
     * Builds a handler that turns each connector resource into the flux of its records.
     *
     * <p>Without checkpointing, every resource is read as is. With checkpointing, the checkpoint
     * of the resource is looked up in the initial checkpoint manager: resources that the replay
     * strategy considers complete yield an empty flux; otherwise the checkpoint is reset, records
     * that should not be replayed are filtered out, the produced count is incremented for each
     * remaining record, and the checkpoint is marked complete when the read completes.
     *
     * @return a function to apply to a flux of resources
     */
    public Function<Flux<Resource>, Flux<Flux<Record>>> newConnectorCheckpointHandler() {
        if (!this.checkpointEnabled) {
            return resources -> resources.map(resource -> Flux.from(resource.read()));
        }
        return resources -> resources.map(resource -> {
            Checkpoint checkpoint = this.initialCheckpointManager.getCheckpoint(resource.getURI());
            if (this.replayStrategy.isComplete(checkpoint)) {
                return Flux.empty();
            }
            this.replayStrategy.reset(checkpoint);
            return Flux.from(resource.read())
                    .doOnComplete(() -> checkpoint.setComplete(true))
                    .filter(record -> this.replayStrategy.shouldReplay(checkpoint, record.getPosition()))
                    .doOnNext(replayed -> checkpoint.incrementProduced());
        });
    }

    /**
     * Builds a handler that turns each range read resource into the flux of its read results.
     *
     * <p>Without checkpointing, every resource is read as is. With checkpointing, the checkpoint
     * of the resource is looked up in the initial checkpoint manager: resources that the replay
     * strategy considers complete yield an empty flux; otherwise the checkpoint is reset and
     * results that should not be replayed are filtered out. Each remaining successful result
     * increments the produced count, while an unsuccessful one raises a failure flag. When the
     * read completes, the checkpoint is marked complete only if that flag was never raised.
     *
     * @return a function to apply to a flux of range read resources
     */
    public Function<Flux<RangeReadResource>, Flux<Flux<ReadResult>>> newRangeReadCheckpointHandler() {
        if (!this.checkpointEnabled) {
            return resources -> resources.map(resource -> Flux.from(resource.read()));
        }
        return resources -> resources.map(resource -> {
            Checkpoint checkpoint = this.initialCheckpointManager.getCheckpoint(resource.getURI());
            if (this.replayStrategy.isComplete(checkpoint)) {
                return Flux.empty();
            }
            this.replayStrategy.reset(checkpoint);
            AtomicBoolean failureSeen = new AtomicBoolean();
            return Flux.from(resource.read())
                    .doOnComplete(() -> checkpoint.setComplete(!failureSeen.get()))
                    .filter(result -> this.replayStrategy.shouldReplay(checkpoint, result.getPosition()))
                    .doOnNext(result -> {
                        if (!result.isSuccess()) {
                            failureSeen.set(true);
                        } else {
                            checkpoint.incrementProduced();
                        }
                    });
        });
    }

    /**
     * Converts a flux of statements into the flux of records they were mapped from. Batch
     * statements are expanded into their child statements first; every resulting statement is
     * then cast to {@link MappedStatement} and replaced by its record.
     *
     * @param flux the statements to unwrap
     * @return the records behind the statements
     */
    @NonNull
    private Flux<Record> extractRecordFromMappedStatement(Flux<? extends Statement<?>> flux) {
        return flux.<Statement<?>>flatMap(statement -> {
            if (statement instanceof BatchStatement) {
                return Flux.fromIterable((BatchStatement) statement);
            }
            return Flux.just(statement);
        }).cast(MappedStatement.class).map(MappedStatement::getRecord);
    }

    /**
     * Creates the sink that receives records the connector failed to produce. Each record pushed
     * to the sink is written to the connector debug file, then to the connector bad file and,
     * when checkpointing is enabled, recorded as failed in a checkpoint manager. Pipeline errors
     * are routed to {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<ErrorRecord> newFailedRecordSink() {
        // Typed processor and pipeline: raw types would erase the element type the lambdas need.
        UnicastProcessor<ErrorRecord> processor = UnicastProcessor.create();
        Flux<Record> pipeline = processor
                .flatMap(this::appendFailedRecordToDebugFile)
                .flatMap(failed -> appendToBadFile(failed, CONNECTOR_BAD_FILE));
        if (this.checkpointEnabled) {
            pipeline = pipeline.transform(records -> recordCheckpoint(records, false));
        }
        pipeline.subscribe(ignored -> {
        }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates the sink that receives records that could not be mapped. Each record pushed to the
     * sink is written to the mapping debug file, then to the mapping bad file and, when
     * checkpointing is enabled, recorded as failed in a checkpoint manager. Pipeline errors are
     * routed to {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<ErrorRecord> newUnmappableRecordSink() {
        // Typed processor and pipeline: raw types would erase the element type the lambdas need.
        UnicastProcessor<ErrorRecord> processor = UnicastProcessor.create();
        Flux<Record> pipeline = processor
                .flatMap(this::appendUnmappableReadResultToDebugFile)
                .flatMap(unmappable -> appendToBadFile(unmappable, MAPPING_BAD_FILE));
        if (this.checkpointEnabled) {
            pipeline = pipeline.transform(records -> recordCheckpoint(records, false));
        }
        pipeline.subscribe(ignored -> {
        }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates the sink that receives statements that could not be mapped. Each statement pushed
     * to the sink is first passed to {@code maybeWarnInvalidMapping}, then written to the mapping
     * debug file; its record is then extracted, written to the mapping bad file and, when
     * checkpointing is enabled, recorded as failed in a checkpoint manager. Pipeline errors are
     * routed to {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<UnmappableStatement> newUnmappableStatementSink() {
        // Typed processor and pipeline: raw types would erase the element type the lambdas need.
        UnicastProcessor<UnmappableStatement> processor = UnicastProcessor.create();
        Flux<Record> pipeline = processor
                .doOnNext(this::maybeWarnInvalidMapping)
                .flatMap(this::appendUnmappableStatementToDebugFile)
                .transform(this::extractRecordFromMappedStatement)
                .flatMap(unmappable -> appendToBadFile(unmappable, MAPPING_BAD_FILE));
        if (this.checkpointEnabled) {
            pipeline = pipeline.transform(records -> recordCheckpoint(records, false));
        }
        pipeline.subscribe(ignored -> {
        }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates the sink that receives failed write results. Each result pushed to the sink is
     * written to the load debug file; the records behind its statement are then extracted,
     * written to the load bad file and, when checkpointing is enabled, recorded as failed in a
     * checkpoint manager. Pipeline errors are routed to {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<WriteResult> newFailedWriteResultSink() {
        // Typed processor and pipeline: raw types would erase the element type the lambdas need.
        UnicastProcessor<WriteResult> processor = UnicastProcessor.create();
        Flux<Record> pipeline = processor
                .flatMap(this::appendFailedWriteResultToDebugFile)
                .map(WriteResult::getStatement)
                .transform(this::extractRecordFromMappedStatement)
                .flatMap(failed -> appendToBadFile(failed, LOAD_BAD_FILE));
        if (this.checkpointEnabled) {
            pipeline = pipeline.transform(records -> recordCheckpoint(records, false));
        }
        pipeline.subscribe(ignored -> {
        }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates the sink that receives write results whose conditional update was not applied.
     * Each result pushed to the sink is written to the paxos debug file; the records behind its
     * statement are then extracted, written to the paxos bad file and, when checkpointing is
     * enabled, recorded as failed in a checkpoint manager. Pipeline errors are routed to
     * {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<WriteResult> newFailedCASWriteSink() {
        // Typed processor and pipeline: raw types would erase the element type the lambdas need.
        UnicastProcessor<WriteResult> processor = UnicastProcessor.create();
        Flux<Record> pipeline = processor
                .flatMap(this::appendFailedCASWriteResultToDebugFile)
                .map(WriteResult::getStatement)
                .transform(this::extractRecordFromMappedStatement)
                .flatMap(failed -> appendToBadFile(failed, CAS_BAD_FILE));
        if (this.checkpointEnabled) {
            pipeline = pipeline.transform(records -> recordCheckpoint(records, false));
        }
        pipeline.subscribe(ignored -> {
        }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Creates the sink that receives failed read results. Each result pushed to the sink is
     * written to the unload debug file; no bad file and no checkpoint is involved. Pipeline
     * errors are routed to {@code onSinkError}.
     *
     * @return a buffering sink feeding the pipeline described above
     */
    @NonNull
    private FluxSink<ReadResult> newFailedReadResultSink() {
        // Typed processor: a raw one would erase the element type the method reference needs.
        UnicastProcessor<ReadResult> processor = UnicastProcessor.create();
        processor
                .flatMap(this::appendFailedReadResultToDebugFile)
                .subscribe(ignored -> {
                }, this::onSinkError);
        return processor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    /**
     * Decorates a flux of records so that every record updates a checkpoint manager. A new
     * manager is created and registered in {@code checkpointManagers} each time the context of
     * the flux is written, and is then looked up from that context by the decorated stream.
     *
     * @param flux the records to track
     * @param z the flag forwarded as the third argument of {@code CheckpointManager.update}; the
     *     callers in this class pass true for successful records and false for failed ones
     * @return the decorated flux, emitting the same records
     */
    @NonNull
    private Flux<Record> recordCheckpoint(Flux<Record> flux, boolean z) {
        return flux
                .transformDeferredContextual((records, contextView) -> {
                    CheckpointManager manager = contextView.get(CheckpointManager.class);
                    return records.doOnNext(record -> manager.update(record.getResource(), record.getPosition(), z));
                })
                .contextWrite(context -> {
                    CheckpointManager manager = new CheckpointManager();
                    this.checkpointManagers.add(manager);
                    return context.put(CheckpointManager.class, manager);
                });
    }

    /**
     * Decorates a flux of read results so that every result updates a checkpoint manager with
     * the resource of its statement, its position and whether it succeeded. A new manager is
     * created and registered in {@code checkpointManagers} each time the context of the flux is
     * written, and is then looked up from that context by the decorated stream.
     *
     * @param flux the read results to track
     * @return the decorated flux, emitting the same results
     */
    @NonNull
    private Flux<ReadResult> readResultCheckpoint(Flux<ReadResult> flux) {
        return flux
                .transformDeferredContextual((results, contextView) -> {
                    CheckpointManager manager = contextView.get(CheckpointManager.class);
                    return results.doOnNext(result -> manager.update(result.getStatement().getResource(), result.getPosition(), result.isSuccess()));
                })
                .contextWrite(context -> {
                    CheckpointManager manager = new CheckpointManager();
                    this.checkpointManagers.add(manager);
                    return context.put(CheckpointManager.class, manager);
                });
    }

    /**
     * Reactive wrapper around {@link #doAppendToBadFile}: emits the record once it has been
     * written, or an error signal if writing threw an exception.
     *
     * @param record the rejected record
     * @param str the bad file name, resolved against the operation directory
     * @return a mono emitting the same record
     */
    private Mono<Record> appendToBadFile(Record record, String str) {
        return Mono.just(record).handle((rejected, sink) -> {
            try {
                doAppendToBadFile(rejected, str);
                sink.next(rejected);
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    /**
     * Writes the source of a record to the given bad file. Records without a source are ignored
     * and do not even open the file. When the source is a {@link ReadResult}, its row, if any, is
     * written as a single line of formatted contents; any other source is written through its
     * {@code toString()}.
     *
     * @param record the rejected record
     * @param str the bad file name, resolved against the operation directory
     */
    private void doAppendToBadFile(Record record, String str) {
        Object source = record.getSource();
        if (source == null) {
            return;
        }
        PrintWriter writer = this.openFiles.get(this.operationDirectory.resolve(str));
        // Desugared form of: assert writer != null;
        if (!$assertionsDisabled && writer == null) {
            throw new AssertionError();
        }
        if (!(source instanceof ReadResult)) {
            LogManagerUtils.printAndMaybeAddNewLine(source.toString(), writer);
        } else {
            ((ReadResult) source).getRow().ifPresent(row -> LogManagerUtils.printAndMaybeAddNewLine(LogManagerUtils.formatSingleLine(row.getFormattedContents()), writer));
        }
        writer.flush();
    }

    /**
     * Appends a failed write result to the load debug file.
     *
     * @param writeResult the failed result
     * @return a mono emitting the same result once written
     */
    private Mono<WriteResult> appendFailedWriteResultToDebugFile(WriteResult writeResult) {
        final String debugFile = LOAD_ERRORS_FILE;
        return appendStatement(writeResult, debugFile);
    }

    /**
     * Appends a write result whose conditional update was not applied to the paxos debug file.
     *
     * @param writeResult the unapplied result
     * @return a mono emitting the same result once written
     */
    private Mono<WriteResult> appendFailedCASWriteResultToDebugFile(WriteResult writeResult) {
        final String debugFile = CAS_ERRORS_FILE;
        return appendStatement(writeResult, debugFile);
    }

    /**
     * Appends a failed read result to the unload debug file.
     *
     * @param readResult the failed result
     * @return a mono emitting the same result once written
     */
    private Mono<ReadResult> appendFailedReadResultToDebugFile(ReadResult readResult) {
        final String debugFile = UNLOAD_ERRORS_FILE;
        return appendStatement(readResult, debugFile);
    }

    /**
     * Reactive wrapper around {@link #doAppendStatement}, always asking for a trailing blank
     * line: emits the result once it has been written, or an error signal if writing threw an
     * exception.
     *
     * @param r the result to write
     * @param str the debug file name, resolved against the operation directory
     * @param <R> the concrete result type
     * @return a mono emitting the same result
     */
    private <R extends Result> Mono<R> appendStatement(R r, String str) {
        return Mono.just(r).handle((item, sink) -> {
            try {
                doAppendStatement(item, str, true);
                sink.next(item);
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    /**
     * Writes a result to the given debug file: the formatted statement, then the failed
     * conditional updates when the result is a successful but unapplied write, then the stack
     * trace of the error when there is one.
     *
     * @param r the result to write
     * @param str the debug file name, resolved against the operation directory
     * @param z whether to finish the entry with an empty line
     * @param <R> the concrete result type
     */
    private <R extends Result> void doAppendStatement(R r, String str, boolean z) {
        PrintWriter writer = this.openFiles.get(this.operationDirectory.resolve(str));
        // Desugared form of: assert writer != null;
        if (!$assertionsDisabled && writer == null) {
            throw new AssertionError();
        }

        writer.print("Statement: ");
        String formattedStatement = this.statementFormatter.format(r.getStatement(), this.statementFormatVerbosity, this.protocolVersion, this.codecRegistry);
        LogManagerUtils.printAndMaybeAddNewLine(formattedStatement, writer);

        if (r instanceof WriteResult && ((WriteResult) r).isSuccess() && !((WriteResult) r).wasApplied()) {
            writer.println("Failed conditional updates: ");
            ((WriteResult) r).getFailedWrites().forEach(row -> LogManagerUtils.printAndMaybeAddNewLine(this.rowFormatter.format(row, this.protocolVersion, this.codecRegistry), writer));
        }

        r.getError().ifPresent(error -> this.stackTracePrinter.printStackTrace((Throwable) error, writer));

        if (z) {
            writer.println();
        }
        writer.flush();
    }

    /**
     * Reactive wrapper around {@link #doAppendUnmappableStatementToDebugFile}: emits the
     * statement once it has been written, or an error signal if writing threw an exception.
     *
     * @param unmappableStatement the statement that could not be mapped
     * @return a mono emitting the same statement
     */
    private Mono<UnmappableStatement> appendUnmappableStatementToDebugFile(UnmappableStatement unmappableStatement) {
        return Mono.just(unmappableStatement).handle((statement, sink) -> {
            try {
                doAppendUnmappableStatementToDebugFile(statement);
                sink.next(statement);
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    /**
     * Writes an unmappable statement to the mapping debug file: the resource and position of its
     * record, the source of the record when it has one, the stack trace of the mapping error,
     * and a closing empty line.
     *
     * @param unmappableStatement the statement that could not be mapped
     */
    private void doAppendUnmappableStatementToDebugFile(UnmappableStatement unmappableStatement) {
        Path debugFile = this.operationDirectory.resolve(MAPPING_ERRORS_FILE);
        PrintWriter writer = this.openFiles.get(debugFile);
        // Desugared form of: assert writer != null;
        if (!$assertionsDisabled && writer == null) {
            throw new AssertionError();
        }
        Record origin = unmappableStatement.getRecord();
        appendResourceAndPosition(writer, origin);
        boolean hasSource = origin.getSource() != null;
        if (hasSource) {
            writer.println("Source: " + LogManagerUtils.formatSource(origin));
        }
        this.stackTracePrinter.printStackTrace(unmappableStatement.getError(), writer);
        writer.println();
        writer.flush();
    }

    /**
     * Reactive wrapper around {@link #doAppendUnmappableReadResultToDebugFile}: emits the record
     * once it has been written, or an error signal if writing threw an exception.
     *
     * @param errorRecord the record that could not be mapped
     * @return a mono emitting the same record
     */
    private Mono<ErrorRecord> appendUnmappableReadResultToDebugFile(ErrorRecord errorRecord) {
        return Mono.just(errorRecord).handle((unmappable, sink) -> {
            try {
                doAppendUnmappableReadResultToDebugFile(unmappable);
                sink.next(unmappable);
            } catch (Exception e) {
                sink.error(e);
            }
        });
    }

    /**
     * Writes an unmappable record to the mapping debug file: its resource and position, the read
     * result it originates from when its source is a {@link ReadResult}, the stack trace of the
     * error, and a closing empty line.
     *
     * @param errorRecord the record that could not be mapped
     */
    private void doAppendUnmappableReadResultToDebugFile(ErrorRecord errorRecord) {
        Path debugFile = this.operationDirectory.resolve(MAPPING_ERRORS_FILE);
        PrintWriter writer = this.openFiles.get(debugFile);
        // Desugared form of: assert writer != null;
        if (!$assertionsDisabled && writer == null) {
            throw new AssertionError();
        }
        appendResourceAndPosition(writer, errorRecord);
        boolean cameFromRead = errorRecord.getSource() instanceof ReadResult;
        if (cameFromRead) {
            appendReadResult((ReadResult) errorRecord.getSource(), MAPPING_ERRORS_FILE, writer);
        }
        this.stackTracePrinter.printStackTrace(errorRecord.getError(), writer);
        writer.println();
        writer.flush();
    }

    /**
     * Writes the given failed record to the debug file as part of a reactive pipeline.
     *
     * @param errorRecord the error record to write.
     * @return a {@link Mono} that re-emits the record once
     *     {@link #doAppendFailedRecordToDebugFile} has returned, or signals an error if an
     *     exception was thrown while writing or emitting.
     */
    private Mono<ErrorRecord> appendFailedRecordToDebugFile(ErrorRecord errorRecord) {
        return Mono.just(errorRecord).handle((failed, sink) -> {
            try {
                // Operate on the element handed to the handler rather than on the captured
                // method argument, as the sibling append* methods do. Both refer to the same
                // instance here, so the output is unchanged.
                doAppendFailedRecordToDebugFile(failed);
                sink.next(failed);
            } catch (Exception failure) {
                // Turn the failure into an error signal instead of throwing from the handler.
                sink.error(failure);
            }
        });
    }

    /**
     * Appends one entry describing a failed record to the already-open writer registered for
     * {@code CONNECTOR_ERRORS_FILE}: resource and position, then either the read result details
     * (when the source is a {@link ReadResult}) or the formatted source (when it is any other
     * non-null value), the stack trace of the error, and a blank separator line. The writer is
     * flushed before returning.
     *
     * @param errorRecord the record carrying the source and the error to write.
     */
    private void doAppendFailedRecordToDebugFile(ErrorRecord errorRecord) {
        PrintWriter out = (PrintWriter) this.openFiles.get(this.operationDirectory.resolve(CONNECTOR_ERRORS_FILE));
        // Only checked when assertions are enabled for this class (see the static initializer).
        if (!$assertionsDisabled && out == null) {
            throw new AssertionError();
        }
        appendResourceAndPosition(out, errorRecord);
        if (errorRecord.getSource() instanceof ReadResult) {
            appendReadResult((ReadResult) errorRecord.getSource(), CONNECTOR_ERRORS_FILE, out);
        } else if (errorRecord.getSource() != null) {
            out.println("Source: " + LogManagerUtils.formatSource(errorRecord));
        }
        this.stackTracePrinter.printStackTrace(errorRecord.getError(), out);
        // Blank line separates consecutive entries in the file.
        out.println();
        out.flush();
    }

    /**
     * Writes the details of a read result: first delegates to {@code doAppendStatement} with the
     * given file name, then, if the result carries a row, prints it after a {@code "Row: "}
     * prefix using the configured row formatter, protocol version and codec registry.
     *
     * @param readResult the read result to describe.
     * @param str the debug file name passed on to {@code doAppendStatement}.
     * @param printWriter the writer receiving the formatted row.
     */
    private void appendReadResult(ReadResult readResult, String str, PrintWriter printWriter) {
        doAppendStatement(readResult, str, false);
        readResult.getRow().ifPresent(resultRow -> {
            printWriter.print("Row: ");
            LogManagerUtils.printAndMaybeAddNewLine(
                    this.rowFormatter.format(resultRow, this.protocolVersion, this.codecRegistry),
                    printWriter);
        });
    }

    /**
     * Prints the {@code Resource:} and {@code Position:} header lines of a debug file entry.
     *
     * @param printWriter the writer receiving the two lines.
     * @param record the record whose resource and position are printed.
     */
    private void appendResourceAndPosition(PrintWriter printWriter, Record record) {
        String resourceLine = "Resource: " + record.getResource();
        printWriter.println(resourceLine);
        String positionLine = "Position: " + record.getPosition();
        printWriter.println(positionLine);
    }

    /**
     * Writes the given checkpoint manager as CSV to the {@code CHECKPOINT_CSV} file inside the
     * operation directory, encoded as UTF-8.
     *
     * <p>The file is opened with {@link StandardOpenOption#CREATE_NEW}, so opening fails if a
     * file with that name already exists.
     *
     * @param checkpointManager the checkpoint manager whose contents are printed.
     * @throws IOException if the file cannot be created or opened.
     */
    @VisibleForTesting
    void writeCheckpointFile(CheckpointManager checkpointManager) throws IOException {
        // try-with-resources closes the writer on both the normal and the exceptional path and
        // records any failure raised by close() as a suppressed exception.
        try (PrintWriter printWriter =
                new PrintWriter(
                        Files.newBufferedWriter(
                                this.operationDirectory.resolve(CHECKPOINT_CSV),
                                StandardCharsets.UTF_8,
                                StandardOpenOption.CREATE_NEW,
                                StandardOpenOption.WRITE))) {
            checkpointManager.printCsv(printWriter);
            printWriter.flush();
        }
    }

    /**
     * Builds a new checkpoint manager containing the initial checkpoint manager merged with
     * every manager currently held in {@code checkpointManagers}.
     *
     * @return a freshly created checkpoint manager holding the merged state.
     */
    @VisibleForTesting
    CheckpointManager mergeCheckpointManagers() {
        // Diamond instead of a raw TreeMap keeps the construction type-checked.
        CheckpointManager merged = new CheckpointManager(new TreeMap<>());
        merged.merge(this.initialCheckpointManager);
        for (CheckpointManager manager : this.checkpointManagers) {
            merged.merge(manager);
        }
        return merged;
    }

    /**
     * Decides whether processing must be aborted after an error.
     *
     * @param th the error that occurred, or {@code null}.
     * @param i the error count checked against the error threshold.
     * @return a {@link Flux} failing with {@code th} when it is non-null and unrecoverable; a
     *     {@link Flux} failing with a {@link TooManyErrorsException} when the error threshold
     *     is exceeded; an empty {@link Flux} otherwise.
     */
    private <T> Flux<T> maybeTriggerOnError(@Nullable Throwable th, int i) {
        // An unrecoverable error takes precedence; the threshold is not consulted in that case.
        if (th != null && isUnrecoverable(th)) {
            return Flux.error(th);
        }
        if (this.errorThreshold.checkThresholdExceeded((long) i, this.totalItems)) {
            return Flux.error(new TooManyErrorsException(this.errorThreshold));
        }
        return Flux.empty();
    }

    /**
     * Logs a warning about a likely mapping misconfiguration, once: only the first call whose
     * statement error is an {@link InvalidMappingException} flips the
     * {@code invalidMappingWarningDone} flag and logs.
     *
     * @param unmappableStatement the statement whose error is inspected.
     */
    private void maybeWarnInvalidMapping(UnmappableStatement unmappableStatement) {
        if (!(unmappableStatement.getError() instanceof InvalidMappingException)) {
            return;
        }
        // compareAndSet succeeds for a single caller, so the warning is emitted only once.
        if (this.invalidMappingWarningDone.compareAndSet(false, true)) {
            LOGGER.warn("At least 1 record does not match the provided schema.mapping or schema.query. Please check that the connector configuration and the schema configuration are correct.");
        }
    }

    /**
     * Logs the server-side warnings attached to an execution, counting each one against the
     * query warnings threshold. As soon as the threshold is exceeded, query warning logging is
     * switched off, a final notice is logged, and the remaining warnings are dropped.
     *
     * @param executionInfo the execution info whose warnings are logged.
     */
    private void maybeLogQueryWarnings(ExecutionInfo executionInfo) {
        for (String warning : executionInfo.getWarnings()) {
            long warningCount = this.queryWarnings.incrementAndGet();
            if (!this.queryWarningsThreshold.checkThresholdExceeded(warningCount, this.totalItems)) {
                LOGGER.warn("Query generated server-side warning: " + warning);
                continue;
            }
            // Threshold exceeded: disable further logging and stop processing this batch.
            this.queryWarningsEnabled.set(false);
            LOGGER.warn("The maximum number of logged query warnings has been exceeded ({}); subsequent warnings will not be logged.", this.queryWarningsThreshold.thresholdAsString());
            return;
        }
    }

    /**
     * Handles a failure raised while writing to the log files: logs it at error level, then
     * forwards it to the uncaught exception sink.
     *
     * @param th the failure to report.
     */
    private void onSinkError(Throwable th) {
        LOGGER.error("Error while writing to log files, aborting", th);
        uncaughtExceptionSink.error(th);
    }

    /**
     * Tells whether the given error is unrecoverable.
     *
     * <p>An {@link AllNodesFailedException} is unrecoverable when at least one of its per-node
     * errors is itself unrecoverable. Any other error is unrecoverable unless it is one of the
     * exception types listed below.
     *
     * @param th the error to classify.
     * @return {@code true} if the error is unrecoverable, {@code false} otherwise.
     */
    private static boolean isUnrecoverable(Throwable th) {
        if (th instanceof AllNodesFailedException) {
            return ((AllNodesFailedException) th)
                    .getAllErrors().values().stream()
                    .flatMap(nodeErrors -> nodeErrors.stream())
                    .anyMatch(LogManager::isUnrecoverable);
        }
        boolean recoverable =
                th instanceof ServerError
                        || th instanceof QueryExecutionException
                        || th instanceof InvalidQueryException
                        || th instanceof DriverTimeoutException
                        || th instanceof RequestThrottlingException
                        || th instanceof FrameTooLongException
                        || th instanceof BusyConnectionException;
        return !recoverable;
    }

    // Static initializer: sets up the class-level assertion flag and logger.
    static {
        // True when assertions are NOT enabled for LogManager; methods in this class test this
        // flag before throwing AssertionError.
        // NOTE(review): this looks like the decompiled form of the compiler-generated flag
        // backing `assert` statements - confirm against the original source.
        $assertionsDisabled = !LogManager.class.desiredAssertionStatus();
        // Logger shared by all instances of this class.
        LOGGER = LoggerFactory.getLogger(LogManager.class);
    }
}
