[JAVA] 一篇文章带你搞懂Java线程池实现原理

1905 0
Honkers 2022-11-8 17:12:58 | 显示全部楼层 |阅读模式
目录

    1. 为什么要使用线程池2. 线程池的使用3. 线程池核心参数4. 线程池工作原理5. 线程池源码剖析
      5.1 线程池的属性5.2 线程池状态5.3 execute源码5.4 worker源码5.5 runWorker源码



1. 为什么要使用线程池

使用线程池通常由以下两个原因:
    频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程。使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

2. 线程池的使用
  1. /**
  2. * @author 一灯架构
  3. * @apiNote 线程池示例
  4. **/
  5. public class ThreadPoolDemo {
  6.     public static void main(String[] args) {
  7.         // 1. 创建线程池
  8.         ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
  9.                 3,
  10.                 3,
  11.                 0L,
  12.                 TimeUnit.MILLISECONDS,
  13.                 new LinkedBlockingQueue<>(),
  14.                 Executors.defaultThreadFactory(),
  15.                 new ThreadPoolExecutor.AbortPolicy());
  16.         // 2. 往线程池中提交3个任务
  17.         for (int i = 0; i < 3; i++) {
  18.             threadPoolExecutor.execute(() -> {
  19.                 System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构");
  20.             });
  21.         }
  22.         // 3. 关闭线程池
  23.         threadPoolExecutor.shutdown();
  24.     }
  25. }
复制代码
线程池的使用非常简单:
    调用new ThreadPoolExecutor()构造方法,指定核心参数,创建线程池。调用execute()方法提交Runnable任务使用结束后,调用shutdown()方法,关闭线程池。
再看一下线程池构造方法中核心参数的作用。

3. 线程池核心参数

线程池共有七大核心参数:
参数名称参数含义
int corePoolSize核心线程数
int maximumPoolSize最大线程数
long keepAliveTime线程存活时间
TimeUnit unit时间单位
BlockingQueue workQueue阻塞队列
ThreadFactory threadFactory线程创建工厂
RejectedExecutionHandler handler拒绝策略
1.corePoolSize 核心线程数
当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。
2.maximumPoolSize 最大线程数
当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。
3.keepAliveTime 线程存活时间
非核心线程的空闲时间达到了keepAliveTime,将会被回收。
4.TimeUnit 时间单位
线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:
    TimeUnit.NANOSECONDS(纳秒)TimeUnit.MICROSECONDS(微秒)TimeUnit.MILLISECONDS(毫秒)TimeUnit.SECONDS(秒)TimeUnit.MINUTES(分钟)TimeUnit.HOURS(小时)TimeUnit.DAYS(天)
5.workQueue 阻塞队列
当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:
    LinkedBlockingQueue(基于链表实现的阻塞队列)ArrayBlockingQueue(基于数组实现的阻塞队列)SynchronousQueue(只有一个元素的阻塞队列)PriorityBlockingQueue(实现了优先级的阻塞队列)DelayQueue(实现了延迟功能的阻塞队列)
6.threadFactory 线程创建工厂
用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。
7.RejectedExecutionHandler 拒绝策略
当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:
    AbortPolicy(直接终止,抛出异常)DiscardPolicy(默默丢弃,不抛出异常)DiscardOldestPolicy(丢弃队列中最旧的任务,执行当前任务)CallerRunsPolicy(返回给调用者执行)

4. 线程池工作原理

线程池的工作原理,简单理解如下:


    当往线程池中提交任务的时候,会先判断线程池中线程数是否核心线程数,如果小于,会创建核心线程并执行任务。如果线程数大于核心线程数,会判断阻塞队列是否已满,如果没有满,会把任务添加到阻塞队列中等待调度执行。如果阻塞队列已满,会判断线程数是否小于最大线程数,如果小于,会继续创建最大线程数并执行任务。如果线程数大于最大线程数,会执行拒绝策略,然后结束。

5. 线程池源码剖析


