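DUBBO Asynchronous Consumption: Examples and Principles
1 Overview
When we need asynchronous invocation in server-side development, we typically declare a thread pool, wrap the business method call in a task, and submit the task to the pool. If no return value is needed, the task is a Runnable. If a return value is needed, the task is a Callable and the result is received through a Future object.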
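import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CalcTask1 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("task1: time-consuming calculation");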
Thread.sleep(1000L);
return 100;
}
}
class CalcTask2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("task2: time-consuming calculation");
Thread.sleep(3000L);
return 200;
}
}
public class CallableTest {
public static void test1() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
CalcTask1 task1 = new CalcTask1();
Future<Integer> f1 = executor.submit(task1);
CalcTask2 task2 = new CalcTask2();
Future<Integer> f2 = executor.submit(task2);
Integer result1 = f1.get();
Integer result2 = f2.get();
System.out.println("final result=" + (result1 + result2));
executor.shutdown();
}
public static void test2() throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
CalcTask1 task1 = new CalcTask1();
CalcTask2 task2 = new CalcTask2();
tasks.add(task1);
tasks.add(task2);
for (int i = 0; i < tasks.size(); i++) {
Future<Integer> future = executor.submit(tasks.get(i));
System.out.println("result=" + future.get());
}
executor.shutdown();
}
}
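One thing to note about test2: it calls future.get() right after each submit, so the second task is not submitted until the first one finishes, and the two tasks effectively run one after the other. The following is a minimal sketch of a variant that hands over all tasks first, using ExecutorService.invokeAll from the JDK. The class name InvokeAllSketch and the lambda tasks are illustrative only and are not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {
    // Submits all tasks at once, then adds up the results in submission order.
    public static int sumAll(List<Callable<Integer>> tasks) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            int total = 0;
            // invokeAll blocks until every task has completed
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> { Thread.sleep(1000L); return 100; });
        tasks.add(() -> { Thread.sleep(3000L); return 200; });
        System.out.println("final result=" + sumAll(tasks));
    }
}
```

Because both tasks are running before the first get() is called, the total wait is bounded by the slower task rather than by the sum of the two.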
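1.1 What Is Asynchronous Consumption
With DUBBO, asynchronous invocation does not require this much work. Building on the non-blocking capability of NIO, DUBBO lets a service consumer call multiple services in parallel without starting extra threads. The following example is based on version 2.7.0.
1.1.1 Provider
(1) Service declaration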
public interface CalcSumService {
public Integer sum(int a, int b);
}
public class CalcSumServiceImpl implements CalcSumService {
@Override
public Integer sum(int a, int b) {
return a + b;
}
}
public interface CalcSubtractionService {
public Integer subtraction(int a, int b);
}
public class CalcSubtractionServiceImpl implements CalcSubtractionService {
@Override
public Integer subtraction(int a, int b) {
return a - b;
}
}
(2) Configuration file
<beans>
<dubbo:application />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol />
<bean id="calcSumService" class="com.java.front.dubbo.demo.provider.service.CalcSumServiceImpl" />
<bean id="calcSubtractionService" class="com.java.front.dubbo.demo.provider.service.CalcSubtractionServiceImpl" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSumService" ref="calcSumService" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" ref="calcSubtractionService" />
</beans>
(3) Service publication
public class Provider {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath*:META-INF/spring/dubbo-provider.xml");
context.start();
System.out.println(context);
System.in.read();
}
}
1.1.2 Consumer
(1) Configuration file
<beans>
<dubbo:application />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:reference id="calcSumService" interface="com.java.front.dubbo.demo.provider.service.CalcSumService" timeout="10000">
<dubbo:method name="sum" async="true" />
</dubbo:reference>
<dubbo:reference id="calcSubtractionService" interface="com.java.front.dubbo.demo.provider.service.CalcSubtractionService" timeout="10000">
<dubbo:method name="subtraction" async="true" />
</dubbo:reference>
</beans>
(2) Service consumption
public class Consumer {
public static void main(String[] args) throws Exception {
testAsync();
System.in.read();
}
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** Addition **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** Subtraction **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** Print the results **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
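1.2 Why Asynchronous Consumption
Asynchrony turns calls that were originally serial into parallel calls, which reduces total execution time and improves performance. Suppose the addition service in the example above takes 100 ms and the subtraction service takes 200 ms. Executed serially, the total time is the sum of the two, 300 ms.
With asynchronous consumption, the total time drops to the larger of the two, 200 ms, so the performance gain is clear.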
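The arithmetic can be checked with a small local simulation. This is only a sketch: the two remote services are replaced by local tasks that sleep for 100 ms and 200 ms, and a two-thread pool stands in for the network, whereas DUBBO itself achieves the overlap through non-blocking I/O. The class TimingSketch and its method names are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class TimingSketch {
    // Simulates a remote call that takes `millis` ms and returns `value`.
    static Supplier<Integer> slowCall(int value, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    // Runs both calls one after the other; returns elapsed milliseconds.
    public static long serialMillis() {
        long start = System.nanoTime();
        slowCall(5, 100).get();
        slowCall(1, 200).get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Starts both calls before waiting for either; returns elapsed milliseconds.
    public static long parallelMillis() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            long start = System.nanoTime();
            CompletableFuture<Integer> sum = CompletableFuture.supplyAsync(slowCall(5, 100), pool);
            CompletableFuture<Integer> sub = CompletableFuture.supplyAsync(slowCall(1, 200), pool);
            sum.join();
            sub.join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("serial   ~" + serialMillis() + " ms");   // roughly 100 + 200
        System.out.println("parallel ~" + parallelMillis() + " ms"); // roughly max(100, 200)
    }
}
```

The exact numbers vary with scheduling overhead, but the parallel figure should stay close to the slower of the two calls.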
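2 Guarded Suspension Pattern
Before analyzing the DUBBO source code, we first introduce a multithreading design pattern: Guarded Suspension. Imagine that thread A produces a piece of data and thread B reads it. We must handle the case where thread B is ready to read but thread A has not produced the data yet. Thread B should neither spin in a busy loop nor exit immediately. It has to wait until the data has been produced, take the data, and only then exit.
So while the data is not yet available, thread B needs a waiting mechanism, which protects the system from wasted work. This is Guarded Suspension.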
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

public class MyData implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public MyData(String message) {
this.message = message;
}
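// Print the message instead of the default object identity
@Override
public String toString() {
return "MyData(message=" + message + ")";
}
}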
class Resource {
private MyData data;
private Object lock = new Object();
public MyData getData() {
synchronized (lock) {
while (data == null) {
try {
// No data yet: release the lock and wait to be woken
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
// Store the produced data, then wake the consumer thread
this.data = data;
lock.notifyAll();
}
}
}
public class ProtectDesignTest {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + " produced data=" + data);
// Simulate a slow send
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData();
System.out.println(Thread.currentThread().getName() + " received data=" + data);
}, "t2").start();
}
}
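In the example above, thread t1 produces the data and thread t2 consumes it, and the Resource class implements Guarded Suspension with wait/notify. For more on the pattern, see my earlier article "Guarded Suspension Pattern Explained and Its Application in the DUBBO Source Code".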
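The Resource class above waits indefinitely. A common extension of the pattern is a bounded wait, which is also the shape of the DefaultFuture.get(timeout) code analyzed later in this article. The following is a minimal sketch built on wait/notifyAll. The class name TimedResource and the choice of returning null on timeout are assumptions for illustration, not code from DUBBO.

```java
public class TimedResource<T> {
    private final Object lock = new Object();
    private T data;

    // Waits up to timeoutMillis for data; returns null if none arrived in time.
    public T getData(long timeoutMillis) {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            long remaining = timeoutMillis;
            try {
                // The loop guards against spurious wakeups and recomputes the remaining time
                while (data == null && remaining > 0) {
                    lock.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            return data;
        }
    }

    // Stores the data and wakes every waiting consumer.
    public void sendData(T value) {
        synchronized (lock) {
            this.data = value;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) {
        TimedResource<String> resource = new TimedResource<>();
        System.out.println("before send: " + resource.getData(100)); // nothing produced yet
        new Thread(() -> resource.sendData("hello"), "t1").start();
        System.out.println("after send: " + resource.getData(3000));
    }
}
```

Recomputing the remaining time on every pass matters: a wakeup that arrives early must not restart the full timeout.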
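3 Source Code Analysis
In this chapter we compare the source code of versions 2.6.9 and 2.7.0. We chose these two because 2.7.0 is a milestone release in which asynchronous capabilities were significantly strengthened.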
3.1 version_2.6.9
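3.1.1 Asynchronous invocation
Let us first look at how asynchronous invocation is used in this version. The provider and the consumer configuration file are the same as in chapter 1 and are not repeated here. We focus on the service consumption code.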
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
test1();
System.in.read();
}
public static void test1() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** Addition **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
Future<Integer> futureSum = RpcContext.getContext().getFuture();
/** Subtraction **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
Future<Integer> futureSubtraction = RpcContext.getContext().getFuture();
/** Print the results **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
}
}
The consumer call eventually reaches DubboInvoker.doInvoke, which contains the core of asynchronous invocation:
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// One-way call
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
// Asynchronous call
else if (isAsync) {
// Send the request to the provider
ResponseFuture future = currentClient.request(inv, timeout);
// Store the Future object in the context
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// Return an empty result
return new RpcResult();
}
// Synchronous call
else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
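If the async attribute is set, the call is asynchronous: first the request is sent to the provider, then the Future object is stored in the context, and finally an empty result is returned immediately. The key step on the consumer side is therefore obtaining the Future object, so after each call we fetch it from the context: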
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
Future<Integer> futureSum = RpcContext.getContext().getFuture();
Use the Future object to obtain the result:
int sumResult = futureSum.get();
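The three steps can be imitated without DUBBO to show why the Future has to be read from the context right after each call. In this sketch, which is not DUBBO code, CallContext is a hypothetical stand-in for RpcContext backed by a ThreadLocal, and asyncSum and asyncSubtraction are hypothetical stubs that start the work, store a Future in the context, and return a placeholder immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ContextFutureSketch {
    // Hypothetical stand-in for RpcContext: one slot per thread holding the latest Future.
    static final class CallContext {
        private static final ThreadLocal<Future<?>> LAST = new ThreadLocal<>();
        static void setFuture(Future<?> f) { LAST.set(f); }
        @SuppressWarnings("unchecked")
        static <T> Future<T> getFuture() { return (Future<T>) LAST.get(); }
    }

    // Hypothetical async stub: starts the work, stores the Future, returns a placeholder.
    static Integer asyncSum(int a, int b) {
        CallContext.setFuture(CompletableFuture.supplyAsync(() -> a + b));
        return null; // the real value only arrives through the Future
    }

    static Integer asyncSubtraction(int a, int b) {
        CallContext.setFuture(CompletableFuture.supplyAsync(() -> a - b));
        return null;
    }

    public static int[] callBoth() {
        asyncSum(3, 2);
        Future<Integer> futureSum = CallContext.getFuture(); // read before the next call overwrites the slot
        asyncSubtraction(3, 2);
        Future<Integer> futureSub = CallContext.getFuture(); // the slot now holds the second Future
        try {
            return new int[] { futureSum.get(), futureSub.get() };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = callBoth();
        System.out.println("sumResult=" + r[0] + ",subtractionResult=" + r[1]);
    }
}
```

If the first Future were fetched only after the second call, the slot would already hold the subtraction Future and the addition result would be lost.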
Step into FutureAdapter.get():
public class FutureAdapter<V> implements Future<V> {
private final ResponseFuture future;
public V get() throws InterruptedException, ExecutionException {
try {
return (V) (((Result) future.get()).recreate());
} catch (RemotingException e) {
throw new ExecutionException(e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e);
}
}
}
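Step into ResponseFuture.get(), implemented by DefaultFuture. Here we can see the Guarded Suspension pattern at work: if the provider has not returned data yet, the calling thread blocks and waits to be woken: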
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
@Override
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// The remote call has not completed: wait to be woken
done.await(timeout, TimeUnit.MILLISECONDS);
// Leave the loop once the call is done or the timeout has elapsed
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// Throw a timeout exception
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
}
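When the consumer receives the provider's response, the received method is called to wake the blocked thread, and the thread blocked in get can then obtain the data: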
public class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
// Look up the Future by its unique request id
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
// Wake the blocked thread
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
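The lookup in received relies on every request carrying a unique id that the response echoes back. Below is a minimal sketch of that correlation idea, assuming a plain ConcurrentHashMap and CompletableFuture instead of DUBBO's own classes. PendingRequests, register, and the String payload are illustrative names only.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PendingRequests {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Called when a request is sent: allocate an id and remember its future.
    public long register(CompletableFuture<String> future) {
        long id = nextId.incrementAndGet();
        futures.put(id, future);
        return id;
    }

    // Called when a response arrives. Returns false if nobody is waiting any more,
    // for example because the entry was already removed.
    public boolean received(long id, String payload) {
        CompletableFuture<String> future = futures.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(payload); // wakes any thread blocked in get() or join()
        return true;
    }

    public static void main(String[] args) {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<String> f = new CompletableFuture<>();
        long id = pending.register(f);
        pending.received(id, "pong");
        System.out.println(f.join());
        System.out.println(pending.received(id, "late")); // the entry was already removed
    }
}
```

Removing the entry on receipt means a duplicate or late response finds nothing to complete, which corresponds to the warning branch in the listing above.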
3.1.2 Setting a callback
Calling get blocks until the result arrives. Is there a way to have a callback of ours invoked as soon as the result returns? There is.
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
test2();
System.in.read();
}
public static void test2() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** Addition **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
/** Register the callback **/
((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
System.out.println("sumResult=" + response);
}
@Override
public void caught(Throwable exception) {
exception.printStackTrace();
}
});
/** Subtraction **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
/** Register the callback **/
((FutureAdapter<Object>) RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback() {
@Override
public void done(Object response) {
System.out.println("subtractionResult=" + response);
}
@Override
public void caught(Throwable exception) {
exception.printStackTrace();
}
});
}
}
DefaultFuture allows a callback to be set. When the result returns, the callback is executed if it is not null:
public class DefaultFuture implements ResponseFuture {
private volatile ResponseCallback callback;
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
// Invoke the callback
invokeCallback(callback);
}
}
private void invokeCallback(ResponseCallback c) {
ResponseCallback callbackCopy = c;
if (callbackCopy == null) {
throw new NullPointerException("callback cannot be null.");
}
c = null;
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
}
if (res.getStatus() == Response.OK) {
try {
// Success callback
callbackCopy.done(res.getResult());
} catch (Exception e) {
logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
}
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
try {
TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
// Timeout callback
callbackCopy.caught(te);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
} else {
try {
RuntimeException re = new RuntimeException(res.getErrorMessage());
callbackCopy.caught(re);
} catch (Exception e) {
logger.error("callback invoke error ,url:" + channel.getUrl(), e);
}
}
}
}
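For comparison, the JDK offers the same done/caught split through CompletableFuture.whenComplete. This sketch uses only JDK classes and is not DUBBO code. CallbackSketch and observe are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {
    // Registers a callback that records either the value or the error message.
    public static List<String> observe(CompletableFuture<Integer> future) {
        List<String> log = new ArrayList<>();
        future.whenComplete((value, error) -> {
            if (error == null) {
                log.add("done:" + value);                // counterpart of done(response)
            } else {
                log.add("caught:" + error.getMessage()); // counterpart of caught(exception)
            }
        });
        return log;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> ok = new CompletableFuture<>();
        List<String> okLog = observe(ok); // callback registered before the result exists
        ok.complete(5);
        System.out.println(okLog);

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("timeout"));
        System.out.println(observe(failed)); // registered after completion, still fires
    }
}
```

The second case shows a property worth having in any callback-capable future: registering the callback after the result has arrived must still deliver the result.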
3.2 version_2.7.0
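This version introduces CompletableFuture for asynchronous invocation, so its rich asynchronous programming API can be used to strengthen asynchronous capabilities. Let us first revisit the example from section 1.1.2: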
public class Consumer {
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer.xml" });
System.out.println(context);
context.start();
/** Addition **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** Subtraction **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** Print the results **/
int sumResult = futureSum.get();
int subtractionResult = futureSubtraction.get();
System.out.println("sumResult=" + sumResult + ",subtractionResult=" + subtractionResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
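The consumer code above only uses CompletableFuture.get() and does not exploit the class's capabilities. Let us modify the example slightly: once both CompletionStage tasks have completed, their results are handed together to thenCombine for processing: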
public class Consumer {
public static void testAsync() {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/spring/dubbo-consumer2.xml" });
System.out.println(context);
context.start();
/** Addition **/
CalcSumService calcSumService = (CalcSumService) context.getBean("calcSumService");
calcSumService.sum(3, 2);
CompletableFuture<Integer> futureSum = RpcContext.getContext().getCompletableFuture();
/** Subtraction **/
CalcSubtractionService calcSubtractionService = (CalcSubtractionService) context.getBean("calcSubtractionService");
calcSubtractionService.subtraction(3, 2);
CompletableFuture<Integer> futureSubtraction = RpcContext.getContext().getCompletableFuture();
/** Multiplication **/
CompletableFuture<Integer> multiplyResult = futureSum.thenCombine(futureSubtraction, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t, Integer u) {
return (t * u);
}
});
System.out.println("multiplyResult=" + multiplyResult.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
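The combination step can be tried in isolation. This is a minimal sketch, assuming the two RPC results are replaced by locally supplied futures and the anonymous BiFunction is written as a lambda. CombineSketch is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;

public class CombineSketch {
    public static int multiply(CompletableFuture<Integer> sum, CompletableFuture<Integer> sub) {
        // thenCombine runs the function only after both inputs have completed
        return sum.thenCombine(sub, (t, u) -> t * u).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> futureSum = CompletableFuture.supplyAsync(() -> 3 + 2);
        CompletableFuture<Integer> futureSubtraction = CompletableFuture.supplyAsync(() -> 3 - 2);
        System.out.println("multiplyResult=" + multiply(futureSum, futureSubtraction));
    }
}
```

With the inputs 3 and 2 used throughout this article, the combined result is (3 + 2) * (3 - 2) = 5.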
The DubboInvoker code has changed somewhat:
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// Whether this is an asynchronous call
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// Whether the method's return type is a Future
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
// Whether the call is one-way (no response expected)
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// Timeout
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// One-way call
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
}
// Asynchronous request
else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
RpcContext.getContext().setFuture(futureAdapter);
Result result;
if (isAsyncFuture) {
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return result;
}
// Synchronous request
else {
RpcContext.getContext().setFuture(null);
Result result = (Result) currentClient.request(inv, timeout).get();
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
As in version 2.6.9, a FutureAdapter is still set into the context, but FutureAdapter itself has changed:
public class FutureAdapter<V> extends CompletableFuture<V> {
private final ResponseFuture future;
private CompletableFuture<Result> resultFuture;
public FutureAdapter(ResponseFuture future) {
this.future = future;
this.resultFuture = new CompletableFuture<>();
// Register a callback on the DefaultFuture
future.setCallback(new ResponseCallback() {
// Pass the response result to the CompletableFuture
@Override
public void done(Object response) {
Result result = (Result) response;
FutureAdapter.this.resultFuture.complete(result);
V value = null;
try {
value = (V) result.recreate();
} catch (Throwable t) {
FutureAdapter.this.completeExceptionally(t);
}
FutureAdapter.this.complete(value);
}
// Pass the exception to the FutureAdapter
@Override
public void caught(Throwable exception) {
FutureAdapter.this.completeExceptionally(exception);
}
});
}
public ResponseFuture getFuture() {
return future;
}
public CompletableFuture<Result> getResultFuture() {
return resultFuture;
}
}
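The adapter idea can be shown without DUBBO types: a callback-style API is bridged to CompletableFuture by completing the future inside the callback. In this sketch the Callback interface and the LegacyCall class are hypothetical stand-ins for ResponseCallback and ResponseFuture, and adapt is an illustrative helper.

```java
import java.util.concurrent.CompletableFuture;

public class AdapterSketch {
    // Hypothetical stand-in for ResponseCallback.
    interface Callback<T> {
        void done(T value);
        void caught(Throwable error);
    }

    // Hypothetical stand-in for ResponseFuture: a callback-only source of one result.
    static final class LegacyCall<T> {
        private Callback<T> callback;
        void setCallback(Callback<T> callback) { this.callback = callback; }
        void succeed(T value) { callback.done(value); }
        void fail(Throwable error) { callback.caught(error); }
    }

    // Bridges the callback API to a CompletableFuture.
    public static <T> CompletableFuture<T> adapt(LegacyCall<T> call) {
        CompletableFuture<T> future = new CompletableFuture<>();
        call.setCallback(new Callback<T>() {
            @Override
            public void done(T value) { future.complete(value); }
            @Override
            public void caught(Throwable error) { future.completeExceptionally(error); }
        });
        return future;
    }

    public static void main(String[] args) {
        LegacyCall<Integer> call = new LegacyCall<>();
        CompletableFuture<Integer> future = adapt(call);
        future.thenAccept(v -> System.out.println("sumResult=" + v));
        call.succeed(5); // triggers done(), which completes the future and runs thenAccept
    }
}
```

Once the result lives in a CompletableFuture, callers can choose between blocking with get and chaining further stages, which is what makes the thenCombine example possible.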
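FutureAdapter now extends CompletableFuture, so the adapter stored in the context is itself the CompletableFuture that the consumer works with, while getResultFuture exposes the CompletableFuture<Result> that DubboInvoker passes to the asynchronous result. Both are completed inside the callback, and the callback is triggered, as before, in DefaultFuture.doReceived: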
public class DefaultFuture implements ResponseFuture {
private volatile ResponseCallback callback;
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
// Invoke the callback; the code is the same as in version_2.6.9
invokeCallback(callback);
}
}
}
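4 Summary
This article first introduced what DUBBO asynchronous consumption is and why it improves performance. It then introduced the Guarded Suspension pattern, which is the foundation for implementing asynchrony. Finally, we read the asynchronous source code of two versions and traced how asynchrony in DUBBO evolved. I hope this article is helpful.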
Source: https://mp.weixin.qq.com/s/x0lvGa4S1qAi4uGXB1W4Yg