基於 Flink 和 Drools 的實時日誌處理

背景

日誌系統接入的日誌種類多、格式複雜多樣,主流的有以下幾種日誌:

以上通過各種渠道接入的日誌,存在 2 個主要的問題:

爲了解決上面 2 個問題,我們基於 flink 和 drools 規則引擎做了實時的日誌處理服務。

系統架構

架構比較簡單,架構圖如下:

各類日誌都是通過 kafka 彙總,做日誌中轉。

flink 消費 kafka 的數據,同時通過 API 調用拉取 drools 規則引擎,對日誌做解析處理後,將解析後的數據存儲到 Elasticsearch 中,用於日誌的搜索和分析等業務。

爲了監控日誌解析的實時狀態,flink 會將日誌處理的統計數據,如每分鐘處理的日誌量,每種日誌從各個機器 IP 來的日誌量寫到 Redis 中,用於監控統計。

模塊介紹

系統項目命名爲 eagle。

重點講一下 eagle-log:

對接 kafka、ES 和 Redis

對接 kafka 和 ES 都比較簡單,用的官方的 connector(flink-connector-kafka-0.10 和 flink-connector-elasticsearch6),詳見代碼。

對接 Redis,最開始用的是 org.apache.bahir 提供的 redis connector,後來發現靈活度不夠,就使用了 Jedis。

在將統計數據寫入 redis 的時候,最開始用的 keyby 分組後緩存了分組數據,在 sink 中做統計處理後寫入,參考代碼如下:

String name = "redis-agg-log";
        DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {
                        ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
                        if (logs.size() > 0) {
                            collector.collect(new Tuple2(s, logs));
                        }
                    }
                }).setParallelism(redisSinkParallelism).name(name).uid(name);

後來發現這樣做對內存消耗比較大,其實不需要緩存整個分組的原始數據,只需要一個統計數據就 OK 了,優化後:

String name = "redis-agg-log";
        DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime))
                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
                .setParallelism(redisSinkParallelism).name(name).uid(name);

這裏使用了 flink 的聚合函數和 Accumulator,通過 flink 的 agg 操作做統計,減輕了內存消耗的壓力。

使用 broadcast 廣播 drools 規則引擎

1、drools 規則流通過 broadcast map state 廣播出去。

2、kafka 的數據流 connect 規則流處理日誌。

//廣播規則流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
                .broadcast(ruleStateDescriptor);

//kafka數據流
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);

//數據流connect規則流處理日誌
BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

具體細節參考開源代碼。

小結

本系統提供了一個基於 flink 的實時數據處理參考,對接了 kafka、redis 和 elasticsearch,通過可配置的 drools 規則引擎,將數據處理邏輯配置化和動態化。

對於處理後的數據,也可以對接到其他 sink,爲其他各類業務平臺提供數據的解析、清洗和標準化服務。

項目地址:

https://github.com/luxiaoxun/eagle

作者:阿凡盧

來源:https://www.cnblogs.com/luxiaoxun/p/13197981.html

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/VMqd5rJiA8zNMVEsEPBzVg