package tech.guyi.component.message.stream.api.consumer;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.guyi.component.message.stream.api.converter.MessageTypeConverters;
import tech.guyi.component.message.stream.api.hook.MessageStreamHook;
import tech.guyi.component.message.stream.api.hook.MessageStreamHookRunner;
import tech.guyi.component.message.stream.api.stream.MessageStreams;
import tech.guyi.component.message.stream.api.utils.AntPathMatchers;
import tech.guyi.component.message.stream.api.worker.MessageStreamWorker;

/* loaded from: input_file:tech/guyi/component/message/stream/api/consumer/MessageConsumers.class */
/**
 * Registry of {@link MessageConsumer}s keyed by topic pattern, plus the dispatch logic that
 * hands incoming raw messages to every consumer whose pattern matches.
 *
 * <p>Collaborators are injected through {@code @Resource}. The registry is a plain
 * {@link HashMap} and this class performs no synchronization of its own, so concurrent
 * {@code register}/{@code unregister}/{@code onMessage} calls must be coordinated by the caller.
 */
public class MessageConsumers {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumers.class);

    /** Matches a registered topic pattern against the topic of an incoming message. */
    @Resource
    private AntPathMatchers matchers;

    /** Executes the actual consumer invocation; every delivery is submitted to it as a task. */
    @Resource
    private MessageStreamWorker worker;

    /** Informed about every topic/stream pair that is registered or unregistered here. */
    @Resource
    private MessageStreams messageStreams;

    /** Converts the raw payload bytes into the type requested by the consumer. */
    @Resource
    private MessageTypeConverters converters;

    /** Runs the REGISTER / UN_REGISTER hooks after the registry has been changed. */
    @Resource
    private MessageStreamHookRunner runner;

    /** Registered consumers keyed by topic (pattern); a later registration replaces an earlier one. */
    private final Map<String, MessageConsumer<?>> consumers = new HashMap<>();

    /**
     * Delivers one message to one specific consumer via the worker.
     *
     * <p>If the consumer asks for {@code byte[]} the payload is passed through untouched;
     * otherwise it is converted first. A conversion failure is logged and the message is
     * dropped for this consumer. Exceptions thrown by the consumer itself are not caught here.
     *
     * @param str             topic the message arrived on
     * @param str2            identifier of the stream the message came from
     * @param map             additional attributes forwarded unchanged to the consumer
     * @param bArr            raw message payload
     * @param messageConsumer consumer that receives the (converted) payload
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter type is part of the existing signature
    public void onMessage(String str, String str2, Map<String, Object> map, byte[] bArr, MessageConsumer messageConsumer) {
        Class messageType = messageConsumer.messageType();
        this.worker.submit(() -> {
            Object payload;
            if (messageType == byte[].class) {
                // Consumer wants the raw bytes: no conversion needed.
                payload = bArr;
            } else {
                try {
                    payload = this.converters.convert(bArr, messageType);
                } catch (Exception e) {
                    log.error("消息消费异常", e);
                    return;
                }
            }
            messageConsumer.accept(payload, str, str2, map);
        });
    }

    /**
     * Dispatches a message to every registered consumer whose topic pattern matches
     * {@code str} and whose stream restriction (if any) contains {@code str2}.
     *
     * <p>A consumer whose {@code getStream()} is {@code null} or empty accepts messages
     * from every stream.
     *
     * @param str  topic the message arrived on
     * @param str2 identifier of the stream the message came from
     * @param map  additional attributes forwarded unchanged to the consumers
     * @param bArr raw message payload
     */
    public void onMessage(String str, String str2, Map<String, Object> map, byte[] bArr) {
        for (Map.Entry<String, MessageConsumer<?>> entry : this.consumers.entrySet()) {
            if (!this.matchers.match(entry.getKey(), str)) {
                continue;
            }
            MessageConsumer<?> consumer = entry.getValue();
            boolean streamAccepted = consumer.getStream() == null
                    || consumer.getStream().isEmpty()
                    || consumer.getStream().contains(str2);
            if (streamAccepted) {
                onMessage(str, str2, map, bArr, consumer);
            }
        }
    }

    /**
     * Registers a consumer under each of its topics, announces every topic/stream pair to
     * {@code messageStreams}, and finally runs the {@code REGISTER} hook with the topics.
     *
     * @param messageConsumer consumer to register; replaces any consumer already stored
     *                        under the same topic
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw parameter type is part of the existing signature
    public void register(MessageConsumer messageConsumer) {
        // Iterate as Object: with the raw consumer type the element type of getTopic() is erased.
        for (Object topic : messageConsumer.getTopic()) {
            String key = topic.toString();
            this.consumers.put(key, messageConsumer);
            this.messageStreams.register(key, messageConsumer.getStream());
        }
        this.runner.run(MessageStreamHook.REGISTER, messageConsumer.getTopic());
    }

    /**
     * Removes the consumer stored under exactly the given topic (no pattern matching),
     * tells {@code messageStreams} about the removal, and runs the {@code UN_REGISTER} hook.
     * The hook runs whether or not a consumer was registered for the topic.
     *
     * @param str topic key to remove
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public void unregister(String str) {
        if (str == null) {
            throw new NullPointerException("topic must not be null");
        }
        // Remove directly instead of mutating the map while streaming over its key set,
        // which trips HashMap's fail-fast check and would prevent the hook from running.
        MessageConsumer<?> removed = this.consumers.remove(str);
        if (removed != null) {
            this.messageStreams.unregister(str, removed.getStream());
        }
        this.runner.run(MessageStreamHook.UN_REGISTER, str);
    }
}
