package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.encoding.DataConversion;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.EventType;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;

/* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler.class */
public class SubscriberHandler extends CacheRespRequestHandler {
    private static final Log log;
    public static final byte[] PREFIX_CHANNEL_BYTES;
    private final Resp3Handler handler;
    Map<WrappedByteArray, PubSubListener> specificChannelSubscribers;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler$ListenerKeyFilter.class */
    static class ListenerKeyFilter implements CacheEventFilter<Object, Object> {
        private final byte[] key;
        private final DataConversion conversion;

        ListenerKeyFilter(byte[] bArr, DataConversion dataConversion) {
            this.key = bArr;
            this.conversion = dataConversion;
        }

        public boolean accept(Object obj, Object obj2, Metadata metadata, Object obj3, Metadata metadata2, EventType eventType) {
            return Arrays.equals(this.key, (byte[]) this.conversion.fromStorage(obj));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Listener(clustered = true)
    /* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler$PubSubListener.class */
    public static class PubSubListener {
        private final Channel channel;
        private final DataConversion keyConversion;
        private final DataConversion valueConversion;
        static final /* synthetic */ boolean $assertionsDisabled;

        PubSubListener(Channel channel, DataConversion dataConversion, DataConversion dataConversion2) {
            this.channel = channel;
            this.keyConversion = dataConversion;
            this.valueConversion = dataConversion2;
        }

        @CacheEntryCreated
        @CacheEntryModified
        public CompletionStage<Void> onEvent(CacheEntryEvent<Object, Object> cacheEntryEvent) {
            byte[] channelToKey = SubscriberHandler.channelToKey((byte[]) this.keyConversion.fromStorage(cacheEntryEvent.getKey()));
            byte[] bArr = (byte[]) this.valueConversion.fromStorage(cacheEntryEvent.getValue());
            if (channelToKey.length > 0 && bArr != null && bArr.length > 0) {
                int log10 = 18 + ((int) Math.log10(channelToKey.length)) + 1 + 2 + channelToKey.length + 2 + 1 + ((int) Math.log10(bArr.length)) + 1 + 2 + bArr.length + 2;
                ByteBuf buffer = this.channel.alloc().buffer(log10, log10);
                buffer.writeCharSequence("*3\r\n$7\r\nmessage\r\n$" + channelToKey.length + "\r\n", CharsetUtil.US_ASCII);
                buffer.writeBytes(channelToKey);
                buffer.writeCharSequence("\r\n$" + bArr.length + "\r\n", CharsetUtil.US_ASCII);
                buffer.writeBytes(bArr);
                buffer.writeByte(13);
                buffer.writeByte(10);
                if (!$assertionsDisabled && buffer.writerIndex() != log10) {
                    throw new AssertionError();
                }
                this.channel.writeAndFlush(buffer, this.channel.voidPromise());
            }
            return CompletableFutures.completedNull();
        }

        static {
            $assertionsDisabled = !SubscriberHandler.class.desiredAssertionStatus();
        }
    }

    public SubscriberHandler(RespServer respServer, Resp3Handler resp3Handler) {
        super(respServer);
        this.specificChannelSubscribers = new HashMap();
        this.handler = resp3Handler;
    }

    public static byte[] keyToChannel(byte[] bArr) {
        byte[] bArr2 = new byte[bArr.length + PREFIX_CHANNEL_BYTES.length];
        System.arraycopy(PREFIX_CHANNEL_BYTES, 0, bArr2, 0, PREFIX_CHANNEL_BYTES.length);
        System.arraycopy(bArr, 0, bArr2, PREFIX_CHANNEL_BYTES.length, bArr.length);
        return bArr2;
    }

    public static byte[] channelToKey(byte[] bArr) {
        return Arrays.copyOfRange(bArr, PREFIX_CHANNEL_BYTES.length, bArr.length);
    }

    @Override // org.infinispan.server.resp.RespRequestHandler
    public void handleChannelDisconnect(ChannelHandlerContext channelHandlerContext) {
        removeAllListeners();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.server.resp.RespRequestHandler
    public CompletionStage<RespRequestHandler> actualHandleRequest(ChannelHandlerContext channelHandlerContext, RespCommand respCommand, List<byte[]> list) {
        initializeIfNecessary(channelHandlerContext);
        switch (respCommand) {
            case SUBSCRIBE:
                AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
                for (byte[] bArr : list) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Subscriber for channel: " + CharsetUtil.UTF_8.decode(ByteBuffer.wrap(bArr)), new Object[0]);
                    }
                    WrappedByteArray wrappedByteArray = new WrappedByteArray(bArr);
                    if (this.specificChannelSubscribers.get(wrappedByteArray) == null) {
                        PubSubListener pubSubListener = new PubSubListener(channelHandlerContext.channel(), this.cache.getKeyDataConversion(), this.cache.getValueDataConversion());
                        this.specificChannelSubscribers.put(wrappedByteArray, pubSubListener);
                        aggregateCompletionStage.dependsOn(handleStageListenerError(this.cache.addListenerAsync(pubSubListener, new ListenerKeyFilter(keyToChannel(bArr), this.cache.getKeyDataConversion()), (CacheEventConverter) null), bArr, true));
                    }
                }
                return sendSubscriptions(channelHandlerContext, aggregateCompletionStage.freeze(), list, true);
            case UNSUBSCRIBE:
                AggregateCompletionStage aggregateCompletionStage2 = CompletionStages.aggregateCompletionStage();
                if (list.size() == 0) {
                    return unsubscribeAll(channelHandlerContext);
                }
                for (byte[] bArr2 : list) {
                    PubSubListener remove = this.specificChannelSubscribers.remove(new WrappedByteArray(bArr2));
                    if (remove != null) {
                        aggregateCompletionStage2.dependsOn(handleStageListenerError(this.cache.removeListenerAsync(remove), bArr2, false));
                    }
                }
                return sendSubscriptions(channelHandlerContext, aggregateCompletionStage2.freeze(), list, false);
            case PING:
                this.handler.handleRequest(channelHandlerContext, respCommand, list);
                break;
            case RESET:
            case QUIT:
                removeAllListeners();
                return this.handler.handleRequest(channelHandlerContext, respCommand, list);
            case PSUBSCRIBE:
            case PUNSUBSCRIBE:
                RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", this.allocatorToUse);
                break;
            default:
                return super.actualHandleRequest(channelHandlerContext, respCommand, list);
        }
        return this.myStage;
    }

