package org.apache.spark.streaming.receiver;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.internal.Logging;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.ThreadUtils$;
import org.apache.spark.util.io.ChunkedByteBuffer;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option$;
import scala.Some;
import scala.collection.mutable.ArrayBuffer;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ReceivedBlockHandler.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%e!\u0002\u000e\u001c\u0001u)\u0003\u0002\u0003\u001c\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001d\t\u0011y\u0002!\u0011!Q\u0001\n}B\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\t\u0013\u0002\u0011\t\u0011)A\u0005\u0015\"AQ\n\u0001B\u0001B\u0003%a\n\u0003\u0005S\u0001\t\u0005\t\u0015!\u0003T\u0011!Q\u0006A!A!\u0002\u0013Y\u0006\u0002\u00034\u0001\u0005\u0003\u0005\u000b\u0011B4\t\u000b5\u0004A\u0011\u00018\t\u000fa\u0004!\u0019!C\u0005s\"9\u0011Q\u0001\u0001!\u0002\u0013Q\b\"CA\u0004\u0001\t\u0007I\u0011BA\u0005\u0011\u001d\tY\u0001\u0001Q\u0001\n)C\u0011\"!\u0004\u0001\u0005\u0004%I!a\u0004\t\u0011\u0005m\u0001\u0001)A\u0005\u0003#A\u0011\"!\b\u0001\u0005\u0004%Y!a\b\t\u0011\u0005%\u0002\u0001)A\u0005\u0003CAq!a\u000b\u0001\t\u0003\ti\u0003C\u0004\u0002J\u0001!\t!a\u0013\t\u000f\u0005u\u0003\u0001\"\u0001\u0002`\u001dA\u0011\u0011M\u000e\t\u0002u\t\u0019GB\u0004\u001b7!\u0005Q$!\u001a\t\r54B\u0011AA4\u0011\u001d\tIG\u0006C\u0001\u0003WB\u0011\"!\u001d\u0017#\u0003%\t!a\u001d\u0003=]\u0013\u0018\u000e^3BQ\u0016\fG\rT8h\u0005\u0006\u001cX\r\u001a\"m_\u000e\\\u0007*\u00198eY\u0016\u0014(B\u0001\u000f\u001e\u0003!\u0011XmY3jm\u0016\u0014(B\u0001\u0010 
\u0003%\u0019HO]3b[&twM\u0003\u0002!C\u0005)1\u000f]1sW*\u0011!eI\u0001\u0007CB\f7\r[3\u000b\u0003\u0011\n1a\u001c:h'\u0011\u0001a\u0005\f\u0019\u0011\u0005\u001dRS\"\u0001\u0015\u000b\u0003%\nQa]2bY\u0006L!a\u000b\u0015\u0003\r\u0005s\u0017PU3g!\tic&D\u0001\u001c\u0013\ty3D\u0001\u000bSK\u000e,\u0017N^3e\u00052|7m\u001b%b]\u0012dWM\u001d\t\u0003cQj\u0011A\r\u0006\u0003g}\t\u0001\"\u001b8uKJt\u0017\r\\\u0005\u0003kI\u0012q\u0001T8hO&tw-\u0001\u0007cY>\u001c7.T1oC\u001e,'o\u0001\u0001\u0011\u0005ebT\"\u0001\u001e\u000b\u0005mz\u0012aB:u_J\fw-Z\u0005\u0003{i\u0012AB\u00117pG.l\u0015M\\1hKJ\f\u0011c]3sS\u0006d\u0017N_3s\u001b\u0006t\u0017mZ3s!\t\u00015)D\u0001B\u0015\t\u0011u$\u0001\u0006tKJL\u0017\r\\5{KJL!\u0001R!\u0003#M+'/[1mSj,'/T1oC\u001e,'/\u0001\u0005tiJ,\u0017-\\%e!\t9s)\u0003\u0002IQ\t\u0019\u0011J\u001c;\u0002\u0019M$xN]1hK2+g/\u001a7\u0011\u0005eZ\u0015B\u0001';\u00051\u0019Fo\u001c:bO\u0016dUM^3m\u0003\u0011\u0019wN\u001c4\u0011\u0005=\u0003V\"A\u0010\n\u0005E{\"!C*qCJ\\7i\u001c8g\u0003)A\u0017\rZ8pa\u000e{gN\u001a\t\u0003)bk\u0011!\u0016\u0006\u0003\u001bZS!aV\u0011\u0002\r!\fGm\\8q\u0013\tIVKA\u0007D_:4\u0017nZ;sCRLwN\\\u0001\u000eG\",7m\u001b9pS:$H)\u001b:\u0011\u0005q\u001bgBA/b!\tq\u0006&D\u0001`\u0015\t\u0001w'\u0001\u0004=e>|GOP\u0005\u0003E\"\na\u0001\u0015:fI\u00164\u0017B\u00013f\u0005\u0019\u0019FO]5oO*\u0011!\rK\u0001\u0006G2|7m\u001b\t\u0003Q.l\u0011!\u001b\u0006\u0003U~\tA!\u001e;jY&\u0011A.\u001b\u0002\u0006\u00072|7m[\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0013=\u0004\u0018O]:ukZ<\bCA\u0017\u0001\u0011\u00151\u0014\u00021\u00019\u0011\u0015q\u0014\u00021\u0001@\u0011\u0015)\u0015\u00021\u0001G\u0011\u0015I\u0015\u00021\u0001K\u0011\u0015i\u0015\u00021\u0001O\u0011\u0015\u0011\u0016\u00021\u0001T\u0011\u0015Q\u0016\u00021\u0001\\\u0011\u001d1\u0017\u0002%AA\u0002\u001d\f\u0011C\u00197pG.\u001cFo\u001c:f)&lWm\\;u+\u0005Q\bcA>\u0002\u00025\tAP\u0003\u0002~}\u0006AA-\u001e:bi&|gN\u0003\u0002��Q\u0005Q1m\u001c8dkJ\u0014XM\u001c;\n\u0007\u0005\rAP\u0001\bGS:LG/\u001a#ve\u0006$\u0018n\u001c8\u0002%\tdwnY6Ti>\u0014X\rV5nK>,H\u000fI\u0001\u0016K\u001a4Wm\u0019;jm\u0016\u001cFo\u001c:bO\u0016dUM^3m+\u0005Q\u0015AF3gM\u0016\u001cG/\u001b<f'R|'/Y4f\u0019\u00164X\r\u001c\u0011\u0002\u001b]\u0014\u0018\u000e^3BQ\u0016\fG\rT8h+\t\t\t\u0002\u0005\u0003\u0002\u0014\u0005]QBAA\u000b\u0015\tQW$\u0003\u0003\u0002\u001a\u0005U!!D,sSR,\u0017\t[3bI2{w-\u0001\bxe&$X-\u00115fC\u0012dun\u001a\u0011\u0002!\u0015DXmY;uS>t7i\u001c8uKb$XCAA\u0011!\u0011\t\u0019#!\n\u000e\u0003yL1!a\n\u007f\u0005})\u00050Z2vi&|gnQ8oi\u0016DH/\u0012=fGV$xN]*feZL7-Z\u0001\u0012Kb,7-\u001e;j_:\u001cuN\u001c;fqR\u0004\u0013AC:u_J,'\t\\8dWR1\u0011qFA\u001b\u0003\u007f\u00012!LA\u0019\u0013\r\t\u0019d\u0007\u0002\u0019%\u0016\u001cW-\u001b<fI\ncwnY6Ti>\u0014XMU3tk2$\bbBA\u001c%\u0001\u0007\u0011\u0011H\u0001\bE2|7m[%e!\rI\u00141H\u0005\u0004\u0003{Q$!D*ue\u0016\fWN\u00117pG.LE\rC\u0004\u0002BI\u0001\r!a\u0011\u0002\u000b\tdwnY6\u0011\u00075\n)%C\u0002\u0002Hm\u0011QBU3dK&4X\r\u001a\"m_\u000e\\\u0017\u0001E2mK\u0006tW\u000f](mI\ncwnY6t)\u0011\ti%a\u0015\u0011\u0007\u001d\ny%C\u0002\u0002R!\u0012A!\u00168ji\"9\u0011QK\nA\u0002\u0005]\u0013A\u0003;ie\u0016\u001c\b\u000eV5nKB\u0019q%!\u0017\n\u0007\u0005m\u0003F\u0001\u0003M_:<\u0017\u0001B:u_B$\"!!\u0014\u0002=]\u0013\u0018\u000e^3BQ\u0016\fG\rT8h\u0005\u0006\u001cX\r\u001a\"m_\u000e\\\u0007*\u00198eY\u0016\u0014\bCA\u0017\u0017'\t1b\u0005\u0006\u0002\u0002d\u0005)2\r[3dWB|\u0017N\u001c;ESJ$v\u000eT8h\t&\u0014H#B.\u
0002n\u0005=\u0004\"\u0002.\u0019\u0001\u0004Y\u0006\"B#\u0019\u0001\u00041\u0015a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$\u0003(\u0006\u0002\u0002v)\u001aq-a\u001e,\u0005\u0005e\u0004\u0003BA>\u0003\u000bk!!! \u000b\t\u0005}\u0014\u0011Q\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a!)\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003\u000f\u000biHA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
/* loaded from: input_file:org/apache/spark/streaming/receiver/WriteAheadLogBasedBlockHandler.class */
/**
 * {@link ReceivedBlockHandler} that stores every received block twice: once in the
 * {@link BlockManager} and once in a {@link WriteAheadLog}. The two writes are issued
 * concurrently and {@link #storeBlock} only returns once both have completed (or the
 * configured timeout has elapsed).
 *
 * <p>This source is decompiler output of a Scala class, so several members below
 * (the {@code Logging} forwarders and the {@code org$apache$...} accessors) are
 * compiler-generated plumbing rather than hand-written API.
 */
public class WriteAheadLogBasedBlockHandler implements ReceivedBlockHandler, Logging {
    /** Block manager that receives the serialized block bytes. */
    private final BlockManager blockManager;
    /** Used to serialize object-based blocks into a {@link ChunkedByteBuffer}. */
    private final SerializerManager serializerManager;
    /** Storage level requested by the caller; only used for warnings and error messages. */
    private final StorageLevel storageLevel;
    /** Source of the timestamp passed to {@link WriteAheadLog#write}. */
    private final Clock clock;
    /** Maximum time {@link #storeBlock} waits for both writes to finish. */
    private final FiniteDuration blockStoreTimeout;
    /** Storage level actually passed to the block manager (see the constructor). */
    private final StorageLevel effectiveStorageLevel;
    /** Write-ahead log created for this receiver in the constructor. */
    private final WriteAheadLog writeAheadLog;
    /** Executor-backed execution context on which both writes of a block are run. */
    private final ExecutionContextExecutorService executionContext;
    /** Backing field for the {@code Logging} trait's logger, accessed through the accessors below. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    /**
     * Static forwarder to the companion object's {@code checkpointDirToLogDir}.
     *
     * @param str checkpoint directory (presumably; the name is lost in decompilation)
     * @param i   stream id (presumably; the name is lost in decompilation)
     * @return whatever the companion object computes for these arguments
     */
    public static String checkpointDirToLogDir(String str, int i) {
        return WriteAheadLogBasedBlockHandler$.MODULE$.checkpointDirToLogDir(str, i);
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders: each method simply delegates to the matching
    // static Logging.<name>$ helper, passing this instance.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /** Getter for the logger field backing the {@code Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger field backing the {@code Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------
    // Private accessors generated for the Scala vals.
    // ---------------------------------------------------------------------

    private FiniteDuration blockStoreTimeout() {
        return this.blockStoreTimeout;
    }

    private StorageLevel effectiveStorageLevel() {
        return this.effectiveStorageLevel;
    }

    private WriteAheadLog writeAheadLog() {
        return this.writeAheadLog;
    }

    private ExecutionContextExecutorService executionContext() {
        return this.executionContext;
    }

    /**
     * Serializes the block, then stores it in the block manager and writes it to the
     * write-ahead log in parallel, waiting for both to complete.
     *
     * <p>How the bytes and the record count are obtained depends on the block type:
     * <ul>
     *   <li>{@code ArrayBufferBlock}: count is the buffer size; elements are serialized.</li>
     *   <li>{@code IteratorBlock}: the iterator is wrapped in a {@code CountingIterator};
     *       the count is read from it after serialization has consumed it.</li>
     *   <li>{@code ByteBufferBlock}: a duplicate of the buffer is wrapped as-is; the count
     *       stays empty.</li>
     * </ul>
     *
     * @param streamBlockId id under which the block is stored
     * @param receivedBlock block to store
     * @return a {@code WriteAheadLogBasedStoreResult} carrying the block id, the optional
     *         record count and the handle returned by the write-ahead log
     * @throws Exception      if {@code receivedBlock} is none of the three block types above
     * @throws SparkException (raised inside the storing future) if the block manager's
     *                        {@code putBytes} returns {@code false}
     */
    @Override // org.apache.spark.streaming.receiver.ReceivedBlockHandler
    public ReceivedBlockStoreResult storeBlock(StreamBlockId streamBlockId, ReceivedBlock receivedBlock) {
        ChunkedByteBuffer chunkedByteBuffer;
        // Optional record count; starts empty and is filled in where the count is known.
        // NOTE(review): the declared type Some is a decompiler artifact - the value is an Option.
        Some empty = Option$.MODULE$.empty();
        if (receivedBlock instanceof ArrayBufferBlock) {
            ArrayBuffer<?> arrayBuffer = ((ArrayBufferBlock) receivedBlock).arrayBuffer();
            empty = new Some(BoxesRunTime.boxToLong(arrayBuffer.size()));
            chunkedByteBuffer = this.serializerManager.dataSerialize(streamBlockId, arrayBuffer.iterator(), ClassTag$.MODULE$.apply(Object.class));
        } else if (receivedBlock instanceof IteratorBlock) {
            CountingIterator countingIterator = new CountingIterator(((IteratorBlock) receivedBlock).iterator());
            ChunkedByteBuffer dataSerialize = this.serializerManager.dataSerialize(streamBlockId, countingIterator, ClassTag$.MODULE$.apply(Object.class));
            // Read the count only after dataSerialize has been handed the counting iterator.
            empty = countingIterator.count();
            chunkedByteBuffer = dataSerialize;
        } else {
            if (!(receivedBlock instanceof ByteBufferBlock)) {
                throw new Exception(new StringBuilder(55).append("Could not push ").append(streamBlockId).append(" to block manager, unexpected block type").toString());
            }
            // duplicate() leaves the caller's buffer object itself untouched by later reads.
            chunkedByteBuffer = new ChunkedByteBuffer(((ByteBufferBlock) receivedBlock).byteBuffer().duplicate());
        }
        ChunkedByteBuffer chunkedByteBuffer2 = chunkedByteBuffer;
        // Two futures on the same execution context: (1) store in the block manager,
        // (2) write to the write-ahead log. They are zipped, reduced to the log handle,
        // and awaited for at most blockStoreTimeout.
        return new WriteAheadLogBasedStoreResult(streamBlockId, empty, (WriteAheadLogRecordHandle) ThreadUtils$.MODULE$.awaitResult(Future$.MODULE$.apply(() -> {
            // NOTE(review): no value is returned here; presumably the Scala closure returns Unit.
            if (!this.blockManager.putBytes(streamBlockId, chunkedByteBuffer2, this.effectiveStorageLevel(), true, ClassTag$.MODULE$.Nothing())) {
                throw new SparkException(new StringBuilder(53).append("Could not store ").append(streamBlockId).append(" to block manager with storage level ").append(this.storageLevel).toString());
            }
        }, executionContext()).zip(Future$.MODULE$.apply(() -> {
            return this.writeAheadLog().write(chunkedByteBuffer2.toByteBuffer(), this.clock.getTimeMillis());
        }, executionContext())).map(tuple2 -> {
            // Only the write-ahead log handle (second element of the zipped pair) is kept.
            return (WriteAheadLogRecordHandle) tuple2._2();
        }, executionContext()), blockStoreTimeout()));
    }

    /**
     * Asks the write-ahead log to clean entries older than the given threshold.
     *
     * @param j threshold time forwarded to {@code WriteAheadLog.clean}
     */
    @Override // org.apache.spark.streaming.receiver.ReceivedBlockHandler
    public void cleanupOldBlocks(long j) {
        // NOTE(review): the second argument is presumably "waitForCompletion" - confirm
        // against the WriteAheadLog API.
        writeAheadLog().clean(j, false);
    }

    /**
     * Closes the write-ahead log and shuts down the executor used by {@link #storeBlock}.
     *
     * <p>The executor is shut down in a {@code finally} block so that its threads are
     * released even when closing the log throws; the exception from {@code close()} is
     * still propagated to the caller.
     */
    public void stop() {
        try {
            writeAheadLog().close();
        } finally {
            executionContext().shutdown();
        }
    }

    /**
     * Creates the handler, derives the effective storage level, opens the write-ahead
     * log for this receiver and creates the thread pool used for the parallel writes.
     *
     * @param blockManager      block manager that stores the block bytes
     * @param serializerManager serializer used for object-based blocks
     * @param i                 passed with {@code str} to {@code checkpointDirToLogDir};
     *                          presumably the stream id
     * @param storageLevel      storage level requested by the caller
     * @param sparkConf         configuration; {@code spark.streaming.receiver.blockStoreTimeout}
     *                          (seconds, default 30) is read from it
     * @param configuration     Hadoop configuration handed to the write-ahead log factory
     * @param str               presumably the checkpoint directory from which the log
     *                          directory is derived
     * @param clock             clock used to timestamp write-ahead log records
     */
    public WriteAheadLogBasedBlockHandler(BlockManager blockManager, SerializerManager serializerManager, int i, StorageLevel storageLevel, SparkConf sparkConf, Configuration configuration, String str, Clock clock) {
        this.blockManager = blockManager;
        this.serializerManager = serializerManager;
        this.storageLevel = storageLevel;
        this.clock = clock;
        Logging.$init$(this);
        this.blockStoreTimeout = new package.DurationInt(package$.MODULE$.DurationInt(sparkConf.getInt("spark.streaming.receiver.blockStoreTimeout", 30))).seconds();
        // Warn about requested settings that are overridden below.
        if (storageLevel.deserialized()) {
            logWarning(() -> {
                return new StringBuilder(108).append("Storage level serialization ").append(this.storageLevel.deserialized()).append(" is not supported when").append(" write ahead log is enabled, change to serialization false").toString();
            });
        }
        if (storageLevel.replication() > 1) {
            logWarning(() -> {
                return new StringBuilder(98).append("Storage level replication ").append(this.storageLevel.replication()).append(" is unnecessary when ").append("write ahead log is enabled, change to replication 1").toString();
            });
        }
        // Keep the requested disk/memory/off-heap flags; the trailing (false, 1) presumably
        // mean deserialized=false and replication=1, matching the warnings above.
        this.effectiveStorageLevel = StorageLevel$.MODULE$.apply(storageLevel.useDisk(), storageLevel.useMemory(), storageLevel.useOffHeap(), false, 1);
        StorageLevel effectiveStorageLevel = effectiveStorageLevel();
        // Null-safe inequality check generated from Scala's `!=`.
        if (storageLevel != null ? !storageLevel.equals(effectiveStorageLevel) : effectiveStorageLevel != null) {
            logWarning(() -> {
                return new StringBuilder(98).append("User defined storage level ").append(this.storageLevel).append(" is changed to effective storage level ").append(this.effectiveStorageLevel()).append(" when write ahead log is enabled").toString();
            });
        }
        this.writeAheadLog = WriteAheadLogUtils$.MODULE$.createLogForReceiver(sparkConf, WriteAheadLogBasedBlockHandler$.MODULE$.checkpointDirToLogDir(str, i), configuration);
        // Pool sized 2 (presumably the thread count): one task per write issued by storeBlock.
        this.executionContext = ExecutionContext$.MODULE$.fromExecutorService(ThreadUtils$.MODULE$.newDaemonFixedThreadPool(2, getClass().getSimpleName()));
    }
}
