從 Go channel 中如何批量讀取數據 ?

在 Go 語言中,我們可以利用 channel 作爲數據的傳輸通道,通過定期批量讀取 channel 中的數據,並將這些數據批量發送到 Kafka 或者進行網絡寫入。這樣可以提高系統的性能,減少單個請求的網絡開銷。

批量處理的主要邏輯是:從 channel 中接收數據,積累到一定數量或者達到時間限制後,將數據批量處理(例如發送到 Kafka 或者寫入網絡)。

下面我將展示一個從 Go channel 中批量讀取數據,並批量發送到 Kafka 和批量寫入網絡數據的示例。

#1. 批量讀取 Go channel 的通用邏輯

批量讀取 Go channel 的通用邏輯可以通過一個定時器和一個緩衝區來實現:

package main



import (

	"fmt"

	"time"

)



func batchProcessor(ch <-chan string, batchSize int, flushInterval time.Duration) {

	var batch []string

	timer := time.NewTimer(flushInterval)



	for {

		select {

		case data := <-ch:

			batch = append(batch, data)

			// 當緩衝區達到批量大小時處理

			if len(batch) >= batchSize {

				fmt.Printf("Processing batch: %v\n", batch)

				batch = nil

				// 重置定時器

				timer.Reset(flushInterval)

			}



		case <-timer.C:

			// 如果達到時間間隔,但 batch 不爲空,也進行處理

			if len(batch) > 0 {

				fmt.Printf("Processing batch on timer: %v\n", batch)

				batch = nil

			}

			// 重置定時器

			timer.Reset(flushInterval)

		}

	}

}



func main() {

	dataChannel := make(chan string)

	batchSize := 5

	flushInterval := 3 * time.Second



	// 啓動批量處理協程

	go batchProcessor(dataChannel, batchSize, flushInterval)



	// 模擬向 channel 發送數據

	for i := 1; i <= 10; i++ {

		dataChannel <- fmt.Sprintf("data-%d", i)

		time.Sleep(1 * time.Second)

	}



	// 讓主程序暫停一會,以便查看處理結果

	time.Sleep(5 * time.Second)

}

上面的代碼展示了從 channel 中批量讀取數據的基本機制:

#2. 批量發送數據到 Kafka

我們可以在批量處理邏輯的基礎上,利用 Kafka 客戶端庫實現批量發送消息到 Kafka。

使用 github.com/Shopify/sarama 是 Go 中常用的 Kafka 客戶端庫。首先安裝它:

go get github.com/Shopify/sarama

然後實現批量發送數據到 Kafka 的示例:

package main



import (

	"fmt"

	"log"

	"time"



	"github.com/Shopify/sarama"

)



// 初始化 Kafka 生產者

func initKafkaProducer(brokers []string) sarama.SyncProducer {

	config := sarama.NewConfig()

	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(brokers, config)

	if err != nil {

		log.Fatalf("Failed to start Kafka producer: %v", err)

	}

	return producer

}



// 批量發送消息到 Kafka

func sendBatchToKafka(producer sarama.SyncProducer, topic string, messages []string) {

	var kafkaMessages []*sarama.ProducerMessage

	for _, msg := range messages {

		kafkaMessages = append(kafkaMessages, &sarama.ProducerMessage{

			Topic: topic,

			Value: sarama.StringEncoder(msg),

		})

	}



	err := producer.SendMessages(kafkaMessages)

	if err != nil {

		log.Printf("Failed to send messages: %v", err)

	} else {

		log.Printf("Successfully sent batch to Kafka: %v", messages)

	}

}



// 批量處理 Kafka 消息

func kafkaBatchProcessor(producer sarama.SyncProducer, topic string, ch <-chan string, batchSize int, flushInterval time.Duration) {

	var batch []string

	timer := time.NewTimer(flushInterval)



	for {

		select {

		case msg := <-ch:

			batch = append(batch, msg)

			if len(batch) >= batchSize {

				sendBatchToKafka(producer, topic, batch)

				batch = nil

				timer.Reset(flushInterval)

			}



		case <-timer.C:

			if len(batch) > 0 {

				sendBatchToKafka(producer, topic, batch)

				batch = nil

			}

			timer.Reset(flushInterval)

		}

	}

}



func main() {

	// Kafka broker 和 topic 配置

	brokers := []string{"localhost:9092"}

	topic := "test_topic"



	// 初始化 Kafka 生產者

	producer := initKafkaProducer(brokers)

	defer producer.Close()



	dataChannel := make(chan string)

	batchSize := 5

	flushInterval := 3 * time.Second



	// 啓動 Kafka 批量處理協程

	go kafkaBatchProcessor(producer, topic, dataChannel, batchSize, flushInterval)



	// 模擬向 channel 發送數據

	for i := 1; i <= 10; i++ {

		dataChannel <- fmt.Sprintf("message-%d", i)

		time.Sleep(1 * time.Second)

	}



	// 讓主程序暫停一會以便查看處理結果

	time.Sleep(5 * time.Second)

}

在這個示例中:

#3. 批量寫入網絡數據

同樣的邏輯可以用來批量寫入網絡數據。比如,將數據批量寫入到某個 HTTP API。

這裏我們使用 Go 的 net/http 來實現批量發送 HTTP 請求:

package main



import (

	"bytes"

	"fmt"

	"log"

	"net/http"

	"time"

)



// 批量發送 HTTP 請求

func sendBatchToAPI(endpoint string, batch []string) {

	// 構造請求體

	var requestBody bytes.Buffer

	for _, data := range batch {

		requestBody.WriteString(fmt.Sprintf("%s\n", data))

	}



	// 發送 HTTP POST 請求

	resp, err := http.Post(endpoint, "text/plain", &requestBody)

	if err != nil {

		log.Printf("Failed to send batch: %v", err)

		return

	}

	defer resp.Body.Close()



	log.Printf("Successfully sent batch to API: %v", batch)

}



// 批量處理 HTTP 請求

func httpBatchProcessor(endpoint string, ch <-chan string, batchSize int, flushInterval time.Duration) {

	var batch []string

	timer := time.NewTimer(flushInterval)



	for {

		select {

		case msg := <-ch:

			batch = append(batch, msg)

			if len(batch) >= batchSize {

				sendBatchToAPI(endpoint, batch)

				batch = nil

				timer.Reset(flushInterval)

			}



		case <-timer.C:

			if len(batch) > 0 {

				sendBatchToAPI(endpoint, batch)

				batch = nil

			}

			timer.Reset(flushInterval)

		}

	}

}



func main() {

	// API endpoint

	apiEndpoint := "http://localhost:8080/receive"



	dataChannel := make(chan string)

	batchSize := 5

	flushInterval := 3 * time.Second



	// 啓動 HTTP 批量處理協程

	go httpBatchProcessor(apiEndpoint, dataChannel, batchSize, flushInterval)



	// 模擬向 channel 發送數據

	for i := 1; i <= 10; i++ {

		dataChannel <- fmt.Sprintf("data-%d", i)

		time.Sleep(1 * time.Second)

	}



	// 讓主程序暫停一會以便查看處理結果

	time.Sleep(5 * time.Second)

}

#總結

以上展示了通過 Go channel 批量讀取數據,並批量發送到 Kafka 或者 HTTP API 的實現:

這個架構非常適合高吞吐量的任務處理場景,如日誌系統、數據處理管道等。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/uDG_Gqd_EFLqzAh_ntzEVg