xxl-job 驚豔的設計,怎能叫人不愛


通信底層介紹

xxl-job 使用 netty http 的方式進行通信,雖然也支持 Mina,jetty,netty tcp 等方式,但是代碼裏面固定寫死的是 netty http。

通信整體流程

我以調度器通知執行器執行任務爲例,繪製的活動圖:

活動圖

驚豔的設計

看完了整個處理流程代碼,設計上可以說獨具匠心,將 netty,多線程的知識運用得行雲流水。

我現在就將這些設計上出彩的點總結如下:

| 使用動態代理模式,隱藏通信細節

xxl-job 定義了兩個接口 ExecutorBiz,AdminBiz,ExecutorBiz 接口中封裝了向心跳,暫停,觸發執行等操作,AdminBiz 封裝了回調,註冊,取消註冊操作,接口的實現類中,並沒有通信相關的處理。

XxlRpcReferenceBean 類的 getObject() 方法會生成一個代理類,這個代理類會進行遠程通信。

| 全異步處理

執行器收到消息進行反序列化,並沒有同步執行任務代碼,而是將任務信息存儲在 LinkedBlockingQueue 中,異步線程從這個隊列中獲取任務信息,然後執行。

而任務的處理結果,也不是說處理完之後,同步返回的,也是放到回調線程的阻塞隊列中,異步的將處理結果返回回去。

這樣處理的好處就是減少了 netty 工作線程的處理時間,提升了吞吐量。

| 對異步處理的包裝

對異步處理進行了包裝,代碼看起來是同步調用的。

我們看下調度器,XxlJobTrigger 類觸發任務執行的代碼:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //這裏面做了很多異步處理,最終同步得到處理結果
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

ExecutorBiz.run 方法我們說過了,是走的動態代理,和執行器進行通信,執行器執行結果也是異步處理完,才返回的,而這裏看到的 run 方法是同步等待處理結果返回。

我們看下 xxl-job 是如何同步獲取處理結果的:調度器向執行器發出消息後,該線程阻塞。等到執行器處理完畢後,將處理結果返回,喚醒被阻塞的線程,調用處拿到返回值。

動態代理代碼如下:

//代理類中的觸發調用
if (CallType.SYNC == callType) {
   // future-response set
   XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
   try {
      // do invoke
      client.asyncSend(finalAddress, xxlRpcRequest);

      // future get
      XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
      if (xxlRpcResponse.getErrorMsg() != null) {
         throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
      }
      return xxlRpcResponse.getResult();
   } catch (Exception e) {
      logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

      throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
   } finally{
      // future-response remove
      futureResponse.removeInvokerFuture();
   }
}

XxlRpcFutureResponse 類中實現了線程的等待,和線程喚醒的處理:

//返回結果,喚醒線程
public void setResponse(XxlRpcResponse response) {
   this.response = response;
   synchronized (lock) {
      done = true;
      lock.notifyAll();
   }
}

@Override
    public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!done) {
            synchronized (lock) {
                try {
                    if (timeout < 0) {
            //線程阻塞
                        lock.wait();
                    } else {
                        long timeoutMillis = (TimeUnit.MILLISECONDS==unit)?timeout:TimeUnit.MILLISECONDS.convert(timeout , unit);
                        lock.wait(timeoutMillis);
                    }
                } catch (InterruptedException e) {
                    throw e;
                }
            }
        }

        if (!done) {
            throw new XxlRpcException("xxl-rpc, request timeout at:"+ System.currentTimeMillis() +", request:" + request.toString());
        }
        return response;
    }

有的同學可能會問了,調度器接收到返回結果,怎麼確定喚醒哪個線程呢?

每一次遠程調用,都會生成 uuid 的請求 id,這個 id 是在整個調用過程中一直傳遞的,就像一把鑰匙,在你回家的的時候,拿着它就帶開門。

這裏拿着請求 id 這把鑰匙,就能找到對應的 XxlRpcFutureResponse,然後調用 setResponse 方法,設置返回值,喚醒線程。

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

    // 通過requestId找到XxlRpcFutureResponse,
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback()!=null) {

        // callback type
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        }catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // 裏面調用lock的notify方法
        futureResponse.setResponse(xxlRpcResponse);
    }

    // do remove
    futureResponsePool.remove(requestId);

}
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源https://mp.weixin.qq.com/s/o7ZqkqNYo8uJy_uxPy7vKw