乾貨!解析 binlog 的三種實現手段

不知道你是否有遇到過需要解析 binlog 日誌的場景。今天我整理了一些對 binlog 解析的解決方案,供大家參考下。

基於 Canal 的實時訂閱

一般業內對 binlog 進行實時監聽最常用的中間件會是 canal。canal 其實本質底層是制定了一個僞造的 MySQL 的 Slave 節點,接收 MySQL 主節點發送過來的 binlog 文件。只需要我們引入相關的依賴,然後部署一套 Canal 服務即可。Canal 的部署,可以採用單機或者結合 zk 做集羣架構。

相關的客戶端依賴如下:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <canal.version>1.1.4</canal.version>
</properties>

<description>canal監聽案例</description>

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>${canal.version}</version>
    </dependency>
    <!-- Message、CanalEntry.Entry等來自此安裝包 -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>${canal.version}</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
</dependencies>

然後具體的實現代碼可以如下:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author idea
 * @description 監聽canal的回調輸出
 */

public class CanalDemo {

    //Canal服務地址 使用自己虛擬機的ip地址
    private static final String SERVER_ADDRESS = "127.0.0.1";

    //Canal Server 服務端口號
    private static final Integer PORT = 11111;

    //目的地,其實Canal Service內部有一個隊列,和配置文件中一致即可,參考【修改instance.properties】圖中
    private static final String DESTINATION = "user_instance";

    //用戶名和密碼,但是目前不支持,只能爲空
    private static final String USERNAME = "canal";

    //用戶名和密碼,但是目前不支持,只能爲空
    private static final String PASSWORD = "canal";

    public static void main(String[] args) {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        //訂閱所有消息
        canalConnector.subscribe(".*\..*");
        // 只訂閱test數據庫下的所有表
        //canalConnector.subscribe("test");
        //恢復到之前同步的那個位置
        canalConnector.rollback();
        System.out.println("啓動canal");
        for (; ; ) {
            //獲取指定數量的數據,但是不做確認標記,下一次取還會取到這些信息。注:不會阻塞,若不夠100,則有多少返回多少
            Message message = canalConnector.getWithoutAck(100);
            //獲取消息id
            long batchId = message.getId();
            if (batchId != -1) {
                log.info("msgId -> " + batchId);
                printEnity(message.getEntries());
                //提交確認
                //canalConnector.ack(batchId);
                //處理失敗,回滾數據
                //canalConnector.rollback(batchId);
            }
        }
    }

