ThreadPoolExecutor底层原理如何从源码到核心逻辑全解析?

摘要:ThreadPoolExecutor 是 Java 并发编程中线程池的核心实现类,也是 Executor 框架的核心载体。它解决了手动创建线程的资源浪费、管理复杂等问题,其底层原理围绕「线程管理、任务调度、状态控制」三大核心展开。本文会从核
ThreadPoolExecutor 是 Java 并发编程中线程池的核心实现类,也是 Executor 框架的核心载体。它解决了手动创建线程的资源浪费、管理复杂等问题,其底层原理围绕「线程管理、任务调度、状态控制」三大核心展开。本文会从核心参数、执行流程、源码拆解、关键机制四个维度,把 ThreadPoolExecutor 的底层原理讲透。 一、先搞懂核心参数:决定线程池行为的基础 ThreadPoolExecutor 的构造方法是理解其原理的入口,核心参数直接决定线程池的运行规则: public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数(核心+非核心) long keepAliveTime, // 非核心线程空闲超时时间 TimeUnit unit, // keepAliveTime 的时间单位 BlockingQueue<Runnable> workQueue, // 任务阻塞队列 ThreadFactory threadFactory, // 线程创建工厂(自定义线程名称/优先级) RejectedExecutionHandler handler // 拒绝策略 ) 参数 底层作用 corePoolSize 核心线程数,核心线程默认永久存活(除非设置 allowCoreThreadTimeOut=true) maximumPoolSize 线程池能创建的最大线程数,限制非核心线程的上限 keepAliveTime 非核心线程空闲超过该时间后,会被销毁释放资源 workQueue 存储等待执行的任务,是线程池「削峰填谷」的关键 threadFactory 统一创建线程(比如给线程命名,方便排查问题),默认用 Executors.defaultThreadFactory() handler 任务无法被执行时的兜底策略(线程池满+队列满/线程池关闭) 二、核心执行流程:任务是如何被处理的? ThreadPoolExecutor 处理任务的核心逻辑在 execute(Runnable command) 方法中,底层遵循「核心线程 → 任务队列 → 非核心线程 → 拒绝策略」的优先级规则,流程如下: graph TD A[提交任务] --> B{核心线程数是否已满?}; B -- 否 --> C[创建核心线程执行任务]; B -- 是 --> D{任务队列是否已满?}; D -- 否 --> E[任务入队等待执行]; D -- 是 --> F{最大线程数是否已满?}; F -- 否 --> G[创建非核心线程执行任务]; F -- 是 --> H[触发拒绝策略]; 流程拆解(结合源码简化版) public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // ctl 是核心原子变量:高3位存线程池状态,低29位存工作线程数 int c = ctl.get(); // 步骤1:核心线程数未满,创建核心线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // true 表示创建核心线程 c = ctl.get(); // 创建失败(比如线程池已关闭),重新获取状态 } // 步骤2:核心线程满,且线程池运行中,任务入队 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 二次检查:线程池已关闭,移除任务并触发拒绝策略 if (!isRunning(recheck) && remove(command)) { reject(command); } // 无工作线程(核心线程可能被销毁),创建空线程(非核心)取队列任务 else if (workerCountOf(recheck) == 0) { addWorker(null, false); // false 表示创建非核心线程 } } // 步骤3:队列满,尝试创建非核心线程执行任务 else if (!addWorker(command, false)) { // 步骤4:最大线程数已满,触发拒绝策略 reject(command); } } 关键说明: ctl:整个线程池的「状态中枢」,用一个 AtomicInteger 同时存储「线程池状态」和「工作线程数」,避免多变量的线程安全问题; addWorker(Runnable firstTask, boolean core):创建线程的核心方法,firstTask 是线程启动后第一个执行的任务,core 标记是否为核心线程。 三、源码深度拆解:核心方法的底层实现 1. ctl 变量:线程池状态+工作线程数的统一管理 ctl 是 ThreadPoolExecutor 最核心的原子变量,底层通过位运算实现「一个变量存两个信息」: // 高3位表示线程池状态,低29位表示工作线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 工作线程数的最大值 // 线程池的5种状态(优先级从高到低) private static final int RUNNING = -1 << COUNT_BITS; // 接收新任务+处理队列任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接收新任务,但处理队列任务 private static final int STOP = 1 << COUNT_BITS; // 不接收新任务+不处理队列任务+中断正在执行的任务 private static final int TIDYING = 2 << COUNT_BITS; // 所有任务执行完毕,工作线程数为0,准备执行terminated() private static final int TERMINATED = 3 << COUNT_BITS; // terminated() 执行完毕 // 位运算工具方法:拆分/组合状态和线程数 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取线程池状态 private static int workerCountOf(int c) { return c & CAPACITY; } // 获取工作线程数 private static int ctlOf(int rs, int wc) { return rs | wc; } // 组合状态和线程数 底层意义:通过位运算减少锁竞争,提升并发性能(无需为「状态」和「线程数」两个变量加锁)。 2. addWorker 方法:线程的创建与启动 addWorker 是创建线程的核心,底层分为「状态检查→线程创建→线程启动」三步: private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查线程池状态:只有 RUNNING/SHUTDOWN(且有队列任务)时才允许创建线程 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) { return false; } // CAS 增加工作线程数(核心逻辑) for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { return false; // 线程数超上限,创建失败 } if (compareAndIncrementWorkerCount(c)) { break retry; // CAS 成功,跳出循环 } c = ctl.get(); // CAS 失败,重新获取状态 if (runStateOf(c) != rs) { continue retry; // 状态变了,重新检查 } } } // 线程数增加成功,开始创建 Worker 线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // Worker 是线程池的内部类,封装了 Thread 和 Runnable w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 加锁保证 workers 集合的线程安全 try { int rs = runStateOf(ctl.get()); // 再次检查状态:确保线程池未关闭 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); // workers 是 HashSet<Worker>,存储所有工作线程 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程,执行 Worker#run 方法 workerStarted = true; } } } finally { if (!workerStarted) { addWorkerFailed(w); // 启动失败,回滚(减少线程数+移除 Worker) } } return workerStarted; } 3. Worker 类:线程的封装与任务循环执行 Worker 是 ThreadPoolExecutor 的内部类,实现了 Runnable 接口,是「线程+任务」的封装体,核心逻辑在 run() 方法: private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; // 真正执行任务的线程 Runnable firstTask; // 第一个要执行的任务 Worker(Runnable firstTask) { setState(-1); // 初始化同步状态,避免线程运行前被中断 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); // 通过线程工厂创建线程 } // 线程启动后执行的核心方法 public void run() { runWorker(this); // 核心:循环从队列取任务执行 } } // 真正的任务执行循环 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 释放锁,允许中断 boolean completedAbruptly = true; try { // 核心循环:有初始任务 或 从队列取到任务,就一直执行 while (task != null || (task = getTask()) != null) { w.lock(); // 加锁,防止线程被中断(除非线程池停止) // 检查线程池状态:如果是 STOP/TIDYING/TERMINATED,中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { wt.interrupt(); } try { beforeExecute(wt, task); // 钩子方法:任务执行前的扩展(自定义) Throwable thrown = null; try { task.run(); // 执行任务(核心中的核心) } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 钩子方法:任务执行后的扩展(自定义) } } finally { task = null; w.completedTasks++; // 统计完成的任务数 w.unlock(); // 释放锁 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // 任务循环结束,销毁线程 } } 核心逻辑: getTask():从任务队列取任务(阻塞/超时),是线程复用的关键; runWorker():线程启动后,会循环调用 getTask() 取任务执行,直到取到 null(线程超时/线程池关闭); processWorkerExit():线程退出时的清理工作(从 workers 集合移除、更新线程数、尝试终止线程池)。 4. getTask() 方法:线程复用的核心 getTask() 负责从队列取任务,底层逻辑决定了「核心线程是否存活」「非核心线程是否超时销毁」: private Runnable getTask() { boolean timedOut = false; // 标记是否超时 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 检查线程池状态:如果是 STOP/TERMINATED,或 SHUTDOWN 且队列为空,返回 null(线程退出) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 减少工作线程数 return null; } int wc = workerCountOf(c); // 判断是否需要超时销毁:非核心线程 或 核心线程允许超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 非核心线程超时/线程数超上限,返回 null(线程退出) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { return null; } continue; } try { // 从队列取任务:核心线程阻塞(take()),非核心线程超时阻塞(poll()) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) { return r; // 取到任务,返回执行 } timedOut = true; // 超时未取到任务,标记超时 } catch (InterruptedException retry) { timedOut = false; // 被中断,重置超时标记 } } } 核心差异: 核心线程:默认调用 workQueue.take()(无限阻塞,直到取到任务),因此核心线程不会超时销毁; 非核心线程:调用 workQueue.poll(keepAliveTime, unit)(超时阻塞,超时后返回 null),线程会退出循环并销毁。 四、关键机制:拒绝策略与线程池生命周期 1. 拒绝策略:任务无法执行时的兜底 当「线程池满(达到 maximumPoolSize)+ 队列满」或「线程池已关闭」时,触发 RejectedExecutionHandler,默认提供4种策略: 策略 底层行为 AbortPolicy 抛 RejectedExecutionException(默认策略),中断任务提交 CallerRunsPolicy 由提交任务的线程(比如主线程)执行任务,降低提交速度,起到“限流”作用 DiscardPolicy 静默丢弃任务,无任何提示(不抛异常) DiscardOldestPolicy 丢弃队列中最老的任务,尝试重新提交当前任务 2. 线程池生命周期:状态流转 线程池的5种状态遵循固定流转路径,底层由 ctl 变量控制: RUNNING → SHUTDOWN → TIDYING → TERMINATED RUNNING → STOP → TIDYING → TERMINATED shutdown():触发 RUNNING → SHUTDOWN,不接收新任务,但处理队列任务; shutdownNow():触发 RUNNING → STOP,不接收新任务、不处理队列任务,中断所有正在执行的任务; terminated():钩子方法,线程池进入 TERMINATED 状态时执行(可自定义扩展)。 五、实战验证:核心原理落地示例 通过自定义线程池,验证「核心线程→队列→非核心线程→拒绝策略」的执行逻辑: import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { public static void main(String[] args) { // 核心线程2,最大线程5,非核心线程超时10秒,队列容量3,默认拒绝策略 ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3), new ThreadPoolExecutor.AbortPolicy() ); // 提交8个任务:2核心+3队列+3非核心(刚好到最大线程数) for (int i = 1; i <= 8; i++) { int taskId = i; executor.execute(() -> { try { System.out.println("任务" + taskId + "由线程" + Thread.currentThread().getName() + "执行"); Thread.sleep(1000); // 模拟任务执行 } catch (InterruptedException e) { e.printStackTrace(); } }); } // 提交第9个任务:队列+线程都满,触发拒绝策略 try { executor.execute(() -> System.out.println("任务9执行")); } catch (Exception e) { System.out.println("任务9被拒绝:" + e.getMessage()); } // 关闭线程池 executor.shutdown(); } } 执行结果: 任务1由线程pool-1-thread-1执行 任务2由线程pool-1-thread-2执行 任务6由线程pool-1-thread-3执行 任务7由线程pool-1-thread-4执行 任务8由线程pool-1-thread-5执行 任务3由线程pool-1-thread-1执行(核心线程复用) 任务4由线程pool-1-thread-2执行(核心线程复用) 任务5由线程pool-1-thread-3执行(非核心线程复用) 任务9被拒绝:Task java.lang.Runnable@xxx rejected from java.util.concurrent.ThreadPoolExecutor@xxx[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0] 总结 核心逻辑:ThreadPoolExecutor 处理任务遵循「核心线程→任务队列→非核心线程→拒绝策略」,核心线程复用、非核心线程超时销毁是性能优化的关键; 状态管理:通过 ctl 原子变量统一管理「线程池状态」和「工作线程数」,位运算减少并发竞争; 线程复用:Worker 类封装线程,runWorker() 循环调用 getTask() 从队列取任务执行,直到线程需要销毁; 关键参数:corePoolSize 决定核心线程数,maximumPoolSize 限制总线程数,workQueue 决定任务缓冲规则,三者配合决定线程池的核心行为。 理解这些底层原理,你就能避开 Executors 工具类的坑(比如 CachedThreadPool 无上限线程、FixedThreadPool 无界队列),根据业务场景自定义线程池参数。