线程池的使用

线程池的使用
强烈推介IDEA2020.2破解激活,IntelliJ IDEA 注册码,2020.2 IDEA 激活码

线程池的使用

一、常见的五中线程池


【1】Executors.newFixedThreadPool(n):创建一个定长的线程池,可控制线程最大并发数,超出的线程会在队列中等待。创建的线程池 corePoolSize 和 maximumPoolSize 值是相等的,使用的是 LinkedBlockingQueue 阻塞队列。执行长期的任务,性能好很多。底层实现如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
   
	return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>());
}

【2】Executors.newSingleThreadExecutor():创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。将 corePoolSize 和 maximumPoolSize 都设置为1,使用的是 LinkedBlockingQueue 阻塞队列。适合一个任务一个任务执行的场景。底层实现如下:

public static ExecutorService newSingleThreadExecutor() {
   
	return new FinalizableDelegatedExecutorService
		(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
		    new LinkedBlockingQueue<Runnable>()));
}

【3】Executors.newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收线程,则新建线程。将 corePoolSize 设置为0,maximumPoolSize 设置为Integer.MAX_VALUE ,使用的阻塞队列是SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。适合执行很多短期异步的小程序或者负载较轻的服务器。

public static ExecutorService newCachedThreadPool() {
   
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
				        new SynchronousQueue<Runnable>());
}

【4】Executors.newScheduledThreadPool(n):创建一个支持定时、周期性或延时任务的限定线程数目的线程池。

schePool = Executors.newScheduledThreadPool(1);
/** * @param command * @param 第一次延迟执行的时间 * @param 两次执行的时间间隔 * @param 参数的时间单位 */
schePool.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

【5】**Executors.newWorkStealingPool():**了解

阿里巴巴开发手册:①、线程资源必须通过线程池提供,不允许在应用中自行显示创建。因为使用线程池能够减少在创建和销毁线程上所消耗的时间以及系统开销,解决资源不足问题。如果不适用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或“过度切换”的问题。②、线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式创建,这样的方式让写的同学更加明确线程池的运行的规则,避免资源耗尽的风险。例如上述前两个使用的阻塞队列是 LinkedBlockingQueue 该阻塞队列虽有界但也相当于无界,因为其长度为 Integer.MAX_VALUE 将近2亿多,可能堆积大量的请求,从而导致 OOM 。也就是说实际生产中**,不使用上述的 Executors 工具类来创建线程池**。

二、线程池的创建(底层)


我们在业务开发中通过使用工具类 Executors.newXXXThreadPool(); 类似的方式创建线程池。可以创建5种不同的线程池,但底层都是通过 new ThreadPoolExecutor 实现。因此也可以通过 ThreadPoolExecutor 来创建一个线程池,也属于底层代码。

new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue,handler);

创建线程池涉及到的参数如下:【1】corePoolSize(线程池的基本大小)线程池中常驻核心线程数。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThread() 方法,线程池会提前创建并启动所有基本线程。
【2】runnableTaskQueue(任务队列)
用于保存等待执行的任务的阻塞队列。根据需求可以选择以下阻塞队列:
ArrayBlockingQueue:是一个基于数组结构的阻塞队列,此队列按 FIFO先进先出)原则对元素进行
排序

LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按 FIFO排序元素,吞吐量通常高于ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用操作,否则插入操作一直处于阻塞状态吞吐量通常高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
PriorityBlockingQueue:一个具有优先级无限阻塞队列
【3】maximumPoolSize(线程池最大数量)线程池允许创建的最大线程数,此值>=1。如果队列满了,并且已创建的线程小于最大线程数,则线程池也会创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没有意义

此值一般设置为多少?考虑这个问题首先要分析你的系统是 CPU 密集型,还是 IO 密集型的服务。再就是查看系统的内核数:Runtime.getRuntime().availableProcessors());
①、CPU 密集型:CPU 密集型任务只有在真正的多核 CPU 上才可能得到加速,CPU 一直全速运行。而在单核 CPU 上,无论你开几个模拟的多线程任务都不能得到加速,因为 CPU 总的运算能力就那些。一般公式:线程数=CPU核数+1*(例:cpu i7 7700k 四核八线程 线程数 5)
②、IO 密集型:IO 密集型的任务并不是一直在执行任务,则应配置尽可能多的线程。一般公式:线程数=CPU核数*2(例:cpu i7 7700k 四核八线程 线程数 8)
③、IO 密集型(阻塞):IO 密集型时,大部分线程都阻塞,故需多配置线程数。一般公式:线程数=CPU核数/(1-阻塞系数)。阻塞系数:一般阻塞系数取值在0.8~0.9 之间。(例:cpu i7 7700k 四核八线程 线程数 4/(1-0.8)=20)

【4】**ThreadFactory:**由于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线程设置有意义的名字,一般使用默认即可。如下:

new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

【5】RejectedExecutionHandler(饱和策略)**:**当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这种策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。在 JDK1.5 中 Java 线程池框架提供了4种策略以及饱和策略的测试代码:
■ **AbortPolicy:**直接抛出异常,阻止系统工作。
■ **CallerRunsPolicy:**只要线程池未关闭,该策略直接在调用者线程中运行当前被丢弃的任务。显然这样不会真的丢弃任务,但是,调用者线程性能可能急剧下降。
■ **DiscardOldestPolicy:**丢弃最老的一个请求任务,也就是丢弃一个即将被执行的任务,并尝试再次提交当前任务。
■ **DiscardPolicy:**默默的丢弃无法处理的任务,不予任何处理。

public class MyThreadPoolDemo {
   