    private CompletionStage<Void> handleStageListenerError(CompletionStage<Void> completionStage, byte[] bArr, boolean z) {
        return completionStage.whenComplete((r7, th) -> {
            if (th != null) {
                if (z) {
                    log.exceptionWhileRegisteringListener(th, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(bArr)));
                } else {
                    log.exceptionWhileRemovingListener(th, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(bArr)));
                }
            }
        });
    }

    private void removeAllListeners() {
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> it = this.specificChannelSubscribers.entrySet().iterator();
        while (it.hasNext()) {
            this.cache.removeListenerAsync(it.next().getValue());
            it.remove();
        }
    }

    private CompletionStage<RespRequestHandler> unsubscribeAll(ChannelHandlerContext channelHandlerContext) {
        AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        ArrayList arrayList = new ArrayList(this.specificChannelSubscribers.size());
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> it = this.specificChannelSubscribers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<WrappedByteArray, PubSubListener> next = it.next();
            CompletionStage<Void> removeListenerAsync = this.cache.removeListenerAsync(next.getValue());
            byte[] bytes = next.getKey().getBytes();
            arrayList.add(bytes);
            aggregateCompletionStage.dependsOn(handleStageListenerError(removeListenerAsync, bytes, false));
            it.remove();
        }
        return sendSubscriptions(channelHandlerContext, aggregateCompletionStage.freeze(), arrayList, false);
    }

    private CompletionStage<RespRequestHandler> sendSubscriptions(ChannelHandlerContext channelHandlerContext, CompletionStage<Void> completionStage, Collection<byte[]> collection, boolean z) {
        return stageToReturn(completionStage, channelHandlerContext, (r6, byteBufPool) -> {
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                byte[] bArr = (byte[]) it.next();
                String str = z ? "*2\r\n$9\r\nsubscribe\r\n$" : "*2\r\n$11\r\nunsubscribe\r\n$";
                int length = str.length() + ((int) Math.log10(bArr.length)) + 1 + 2 + bArr.length + 2;
                ByteBuf apply = byteBufPool.apply(length);
                int writerIndex = apply.writerIndex();
                apply.writeCharSequence(str + bArr.length + "\r\n", CharsetUtil.US_ASCII);
                apply.writeBytes(bArr);
                apply.writeByte(13);
                apply.writeByte(10);
                if (!$assertionsDisabled && apply.writerIndex() - writerIndex != length) {
                    throw new AssertionError();
                }
            }
        });
    }

    static {
        $assertionsDisabled = !SubscriberHandler.class.desiredAssertionStatus();
        log = (Log) LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
        PREFIX_CHANNEL_BYTES = new byte[]{114, 101, 115, 112, 124};
    }
}
