Canal 介紹和使用指南
Canal 是阿里開源的一款 MySQL 數據庫增量日誌解析工具,提供增量數據訂閱和消費。使用 Canal 能夠實現異步更新數據,配合 MQ 使用可在很多業務場景下發揮巨大作用。
Canal 簡介
canal 是阿里開源的一款 MySQL 數據庫增量日誌解析工具,提供增量數據訂閱和消費。
使用 Canal 能夠實現異步更新數據,配合 MQ 使用可在很多業務場景下發揮巨大作用。
工作原理
MySQL 主備複製原理
-
MySQL master 將數據變更寫入二進制日誌 (binary log, 其中記錄叫做二進制日誌事件 binary log events,可以通過 show binlog events 進行查看)
-
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌 (relay log)
-
MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
圖片來源:https://avisheksharma.wordpress.com/2015/01/07/step-wise-guide-to-setup-mysql-replication/
Canal 工作原理
-
Canal 模擬 MySQL slave 的交互協議,僞裝自己爲 MySQL slave ,向 MySQL master 發送 dump 協議
-
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 Canal)
-
Canal 解析 binary log 對象 (原始爲 byte 流)
環境準備
你應該事先準備好一個 MySQL 環境,並按以下步驟進行設置。
開啓 binlog
需要先開啓 MySQL 的 Binlog 寫入功能,配置 binlog-format
爲 ROW
模式,具體my.cnf
中配置如下:
[mysqld]
log-bin=mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重複
修改配置文件之後,重啓 MySQL。
使用命令查看是否打開 binlog 模式,如輸出以下內容則說明 binlog 已開啓。
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
查看binlog_format
配置是否正確。
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
添加授權
Canal 的原理是模擬自己爲 MySQL slave,所以一定要爲賬號授予做爲 MySQL slave 的相關權限。
下面的命令是先創建一個名爲canal
的賬號,再對其進行授權,如果已有賬戶可直接 grant。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
安裝 Canal
打開官方 release 頁面,根據需要選擇對應的軟件包下載即可。
將下載後的軟件包解壓,可看到以下目錄。
bin
conf
lib
logs
修改配置文件:canal-server/conf/example/instance.properties
。
將canal.instance.master.address
修改爲你的 MySQL 地址。
將canal.instance.tsdb.dbUsername
修改爲你上面授權的賬號。
將canal.instance.tsdb.dbPassword
修改爲你上面授權賬號的密碼。
配置示例如下:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
當然我還是推薦開發和測試階段使用 Docker 搭建環境。
執行以下命令,拉取canal-server
最新鏡像。
docker pull canal/canal-server:latest
如果因網絡問題無法直接拉取 Docker 鏡像,也可以選擇 clone 代碼到本地編譯。
git clone git@github.com:alibaba/canal.git
cd canal/docker && sh build.sh
啓動容器。
docker run -d --name canal-server -p 11111:11111 canal/canal-server
進入容器。
docker exec -it canal-server /bin/bash
修改配置。
vi canal-server/conf/example/instance.properties
注意:如果是 macOS 平臺容器內使用 host.docker.internal 表示
localhost
。即:
canal.instance.master.address=host.docker.internal:3306
修改完配置後重啓容器。
docker container restart canal-server
Canal Client
Canal 特別設計了 Client-Server 模式,交互協議使用 protobuf v3 , Client 端可採用不同語言實現不同的消費邏輯。
啓動 Canal Server 之後,我們可以使用 Canal 客戶端連接 Canal 進行消費,本文以 Go 客戶端 canal-go 爲例,演示如何從 canal-server 消費數據。
package main
import (
"fmt"
"time"
pbe "github.com/withlin/canal-go/protocol/entry"
"github.com/golang/protobuf/proto"
"github.com/withlin/canal-go/client"
)
// canal-go client demo
func main() {
// 連接canal-server
connector := client.NewSimpleCanalConnector(
"127.0.0.1", 11111, "", "", "example", 60000, 60*60*1000)
err := connector.Connect()
if err != nil {
panic(err)
}
// mysql 數據解析關注的表,Perl正則表達式.
err = connector.Subscribe(".*\\..*")
if err != nil {
fmt.Printf("connector.Subscribe failed, err:%v\n", err)
panic(err)
}
// 消費消息
for {
message, err := connector.Get(100, nil, nil)
if err != nil {
fmt.Printf("connector.Get failed, err:%v\n", err)
continue
}
batchId := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(time.Second)
fmt.Println("===暫無數據===")
continue
}
printEntry(message.Entries)
}
}
func printEntry(entries []pbe.Entry) {
for _, entry := range entries {
// 忽略事務開啓和事務關閉類型
if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
continue
}
// RowChange對象,包含了一行數據變化的所有特徵
rowChange := new(pbe.RowChange)
// protobuf解析
err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
if err != nil {
fmt.Printf("proto.Unmarshal failed, err:%v\n", err)
}
if rowChange == nil {
continue
}
// 獲取並打印Header信息
header := entry.GetHeader()
fmt.Printf("binlog[%s : %d],name[%s,%s], eventType: %s\n",
header.GetLogfileName(),
header.GetLogfileOffset(),
header.GetSchemaName(),
header.GetTableName(),
header.GetEventType(),
)
//判斷是否爲DDL語句
if rowChange.GetIsDdl() {
fmt.Printf("isDdl:true, sql:%v\n", rowChange.GetSql())
}
// 獲取操作類型:insert/update/delete等
eventType := rowChange.GetEventType()
for _, rowData := range rowChange.GetRowDatas() {
if eventType == pbe.EventType_DELETE {
printColumn(rowData.GetBeforeColumns())
} else if eventType == pbe.EventType_INSERT || eventType == pbe.EventType_UPDATE {
printColumn(rowData.GetAfterColumns())
} else {
fmt.Println("---before---")
printColumn(rowData.GetBeforeColumns())
fmt.Println("---after---")
printColumn(rowData.GetAfterColumns())
}
}
}
}
func printColumn(columns []*pbe.Column) {
for _, col := range columns {
fmt.Printf("%s:%s update=%v\n", col.GetName(), col.GetValue(), col.GetUpdated())
}
}
Canal Kafka/RoctetMQ
Canal 1.1.1 版本之後,默認支持將 Canal Server 接收到的 binlog 數據直接投遞到 MQ,目前默認支持的 MQ 系統有 Kafka、RocketMQ、RabbitMQ、PulsarMQ。
這裏以介紹使用 Canal Server 將 binlog 數據投遞到 Kafka 爲例。
配置
請事先準備好 Kafka 環境,可參考我之前寫的 Go 操作 Kafka 之 kafka-go 使用 Docker 快速搭建本地 Kafka 環境。
修改 instance 配置
在instance.properties
配置文件中設置 MQ 相關配置。
# 按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=127.0.0.1:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
# 設置默認的topic
canal.mq.topic=example
# 針對庫名或者表名發送動態topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
其中,canal.mq.dynamicTopic
配置說明。
Canal 1.1.3 版本之後, 支持配置格式爲:schema
或 schema.table
,多個配置之間使用逗號或分號分隔。
-
例子 1:
test\\.test
指定匹配的單表,發送到以 test_test 爲名字的 topic 上 -
例子 2:
.*\\..*
匹配所有表,則每個表都會發送到各自表名的 topic 上 -
例子 3:
test
指定匹配對應的庫,一個庫的所有表都會發送到庫名的 topic 上 -
例子 4:
test\\..*
指定匹配的表達式,針對匹配的表會發送到各自表名的 topic 上 -
例子 5:
test,test1\\.test1
,指定多個表達式,會將 test 庫的表都發送到 test 的 topic 上,test1\\.test1
的表發送到對應的test1_test1
topic 上,其餘的表發送到默認的canal.mq.topic
值
爲滿足更大的靈活性,Canal 還允許對匹配條件的規則指定發送的 topic 名字,配置格式:topicName:schema
或 topicName:schema.table
。
-
例子 1:
test:test\\.test
指定匹配的單表,發送到以 test 爲名字的 topic 上 -
例子 2:
test:.*\\..*
匹配所有表,因爲有指定 topic,則每個表都會發送到 test 的 topic 下 -
例子 3:
test:test
指定匹配對應的庫,一個庫的所有表都會發送到 test 的 topic 下 -
例子 4:
testA:test\\..*
指定匹配的表達式,針對匹配的表會發送到 testA 的 topic 下 -
例子 5:
test0:test,test1:test1\\.test1
,指定多個表達式,會將 test 庫的表都發送到 test0 的 topic 下,test1\\.test1
的表發送到對應的 test1 的 topic 下,其餘的表發送到默認的canal.mq.topic
值
修改 canal 配置文件
默認配置文件路徑爲/usr/local/canal/conf/canal.properties
# ...
# 可選項: tcp(默認), kafka,RocketMQ,rabbitmq,pulsarmq
canal.serverMode = kafka
# ...
# 是否爲flat json格式對象
canal.mq.flatMessage = true
# Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數據的超時時間, 單位: 毫秒, 空爲不限超時
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
...
##################################################
######### Kafka #############
##################################################
# 此處配置修改爲你的Kafka環境地址
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf
# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT
更多詳細內容請查看 Canal-Kafka-RocketMQ-QuickStart。
按上述修改 Canal 配置後,重啓 Canal 服務即可。
其他
此外,Canal 也支持 HA 模式和多節點 MySQL,Canal1.14 + 版本還提供了 Admin 管理界面,讀者可以根據 Canal-Admin-QuickStart 自行嘗試。
參考資料
Canal QuickStart
Canal Docker-QuickStart
Canal AdminGuide
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/-C87woP-BriFx7ZHlmynFQ