package org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.hibernate.search.engine.reporting.FailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentState;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentType;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ClusterDescriptor;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ShardAssignmentDescriptor;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl.AbstractAgentClusterLink;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.logging.impl.Log;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.SearchException;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingEventProcessorClusterLink.class */
public final class OutboxPollingEventProcessorClusterLink extends AbstractAgentClusterLink<OutboxPollingEventProcessingInstructions> {
    private static final Log log = (Log) LoggerFactory.make(Log.class, MethodHandles.lookup());
    private final OutboxEventFinderProvider finderProvider;
    final boolean shardAssignmentIsStatic;
    ShardAssignment lastShardAssignment;

    public OutboxPollingEventProcessorClusterLink(String str, FailureHandler failureHandler, Clock clock, OutboxEventFinderProvider outboxEventFinderProvider, Duration duration, Duration duration2, Duration duration3, ShardAssignmentDescriptor shardAssignmentDescriptor) {
        super(new AgentPersister(shardAssignmentDescriptor == null ? AgentType.EVENT_PROCESSING_DYNAMIC_SHARDING : AgentType.EVENT_PROCESSING_STATIC_SHARDING, str, shardAssignmentDescriptor), failureHandler, clock, duration, duration2, duration3);
        this.finderProvider = outboxEventFinderProvider;
        if (shardAssignmentDescriptor == null) {
            this.shardAssignmentIsStatic = false;
            this.lastShardAssignment = null;
        } else {
            this.shardAssignmentIsStatic = true;
            this.lastShardAssignment = ShardAssignment.of(shardAssignmentDescriptor, outboxEventFinderProvider);
        }
        log.tracef("Agent '%s': created, staticShardAssignment = %s", str, shardAssignmentDescriptor);
    }

