聊聊 Flink:Flink 的狀態管理

我們知道,Flink 的一個算子可能會有多個子任務,每個子任務可能分佈在不同的實例(即 slot)上,我們可以把 Flink 的狀態理解爲某個算子的子任務在其當前實例上的一個變量,該變量記錄了流過當前實例算子的歷史記錄產生的結果。當新數據記錄流入時,我們需要結合該結果(即狀態,State)來進行計算。

實際上,Flink 的狀態是由算子的子任務來創建和管理的。一個狀態的更新和獲取的流程如下圖所示,一個算子子任務接收輸入流,獲取對應的狀態,根據新的計算結果更新狀態。一個簡單的例子是對一個時間窗口內流入的某個整數字段進行求和,那麼當算子子任務接收到新元素時,會獲取已經存儲在狀態中的數值(歷史記錄的求和結果),然後將當前輸入加到狀態上,並將狀態數據更新。


Flink 應用程序的狀態訪問都在本地進行,這樣有助於提高吞吐量和降低延遲。通常情況下,Flink 應用程序都是將狀態存儲在 JVM 堆內存中,但如果狀態數據太大,也可以選擇將其以結構化數據格式存儲在高速磁盤中。
通過狀態快照,Flink 能夠提供可容錯的、精確一次的計算語義。Flink 應用程序在執行時會獲取並存儲分佈式 Pipeline(流處理管道)中整體的狀態,它會將數據源中消費數據的偏移量記錄下來,並將整個作業圖中算子獲取到該數據(記錄的偏移量對應的數據)時的狀態記錄並存儲下來。當發生故障時,Flink 作業會恢復上次存儲的狀態,重置數據源從狀態中記錄的上次消費的偏移量,開始重新進行消費處理。而且狀態快照在執行時會異步獲取狀態並存儲,並不會阻塞正在進行的數據處理邏輯。

總結來說,Flink 狀態管理的主要特性如下:

Flink 提供了不同的狀態機制,用於指定狀態的存儲方式和存儲位置。根據數據集是否按照 Key 進行分區,將狀態分爲 Keyed State 和 Operator State(Non-Keyed State) 兩種類型。

二、Keyed State 與 Operator State

2.1 Keyed State

Keyed State 在通過 keyBy() 分組的 KeyedStream 上使用,對每個 Key 的數據進行狀態存儲和管理,狀態是跟每個 Key 綁定的,即每個 Key 對應一個狀態對象。根據狀態數據的類型不同,Flink 中定義了多種狀態對象,用於存儲狀態數據,以適應不同的計算場景。

通過 keyBy() 會將數據流進行狀態分區,Keyed State 被進一步組織成所謂的 Key Groups,一個 Key Groups 包含多個 Key 的狀態。Key Groups 是 Flink 可以重新分配 Keyed State 的原子單位,Key Groups 的數量與定義的最大並行度相同。在執行期間,算子的每個並行實例處理一個 Key Groups,如下圖所示。


Keyed State 支持的狀態數據類型如下:

例如,使用 ValueState 進行簡單的計數,流數據中的相同 Key 一旦出現次數達到 2,則將其平均值發送到下游,並清除狀態重新開始,代碼如下:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

2.2 Operator State

與 Keyed State 不同,Operator State 與一個特定算子的一個實例綁定,和數據元素中的 Key 無關,每個算子子任務管理自己的 Operator State,每個算子子任務上的數據流共享同一個狀態,可以訪問和修改該狀態。Kafka 連接器是在 Flink 中使用 Operator State 的一個很好的例子。Kafka 消費者的每個並行實例維護一個主題分區和偏移量的映射作爲它的 Operator State。

當並行性發生改變時,Operator State 接口支持在並行算子實例之間重新分配狀態,並且有不同的重新分配方案。Operator State 是一種特殊類型的狀態,主要用於 Source 或 Sink 算子,用來保存流入數據的偏移量或對輸出數據做緩存,以保證 Flink 應用的 Exactly-Once 語義。

Operator State 支持的狀態數據類型爲 ListState。ListState 以一個列表的形式存儲狀態數據,以適應橫向擴展時狀態重分佈的問題。ListState 存儲的列表數據是相互獨立的狀態項的集合,在算子並行性發生改變時,這些狀態項可以在算子實例之間重新分配。

廣播狀態 (Broadcast State) 是 Operator State 的一種特殊類型。引入廣播狀態是爲了支持需要將一個流的記錄廣播到所有下游任務的場景,這些記錄用於在所有子任務之間保持相同的狀態。然後可以在處理第二個流的記錄時訪問此狀態。可以想象一個低吞吐量流,其中包含一組規則,我們要使用這組規則針對來自另一個流的所有元素進行評估。它僅適用於具有廣播流和非廣播流作爲輸入的特定算子,並且這樣的算子可以具有名稱不同的多個廣播狀態。