5.1 线程池的属性
  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2.     // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量
  3.     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4.     // 线程个数所占的位数
  5.     private static final int COUNT_BITS = Integer.SIZE - 3;
  6.     // 线程池的最大容量,2^29-1,约5亿个线程
  7.     private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  8.     // 独占锁,用来控制多线程下的并发操作
  9.     private final ReentrantLock mainLock = new ReentrantLock();
  10.     // 工作线程的集合
  11.     private final HashSet<Worker> workers = new HashSet<>();
  12.     // 等待条件,用来响应中断
  13.     private final Condition termination = mainLock.newCondition();
  14.     // 是否允许回收核心线程
  15.     private volatile boolean allowCoreThreadTimeOut;
  16.     // 线程数的历史峰值
  17.     private int largestPoolSize;
  18.     /**
  19.      * 以下是线程池的七大核心参数
  20.      */
  21.     private volatile int corePoolSize;
  22.     private volatile int maximumPoolSize;
  23.     private volatile long keepAliveTime;
  24.     private final BlockingQueue<Runnable> workQueue;
  25.     private volatile ThreadFactory threadFactory;
  26.     private volatile RejectedExecutionHandler handler;
  27. }
复制代码
线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。
设计者多聪明,用一个变量存储了两块内容。

5.2 线程池状态

线程池共有5种状态:
状态名称状态含义状态作用
RUNNING运行中线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。
SHUTDOWN已关闭调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。
STOP已停止调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。
TIDYING处理中所有任务已完成,所有工作线程都已回收,等待调用terminated方法。
TERMINATED已终止调用terminated方法后处于该状态,线程池的最终状态。



5.3 execute源码

看一下往线程池中提交任务的源码,这是线程池的核心逻辑:
  1. // 往线程池中提交任务
  2. public void execute(Runnable command) {
  3.     // 1. 判断提交的任务是否为null
  4.     if (command == null)
  5.         throw new NullPointerException();
  6.     int c = ctl.get();
  7.     // 2. 判断线程数是否小于核心线程数
  8.     if (workerCountOf(c) < corePoolSize) {
  9.         // 3. 把任务包装成worker,添加到worker集合中
  10.         if (addWorker(command, true))
  11.             return;
  12.         c = ctl.get();
  13.     }
  14.     // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列
  15.     if (isRunning(c) && workQueue.offer(command)) {
  16.         // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略
  17.         int recheck = ctl.get();
  18.         if (!isRunning(recheck) && remove(command))
  19.             reject(command);
  20.         // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务)
  21.         else if (workerCountOf(recheck) == 0)
  22.             addWorker(null, false);
  23.     }
  24.     // 7. 如果添加阻塞队列失败,就创建一个Worker
  25.     else if (!addWorker(command, false))
  26.         // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略
  27.         reject(command);
  28. }
复制代码
execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:
  1. // 添加worker
  2. private boolean addWorker(Runnable firstTask, boolean core) {
  3.     retry:
  4.     for (; ; ) {
  5.         int c = ctl.get();
  6.         int rs = runStateOf(c);
  7.         // 1. 检查是否允许提交任务
  8.         if (rs >= SHUTDOWN &&
  9.                 !(rs == SHUTDOWN &&
  10.                         firstTask == null &&
  11.                         !workQueue.isEmpty()))
  12.             return false;
  13.         // 2. 使用死循环保证添加线程成功
  14.         for (; ; ) {
  15.             int wc = workerCountOf(c);
  16.             // 3. 校验线程数是否超过容量限制
  17.             if (wc >= CAPACITY ||
  18.                     wc >= (core ? corePoolSize : maximumPoolSize))
  19.                 return false;
  20.             // 4. 使用CAS修改线程数
  21.             if (compareAndIncrementWorkerCount(c))
  22.                 break retry;
  23.             c = ctl.get();
  24.             // 5. 如果线程池状态变了,则从头再来
  25.             if (runStateOf(c) != rs)
  26.                 continue retry;
  27.         }
  28.     }
  29.     boolean workerStarted = false;
  30.     boolean workerAdded = false;
  31.     Worker w = null;
  32.     try {
  33.         // 6. 把任务和新线程包装成一个worker
  34.         w = new Worker(firstTask);
  35.         final Thread t = w.thread;
  36.         if (t != null) {
  37.             // 7. 加锁,控制并发
  38.             final ReentrantLock mainLock = this.mainLock;
  39.             mainLock.lock();
  40.             try {
  41.                 // 8. 再次校验线程池状态是否异常
  42.                 int rs = runStateOf(ctl.get());
  43.                 if (rs < SHUTDOWN ||
  44.                         (rs == SHUTDOWN && firstTask == null)) {
  45.                     // 9. 如果线程已经启动,就抛出异常
  46.                     if (t.isAlive())
  47.                         throw new IllegalThreadStateException();
  48.                     // 10. 添加到worker集合中
  49.                     workers.add(w);
  50.                     int s = workers.size();
  51.                     // 11. 记录线程数历史峰值
  52.                     if (s > largestPoolSize)
  53.                         largestPoolSize = s;
  54.                     workerAdded = true;
  55.                 }
  56.             } finally {
  57.                 mainLock.unlock();
  58.             }
  59.             if (workerAdded) {
  60.                 // 12. 启动线程
  61.                 t.start();
  62.                 workerStarted = true;
  63.             }
  64.         }
  65.     } finally {
  66.         if (!workerStarted)
  67.             addWorkerFailed(w);
  68.     }
  69.     return workerStarted;
  70. }