    @Override // org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl.AbstractAgentClusterLink
    protected AbstractAgentClusterLink.WriteAction<OutboxPollingEventProcessingInstructions> doPulse(List<Agent> list, Agent agent) {
        for (Agent agent2 : list) {
            if (AgentType.MASS_INDEXING.equals(agent2.getType())) {
                log.logf(agent.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': another agent '%s' is currently mass indexing", selfReference(), agent2);
                return (instant, agent3, agentPersister) -> {
                    agentPersister.setSuspended(agent3);
                    return instructCommitAndRetryPulseAfterDelay(instant, this.pulseInterval);
                };
            }
        }
        try {
            ClusterTarget create = ClusterTarget.create(list);
            Optional<ShardAssignmentDescriptor> fromClusterMemberList = ShardAssignmentDescriptor.fromClusterMemberList(create.descriptor.memberIdsInShardOrder, selfReference().id);
            if (!fromClusterMemberList.isPresent()) {
                log.logf(agent.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': this agent is superfluous and will not perform event processing, because other agents are enough to handle all the shards. Target cluster: %s.", selfReference(), create.descriptor);
                return (instant2, agent4, agentPersister2) -> {
                    agentPersister2.setSuspended(agent4);
                    return instructCommitAndRetryPulseAfterDelay(instant2, this.pulseInterval);
                };
            }
            ShardAssignmentDescriptor shardAssignmentDescriptor = fromClusterMemberList.get();
            if (create.descriptor.memberIdsInShardOrder.contains(null)) {
                log.logf(agent.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': some cluster members are missing; this agent will wait until they are present. Target cluster: %s.", selfReference(), create.descriptor);
                return (instant3, agent5, agentPersister3) -> {
                    agentPersister3.setSuspended(agent5);
                    return instructCommitAndRetryPulseAfterDelay(instant3, this.pollingInterval);
                };
            }
            ShardAssignmentDescriptor shardAssignment = agent.getShardAssignment();
            if (!shardAssignmentDescriptor.equals(shardAssignment)) {
                log.infof("Agent '%s': the persisted shard assignment (%s) does not match the target. Target assignment: %s. Cluster: %s.", new Object[]{selfReference(), shardAssignment, shardAssignmentDescriptor, create.descriptor});
                return (instant4, agent6, agentPersister4) -> {
                    agentPersister4.setWaiting(agent6, create.descriptor, shardAssignmentDescriptor);
                    return instructCommitAndRetryPulseAfterDelay(instant4, this.pollingInterval);
                };
            }
            if (!excludedAgentsAreOutOfCluster(create.excluded)) {
                return (instant5, agent7, agentPersister5) -> {
                    agentPersister5.setWaiting(agent7, create.descriptor, shardAssignmentDescriptor);
                    return instructCommitAndRetryPulseAfterDelay(instant5, this.pollingInterval);
                };
            }
            if (!clusterMembersAreInCluster(create.membersInShardOrder, create.descriptor)) {
                return (instant6, agent8, agentPersister6) -> {
                    agentPersister6.setWaiting(agent8, create.descriptor, shardAssignmentDescriptor);
                    return instructCommitAndRetryPulseAfterDelay(instant6, this.pollingInterval);
                };
            }
            if (this.lastShardAssignment == null || !shardAssignmentDescriptor.equals(this.lastShardAssignment.descriptor)) {
                if (this.shardAssignmentIsStatic) {
                    throw new AssertionFailure("Agent '" + selfReference() + "' has a static shard assignment, but the target shard assignment (" + shardAssignmentDescriptor + ") does not match the static shard assignment (" + this.lastShardAssignment + ")");
                }
                log.infof("Agent '%s': assigning to %s", selfReference(), shardAssignmentDescriptor);
                this.lastShardAssignment = ShardAssignment.of(shardAssignmentDescriptor, this.finderProvider);
            }
            return (instant7, agent9, agentPersister7) -> {
                agentPersister7.setRunning(agent9, create.descriptor);
                return instructProceedWithEventProcessing(instant7);
            };
        } catch (SearchException e) {
            FailureContext.Builder builder = FailureContext.builder();
            builder.throwable(log.outboxEventProcessorPulseFailed(selfReference(), e.getMessage(), list, e));
            builder.failingOperation(log.outboxEventProcessorPulse(selfReference()));
            this.failureHandler.handle(builder.build());
            return (instant8, agent10, agentPersister8) -> {
                agentPersister8.setSuspended(agent10);
                return instructCommitAndRetryPulseAfterDelay(instant8, this.pulseInterval);
            };
        }
    }

    private boolean excludedAgentsAreOutOfCluster(List<Agent> list) {
        if (list.isEmpty()) {
            return true;
        }
        AgentState agentState = AgentState.SUSPENDED;
        for (Agent agent : list) {
            if (!agentState.equals(agent.getState())) {
                log.tracef("Agent '%s': waiting for agent '%s', which has not reached state '%s' yet", selfReference(), agent.getReference(), agentState);
                return false;
            }
        }
        log.tracef("Agent '%s': agents excluded from the cluster reached the expected state %s", selfReference(), agentState);
        return true;
    }

    private boolean clusterMembersAreInCluster(List<Agent> list, ClusterDescriptor clusterDescriptor) {
        int size = list.size();
        int i = 0;
        Set<AgentState> set = AgentState.WAITING_OR_RUNNING;
        for (Agent agent : list) {
            AgentState state = agent.getState();
            if (!set.contains(agent.getState())) {
                log.tracef("Agent '%s': waiting for agent '%s', whose state %s is not in the expected %s yet", new Object[]{selfReference(), agent.getReference(), state, set});
                return false;
            }
            Integer totalShardCount = agent.getTotalShardCount();
            if (totalShardCount == null || size != totalShardCount.intValue()) {
                log.tracef("Agent '%s': waiting for agent '%s', whose total shard count %s is not the expected %s yet", new Object[]{selfReference(), agent.getReference(), totalShardCount, Integer.valueOf(size)});
                return false;
            }
            Integer assignedShardIndex = agent.getAssignedShardIndex();
            if (assignedShardIndex == null || i != assignedShardIndex.intValue()) {
                log.tracef("Agent '%s': waiting for agent '%s', whose assigned shard index %s is not the expected %s yet", new Object[]{selfReference(), agent.getReference(), assignedShardIndex, Integer.valueOf(i)});
                return false;
            }
            i++;
        }
        log.tracef("Agent '%s': all cluster members reached the expected states %s and shard assignment %s", selfReference(), set, clusterDescriptor);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl.AbstractAgentClusterLink
    public OutboxPollingEventProcessingInstructions instructCommitAndRetryPulseAfterDelay(Instant instant, Duration duration) {
        Instant plus = instant.plus((TemporalAmount) duration);
        log.tracef("Agent '%s': instructions are to not process events and to retry a pulse in %s, around %s", selfReference(), duration, plus);
        return new OutboxPollingEventProcessingInstructions(this.clock, plus, Optional.empty());
    }

    private OutboxPollingEventProcessingInstructions instructProceedWithEventProcessing(Instant instant) {
        Instant plus = instant.plus((TemporalAmount) this.pulseInterval);
        log.tracef("Agent '%s': instructions are to process events and to retry a pulse in %s, around %s", selfReference(), this.pulseInterval, plus);
        return new OutboxPollingEventProcessingInstructions(this.clock, plus, Optional.of(this.lastShardAssignment.eventFinder));
    }
}
