基於 Redis 實現內部消息服務通信

前言

先說一下爲什麼要有這個東西,用消息中間件的好處就不用說了,日常開發中還是有很多場景需要用到消息傳遞的,消息的topic如何管理,如何約束topic,重要的topic消費記錄、歷史消息等就是這個sdk需要做的。

本質上只是一層對消息中間件的封裝。這次只是拋磚引玉只引入redis的三種消息類型,pubsub、queue以及stream

擴展其他中間件按着代碼思路一樣。望各路大佬賜教

架構設計

一個消息服務sdk首先需要具備兩個能力,即生產和消費,這兩個功能離不開校驗topic合法性,我們姑且簡單點在mysql數據庫中,但不可能每次校驗topic是否合法都要去查詢數據庫,這裏借鑑kafka存放topic信息的思想,找一個rediskey存放所有的topic列表。

定義一個核心 service 接口

public interface MessageHubService {

    /**
     * 生產消息
     */
    void producer(MessageForm messageForm);

    /**
     * 消費消息
     */
    void consumer(ConsumerAdapterForm adapterForm);

    /**
     * 檢查topic、type合法性
     */
    void checkTopic(String topic, String type);
}

方法入參統一使用MessageForm類,裏面定義一些基礎的信息,比如哪個消息topic,哪個消息類型等等。

@Data
public class MessageForm {
    // 消息組件類型
    private String type;
    // 消息主題
    private String topic;
    private String message = "";
    // 消費者組
    private String group = "UPTOWN";
}

具體實現

基礎類實現

@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {

    @Resource
    protected StringRedisTemplate stringRedisTemplate;

    public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();

    private ApplicationContext applicationContext;


    @PostConstruct
    public void init() {

        messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
        messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
    }

    public void checkTopic(String topic, String type) {
        if (!messageHubServiceMap.containsKey(type)) {

            throw new MatrixException("消息類型不支持");
        }

        List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
        if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
            throw new MatrixException("當前topic未配置");
        }
    }

    @Override
    public void producer(MessageForm messageForm) {
        this.checkTopic(messageForm.getTopic(), messageForm.getType());
        this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
    }


    /**
     * 消費者創建通過註解,已校驗topic合法性
     */
    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

具體自實現類

@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {

    @Override
    public void producer(MessageForm messageForm) {
        // 具體生產邏輯
    }

    @Override
    public void consumer(ConsumerAdapterForm messageForm) {
        // 具體消費邏輯
    }
}

生產者邏輯

生產者API做的比較簡單,只是提供一個API調用,在調用前做一些校驗工作,僅僅的是一條命令,不做發送失敗的重試等操作。

消費者邏輯

消費者的話還是定義一個註解,通過藉助SpringBoot生命週期掃描註解的方式在後臺建立常駐線程的方式。

@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {


    @Resource(name = "messageHubServiceImpl")
    MessageHubService messageHubService;

    @Bean(name = "redisPubSubConsumerMap")
    public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
        return new ConcurrentHashMap<>();
    }

    @Override
    public void destroy() throws Exception {

    }

    @Override
    public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
        Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
        for (Method method : methods) {
            MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
            if (annotation == null) {
                continue;
            }
            String resolveTopic = annotation.topic();
            try {
                messageHubService.checkTopic(resolveTopic, annotation.type());
            } catch (Exception e) {

                throw new Error(e.getMessage());
            }

            ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
            adapterForm.setBean(bean);
            adapterForm.setInvokeMethod(method);
            adapterForm.setTopic(resolveTopic);
            adapterForm.setType(annotation.type());
            adapterForm.setGroup(annotation.group());
            messageHubService.consumer(adapterForm);
        }
        return bean;
    }
}

這裏依靠spring生命週期,拿到所有的bean,根據註解標註的方法去走不同的邏輯生成常駐線程,監聽到消息之後回調到標註了註解的方法裏。

Topic 守護線程

@Slf4j
@Service
public class TopicReloadTask extends TimerTask {

    @Resource
    StringRedisTemplate stringRedisTemplate;

    @Resource
    EntityManager entityManager;


    public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
    public final String LUA_SCRIPT =
                "redis.call('del', 'MESSAGEHUB_TOPIC')" +
                "local topics = KEYS " +
                "for i, v in pairs(topics) do " +
                "  redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
                "end";


    @Override
    public void run() {
        try {
            List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
            if (!ObjectUtils.isEmpty(topics)) {

                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
                Long result = stringRedisTemplate.execute(redisScript, topics);
                log.info("reload topic finish");
            }
        } catch (Throwable t) {

            log.error("messagehub topic reload error", t);
        }
    }

    private <T> List<T> getQueryResult(String sql, Class<T> clazz) {

        Query dataQuery = entityManager.createNativeQuery(sql, clazz);
        List<T> result = new ArrayList<>();
        List<Object> list = dataQuery.getResultList();
        for (Object o : list) {
            result.add((T) o);
        }
        return result;
    }
}

定義一個timer任務,隔一段時間將mysql中的topic白名單通過lua腳本的方式刷新到指定的reids topic key中。

還有一些可以優化的地方,比如同步topic的操作只需要一個服務即可,所以可以使用@ConditionalOnProperty註解判斷是否需要進行同步topic

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/CmrfSSQWS0RdQquC6-njXg