Kafka Message Tracing: Design and Implementation
Preface
Across several Kafka clusters handling tens of millions of TPS, message tracing has always been both important and thorny. After a message leaves the Producer, it passes through a complex processing pipeline before a Consumer finally consumes it, possibly going through retries, rebalances, multi-replica copying, and other steps along the way. Without a solid tracing mechanism, problems are very hard to localize once they occur. This article walks through a scheme for implementing Kafka message tracing.
- Kafka's Message Processing Model
Before designing a tracing scheme, we first need to understand Kafka's message processing model. The life cycle of a message in Kafka looks like this:
- Producer send phase
  - Message serialization
  - Partition selection
  - Batching
  - Compression
- Broker storage phase
  - Leader receives the message
  - Replica synchronization
  - Log storage
  - Index update
- Consumer consumption phase
  - Consumer group coordination
  - Message fetching
  - Offset commit
  - Rebalance handling
At each of these stages, the corresponding trace information needs to be recorded.
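The stages above map naturally onto a small set of trace phases. A minimal sketch follows; the constant names are illustrative and match the phases referenced by the interceptor code later in this article:

```java
public class TracePhases {
    // One enum constant per life-cycle stage we want to record.
    enum TracePhase {
        PRODUCER_SEND,          // before the record leaves the producer
        PRODUCER_ACK,           // broker acknowledgement received
        CONSUMER_RECEIVE,       // record handed to the consumer
        CONSUMER_COMMIT,        // offset committed
        STREAMS_PROCESS_START,  // Kafka Streams processing began
        STREAMS_PROCESS_END     // Kafka Streams processing finished
    }

    public static void main(String[] args) {
        // A trace chain is, in essence, the ordered list of phases
        // that a single message passed through.
        for (TracePhase p : TracePhase.values()) {
            System.out.println(p);
        }
    }
}
```

Keeping the phase set small and stable matters: every downstream component (storage schema, analyzer, dashboards) keys off these values.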
- An Interceptor-Based Implementation
Kafka provides interceptor hooks for both Producers and Consumers, and we can build message tracing on top of them.
2.1 Producer-Side Implementation
First, let's look at trace recording on the Producer side:
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;

public class TraceProducerInterceptor implements ProducerInterceptor<String, String> {

    private final TraceCollector collector = new TraceCollector();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Record a trace event before the message is sent. lastHeader() returns
        // null when the header is absent, so guard before decoding the bytes.
        Header traceHeader = record.headers().lastHeader("TRACE_ID");
        String traceId = traceHeader == null
                ? null
                : new String(traceHeader.value(), StandardCharsets.UTF_8);

        // Build the trace event
        TraceEvent event = TraceEvent.builder()
                .traceId(traceId)
                .messageId(generateMessageId()) // generate a message ID
                .timestamp(System.currentTimeMillis())
                .phase(TracePhase.PRODUCER_SEND)
                .topic(record.topic())
                .partition(record.partition())
                .build();

        // Collect the trace event
        collector.collect(event);
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Record the result of the send. Note that RecordMetadata carries no
        // headers, so extractTraceId/extractMessageId must correlate with the
        // original record by other means (e.g. a map populated in onSend).
        String traceId = extractTraceId(metadata);
        TraceEvent event = TraceEvent.builder()
                .traceId(traceId)
                .messageId(extractMessageId(metadata))
                .timestamp(System.currentTimeMillis())
                .phase(TracePhase.PRODUCER_ACK)
                .topic(metadata.topic())
                .partition(metadata.partition())
                .offset(metadata.offset())
                .status(exception == null ? "SUCCESS" : "FAILED")
                .errorMessage(exception != null ? exception.getMessage() : null)
                .build();
        collector.collect(event);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no-op
    }

    @Override
    public void close() {
        // no-op
    }
}
The Producer interceptor captures the full life cycle of a send: in onSend we record a trace event before the message leaves the client, and in onAcknowledgement we record the result of the send. This gives us complete visibility into the path from producer to broker.
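The interceptor is activated through producer configuration. "interceptor.classes" is the real Kafka config key (the value of ProducerConfig.INTERCEPTOR_CLASSES_CONFIG); the package name com.example.trace below is a placeholder for wherever the interceptor lives in your codebase:

```java
import java.util.Properties;

public class ProducerTraceConfig {
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Multiple interceptors may be listed, comma-separated;
        // they run in the order given.
        props.put("interceptor.classes",
                "com.example.trace.TraceProducerInterceptor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(baseConfig().getProperty("interceptor.classes"));
    }
}
```

The consumer-side interceptor in the next section is wired up the same way, via the consumer's own "interceptor.classes" property.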
2.2 Consumer-Side Implementation
Trace recording on the Consumer side is somewhat more involved, because it has to cope with consumption retries, rebalances, and similar scenarios:
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private final TraceCollector collector = new TraceCollector();
    private String consumerGroup;

    @Override
    public void configure(Map<String, ?> configs) {
        // Capture the consumer group from the client configuration.
        consumerGroup = String.valueOf(configs.get("group.id"));
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Record a trace event for every record handed to the consumer.
        for (ConsumerRecord<String, String> record : records) {
            String traceId = extractTraceId(record);
            TraceEvent event = TraceEvent.builder()
                    .traceId(traceId)
                    .messageId(extractMessageId(record))
                    .timestamp(System.currentTimeMillis())
                    .phase(TracePhase.CONSUMER_RECEIVE)
                    .topic(record.topic())
                    .partition(record.partition())
                    .offset(record.offset())
                    .consumerGroup(consumerGroup)
                    .consumerId(getConsumerId())
                    .build();
            collector.collect(event);
        }
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Record a trace event for each committed offset.
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TraceEvent event = TraceEvent.builder()
                    .timestamp(System.currentTimeMillis())
                    .phase(TracePhase.CONSUMER_COMMIT)
                    .topic(entry.getKey().topic())
                    .partition(entry.getKey().partition())
                    .offset(entry.getValue().offset())
                    .consumerGroup(consumerGroup)
                    .consumerId(getConsumerId())
                    .build();
            collector.collect(event);
        }
    }

    @Override
    public void close() {
        // no-op
    }
}
2.3 Message Header Extension
To propagate trace information as messages flow through the system, we extend the Kafka message headers:
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class TraceHeadersExtension {

    private static final String TRACE_ID = "TRACE_ID";
    private static final String MESSAGE_ID = "MESSAGE_ID";
    private static final String TIMESTAMP = "TRACE_TIMESTAMP";
    private static final String SOURCE = "TRACE_SOURCE";

    public static void inject(ProducerRecord<?, ?> record, String traceId) {
        record.headers()
              .add(TRACE_ID, traceId.getBytes(StandardCharsets.UTF_8))
              .add(MESSAGE_ID, generateMessageId().getBytes(StandardCharsets.UTF_8))
              .add(TIMESTAMP, String.valueOf(System.currentTimeMillis())
                      .getBytes(StandardCharsets.UTF_8))
              .add(SOURCE, getServiceName().getBytes(StandardCharsets.UTF_8));
    }

    public static TraceContext extract(ConsumerRecord<?, ?> record) {
        // lastHeader() returns null for messages produced without trace
        // headers, so decode defensively.
        String traceId = headerValue(record, TRACE_ID);
        String messageId = headerValue(record, MESSAGE_ID);
        String rawTimestamp = headerValue(record, TIMESTAMP);
        long timestamp = rawTimestamp == null ? 0L : Long.parseLong(rawTimestamp);
        String source = headerValue(record, SOURCE);
        return new TraceContext(traceId, messageId, timestamp, source);
    }

    private static String headerValue(ConsumerRecord<?, ?> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}
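The inject/extract pair can be exercised without a broker by substituting a plain map for Kafka's Headers object. The sketch below is a simplified stand-in, not the real Headers API: last write per key wins, which mirrors lastHeader() for the single-writer case used here:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TraceHeadersRoundTrip {
    // Inject trace headers into a map-backed stand-in for Kafka Headers.
    static void inject(Map<String, byte[]> headers, String traceId, String messageId) {
        headers.put("TRACE_ID", traceId.getBytes(StandardCharsets.UTF_8));
        headers.put("MESSAGE_ID", messageId.getBytes(StandardCharsets.UTF_8));
        headers.put("TRACE_TIMESTAMP",
                String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
    }

    // Decode a header value, returning null when the key is absent —
    // the same defensive contract the real extract() needs.
    static String extract(Map<String, byte[]> headers, String key) {
        byte[] value = headers.get(key);
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        inject(headers, "trace-42", "msg-1");
        System.out.println(extract(headers, "TRACE_ID")); // trace-42
    }
}
```

Round-tripping through bytes like this is worth testing explicitly: header values are raw byte arrays on the wire, and charset mismatches between producer and consumer are a common source of garbled trace IDs.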
- Message Tracing in Kafka Streams Applications
Kafka Streams applications need special handling, because they act as both consumer and producer:
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class StreamsTraceProcessor implements Processor<String, String> {

    private final TraceCollector collector = new TraceCollector();
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // Extract the upstream trace context (propagated via record headers)
        TraceContext traceContext = extractTraceContext(context);

        // Record the start of processing
        TraceEvent startEvent = TraceEvent.builder()
                .traceId(traceContext.getTraceId())
                .messageId(traceContext.getMessageId())
                .timestamp(System.currentTimeMillis())
                .phase(TracePhase.STREAMS_PROCESS_START)
                .applicationId(context.applicationId())
                .build();
        collector.collect(startEvent);

        try {
            // Business logic
            String result = processValue(value);
            // Forward the result downstream
            context.forward(key, result);
            // Record successful completion
            recordSuccess(traceContext);
        } catch (RuntimeException e) {
            // Record the failure, then rethrow so Streams can handle it
            recordError(traceContext, e);
            throw e;
        }
    }

    private void recordSuccess(TraceContext traceContext) {
        TraceEvent event = TraceEvent.builder()
                .traceId(traceContext.getTraceId())
                .messageId(traceContext.getMessageId())
                .timestamp(System.currentTimeMillis())
                .phase(TracePhase.STREAMS_PROCESS_END)
                .status("SUCCESS")
                .build();
        collector.collect(event);
    }

    private void recordError(TraceContext traceContext, Exception e) {
        TraceEvent event = TraceEvent.builder()
                .traceId(traceContext.getTraceId())
                .messageId(traceContext.getMessageId())
                .timestamp(System.currentTimeMillis())
                .phase(TracePhase.STREAMS_PROCESS_END)
                .status("FAILED")
                .errorMessage(e.getMessage())
                .build();
        collector.collect(event);
    }

    @Override
    public void close() {
        // no-op
    }
}
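The start/success/failure bracketing inside process() is a reusable pattern, independent of Kafka Streams. A hypothetical, stdlib-only sketch of the same idea (the event strings here stand in for TraceEvent objects handed to a collector):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class TracedExecution {
    // Collected "phase" records; stands in for the TraceCollector.
    static final List<String> events = new ArrayList<>();

    // Bracket a unit of work with start/end trace events, recording
    // FAILED and rethrowing when the work throws.
    static <T> T traced(String traceId, Supplier<T> work) {
        events.add(traceId + ":PROCESS_START");
        try {
            T result = work.get();
            events.add(traceId + ":PROCESS_END:SUCCESS");
            return result;
        } catch (RuntimeException e) {
            events.add(traceId + ":PROCESS_END:FAILED");
            throw e;
        }
    }

    public static void main(String[] args) {
        String out = traced("t1", () -> "ok");
        System.out.println(out + " " + events);
    }
}
```

The key property, preserved from the Streams processor above, is that the end event is emitted on both paths and the exception still propagates, so tracing never swallows failures.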
- Trace Data Storage and Analysis
Storing and analyzing the trace data is an essential part of the overall scheme:
4.1 Storage Design
We adopt a multi-tier storage strategy:
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class TraceStorage {

    // Simplified client wrappers for the three storage tiers.
    private final ClickHouse timeseriesDB;  // detailed event data
    private final Elasticsearch searchDB;   // full-text search
    private final Redis cacheDB;            // hot cache for recent traces

    public void store(TraceEvent event) {
        // 1. Write to the hot cache with a TTL
        cacheDB.setex(buildKey(event), TTL_SECONDS, event);
        // 2. Asynchronously persist the detailed event
        CompletableFuture.runAsync(() ->
                timeseriesDB.insert(convertToClickHouseModel(event)));
        // 3. Asynchronously update the search index
        CompletableFuture.runAsync(() ->
                searchDB.index(convertToSearchModel(event)));
    }

    public TraceChain getTraceChain(String traceId) {
        // 1. Try the cache first
        List<TraceEvent> cachedEvents = cacheDB.get(buildKey(traceId));
        if (cachedEvents != null && !cachedEvents.isEmpty()) {
            return buildChain(cachedEvents);
        }
        // 2. Fall back to the detailed store
        List<TraceEvent> events = timeseriesDB.query(
                "SELECT * FROM trace_events WHERE trace_id = ? ORDER BY timestamp",
                traceId
        );
        return buildChain(events);
    }
}
4.2 Trace Analysis
Implement a trace analyzer to process the trace data:
import java.util.ArrayList;
import java.util.List;

public class TraceAnalyzer {

    // Latency analysis: break end-to-end latency down by stage
    public LatencyMetrics analyzeLatency(TraceChain chain) {
        long producerLatency = calculateProducerLatency(chain);
        long brokerLatency = calculateBrokerLatency(chain);
        long consumerLatency = calculateConsumerLatency(chain);
        return new LatencyMetrics(
                producerLatency,
                brokerLatency,
                consumerLatency
        );
    }

    // Anomaly analysis: flag retries and backlog in the chain
    public List<TraceAnomaly> analyzeAnomalies(TraceChain chain) {
        List<TraceAnomaly> anomalies = new ArrayList<>();
        // Check for message retries
        if (hasRetries(chain)) {
            anomalies.add(new TraceAnomaly(
                    AnomalyType.MESSAGE_RETRY,
                    getRetryCount(chain)
            ));
        }
        // Check for message backlog
        if (hasBacklog(chain)) {
            anomalies.add(new TraceAnomaly(
                    AnomalyType.MESSAGE_BACKLOG,
                    getBacklogSize(chain)
            ));
        }
        return anomalies;
    }
}
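The per-stage latency calculations above reduce to differences between adjacent phase timestamps in the chain. A self-contained sketch, assuming one timestamp per phase (real chains need guards for missing or repeated phases):

```java
import java.util.HashMap;
import java.util.Map;

public class LatencySketch {
    // Latency between two phases, given a map of timestamps (ms) per phase
    // for a single message's trace chain.
    static long latencyBetween(Map<String, Long> timestamps, String from, String to) {
        return timestamps.get(to) - timestamps.get(from);
    }

    public static void main(String[] args) {
        Map<String, Long> ts = new HashMap<>();
        ts.put("PRODUCER_SEND", 1_000L);
        ts.put("PRODUCER_ACK", 1_030L);     // producer -> broker ack
        ts.put("CONSUMER_RECEIVE", 1_120L); // broker -> consumer

        System.out.println(latencyBetween(ts, "PRODUCER_SEND", "PRODUCER_ACK"));    // 30
        System.out.println(latencyBetween(ts, "PRODUCER_ACK", "CONSUMER_RECEIVE")); // 90
    }
}
```

One caveat worth stating: producer and consumer timestamps come from different hosts, so clock skew puts a floor on how precisely cross-host latencies can be measured; same-host deltas (e.g. STREAMS_PROCESS_START to STREAMS_PROCESS_END) are the trustworthy ones.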
Source: https://mp.weixin.qq.com/s/64-fNAZ00ZQyaF1LM8DGUg