package org.apache.ignite.internal.processors.datastreamer;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.DelayQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.class */
/**
 * Processor that creates local data streamers and serves data streamer requests
 * received on {@link GridTopic#TOPIC_DATASTREAM}.
 * <p>
 * Every entry point is guarded by {@link #busyLock}: a failed {@code enterBusy()} is treated
 * as "node is stopping", and every successful {@code enterBusy()} is paired with exactly one
 * {@code leaveBusy()} via {@code try/finally}.
 *
 * @param <K> Key type of the streamers created by this processor.
 * @param <V> Value type of the streamers created by this processor.
 */
public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
    /** Streamers created by {@link #dataStreamer(String)} whose internal future has not completed yet. */
    private final Collection<DataStreamerImpl<K, V>> ldrs = new GridConcurrentHashSet<>();

    /** Guards streamer creation, request processing and flushing against node stop. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /** Thread draining {@link #flushQ}; never created on daemon nodes (see {@link #start()}). */
    private Thread flusher;

    /** Queue the flusher thread takes streamers from; also handed to every created streamer. */
    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>();

    /** Marshaller obtained from the node configuration. */
    private final Marshaller marsh;

    /**
     * Creates the processor and subscribes it to {@link GridTopic#TOPIC_DATASTREAM}.
     *
     * @param ctx Kernal context.
     */
    public DataStreamProcessor(GridKernalContext ctx) {
        super(ctx);

        // Assigned before the listener is registered: processRequest() reads this field and
        // the listener could be invoked as soon as it has been added.
        marsh = ctx.config().getMarshaller();

        ctx.io().addMessageListener(GridTopic.TOPIC_DATASTREAM, new GridMessageListener() {
            @Override
            public void onMessage(UUID nodeId, Object msg) {
                assert msg instanceof DataStreamerRequest;

                processRequest(nodeId, (DataStreamerRequest) msg);
            }
        });
    }

    /**
     * Starts the background flusher thread. Does nothing on daemon nodes.
     *
     * @throws IgniteCheckedException Declared by the overridden method; not thrown by this body.
     */
    @Override
    public void start() throws IgniteCheckedException {
        if (ctx.config().isDaemon()) {
            return;
        }

        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
            @Override
            protected void body() throws InterruptedException {
                while (!isCancelled()) {
                    DataStreamerImpl<K, V> ldr = flushQ.take();

                    // Node is stopping: leave the loop without touching the streamer.
                    if (!busyLock.enterBusy()) {
                        return;
                    }

                    try {
                        if (!ldr.isClosed()) {
                            ldr.tryFlush();

                            // Re-queue the still-open streamer for its next flush attempt.
                            flushQ.offer(ldr);
                        }
                    } finally {
                        // Single release point: the lock must be left exactly once per enterBusy().
                        busyLock.leaveBusy();
                    }
                }
            }
        });

        flusher.start();

        if (log.isDebugEnabled()) {
            log.debug("Started data streamer processor.");
        }
    }

    /**
     * Stops the processor: removes the message listener, blocks the busy lock, interrupts and
     * joins the flusher thread, then closes every streamer that is still registered.
     * Does nothing on daemon nodes.
     *
     * @param cancel Flag passed to {@code DataStreamerImpl.closeEx(boolean)} for each active streamer.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        if (ctx.config().isDaemon()) {
            return;
        }

        ctx.io().removeMessageListener(GridTopic.TOPIC_DATASTREAM);

        busyLock.block();

        U.interrupt(flusher);
        U.join(flusher, log);

        for (DataStreamerImpl<K, V> ldr : ldrs) {
            if (log.isDebugEnabled()) {
                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
            }

            try {
                ldr.closeEx(cancel);
            } catch (IgniteInterruptedCheckedException e) {
                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
            } catch (IgniteCheckedException e) {
                U.error(log, "Failed to close data streamer: " + ldr, e);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug("Stopped data streamer processor.");
        }
    }

    /**
     * Creates a new data streamer, registers it in the set of active streamers and arranges
     * for it to be unregistered once its internal future completes.
     *
     * @param cacheName Name handed to the {@code DataStreamerImpl} constructor
     *      (presumably the target cache name - confirm against {@code DataStreamerImpl}).
     * @return Newly created data streamer.
     * @throws IllegalStateException If the busy lock cannot be entered (grid is stopping).
     */
    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
        if (!busyLock.enterBusy()) {
            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
        }

        try {
            final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ);

            ldrs.add(ldr);

            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
                @Override
                public void apply(IgniteInternalFuture<?> fut) {
                    boolean rmv = ldrs.remove(ldr);

                    assert rmv : "Loader has not been added to set: " + ldr;

                    if (log.isDebugEnabled()) {
                        log.debug("Loader has been completed: " + ldr);
                    }
                }
            });

            return ldr;
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Handles a data streamer request received from a remote node.
     * <p>
     * If the request carries a topology version newer than the local ready affinity version and
     * the corresponding affinity-ready future is still pending, processing is re-submitted once
     * that future completes. Otherwise the response topic and the updater are unmarshalled, the
     * update job is executed and a response (carrying the job error, if any) is sent back.
     *
     * @param nodeId ID of the node that sent the request.
     * @param req Data streamer request.
     */
    public void processRequest(final UUID nodeId, final DataStreamerRequest req) {
        if (!busyLock.enterBusy()) {
            if (log.isDebugEnabled()) {
                log.debug("Ignoring data load request (node is stopping): " + req);
            }

            return;
        }

        try {
            if (log.isDebugEnabled()) {
                log.debug("Processing data load request: " + req);
            }

            AffinityTopologyVersion locTopVer = ctx.cache().context().exchange().readyAffinityVersion();
            AffinityTopologyVersion rmtTopVer = req.topologyVersion();

            if (locTopVer.compareTo(rmtTopVer) < 0) {
                if (log.isDebugEnabled()) {
                    log.debug("Received request has higher affinity topology version [request=" + req
                        + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']');
                }

                IgniteInternalFuture<?> readyFut = ctx.cache().context().exchange().affinityReadyFuture(rmtTopVer);

                if (readyFut != null && !readyFut.isDone()) {
                    // Retry the whole request once the requested topology version is ready locally.
                    readyFut.listen(new CI1<IgniteInternalFuture<?>>() {
                        @Override
                        public void apply(IgniteInternalFuture<?> fut) {
                            ctx.closure().runLocalSafe(new Runnable() {
                                @Override
                                public void run() {
                                    processRequest(nodeId, req);
                                }
                            }, false);
                        }
                    });

                    return;
                }
            }

            Object resTopic;

            try {
                resTopic = marsh.unmarshal(req.responseTopicBytes(), null);
            } catch (IgniteCheckedException e) {
                // Without the topic there is nowhere to send a response to, so only log.
                U.error(log, "Failed to unmarshal topic from request: " + req, e);

                return;
            }

            ClassLoader clsLdr;

            if (req.forceLocalDeployment()) {
                clsLdr = U.gridClassLoader();
            } else {
                GridDeployment dep = ctx.deploy().getGlobalDeployment(
                    req.deploymentMode(),
                    req.sampleClassName(),
                    req.sampleClassName(),
                    req.userVersion(),
                    nodeId,
                    req.classLoaderId(),
                    req.participants(),
                    null);

                if (dep == null) {
                    sendResponse(
                        nodeId,
                        resTopic,
                        req.requestId(),
                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId
                            + ", req=" + req + ']'),
                        false);

                    return;
                }

                clsLdr = dep.classLoader();
            }

            StreamReceiver<K, V> updater;

            // Unmarshalled separately from the job so that a marshalling failure is reported as such
            // instead of being mistaken for a job failure.
            try {
                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
            } catch (IgniteCheckedException e) {
                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);

                sendResponse(nodeId, resTopic, req.requestId(), e, false);

                return;
            }

            Exception err = null;

            try {
                new DataStreamerUpdateJob(
                    ctx,
                    log,
                    req.cacheName(),
                    req.entries(),
                    req.ignoreDeploymentOwnership(),
                    req.skipStore(),
                    updater).call();
            } catch (Exception e) {
                U.error(log, "Failed to finish update job.", e);

                err = e;
            }

            sendResponse(nodeId, resTopic, req.requestId(), err, req.forceLocalDeployment());
        } finally {
            // Single release point for every exit path above.
            busyLock.leaveBusy();
        }
    }

    /**
     * Sends a {@code DataStreamerResponse} to the given node. If the error cannot be marshalled,
     * the failure is logged and no response is sent. A send failure is logged as an error only
     * while the target node is still alive.
     *
     * @param nodeId ID of the node to respond to.
     * @param resTopic Topic to send the response to.
     * @param reqId ID of the request being answered.
     * @param err Error to report, or {@code null} if there is none.
     * @param forceLocDep Flag passed through to the {@code DataStreamerResponse} constructor.
     */
    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
        boolean forceLocDep) {
        byte[] errBytes = null;

        if (err != null) {
            try {
                errBytes = marsh.marshal(err);
            } catch (IgniteCheckedException e) {
                U.error(log, "Failed to marshal message.", e);

                return;
            }
        }

        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);

        try {
            ctx.io().send(nodeId, resTopic, res, GridIoPolicy.PUBLIC_POOL);
        } catch (IgniteCheckedException e) {
            if (ctx.discovery().alive(nodeId)) {
                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
            } else if (log.isDebugEnabled()) {
                log.debug("Node has left the grid: " + nodeId);
            }
        }
    }

    /** Prints the number of currently registered data streamers. */
    @Override
    public void printMemoryStats() {
        X.println(">>>");
        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
        X.println(">>>   ldrsSize: " + ldrs.size());
    }
}
