設計模式最佳套路2 —— 愉快地使用管道模式

管道模式(Pipeline Pattern) 是**責任鏈模式(Chain of Responsibility Pattern)**的常用變體之一。在管道模式中,管道扮演着流水線的角色,將數據傳遞到一個加工處理序列中,數據在每個步驟中被加工處理後,傳遞到下一個步驟進行加工處理,直到全部步驟處理完畢。

PS:純的責任鏈模式在鏈上只會有一個處理器用於處理數據,而管道模式上多個處理器都會處理數據。

何時使用管道模式

任務代碼較爲複雜,需要拆分爲多個子步驟時,尤其是後續可能在任意位置添加新的子步驟、刪除舊的子步驟、交換子步驟順序,可以考慮使用管道模式。

愉快地使用管道模式

背景回放

最開始做模型平臺的時候,創建模型實例的功能,包括:“輸入數據校驗 -> 根據輸入創建模型實例 -> 保存模型實例到相關 DB 表” 總共三個步驟,也不算複雜,所以當時的代碼大概是這樣的:

public class ModelServiceImpl implements ModelService {
    /**
     * 提交模型(構建模型實例)
     */
    public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {
        // 輸入數據校驗
        validateInput(request);
        // 根據輸入創建模型實例
        ModelInstance instance = createModelInstance(request);
        // 保存實例到相關 DB 表
        saveInstance(instance);
    }
}

然而沒有過多久,我們發現表單輸入數據的格式並不完全符合模型的輸入要求,於是我們要加入 “表單數據的預處理”。這功能還沒動手呢,又有業務方提出自己也存在需要對數據進行處理的情況(比如根據商家的表單輸入,生成一些其他業務數據作爲模型輸入)。

所以在 “輸入數據校驗” 之後,還需要加入 “表單輸入輸出預處理” 和 “業務方自定義數據處理(可選)”。這個時候我就面臨一個選擇:是否繼續通過在 buildModelInstance 中加入新的方法來實現這些新的處理步驟?好處就是可以當下偷懶,但是壞處呢:

  1. ModelService 應該只用來接收 HSF 請求,而不應該承載業務邏輯,如果將 提交模型 的邏輯都寫在這個類當中,違反了 單一職責,而且後面會導致 類代碼爆炸

  2. 將來每加入一個新的處理步驟或者刪除某個步驟,我就要修改 buildModelInstance 這個本應該非常內聚的方法,違反了 開閉原則

所以,爲了不給以後的自己挖坑,我覺得要思考一個萬全的方案。這個時候,我小腦袋花開始飛轉,突然閃過了 Netty 中的 ChannelPipeline —— 對哦,管道模式,不就正是我需要的嘛!

管道模式的實現方式也是多種多樣,接下來基於前面的背景,我分享一下我目前基於 Spring 實現管道模式的 “最佳套路”(如果你有更好的套路,歡迎賜教,一起討論哦)。

定義管道處理的上下文

/**
 * 傳遞到管道的上下文
 */
@Getter
@Setter
public class PipelineContext {
    /**
     * 處理開始時間
     */
    private LocalDateTime startTime;
    /**
     * 處理結束時間
     */
    private LocalDateTime endTime;
    /**
     * 獲取數據名稱
     */
    public String getName() {
        return this.getClass().getSimpleName();
    }
}

定義上下文處理器

/**
 * 管道中的上下文處理器
 */
public interface ContextHandler<T extends PipelineContext> {
    /**
     * 處理輸入的上下文數據
     *
     * @param context 處理時的上下文數據
     * @return 返回 true 則表示由下一個 ContextHandler 繼續處理,返回 false 則表示處理結束
     */
    boolean handle(T context);
}

爲了方便說明,我們現在先定義出最早版 【提交模型邏輯】 的上下文和相關處理器:

/**
 * 模型實例構建的上下文
 */
@Getter
@Setter
public class InstanceBuildContext extends PipelineContext {
    /**
     * 模型 id
     */
    private Long modelId;
    /**
     * 用戶 id
     */
    private long userId;
    /**
     * 表單輸入
     */
    private Map<String, Object> formInput;
    /**
     * 保存模型實例完成後,記錄下 id
     */
    private Long instanceId;
    /**
     * 模型創建出錯時的錯誤信息
     */
    private String errorMsg;
    // 其他參數
    @Override
    public String getName() {
        return "模型實例構建上下文";
    }
}

處理器 - 輸入數據校驗:

@Component
public class InputDataPreChecker implements ContextHandler<InstanceBuildContext> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--輸入數據校驗--");
        Map<String, Object> formInput = context.getFormInput();
        if (MapUtils.isEmpty(formInput)) {
            context.setErrorMsg("表單輸入數據不能爲空");
            return false;
        }
        String instanceName = (String) formInput.get("instanceName");
        if (StringUtils.isBlank(instanceName)) {
            context.setErrorMsg("表單輸入數據必須包含實例名稱");
            return false;
        }
        return true;
    }
}

處理器 - 根據輸入創建模型實例:

@Component
public class ModelInstanceCreator implements ContextHandler<InstanceBuildContext> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--根據輸入數據創建模型實例--");
        // 假裝創建模型實例
        return true;
    }
}

處理器 - 保存模型實例到相關 DB 表:

@Component
public class ModelInstanceSaver implements ContextHandler<InstanceBuildContext> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--保存模型實例到相關DB表--");
        // 假裝保存模型實例
        return true;
    }
}

到這裏,有個問題就出現了:應該使用什麼樣的方式,將同一種 Context 的 ContextHandler 串聯爲管道呢?思考一下:

  1. 給 ContextHandler 加一個 setNext 方法,每個實現類必須指定其下一個處理器。缺點也很明顯,如果在當前管道中間加入一個新的 ContextHandler,那麼要勢必要修改前一個 ContextHandler 的 setNext 方法;另外,代碼是寫給人閱讀的,這樣做沒法一眼就直觀的知道整個管道的處理鏈路,還要進入到每個相關的 ContextHandler 中去查看才知道。

  2. 給 ContextHandler 加上 @Order 註解,根據 @Order 中給定的數字來確定每個 ContextHandler 的序列,一開始時每個數字間隔的可以大些(比如 10、20、30),後續加入新的 ContextHandler 時,可以指定數字爲 (11、21、31)這種,那麼可以避免上面方案中要修改代碼的問題,但是仍然無法避免要進入每個相關的 ContextHandler 中去查看才能知道管道處理鏈路的問題。

  3. 提前寫好一份路由表,指定好 ”Context -> 管道 “的映射(管道用 List 來表示),以及管道中處理器的順序 。Spring 來根據這份路由表,在啓動時就構建好一個 Map,Map 的鍵爲 Context 的類型,值爲 管道(即 List)。這樣的話,如果想知道每個管道的處理鏈路,直接看這份路由表就行,一目瞭然。缺點嘛,就是每次加入新的 ContextHandler 時,這份路由表也需要在對應管道上進行小改動 —— 但是如果能讓閱讀代碼更清晰,我覺得這樣的修改是值得的、可接受的~

構建管道路由表

基於 Spring 的 Java Bean 配置,我們可以很方便的構建管道的路由表:

/**
 * 管道路由的配置
 */
@Configuration
public class PipelineRouteConfig implements ApplicationContextAware {
    /**
     * 數據類型->管道中處理器類型列表 的路由
     */
    private static final
    Map<Class<? extends PipelineContext>,
        List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);
    /*
     * 在這裏配置各種上下文類型對應的處理管道:鍵爲上下文類型,值爲處理器類型的列表
     */
    static {
        PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,
                               Arrays.asList(
                                       InputDataPreChecker.class,
                                       ModelInstanceCreator.class,
                                       ModelInstanceSaver.class
                               ));
        // 將來其他 Context 的管道配置
    }
    /**
     * 在 Spring 啓動時,根據路由表生成對應的管道映射關係
     */
    @Bean("pipelineRouteMap")
    public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
        return PIPELINE_ROUTE_MAP.entrySet()
                                 .stream()
                                 .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
    }
    /**
     * 根據給定的管道中 ContextHandler 的類型的列表,構建管道
     */
    private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
            Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
        return entry.getValue()
                    .stream()
                    .map(appContext::getBean)
                    .collect(Collectors.toList());
    }
    private ApplicationContext appContext;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appContext = applicationContext;
    }
}

定義管道執行器

最後一步,定義管道執行器。管道執行器 根據傳入的上下文數據的類型,找到其對應的管道,然後將上下文數據放入管道中去進行處理。

/**
 * 管道執行器
 */