廣播狀態 (Broadcast State) 是 Operator State 的一種特殊類型。引入廣播狀態是爲了支持需要將一個流的記錄廣播到所有下游任務的場景,這些記錄用於在所有子任務之間保持相同的狀態。然後可以在處理第二個流的記錄時訪問此狀態。可以想象一個低吞吐量流,其中包含一組規則,我們要使用這組規則針對來自另一個流的所有元素進行評估。它僅適用於具有廣播流和非廣播流作爲輸入的特定算子,並且這樣的算子可以具有名稱不同的多個廣播狀態。

我們可以通過實現 CheckpointedFunction 接口來使用 Operator State。該接口是有狀態轉換函數的核心接口,即跨單個流記錄維護狀態的函數。雖然有更多輕量級的接口作爲各種狀態的快捷方式,但該接口在管理 Keyed State 和 Operator State 方面提供了最大的靈活性。


Operator State 支持的數據結構如下:

CheckpointedFunction 接口的源碼如下:

當檢查點(Checkpoint)獲取轉換函數的狀態快照(Snapshot)時,將調用 snapshotState(FunctionSnapshotContext)。在這個方法中,函數通常確保檢查點數據結構(在初始化階段獲得的)是最新的,以便進行快照。給定的 FunctionSnapshotContext(快照上下文)提供對檢查點元數據的訪問。當算子的子任務初始化(實例化)時,initializeState(FunctionInitializationContext) 被調用。子任務初始化包括第一次自定義函數初始化和從之前的 Checkpoint 恢復,因此初始化有兩種應用場景:

下面的代碼通過一個簡單的示例演示使用 SinkFunction 輸出數據到外部系統,並在 CheckpointedFunction 中進行數據緩存,然後統一發送到下游。當作業重啓的時候,對狀態數據進行恢復並重新分配。

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    // sink的核心處理邏輯,將給定的值寫入sink。每個記錄都會調用此函數。
    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        // 先將數據緩存到本地緩存
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // 輸出到外部系統(需要自行實現)
            }
            // 清空本地緩存
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 清除狀態
        checkpointedState.clear();
        // 將本地狀態添加到ListState
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 創建狀態描述器
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // 創建ListState,每個ListState都使用唯一的名稱。
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // 如果從前一個執行的快照恢復狀態,則返回true(例如作業重啓的情況)
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

2.3  Keyed State 與 Operator State 的區別

lYTjrA

三、Keyed State

3.1 KeyedState 之 ValueState
ValueState[T] 是單一變量的狀態,T 是某種具體的數據類型,比如 Double、String,或我們自己定義的複雜數據結構。我們可以使用 value() 方法獲取狀態,使用 update(T value) 更新狀態。

需求:當接收到的相同 key 的元素個數等於 3 個,就計算這些元素的 value 的平均值。

(1)繼承算子的 RichFunction,創建狀態並編寫業務邏輯。

public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {

    /**
     * 用以保存每個 key 出現的次數,以及這個 key 對應的 value 的總值
     * 1. ValueState 保存的是對應的一個 key 的一個狀態值
     */
    private ValueState<Tuple2<Long, Long>> countAndSum;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊狀態
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",  // 狀態的名字
                        Types.TUPLE(Types.LONG, Types.LONG)); // 狀態存儲的數據類型,防止類型擦除
        countAndSum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(
            Tuple2<Long, Long> element,
            Collector<Tuple2<Long, Double>> out) throws Exception {
        // 拿到當前的 key 的狀態值
        Tuple2<Long, Long> currentState = countAndSum.value();
        // 如果狀態值還沒有初始化,則初始化
        if (currentState == null) {
            currentState = Tuple2.of(0L, 0L);
        }
        // 更新狀態值中的元素的個數
        currentState.f0 += 1;
        // 更新狀態值中的總值
        currentState.f1 += element.f1;
        // 更新狀態
        countAndSum.update(currentState);
        // 判斷,如果當前的 key 出現了 3 次,則需要計算平均值,並且輸出
        if (currentState.f0 >= 3) {
            double avg = (double)currentState.f1 / currentState.f0;
            // 輸出 key 及其對應的平均值
            out.collect(Tuple2.of(element.f0, avg));
            //  清空狀態值
            countAndSum.clear();
        }
    }
}

(2)Main 方法

