乾貨!解析 binlog 的三種實現手段
不知道你是否有遇到過需要解析 binlog 日誌的場景。今天我整理了一些對 binlog 解析的解決方案,供大家參考下。
基於 Canal 的實時訂閱
一般業內對 binlog 進行實時監聽最常用的中間件會是 canal。canal 其實本質底層是制定了一個僞造的 MySQL 的 Slave 節點,接收 MySQL 主節點發送過來的 binlog 文件。只需要我們引入相關的依賴,然後部署一套 Canal 服務即可。Canal 的部署,可以採用單機或者結合 zk 做集羣架構。
相關的客戶端依賴如下:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<canal.version>1.1.4</canal.version>
</properties>
<description>canal監聽案例</description>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<!-- Message、CanalEntry.Entry等來自此安裝包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
</dependencies>
然後具體的實現代碼可以如下:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author idea
* @description 監聽canal的回調輸出
*/
public class CanalDemo {
//Canal服務地址 使用自己虛擬機的ip地址
private static final String SERVER_ADDRESS = "127.0.0.1";
//Canal Server 服務端口號
private static final Integer PORT = 11111;
//目的地,其實Canal Service內部有一個隊列,和配置文件中一致即可,參考【修改instance.properties】圖中
private static final String DESTINATION = "user_instance";
//用戶名和密碼,但是目前不支持,只能爲空
private static final String USERNAME = "canal";
//用戶名和密碼,但是目前不支持,只能爲空
private static final String PASSWORD = "canal";
public static void main(String[] args) {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
canalConnector.connect();
//訂閱所有消息
canalConnector.subscribe(".*\..*");
// 只訂閱test數據庫下的所有表
//canalConnector.subscribe("test");
//恢復到之前同步的那個位置
canalConnector.rollback();
System.out.println("啓動canal");
for (; ; ) {
//獲取指定數量的數據,但是不做確認標記,下一次取還會取到這些信息。注:不會阻塞,若不夠100,則有多少返回多少
Message message = canalConnector.getWithoutAck(100);
//獲取消息id
long batchId = message.getId();
if (batchId != -1) {
log.info("msgId -> " + batchId);
printEnity(message.getEntries());
//提交確認
//canalConnector.ack(batchId);
//處理失敗,回滾數據
//canalConnector.rollback(batchId);
}
}
}
private static void printEnity(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
try {
// 序列化數據
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println(rowChange.getEventType());
switch (rowChange.getEventType()) {
//如果希望監聽多種事件,可以手動增加case
case INSERT:
// 表名
String tableName = entry.getHeader().getTableName();
//System.out.println("表名:"+tableName);
//測試users表進行映射處
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
//for(CanalEntry.Column c:afterColumnsList){
// System.out.println("字段:"+c.getName()+"值:"+c.getValue());
//}
System.out.println(afterColumnsList);
break;
case UPDATE:
List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
System.out.println("新插入的數據是:" + afterColumnsList2);
break;
case DELETE:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
System.out.println("被刪除的數據是:" + beforeColumnsList);
break;
case CREATE:
System.out.println("創建表格");
case ALTER:
System.out.println("alert變更");
case TRUNCATE:
System.out.println("truncate變更");
default:
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
}
Canal 可能存在的問題
在實際使用 Canal 的時候需要注意,它可能會存在以下問題。
- 對 binlog 的格式有一定要求
一般使用 Canal 的話,建議修改 MySQL 主節點的 binlog 爲 ROW 格式,否則可能會引發很多隱藏問題。例如當 MySQL 的 BinLog 格式爲 mixed 類型的時候,可能 Canal 會收到 QUERY,UPDATE 兩條類型的事件。
- 本質還是單機架構
由於 Canal 的部署其實由於要保證 MySQL 的 binlog 順序性,所以其實底層是有一個緩衝區用於接受 binlog 數據的,
Server: 整個 canal 實例 Instance: 你可以理解爲一個特殊的隊列角色,一個 Instance 可以用於承接一系列相關的庫表變更數據內容。
在 Instance 的內部,還有幾個角色:
-
EventParser (數據源接入,模擬 slave 協議和 master 進行交互,協議解析)
-
EventSink (Parser 和 Store 鏈接器,進行數據過濾,加工,分發的工作)
-
EventStore (數據存儲)
-
MetaManager (增量訂閱 & 消費信息管理器)
由於順序性的原因,Canal 的一個 Instance 一次只能給一個消費者訪問(pull,ack)。因此如果該消費者的消費能力較弱,就會導致大量的數據堆積在 Canal 的內存中,高併發場景下可能會有 OOM 情況發生。所以 Canal 一般會和常用的 MQ 進行結合使用。例如內接入 RocketMQ 或者 Kafka,當收到 binlog 後發送到 MQ 中,讓 MQ 來承載這些壓力。(MQ 天生就是適合抗高併發的)
但是要注意 MQ 的內部隊列也得設置成一個,並且消費端得單線程消費,保證數據的順序性。
- meta.dat 文件的 binlog 位置對不上
canal 監聽 mysql 的時候會在 conf/{destination}/meta.dat 文件中記錄當前 binlog 的名字、position。
下次啓動 canal 的時候就從這個 binlog 的 position 開始讀取數據。
meta.dat 記錄的是上一次的 binlog 信息,當你刪除 mysql 的 binlog 或者監聽到另外一臺 mysql 後,meta.dat 記錄的信息就相當於過期信息,所以就會出現 PositionNotFoundException。
- 數據重複消費問題 這類問題一般出現在集羣架構下的 canal 實例切換,由於他們利用了 Zookeeper 作爲第三方,實際上 binlog 的位點信息是記錄在各個 server 實例本地的 meta.dat 文件裏面的,這塊的數據需要通過 TCP 請求上報到 Zk,這裏存在一個時延和網絡的不穩定因素干擾。因此如果你的 Canal 備用節點切入的時候,有可能拿到的 binlog 位點是有一定滯後的。面對這種場景,建議是消費端處理的時候,可以開一個 “窗口” 一樣的機制,防止數據重複注入。
基於 mysqlbinlog 命令進行解析
上邊我們介紹的 Canal 更多是實時數據解析時候使用的,有時候可能你只需要對 binlog 文件做一些簡單的文件解析而已,這種情況就不需要用 Canal 這麼複雜的組件了。
mysql 內置的 mysqlbinlog 工具就是一款不錯的命令工具,它內部支持對 binlog 的各種轉換。例如 row 格式的 binlog 直接給你轉成可讀性較強的 sql。
https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html
mysqlbinlog 的常用命令
- 基本格式
mysqlbinlog [*options*] *log_file* ...
- base64-output,可以控制輸出語句輸出 base64 編碼的 BINLOG 語句; decode-rows:選項將把基於行的事件解碼成一個 SQL 語句
mysqlbinlog --base64-output=decode-rows /data/binlog.00001 > mysql-binlog-00001.sql
- 指定時間範圍去解析 binlog
mysqlbinlog -v --base64-output=decode-rows --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-binlog.000001 > mysql-binlog-000001.sql
- 指定數據庫進行過濾
mysqlbinlog -v --base64-output=decode-rows --database=test --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-bin.000001 > mysql-binlog-000001.sql
使用 base64-output 格式輸出的內容格式如下:
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 1180
#210420 20:24:21 server id 1 end_log_pos 1237 CRC32 0x6d2e5f81 Table_map: `test`.`users` mapped to number 108
# at 1237
#210420 20:24:21 server id 1 end_log_pos 1313 CRC32 0x7ef28a01 Update_rows: table id 108 flags: STMT_END_F
### UPDATE `test`.`users`
### WHERE
### @1=4
### @2='idea-0001'
### @3=17
### @4=1618841040
### @5=0
### SET
### @1=4
### @2='idea'
### @3=18
### @4=1618841040
### @5=1618921461
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
你會發現基本的 SQL 語句會有輸出,但是並不會展示具體的字段名稱(我猜是爲了節省空間吧)。這裏你可以自行進行 grep 分析,就會得出相關的 INSERT,UPDATE,DELETE 的 sql 條數了,下邊是一份我自己測試用的 Shell 腳本,供大家參考下:
cat ./mysql-binlog-000001.sql | grep 'UPDATE ' >update.sql
cat ./mysql-binlog-000001.sql | grep 'INSERT ' >insert.sql
cat ./mysql-binlog-000001.sql | grep 'DELETE ' >delete.sql
mysql-binlog-connector 開源工具
mysql-binlog-connector 是一款輕量級的 jar 包,可以用於自行解析 binlog 日誌內容。相關 github 倉庫地址: https://github.com/shyiko/mysql-binlog-connector-java 這款 jar 是一些個人開發者維護的 jar 包,可以用於實現類似 Canal 一樣的監聽 binlog 變更的效果。目前 GitHub 上的 star 數量也不少。
首先我們只需引入以下依賴:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<binlog.version>0.21.0</binlog.version>
<fastjson.version>2.0.25</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>${binlog.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
解析 binlog 日誌內容代碼如下:
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import java.io.File;
import java.io.IOException;
/**
* @author idea
* @description
*/
public class BinlogMain {
private static String sourceFilePath = "本地binlog文件的地址";
public static void main(String[] args) throws IOException {
String filePath = sourceFilePath;
File binlogFile = new File(filePath);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setChecksumType(ChecksumType.CRC32);
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile,
eventDeserializer);
try {
for (Event event; (event = reader.readEvent()) != null; ) {
EventType eventType = event.getHeader().getEventType();
if (EventType.QUERY.equals(eventType)) {
System.out.println(event);
} else if (EventType.UPDATE_ROWS.equals(eventType)) {
System.out.println(event);
} else if (EventType.TABLE_MAP.equals(eventType)) {
//每個rows_event事件的前後都會有個binlog記錄,用於描述表內部的id和變化
System.out.println(event);
} else if (EventType.isUpdate(eventType)) {
System.out.println(event);
} else if (EventType.isWrite(eventType)) {
System.out.println(event);
} else if (EventType.isDelete(eventType)) {
System.out.println(event);
} else if (EventType.isUpdate(eventType)) {
System.out.println(event);
}
}
} finally {
reader.close();
}
}
}
除了支持解析已有的 binlog 日誌外,使用該組件還支持僞造成 slave,實時監聽 binlog 變更的效果。但是它在數據處理,穩定性等各方面均不如阿里巴巴的 Canal 要好,所以適合簡單場景處理。如果你需要做實時監聽,可以參考這篇文章:https://blog.csdn.net/m0_37583655/article/details/119148470
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/2C_AfPWMcWwkJKf6P07C8g