ThreadPoolExecutor是Java的线程池并发代名词,多线程开发基本都是基于这个去做具体的业务开发。虽然觉得自己回了,网上帖子已经有很多的文章写这个,但是是自己一一点写的,终归是要比看别人的理解更加深刻,所以最近自己在对java知识的系统梳理。那么接下来主要分析下这个多线程框架的原理。
ThreadPoolExecutor的构造函数以成员变量介绍publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,TimeUnitunit,BlockingQueue<Runnable>workQueue,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler){面试靠的最多是这个构造函数中7个参数的作用,
corePoolSize是核心线程数,即使线程是空闲的,线程池一直保持的的线程数,除非allowCoreThreadTimeOut参数设置为true
maximumPoolSize线程池最大线程数
keepAliveTimeunit线程存活时间和时间单位
workQueue是任务队列,是用来保持task,该队列保持住了Runnable的任务,通过调用线程池的execute的方法.
threadFactory创建线程的工厂
RejectedExecutionHandler是当线程数超过限制以及队列也满了,需要执行的拒绝策略.
成员变零
privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;//线程容量//runStateisstoredinthehigh-orderbitsprivatestaticfinalintRUNNING=-1<<COUNT_BITS;privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;privatestaticfinalintSTOP=1<<COUNT_BITS;privatestaticfinalintTIDYING=2<<COUNT_BITS;privatestaticfinalintTERMINATED=3<<COUNT_BITS;面试最喜欢问的是ctl变量的代表什么意义?ctl变量的的的用高3位表示线程池的状态,用低29位表示线程个数,两者通过|操作,拼接出ctl变量,也就是线程池的最大线程数capacity是(2^29)-1。
线程池状态RUNNING运行状态-1<<29表示线程池可以接新的任务并且处理队列任务
SHUTDOWN关闭状态态-1<<29表示线程池不接受新的线程池任务但是可以处理队列中的任务
STOP停止状态1<<29表示线程池不接受新的线程池任务也不处理队列中的任务并且中断线程池里中正在执行的任务
TIDYING2<<29表示所有的线程池都已经中断了,线程数为0,线程状态转为为TIDYING,将执行terminated钩子函数
TERMINATED3<<29表示所有terminated方法都已经执行完成。
线程状态之间装换图首先我们来看平时业务代码是提交任务到线程池执行的函数是通过execute或者submit方法,区别就是submit返回具有Future,execute返回void,的、那么接下来我们主要分析execute的执行流程,submit涉及到线程异步返回,之后会另外单独分析,那么下面这个execute函数就能看出线程池的整个执行流程,
publicvoidexecute(Runnablecommand){if(command==null)thrownewNullPointerException();/**Proceedin3steps:**1.IffewerthancorePoolSizethreadsarerunning,tryto*startanewthreadwiththegivencommandasitsfirst*task.ThecalltoaddWorkeratomicallychecksrunStateand*workerCount,andsopreventsfalsealarmsthatwouldadd*threadswhenitshouldn't,byreturningfalse.**2.Ifataskcanbesuccessfullyqueued,thenwestillneed*todouble-checkwhetherweshouldhaveaddedathread*(becauseexistingonesdiedsincelastchecking)orthat*thepoolshutdownsinceentryintothismethod.Sowe*recheckstateandifnecessaryrollbacktheenqueuingif*stopped,orstartanewthreadiftherearenone.**3.Ifwecannotqueuetask,thenwetrytoaddanew*thread.Ifitfails,weknowweareshutdownorsaturated*andsorejectthetask.*/intc=ctl.get();if(workerCountOf(c)<corePoolSize){if(addWorker(command,true))return;c=ctl.get();}if(isRunning(c)&&workQueue.offer(command)){intrecheck=ctl.get();if(!isRunning(recheck)&&remove(command))reject(command);elseif(workerCountOf(recheck)==0)//当线程池的核心线程数设置为0情况下,那么这时workerCountOf(recheck)为0,这时就开启非线程数处理队列任务addWorker(null,false);}elseif(!addWorker(command,false))reject(command);}线程池执行任务流程图如下:
我相信大概的流程一般同学是清楚的:
当线程数的Worker线程<corePoolSize创建核心线程数执行
当线程数的Worker线程>corePoolSize,将任务加入任务队列中
则当corePoolSize<maxPoolsize,则新增非核心线程执行任务
当队列满了,线程数也已经达到maxPoolsize,则执行拒绝策略
实际源码中执行流程还有一些小细节容易被忽略的地点
重新检查线程的状态以及检查线程池的线程数的流程
线程池新增工作线程的流程线程池新增工作任务主要addWorker方法。由于代码比较长,我就在代码里写好注释
privatebooleanaddWorker(RunnablefirstTask,booleancore){retry:for(;;){intc=ctl.get();intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.if(rs>=SHUTDOWN&&//第一个条件:线程至少不是运行状态,那么就是shutdownstoptidying,terminated状态!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))//第二个条件:当前线程池是shutdown状态且任务队列非空并且工作任务第一个任务是空的取反条件,这个含义是当除了SHUTDOWN状态且第一个任务为空且任务队列不为空//情况下,直接返回false,增加Work线程失败returnfalse;for(;;){intwc=workerCountOf(c);if(wc>=CAPACITY||wc>=(core?corePoolSize:maximumPoolSize))returnfalse;if(compareAndIncrementWorkerCount(c))breakretry;c=ctl.get();//Re-readctlif(runStateOf(c)!=rs)continueretry;//elseCASfailedduetoworkerCountchange;retryinnerloop}}booleanworkerStarted=false;booleanworkerAdded=false;Workerw=null;try{w=newWorker(firstTask);finalThreadt=w.thread;if(t!=null){finalReentrantLockmainLock=this.mainLock;mainLock.lock();try{//Recheckwhileholdinglock.//BackoutonThreadFactoryfailureorif//shutdownbeforelockacquired.intrs=runStateOf(ctl.get());if(rs<SHUTDOWN||//线程池是running状态(rs==SHUTDOWN&&firstTask==null)){//线程池处于shutdown状态并且第一个task为空if(t.isAlive())//precheckthattisstartablethrownewIllegalThreadStateException();//加入工作线程的集合workers.add(w);ints=workers.size();if(s>largestPoolSize)//记录最大线程数largestPoolSize=s;workerAdded=true;}}finally{-mainLock.unlock();-}-if(workerAdded){-t.start();workerStarted=true;}}}finally{if(!workerStarted)addWorkerFailed(w);}returnworkerStarted;}添加工作线程主要步骤
检查线程池的运行状态以及队列是否是空,增加线程。为什么增加这个判断,主要是因为线程池是多线程的随便可能另外调用shutdown等方法关闭线程池,所以做每一步之前都要再次check线程池的状态,其中比较重要的点是线程池在除了Running状态,其他的只有shutdow状态,且队列任务非空的情况,才能增加work线程处理任务。
判断线程池的线程是核心线程数,然后就判断大于核心线程数,如果不是增加的核心线程数,然后通过CAS增加线程数加1,然后re-read的ctl的现在的状态是否刚开始进入循环的状态保持一致。
创建Worker对象,它的第一个参数Runable就是执行的第一个task,然后获取mainLock的重入锁,然后再次判断线程池的状态是否是shutdown状态,然后将Worker对象加入工作线程的Set集合中,判断是大于largePoolSize,则将workSet的size赋值largePoolSize,然后赋值workerAdded为true,接下来在finnally中workerAdded为true,则调用Worker的start方法启动该Worker线程,
如果WorkerAdded失败,则从Worder的Set移除刚才加入Worker线程,并将线程池的线程数减1,
工作线程Worker的执行流程首先来看下Work的类的成员变量的构造函数,从下面的Work的代码,可以看到它是实现了RUnnable接口,上一节Worker启动是调用了它的start方法,真正由操作系统调度执行的其run方法,那么接下来重点看下run的工作流程。
privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{/***Thisclasswillneverbeserialized,butweprovidea*serialVersionUIDtosuppressajavacwarning.*/privatestaticfinallongserialVersionUID=6138294804551838833L;/**Threadthisworkerisrunningin.Nulliffactoryfails.*/finalThreadthread;/**Initialtasktorun.Possiblynull.*/RunnablefirstTask;/**Per-threadtaskcounter*/volatilelongcompletedTasks;/***CreateswithgivenfirsttaskandthreadfromThreadFactory.*@paramfirstTaskthefirsttask(nullifnone)*/Worker(RunnablefirstTask){//初始化状态为-1,表示不能被中断setState(-1);//inhibitinterruptsuntilrunWorkerthis.firstTask=firstTask;this.thread=getThreadFactory().newThread(this);}下面代码中Work的run直接调用runWork,并传入自身对象,开始一个循环判断第一个任务后者从任务队列中取任务不为空,就开始上锁,然后执行任务,如果任务队列为空了,则处理Work的退出。
/**DelegatesmainrunlooptoouterrunWorker*/publicvoidrun(){//直接调用runWorker函数runWorker(this);}finalvoidrunWorker(Workerw){//Wokder当前线程Threadwt=Thread.currentThread();Runnabletask=w.firstTask;w.firstTask=null;//将state值赋值为0,这样就运行中断w.unlock();//allowinterruptsbooleancompletedAbruptly=true;try{//循环判断第一个Task获取从获取任务while(task!=null||(task=getTask())!=null){//获取当前Work的锁,处理任务,也就是当前Work线程处理是同步处理任务的w.lock();//Ifpoolisstopping,ensurethreadisinterrupted;//ifnot,ensurethreadisnotinterrupted.This//requiresarecheckinsecondcasetodealwith//shutdownNowracewhileclearinginterrupt//线程池的状态至少是stop,即使stop,tidying.terminated状态if((runStateAtLeast(ctl.get(),STOP)//检查线程是否中断且清楚中断||(Thread.interrupted()&&//再次检查线程池的状态至少是STOPrunStateAtLeast(ctl.get(),STOP)))&&//再次判断是否中断!wt.isInterrupted())//中断线程wt.interrupt();try{//执行业务任务前处理(钩子函数)beforeExecute(wt,task);Throwablethrown=null;try{//这里就是执行提交线程池的Runnable的任务的run方法task.run();}catch(RuntimeExceptionx){thrown=x;throwx;}catch(Errorx){thrown=x;throwx;}catch(Throwablex){thrown=x;thrownewError(x);}finally{//执行业务任务后处理(钩子函数)afterExecute(task,thrown);}}finally{//执行结束重置为空,回到while循环拿下一个task=null;//处理任务加1w.completedTasks++;//释放锁,处理下一个任务w.unlock();}}//代码执行到这里,代表业务的任务没有异常,不然不会走到这里,//因为上一层try没有catch异常的,而业务执行出现异常,最里层//虽然catch了异常,但是也都通过throw向外抛出completedAbruptly=false;}finally{//如果循环结束,则处理Work退出工作,代表任务拿不到任务,即任务队列没有任务了processWorkerExit(w,completedAbruptly);}}下面就来看下getTask获取任务队列的处理逻辑、如果这里返回null,即runWorker循环退出,则会处理finnaly中processWorkExit,处理Work线程的退出,下面是getWork返回null的情况:
如果线程池状态值至少是SHUTDOWN状态,并且线程池状态值至少是STOP状态,或者是任务队列是空,则将线程池的workcout减1,并返回null,
计算线程池中线程池的数量,如果线程数量大于最大线程数量,或者allowCoreThreadTimeOut参数为true或者线程数大于并且任务队列为空,则将线程池减1,并返回null,
privateRunnablegetTask(){//超时标志booleantimedOut=false;//Didthelastpoll()timeout?for(;;){//获取线程状态intc=ctl.get();//线程状态intrs=runStateOf(c);//Checkifqueueemptyonlyifnecessary.//如果线程池状态值至少是SHUTDOWN状态,if(rs>=SHUTDOWN线程池状态值至少是STOP状态,或者是任务队列是空&&(rs>=STOP||workQueue.isEmpty())){//CAS将worker线程数减1decrementWorkerCount();returnnull;}//计算线程池线程数量intwc=workerCountOf(c);//Areworkerssubjecttoculling?//allowCoreThreadTimeOut参数设置为true,或则线程池的线程数大于corePoolSize,表示需要超时的Worker需要退出,booleantimed=allowCoreThreadTimeOut||wc>corePoolSize;//线程数大于最大线程数||已经超时if((wc>maximumPoolSize||(timed&&timedOut))//线程数大于1或者任务队列为空&&(wc>1||workQueue.isEmpty())){//CAS将线程数减1if(compareAndDecrementWorkerCount(c))returnnull;continue;}try{//需要处理超时的Worker,则获取任务队列中任务等待的时间//就是线程池构造函数中keepAliveTime时间,如果不处理超时的Worker//则直接调用take一直阻塞等待任务队列中有任务,拿到就返回Runnale任务Runnabler=timed?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):workQueue.take();if(r!=null)returnr;timedOut=true;}catch(InterruptedExceptionretry){timedOut=false;}}}Worker的退出处理:1从上面分析知道completedAbruptly是任务执行时是否出现异常标志,如果任务执行过程出错,则将线程池的线程数量减12.加线程池的mainLock的全局锁,这里主要区分Worker执行任务中,拿的是Worker内部的锁,完成任务加1,将worker从Worker的集合移除,3.执行tryTerminate函数,是否线程池线程池是否关闭4.根据线程池状态是否补充非核心的Worker线程去处理
privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){//任务执行时出现异常,则减去工作if(completedAbruptly)//Ifabrupt,thenworkerCountwasn'tadjusteddecrementWorkerCount();//拿到线程池的主锁finalReentrantLockmainLock=this.mainLock;//加锁mainLock.lock();try{//完成任务加1completedTaskCount+=w.completedTasks;//将worker从Worker的集合移除workers.remove(w);}finally{mainLock.unlock();}//尝试线程池关闭tryTerminate();//获取线程池的ctlintc=ctl.get();//如果线程池的状态值小于STOP,即使SHUTDOWNRUNNINGif(runStateLessThan(c,STOP)){//任务执行没有异常if(!completedAbruptly){//allowCoreThreadTimeOut参数true,则min=0,表示不需要线程常驻。//负责是有corePoolSize个线程常驻线程池intmin=allowCoreThreadTimeOut?0:corePoolSize;if(min==0&&!workQueue.isEmpty())min=1;//如果线程池数大于最小,也就是不需要补充线程执行任务队列的任务if(workerCountOf(c)>=min)return;//replacementnotneeded}//走到这里表示线程池的线程数为0,而任务队列又不为空,得补充一个线程处理任务addWorker(null,false);}}t
logo设计
创造品牌价值
¥500元起
APP开发
量身定制,源码交付
¥2000元起
商标注册
一个好品牌从商标开始
¥1480元起
公司注册
注册公司全程代办
¥0元起
查
看
更
多
Java多线程之ThreadPoolExecutor原理(图文代码实例详解)
那么就是shutdownstoptidying,terminated状态!(rs==SHUTDOWN&&firstTask==null&&!workQueue.isEmpty()))\/\/第二个条件:当前线程池是shutdown状态且任务队列非空并且工作任务第一个任务是空的取反条件,
threadpoolexecutor线程池?
ThreadPoolExecutor是Java中的一个线程池实现,它提供了一个线程池框架,用于管理和控制多线程的执行。线程池的主要目的是减少在创建和销毁线程上花费的时间以及系统资源的开销,提高系统的执行效率。详细解释部分:1. 基本概念:ThreadPoolExecutor负责管理和控制一组工作线程,这些工作线程可以接受并执行被提交...
【Java原理系列】ScheduledThreadPoolExecutor原理用法示例源码详解
比如,初始化一个ScheduledThreadPoolExecutor实例,设置核心线程数,从而为定时任务提供资源保障。提交延迟任务,例如在5秒后执行特定操作,并输出相关信息。此外,提交周期性任务,如每隔2秒执行一次特定操作,用于实时监控或数据更新。最后,通过调用shutdown()与shutdownNow()方法来关闭执行器并等待所有任务...
ThreadPoolExecutor详解
ThreadPoolExecutor 1、corePoolSize(线程池基本大小) 2、maximumPoolSize(线程池最大数量) 3、workQueue(任务队列):用于保存等待执行的阻塞队列 ArrayBlockingQueue、LinkedBlockingQueue、 SynchronousQueue、PriorityBlockingQueue 4、RejectedExecutionHandler(饱和策略)(interface支持自定义...
java线程池之ScheduledThreadPoolExecutor实现原理
java线程池之ScheduledThreadPoolExecutor实现原理java中异步周期任务调度有Timer,ScheduledThreadPoolExecutor等实现,目前单机版的定时调度都是使用ScheduledThreadPoolExecutor去实现,那么它是如何实现周期执行
...带你学习ThreadPoolExecutor线程池实现原理
本文旨在通过手写一个线程池,来深入理解ThreadPoolExecutor线程池的实现原理。首先,线程池的核心目标是资源管理和性能优化,通过池化技术减少线程创建和销毁的开销。手写线程池的实现步骤包括确定核心流程和添加辅助流程,虽然代码简单,但能体现核心的池化思想。手写线程池的实现涉及到状态管理,如线程池数量和...
threadpoolexecutor线程池?
1. ThreadPoolExecutor是Java并发编程中的核心组件,它提供了一个线程池框架,用于管理和控制并发任务。2. ThreadPoolExecutor的作用是减少线程创建和销毁的开销,以及节省系统资源,从而提升程序的执行效率。3. ThreadPoolExecutor负责管理和维护一组工作线程,并根据配置策略来执行提交的任务。它允许开发者自...
线程池之ThreadPoolExecutor使用
: 线程启动原理 线程中断机制 多线程实现方式 FutureTask实现原理 线程池之ThreadPoolExecutor概述 线程池之ThreadPoolExecutor使用 线程池之ThreadPoolExecutor状态控制 线程池之ThreadPoolExecutor执行原理 线程池之ScheduledThreadPoolExecutor概述 线程池的优雅关闭实践 ...
Python 线程池 (thread pool) 创建及使用 + 实例代码
创建线程池的实例代码如下:python from concurrent.futures import ThreadPoolExecutor def task(i):定义要执行的任务 pass executor = ThreadPoolExecutor(max_workers=5) # 创建5个线程 for i in range(5): # 提交5个任务 executor.submit(task, i)可以使用as_completed监控任务完成情况 for future...
ThreadPoolExecutor线程大于corePoolsize的多出线程是怎么产生的_百度...
) 无界队列永远也不会触发reject和对临时线程的使用 如果队列满了 并且临时线程也用光了 则通过回调开发者提供的java.util.concurrent.RejectedExecutionHandler实例来处理该任务 题主可以看看java.util.concurrent.Executors是如何利用这些参数基于java.util.concurrent.ThreadPoolExecutor创造出不同的线程池的 ...