/**
 * 需求:當接收到的相同 key 的元素個數等於 3 個,就計算這些元素的 value 的平均值。
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
                env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L,
                                5L));
        // 輸出:
        //(1,5.0)
        //(2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();
        env.execute();
    }
}

(3)輸出

11> (1,5.0)
16> (2,3.6666666666666665)

3.2 KeyedState 之 ListState
ListState[T] 存儲了一個由 T 類型數據組成的列表。我們可以使用 add(IN var1) 或 addAll(Listvar1) 向狀態中添加元素,使用 OUT get() 獲取整個列表,使用 update(Listvar1) 來更新列表,新的列表將替換舊的列表。

和 3.1 類似,main 方法換一下 CountWindowAverageWithListState 類就可以了。

public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    //1. ListState 保存的是對應的一個 key 的出現的所有的元素
    private ListState<Tuple2<Long, Long>> elementsByKey;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊狀態
        ListStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ListStateDescriptor<>(
                "average",  // 狀態的名字
                Types.TUPLE(Types.LONG, Types.LONG)); // 狀態存儲的數據類型
        elementsByKey = getRuntimeContext().getListState(descriptor);
    }
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        // 拿到當前的 key 的狀態值
        Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
        // 如果狀態值還沒有初始化,則初始化
        if (currentState == null) {
            elementsByKey.addAll(Collections.emptyList());
        }
        // 更新狀態
        elementsByKey.add(element);
        // 判斷,如果當前的 key 出現了 3 次,則需要計算平均值,並且輸出
        List<Tuple2<Long, Long>> allElements = new ArrayList<>((Collection<? extends Tuple2<Long, Long>>) elementsByKey.get());
        if (allElements.size() >= 3) {
            long count = 0;
            long sum = 0;
            for (Tuple2<Long, Long> ele : allElements) {
                count++;
                sum += ele.f1;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除狀態
            elementsByKey.clear();
        }
    }
}

3.3 KeyedState 之 MapState
MapState 存儲一個 Key-Value map,其功能與 Java 的 Map 幾乎相同。UV get(UK var1) 可以獲取某個 key 下的 value,void put(UK var1, UV var2) 可以對某個 key 設置 value,boolean contains(UK var1) 判斷某個 key 是否存在,void remove(UK var1) 刪除某個 key 以及對應的 value,Iterable entries() 返回 MapState 中所有的元素,Iterator iterator() 返回一個迭代器。需要注意的是,MapState 中的 key 和 Keyed State 的 key 不是同一個 key。</entry</entry

public class CountWindowAverageWithMapState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    //1. MapState :key 是一個唯一的值,value 是接收到的相同的 key 對應的 value 的值
    private MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊狀態
        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>(
                "average",  // 狀態的名字
                String.class, Long.class); // 狀態存儲的數據類型
        mapState = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Double>> out) throws Exception {
        mapState.put(UUID.randomUUID().toString(), element.f1);
        // 判斷,如果當前的 key 出現了 3 次,則需要計算平均值,並且輸出
        List<Long> allElements = new ArrayList<>((Collection<? extends Long>) mapState.values());

        if (allElements.size() >= 3) {
            long count = 0;
            long sum = 0;
            for (Long ele : allElements) {
                count++;
                sum += ele;
            }
            double avg = (double) sum / count;
            out.collect(Tuple2.of(element.f0, avg));
            // 清除狀態
            mapState.clear();
        }
    }
}

3.4 KeyedState 之 ReducingState

ReducingState[T] 和 AggregatingState[IN, OUT] 與 ListState[T] 同屬於 MergingState[T]。與 ListState[T] 不同的是,ReducingState[T] 只有一個元素,而不是一個列表。它的原理是新元素通過 void add(IN var1) 加入後,與已有的狀態元素使用 ReduceFunction 合併爲一個元素,並更新到狀態裏。AggregatingState[IN, OUT] 與 ReducingState[T] 類似,也只有一個元素,只不過 AggregatingState[IN, OUT] 的輸入和輸出類型可以不一樣。ReducingState[T] 和 AggregatingState[IN, OUT] 與窗口上進行 ReduceFunction 和 AggregateFunction 很像,都是將新元素與已有元素做聚合。

需求:求接收到的相同 key 的 value 的 sum。

public class SumFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    // 用於保存每一個 key 對應的 value 的總值
    private ReducingState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊狀態
        // 聚合函數
        ReducingStateDescriptor<Long> descriptor =
            new ReducingStateDescriptor<>(
                "sum",  // 狀態的名字
                (ReduceFunction<Long>) Long::sum, Long.class); // 狀態存儲的數據類型
        sumState = getRuntimeContext().getReducingState(descriptor);
    }
    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        // 將數據放到狀態中
        sumState.add(element.f1);

        out.collect(Tuple2.of(element.f0, sumState.get()));
    }
}

(2)Main 類

將 3.1 的 Main 方法中 flatMap 的 Function 替換爲 SumFunction

(3)輸出

16> (2,4)
11> (1,3)
16> (2,6)
11> (1,8)
16> (2,11)
11> (1,15)

3.5 KeyedState 之 AggregatingState
需求:求接收到的相同 key 的 value 顯示出來。

(1)繼承算子的 RichFunction,創建狀態並編寫業務邏輯。

public class ContainsValueFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
    private AggregatingState<Long, String> totalStr;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 註冊狀態
        AggregatingStateDescriptor<Long, String, String> descriptor =
            new AggregatingStateDescriptor<>(
                "totalStr",  // 狀態的名字
                new AggregateFunction<Long, String, String>() {
                    @Override
                    public String createAccumulator() {
                        return null;
                    }

                    @Override
                    public String add(Long value, String accumulator) {
                        if (StringUtils.isBlank(accumulator)) {
                            return String.valueOf(value);
                        }
                        return accumulator + " and " + value;
                    }

                    @Override
                    public String getResult(String accumulator) {
                        return accumulator;
                    }

                    @Override
                    public String merge(String a, String b) {
                        return null;
                    }
                }, String.class); // 狀態存儲的數據類型
        totalStr = getRuntimeContext().getAggregatingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element,
                        Collector<Tuple2<Long, String>> out) throws Exception {
        totalStr.add(element.f1);
        out.collect(Tuple2.of(element.f0, totalStr.get()));
    }
}

(2)Main 方法

將 3.1 的 Main 方法中 flatMap 的 Function 替換爲 ContainsValueFunction。

(3)輸出

11> (1,3)
16> (2,4)
11> (1,3 and 5)
16> (2,4 and 2)
11> (1,3 and 5 and 7)
16> (2,4 and 2 and 5)

四、Operator State

4.1 OperatorState 之 ListState

狀態從本質上來說,是 Flink 算子子任務的一種本地數據,爲了保證數據可恢復性,使用 Checkpoint 機制來將狀態數據持久化輸出到存儲空間上。狀態相關的主要邏輯有兩項:

Keyed State 對這兩項內容做了更完善的封裝,開發者可以開箱即用。對於 Operator State 來說,每個算子子任務管理自己的 Operator State,或者說每個算子子任務上的數據流共享同一個狀態,可以訪問和修改該狀態。Flink 的算子子任務上的數據在程序重啓、橫向伸縮等場景下不能保證百分百的一致性。換句話說,重啓 Flink 應用後,某個數據流元素不一定會和上次一樣,還能流入該算子子任務上。因此,我們需要根據自己的業務場景來設計 snapshot 和 restore 的邏輯。爲了實現這兩個步驟,Flink 提供了最爲基礎的 CheckpointedFunction 接口類。

public interface CheckpointedFunction {

  // Checkpoint時會調用這個方法,我們要實現具體的snapshot邏輯,比如將哪些本地狀態持久化
  void snapshotState(FunctionSnapshotContext context) throws Exception;
  // 初始化時會調用這個方法,向本地狀態中填充數據
  void initializeState(FunctionInitializationContext context) throws Exception;
}

在 Flink 的 Checkpoint 機制下,當一次 snapshot 觸發後,snapshotState 會被調用,將本地狀態持久化到存儲空間上。這裏我們可以先不用關心 snapshot 是如何被觸發的,暫時理解成 snapshot 是自動觸發的,後續文章會介紹 Flink 的 Checkpoint 機制。

initializeState 在算子子任務初始化時被調用,初始化包括兩種場景:

目前 Operator State 主要有三種,其中 ListState 和 UnionListState 在數據結構上都是一種 ListState,還有一種 BroadcastState。這裏我們主要介紹 ListState 這種列表形式的狀態。這種狀態以一個列表的形式序列化並存儲,以適應橫向擴展時狀態重分佈的問題。每個算子子任務有零到多個狀態 S,組成一個列表 ListState[S]。各個算子子任務將自己狀態列表的 snapshot 到存儲,整個狀態邏輯上可以理解成是將這些列表連接到一起,組成了一個包含所有狀態的大列表。當作業重啓或橫向擴展時,我們需要將這個包含所有狀態的列表重新分佈到各個算子子任務上。

ListState 和 UnionListState 的區別在於:

Operator State 的實際應用場景不如 Keyed State 多,它經常被用在 Source 或 Sink 等算子上,用來保存流入數據的偏移量或對輸出數據做緩存,以保證 Flink 應用的 Exactly-Once 語義。這裏我們來看一個 Flink 官方提供的 Sink 案例以瞭解 CheckpointedFunction 的工作原理。

需求: 每兩條數據打印一次結果 1000

(1)實現 SinkFunction 和 CheckpointedFunction

public class CustomSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    // 用於緩存結果數據的
    private List<Tuple2<String, Integer>> bufferElements;
    // 表示內存中數據的大小閾值
    private int threshold;
    // 用於保存內存中的狀態信息
    private ListState<Tuple2<String, Integer>> checkpointState;
    // StateBackend
    // checkpoint
    public CustomSink(int threshold) {
        this.threshold = threshold;
        this.bufferElements = new ArrayList<>();
    }

    // Sink的核心處理邏輯,將上游數據value輸出到外部系統
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // 可以將接收到的每一條數據保存到任何的存儲系統中
        bufferElements.add(value);
        if (bufferElements.size() == threshold) {
            // send it to the sink
            // 這裏簡單打印
            System.out.println("自定義格式:" + bufferElements);
            // 清空本地緩存
            bufferElements.clear();
        }
    }

    // 重寫CheckpointedFunction中的snapshotState
    // 將本地緩存snapshot保存到存儲上
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception
    {
        // 將之前的Checkpoint清理
        checkpointState.clear();
        // 將最新的數據寫到狀態中
        for (Tuple2<String, Integer> ele : bufferElements) {
            checkpointState.add(ele);
        }
    }

    // 重寫CheckpointedFunction中的initializeState
    // 初始化狀態:用於在程序恢復的時候從狀態中恢復數據到內存
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 註冊ListStateDescriptor
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "bufferd -elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        // 從FunctionInitializationContext中獲取OperatorStateStore,進而獲取ListState
        checkpointState = context.getOperatorStateStore().getListState(descriptor);
        // 如果是作業重啓,讀取存儲中的狀態數據並填充到本地緩存中
        if (context.isRestored()) {
            for (Tuple2<String, Integer> ele : checkpointState.get()) {
                bufferElements.add(ele);
            }
        }
    }
}

(2)Main 方法

/**
 * 需求: 每兩條數據打印一次結果 1000
 */
