Disruptor 有哪些典型的使用場景?

大家好,我是君哥。

Disruptor 是一款高性能的內存有界隊列,它通過內存預分配、無鎖併發、解決僞共享問題、使用 RingBuffer 取代阻塞隊列等措施來大幅提升隊列性能。

但開發者們往往對它的使用場景不太瞭解,到底應該在哪些場景使用呢?今天咱們就來聊一聊 Disruptor 的使用場景。

Disruptor 是一個生產 - 消費模式的隊列,這裏我們使用官網的示例,生產者發送一個 long 類型的變量,消費者收到消息後把變量打印出來。首先定義消息體:

public class LongEvent {
    private long value;
    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString()
    {
        return "LongEvent{" + "value=" + value + '}';
    }
}

爲了讓 Disruptor 給消息預先分配內存,定義一個 EventFactory,代碼如下:

public class LongEventFactory implements EventFactory<LongEvent>
{
    @Override
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

下面定義個消費者 LongEventHandler:

public class LongEventHandler implements EventHandler<LongEvent>
{
    private String consumer;

    public LongEventHandler(String consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}
  1. 廣播場景

廣播場景在我們的開發工作中並不少見,比如系統收到上游系統的一個請求消息,然後把這個消息發送給多個下游系統來處理。Disruptor 支持廣播模式。比如消費者生產的消息由三個消費者來消費:

public class Broadcast {
    public static void main(String[] args) throws InterruptedException {
        int bufferSize = 1024;

        Disruptor<LongEvent> disruptor =
                new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
        EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
        EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

        disruptor.handleEventsWith(consumer1, consumer2, consumer3);
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}
  1. 日誌收集

再來看一個日誌收集的例子。這裏我們假設一個場景,業務系統集羣有 3 個節點,每個節點打印的業務日誌發送到 Disruptor,Disruptor 下游有 3 個消費者負責日誌收集。這裏我們需要重新定義一個日誌收集處理類,代碼如下:

public class LogCollectHandler implements WorkHandler<LongEvent> {
    public LogCollectHandler(String consumer) {
        this.consumer = consumer;
    }

    private String consumer;


    @Override
    public void onEvent(LongEvent event)
    {
        System.out.println("consumer: " + consumer + ",Event: " + event);
    }
}

下面這個代碼是綁定消費者的代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogCollectHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogCollectHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogCollectHandler("consumer3");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.start();
}

需要注意的是,上面使用的是 Disruptor 的 handleEventsWithWorkerPool 方法,使用的消費者不是 EventHandler,而是 WorkHandler。消費者組裏面的消費者如果是 WorkHandler,那消費者之間就是有競爭的,比如一個 Event 已經被 consumer1 消費過,那就不再會被其他消費者消費了。消費者組裏面的消費者如果是 EventHandler,那消費者之間是沒有競爭的,所有消息都會消費。

  1. 責任鏈

責任鏈這種設計模式我們都比較熟悉了,同一個對象的處理有多個不同的邏輯,每個邏輯作爲一個節點組成責任鏈,比如收到一條告警消息,處理節點分爲:給開發人員發送郵件、給運維人員發送短信、給業務人員發送 OA 消息。

Disruptor 支持鏈式處理消息,看下面的示例代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.start();
}

Disruptor 也支持多個並行責任鏈,下圖是 2 條責任鏈的場景:

這裏給出一個示例代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");
 EventHandler<LongEvent> consumer5 = new LongEventHandler("consumer5");
 EventHandler<LongEvent> consumer6 = new LongEventHandler("consumer6");

 disruptor.handleEventsWith(consumer1).then(consumer2).then(consumer3);
 disruptor.handleEventsWith(consumer4).then(consumer5).then(consumer6);
 disruptor.start();
}
  1. 多任務協作

一個經典的例子,我們在泡咖啡之前,需要燒水、洗被子、磨咖啡粉,這三個步驟可以並行,但是需要等着三步都完成之後,纔可以泡咖啡。

當然,這個例子可以用 Java 中的 CompletableFuture 來實現,代碼如下:

public static void main(String[] args){
    ExecutorService executor = ...;
    CompletableFuture future1 = CompletableFuture.runAsync(() -> {
        try {
            washCup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future2 = CompletableFuture.runAsync(() -> {
        try {
            hotWater();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture future3 = CompletableFuture.runAsync(() -> {
        try {
            grindCoffee();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, executor);

    CompletableFuture.allOf(future1, future2, future3).thenAccept(
            r -> {
                System.out.println("泡咖啡");
            }
    );
    System.out.println("我是主線程");
}

同樣,使用 Disruptor 也可以實現這個場景,看下面代碼:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 EventHandler<LongEvent> consumer1 = new LongEventHandler("consumer1");
 EventHandler<LongEvent> consumer2 = new LongEventHandler("consumer2");
 EventHandler<LongEvent> consumer3 = new LongEventHandler("consumer3");
 EventHandler<LongEvent> consumer4 = new LongEventHandler("consumer4");

 disruptor.handleEventsWith(consumer1, consumer2, consumer3).then(consumer4);
 disruptor.start();
}
  1. 多消費者組

類比主流消息隊列的場景,Disruptor 也可以實現多消費者組的場景,組間並行消費互不影響,組內消費者競爭消息,如下圖:

示例代碼如下:

public static void main(String[] args) throws InterruptedException {
 int bufferSize = 1024;

 Disruptor<LongEvent> disruptor =
   new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

 WorkHandler<LongEvent> consumer1 = new LogWorkHandler("consumer1");
 WorkHandler<LongEvent> consumer2 = new LogWorkHandler("consumer2");
 WorkHandler<LongEvent> consumer3 = new LogWorkHandler("consumer3");
 WorkHandler<LongEvent> consumer4 = new LogWorkHandler("consumer4");
 WorkHandler<LongEvent> consumer5 = new LogWorkHandler("consumer5");
 WorkHandler<LongEvent> consumer6 = new LogWorkHandler("consumer6");

 disruptor.handleEventsWithWorkerPool(consumer1, consumer2, consumer3);
 disruptor.handleEventsWithWorkerPool(consumer4, consumer5, consumer6);
 disruptor.start();
}
  1. 總結

通過消費者的靈活組合,Disruptor 的使用場景非常豐富。本文介紹了 Disruptor 的 5 個典型使用場景。在選型的時候,除了使用場景,更多地要考慮到 Disruptor 作爲高性能內存隊列的這個特點。

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/hYRInFDtZJxjc1e5sbrASw