Kafka 順序消費方案

前言

本文針對解決 Kafka 不同 Topic 之間存在一定的數據關聯時的順序消費問題。如存在 Topic-insert 和 Topic-update 分別是對數據的插入和更新,當 insert 和 update 操作爲同一數據時,應保證先 insert 再 update。

1、問題引入

kafka 的順序消費一直是一個難以解決的問題,kafka 的消費策略是對於同 Topic 同 Partition 的消息可保證順序消費,其餘無法保證。如果一個 Topic 只有一個 Partition,那麼這個 Topic 對應 consumer 的消費必然是有序的。不同的 Topic 的任何情況下都無法保證 consumer 的消費順序和 producer 的發送順序一致。

如果不同 Topic 之間存在數據關聯且對消費順序有要求,該如何處理?本文主要解決此問題。

2、解決思路

現有 Topic-insert 和 Topic-update,數據唯一標識爲 id,對於 id=1 的數據而言,要保證 Topic-insert 消費在前,Topic-update 消費在後。

兩個 Topic 的消費爲不同線程處理,所以爲了保證在同一時間內的同一數據標識的消息僅有一個業務邏輯在處理,需要對業務添加鎖操作。 使用 synchronized 進行加鎖的話,會影響無關聯的 insert 和 update 的數據消費能力,如 id=1 的 insert 和 id=2 的 update,在 synchronized 的情況下,無法併發處理,這是沒有必要的,我們需要的是對於 id=1 的 insert 和 id=1 的 update 在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。

細粒度鎖實現:https://blog.csdn.net/qq_38245668/article/details/105891161

PS:如果爲分佈式系統,細粒度鎖需要使用分佈式鎖的對應實現。

在對 insert 和 update 加鎖之後,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業務在處理。 對於消費順序異常的問題,也就是先消費了 update 再消費 insert 的情況。

處理方式:消費到 update 數據,校驗庫中是否存在當前數據(也就是是否執行 insert),如果沒有,就將當前 update 數據存入緩存,key 爲數據標識 id,在 insert 消費時檢查是否存在 id 對應的 update 緩存,如果有,就證明當前數據的消費順序異常,需執行 update 操作,再將緩存數據移除。

3、實現方案

消息發送:

kafkaTemplate.send("TOPIC_INSERT""1");
kafkaTemplate.send("TOPIC_UPDATE""1");

監聽代碼示例:

KafkaListenerDemo.java

@Component
@Slf4j
public class KafkaListenerDemo {

    // 消費到的數據緩存
    private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
    // 數據存儲
    private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
    private WeakRefHashLock weakRefHashLock;

    public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
        this.weakRefHashLock = weakRefHashLock;
    }

    @KafkaListener(topics = "TOPIC_INSERT")
    public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
        // 模擬順序異常,也就是insert後消費,這裏線程sleep
        Thread.sleep(1000);

        String id = record.value();
        log.info("接收到insert :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            log.info("開始處理 {} 的insert", id);
            // 模擬 insert 業務處理
            Thread.sleep(1000);
            // 從緩存中獲取 是否存在有update數據
            if (UPDATE_DATA_MAP.containsKey(id)){
                // 緩存數據存在,執行update
                doUpdate(id);
            }
            log.info("處理 {} 的insert 結束", id);
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    @KafkaListener(topics = "TOPIC_UPDATE")
    public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{

        String id = record.value();
        log.info("接收到update :: {}", id);
        Lock lock = weakRefHashLock.lock(id);
        lock.lock();
        try {
            // 測試使用,不做數據庫的校驗
            if (!DATA_MAP.containsKey(id)){
                // 未找到對應數據,證明消費順序異常,將當前數據加入緩存
                log.info("消費順序異常,將update數據 {} 加入緩存", id);
                UPDATE_DATA_MAP.put(id, id);
            }else {
                doUpdate(id);
            }
        }finally {
            lock.unlock();
        }
        acknowledgment.acknowledge();
    }

    void doUpdate(String id) throws InterruptedException{
        // 模擬 update
        log.info("開始處理update::{}", id);
        Thread.sleep(1000);
        log.info("處理update::{} 結束", id);
    }

}

日誌(代碼中已模擬必現消費順序異常的場景):

接收到update ::1
消費順序異常,將update數據 1 加入緩存
接收到insert ::1
開始處理 1 的insert
開始處理update::1
處理update::1 結束
處理 1 的insert 結束

觀察日誌,此方案可正常處理不同 Topic 再存在數據關聯的消費順序問題。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/idq7lrb24VRRRKHIpU2jFg