PostgreSQL 邏輯解碼實踐 -一-
前言
在日常運維過程中,我們經常需要對數據進行一些同步處理,從系統 A 將一個表的數據同步到系統 B。以前我們主要使用 Oracle GoldenGate,後來 MySQL 逐漸增多,使用上了分佈式,阿里的 otter 也開始部分使用。近來我們陸續大量上了 PostgreSQL 數據庫,對 PG 使用較多的 CDC 同步工具是 debezium
,它使用 PostgreSQL 邏輯解碼插件,今天我們將介紹一下它的一些基本知識。
邏輯解碼的概念
PostgreSQL 在 9.4 後提供了邏輯解碼功能,其基本原理是從 WAL 日誌解碼,跟蹤所有 DML (INSERT, UPDATE, DELETE) 更改。之後轉換成各種可用的格式,比如 Json。最終發送給外部程序進行消費 (例如 ElasticSearch)。下圖顯示了整個流程的前半部分(解碼輸出):
對於邏輯解碼插件,主要有以下幾類。按照格式來分主要有
-
JSON format plugins
-
Protobuf Format Plugins
-
Avro Format Plugins
-
SQL Format Plugins
-
Miscellaneous Plugins
具體可以參考 Wiki 上的 Logical Decoding Plugins
儘管如此,我還是先瀏覽了每一個項目,發現許多項目都停止了更新。仍在更新的主要是下面的幾個:
wal2json
Postgres Decoderbufs
pglogical
wal2mongo
wal2json 邏輯解碼測試
爲了能使用邏輯解碼,需要先安裝 wal2json 插件。
[postgres@centos8 ~]$ git clone https://github.com/eulerto/wal2json.git
cd wal2json/
export PGHOME=/data/postgresql/pgsql/default
export PATH=$PGHOME/bin:$PATH
make
make install
接下來要在 postgresql.conf 中配置三個相關參數。
wal_level=logical
max_replication_slots=10
max_wal_sender=10
將wal_level
設置爲 logical 允許 WAL 記錄邏輯解碼所需的信息。參數max_replication_slots
指定了發送端可以支持的最大複製槽數量,默認爲 10 個,無需修改。max_wal_sender
指定發送方用於流複製的併發連接的最大數量,默認值也是 10。
同時也需要創建一個插槽。pg_recvlogical 是 PostgreSQL 自帶的工具,它管理插槽並使用插槽中的流數據。
[postgres@centos8 ~]$ pg_recvlogical --version
pg_recvlogical (PostgreSQL) 13.1
pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
2021-01-04 13:27:57.921 UTC [20543] LOG: logical decoding found consistent point at 0/155A850
2021-01-04 13:27:57.921 UTC [20543] DETAIL: There are no running transactions.
[postgres@centos8 ~]$ psql
psql (13.1)
Type "help" for help.
postgres=# select slot_name,slot_type,active,restart_lsn from pg_replication_slots;
slot_name | slot_type | active | restart_lsn
-----------+-----------+--------+-------------
test_slot | logical | f | 0/155A850
(1 row)
[postgres@centos8 ~]$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
2021-01-04 13:30:56.709 UTC [20554] LOG: starting logical decoding for slot "test_slot"
2021-01-04 13:30:56.709 UTC [20554] DETAIL: Streaming transactions committing after 0/155A888, reading WAL from 0/155A850.
2021-01-04 13:30:56.709 UTC [20554] LOG: logical decoding found consistent point at 0/155A850
2021-01-04 13:30:56.709 UTC [20554] DETAIL: There are no running transactions.
在執行 pg_recvlogical 創建槽之後,輸出一個日誌,該日誌指出邏輯解碼的 lsn 位置,並告訴您這裏沒有長事務。一旦槽被創建,再次運行 pg_recvlogical 命令,使用參數 -- start 代表從指定的邏輯複製槽開始進行流式傳送更改,將一直持續到被一個信號終止。pretty-print 表示添加空格和 JSON 結構的縮進,默認值是 false。add-msg-prefixes
代表僅當前綴在列表中時才包括消息。默認爲所有前綴。它是一個逗號分隔的值。我們開的這個終端將始終輸出,並等待接收邏輯解碼流。
此時,打開另一個窗口,登錄 到 PSQL 中的製造一些事務。
- 執行建表語句
##窗口2
postgres=# CREATE TABLE test (id SERIAL,name VARCHAR(30), PRIMARY KEY(id));
CREATE TABLE
##窗口1
{
"change": [
]
}
可以看到 DDL 語句不記錄,只顯示了 change。
- 執行 DML 語句
##窗口2
INSERT INTO test (name) VALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 1;
DELETE FROM test WHERE id = 1;
##窗口1
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test",
"columnnames": ["id", "name"],
"columntypes": ["integer", "character varying(30)"],
"columnvalues": [1, "yibao"]
}
]
}
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test",
"columnnames": ["id", "name"],
"columntypes": ["integer", "character varying(30)"],
"columnvalues": [1, "mingyoushiren"],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["integer"],
"keyvalues": [1]
}
}
]
}
{
"change": [
{
"kind": "delete",
"schema": "public",
"table": "test",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["integer"],
"keyvalues": [1]
}
}
]
}
Kind 在這裏表示語句操作的類型。對於 insert 和 update,會插入新行到表中,這個新行的數據也會被捕捉併發送輸出。而 update 和 delete,默認不設置表的REPLICA IDENTITY
屬性的話,oldkeys 就只會顯示歷史記錄的 key id。
REPLICA IDENTITY
有四個選項。
-
default,記錄 pk 列
-
using index index_name,記錄索引列。該索引必須是唯一索引、不能是部分索引、不能是可延遲索引並且只包括被標記成 not null 的列。
-
full 記錄行中所有列的舊值。
-
nothing 什麼也不記錄(這是系統表的默認值)
## 窗口2
alter table test REPLICA IDENTITY full;
INSERT INTO test (name) VALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 2;
DELETE FROM test WHERE id = 2;
## 窗口1
{
"change": [
]
}
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test",
"columnnames": ["id", "name"],
"columntypes": ["integer", "character varying(30)"],
"columnvalues": [2, "mingyoushiren"],
"oldkeys": {
"keynames": ["id", "name"],
"keytypes": ["integer", "character varying(30)"],
"keyvalues": [2, "yibao"]
}
}
]
}
{
"change": [
{
"kind": "delete",
"schema": "public",
"table": "test",
"oldkeys": {
"keynames": ["id", "name"],
"keytypes": ["integer", "character varying(30)"],
"keyvalues": [2, "mingyoushiren"]
}
}
]
}
我們把 test 表的REPLICA IDENTITY
屬性修改成 full 之後,再次執行上述 dml 操作,可以看到這次把 oldkeys 全部都輸出了。
後記
今天我們研究了邏輯解碼的插件 wal2json,下次我們來研究怎麼把 json 輸出轉換成 MQ 消息,然後在進行消費 (Consumer)。
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/j3dv99AXV962qT7laX5QDQ