基於 Redis 實現內部消息服務通信
前言
先說一下爲什麼要有這個東西,用消息中間件的好處就不用說了,日常開發中還是有很多場景需要用到消息傳遞的,消息的topic
如何管理,如何約束topic
,重要的topic
消費記錄、歷史消息等就是這個sdk
需要做的。
本質上只是一層對消息中間件的封裝。這次只是拋磚引玉只引入redis
的三種消息類型,pubsub、queue
以及stream
。
擴展其他中間件按着代碼思路一樣。望各路大佬賜教
架構設計
一個消息服務sdk
首先需要具備兩個能力,即生產和消費,這兩個功能離不開校驗topic
合法性,我們姑且簡單點在mysql
數據庫中,但不可能每次校驗topic
是否合法都要去查詢數據庫,這裏借鑑kafka
存放topic
信息的思想,找一個redis
的key
存放所有的topic
列表。
定義一個核心 service 接口
public interface MessageHubService {
/**
* 生產消息
*/
void producer(MessageForm messageForm);
/**
* 消費消息
*/
void consumer(ConsumerAdapterForm adapterForm);
/**
* 檢查topic、type合法性
*/
void checkTopic(String topic, String type);
}
方法入參統一使用MessageForm
類,裏面定義一些基礎的信息,比如哪個消息topic
,哪個消息類型等等。
@Data
public class MessageForm {
// 消息組件類型
private String type;
// 消息主題
private String topic;
private String message = "";
// 消費者組
private String group = "UPTOWN";
}
具體實現
基礎類實現
@Service
public class MessageHubServiceImpl implements MessageHubService, ApplicationContextAware {
@Resource
protected StringRedisTemplate stringRedisTemplate;
public Map<String, MessageHubService> messageHubServiceMap = new ConcurrentHashMap<>();
private ApplicationContext applicationContext;
@PostConstruct
public void init() {
messageHubServiceMap.put(TopicTypeConstants.REDIS_PUBSUB_TYPE, applicationContext.getBean(RedisPubSubProcessor.class));
messageHubServiceMap.put(TopicTypeConstants.REDIS_STREAM_TYPE, applicationContext.getBean(RedisQueueProcessor.class));
messageHubServiceMap.put(TopicTypeConstants.REDIS_QUEUE_TYPE, applicationContext.getBean(RedisStreamProcessor.class));
}
public void checkTopic(String topic, String type) {
if (!messageHubServiceMap.containsKey(type)) {
throw new MatrixException("消息類型不支持");
}
List<String> whiteTopicList = stringRedisTemplate.opsForList().range(TopicTypeConstants.WHITE_TOPIC, 0, -1);
if ((!ObjectUtils.isEmpty(whiteTopicList) && !whiteTopicList.contains(topic)) || ObjectUtils.isEmpty(whiteTopicList)) {
throw new MatrixException("當前topic未配置");
}
}
@Override
public void producer(MessageForm messageForm) {
this.checkTopic(messageForm.getTopic(), messageForm.getType());
this.messageHubServiceMap.get(messageForm.getType()).producer(messageForm);
}
/**
* 消費者創建通過註解,已校驗topic合法性
*/
@Override
public void consumer(ConsumerAdapterForm messageForm) {
this.messageHubServiceMap.get(messageForm.getType()).consumer(messageForm);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}。
具體自實現類
@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
@Service("redisPubSubProcessor")
public class RedisPubSubProcessor extends MessageHubServiceImpl {
@Override
public void producer(MessageForm messageForm) {
// 具體生產邏輯
}
@Override
public void consumer(ConsumerAdapterForm messageForm) {
// 具體消費邏輯
}
}
生產者邏輯
生產者API
做的比較簡單,只是提供一個API
調用,在調用前做一些校驗工作,僅僅的是一條命令,不做發送失敗的重試等操作。
消費者邏輯
消費者的話還是定義一個註解,通過藉助SpringBoot
生命週期掃描註解的方式在後臺建立常駐線程的方式。
@Slf4j
@Component
public class ConsumerConfig implements DisposableBean, SmartInstantiationAwareBeanPostProcessor {
@Resource(name = "messageHubServiceImpl")
MessageHubService messageHubService;
@Bean(name = "redisPubSubConsumerMap")
public Map<String, MessageListenerAdapter> redisPubSubConsumerMap() {
return new ConcurrentHashMap<>();
}
@Override
public void destroy() throws Exception {
}
@Override
public Object getEarlyBeanReference(Object bean, String beanName) throws BeansException {
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
for (Method method : methods) {
MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
if (annotation == null) {
continue;
}
String resolveTopic = annotation.topic();
try {
messageHubService.checkTopic(resolveTopic, annotation.type());
} catch (Exception e) {
throw new Error(e.getMessage());
}
ConsumerAdapterForm adapterForm = new ConsumerAdapterForm();
adapterForm.setBean(bean);
adapterForm.setInvokeMethod(method);
adapterForm.setTopic(resolveTopic);
adapterForm.setType(annotation.type());
adapterForm.setGroup(annotation.group());
messageHubService.consumer(adapterForm);
}
return bean;
}
}
這裏依靠spring
生命週期,拿到所有的bean
,根據註解標註的方法去走不同的邏輯生成常駐線程,監聽到消息之後回調到標註了註解的方法裏。
Topic 守護線程
@Slf4j
@Service
public class TopicReloadTask extends TimerTask {
@Resource
StringRedisTemplate stringRedisTemplate;
@Resource
EntityManager entityManager;
public final String TOPIC_SQL = " select * from MESSAGEHUB_TOPIC ";
public final String LUA_SCRIPT =
"redis.call('del', 'MESSAGEHUB_TOPIC')" +
"local topics = KEYS " +
"for i, v in pairs(topics) do " +
" redis.call('lpush', 'MESSAGEHUB_TOPIC', v) " +
"end";
@Override
public void run() {
try {
List<String> topics = this.getQueryResult(TOPIC_SQL, MessageHubTopicBean.class).stream().map(MessageHubTopicBean::getTopic).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(topics)) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Long.class);
Long result = stringRedisTemplate.execute(redisScript, topics);
log.info("reload topic finish");
}
} catch (Throwable t) {
log.error("messagehub topic reload error", t);
}
}
private <T> List<T> getQueryResult(String sql, Class<T> clazz) {
Query dataQuery = entityManager.createNativeQuery(sql, clazz);
List<T> result = new ArrayList<>();
List<Object> list = dataQuery.getResultList();
for (Object o : list) {
result.add((T) o);
}
return result;
}
}
定義一個timer
任務,隔一段時間將mysql
中的topic
白名單通過lua
腳本的方式刷新到指定的reids topic key
中。
還有一些可以優化的地方,比如同步topic
的操作只需要一個服務即可,所以可以使用@ConditionalOnProperty
註解判斷是否需要進行同步topic
。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/CmrfSSQWS0RdQquC6-njXg