public class TestOperatorStateMain {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource =
            env.fromElements(Tuple2.of("Spark", 3), Tuple2.of("Hadoop", 5),
                Tuple2.of("Hadoop", 7),
                Tuple2.of("Spark", 4));

        dataStreamSource.addSink(new CustomSink(2)).setParallelism(1);
        env.execute("TestStatefulApi");
    }
}

(3)輸出

自定義格式:[(Spark,3)(Hadoop,5)]
自定義格式:[(Hadoop,7)(Spark,4)]

上面的代碼在輸出到 Sink 之前,先將數據放在本地緩存中,並定期進行 snapshot,這實現了批量輸出的功能,批量輸出能夠減少網絡等開銷。同時,程序能夠保證數據一定會輸出外部系統,因爲即使程序崩潰,狀態中存儲着還未輸出的數據,下次啓動後還會將這些未輸出數據讀取到內存,繼續輸出到外部系統。

註冊和使用 Operator State 的代碼和 Keyed State 相似,也是先註冊一個 StateDescriptor,並指定狀態名字和數據類型,然後從 FunctionInitializationContext 中獲取 OperatorStateStore,進而獲取 ListState。如果是 UnionListState,那麼代碼改爲:context.getOperatorStateStore.getUnionListState。

狀態的初始化邏輯中,我們用 context.isRestored 來判斷是否爲作業重啓,這樣可以從之前的 Checkpoint 中恢復並寫到本地緩存中。

