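From Practice to Principles: A Roundup of Thread Pool Use Cases
In day-to-day development we often need components such as thread pools, for example in the following scenarios:
Classic thread pool use cases
Sending email notifications asynchronously: wrap the send operation in a task and submit it to a thread pool, which sends the email asynchronously.
Heartbeat requests: create a task and have a thread pool send the request on a schedule.
There are many similar scenarios. Below we walk through concrete thread pool usage for each of them, step by step:
Asynchronous email sending
Define a simple email-sending interface: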
public interface SendEmailService {

    /**
     * Sends an email.
     *
     * @param emailDTO the email to send
     */
    void sendEmail(EmailDTO emailDTO);
}
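Next, a simple implementation of email sending:
@Service
public class SendEmailServiceImpl implements SendEmailService {

    @Resource
    private ExecutorService emailTaskPool;

    @Override
    public void sendEmail(EmailDTO emailDTO) {
        // Hand the slow send off to the pool so the caller returns immediately
        emailTaskPool.submit(() -> {
            try {
                System.out.printf("sending email .... emailDto is %s%n", emailDTO);
                // Simulates the time-consuming send
                Thread.sleep(1000);
                System.out.println("sent successfully");
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        });
    }
}
The sending logic uses a simple thread sleep to simulate the time-consuming part of sending an email.
Then the thread pool configuration: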
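@Configuration
public class ThreadPoolConfig {

    @Bean
    public ExecutorService emailTaskPool() {
        // core 2, max 4, no keep-alive, unbounded queue.
        // SysThreadFactory is the author's own ThreadFactory (not shown in the article).
        return new ThreadPoolExecutor(2, 4,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
    }
}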
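The article does not show SysThreadFactory. Below is a minimal sketch of what such a factory might look like, assuming its job is simply to give pool threads a recognizable name prefix; NamedThreadFactory is a hypothetical name, not the author's class.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the article's SysThreadFactory (its real code is not shown):
// names threads "<prefix>-<n>" so they are easy to spot in thread dumps and logs.
final class NamedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        // Non-daemon: the JVM keeps running until the pool is shut down.
        t.setDaemon(false);
        return t;
    }
}
```

It would plug into the configuration in the same position, e.g. new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("email-task")).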
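Triggering it from the controller:
@RestController
@RequestMapping(value = "/test")
public class TestController {

    @Resource
    private SendEmailService sendEmailService;

    @GetMapping(value = "/send-email")
    public boolean sendEmail() {
        EmailDTO emailDTO = new EmailDTO();
        emailDTO.setContent("test content");
        emailDTO.setReceiver("idea");
        emailDTO.setTitle("email subject");
        // Returns as soon as the task is queued; the email is sent on a pool thread
        sendEmailService.sendEmail(emailDTO);
        return true;
    }
}
This is a very simple example: an HTTP request triggers an email send.
Heartbeat request scenario
This kind of scenario usually appears in infrastructure components, for example middleware with a heartbeat-based liveness check, such as nacos. Let's look at the code, starting with the heartbeat model: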
public class HeartBeatInfo {

    private String info;

    private long nextSendTimeDelay;

    public String getInfo() {
        return info;
    }

    public void setInfo(String info) {
        this.info = info;
    }

    public long getNextSendTimeDelay() {
        return nextSendTimeDelay;
    }

    public void setNextSendTimeDelay(long nextSendTimeDelay) {
        this.nextSendTimeDelay = nextSendTimeDelay;
    }

    @Override
    public String toString() {
        return "HeartBeatInfo{" +
                "info='" + info + '\'' +
                ", nextSendTimeDelay=" + nextSendTimeDelay +
                '}';
    }
}
Then the interface of a simulated heartbeat-sending service:
public interface HeartBeatTaskService {

    void sendBeatInfo();
}
Next, the core implementation that sends the heartbeats:
@Service
public class HeartBeatTaskServiceImpl implements HeartBeatTaskService {

    @Resource
    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    @Override
    public void sendBeatInfo() {
        HeartBeatInfo heartBeatInfo = new HeartBeatInfo();
        heartBeatInfo.setInfo("test-info");
        heartBeatInfo.setNextSendTimeDelay(1000);
        // The first beat fires after nextSendTimeDelay milliseconds
        scheduledThreadPoolExecutor.schedule(new HeartBeatTask(heartBeatInfo),
                heartBeatInfo.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
    }

    class HeartBeatTask implements Runnable {

        private final HeartBeatInfo heartBeatInfo;

        public HeartBeatTask(HeartBeatInfo heartBeatInfo) {
            this.heartBeatInfo = heartBeatInfo;
        }

        @Override
        public void run() {
            System.out.println("sending heartbeat packet: " + heartBeatInfo.getInfo());
            // Each run schedules the next beat, so the delay can differ from beat to beat
            HeartBeatInfo nextBeat = new HeartBeatInfo();
            nextBeat.setInfo("test-info");
            nextBeat.setNextSendTimeDelay(1000);
            scheduledThreadPoolExecutor.schedule(new HeartBeatTask(nextBeat),
                    nextBeat.getNextSendTimeDelay(), TimeUnit.MILLISECONDS);
        }
    }
}
At the heart of the implementation is a delayed thread pool, ScheduledThreadPoolExecutor, which runs a submitted task only after the specified delay has passed:
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() {
        return new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // Daemon threads do not keep the JVM alive on their own
                thread.setDaemon(true);
                thread.setName("org.idea.threadpool.beat.sender");
                return thread;
            }
        });
    }
}
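In the example above the task reschedules itself, which lets each beat choose its own nextSendTimeDelay. When the interval is constant, ScheduledThreadPoolExecutor#scheduleWithFixedDelay can do the rescheduling instead. The sketch below assumes a fixed delay; the class name FixedDelayHeartbeat and the latch used to stop after a given number of beats are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical fixed-interval heartbeat: the executor, not the task, schedules the next run.
final class FixedDelayHeartbeat {

    // Sends heartbeats every delayMillis until expectedBeats have gone out,
    // then cancels the schedule and returns how many beats were sent.
    static int sendBeats(int expectedBeats, long delayMillis) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expectedBeats);
        // The next run starts delayMillis after the previous run finishes.
        ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("sending heartbeat #" + sent.incrementAndGet());
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false);
            scheduler.shutdown();
        }
        return sent.get();
    }
}
```

scheduleWithFixedDelay measures the gap from the end of one run to the start of the next, while scheduleAtFixedRate measures it between start times and runs late executions back to back to catch up. With either method, if a run throws an exception the later runs are silently suppressed, so the task body should catch its own exceptions.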
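How the JDK designs its thread pool
Having seen the two simple examples above, you may be curious:
how does a thread pool actually work internally?
Hand-writing a simple single-consumer task-processing model
We can explore this with a short piece of code. First, wrap the data to be processed in an object, defined for now as follows: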
public class AsyncHandlerData {

    private String dataInfo;

    public String getDataInfo() {
        return dataInfo;
    }

    public void setDataInfo(String dataInfo) {
        this.dataInfo = dataInfo;
    }

    @Override
    public String toString() {
        return "AsyncHandlerData{" +
                "dataInfo='" + dataInfo + '\'' +
                '}';
    }
}
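Then a service dedicated to handling these tasks:
public interface AsyncHandlerService {

    /**
     * Puts a task into the queue.
     *
     * @param asyncHandlerData the task data
     */
    void putTask(AsyncHandlerData asyncHandlerData);
}
Finally, an implementation of the interface defined above, with the task-processing logic gathered into a single object: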
@Service
public class AsyncHandlerServiceImpl implements AsyncHandlerService, CommandLineRunner {

    private final TaskQueueHandler taskQueueHandler = new TaskQueueHandler();

    @Override
    public void putTask(AsyncHandlerData asyncHandlerData) {
        taskQueueHandler.addTask(asyncHandlerData);
    }

    // CommandLineRunner: start the single consumer thread once the application has started
    @Override
    public void run(String... args) throws Exception {
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }

    public class TaskQueueHandler implements Runnable {

        private final BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        public void addTask(AsyncHandlerData asyncHandlerData) {
            // offer() returns false (and the task is dropped) when the queue is full
            if (!tasks.offer(asyncHandlerData)) {
                System.out.println("task queue is full, dropping: " + asyncHandlerData);
            }
        }

        @Override
        public void run() {
            // The single consumer: take() blocks until a task is available
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    System.out.println("processing async task data: " + asyncHandlerData.getDataInfo());
                } catch (InterruptedException e) {
                    // Treat interruption as the stop signal
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
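The overall logic is simple and can be summarized as shown in the figure below:
The pattern is that tasks are put in at one end and taken out by a single consumer. It has a drawback, though: if the consumer is slow, or a task blocks, tasks pile up in the queue, and the bigger the backlog grows, the harder it is to catch up.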
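One way to make the backlog visible instead of silent is a bounded queue combined with checking the result of offer(), which returns false rather than blocking when the queue is full. The sketch below is a stripped-down, Spring-free version of the single-consumer model; the class name BoundedSingleConsumer and the use of interruption as the stop signal are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical single-consumer model with a bounded queue.
final class BoundedSingleConsumer {

    private final BlockingQueue<String> tasks;
    // Filled by the consumer thread; accessed only through synchronized methods.
    private final List<String> processed = new ArrayList<>();

    BoundedSingleConsumer(int capacity) {
        this.tasks = new ArrayBlockingQueue<>(capacity);
    }

    // offer() never blocks: it returns false when the queue is full,
    // so the producer can react to the backlog instead of losing the task silently.
    boolean putTask(String data) {
        return tasks.offer(data);
    }

    int pending() {
        return tasks.size();
    }

    synchronized int processedCount() {
        return processed.size();
    }

    private synchronized void markProcessed(String data) {
        processed.add(data);
    }

    // Starts the single consumer; interrupting the returned thread stops it.
    Thread startConsumer() {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    markProcessed(tasks.take());  // take() blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: leave the loop and let the thread end.
            }
        }, "single-consumer");
        consumer.setDaemon(true);
        consumer.start();
        return consumer;
    }
}
```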
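Still, this is an overly simple model. Let's look at how the JDK's own thread pool is designed:
Source code analysis of the thread pool internals
When we use a thread pool in a project, we usually start by defining it as a concrete Bean, for example: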
@Bean
public ExecutorService emailTaskPool() {
    return new ThreadPoolExecutor(2, 4,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new SysThreadFactory("email-task"));
}
ThreadPoolExecutor extends AbstractExecutorService, which implements the ExecutorService interface (itself an extension of Executor).
Taking the email interface as an example: when submit is called on the pool, the call actually lands in the parent class method java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable), which then calls ThreadPoolExecutor#execute.
@Override
public void sendEmail(EmailDTO emailDTO) {
    emailTaskPool.submit(() -> {
        try {
            System.out.printf("sending email .... emailDto is %s%n", emailDTO);
            Thread.sleep(1000);
            System.out.println("sent successfully");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}
Source of java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable):
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
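Note that it returns a Future, which lets the caller check whether the task has actually finished. For a Runnable the result is null, so get() simply blocks until the task completes and throws ExecutionException if the task threw an exception. When you need to know the outcome of a task run by the pool, you can do this: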
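Future<?> future = emailTaskPool.submit(() -> {
    try {
        System.out.printf("sending email .... emailDto is %s%n", emailDTO);
        Thread.sleep(1000);
        System.out.println("sent successfully");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
// todo: do something else while the email is being sent
// get() blocks until the task finishes; the enclosing method must catch or
// declare ExecutionException and InterruptedException
future.get();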
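A consequence of submit() wrapping the task in a FutureTask is that an exception thrown inside the task is not printed anywhere: it is stored in the Future and only surfaces, wrapped in an ExecutionException, when get() is called. The sketch below (FutureResultDemo is a hypothetical helper) uses the timed get(long, TimeUnit) so the caller cannot hang forever, and maps the possible outcomes to strings.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper showing how a submitted task's outcome reaches the caller.
final class FutureResultDemo {

    // Returns "ok", or a short description of why the task did not complete normally.
    static String runAndWait(Runnable task, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(task);
            // Blocks at most timeoutMillis; returns null for a Runnable that finished normally.
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (ExecutionException e) {
            // The exception thrown inside the task, captured by the FutureTask.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // Interrupts a task that is still running (e.g. after a timeout).
            pool.shutdownNow();
        }
    }
}
```

By contrast, an exception from a task passed to execute() propagates out of the worker thread and goes to that thread's uncaught-exception handler.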
In the JDK 8 source, the logic that handles a newly submitted task is as follows:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Fewer workers than corePoolSize: start a new core worker with this task as its first task
    if (workerCountOf(c) < corePoolSize) {
        // addWorker takes mainLock while it registers the new worker
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // The pool is running and the task was queued successfully
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool was shut down after queuing: take the task back out and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // No workers are left (e.g. corePoolSize is 0 or all workers died): start one to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // The queue is full: try a non-core worker; if maximumPoolSize has been reached
    // (or the pool is shut down), apply the rejection policy
    else if (!addWorker(command, false))
        reject(command);
}
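The code above relies on ctl, workerCountOf and isRunning. In JDK 8, ctl is a single AtomicInteger that packs two values: the run state in the top 3 bits and the worker count in the lower 29 bits, so both can be read and CAS-updated in one step. The class below copies those constants and helpers (with their JDK 8 names) into a standalone form to show the bit layout; it is an illustration, not the JDK class itself.

```java
// Standalone copy of the ctl bit layout used by JDK 8's ThreadPoolExecutor.
final class CtlLayout {

    static final int COUNT_BITS = Integer.SIZE - 3;        // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;     // low 29 bits all set

    // Run states live in the high 3 bits; only RUNNING is negative.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }  // keep the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; } // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }     // pack both into one int

    // Because RUNNING is the only negative state, a single comparison suffices.
    static boolean isRunning(int c) { return c < SHUTDOWN; }
}
```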
Reading further, you'll see that worker threads are stored in a HashSet. The code that adds a worker is:
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Refuse once the pool is shutting down; the only exception is a SHUTDOWN
        // pool adding a worker with no first task to drain a non-empty queue
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // core == true compares against corePoolSize, otherwise against maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS to increment the worker count
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // firstTask is the task this worker runs first; it is handed to the Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // Take the main lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet; register the new worker in it
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Worker is not a Thread itself: it is a Runnable holding the Thread t
                // created by the ThreadFactory. Starting t runs Worker.run(), which
                // executes firstTask and then keeps taking tasks from the queue
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker never started (the pool shut down before the lock was taken,
        // or the ThreadFactory failed), addWorkerFailed removes it from workers
        // and decrements the worker count
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
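The nested for (;;) loops at the top of addWorker are a standard CAS retry loop: read the current value, check the bound, try compareAndSet, and re-read on failure. The sketch below isolates that pattern with a plain AtomicInteger; BoundedCounter is a hypothetical class, and it leaves out the run-state check that addWorker performs as well.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class isolating the CAS retry loop used by addWorker.
final class BoundedCounter {

    private final AtomicInteger count = new AtomicInteger();
    private final int bound;

    BoundedCounter(int bound) {
        this.bound = bound;
    }

    // Read, check the bound, try to CAS, and retry if another thread got there first.
    boolean tryIncrement() {
        for (;;) {
            int c = count.get();
            if (c >= bound) {
                return false;                   // like wc >= maximumPoolSize
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                    // like compareAndIncrementWorkerCount(c)
            }
            // CAS failed: the value changed after get(), so loop and re-read it
        }
    }

    // Several threads race for `bound` slots; returns how many increments succeeded.
    static int race(int bound, int threads, int attemptsPerThread) {
        BoundedCounter counter = new BoundedCounter(bound);
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (counter.tryIncrement()) {
                        successes.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return successes.get();
    }
}
```

However many threads contend, the bound is never exceeded, which is exactly the guarantee addWorker needs for corePoolSize and maximumPoolSize.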
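With that understood, the overall execution flow and its order are shown in the figure below:
First, if fewer than corePoolSize workers exist, the pool creates a new worker to run the task, even if other workers are idle. Otherwise it checks whether the blocking queue is full; if not, the task is put in the queue to wait for a worker to pick it up.
If the queue is full, the pool checks whether the worker count has reached maximumPoolSize. If not, it creates another worker; if it has, the saturation (rejection) policy decides whether to throw an exception or handle the task some other way.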
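The order described above can be observed directly. The sketch below (SubmitOrderDemo is a hypothetical name) uses a pool with corePoolSize 1, maximumPoolSize 2 and a queue of capacity 1, and submits tasks that block on a latch so the pool cannot drain while we watch it. Each entry in the trace is poolSize/queueSize after one submission, and R marks a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of execute()'s order: core worker, then queue, then non-core worker, then reject.
final class SubmitOrderDemo {

    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        // Default rejection policy is AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    // Each task blocks until released, so no worker frees up during the loop.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    sb.append(pool.getPoolSize()).append('/').append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    sb.append("R ");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return sb.toString().trim();
    }
}
```

The expected trace is 1/0 1/1 2/1 R: the first task starts a core worker, the second waits in the queue, the third gets a non-core worker because the queue is full, and the fourth is rejected.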
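Common thread pool parameters
corePoolSize: the number of core threads. When a task is submitted, the pool creates a thread to run it; as long as the total number of workers is below corePoolSize, it creates a new worker even if an existing worker is idle and could handle the task.
maximumPoolSize: once the task queue is full, a newly submitted task causes a new worker to be created as long as the worker count is below maximumPoolSize.
keepAliveTime: recall the worker class mentioned above, Worker (java.util.concurrent.ThreadPoolExecutor.Worker). When a worker beyond corePoolSize stays idle for longer than keepAliveTime, it is terminated; core workers are kept unless allowCoreThreadTimeOut(true) is set. If tasks arrive continuously with short gaps between them, keepAliveTime can be set somewhat larger so that threads are reused instead of being repeatedly destroyed and recreated.
unit: the time unit of keepAliveTime.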
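A sketch of keepAliveTime at work, assuming a SynchronousQueue (it holds no tasks, so every concurrent task beyond the core one needs a new thread): three blocking tasks push the pool to three threads, and once they finish and stay idle past keepAliveTime, the two non-core threads exit. KeepAliveDemo and the 10x waiting margin are illustrative choices.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: non-core threads are reclaimed after keepAliveTime of idleness.
final class KeepAliveDemo {

    // Returns {largestPoolSize, poolSizeAfterIdle}.
    static int[] shrinkAfterIdle(long keepAliveMillis) {
        CountDownLatch started = new CountDownLatch(3);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await();          // all three threads are busy at this point
            release.countDown();      // let the tasks finish; the threads go idle
            // Wait well past keepAliveTime so the two non-core threads can time out.
            Thread.sleep(keepAliveMillis * 10);
            return new int[] { pool.getLargestPoolSize(), pool.getPoolSize() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        } finally {
            pool.shutdown();
        }
    }
}
```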
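workQueue: the work queue. Once the number of workers has reached corePoolSize, newly submitted tasks are placed in the work queue. Every work queue used by the pool implements the BlockingQueue interface; commonly used ones are: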
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
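The choice of queue interacts with maximumPoolSize. Because offer() on an unbounded LinkedBlockingQueue never fails, step 3 of execute() (adding a non-core worker) is never reached, so the email pool configured earlier (core 2, max 4, unbounded queue) should never grow past 2 threads. The sketch below checks this with blocking tasks; UnboundedQueueDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: with an unbounded queue, maximumPoolSize is never used.
final class UnboundedQueueDemo {

    // Returns {poolSize, queueSize} after submitting `tasks` blocking tasks.
    static int[] submitBlocking(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // Same sizing as the article's email pool: core 2, max 4, unbounded queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

If the extra threads are actually wanted, the queue has to be bounded (for example an ArrayBlockingQueue of some capacity), together with a rejection policy for when it overflows.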
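RejectedExecutionHandler: the JDK provides several built-in rejection policies; here are the common ones:
- AbortPolicy: throws a RejectedExecutionException (this is the default policy).
- CallerRunsPolicy: the thread that submitted the task runs it itself.
- DiscardOldestPolicy: discards the task at the head of the blocking queue (the oldest waiting task), then retries submitting the current task.
- DiscardPolicy: silently discards the task.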
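The sketch below saturates a pool that has one thread and a one-slot queue, then submits one more task under a given policy and reports which thread ran it. It covers AbortPolicy, CallerRunsPolicy and DiscardPolicy; RejectionPolicyDemo and the "not run" marker are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of what happens to an extra task under different rejection policies.
final class RejectionPolicyDemo {

    // Returns the name of the thread that ran the extra task, or "not run" if it was discarded.
    // With AbortPolicy the RejectedExecutionException propagates to the caller.
    static String runExtraTask(RejectedExecutionHandler policy) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), policy);
        AtomicReference<String> ranOn = new AtomicReference<>("not run");
        try {
            Runnable blocker = () -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            pool.execute(blocker);  // occupies the only worker
            pool.execute(blocker);  // fills the only queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));  // rejected
            return ranOn.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

CallerRunsPolicy also acts as a simple form of back-pressure: while the submitting thread is busy running the rejected task, it cannot submit more.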
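Lessons from the thread pool design
Multi-consumer queues: upstream business code submits tasks, which go into a blocking queue; a dispatcher takes elements from that queue and forwards them to several child queues, each processed by its own child consumer. For example, as shown in the figure below: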
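public interface AsyncHandlerService {

    /**
     * Puts a task into the queue.
     *
     * @param asyncHandlerData the task data
     * @return false if the queue is full and the task was not accepted
     */
    boolean putTask(AsyncHandlerData asyncHandlerData);

    /**
     * Starts consuming.
     */
    void startJob();
}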
The multi-consumer dispatching implementation:
@Component("asyncMultiConsumerHandlerHandler")
public class AsyncMultiConsumerHandlerHandler implements AsyncHandlerService {

    // Creating the handler also starts the child consumer threads (see its constructor)
    private final TaskQueueHandler taskQueueHandler = new TaskQueueHandler(10);

    @Override
    public boolean putTask(AsyncHandlerData asyncHandlerData) {
        return taskQueueHandler.addTask(asyncHandlerData);
    }

    @Override
    public void startJob() {
        // Starts the dispatcher thread that moves tasks from the main queue to the child queues
        Thread thread = new Thread(taskQueueHandler);
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Dispatches tasks to the child queues.
     */
    static class TaskQueueHandler implements Runnable {

        // Main queue that upstream callers write into (deliberately small here)
        private final BlockingQueue<AsyncHandlerData> tasks = new ArrayBlockingQueue<>(11);

        private final TaskDispatcherHandler[] taskDispatcherHandlers;

        private final int childConsumerSize;

        public TaskQueueHandler(int childConsumerSize) {
            this.childConsumerSize = childConsumerSize;
            taskDispatcherHandlers = new TaskDispatcherHandler[childConsumerSize];
            for (int i = 0; i < taskDispatcherHandlers.length; i++) {
                taskDispatcherHandlers[i] = new TaskDispatcherHandler(new ArrayBlockingQueue<>(100), "child-worker-" + i);
                Thread thread = new Thread(taskDispatcherHandlers[i]);
                thread.setDaemon(false);
                thread.setName("taskQueueHandler-child-" + i);
                thread.start();
            }
        }

        public BlockingQueue<AsyncHandlerData> getAllTaskInfo() {
            return tasks;
        }

        // Non-blocking: returns false when the main queue is full
        public boolean addTask(AsyncHandlerData asyncHandlerData) {
            return tasks.offer(asyncHandlerData);
        }

        @Override
        public void run() {
            int index = 0;
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = tasks.take();
                    // Round-robin over the child queues
                    index = (index == taskDispatcherHandlers.length) ? 0 : index;
                    taskDispatcherHandlers[index].addAsyncHandlerData(asyncHandlerData);
                    index++;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    static class TaskDispatcherHandler implements Runnable {

        private final BlockingQueue<AsyncHandlerData> subTaskQueue;

        private final String childName;

        private final AtomicLong taskCount = new AtomicLong(0);

        public TaskDispatcherHandler(BlockingQueue<AsyncHandlerData> blockingQueue, String childName) {
            this.subTaskQueue = blockingQueue;
            this.childName = childName;
        }

        // put() waits while this child queue is full; add() would throw IllegalStateException
        public void addAsyncHandlerData(AsyncHandlerData asyncHandlerData) throws InterruptedException {
            subTaskQueue.put(asyncHandlerData);
        }

        @Override
        public void run() {
            for (; ; ) {
                try {
                    AsyncHandlerData asyncHandlerData = subTaskQueue.take();
                    long count = taskCount.incrementAndGet();
                    System.out.println("[" + childName + "] child queue processing: " + asyncHandlerData.getDataInfo() + " #" + count);
                    // Simulates a slow task
                    Thread.sleep(3000);
                    System.out.println("[" + childName + "] child queue processing: " + asyncHandlerData.getDataInfo() + " finished #" + count);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
Test endpoint:
@GetMapping(value = "/send-async-data")
public boolean sendAsyncData() {
    AsyncHandlerData asyncHandlerData = new AsyncHandlerData();
    asyncHandlerData.setDataInfo("data info");
    boolean status = asyncMultiConsumerHandlerHandler.putTask(asyncHandlerData);
    if (!status) {
        throw new RuntimeException("insert fail");
    }
    return status;
}
This design suits scenarios that demand high request throughput, where each request is relatively time-consuming.
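The dispatch step of the class above comes down to a round-robin over the child queues. A standalone sketch is below, with the child consumer threads left out; RoundRobinDispatcher is a hypothetical name, and it uses put() so that a full child queue makes the dispatcher wait rather than throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical standalone version of the dispatch step: one dispatcher thread
// hands each item to the next child queue in turn.
final class RoundRobinDispatcher<T> {

    private final List<BlockingQueue<T>> children = new ArrayList<>();
    private int next = 0;  // only the dispatcher thread touches this, so no synchronization

    RoundRobinDispatcher(int childCount, int childCapacity) {
        for (int i = 0; i < childCount; i++) {
            children.add(new ArrayBlockingQueue<T>(childCapacity));
        }
    }

    // put() waits while the chosen child queue is full, slowing the dispatcher down
    // instead of throwing IllegalStateException the way add() does.
    void dispatch(T item) throws InterruptedException {
        children.get(next).put(item);
        next = (next + 1) % children.size();
    }

    int sizeOf(int child) {
        return children.get(child).size();
    }
}
```

Round-robin spreads load evenly only when tasks cost roughly the same; a slow task still holds up everything queued behind it in the same child queue.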
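Custom rejection policies: depending on the scenario, you can implement the java.util.concurrent.RejectedExecutionHandler interface to define your own policy, for example writing some information or a log entry to the database whenever a task is rejected.
Example code: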
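import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyRejectPolicy {

    static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println("this is test");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // 5 threads + a 10-slot queue: at most 15 tasks fit at once, the rest are rejected
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("task rejected: " + r);
                        // record some information here, e.g. to a database or log
                    }
                });
        for (int i = 0; i < 100; i++) {
            // Submit the Runnable directly; wrapping it in a Thread object is unnecessary
            threadPoolExecutor.execute(new MyTask());
        }
        // Let the accepted tasks finish, then allow the JVM to exit
        threadPoolExecutor.shutdown();
    }
}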
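To make the policy do more than print, the handler can record each rejection somewhere. The sketch below assumes a simple counter stands in for the database or log mentioned above; RecordingRejectDemo is a hypothetical name. Tasks block on a latch so the numbers are deterministic: 5 threads plus 10 queue slots accept 15 tasks, and the rest are rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical custom rejection handler that records rejections (here: just counts them).
final class RecordingRejectDemo {

    static int countRejected(int submissions) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
                // RejectedExecutionHandler has one method, so a lambda works;
                // a real handler might write r and executor stats to a log or database.
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```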
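Collecting detailed thread pool statistics
After reading the thread pool source, we can override beforeExecute, afterExecute and terminated to measure how long each task takes, and, by extending ThreadPoolExecutor, monitor properties of the pool such as corePoolSize and maximumPoolSize.
Example code: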
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SysThreadPool extends ThreadPoolExecutor {

    // Start time of the task currently running on each worker thread
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    private static final Logger logger = LoggerFactory.getLogger(SysThreadPool.class);

    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public SysThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.currentTimeMillis());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long executeTime = System.currentTimeMillis() - startTime.get();
            // With submit(), r is the FutureTask wrapping the original task
            logger.info("Task {}: ExecuteTime {} ms", r, executeTime);
        } finally {
            // Worker threads are reused, so clear the ThreadLocal after every task
            startTime.remove();
            super.afterExecute(r, t);
        }
    }

    @Override
    public void shutdown() {
        super.shutdown();
    }

    @Override
    public void execute(Runnable command) {
        super.execute(command);
    }

    // Logs a snapshot of the pool's configuration and current load
    public void getTaskInfo() {
        logger.info("coreSize: {}, maxSize: {}, activeCount: {}, blockQueueSize: {}",
                super.getCorePoolSize(), super.getMaximumPoolSize(), super.getActiveCount(), super.getQueue().size());
    }

    static class MyTestTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // max 5 threads + a 2-slot queue hold at most 7 tasks at once. With the default
        // AbortPolicy the 10 quick submissions below can throw RejectedExecutionException,
        // so the overflow is run by the submitting thread instead (CallerRunsPolicy)
        SysThreadPool sysThreadPool = new SysThreadPool(2, 5, 5000, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2), new ThreadPoolExecutor.CallerRunsPolicy());
        sysThreadPool.getTaskInfo();
        System.out.println("------------");
        for (int i = 0; i < 10; i++) {
            // Submit the Runnable itself; wrapping it in a Thread is unnecessary
            sysThreadPool.submit(new MyTestTask());
            sysThreadPool.getTaskInfo();
        }
        System.out.println("------------");
        sysThreadPool.shutdown();
        sysThreadPool.awaitTermination(3, TimeUnit.SECONDS);
    }
}
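The logs record how the pool's parameters change:
Building on this example, you could add some scheduled reporting logic to implement thread pool monitoring.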
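One possible shape for that reporting logic, assuming a ScheduledExecutorService pushes a snapshot of the pool to some sink at a fixed period: PoolMonitor, snapshot and startReporting are hypothetical names, and in a real system the sink would be a logger or metrics client rather than a list. The getters used are standard ThreadPoolExecutor methods; getActiveCount and getCompletedTaskCount are documented as approximate.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical periodic reporter for ThreadPoolExecutor metrics.
final class PoolMonitor {

    // One-line snapshot of the metrics the article logs in getTaskInfo(), plus a few more.
    static String snapshot(ThreadPoolExecutor pool) {
        return String.format("coreSize: %d, maxSize: %d, poolSize: %d, activeCount: %d, queueSize: %d, completed: %d",
                pool.getCorePoolSize(), pool.getMaximumPoolSize(), pool.getPoolSize(),
                pool.getActiveCount(), pool.getQueue().size(), pool.getCompletedTaskCount());
    }

    // Sends a snapshot to `sink` every periodMillis from a separate daemon thread.
    static ScheduledExecutorService startReporting(ThreadPoolExecutor pool, long periodMillis, Consumer<String> sink) {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "pool-monitor");
            t.setDaemon(true);  // monitoring should not keep the JVM alive
            return t;
        });
        reporter.scheduleAtFixedRate(() -> sink.accept(snapshot(pool)), 0, periodMillis, TimeUnit.MILLISECONDS);
        return reporter;
    }
}
```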
This article was AMP-transcoded by Readfog; copyright belongs to the original author.
Source: https://mp.weixin.qq.com/s/FvDWNSl-7LO4ZNyNYYiDJA