@Component
public class PipelineExecutor {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 引用 PipelineRouteConfig 中的 pipelineRouteMap
     */
    @Resource
    private Map<Class<? extends PipelineContext>,
                List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;
    /**
     * 同步處理輸入的上下文數據<br/>
     * 如果處理時上下文數據流通到最後一個處理器且最後一個處理器返回 true,則返回 true,否則返回 false
     *
     * @param context 輸入的上下文數據
     * @return 處理過程中管道是否暢通,暢通返回 true,不暢通返回 false
     */
    public boolean acceptSync(PipelineContext context) {
        Objects.requireNonNull(context, "上下文數據不能爲 null");
        // 拿到數據類型
        Class<? extends PipelineContext> dataType = context.getClass();
        // 獲取數據處理管道
        List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);
        if (CollectionUtils.isEmpty(pipeline)) {
            logger.error("{} 的管道爲空", dataType.getSimpleName());
            return false;
        }
        // 管道是否暢通
        boolean lastSuccess = true;
        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 當前處理器處理數據,並返回是否繼續向下處理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }
            // 不再向下處理
            if (!lastSuccess) { break; }
        }
        return lastSuccess;
    }
}

使用管道模式

此時,我們可以將最開始的 buildModelInstance 修改爲:

public CommonResponse<Long> buildModelInstance(InstanceBuildRequest request) {
    InstanceBuildContext data = createPipelineData(request);
    boolean success = pipelineExecutor.acceptSync(data);
    // 創建模型實例成功
    if (success) {
        return CommonResponse.success(data.getInstanceId());
    }
    logger.error("創建模式實例失敗:{}", data.getErrorMsg());
    return CommonResponse.failed(data.getErrorMsg());
}

我們模擬一下模型實例的創建過程:

參數正常時:

參數出錯時:

這個時候我們再爲 InstanceBuildContext 加入新的兩個 ContextHandler:FormInputPreprocessor(表單輸入數據預處理) 和 BizSideCustomProcessor(業務方自定義數據處理)。

@Component
public class FormInputPreprocessor implements ContextHandler<InstanceBuildContext> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--表單輸入數據預處理--");
        // 假裝進行表單輸入數據預處理
        return true;
    }
}

此時 buildModelInstance 不需要做任何修改,我們只需要在 “路由表” 裏面,將這兩個 ContextHandler 加入到 InstanceBuildContext 關聯的管道中,Spring 啓動的時候,會自動幫我們構建好每種 Context 對應的管道:

再模擬一下模型實例的創建過程:

異步處理

管道執行器 PipelineExecutor 中,acceptSync 是個同步的方法。

小蜜:看名字你就知道你悄悄埋伏筆了。

對於步驟繁多的任務,很多時候我們更需要的是異步處理,比如某些耗時長的定時任務。管道處理異步化非常的簡單,我們先定義一個線程池,比如:

<!-- 專門用於執行管道任務的線程池 -->
<bean
     >
    <property  /> <!-- 核心線程數 -->
    <property  />  <!-- 最大線程數 -->
    <property  />  <!-- 線程最大空閒時間/秒(根據管道使用情況指定)-->
    <property  />     <!-- 任務隊列大小(根據管道使用情況指定)-->
    <property  />
    <property >
        <bean />
    </property>
</bean>

然後在 PipelineExecutor 中加入異步處理的方法:

通用處理

比如我們想記錄下每次管道處理的時間,以及在處理前和處理後都打印相關的日誌。那麼我們可以提供兩個通用的 ContextHandler,分別放在每個管道的頭和尾:

通用頭、尾處理器可以在路由表裏面放置,但是每次新加一種 PipelineContext 都要加一次,好像沒有必要 —— 我們直接修改下 管道執行器 PipelineExecutor 中的 acceptSync 方法:

@Component
public class PipelineExecutor {
    ......
    @Autowired
    private CommonHeadHandler commonHeadHandler;
    @Autowired
    private CommonTailHandler commonTailHandler;
    public boolean acceptSync(PipelineContext context) {
        ......
        // 【通用頭處理器】處理
        commonHeadHandler.handle(context);
        // 管道是否暢通
        boolean lastSuccess = true;
        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 當前處理器處理數據,並返回是否繼續向下處理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }
            // 不再向下處理
            if (!lastSuccess) { break; }
        }
        // 【通用尾處理器】處理
        commonTailHandler.handle(context);
        return lastSuccess;
    }
}

總結

通過管道模式,我們大幅降低了系統的耦合度和提升了內聚程度與擴展性:

作者 | 周密(之葉)

編輯 | 橙子君

出品 | 阿里巴巴新零售淘系技術

本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s?__biz=MzAxNDEwNjk5OQ==&amp;mid=2650413430&amp;idx=1&amp;sn=32c89ea3222d341bf3014854f69fd239&amp;chksm=8396d16eb4e1587838f99a6859668a3588b0762155c1a13872d3f419efa3f75c5a532d7d38fe&amp;scene=21#wechat_redirect