4.2 OperatorState 之 BroadCastState

廣播狀態是固定維護在堆內存中的,不會寫入文件系統或者 RocksDB。

下面我們通過 BroadCastState 控制程序的打印輸出爲例進行介紹。

(1)定義普通數據流,消費數據

DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);

(2)定義廣播流,用於廣播規則,從而控制程序打印輸出

DataStreamSource<String> broadStreamSource = env.socketTextStream("localhost", 8888);

(3)解析廣播流中的數據,解析爲二元組

DataStream<Tuple2<String, String>> broadStream =
    broadStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String s) throws Exception {
            String[] strings = s.split(" ");
            return Tuple2.of(strings[0], (strings[1]));
        }
    });

(4)定義需要廣播的狀態類型,只支持

MapStateDescriptor<String, String> descriptor = new
    MapStateDescriptor<String, String>(
    "ControlStream",
    String.class,
    String.class
);

(5)用解析後的廣播流將狀態廣播出去,從而生成 BroadcastStream

BroadcastStream<Tuple2<String, String>> broadcastStream = broadStream.broadcast(descriptor);

(6)通過 connect 連接兩個流,用 process 分別處理兩個流中的數據。連接流時分爲兩種情況:

KeyedBroadcastProcessFunction 比 BroadcastProcessFunction 多了計時器服務和獲取當前 key 接口,當然,這兩個功能不一定能用到。

我們這裏使用的是 BroadcastProcessFunction,這三個泛型翻譯分別代表:

IN1: 數據流 (即非廣播流) 的元素類型
IN2: 廣播流的元素類型
OUT: 兩個流連接完成後,輸出流的元素類型。

