java线程池ThreadPoolExecutor详解及使用

java线程池ThreadPoolExecutor的使用

在日常开发中我们经常会使用多线程,但是当线程数量达到一定程度时,频繁的创建线程、执行任务、销毁线程就会造成资源的浪费和性能的开销,那么这个时候就要考虑使用线程池了,它可以很好的避免这个问题,尤其是程序中需要创建大量的生存周期很短的线程时,更应该考虑使用线程池。

线程池的概念

线程池(ThreadPool),简单的说,它就是一个装有线程的池子,我们只需把任务(多线程执行的内容)交给线程池来处理,和数据库连接池、Http连接池的概念差不多的,通过维护一定数量的线程来达到多个线程复用的效果,同时它也能够帮我们自动管理线程的生命周期。

线程池的好处

  • 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗
  • 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行
  • 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换
  • 提供更强大的功能,延时定时线程池。

线程池底层类ThreadPoolExcutor介绍

线程池的种类有4种,下面会讲,但是他们的底层使用的都是ThreadPoolExcutor这个类,所以只需把这个类搞懂,就基本能解决一些常规的业务了。

常用构造方法

五个参数的

/**
 * @param corePoolSize 线程池的核心数量
 * @param maximumPoolSize 线程池的总数量
 * @param keepAliveTime 在任务量大于队列长度需要需要创建的线程数量时,会创建新的临时线程,假如新创建的临时线程把队列中的任务都执行完了,那么接下来这			个临时线程并不是立即释放,而是会等待新的任务与核心线程一起去执行,
 * @param unit 设置等待时间的时间单位 DAYS、HOURS、MINUTES等等
 * @param workQueue 阻塞队列,用于存放没有执行的任务,只存放由execute()方法提交的、实现Runnable接口的任务。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue);

六个参数的

/**
 * @param corePoolSize 线程池的核心数量
 * @param maximumPoolSize 线程池的总数量
 * @param keepAliveTime 在任务量大于队列长度需要需要创建的线程数量时,会创建新的临时线程,假如新创建的临时线程把队列中的任务都执行完了,那么接下来这			个临时线程并不是立即释放,而是会等待新的任务与核心线程一起去执行,
 * @param unit 设置等待时间的时间单位 DAYS、HOURS、MINUTES等等
 * @param workQueue 阻塞队列,用于存放没有执行的任务,只存放由execute()方法提交的、实现Runnable接口的任务。
 * @param handler 任务拒绝策略,当任务量大于我们线程池设置的最大线程数时,那么就会触发这个拒绝策略,java中定义的拒绝策略有4中,之后给大家演示。
 */
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler);

ThreadPoolExcutor类总共有4个,熟悉以上两个常用的构造方法即可。

线程池的使用

这里我就举一个简单的使用场景,用多线程去网页上下载图片。

创建一个springboot项目

@SpringBootApplication
public class SpringbootTemplateApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext application = SpringApplication.run(SpringbootTemplateApplication.class, args);
        DownloadPicService service = application.getBean(DownloadPicService.class);
        service.start();
    }

    /**
     * 往ioc容器中注入一个自定义的线程池
     * 核心线程数为30
     * 最大线程数为30
     * 临时线程等待任务时间为30s
     * 阻塞队列的长度为10
     * 拒绝策略为丢弃新任务
     * @return
     */
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        return new ThreadPoolExecutor(30,
                                     30,
                                     30,
                                     TimeUnit.SECONDS,
                                     new ArrayBlockingQueue<>(10),
                                     new ThreadPoolExecutor.AbortPolicy()
                );
    }

}
@Service
public class DownloadPicService {
    private static final Logger logger = LoggerFactory.getLogger(DownloadPicService.class);

    //将线程池注入
    @Autowired
    public ThreadPoolExecutor threadPoolExecutor;


    public void start(){
        for (int i = 0; i < 41; i++) {

            //将任务提交到线程池,
            threadPoolExecutor.execute(new DownloadTask("https://image.baidu.com/xxxxxxxxx"));
        }
    }

    /**
     * 这里我为了方便,直接创建的是一个静态内部类,实现 Runnable接口
     */
    public static class DownloadTask implements Runnable{
        private String path;    //图片地址

        public DownloadTask(String path) {
            this.path = path;
        }

