package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;

/* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler.class */
/**
 * Request handler that serves the pub/sub commands of a connection.
 *
 * <p>Each subscribed channel is backed by one clustered cache listener ({@link PubSubListener})
 * registered on the cache returned by {@code respServer.getCache()}. The listener is filtered so
 * that it only sees the cache key produced by {@link #keyToChannel(byte[])} for that channel.
 * Commands that are not handled here are forwarded either to the wrapped {@link Resp3Handler}
 * or to the superclass implementation.
 */
public class SubscriberHandler extends RespRequestHandler {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);

    /** Bytes prepended to a channel name to build the cache key used for that channel. */
    public static final byte[] PREFIX_CHANNEL_BYTES = {-114, 16, 78, -3, Byte.MAX_VALUE};

    private static final String CRLF = "\r\n";
    private static final String MESSAGE_PREFIX = "*3\r\n$7\r\nmessage\r\n$";
    private static final String SUBSCRIBE_PREFIX = "*2\r\n$9\r\nsubscribe\r\n$";
    private static final String UNSUBSCRIBE_PREFIX = "*2\r\n$11\r\nunsubscribe\r\n$";

    private final Resp3Handler handler;
    private final RespServer respServer;

    /** Listeners currently registered by this handler, keyed by the (unprefixed) channel name. */
    Map<WrappedByteArray, PubSubListener> specificChannelSubscribers = new HashMap<>();

    /**
     * Clustered cache listener that forwards created/modified entries to a Netty channel as
     * {@code message} replies.
     */
    @Listener(clustered = true)
    public static class PubSubListener {
        private final Channel channel;

        PubSubListener(Channel channel) {
            this.channel = channel;
        }

        /**
         * Writes a three-element {@code message} array (literal "message", channel name, payload)
         * to the channel for the given event.
         *
         * <p>Nothing is written when the channel name derived from the key is empty, or when the
         * event value is {@code null} or empty.
         *
         * @param cacheEntryEvent the cache event; its key is expected to carry the channel prefix
         * @return an already completed stage
         */
        @CacheEntryCreated
        @CacheEntryModified
        public CompletionStage<Void> onEvent(CacheEntryEvent<byte[], byte[]> cacheEntryEvent) {
            byte[] channelName = SubscriberHandler.channelToKey(cacheEntryEvent.getKey());
            byte[] payload = cacheEntryEvent.getValue();
            if (channelName.length > 0 && payload != null && payload.length > 0) {
                // Headers are pure ASCII, so String.length() equals the encoded byte count and
                // the buffer can be sized exactly from what is actually written below.
                String channelHeader = MESSAGE_PREFIX + channelName.length + CRLF;
                String payloadHeader = CRLF + "$" + payload.length + CRLF;
                int capacity = channelHeader.length() + channelName.length
                        + payloadHeader.length() + payload.length + CRLF.length();
                ByteBuf buffer = this.channel.alloc().buffer(capacity);
                buffer.writeCharSequence(channelHeader, CharsetUtil.UTF_8);
                buffer.writeBytes(channelName);
                buffer.writeCharSequence(payloadHeader, CharsetUtil.UTF_8);
                buffer.writeBytes(payload);
                buffer.writeCharSequence(CRLF, CharsetUtil.UTF_8);
                this.channel.writeAndFlush(buffer);
            }
            return CompletableFutures.completedNull();
        }
    }

    /**
     * @param respServer   server used to obtain the cache on which listeners are registered
     * @param resp3Handler handler that PING, RESET and QUIT are delegated to
     */
    public SubscriberHandler(RespServer respServer, Resp3Handler resp3Handler) {
        this.respServer = respServer;
        this.handler = resp3Handler;
    }

    /**
     * Returns a new array consisting of {@link #PREFIX_CHANNEL_BYTES} followed by the given bytes.
     *
     * @param bArr the channel name
     * @return the prefixed cache key
     */
    public static byte[] keyToChannel(byte[] bArr) {
        int prefixLength = PREFIX_CHANNEL_BYTES.length;
        byte[] prefixed = Arrays.copyOf(PREFIX_CHANNEL_BYTES, prefixLength + bArr.length);
        System.arraycopy(bArr, 0, prefixed, prefixLength, bArr.length);
        return prefixed;
    }

    /**
     * Returns a copy of the given bytes with the first {@code PREFIX_CHANNEL_BYTES.length} bytes
     * dropped. The content of the dropped bytes is not checked.
     *
     * @param bArr a prefixed cache key
     * @return the channel name
     */
    public static byte[] channelToKey(byte[] bArr) {
        return Arrays.copyOfRange(bArr, PREFIX_CHANNEL_BYTES.length, bArr.length);
    }

    /** Removes every listener registered by this handler when the connection goes away. */
    @Override
    public void handleChannelDisconnect(ChannelHandlerContext channelHandlerContext) {
        removeAllListeners();
    }

    /**
     * Dispatches a command received on this connection.
     *
     * <ul>
     *   <li>{@code SUBSCRIBE} / {@code UNSUBSCRIBE}: register or remove cache listeners and send
     *       one confirmation per channel once the listener operations complete.</li>
     *   <li>{@code PING}: delegated to the wrapped handler, but the delegate's returned stage is
     *       not used; the inherited {@code myStage} is returned instead.</li>
     *   <li>{@code RESET} / {@code QUIT}: remove all listeners, then return whatever the wrapped
     *       handler returns.</li>
     *   <li>{@code PSUBSCRIBE} / {@code PUNSUBSCRIBE}: answered with a "not implemented" error.</li>
     *   <li>anything else: forwarded to the superclass.</li>
     * </ul>
     *
     * @param channelHandlerContext context of the connection the command arrived on
     * @param str                   the command name
     * @param list                  the command arguments
     * @return the stage yielding the handler to use for the next command
     */
    @Override
    public CompletionStage<RespRequestHandler> handleRequest(ChannelHandlerContext channelHandlerContext, String str, List<byte[]> list) {
        switch (str) {
            case "SUBSCRIBE":
                return subscribe(channelHandlerContext, list);
            case "UNSUBSCRIBE":
                return unsubscribe(channelHandlerContext, list);
            case "PING":
                // Only the delegate's reply is wanted here; its returned handler stage is ignored.
                this.handler.handleRequest(channelHandlerContext, str, list);
                break;
            case "RESET":
            case "QUIT":
                removeAllListeners();
                return this.handler.handleRequest(channelHandlerContext, str, list);
            case "PSUBSCRIBE":
            case "PUNSUBSCRIBE":
                channelHandlerContext.writeAndFlush(RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", channelHandlerContext.alloc()));
                break;
            default:
                return super.handleRequest(channelHandlerContext, str, list);
        }
        return this.myStage;
    }

    /** Registers a listener for every requested channel that does not already have one. */
    private CompletionStage<RespRequestHandler> subscribe(ChannelHandlerContext ctx, List<byte[]> channels) {
        AggregateCompletionStage<Void> registrations = CompletionStages.aggregateCompletionStage();
        for (byte[] channelName : channels) {
            if (log.isTraceEnabled()) {
                // The channel name is client supplied, so it is passed as an argument and never
                // used as part of the format string.
                log.tracef("Subscriber for channel: %s", CharsetUtil.UTF_8.decode(ByteBuffer.wrap(channelName)));
            }
            WrappedByteArray wrappedName = new WrappedByteArray(channelName);
            if (this.specificChannelSubscribers.get(wrappedName) != null) {
                // Already subscribed: no second listener, but a confirmation is still sent below.
                continue;
            }
            PubSubListener listener = new PubSubListener(ctx.channel());
            this.specificChannelSubscribers.put(wrappedName, listener);
            byte[] cacheKey = keyToChannel(channelName);
            // The filter restricts the listener to the single key backing this channel;
            // no converter is used.
            CompletionStage<Void> registration = this.respServer.getCache().addListenerAsync(listener,
                    (key, oldValue, oldMetadata, newValue, newMetadata, eventType) -> Arrays.equals(key, cacheKey),
                    null);
            registrations.dependsOn(handleStageListenerError(registration, channelName, true));
        }
        return sendSubscriptions(ctx, registrations.freeze(), channels, true);
    }

    /** Removes the listeners of the given channels, or of all channels when none are given. */
    private CompletionStage<RespRequestHandler> unsubscribe(ChannelHandlerContext ctx, List<byte[]> channels) {
        if (channels.isEmpty()) {
            return unsubscribeAll(ctx);
        }
        AggregateCompletionStage<Void> removals = CompletionStages.aggregateCompletionStage();
        for (byte[] channelName : channels) {
            PubSubListener listener = this.specificChannelSubscribers.remove(new WrappedByteArray(channelName));
            if (listener != null) {
                removals.dependsOn(handleStageListenerError(this.respServer.getCache().removeListenerAsync(listener), channelName, false));
            }
        }
        return sendSubscriptions(ctx, removals.freeze(), channels, false);
    }

    /**
     * Attaches logging to a listener add/remove stage. The returned stage completes the same way
     * as the given one.
     *
     * @param stage       the listener operation
     * @param channelName the channel the operation concerns, used in the log message
     * @param subscribing {@code true} for a registration, {@code false} for a removal
     */
    private CompletionStage<Void> handleStageListenerError(CompletionStage<Void> stage, byte[] channelName, boolean subscribing) {
        return stage.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            if (subscribing) {
                log.exceptionWhileRegisteringListener(failure, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(channelName)));
            } else {
                log.exceptionWhileRemovingListener(failure, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(channelName)));
            }
        });
    }

    /**
     * Removes every registered listener without sending any reply. Removal is not awaited;
     * failures are logged.
     */
    private void removeAllListeners() {
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> entries = this.specificChannelSubscribers.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<WrappedByteArray, PubSubListener> entry = entries.next();
            CompletionStage<Void> removal = this.respServer.getCache().removeListenerAsync(entry.getValue());
            handleStageListenerError(removal, entry.getKey().getBytes(), false);
            entries.remove();
        }
    }

    /** Removes every registered listener and confirms each channel to the client. */
    private CompletionStage<RespRequestHandler> unsubscribeAll(ChannelHandlerContext ctx) {
        AggregateCompletionStage<Void> removals = CompletionStages.aggregateCompletionStage();
        List<byte[]> channels = new ArrayList<>(this.specificChannelSubscribers.size());
        Iterator<Map.Entry<WrappedByteArray, PubSubListener>> entries = this.specificChannelSubscribers.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<WrappedByteArray, PubSubListener> entry = entries.next();
            CompletionStage<Void> removal = this.respServer.getCache().removeListenerAsync(entry.getValue());
            byte[] channelName = entry.getKey().getBytes();
            channels.add(channelName);
            removals.dependsOn(handleStageListenerError(removal, channelName, false));
            entries.remove();
        }
        return sendSubscriptions(ctx, removals.freeze(), channels, false);
    }

    /**
     * Once {@code stage} completes, writes one {@code subscribe}/{@code unsubscribe} confirmation
     * per channel, or a single error reply if the stage failed.
     *
     * @param ctx         context to write the replies to
     * @param stage       the aggregated listener operations to wait for
     * @param channels    channel names to confirm
     * @param isSubscribe {@code true} when confirming subscriptions, {@code false} for removals
     */
    private CompletionStage<RespRequestHandler> sendSubscriptions(ChannelHandlerContext ctx, CompletionStage<Void> stage, Collection<byte[]> channels, boolean isSubscribe) {
        return stageToReturn(stage, ctx, (ignored, failure) -> {
            if (failure != null) {
                // Errors go out as a CRLF-terminated ByteBuf, like every other reply of this handler.
                String error = isSubscribe
                        ? "-ERR Failure adding client listener\r\n"
                        : "-ERR Failure unsubscribing client listener\r\n";
                ctx.writeAndFlush(RespRequestHandler.stringToByteBuf(error, ctx.alloc()));
                return;
            }
            String prefix = isSubscribe ? SUBSCRIBE_PREFIX : UNSUBSCRIBE_PREFIX;
            for (byte[] channelName : channels) {
                // Size from the header actually written; this also copes with an empty channel
                // name, for which a log10-based digit count yields a negative capacity.
                String header = prefix + channelName.length + CRLF;
                ByteBuf buffer = ctx.alloc().buffer(header.length() + channelName.length + CRLF.length());
                buffer.writeCharSequence(header, CharsetUtil.UTF_8);
                buffer.writeBytes(channelName);
                buffer.writeCharSequence(CRLF, CharsetUtil.UTF_8);
                ctx.writeAndFlush(buffer);
            }
        });
    }
}