BroadcastProcessFunction 中定義了兩個函數用於處理具體的連接邏輯和業務邏輯。因此主要需要實現以下兩個函數:

public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;

這裏處理廣播流的數據,將廣播流數據保存到 BroadcastState 中。value 是廣播流中的一個元素;ctx 是上下文,提供 BroadcastState 和修改方法;out 是輸出流收集器。

public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;

這個函數處理數據流的數據,這裏之只能獲取到 ReadOnlyBroadcastState,因爲 Flink 不允許在這裏修改 BroadcastState 的狀態。value 是數據流中的一個元素;ctx 是上下文,可以提供上下文環境和只讀的 BroadcastState;out 是輸出流收集器。

注意:KeyedBroadcastProcessFunction 中的 ReadOnlyContext 多了計時器服務和獲取當前 key 接口

整的代碼如下:

需求:通過 BroadCastState 控制程序的打印輸出

/**
 * 數據流:
 * i love flink
 * 廣播流:
 * key  flink  -> 代表數據流裏面,只要包含flink的單詞纔會被打印出來。
 */
public class TestBroadcastState {
    public static void main(String[] args) throws Exception {
        //獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1. 定義普通數據流,消費數據
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        // 2. 定義廣播流,用於廣播規則,從而控制程序打印輸出
        DataStreamSource<String> broadStreamSource = env.socketTextStream("localhost", 8888);
        // 3. 解析廣播流中的數據成二元組
        DataStream<Tuple2<String, String>> broadStream =
            broadStreamSource.map((MapFunction<String, Tuple2<String, String>>) s -> {
                String[] strings = s.split(" ");
                return Tuple2.of(strings[0], (strings[1]));
            });
        //4. 定義需要廣播的狀態類型,只支持MapState
        MapStateDescriptor<String, String> descriptor = new
            MapStateDescriptor<>(
            "ControlStream",
            String.class,
            String.class
        );
        //5. 用解析後的廣播流將狀態廣播出去,從而生成BroadcastStream
        BroadcastStream<Tuple2<String, String>> broadcastStream = broadStream.broadcast(descriptor);
        //6. 通過connect連接兩個流,用process分別處理兩個流中的數據
        dataStreamSource
            .connect(broadcastStream)
            .process(new KeyWordsCheckProcessor())
            .print();
        env.execute();
    }

    private static class KeyWordsCheckProcessor extends BroadcastProcessFunction<String, Tuple2<String, String>, String> {
        MapStateDescriptor<String, String> descriptor =
            new MapStateDescriptor<>(
                "ControlStream",
                String.class,
                String.class
            );
        @Override
        public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
            // 將接收到的控制數據放到 broadcast state 中
            ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
            // 打印控制信息
            System.out.println(Thread.currentThread().getName() + " 接收到控制信息 :" + value);
        }

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            // 從 broadcast state 中拿到控制信息
            String keywords = ctx.getBroadcastState(descriptor).get("key");
            // 獲取符合條件的單詞
            if (value.contains(keywords)) {
                out.collect(value);
            }
        }
    }
}

五、狀態後端 State backend

5.1 狀態後端

Flink 將 Checkpoint 快照的存儲位置稱爲狀態後端 (State Backend)。狀態後端有兩種實現:一種基於 RocksDB 將工作狀態保存在磁盤上,使用 RocksDBStateBackend 類型;另一種基於堆將工作狀態保存在 Java 的堆內存中。基於堆的狀態後端有兩種類型:FsStateBackend 和 MemoryStateBackend。FsStateBackend 會定時將其狀態快照持久化到分佈式文件系統中,MemoryStateBackend 使用 JobManager 的堆內存保存狀態快照。

Flink 執行 Checkpoint 的流程:

Flink 執行 Checkpoint 的架構:


Flink 狀態後端的分類對比:

5.1.1 RocksDBStateBackend

RocksDBStateBackend 狀態後端將工作狀態 (State) 存儲在 RocksDB(一種 KV 型數據庫)中。這個狀態後端可以存儲超過內存並溢出到磁盤的非常大的狀態。所有的 key/value 狀態存儲在 RocksDB 的 key/value 索引中。爲了防止丟失狀態數據,Flink 將獲取 RocksDB 數據庫的快照作爲 Checkpoint,並在文件系統(默認情況下)或其他可配置狀態後端中持久化該快照。可以在 Flink 應用程序中使用 RocksDBStateBackend 類中的 setPredefinedOptions(PredefinedOptions)方法和 setRocksDBOptions(RocksDBOptionsFactory)設置 RocksDB 的相關選項。

RocksDBStateBackend 是目前唯一支持增量 Checkpoint(增量快照)的狀態後端。不同於產生一個包含所有數據的全量備份,增量快照中只包含自上一次快照完成之後被修改的記錄,因此可以顯著減少快照完成所耗的時間。