    private static void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            try {
                // 序列化數據
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println(rowChange.getEventType());
                    switch (rowChange.getEventType()) {
                        //如果希望監聽多種事件,可以手動增加case
                        case INSERT:
                            // 表名
                            String tableName = entry.getHeader().getTableName();
                            //System.out.println("表名:"+tableName);
                            //測試users表進行映射處
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            //for(CanalEntry.Column c:afterColumnsList){
                            // System.out.println("字段:"+c.getName()+"值:"+c.getValue());
                            //}

                            System.out.println(afterColumnsList);
                            break;
                        case UPDATE:
                            List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
                            System.out.println("新插入的數據是:" + afterColumnsList2);
                            break;
                        case DELETE:
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            System.out.println("被刪除的數據是:" + beforeColumnsList);
                            break;
                        case CREATE:
                            System.out.println("創建表格");
                        case ALTER:
                            System.out.println("alert變更");
                        case TRUNCATE:
                            System.out.println("truncate變更");
                        default:
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    }
}

Canal 可能存在的問題

在實際使用 Canal 的時候需要注意,它可能會存在以下問題。

一般使用 Canal 的話,建議修改 MySQL 主節點的 binlog 爲 ROW 格式,否則可能會引發很多隱藏問題。例如當 MySQL 的 BinLog 格式爲 mixed 類型的時候,可能 Canal 會收到 QUERY,UPDATE 兩條類型的事件。

由於 Canal 的部署其實由於要保證 MySQL 的 binlog 順序性,所以其實底層是有一個緩衝區用於接受 binlog 數據的,

Server: 整個 canal 實例 Instance: 你可以理解爲一個特殊的隊列角色,一個 Instance 可以用於承接一系列相關的庫表變更數據內容。

在 Instance 的內部,還有幾個角色:

由於順序性的原因,Canal 的一個 Instance 一次只能給一個消費者訪問(pull,ack)。因此如果該消費者的消費能力較弱,就會導致大量的數據堆積在 Canal 的內存中,高併發場景下可能會有 OOM 情況發生。所以 Canal 一般會和常用的 MQ 進行結合使用。例如內接入 RocketMQ 或者 Kafka,當收到 binlog 後發送到 MQ 中,讓 MQ 來承載這些壓力。(MQ 天生就是適合抗高併發的)

但是要注意 MQ 的內部隊列也得設置成一個,並且消費端得單線程消費,保證數據的順序性。

canal 監聽 mysql 的時候會在 conf/{destination}/meta.dat 文件中記錄當前 binlog 的名字、position。

下次啓動 canal 的時候就從這個 binlog 的 position 開始讀取數據。

meta.dat 記錄的是上一次的 binlog 信息,當你刪除 mysql 的 binlog 或者監聽到另外一臺 mysql 後,meta.dat 記錄的信息就相當於過期信息,所以就會出現 PositionNotFoundException。

基於 mysqlbinlog 命令進行解析

上邊我們介紹的 Canal 更多是實時數據解析時候使用的,有時候可能你只需要對 binlog 文件做一些簡單的文件解析而已,這種情況就不需要用 Canal 這麼複雜的組件了。

mysql 內置的 mysqlbinlog 工具就是一款不錯的命令工具,它內部支持對 binlog 的各種轉換。例如 row 格式的 binlog 直接給你轉成可讀性較強的 sql。

https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html

mysqlbinlog 的常用命令

mysqlbinlog [*options*] *log_file* ...
mysqlbinlog --base64-output=decode-rows  /data/binlog.00001 > mysql-binlog-00001.sql
mysqlbinlog -v --base64-output=decode-rows --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-binlog.000001 > mysql-binlog-000001.sql
mysqlbinlog -v --base64-output=decode-rows --database=test --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-bin.000001 > mysql-binlog-000001.sql

使用 base64-output 格式輸出的內容格式如下:

/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 1180
#210420 20:24:21 server id 1  end_log_pos 1237 CRC32 0x6d2e5f81 	Table_map: `test`.`users` mapped to number 108
# at 1237
#210420 20:24:21 server id 1  end_log_pos 1313 CRC32 0x7ef28a01 	Update_rows: table id 108 flags: STMT_END_F
### UPDATE `test`.`users`
### WHERE
###   @1=4
###   @2='idea-0001'
###   @3=17
###   @4=1618841040
###   @5=0
### SET
###   @1=4
###   @2='idea'
###   @3=18
###   @4=1618841040
###   @5=1618921461
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;

你會發現基本的 SQL 語句會有輸出,但是並不會展示具體的字段名稱(我猜是爲了節省空間吧)。這裏你可以自行進行 grep 分析,就會得出相關的 INSERT,UPDATE,DELETE 的 sql 條數了,下邊是一份我自己測試用的 Shell 腳本,供大家參考下:

cat ./mysql-binlog-000001.sql | grep 'UPDATE ' >update.sql
cat ./mysql-binlog-000001.sql | grep 'INSERT ' >insert.sql
cat ./mysql-binlog-000001.sql | grep 'DELETE ' >delete.sql

mysql-binlog-connector 開源工具

mysql-binlog-connector 是一款輕量級的 jar 包,可以用於自行解析 binlog 日誌內容。相關 github 倉庫地址: https://github.com/shyiko/mysql-binlog-connector-java 這款 jar 是一些個人開發者維護的 jar 包,可以用於實現類似 Canal 一樣的監聽 binlog 變更的效果。目前 GitHub 上的 star 數量也不少。

首先我們只需引入以下依賴:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <binlog.version>0.21.0</binlog.version>
    <fastjson.version>2.0.25</fastjson.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.github.shyiko</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>${binlog.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>

解析 binlog 日誌內容代碼如下:

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import java.io.File;
import java.io.IOException;

/**
 * @author idea
 * @description
 */
public class BinlogMain {

    private static String sourceFilePath = "本地binlog文件的地址";


    public static void main(String[] args) throws IOException {
        String filePath = sourceFilePath;
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile,
                eventDeserializer);
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventType eventType = event.getHeader().getEventType();
                if (EventType.QUERY.equals(eventType)) {
                    System.out.println(event);
                } else if (EventType.UPDATE_ROWS.equals(eventType)) {
                    System.out.println(event);
                } else if (EventType.TABLE_MAP.equals(eventType)) {
                    //每個rows_event事件的前後都會有個binlog記錄,用於描述表內部的id和變化
                    System.out.println(event);
                } else if (EventType.isUpdate(eventType)) {
                    System.out.println(event);
                } else if (EventType.isWrite(eventType)) {
                    System.out.println(event);
                } else if (EventType.isDelete(eventType)) {
                    System.out.println(event);
                } else if (EventType.isUpdate(eventType)) {
                    System.out.println(event);
                }
            }
        } finally {
            reader.close();
        }

    }

}

除了支持解析已有的 binlog 日誌外,使用該組件還支持僞造成 slave,實時監聽 binlog 變更的效果。但是它在數據處理,穩定性等各方面均不如阿里巴巴的 Canal 要好,所以適合簡單場景處理。如果你需要做實時監聽,可以參考這篇文章:https://blog.csdn.net/m0_37583655/article/details/119148470

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/2C_AfPWMcWwkJKf6P07C8g