        @Override
        public void run() {
            FileOutputStream fos = null;
            try {
                byte[] bytes = IOUtils.toByteArray(new URI(path));
                String uuid = UUID.randomUUID().toString();
                String picName = uuid+".jpg";
                fos = new FileOutputStream(new File("F:\\labelTest"+File.separator+picName));
                IOUtils.write(bytes, fos);
                logger.info("图片下载成功,{}", picName);
            }catch (Exception e){
                logger.error("下载图片失败,原因:{}", e.getMessage());
            }finally {
                    try {
                        if(fos != null){
                            fos.close();
                        }
                    }catch (Exception e){
                        e.printStackTrace();
                    }
            }
        }
    }

}

启动后发现报错

Caused by: java.util.concurrent.RejectedExecutionException: Task com.pihao.service.DownloadPicService$DownloadTask@22f314c7 rejected from java.util.concurrent.ThreadPoolExecutor@4ec3005d[Running, pool size = 30, active threads = 30, queued tasks = 10, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.pihao.service.DownloadPicService.start(DownloadPicService.java:35)
	at com.pihao.SpringbootTemplateApplication.main(SpringbootTemplateApplication.java:20)
	... 5 more
# 这是什么原因呢?AbortPolicy,这是我设置的线程池拒绝策略,当现有任务数量超过线程池设置的最大线程数时会触发,究其原因,是因为我执行了41个下载任务,而最大线程数设置的40,所以第41个任务就被舍弃了,拒绝执行,抛出错误,前面40个任务继续执行不受影响。

线程池的执行步骤

先来看一张原理图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FAJAOHD9-1596719539105)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20200806155151733.png)]

  • 首先用户提交任务到线程池,线程池判断现有的任务数量是否大于核心线程数
    • 如果没有大于核心线程数,那么就直接使用线程池中闲置的线程来执行任务;
    • 如果现有任务数量超过了核心线程数,并且阻塞队列中还没有放满,那么之后进来的任务就会暂时存放到阻塞队列中,等待核心线程来执行
    • 如果现有任务数量超过了核心线程数,并且阻塞队列也已经放满了,那么就会创建新的临时线程
      • 如果此时临时线程数量+核心线程数量没有超过最大线程数量,由新线程与核心线程一起分摊执行任务
      • 如果此时临时线程数量+核心线程数量超过最大线程数量,就会触发任务拒绝策略,有四种,后面讲

ok,执行步骤就是这样

线程池的种类

newFixedThreadPool

固定的线程池

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

特点:核心线程数和最大线程数相等,而空闲的存活时间为0毫秒,说明这个参数也没有啥意义,工作队列为最大的Integer.MAX_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略

newSingleThreadExecutor

单例的线程池

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

特点:核心线程数和最大线程数均为1,空闲的存活时间为0毫秒,工作队列也是最大的Integer.MAX_VALUE大小的阻塞队列。意味着每次执行一个线程,多余的任务先存储到工作队列,一个一个的执行,这样就保证了线程的顺序执行

newCachedThreadPool

带缓冲的线程池

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

特点:核心线程数为0,最大线程数为最大值Integer.MAX_VALUE,创建的新的临时线程等待60秒后释放,SynchronousQueue这是一个直接提交的队列,意味着每个新任务都会有线程来执行,如果线程池有可用的线程则执行任务,否则就创建一个来执行,线程池中的线程数不确定,一般建议执行速度较快较小的线程,不然这个最大线程池边界过大容易造成内存溢出。

newScheduledThreadPool

调度线程池

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

特点:延时队列,支持时间控制,详细请查看它的 schedule ( ) 方法

线程池任务拒绝策略

之前说过,当提交的任务总数大于最大线程数时才会触发任务拒绝策略,java中的定义的策略有4中,分别如下

AbortPolicy

这个是最简单的,直接拒绝后面提交的任务,并抛出拒绝异常

DiscardPolicy

后面的任务不做任务处理,不执行,也不抛出异常

DiscardOldestPolicy

丢弃最老的任务,就是从队列中取出最老的任务,然后放入新进来的任务执行

CallerRunsPolicy

如果线程池未关闭,则调用主线程来帮忙执行新的任务,这也会导致主线程效率

总结

线程以及线程池这一块面试必问,躲不掉的!!!重点是线程池的执行步骤那一部分。与其每次侥幸心理背面试题还不如静下心来好好把它理解透彻,事半功倍。