雖然 RocksDBStateBackend 支持增量快照,但是默認情況下沒有開啓該功能,使用的仍然是全量快照,如果需
要開啓,可以通過在 flink-conf.yaml 配置文件中設置 state.backend.incremental:true 實現。

對於超大狀態的聚合,例如以天爲單位的窗口計算並且對讀寫性能要求不高的作業,建議使用 RocksDBStateBackend。

5.1.2 FsStateBackend
FsStateBackend 狀態後端在 TaskManager 的內存(JVM 堆)中保存運行時的工作狀態。執行 Checkpoint 時,會將狀態數據以文件的形式持久化到外部文件系統中。如果外部文件系統是持久的分佈式文件系統,則此狀態後端支持高可用設置。每個 Checkpoint 將分別將其所有文件存儲在包含 Checkpoint 編號的文件系統的子目錄中,例如 HDFS 目錄 hdfs://namenode:port/flink-checkpoints/chk-17/。

如果一個 TaskManager 併發執行多個任務(如果 TaskManager 有多個 Task Slot,或者使用 Task Slot 共享),那麼所有任務的聚合狀態需要被放入該 TaskManager 的內存。FsStateBackend 狀態後端直接與元數據一起存儲小的狀態塊,以避免創建許多小文件。其閾值是可配置的。當增加這個閾值時,Checkpoint 元數據的大小也會增加。所有保留的已完成 Checkpoint 的元數據需要裝入 JobManager 的堆內存。這都不是問題,除非閾值太大。可以通過調用 getMinFileSizeThreshold() 方法獲取設置的閾值。

FsStateBackend 狀態後端適用於狀態比較大、窗口比較長的作業以及所有高可用的場景。對於一些以分鐘爲單位的窗口聚合,建議使用該狀態後端。

5.1.3 MemoryStateBackend

MemoryStateBackend 狀態後端在 TaskManager 的內存(JVM 堆)中以 Java 對象的形式保存運行時的工作狀態。執行 Checkpoint 時,會直接將其狀態保存到 JobManager 的堆內存。默認每個狀態在 JobManager 中允許使用的最大內存爲 5MB,可以通過 MemoryStateBackend 的構造函數進行調整。

該狀態後端建議只用於實驗、本地測試或狀態數據非常小的流應用程序,因爲它需要將 Checkpoint 數據存儲在 JobManager 的內存中,較大的狀態數據將佔用較大一部分 JobManager 的主內存,從而降低操作的穩定性。對於任何其他設置,都應該使用 FsStateBackend。FsStateBackend 將工作狀態以同樣的方式保存在 TaskManager 上,但執行 Checkpoint 時狀態數據直接存儲在文件系統中,而不是 JobManager 的內存中,因此支持非常大的狀態數據。

所有狀態後端都可以在應用程序中配置(通過使用各自的構造函數參數創建狀態後端並在執行環境中設置),也可以在 Flink 集羣環境中指定。如果在應用程序中指定了狀態後端,則它可以從 Flink 集羣環境配置中獲取額外的配置參數。例如,如果在沒有默認保存點(Savepoint,在 4.13.4 節將詳細講解)目錄的應用程序中,則它將選擇在運行的集羣環境的 Flink 配置中指定的默認保存點目錄。

通常,建議在生產中避免使用 MemoryStateBackend,因爲它將快照存儲在 JobManager 的內存中,而不是持久化到磁盤。當需要在 FsStateBackend 和 RocksDBStateBackend 之間進行選擇時,需要從性能和可伸縮性方面進行考慮。FsStateBackend 非常快,因爲每個狀態訪問和更新操作在 Java 堆內存上,但是狀態大小受集羣內可用內存的限制。另一方面,RocksDBStateBackend 可以根據可用磁盤空間進行擴展,並且是唯一支持增量快照的狀態後端。但是每個狀態訪問和更新都需要序列化/反序列化,這樣會導致平均性能比內存狀態後端慢一個數量級。

5.2 狀態後端配置

可以在配置文件 flink-conf.yaml 中通過屬性 state.backend 設置全局默認的狀態後端。例如,設置狀態後端爲 FsStateBackend:

#使用文件系統存儲快照
state.backend: filesystem
#存儲快照的目錄
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

state.backend 的可選值包括 jobmanager(MemoryStateBackend 狀態後端)、filesystem(FsStateBackend 狀態後端)、rocksdb(RocksDBStateBackend 狀態後端),或使用實現了狀態後端工廠 StateBackendFactory 的類的全限定類名,例如 RocksDBStateBackend 對應 org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory。

state.checkpoints.dir 指定了所有狀態後端的數據存儲目錄。

