package akka.remote.artery;

import akka.Done;
import akka.actor.Address;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import akka.dispatch.sysmsg.SystemMessage;
import akka.remote.UniqueAddress;
import akka.remote.artery.InboundControlJunction;
import akka.remote.artery.OutboundHandshake;
import akka.remote.artery.SystemMessageDelivery;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import akka.util.PrettyDuration$;
import akka.util.PrettyDuration$PrettyPrintableDuration$;
import java.util.ArrayDeque;
import java.util.Iterator;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import scala.MatchError;
import scala.Predef$;
import scala.StringContext;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: SystemMessageDelivery.scala */
/* loaded from: input_file:akka/remote/artery/SystemMessageDelivery$$anon$1.class */
public final class SystemMessageDelivery$$anon$1 extends TimerGraphStageLogic implements InHandler, OutHandler, InboundControlJunction.ControlMessageObserver {
    private boolean replyObserverAttached;
    private long seqNo;
    private final ArrayDeque<OutboundEnvelope> unacknowledged;
    private ArrayDeque<OutboundEnvelope> resending;
    private long resendingFromSeqNo;
    private boolean stopping;
    private final long giveUpAfterNanos;
    private long ackTimestamp;
    private final AsyncCallback<SystemMessageDelivery.Ack> ackCallback;
    private final AsyncCallback<SystemMessageDelivery.Nack> nackCallback;
    private final /* synthetic */ SystemMessageDelivery $outer;

    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public void onUpstreamFailure(Throwable th) throws Exception {
        InHandler.onUpstreamFailure$(this, th);
    }

    private boolean replyObserverAttached() {
        return this.replyObserverAttached;
    }

    private void replyObserverAttached_$eq(boolean z) {
        this.replyObserverAttached = z;
    }

    private long seqNo() {
        return this.seqNo;
    }

    private void seqNo_$eq(long j) {
        this.seqNo = j;
    }

    private ArrayDeque<OutboundEnvelope> unacknowledged() {
        return this.unacknowledged;
    }

    private ArrayDeque<OutboundEnvelope> resending() {
        return this.resending;
    }

    private void resending_$eq(ArrayDeque<OutboundEnvelope> arrayDeque) {
        this.resending = arrayDeque;
    }

    private long resendingFromSeqNo() {
        return this.resendingFromSeqNo;
    }

    private void resendingFromSeqNo_$eq(long j) {
        this.resendingFromSeqNo = j;
    }

    private boolean stopping() {
        return this.stopping;
    }

    private void stopping_$eq(boolean z) {
        this.stopping = z;
    }

    private long giveUpAfterNanos() {
        return this.giveUpAfterNanos;
    }

    private long ackTimestamp() {
        return this.ackTimestamp;
    }

    private void ackTimestamp_$eq(long j) {
        this.ackTimestamp = j;
    }

    private UniqueAddress localAddress() {
        return this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.localAddress();
    }

    private Address remoteAddress() {
        return this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.remoteAddress();
    }

    public void preStart() {
        ExecutionContextExecutor executionContext = materializer().executionContext();
        Future<Done> attach = this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.controlSubject().attach(this);
        AsyncCallback asyncCallback = getAsyncCallback(done -> {
            $anonfun$preStart$1(this, done);
            return BoxedUnit.UNIT;
        });
        attach.foreach(done2 -> {
            asyncCallback.invoke(done2);
            return BoxedUnit.UNIT;
        }, executionContext);
        Future<Done> stopped = this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.controlSubject().stopped();
        AsyncCallback asyncCallback2 = getAsyncCallback(r4 -> {
            $anonfun$preStart$3(this, r4);
            return BoxedUnit.UNIT;
        });
        stopped.onComplete(r42 -> {
            asyncCallback2.invoke(r42);
            return BoxedUnit.UNIT;
        }, executionContext);
    }

    public void postStop() {
        sendUnacknowledgedToDeadLetters();
        unacknowledged().clear();
        this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.controlSubject().detach(this);
    }

    public void onUpstreamFinish() {
        if (unacknowledged().isEmpty()) {
            InHandler.onUpstreamFinish$(this);
        } else {
            stopping_$eq(true);
        }
    }