复制代码
方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

5.4 worker源码

再看一下worker类的结构:
  1. private final class Worker
  2.         extends AbstractQueuedSynchronizer
  3.         implements Runnable {
  4.     // 工作线程
  5.     final Thread thread;
  6.     // 任务
  7.     Runnable firstTask;
  8.     // 创建worker,并创建一个新线程(用来执行任务)
  9.     Worker(Runnable firstTask) {
  10.         setState(-1);
  11.         this.firstTask = firstTask;
  12.         this.thread = getThreadFactory().newThread(this);
  13.     }
  14. }
复制代码
5.5 runWorker源码

再看一下run方法的源码:
  1. // 线程执行入口
  2. public void run() {
  3.     runWorker(this);
  4. }
  5. // 线程运行核心方法
  6. final void runWorker(Worker w) {
  7.     Thread wt = Thread.currentThread();
  8.     Runnable task = w.firstTask;
  9.     w.firstTask = null;
  10.     w.unlock();
  11.     boolean completedAbruptly = true;
  12.     try {
  13.         // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务
  14.         while (task != null || (task = getTask()) != null) {
  15.             // 加锁,保证thread不被其他线程中断(除非线程池被中断)
  16.             w.lock();
  17.             // 2. 校验线程池状态,是否需要中断当前线程
  18.             if ((runStateAtLeast(ctl.get(), STOP) ||
  19.                     (Thread.interrupted() &&
  20.                             runStateAtLeast(ctl.get(), STOP))) &&
  21.                     !wt.isInterrupted())
  22.                 wt.interrupt();
  23.             try {
  24.                 beforeExecute(wt, task);
  25.                 Throwable thrown = null;
  26.                 try {
  27.                     // 3. 执行run方法
  28.                     task.run();
  29.                 } catch (RuntimeException x) {
  30.                     thrown = x;
  31.                     throw x;
  32.                 } catch (Error x) {
  33.                     thrown = x;
  34.                     throw x;
  35.                 } catch (Throwable x) {
  36.                     thrown = x;
  37.                     throw new Error(x);
  38.                 } finally {
  39.                     afterExecute(task, thrown);
  40.                 }
  41.             } finally {
  42.                 task = null;
  43.                 w.completedTasks++;
  44.                 // 解锁
  45.                 w.unlock();
  46.             }
  47.         }
  48.         completedAbruptly = false;
  49.     } finally {
  50.         // 4. 从worker集合删除当前worker
  51.         processWorkerExit(w, completedAbruptly);
  52.     }
  53. }
复制代码
runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。
再看一下从阻塞队列中拉取任务的逻辑:
  1. // 从阻塞队列中拉取任务
  2. private Runnable getTask() {
  3.     boolean timedOut = false;
  4.     for (; ; ) {
  5.         int c = ctl.get();
  6.         int rs = runStateOf(c);
  7.         // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程
  8.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9.             decrementWorkerCount();
  10.             return null;
  11.         }
  12.         int wc = workerCountOf(c);
  13.         // 2. 再次判断是否需要回收线程
  14.         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  15.         if ((wc > maximumPoolSize || (timed && timedOut))
  16.                 && (wc > 1 || workQueue.isEmpty())) {
  17.             if (compareAndDecrementWorkerCount(c))
  18.                 return null;
  19.             continue;
  20.         }
  21.         try {
  22.             // 3. 从阻塞队列中拉取任务
  23.             Runnable r = timed ?
  24.                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  25.                     workQueue.take();
  26.             if (r != null)
  27.                 return r;
  28.             timedOut = true;
  29.         } catch (InterruptedException retry) {
  30.             timedOut = false;
  31.         }
  32.     }
  33. }
复制代码
以上就是一篇文章带你搞懂Java线程池实现原理的详细内容,更多关于Java线程池的资料请关注中国红客联盟其它相关文章!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Honkers

荣誉红客

关注
  • 4005
    主题
  • 36
    粉丝
  • 0
    关注
这家伙很懒,什么都没留下!

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2025 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行