    public static void main(String[] args) {
   
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,
                    5, 1,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<>(3)
                    , Executors.defaultThreadFactory()
                    //测试不同的饱和策略
                    , new ThreadPoolExecutor.AbortPolicy());
        //模拟八个用户调用线程池处理任务 当超过 maximumPoolSize+LinkedBlockingQueue 时,使用饱和策略
        for(int i=1;i<=8;i++){
   
            //执行方法有两种,execute 和 submit(可以传入 callable带返回值)
            pool.execute(()->{
   
                System.out.println("当前线程为:"+Thread.currentThread().getName());
            });
        }
    }
}

当然,也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化存储不能执行的任务。
【6】keepAliveTime(线程活动保持时间)**:线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间很短,可以调大时间,提高线程的利用率。
【7】TimeUnit(线程活动保持时间的单位)
:**可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微妙)。

三、向线程池提交任务


可以使用两个方法向线程池提交任务,分别为 execute() 和 submit() 方法。
execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过代码可知 execute() 方法输入的任务是一个 Runnable 类的实例。

/** * 第四种创建 java 多线程的方式:线程池 * * 常见的线程池中类有 5 种 */
public class MyThreadPoolDemo {
   
    public static void main(String[] args) {
   
        //创建一个固定的线程池,包含5个线程
        ExecutorService pool = Executors.newFixedThreadPool(5);
        //创建一个单线程的线程池:输入结果全是 “当前线程为:pool-1-thread-1”
        //ExecutorService pool = Executors.newSingleThreadExecutor();
        //创建 n 条线程来处理业务。线程数不确定
        //ExecutorService pool = Executors.newCachedThreadPool();
        
        //模拟十个用户调用线程池处理任务
        for(int i=1;i<=10;i++){
   
            //执行方法有两种,execute 和 submit(可以传入 callable带返回值)
            pool.execute(()->{
   
                System.out.println("当前线程为:"+Thread.currentThread().getName());
            });
        }
    }
}

/**输出结果展示: *当前线程为:pool-1-thread-1 *当前线程为:pool-1-thread-2 *当前线程为:pool-1-thread-1 *当前线程为:pool-1-thread-2 *当前线程为:pool-1-thread-2 *当前线程为:pool-1-thread-2 *当前线程为:pool-1-thread-1 *当前线程为:pool-1-thread-3 *当前线程为:pool-1-thread-4 *当前线程为:pool-1-thread-5 */

//也可以使用如下形式:
threadPool.execute(new Runnable(){
   
   @Override
   public void run(){
   
       //TODO Auto-generated method stub
   }
});

submit() 方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务成功,而使用 get(long timeout,timeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这个时候任务可能没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
     try{
   
           Object s = future.get();
     } catch (InterruptedException e) {
   
           //处理中断异常
     } catch (ExecutionException e) {
   
           //处理无法执行任务异常
     } finally {
   
           //关闭线程池
           executor.shutdown();
     }

四、合理配置线程池


根据任务特性合理的配置线程池,可以从以下几个角度来分析:
● 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
● 任务的优先级:高、中和底。
● 任务的执行时间:长、中和短。
● 任务的依赖性:是否依赖其他系统资源,如数据库连接。
【1】性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务(每当用户访问资源的时候,会发送一个比对请求到服务端,比对本地静态文件版本和服务端的文件版本是否一致,不一致则更新。这种任务一般不占用大量 IO,所以后台服务器可以快速处理,压力落在CPU上)应配置尽可能小的线程,如配置N(cpu 的个数)+1个线程的线程池。由于IO密集型任务(常有大数据量的查询和批量插入操作,此时的压力主要在I/O上)线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N(cpu的个数)。混合型任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行时间相差不大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则么必要进行分解。可以通过Runtime.getRuntime().availableProcessors() 方法获得当前设备的CPU个数。
【2】优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。

注意:如果一直有优先级高的任务提交到任务队列里,那么优先级低的任务可能永远不能执行。

【3】执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
【4】依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待时间越长,则CPU空闲时间就越长,那么线程数应该设置的越大,这样才能更好的利用CPU。
【5】建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设置大一点,比如几千。举个栗子:有一次,系统后台任务线程池的队列和线程池全满了,不断抛出任务处理异常,通过排查发现是数据库的问题,导致执行SQL变的缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务使用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样的问题时也会影响到其他任务。

五、线程池的监控


如果系统中大量使用线程池,则有必要对线程池进行监控,方便出现问题时,可以根据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用一下属性:
● **taskCount:**线程池执行的任务数量。
● **completedTaskCount:**线程池在运行过程中已完成的任务数量,小于或等于 taskCount。
● **largestPoolSize:**线程池里曾经创建过最大线程数量。通过这个数据可以知道线程池是否曾经满过。如果数值等于线程池的最大值,则表示线程池曾经瞒过。
● **getPoolSize:**线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
● **getActiveCount:**获取活动的线程数。

过扩展线程池进行监控,可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前、执行后和线程池关闭前执行一些代码来进行监控。例如,监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。

protected void beforeExecute(Thread t, Runnable r) {
     }

六、关闭线程池


可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。两者的区别:shutdown 只是将线程池的状态设置成SHUTDOWN 状态,然后中断所有么有正在执行任务的线程。shutdownNow 将线程池设置为 STOP 状态,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务来决定,通常调用shutdown 方法来关闭线程池,如果任务不一定要执行完,就可以调用 shutdownNow 方法。

Now 将线程池设置为 STOP 状态,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务来决定,通常调用shutdown 方法来关闭线程池,如果任务不一定要执行完,就可以调用 shutdownNow 方法。

**总结:**通过本篇先简单了解为什么要使用线程池、如何使用线程池和线程池的使用原理等。方便以后更准确、更有效地使用线程池

本文来源程序猿进阶,由javajgs_com转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/8322

发表评论