前言
Thread Pool 的概念和使用 Database 的 Connection Pool 是很類似的概念
就像 Connection Pool 的使用方法是去 Pool 裡面取得一個 Connection 使用
使用完之後就關閉此 Connection,並把這個 Connection 丟回 Pool 之中讓其他程式使用
Thread Pool 也是這種概念,但在 JDK 1.5 之前的版本中是沒有一個管控的方式
幾乎都是用 new Thread 的方式去創建使用
在 JDK 1.5 之後的版本則是出了 Exectuor 去管控 Thread Pool
ThreadPoolExecutor 介紹
Java 提供了 ThreadPoolExecutor 能讓我們客製化定義不同的使用模式
以下為 ThreadPoolExecutor 的設定即使用方法
以及取用 Queue Size 以及 Thread Name 的方式
1 | ThreadPoolExecutor executor = new ThreadPoolExecutor( |
- corePoolSize
核心 Thread 的數量,基本上 Thread 數量不會低於此數字 - maxPoolSize
Thread Pool 的最大數量,如果所有 Thread 都被執行的話
Task 會被塞到 Queue 之中等到有空閒的 Thread 為止
決定 maxPoolSize 的數量最好是根據系統資源去計算出來
Runtime.getRuntime().availableProcessors();
- keeyAliveTime
當閒置時間超過此設定的時間的話,系統會開始回收 corePoolSize 以上多餘的 Thread - unit
keepAliveTime 的時間單位,可以使用TimeUnit.SECONDS
- workQueue
決定當所有 Thread 都被執行時,Task 在 Queue 之中會以何種形式等待 - handler
Queue 已滿且 Thread 已達到 maxPoolSize 之後會以什麼樣的方式處理新的 Task
BlockingQueue 詳細介紹
基本規則為
- 如果當前的 Thread 小於 corePoolSize,則 Executor 首先會新增 Thread,而不會把 Task 丟到 Queue 之中 (基本上就是直接運行的意思)
- 如果當前的 Thread 大於等於 corePoolSize,則 Executor 首先會把 Task 加到 Queue 之中等待
- 當 Task 無法再被加入到 Queue 之中的話,則 Executor 首先會創建新的 Thread,直到超過 maxPoolSize 為止
- 超過 maxPoolSize 時,任務會被拒絕
BlockingQueue 有三種類型
直接提交
代表類型: synchronousQueue
基本上就 Queue Size 就是 0
會直接把 Task 提交給 Thread,如果不存在可用 Thread,則新建一個
如果此類型有設置 maxPoolSize 的話,是有可會拒絕新的 Task
所以通常使這種類型,會建議 maxPoolSize 不要做上限設定
無界隊列 (Unbounded Queue)
代表類型: LinkedBlockingQueue
Queue 的大小是無限制的
特別注意的是因為大小是無限制,所以萬一 Task 執行時間過長
會導致有大量個 Task 卡在 Queue 之中動彈不得,進而導致 OOM 的發生Executors.newFixedThreadPool
採用的就是此種類型的 Queue
有界隊列 (Bounded Queue)
代表類型: ArrayBlockingQueue
Queue 的大小是有限制的
但要注意的點是,這個 Queue 大小必須和 Thread Pool 相互搭配才可以發揮出比較好的效能
使用大的 Queue Size 和小的 Thread Pool Size
雖然可以有效降低 CPU 使用率,但會降低 QPS
而使用小的 Queue Size 和大的 Thread Pool Size
雖然可以提昇 QPS,但會降低 CPU
Queue 飽和 RejectExecutionHandle 介紹
再來要介紹當 Queue 飽和之後,可以根據不同 handle 做出不一樣的行為
以下總計有四種使用方式
終止策略 (AbortPolicy)
此為預設 Policy
使用該 Policy,飽和時會拋出 RejectedExecutionException
調用者可以用以下自行定義方式處理異常
1 | executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { |
拋棄策略 (DiscardPolicy)
不做任何處理直接拋棄
拋棄舊任務策略 (DiscardOldestPolicy)
把 Queue 之中最頭的元素拋棄,並在嘗試重新提交 Task
調用者運行策略 (CallerRunsPolicy)
簡單來說,飽和後會直接由調用 Thread Pool 的主 Thread 自己來執行這個 Task
但在這個期間,主 Thread 就無法再度提交 Task
從而讓 Thread Pool 有時間把正在處理的 Task 給完成
創建 Thread Pool 的四個常用方法
這四個常用的方法都是透過 ThreadPoolExecutor 的不同參數所實作而成的
public static ExecutorService newFixedThreadPool(int nThreads)
創建固定數量的 Thead,提交 Task 的時候如果未達nThreads
的數量的話,則會一直新建 Thread
達到nThreads
時,之後的 Task 則會進入到佇列中1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}public static ExecutorService newCachedThreadPool()
Thread 的數量預設上限為 2^31 - 1,如果當 Thread 大於 Tasks 數量的時候
就會開始去回收那些等了超過 60 秒還沒有 Task 進來的 Thread
問題是,這個newCachedThreadPool
是屬於動態新建所以萬一 Task 一直大於 Thread 數量的話則會一直新建
這樣很容易耗光機器資源,使用這個最好的狀況是 Task 的執行時間是短的才比較適合1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}public static ExecutorService newSingleThreadExecutor()
創建一個 Single Thread,因為此 Thread 被使用的話其他都會是在佇列中等待,所以效能會下降1
2
3
4public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
支持定時以及週期性執行 Task 的需求1
2
3public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}看看 Parent Class
1
2
3
4public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
但是基本上不是很推薦使用以上這四種方法去定義 Thread Pool
在阿里巴巴的 Java 開發手冊中也有提到,如果要新建 Thread
請透過 ThreadPoolExecutor 的方式去自定義 Thread Pool 的使用模式
在這篇文章的樓主也是因為用了以上其中一個方法採到 OOM 的雷
所以在設定 Thread Pool 的時候要特別注意使用的情況適不適合!