package org.apache.flume.channel.file;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import javax.annotation.Nullable;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.flume.channel.file.LogFile;
import org.apache.flume.channel.file.TransactionEventRecord;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/flume-file-channel-1.9.0.jar:org/apache/flume/channel/file/ReplayHandler.class */
/**
 * Replays file-channel log files into a {@link FlumeEventQueue}.
 *
 * <p>Records whose log write order ID is at or below the queue's write order ID
 * captured at construction time are skipped. PUT and TAKE records are buffered
 * per transaction ID until the matching COMMIT (applied to the queue) or
 * ROLLBACK (discarded) record is read.
 *
 * <p>Instances keep mutable replay state (counters, open readers, pending
 * takes) and perform no synchronization of their own.
 */
public class ReplayHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReplayHandler.class);

    /** Number of records read between progress log lines in {@link #replayLog(List)}. */
    private static final int PROGRESS_LOG_INTERVAL = 10000;

    private final FlumeEventQueue queue;
    /** The queue's log write order ID at construction; records at or below it are skipped. */
    private final long lastCheckpoint;
    private final KeyProvider encryptionKeyProvider;
    private final boolean fsyncPerTransaction;

    // Replay statistics. Left package-private (not private) because code in this
    // package may read them directly; most are also exposed through the getters below.
    int readCount = 0;
    int putCount = 0;
    int takeCount = 0;
    int rollbackCount = 0;
    int commitCount = 0;
    int skipCount = 0;

    /** Committed takes whose pointer was not in the queue when the take commit was applied. */
    private final List<Long> pendingTakes = Lists.newArrayList();
    /** Open readers keyed by log file ID; used by {@link #replayLog(List)}. */
    private final Map<Integer, LogFile.SequentialReader> readers = Maps.newHashMap();
    /** Holds at most one not-yet-consumed record per open reader, in LogRecord's natural order. */
    private final PriorityQueue<LogRecord> logRecordBuffer = new PriorityQueue<>();

    /** @return number of log records read so far */
    @VisibleForTesting
    public int getReadCount() {
        return this.readCount;
    }

    /** @return number of PUT records seen after the checkpoint */
    @VisibleForTesting
    public int getPutCount() {
        return this.putCount;
    }

    /** @return number of TAKE records seen after the checkpoint */
    @VisibleForTesting
    public int getTakeCount() {
        return this.takeCount;
    }

    /** @return number of COMMIT records seen after the checkpoint */
    @VisibleForTesting
    public int getCommitCount() {
        return this.commitCount;
    }

    /** @return number of ROLLBACK records seen after the checkpoint */
    @VisibleForTesting
    public int getRollbackCount() {
        return this.rollbackCount;
    }

    /**
     * Creates a handler that replays into the given queue.
     *
     * @param flumeEventQueue queue to rebuild; its current log write order ID is
     *        captured as the checkpoint below which records are skipped
     * @param keyProvider key provider handed to the log readers, may be {@code null}
     * @param z the fsync-per-transaction flag handed to the log readers
     */
    public ReplayHandler(FlumeEventQueue flumeEventQueue, @Nullable KeyProvider keyProvider, boolean z) {
        this.queue = flumeEventQueue;
        this.lastCheckpoint = flumeEventQueue.getLogWriteOrderID();
        this.encryptionKeyProvider = keyProvider;
        this.fsyncPerTransaction = z;
    }

    /**
     * Replays the given log files one after another, each file read start to end
     * before the next one is opened.
     *
     * <p>A file that ends with an {@link EOFException} is logged and abandoned;
     * replay continues with the next file. After all files are processed,
     * inflight takes that never saw a commit are put back at the head of the queue.
     *
     * @param list log files to replay, in the order they should be applied
     * @throws Exception if reading a log or applying a record fails for a reason
     *         other than hitting EOF
     * @deprecated {@link #replayLog(List)} is the non-deprecated entry point of this class.
     */
    @Deprecated
    public void replayLogv1(List<File> list) throws Exception {
        int total = 0;
        MultiValueMap transactionMap = new MultiValueMap();
        loadInflightPuts(transactionMap);
        SetMultimap<Long, Long> inflightTakes = this.queue.deserializeInflightTakes();
        LOG.info("Starting replay of {}", list);
        for (File file : list) {
            LOG.info("Replaying {}", file);
            int replayedFromFile = 0;
            LogFile.SequentialReader reader = null;
            try {
                reader = LogFileFactory.getSequentialReader(file, this.encryptionKeyProvider, this.fsyncPerTransaction);
                reader.skipToLastCheckpointPosition(this.queue.getLogWriteOrderID());
                // PUT pointers reference the file being read; TAKE records carry their own pointer.
                int logFileId = reader.getLogFileID();
                LogRecord record;
                while ((record = reader.next()) != null) {
                    this.readCount++;
                    replayedFromFile += applyRecord(record, logFileId, transactionMap, inflightTakes);
                }
                LOG.info("Replayed {} from {}", replayedFromFile, file);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("read: {}, put: {}, take: {}, rollback: {}, commit: {}, skipp: {}",
                            this.readCount, this.putCount, this.takeCount,
                            this.rollbackCount, this.commitCount, this.skipCount);
                }
            } catch (EOFException e) {
                LOG.warn("Hit EOF on {}", file);
            } finally {
                // Events committed before a failure still count towards the total.
                total += replayedFromFile;
                if (reader != null) {
                    reader.close();
                }
            }
        }
        requeueUncommittedTakes(inflightTakes);
        int pendingTakesSize = this.pendingTakes.size();
        if (pendingTakesSize > 0) {
            // Always report the problem; enabling debug logging must not hide the error.
            LOG.error("Pending takes {} exist after the end of replay. Duplicate messages will exist in destination.",
                    pendingTakesSize);
            if (LOG.isDebugEnabled()) {
                for (Long pointer : this.pendingTakes) {
                    LOG.debug("Pending take {}", FlumeEventPointer.fromLong(pointer));
                }
            }
        }
        LOG.info("Replayed {}", total);
    }

    /**
     * Replays the given log files by merging their records through a priority
     * queue, so records from all files are applied in {@link LogRecord}'s natural
     * ordering (presumably log write order -- confirm against LogRecord).
     *
     * <p>Whether replay succeeds or fails, the transaction ID and write order
     * oracles are seeded with the highest IDs seen and every registered reader is
     * closed. On success, inflight takes that never saw a commit are then put back
     * at the head of the queue.
     *
     * @param list log files to replay
     * @throws Exception if opening or reading a log, or applying a record, fails;
     *         an {@link EOFException} while opening a file only skips that file
     */
    public void replayLog(List<File> list) throws Exception {
        int eventCount = 0;
        MultiValueMap transactionMap = new MultiValueMap();
        // Seed both oracles with at least the checkpoint value.
        long transactionIdSeed = this.lastCheckpoint;
        long writeOrderIdSeed = this.lastCheckpoint;
        LOG.info("Starting replay of {}", list);
        loadInflightPuts(transactionMap);
        SetMultimap<Long, Long> inflightTakes = this.queue.deserializeInflightTakes();
        try {
            for (File file : list) {
                LOG.info("Replaying {}", file);
                try {
                    openReader(file);
                } catch (EOFException e) {
                    LOG.warn("Ignoring " + file + " due to EOF", e);
                }
            }
            LogRecord record;
            while ((record = next()) != null) {
                TransactionEventRecord event = record.getEvent();
                transactionIdSeed = Math.max(transactionIdSeed, event.getTransactionID());
                writeOrderIdSeed = Math.max(writeOrderIdSeed, event.getLogWriteOrderID());
                this.readCount++;
                if (this.readCount % PROGRESS_LOG_INTERVAL == 0 && this.readCount > 0) {
                    logReplayStats(eventCount);
                }
                eventCount += applyRecord(record, record.getFileID(), transactionMap, inflightTakes);
            }
            logReplayStats(eventCount);
            this.queue.replayComplete();
        } finally {
            TransactionIDOracle.setSeed(transactionIdSeed);
            WriteOrderOracle.setSeed(writeOrderIdSeed);
            for (LogFile.SequentialReader reader : this.readers.values()) {
                if (reader != null) {
                    reader.close();
                }
            }
        }
        requeueUncommittedTakes(inflightTakes);
        int pendingTakesSize = this.pendingTakes.size();
        if (pendingTakesSize > 0) {
            LOG.info("Pending takes {} exist after the end of replay. Duplicate messages will exist in destination.",
                    pendingTakesSize);
        }
    }

    /** Copies the queue's serialized inflight puts into the per-transaction map so a later commit can apply them. */
    private void loadInflightPuts(MultiValueMap transactionMap) throws Exception {
        SetMultimap<Long, Long> inflightPuts = this.queue.deserializeInflightPuts();
        for (Long transactionId : inflightPuts.keySet()) {
            for (Long inflightPut : inflightPuts.get(transactionId)) {
                transactionMap.put(transactionId, FlumeEventPointer.fromLong(inflightPut));
            }
        }
    }

    /** Puts every inflight take that was never committed back at the head of the queue, then empties the map. */
    private void requeueUncommittedTakes(SetMultimap<Long, Long> inflightTakes) throws Exception {
        for (Long transactionId : inflightTakes.keySet()) {
            for (Long uncommittedTake : inflightTakes.get(transactionId)) {
                this.queue.addHead(FlumeEventPointer.fromLong(uncommittedTake));
            }
        }
        inflightTakes.clear();
    }

    /**
     * Opens a reader for one log file and, if it yields a first record, registers
     * the reader and buffers that record. In every other outcome (no records, or
     * any failure while positioning/reading) the reader is closed here so it
     * cannot leak.
     */
    private void openReader(File file) throws Exception {
        LogFile.SequentialReader reader =
                LogFileFactory.getSequentialReader(file, this.encryptionKeyProvider, this.fsyncPerTransaction);
        boolean registered = false;
        try {
            reader.skipToLastCheckpointPosition(this.queue.getLogWriteOrderID());
            Integer logFileId = reader.getLogFileID();
            Preconditions.checkState(!this.readers.containsKey(logFileId),
                    "Readers %s already contains %s", this.readers, logFileId);
            LogRecord first = reader.next();
            if (first != null) {
                this.readers.put(logFileId, reader);
                this.logRecordBuffer.add(first);
                registered = true;
            }
        } finally {
            if (!registered) {
                reader.close();
            }
        }
    }

    /** Logs the running replay counters together with the number of events applied so far. */
    private void logReplayStats(int eventCount) {
        LOG.info("read: {}, put: {}, take: {}, rollback: {}, commit: {}, skip: {}, eventCount:{}",
                this.readCount, this.putCount, this.takeCount, this.rollbackCount,
                this.commitCount, this.skipCount, eventCount);
    }

    /**
     * Applies a single log record to the replay state.
     *
     * @param record the record to apply
     * @param putFileId file ID to use for the pointer of a PUT record
     * @param transactionMap uncommitted pointers keyed by transaction ID
     * @param inflightTakes inflight takes restored from the queue, keyed by transaction ID
     * @return number of event pointers handed to {@link #processCommit}; 0 unless the
     *         record is a COMMIT with at least one pointer
     * @throws IllegalArgumentException if the record type is not PUT, TAKE, ROLLBACK or COMMIT
     */
    private int applyRecord(LogRecord record, int putFileId, MultiValueMap transactionMap,
            SetMultimap<Long, Long> inflightTakes) {
        TransactionEventRecord event = record.getEvent();
        if (event.getLogWriteOrderID() <= this.lastCheckpoint) {
            this.skipCount++;
            return 0;
        }
        short recordType = event.getRecordType();
        Long transactionId = event.getTransactionID();
        if (recordType == TransactionEventRecord.Type.PUT.get()) {
            this.putCount++;
            transactionMap.put(transactionId, new FlumeEventPointer(putFileId, record.getOffset()));
            return 0;
        }
        if (recordType == TransactionEventRecord.Type.TAKE.get()) {
            this.takeCount++;
            Take take = (Take) event;
            transactionMap.put(transactionId, new FlumeEventPointer(take.getFileID(), take.getOffset()));
            return 0;
        }
        if (recordType == TransactionEventRecord.Type.ROLLBACK.get()) {
            this.rollbackCount++;
            transactionMap.remove(transactionId);
            return 0;
        }
        if (recordType == TransactionEventRecord.Type.COMMIT.get()) {
            this.commitCount++;
            return applyCommit((Commit) event, transactionId, transactionMap, inflightTakes);
        }
        throw new IllegalArgumentException("Unknown record type: " + Integer.toHexString(recordType));
    }

    /**
     * Applies a COMMIT record: gathers the pointers buffered for the transaction,
     * adds any inflight takes recorded for it when the commit is a take commit,
     * and forwards a non-empty result to {@link #processCommit}.
     *
     * @return number of pointers processed, or 0 if the transaction had none
     */
    private int applyCommit(Commit commit, Long transactionId, MultiValueMap transactionMap,
            SetMultimap<Long, Long> inflightTakes) {
        // MultiValueMap is not generic; this class only ever stores FlumeEventPointer values in it.
        @SuppressWarnings("unchecked")
        Collection<FlumeEventPointer> pointers =
                (Collection<FlumeEventPointer>) transactionMap.remove(transactionId);
        short commitType = commit.getType();
        if (commitType == TransactionEventRecord.Type.TAKE.get() && inflightTakes.containsKey(transactionId)) {
            if (pointers == null) {
                pointers = Sets.newHashSet();
            }
            for (Long inflightTake : inflightTakes.removeAll(transactionId)) {
                pointers.add(FlumeEventPointer.fromLong(inflightTake));
            }
        }
        if (pointers == null || pointers.isEmpty()) {
            return 0;
        }
        processCommit(commitType, pointers);
        return pointers.size();
    }

    /**
     * Returns the next buffered record and refills the buffer with the following
     * record from the same file, if that file has one.
     *
     * @return the next record, or {@code null} when all readers are exhausted
     * @throws IllegalStateException if no reader is registered for the record's file ID
     */
    private LogRecord next() throws IOException, CorruptEventException {
        LogRecord head = this.logRecordBuffer.poll();
        if (head == null) {
            return null;
        }
        LogFile.SequentialReader reader = this.readers.get(head.getFileID());
        Preconditions.checkState(reader != null, "No open reader for log file %s", head.getFileID());
        LogRecord successor = reader.next();
        if (successor != null) {
            this.logRecordBuffer.add(successor);
        }
        return head;
    }

    /**
     * Applies the pointers of a committed transaction to the queue.
     *
     * <p>For a PUT commit each pointer is appended to the queue; if a take for
     * that pointer was already recorded as pending, the pointer is removed again
     * right away. For a TAKE commit each pointer is removed from the queue, and
     * pointers that are not in the queue are remembered as pending takes.
     *
     * @param s the commit type (PUT or TAKE)
     * @param collection pointers belonging to the committed transaction
     * @throws IllegalStateException if a pointer cannot be added to the queue, or a
     *         just-added pointer with a pending take cannot be removed
     * @throws IllegalArgumentException if {@code s} is neither PUT nor TAKE
     */
    private void processCommit(short s, Collection<FlumeEventPointer> collection) {
        if (s == TransactionEventRecord.Type.PUT.get()) {
            for (FlumeEventPointer pointer : collection) {
                if (!this.queue.addTail(pointer)) {
                    throw new IllegalStateException("Unable to add " + pointer + ". Queue depth = "
                            + this.queue.getSize() + ", Capacity = " + this.queue.getCapacity());
                }
                // Explicit boxing keeps this on List.remove(Object), not remove(int index).
                if (this.pendingTakes.remove(Long.valueOf(pointer.toLong()))) {
                    Preconditions.checkState(this.queue.remove(pointer),
                            "Take was pending and pointer was successfully added to the queue but could not be removed: "
                                    + pointer);
                }
            }
            return;
        }
        if (s == TransactionEventRecord.Type.TAKE.get()) {
            for (FlumeEventPointer pointer : collection) {
                if (!this.queue.remove(pointer)) {
                    this.pendingTakes.add(Long.valueOf(pointer.toLong()));
                }
            }
            return;
        }
        throw new IllegalArgumentException("Unknown record type: " + Integer.toHexString(s));
    }
}