    public void onTimer(Object obj) {
        BoxedUnit boxedUnit;
        if (!SystemMessageDelivery$ResendTick$.MODULE$.equals(obj)) {
            throw new MatchError(obj);
        }
        checkGiveUp();
        if (resending().isEmpty() && !unacknowledged().isEmpty()) {
            resending_$eq(unacknowledged().clone());
            tryResend();
        }
        if (unacknowledged().isEmpty()) {
            boxedUnit = BoxedUnit.UNIT;
        } else {
            scheduleOnce(SystemMessageDelivery$ResendTick$.MODULE$, this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    @Override // akka.remote.artery.InboundControlJunction.ControlMessageObserver
    public void notify(InboundEnvelope inboundEnvelope) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        Object message = inboundEnvelope.message();
        if (message instanceof SystemMessageDelivery.Ack) {
            SystemMessageDelivery.Ack ack = (SystemMessageDelivery.Ack) message;
            Address address = ack.from().address();
            Address remoteAddress = remoteAddress();
            if (address != null ? !address.equals(remoteAddress) : remoteAddress != null) {
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                ackCallback().invoke(ack);
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (!(message instanceof SystemMessageDelivery.Nack)) {
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
            return;
        }
        SystemMessageDelivery.Nack nack = (SystemMessageDelivery.Nack) message;
        Address address2 = nack.from().address();
        Address remoteAddress2 = remoteAddress();
        if (address2 != null ? !address2.equals(remoteAddress2) : remoteAddress2 != null) {
            boxedUnit = BoxedUnit.UNIT;
        } else {
            nackCallback().invoke(nack);
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    private AsyncCallback<SystemMessageDelivery.Ack> ackCallback() {
        return this.ackCallback;
    }

    private AsyncCallback<SystemMessageDelivery.Nack> nackCallback() {
        return this.nackCallback;
    }

    private void ack(long j) {
        ackTimestamp_$eq(System.nanoTime());
        if (j <= seqNo()) {
            clearUnacknowledged(j);
        }
    }

    private void clearUnacknowledged(long j) {
        while (!unacknowledged().isEmpty() && ((SystemMessageDelivery.SystemMessageEnvelope) unacknowledged().peek().message()).seqNo() <= j) {
            unacknowledged().removeFirst();
            if (unacknowledged().isEmpty()) {
                cancelTimer(this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
            }
            if (stopping() && unacknowledged().isEmpty()) {
                completeStage();
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
            j = j;
        }
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    private void tryResend() {
        if (!isAvailable(this.$outer.out()) || resending().isEmpty()) {
            return;
        }
        pushCopy(resending().poll());
    }

    private void pushCopy(OutboundEnvelope outboundEnvelope) {
        push(this.$outer.out(), outboundEnvelope.copy());
    }

    public void onPush() {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        BoxedUnit boxedUnit3;
        OutboundEnvelope outboundEnvelope = (OutboundEnvelope) grab(this.$outer.in());
        Object message = outboundEnvelope.message();
        if (!(message instanceof SystemMessage ? true : message instanceof SystemMessageDelivery.AckedDeliveryMessage)) {
            if (message instanceof OutboundHandshake.HandshakeReq) {
                if (isAvailable(this.$outer.out())) {
                    pushCopy(outboundEnvelope);
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                return;
            }
            if (SystemMessageDelivery$ClearSystemMessageDelivery$.MODULE$.equals(message)) {
                clear();
                pull(this.$outer.in());
                BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                return;
            }
            if (resending().isEmpty() && isAvailable(this.$outer.out())) {
                push(this.$outer.out(), outboundEnvelope);
                boxedUnit = BoxedUnit.UNIT;
            } else {
                resending().offer(outboundEnvelope);
                tryResend();
                boxedUnit = BoxedUnit.UNIT;
            }
            return;
        }
        if (unacknowledged().size() < this.$outer.akka$remote$artery$SystemMessageDelivery$$maxBufferSize) {
            seqNo_$eq(seqNo() + 1);
            if (unacknowledged().isEmpty()) {
                ackTimestamp_$eq(System.nanoTime());
            } else {
                checkGiveUp();
            }
            OutboundEnvelope withMessage = outboundEnvelope.withMessage(new SystemMessageDelivery.SystemMessageEnvelope(message, seqNo(), localAddress()));
            unacknowledged().offer(withMessage);
            scheduleOnce(SystemMessageDelivery$ResendTick$.MODULE$, this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
            if (resending().isEmpty() && isAvailable(this.$outer.out())) {
                pushCopy(withMessage);
                boxedUnit3 = BoxedUnit.UNIT;
            } else {
                resending().offer(withMessage);
                tryResend();
                boxedUnit3 = BoxedUnit.UNIT;
            }
        } else {
            this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.quarantine(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"System message delivery buffer overflow, size [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.akka$remote$artery$SystemMessageDelivery$$maxBufferSize)})));
            ScalaActorRef actorRef2Scala = package$.MODULE$.actorRef2Scala(this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters);
            actorRef2Scala.$bang(outboundEnvelope, actorRef2Scala.$bang$default$2(outboundEnvelope));
            pull(this.$outer.in());
            boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    private void checkGiveUp() {
        if (!unacknowledged().isEmpty() && System.nanoTime() - ackTimestamp() > giveUpAfterNanos()) {
            throw new SystemMessageDelivery.GaveUpSystemMessageException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Gave up sending system message to [", "] after "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.remoteAddress()})) + new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"", ScopeFormat.SCOPE_SEPARATOR})).s(Predef$.MODULE$.genericWrapArray(new Object[]{PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension0(PrettyDuration$.MODULE$.PrettyPrintableDuration(this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.settings().Advanced().GiveUpSystemMessageAfter()))})));
        }
    }

    private void clear() {
        sendUnacknowledgedToDeadLetters();
        seqNo_$eq(0L);
        unacknowledged().clear();
        resending().clear();
        resendingFromSeqNo_$eq(-1L);
        cancelTimer(this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
    }

    private void sendUnacknowledgedToDeadLetters() {
        Iterator<OutboundEnvelope> it = unacknowledged().iterator();
        while (it.hasNext()) {
            ScalaActorRef actorRef2Scala = package$.MODULE$.actorRef2Scala(this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters);
            OutboundEnvelope next = it.next();
            actorRef2Scala.$bang(next, actorRef2Scala.$bang$default$2(next));
        }
    }

    public void onPull() {
        if (replyObserverAttached()) {
            if (!resending().isEmpty() || hasBeenPulled(this.$outer.in()) || stopping()) {
                tryResend();
            } else {
                pull(this.$outer.in());
            }
        }
    }

    public static final /* synthetic */ void $anonfun$preStart$1(SystemMessageDelivery$$anon$1 systemMessageDelivery$$anon$1, Done done) {
        systemMessageDelivery$$anon$1.replyObserverAttached_$eq(true);
        if (systemMessageDelivery$$anon$1.isAvailable(systemMessageDelivery$$anon$1.$outer.out())) {
            systemMessageDelivery$$anon$1.pull(systemMessageDelivery$$anon$1.$outer.in());
        }
    }

    public static final /* synthetic */ void $anonfun$preStart$3(SystemMessageDelivery$$anon$1 systemMessageDelivery$$anon$1, Try r5) {
        if (r5 instanceof Success) {
            systemMessageDelivery$$anon$1.completeStage();
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r5 instanceof Failure)) {
                throw new MatchError(r5);
            }
            systemMessageDelivery$$anon$1.failStage(((Failure) r5).exception());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$ackCallback$1(SystemMessageDelivery$$anon$1 systemMessageDelivery$$anon$1, SystemMessageDelivery.Ack ack) {
        systemMessageDelivery$$anon$1.ack(ack.seqNo());
    }

    public static final /* synthetic */ void $anonfun$nackCallback$1(SystemMessageDelivery$$anon$1 systemMessageDelivery$$anon$1, SystemMessageDelivery.Nack nack) {
        if (nack.seqNo() <= systemMessageDelivery$$anon$1.seqNo()) {
            systemMessageDelivery$$anon$1.ack(nack.seqNo());
            if (nack.seqNo() > systemMessageDelivery$$anon$1.resendingFromSeqNo()) {
                systemMessageDelivery$$anon$1.resending_$eq(systemMessageDelivery$$anon$1.unacknowledged().clone());
            }
            systemMessageDelivery$$anon$1.tryResend();
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SystemMessageDelivery$$anon$1(SystemMessageDelivery systemMessageDelivery) {
        super(systemMessageDelivery.m1409shape());
        if (systemMessageDelivery == null) {
            throw null;
        }
        this.$outer = systemMessageDelivery;
        InHandler.$init$(this);
        OutHandler.$init$(this);
        this.replyObserverAttached = false;
        this.seqNo = 0L;
        this.unacknowledged = new ArrayDeque<>();
        this.resending = new ArrayDeque<>();
        this.resendingFromSeqNo = -1L;
        this.stopping = false;
        this.giveUpAfterNanos = systemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$outboundContext.settings().Advanced().GiveUpSystemMessageAfter().toNanos();
        this.ackTimestamp = System.nanoTime();
        this.ackCallback = getAsyncCallback(ack -> {
            $anonfun$ackCallback$1(this, ack);
            return BoxedUnit.UNIT;
        });
        this.nackCallback = getAsyncCallback(nack -> {
            $anonfun$nackCallback$1(this, nack);
            return BoxedUnit.UNIT;
        });
        setHandlers(systemMessageDelivery.in(), systemMessageDelivery.out(), this);
    }
}