也可以在應用程序中使用 StreamExecutionEnvironment API 對作業的狀態後端進行設置。從 Flink 1.13 開始,在 API 層面爲了對狀態後端更容易理解,重新編寫了狀態後端的公共類,以幫助開發者更好地理解本地狀態存儲和檢查點存儲的分離。用戶可以在不丟失任何狀態或一致性的情況下遷移現有應用程序以使用新的 API。

例如,設置狀態後端爲 FsStateBackend,代碼如下:


設置狀態後端爲 RocksDBStateBackend,代碼如下:


設置狀態後端爲 MemoryStateBackend,代碼如下:

val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new HashMapStateBackend)
env.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage)

上述代碼中,HashMapStateBackend 在 TaskManager 的內存(JVM 堆)中保存運行時的工作狀態。執行 Checkpoint 時,會根據配置的 CheckpointStorage 保存狀態到指定的位置。CheckpointStorage 是一個接口,定義了狀態後端如何存儲其狀態以在流應用程序中進行容錯。該接口的各種實現以不同的方式存儲檢查點狀態,並具有不同的可用性保證。

例如,JobManagerCheckpointStorage 將檢查點數據存儲在 JobManager 的內存中。它是輕量級的,沒有額外的依賴項,但不可擴展,只支持小量狀態數據。這種檢查點存儲策略便於本地測試和開發。

FileSystemCheckpointStorage 將檢查點存儲在 HDFS、S3 等文件系統中。此存儲策略支持大量狀態數據,可以達到數 TB,同時爲有狀態應用程序提供高度可用的基礎。對於大多數生產部署,建議使用此檢查點存儲策略。

5.3 Checkpoint 配置

默認情況下,Checkpoint 是禁用的,可以通過相應的配置啓用。

5.3.1 全局配置

可以在 Flink 的配置文件 flink-conf.yaml 中對 Checkpoint 進行全局配置,代碼如下:

 state.backend: filesystem
 state.checkpoints.dir: hdfs://namenode:9000/flink-checkpoints
 state.backend.incremental: false

state.backend 用於指定狀態後端,支持的值爲 jobmanager、filesystem、rocksdb,常用值爲 filesystem 或 rocksdb,默認爲 none。jobmanager 表示使用 MemoryStateBackend 狀態後端,filesystem 表示使用 FsStateBackend 狀態後端,rocksdb 表示使用 RocksDBStateBackend 狀態後端。

state.checkpoints.dir 用於指定 Checkpoint 在文件系統的存儲目錄,默認爲 none。

state.backend.incremental 用於開啓/禁用增量 Checkpoint 功能,默認爲 false。對於支持增量 Checkpoint 的狀態後端有用,例如 RocksDBStateBackend。

常見 Flink Checkpoint 全局配置選項介紹:

5.3.2 應用配置

除了可以在 Flink 的配置文件中對 Checkpoint 進行全局配置外,還可以在 Flink 應用程序中通過代碼配置 Checkpoint,該配置將覆蓋配置文件中的全局配置。

在 Flink 應用程序中進行 Checkpoint 配置,必須的配置選項如下:

val env=StreamExecutionEnvironment.getExecutionEnvironment
//每隔1秒執行一次Checkpoint
env.enableCheckpointing(1000)
//指定狀態後端
env.setStateBackend(new FsStateBackend("file:///D:checkpoint"))

其他可選選項說明如下:

當設置了每隔一秒執行一次 Checkpoint 時,如果由於網絡延遲,前一次 Checkpoint 執行較慢,容易導致與後一次 Checkpoint 執行重疊。爲了防止這種情況,可以設置兩次 Checkpoint 之間的最小時間間隔:

//設置兩次Checkpoint之間的最小時間間隔爲500毫秒,默認爲0
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

設置可容忍的失敗的 Checkpoint 數量,默認值爲 0,意味着不容忍任何 Checkpoint 失敗:

env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)

作業取消時保留 Checkpoint 數據,以便根據實際需要恢復到指定的 Checkpoint:

env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

ExternalizedCheckpointCleanup 的相關選項如下:

CheckpointingMode 定義了系統在出現故障時提供的一致性保證。例如,設置 Checkpoint 執行模式爲 exactly once(默認值):

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

設置 Checkpoint 執行模式爲 at least once:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

設置 Checkpoint 執行的超時時間爲一分鐘,超過該時間則被丟棄,默認超時時間爲 10 分鐘:

env.getCheckpointConfig.setCheckpointTimeout(6000)

設置最大允許的同時執行的 Checkpoint 的數量,默認爲 1。當達到設置的最大值時,如果需要觸發新的 Checkpoint,需要等待正在執行的 Checkpoint 完成或過期:

env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/47AMkD2ugjY80IoU_MVlJw