canal 同步 binlog 實戰
canal 同步 binlog 實戰
- canal 原理介紹
canal 簡介
基於日誌增量訂閱和消費的業務包括
-
數據庫鏡像
-
數據庫實時備份
-
索引構建和實時維護 (拆分異構索引、倒排索引等)
-
業務 cache 刷新
-
帶業務邏輯的增量數據處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
canal 原理
-
MySQL master 將數據變更寫入二進制日誌 (binary log, 其中記錄叫做二進制日誌事件 binary log events,可以通過 show binlog events 進行查看)
-
MySQL slave 將 master 的 binary log events 拷⻉到它的中繼日誌 (relay log)
-
MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
所以 canal 模擬 mysql slave 也實現了自己的 binlog 同步,原理如下:
-
canal 模擬 MySQL slave 的交互協議,僞裝自己爲 MySQL slave ,向 MySQL master 發送 dump 協議
-
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal)
-
canal 解析 binary log 對象 (原始爲 byte 流)
-
canal 實戰
首先去下載安裝相關 canal 組件
這裏可以參考官網:https://github.com/alibaba/canal
安裝完 canal 服務,然後按照文檔介紹去配置相應的參數,就可以啓動 canal 的服務了,由於官網介紹的比較細,大家自行去安裝和配置,這裏着重介紹 canal 同步 binlog 代碼實踐。
canal-go 訂閱和消費 canal binlog
下載 canal-go 客戶端,運行裏面 samples 下的 main.go,當然可能報錯,將路徑替換正確就 OK 了。(有問題歡迎私信) 部分訂閱消費代碼如下:
func main() {
// 192.168.199.17 替換成你的canal server的地址
// example 替換成-e canal.destinations=example 你自己定義的名字
connector := client.NewSimpleCanalConnector("127.0.0.1", 11113, "", "", "example", 60000, 60*60*1000)
err := connector.Connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
err = connector.Subscribe(".*\\..*")
if err != nil {
log.Println(err)
os.Exit(1)
}
for {
message, err := connector.Get(100, nil, nil)
if err != nil {
log.Println(err)
os.Exit(1)
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(300 * time.Millisecond)
fmt.Println("===沒有數據了===")
continue
}
printEntry(message.Entries)
}
}
我們可以對 canal 配置文件中已經配置的數據庫表進行增刪改,看看我們程序是否能顯示相關 binlog 日誌
- 增加
mysql> insert into canal(`id`, `name`) values(1,"hah");
Query OK, 1 row affected (0.04 sec)
mysql>
打印日誌:
===沒有數據了===
================> binlog[mysql-bin.000002 : 2069],name[test,canal], eventType: INSERT
id : 1 update= true
name : hah update= true
sex : update= true
age : update= true
amount : update= true
email : update= true
occur_time : 2021-09-15 09:47:09 update= true
- 更新
mysql> update canal set where id=1;
Query OK, 1 row affected (0.04 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql>
打印日誌如下:
===沒有數據了===
================> binlog[mysql-bin.000002 : 2348],name[test,canal], eventType: UPDATE
-------> before
id : 1 update= false
name : hah update= false
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
-------> after
id : 1 update= false
name : hello update= true
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
===沒有數據了===
- 刪除
mysql> delete from canal where id=1;
Query OK, 1 row affected (0.03 sec)
mysql>
打印日誌如下:
================> binlog[mysql-bin.000002 : 2643],name[test,canal], eventType: DELETE
id : 1 update= false
name : hello update= false
sex : update= false
age : update= false
amount : update= false
email : update= false
occur_time : 2021-09-15 09:47:09 update= false
===沒有數據了===
- 總結
通過 canal-go 客戶端可以訂閱和消費 canal server,那麼同樣我們可以配置 kafka,mysql,es,mq 等也可以去訂閱和消費數據,項目中比較常見的就是將 mysql 數據同步到 es 中提供搜索查詢能力,而這最好的方式就是通過 canal 中間件來做,減少人工的維護成本,提供工作效率。
關注:堆棧 future
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/XIGeRNVfZg05RUa5zM3Hsw