package org.apache.accumulo.master.replication;

import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.class */
/**
 * {@link Runnable} that scans the status and work sections of the replication table and deletes
 * every row whose columns all report (via {@link StatusUtil#isSafeForRemoval}) that they are safe
 * for removal. For each removed row, the matching order-section entries are deleted as well.
 */
public class RemoveCompleteReplicationRecords implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(RemoveCompleteReplicationRecords.class);

    /** Replication table name as it appears in this class's log messages. */
    private static final String REPLICATION_TABLE_NAME = "accumulo.replication";

    private final Connector conn;

    /**
     * @param connector connection used to open the scanner and writer on the replication table
     */
    public RemoveCompleteReplicationRecords(Connector connector) {
        this.conn = connector;
    }

    /**
     * Performs one removal pass over the replication table.
     *
     * <p>If a {@link ReplicationTableOfflineException} is raised the pass is skipped with a debug
     * message. The scanner and writer are always closed, including when only one of them could be
     * obtained or when the pass fails part-way through.
     */
    @Override
    public void run() {
        BatchScanner batchScanner = null;
        BatchWriter batchWriter = null;
        long recordsRemoved;
        try {
            try {
                // NOTE(review): 4 is presumably the number of query threads -- confirm against
                // ReplicationTable.getBatchScanner.
                batchScanner = ReplicationTable.getBatchScanner(this.conn, 4);
                batchWriter = ReplicationTable.getBatchWriter(this.conn);
                if (batchScanner == null || batchWriter == null) {
                    throw new AssertionError("Inconceivable; an exception should have been thrown, but 'bs' or 'bw' was null instead");
                }

                // Scan the whole table, restricted to the status and work sections, and have each
                // row delivered as a single encoded entry so it can be judged as a unit.
                batchScanner.setRanges(Collections.singleton(new Range()));
                IteratorSetting wholeRow = new IteratorSetting(50, WholeRowIterator.class);
                ReplicationSchema.StatusSection.limit(batchScanner);
                ReplicationSchema.WorkSection.limit(batchScanner);
                batchScanner.addScanIterator(wholeRow);

                recordsRemoved = removeCompleteRecords(this.conn, batchScanner, batchWriter);
            } finally {
                // Runs on every exit path so that a failure after the scanner was opened (for
                // example while opening the writer) cannot leak it.
                closeResources(batchScanner, batchWriter);
            }
        } catch (ReplicationTableOfflineException e) {
            log.debug("Not attempting to remove complete replication records as the table ({}) isn't yet online", REPLICATION_TABLE_NAME);
            return;
        }
        log.info("Removed {} complete replication entries from the table {}", recordsRemoved, REPLICATION_TABLE_NAME);
    }

    /**
     * Closes the scanner and then the writer, tolerating {@code null} for either. A
     * {@link MutationsRejectedException} raised while closing the writer is logged, not propagated.
     */
    private static void closeResources(BatchScanner scanner, BatchWriter writer) {
        try {
            if (scanner != null) {
                scanner.close();
            }
        } finally {
            // Still attempt to close the writer if closing the scanner failed.
            if (writer != null) {
                try {
                    writer.close();
                } catch (MutationsRejectedException e) {
                    log.error("Error writing mutations to {}, will retry", REPLICATION_TABLE_NAME, e);
                }
            }
        }
    }

    /**
     * Walks every whole-row entry produced by the scanner and removes the rows that are complete.
     *
     * @param connector not used by this implementation
     * @param batchScanner scanner configured with a {@link WholeRowIterator}; each entry is decoded
     *        with {@link WholeRowIterator#decodeRow}
     * @param batchWriter writer that receives the delete mutations
     * @return the total number of columns for which deletes were submitted
     */
    protected long removeCompleteRecords(Connector connector, BatchScanner batchScanner, BatchWriter batchWriter) {
        // Scratch buffers reused for every row to avoid allocating per entry.
        Text row = new Text();
        Text colf = new Text();
        Text colq = new Text();
        long recordsRemoved = 0;
        for (Map.Entry<Key, Value> rowEntry : batchScanner) {
            try {
                SortedMap<Key, Value> columns = WholeRowIterator.decodeRow(rowEntry.getKey(), rowEntry.getValue());
                rowEntry.getKey().getRow(row);
                recordsRemoved += removeRowIfNecessary(batchWriter, columns, row, colf, colq);
            } catch (IOException e) {
                // A row that cannot be decoded is skipped; the rest of the scan continues.
                log.error("Could not deserialize {} with WholeRowIterator", rowEntry.getKey().getRow(), e);
            }
        }
        return recordsRemoved;
    }

    /**
     * Deletes the given row if every parsable column in it is safe for removal.
     *
     * <p>Columns whose value cannot be parsed as a {@link Replication.Status} are logged and
     * skipped. If any parsable column is not yet safe for removal, nothing is written and the whole
     * row is kept. When deletes are written, an order-section delete is also submitted for each
     * source table id whose status carried a created time.
     *
     * @param batchWriter writer that receives the delete mutations; flushed before returning
     * @param sortedMap the decoded columns of a single row
     * @param text the row being examined
     * @param text2 scratch buffer, overwritten with each column family
     * @param text3 scratch buffer, overwritten with each column qualifier
     * @return the number of columns deleted, or 0 if the row was kept, had nothing deletable, or
     *         the mutations were rejected
     * @throws RuntimeException if a column belongs to neither the status nor the work section
     */
    protected long removeRowIfNecessary(BatchWriter batchWriter, SortedMap<Key, Value> sortedMap, Text text, Text text2, Text text3) {
        if (sortedMap.isEmpty()) {
            return 0L;
        }

        // Readable aliases for the positional parameters.
        final Text row = text;
        final Text colf = text2;
        final Text colq = text3;

        Mutation rowDeletes = new Mutation(row);
        Map<String, Long> createdTimeByTableId = new HashMap<>();
        long recordsRemoved = 0;

        for (Map.Entry<Key, Value> column : sortedMap.entrySet()) {
            Replication.Status status;
            try {
                status = Replication.Status.parseFrom(column.getValue().get());
            } catch (InvalidProtocolBufferException e) {
                log.error("Encountered unparsable protobuf for key: {}", column.getKey().toStringNoTruncate());
                // No status is available for this column, so it must not fall through to the
                // checks below.
                continue;
            }

            // One column that is not ready keeps the entire row.
            if (!StatusUtil.isSafeForRemoval(status)) {
                return 0L;
            }

            Key key = column.getKey();
            key.getColumnFamily(colf);
            key.getColumnQualifier(colq);
            log.debug("Removing {} {}:{} from replication table", new Object[]{row, colf, colq});
            rowDeletes.putDelete(colf, colq);

            String sourceTableId;
            if (ReplicationSchema.StatusSection.NAME.equals(colf)) {
                sourceTableId = colq.toString();
            } else if (ReplicationSchema.WorkSection.NAME.equals(colf)) {
                sourceTableId = ReplicationTarget.from(colq).getSourceTableId();
            } else {
                throw new RuntimeException("Got unexpected column");
            }

            if (status.hasCreatedTime()) {
                long createdTime = status.getCreatedTime();
                Long previous = createdTimeByTableId.get(sourceTableId);
                if (previous == null) {
                    createdTimeByTableId.put(sourceTableId, createdTime);
                } else if (previous.longValue() != createdTime) {
                    log.warn("Found multiple values for timeClosed for {}: {} and {}", new Object[]{row, previous, createdTime});
                }
            }
            recordsRemoved++;
        }

        if (recordsRemoved == 0) {
            // Every column was unparsable: there is nothing to delete, and an empty mutation
            // must not be handed to the writer.
            return 0L;
        }

        ArrayList<Mutation> mutations = new ArrayList<>();
        mutations.add(rowDeletes);
        String rowString = row.toString();
        for (Map.Entry<String, Long> order : createdTimeByTableId.entrySet()) {
            log.info("Removing order mutation for table {} at {} for {}", new Object[]{order.getKey(), order.getValue(), rowString});
            Mutation orderDelete = ReplicationSchema.OrderSection.createMutation(rowString, order.getValue().longValue());
            orderDelete.putDelete(ReplicationSchema.OrderSection.NAME, new Text(order.getKey()));
            mutations.add(orderDelete);
        }

        try {
            batchWriter.addMutations(mutations);
            batchWriter.flush();
            return recordsRemoved;
        } catch (MutationsRejectedException e) {
            log.error("Could not submit mutation to remove columns for {} in replication table", row, e);
            return 0L;
        }
    }
}
