本篇文章给大家谈谈什么是线程池,如何使用,为什么要用,以及线程池不建议使用executors对应的知识点,文章可能有点长,但是希望大家可以阅读完,增长自己的知识,最重要的是希望对各位有所帮助,可以解决了您的问题,不要忘了收藏本站喔。
本文目录
如何学习Java多线程
JAVA中的多线程使用十分广泛,很多的JAVA框架都使用到了多线程,比如spring,mybatis,druid等!
多线程有什么好处呢?比如说web服务器的多连接,异步调用,并行操作,避免持续阻塞等等!
多线程怎么实现呢?1,继承Thread类,2,实现Runnable接口,3实现callable+futureTask实现异步回调,4,使用线程池Executors.newFixedThreadPool(5);
多线程怎么保证线程安全?
1,时间换空间:加锁
①,synchronize:锁方法,锁代码段,锁对象,锁的粒度大!
②,reentrantlock:使用lock和unlock实现加锁和解锁,可使用ReadWriteLock读写锁来实现读和写的锁分离,底层使用CAS和AQS实现,这也是很多框架里面用到的技术!
2,空间换时间:线程的本地变量隔离,ThreadLocal,实现一个线程一份变量,数据不共享,所以线程安全,spring中bean默认都是单例的,但是spring接受并发请求是线程安全的,就是因为使用threadlocal把请求,上下文数据装在了线程里。所以请求之间互不干涉!
JAVA多线程还涉及到哪些技术?
1,synchonizeHashmap,hashTable(基本上是锁方法,所以效率低),concurrentHashmap(分段锁,锁粒度小,性能好),CopyOnWriteArrayList、CopyOnWriteArraySet(可重入锁)等等!
2,countdownbatch用做计数器!
3,使用forkjoin做并行计算!
4,有锁不如无锁!
....
多了解这些技术下面底层的东西,多去实际情景中总结,犯错然后改正才能更快的成长!
JAVA多线程知识点可以写好几本书,而随便一个知识点都可以写一章,只有经常钻研并使用才能懂其精髓,希望我在这条路上越走越远,以后学到的东西就记录于此,互相学习,共勉。。
多线程返回值问题
importjava.util.concurrent.Callable;
/**
*线程类,需要返回值的,实现Callable接口
*@authorxhc
*
*/
@SuppressWarnings("rawtypes")
publicclassMyThreadimplementsCallable{
@SuppressWarnings("static-access")
@Override
publicObjectcall()throwsException{
Thread.currentThread().sleep(1000);//睡眠一秒
returnThread.currentThread().getName();
}
}
测试类:
packagecom.xjiuge.test;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importjava.util.concurrent.Future;
publicclassApp{
//建立线程池
staticExecutorServicepool=Executors.newCachedThreadPool();
@SuppressWarnings("unchecked")
publicstaticvoidmain(String[]args){
//存放future的集合
List<Future<String>>list=newArrayList<>();
longst=System.currentTimeMillis();//开始时间
//循环5次,开启5个线程
for(inti=0;i<5;i++){
//获取线程类返回的值,用future接收
Future<String>future=pool.submit(newMyThread());
//将future放入list
list.add(future);
}
try{
//遍历list读取future中的值
for(Future<String>future:list){
while(true){
//判断线程操作是否执行完毕,并且操作没有被取消掉
if(future.isDone()&&!future.isCancelled()){
//调用get方法获取返回值
Stringresult=future.get().toString();
System.out.println(result);
break;
}
}
}
}catch(Exceptione){
e.printStackTrace();
}
longet=System.currentTimeMillis();//结束时间
System.out.println("总耗时:"+(et-st)+"ms");
}
}
打印结果:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
总耗时:1004ms
线程池处理流程是什么
提示
请带着这些问题继续后文,会很大程度上帮助你更好的理解相关知识点。@pdai
为什么要有线程池?Java是实现和管理线程池有哪些方式?请简单举例如何使用。为什么很多公司不允许使用Executors去创建线程池?那么推荐怎么使用呢?ThreadPoolExecutor有哪些核心的配置参数?请简要说明ThreadPoolExecutor可以创建哪是哪三种线程池呢?当队列满了并且worker的数量达到maxSize的时候,会怎么样?说说ThreadPoolExecutor有哪些RejectedExecutionHandler策略?默认是什么策略?简要说下线程池的任务执行机制?execute–>addWorker–>runworker(getTask)线程池中任务是如何提交的?线程池中任务是如何关闭的?在配置线程池的时候需要考虑哪些配置因素?如何监控线程池的状态?为什么要有线程池线程池能够对线程进行统一分配,调优和监控:
降低资源消耗(线程无限制地创建,然后使用完毕后销毁)提高响应速度(无须创建线程)提高线程的可管理性ThreadPoolExecutor例子Java是如何实现和管理线程池的?
从JDK5开始,把工作单元与执行机制分离开来,工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
WorkerThreadSimpleThreadPool
程序中我们创建了固定大小为五个工作线程的线程池。然后分配给线程池十个工作,因为线程池大小为五,它将启动五个工作线程先处理五个工作,其他的工作则处于等待状态,一旦有工作完成,空闲下来工作线程就会捡取等待队列里的其他工作进行执行。
这里是以上程序的输出。
输出表明线程池中至始至终只有五个名为"pool-1-thread-1"到"pool-1-thread-5"的五个线程,这五个线程不随着工作的完成而消亡,会一直存在,并负责执行分配给线程池的任务,直到线程池消亡。
Executors类提供了使用了ThreadPoolExecutor的简单的ExecutorService实现,但是ThreadPoolExecutor提供的功能远不止于此。我们可以在创建ThreadPoolExecutor实例时指定活动线程的数量,我们也可以限制线程池的大小并且创建我们自己的RejectedExecutionHandler实现来处理不能适应工作队列的工作。
这里是我们自定义的RejectedExecutionHandler接口的实现。
RejectedExecutionHandlerImpl.javaThreadPoolExecutor提供了一些方法,我们可以使用这些方法来查询executor的当前状态,线程池大小,活动线程数量以及任务数量。因此我是用来一个监控线程在特定的时间间隔内打印executor信息。
MyMonitorThread.java这里是使用ThreadPoolExecutor的线程池实现例子。
WorkerPool.java注意在初始化ThreadPoolExecutor时,我们保持初始池大小为2,最大池大小为4而工作队列大小为2。因此如果已经有四个正在执行的任务而此时分配来更多任务的话,工作队列将仅仅保留他们(新任务)中的两个,其他的将会被RejectedExecutionHandlerImpl处理。
上面程序的输出可以证实以上观点。
注意executor的活动任务、完成任务以及所有完成任务,这些数量上的变化。我们可以调用shutdown()方法来结束所有提交的任务并终止线程池。
ThreadPoolExecutor使用详解其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。
Execute原理当一个任务提交至线程池之后:
线程池首先当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2.判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入3.如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。当ThreadPoolExecutor创建新线程时,通过CAS来更新线程池的状态ctl.
参数corePoolSize线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize,即使有其他空闲线程能够执行新来的任务,也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。workQueue用来保存等待被执行的任务的阻塞队列.在JDK中提供了如下阻塞队列:具体可以参考JUC集合:BlockQueue详解ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue:具有优先级的无界阻塞队列;LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(),take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer().
maximumPoolSize线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列,则maximumPoolSize则不起作用,因为无法提交至核心线程池的线程会一直持续地放入workQueue.keepAliveTime线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用,超过这个时间的空闲线程将被终止;unitkeepAliveTime的单位threadFactory创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactoryhandler线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:AbortPolicy:直接抛出异常,默认策略;CallerRunsPolicy:用调用者所在的线程来执行任务;DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy:直接丢弃任务;当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
三种类型newFixedThreadPool线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程。
FixedThreadPool的工作队列为无界队列LinkedBlockingQueue(队列容量为Integer.MAX_VALUE),这会导致以下问题:
线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSize和keepAliveTime将会是个无用参数由于使用了无界队列,所以FixedThreadPool永远不会拒绝,即饱和策略失效newSingleThreadExecutor初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行.
由于使用了无界队列,所以SingleThreadPool永远不会拒绝,即饱和策略失效
newCachedThreadPool线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;执行过程与前两种稍微不同:
主线程调用SynchronousQueue的offer()方法放入task,倘若此时线程池中有空闲的线程尝试读取SynchronousQueue的task,即调用了SynchronousQueue的poll(),那么主线程将该task交给空闲线程.否则执行(2)当线程池为空或者没有空闲的线程,则创建新的线程执行任务.执行完任务的线程倘若在60s内仍空闲,则会被终止.因此长时间空闲的CachedThreadPool不会持有任何线程资源.关闭线程池遍历线程池中的所有线程,然后逐个调用线程的interrupt方法来中断线程.
关闭方式-shutdown将线程池里的线程状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程.
关闭方式-shutdownNow将线程池里的线程状态设置成STOP状态,然后停止所有正在执行或暂停任务的线程.只要调用这两个关闭方法中的任意一个,isShutDown()返回true.当所有任务都成功关闭了,isTerminated()返回true.
ThreadPoolExecutor源码详解几个关键属性内部状态其中AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
RUNNING:-1<<COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;SHUTDOWN:0<<COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;STOP:1<<COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;TIDYING:2<<COUNT_BITS,即高3位为010,所有的任务都已经终止;TERMINATED:3<<COUNT_BITS,即高3位为011,terminated()方法已经执行完成任务的执行execute–>addWorker–>runworker(getTask)
线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
execute()方法ThreadPoolExecutor.execute(task)实现了Executor.execute(task)
为什么需要doublecheck线程池的状态?在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有doublecheck,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。
addWorker方法从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务线程池创建新线程执行任务时,需要获取全局锁:
Worker类的runworker方法继承了AQS类,可以方便的实现工作线程的中止操作;实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;当前提交的任务firstTask作为参数传入Worker的构造方法;一些属性还有构造方法:
runWorker方法是线程池的核心:
线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;Worker执行firstTask或从workQueue中获取任务:进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)检查线程池状态,倘若线程池处于中断状态,当前线程将中断。执行beforeExecute执行任务的run方法执行afterExecute方法解锁操作通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
getTask方法
下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出线程池是怎么让超过corePoolSize的那部分worker销毁的。
注意这里一段代码是keepAliveTime起作用的关键:
allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁timed==false,workQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
如果线程不允许无休止空闲timed==true,workQueue.poll任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
任务的提交submit任务,等待线程池execute执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中,并阻塞等待运行结果;FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程;在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。
Callable接口类似于Runnable,只是Runnable没有返回值。Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;Future.get方法会导致主线程阻塞,直到Callable任务执行完成;submit方法AbstractExecutorService.submit()实现了ExecutorService.submit()可以获取执行完的返回值,而ThreadPoolExecutor是AbstractExecutorService.submit()的子类,所以submit方法也是ThreadPoolExecutor`的方法。
通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
FutureTask对象publicclassFutureTask<V>implementsRunnableFuture<V>可以将FutureTask提交至线程池中等待被执行(通过FutureTask的run方法来执行)
内部状态内部状态的修改通过sun.misc.Unsafe修改
get方法内部通过awaitDone方法对主线程进行阻塞,具体实现如下:
如果主线程被中断,则抛出中断异常;
判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;最终通过LockSupport的park或parkNanos挂起线程;run方法
FutureTask.run方法是在线程池中被执行的,而非主线程
通过执行Callable任务的call方法;如果call执行成功,则通过set方法保存结果;如果call执行有异常,则通过setException保存异常;任务的关闭shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完
shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。
更深入理解
为什么线程池不允许使用Executors去创建?推荐方式是什么?线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors各个方法的弊端:
newFixedThreadPool和newSingleThreadExecutor:??主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。newCachedThreadPool和newScheduledThreadPool:??主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。推荐方式1首先引入:commons-lang3包
推荐方式2首先引入:com.google.guava包
推荐方式3spring配置线程池方式:自定义线程工厂bean需要实现ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入bean调用execute(Runnabletask)方法即可
配置线程池需要考虑因素
从任务的优先级,任务的执行时间长短,任务的性质(CPU密集/IO密集),任务的依赖关系这四个角度来分析。并且近可能地使用有界的工作队列。
性质不同的任务可用使用不同规模的线程池分开处理:
CPU密集型:尽可能少的线程,Ncpu+1IO密集型:尽可能多的线程,Ncpu*2,比如数据库连接池混合型:CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分。监控线程池的状态可以使用ThreadPoolExecutor以下方法:
getTaskCount()Returnstheapproximatetotalnumberoftasksthathaveeverbeenscheduledforexecution.getCompletedTaskCount()Returnstheapproximatetotalnumberoftasksthathavecompletedexecution.返回结果少于getTaskCount()。getLargestPoolSize()Returnsthelargestnumberofthreadsthathaveeversimultaneouslybeeninthepool.返回结果小于等于maximumPoolSizegetPoolSize()Returnsthecurrentnumberofthreadsinthepool.getActiveCount()Returnstheapproximatenumberofthreadsthatareactivelyexecutingtasks.参考文章《Java并发编程艺术》https://www.jianshu.com/p/87bff5cc8d8chttps://blog.csdn.net/programmer_at/article/details/79799267https://blog.csdn.net/u013332124/article/details/79587436https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice由于问答代码块插入受限,部分代码未完全展示,若有需要可阅读原文:戳我阅读原文
什么是线程池,如何使用,为什么要用
一、线程池的作用:
线程池作用就是限制系统中执行线程的数量。
根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。
二、如何使用:
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。
1.newSingleThreadExecutor创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2.newFixedThreadPool创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3.newCachedThreadPool创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。4.newScheduledThreadPool创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
三、为什么要用线程池:
1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
关于什么是线程池,如何使用,为什么要用和线程池不建议使用executors的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
声明:本文内容来自互联网不代表本站观点,转载请注明出处:https://bk.oku6.com/12/100948.html