PostgreSQL 邏輯解碼實踐 -一-

前言

在日常運維過程中,我們經常需要對數據進行一些同步處理,從系統 A 將一個表的數據同步到系統 B。以前我們主要使用 Oracle GoldenGate,後來 MySQL 逐漸增多,使用上了分佈式,阿里的 otter 也開始部分使用。近來我們陸續大量上了 PostgreSQL 數據庫,對 PG 使用較多的 CDC 同步工具是 debezium,它使用 PostgreSQL 邏輯解碼插件,今天我們將介紹一下它的一些基本知識。

邏輯解碼的概念

PostgreSQL 在 9.4 後提供了邏輯解碼功能,其基本原理是從 WAL 日誌解碼,跟蹤所有 DML (INSERT, UPDATE, DELETE) 更改。之後轉換成各種可用的格式,比如 Json。最終發送給外部程序進行消費 (例如 ElasticSearch)。下圖顯示了整個流程的前半部分(解碼輸出):

對於邏輯解碼插件,主要有以下幾類。按照格式來分主要有

具體可以參考 Wiki 上的 Logical Decoding Plugins

儘管如此,我還是先瀏覽了每一個項目,發現許多項目都停止了更新。仍在更新的主要是下面的幾個:

wal2json

Postgres Decoderbufs

pglogical

wal2mongo

wal2json 邏輯解碼測試

爲了能使用邏輯解碼,需要先安裝 wal2json 插件。

[postgres@centos8 ~]$ git clone https://github.com/eulerto/wal2json.git

cd wal2json/
export PGHOME=/data/postgresql/pgsql/default
export PATH=$PGHOME/bin:$PATH
make 
make install

接下來要在 postgresql.conf 中配置三個相關參數。

wal_level=logical
max_replication_slots=10
max_wal_sender=10

wal_level設置爲 logical 允許 WAL 記錄邏輯解碼所需的信息。參數max_replication_slots指定了發送端可以支持的最大複製槽數量,默認爲 10 個,無需修改。max_wal_sender指定發送方用於流複製的併發連接的最大數量,默認值也是 10。

同時也需要創建一個插槽。pg_recvlogical 是 PostgreSQL 自帶的工具,它管理插槽並使用插槽中的流數據。

[postgres@centos8 ~]$ pg_recvlogical --version
pg_recvlogical (PostgreSQL) 13.1


pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
2021-01-04 13:27:57.921 UTC [20543] LOG:  logical decoding found consistent point at 0/155A850
2021-01-04 13:27:57.921 UTC [20543] DETAIL:  There are no running transactions.

[postgres@centos8 ~]$ psql
psql (13.1)
Type "help" for help.

postgres=# select slot_name,slot_type,active,restart_lsn from pg_replication_slots;
 slot_name | slot_type | active | restart_lsn 
-----------+-----------+--------+-------------
 test_slot | logical   | f      | 0/155A850
(1 row)


[postgres@centos8 ~]$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
2021-01-04 13:30:56.709 UTC [20554] LOG:  starting logical decoding for slot "test_slot"
2021-01-04 13:30:56.709 UTC [20554] DETAIL:  Streaming transactions committing after 0/155A888, reading WAL from 0/155A850.
2021-01-04 13:30:56.709 UTC [20554] LOG:  logical decoding found consistent point at 0/155A850
2021-01-04 13:30:56.709 UTC [20554] DETAIL:  There are no running transactions.

在執行 pg_recvlogical 創建槽之後,輸出一個日誌,該日誌指出邏輯解碼的 lsn 位置,並告訴您這裏沒有長事務。一旦槽被創建,再次運行 pg_recvlogical 命令,使用參數 -- start 代表從指定的邏輯複製槽開始進行流式傳送更改,將一直持續到被一個信號終止。pretty-print 表示添加空格和 JSON 結構的縮進,默認值是 false。add-msg-prefixes代表僅當前綴在列表中時才包括消息。默認爲所有前綴。它是一個逗號分隔的值。我們開的這個終端將始終輸出,並等待接收邏輯解碼流。

此時,打開另一個窗口,登錄 到 PSQL 中的製造一些事務。

  1. 執行建表語句
##窗口2
postgres=# CREATE TABLE test (id SERIAL,name VARCHAR(30), PRIMARY KEY(id));
CREATE TABLE

##窗口1
{
        "change"[
        ]
}

可以看到 DDL 語句不記錄,只顯示了 change。

  1. 執行 DML 語句
##窗口2
INSERT INTO test (name) VALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 1;
DELETE FROM test WHERE id = 1;

##窗口1
{
        "change"[
                {
                        "kind""insert",
                        "schema""public",
                        "table""test",
                        "columnnames"["id""name"],
                        "columntypes"["integer""character varying(30)"],
                        "columnvalues"[1, "yibao"]
                }
        ]
}
{
        "change"[
                {
                        "kind""update",
                        "schema""public",
                        "table""test",
                        "columnnames"["id""name"],
                        "columntypes"["integer""character varying(30)"],
                        "columnvalues"[1, "mingyoushiren"],
                        "oldkeys"{
                                "keynames"["id"],
                                "keytypes"["integer"],
                                "keyvalues"[1]
                        }
                }
        ]
}
{
        "change"[
                {
                        "kind""delete",
                        "schema""public",
                        "table""test",
                        "oldkeys"{
                                "keynames"["id"],
                                "keytypes"["integer"],
                                "keyvalues"[1]
                        }
                }
        ]
}

Kind 在這裏表示語句操作的類型。對於 insert 和 update,會插入新行到表中,這個新行的數據也會被捕捉併發送輸出。而 update 和 delete,默認不設置表的REPLICA IDENTITY屬性的話,oldkeys 就只會顯示歷史記錄的 key id。

REPLICA IDENTITY有四個選項。

## 窗口2
alter table test REPLICA IDENTITY full;
INSERT INTO test (name) VALUES ('yibao');
UPDATE test SET name = 'mingyoushiren' WHERE id = 2;
DELETE FROM test WHERE id = 2;

## 窗口1
{
        "change"[
        ]
}
{
        "change"[
                {
                        "kind""update",
                        "schema""public",
                        "table""test",
                        "columnnames"["id""name"],
                        "columntypes"["integer""character varying(30)"],
                        "columnvalues"[2, "mingyoushiren"],
                        "oldkeys"{
                                "keynames"["id""name"],
                                "keytypes"["integer""character varying(30)"],
                                "keyvalues"[2, "yibao"]
                        }
                }
        ]
}
{
        "change"[
                {
                        "kind""delete",
                        "schema""public",
                        "table""test",
                        "oldkeys"{
                                "keynames"["id""name"],
                                "keytypes"["integer""character varying(30)"],
                                "keyvalues"[2, "mingyoushiren"]
                        }
                }
        ]
}

我們把 test 表的REPLICA IDENTITY屬性修改成 full 之後,再次執行上述 dml 操作,可以看到這次把 oldkeys 全部都輸出了。

後記

今天我們研究了邏輯解碼的插件 wal2json,下次我們來研究怎麼把 json 輸出轉換成 MQ 消息,然後在進行消費 (Consumer)。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/j3dv99AXV962qT7laX5QDQ