jjzjj

Java多线程 - 创建线程池的方法 - ThreadPoolExecutor和Executors

学全栈的灌汤包 2024-01-01 原文

文章目录

线程池(重点)

线程池介绍

什么是线程池?

线程池就是一个可以复用线程的技术。

不使用线程池的问题:

如果用户每发起一个请求,后台就创建一个新线程来处理,下次新任务来了又要创建新线程,而创建新线程的开销是很大的,这样会严重影响系统的性能。

线程池工作原理:

例如线程池中最多可以允许创建三个工作线程, 也叫核心线程, 前面三个任务来的时候会给前面三个任务单独创建三个线程; 但是后面任务再来的时候, 因为创建的工作线程已达到最大数, 那么后面的任务就会进入任务队列中排队等待; 等前面的任务执行完成, 有空闲的线程的时候使用空闲的线程依次执行任务队列中的任务

实现线程池的方式

谁代表线程池?

JDK 5.0起提供了代表线程池的接口:ExecutorService

如何得到线程池对象?

  • 方式一:使用ExecutorService的实现类ThreadPoolExecutor自创建一个线程池对象
  • 方式二:使用Executors(线程池的工具类)调用方法返回不同特点的线程池对象

方式一: 实现类ThreadPoolExecutor

ThreadPoolExecutor构造器的参数

ThreadPoolExecutor的构造器有如下参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

参数介绍:

  • 参数一:指定线程池的线程数量(核心线程): corePoolSize ----> 不能小于0
  • 参数二:指定线程池可支持的最大线程数: maximumPoolSize ----> 最大数量 >= 核心线程数量
  • 参数三:指定临时线程的最大存活时间: keepAliveTime ----> 不能小于0
  • 参数四:指定存活时间的单位(秒、分、时、天): unit ----> 时间单位
  • 参数五:指定任务队列: workQueue ----> 不能为null
  • 参数六:指定用哪个线程工厂创建线程: threadFactory ----> 不能为null
  • 参数七:指定线程忙,任务满的时候,新任务拒绝策略: handler ----> 不能为null

新任务拒绝策略:

策略详解
ThreadPoolExecutor.AbortPolicy丢弃任务并抛出RejectedExecutionException异常。是默认的策略
ThreadPoolExecutor.DiscardPolicy丢弃任务,但是不抛出异常 这是不推荐的做法
ThreadPoolExecutor.DiscardOldestPolicy抛弃队列中等待最久的任务 然后把当前任务加入队列中
ThreadPoolExecutor.CallerRunsPolicy由主线程负责调用任务的run()方法从而绕过线程池直接执行

ThreadPoolExecutor创建线程池对象示例:

ExecutorService pools = new ThreadPoolExecutor(3, 
                                               5, 
                                               8, 
                                               TimeUnit.SECONDS, 
                                               new ArrayBlockingQueue<>(6), 
                                               Executors.defaultThreadFactory(), 
                                               new ThreadPoolExecutor.AbortPolicy());

思考:

临时线程什么时候创建啊?

  • 新任务提交时发现核心线程都在忙,任务队列也满了,并且还可以创建临时线程,此时才会创建临时线程。

什么时候会开始拒绝任务?

  • 核心线程和临时线程都在忙,任务队列也满了,新的任务过来的时候才会开始任务拒绝。

线程池处理Runnable任务

ExecutorService的常用方法:

方法名称说明
execute(Runnable command)执行任务/命令,没有返回值,一般用来执行 Runnable 任务
shutdown()等任务执行完毕后关闭线程池(一般不会关闭线程池)
shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务(一般不会关闭线程池)

演示代码:

创建一个Runnable任务线程类

public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            System.out.println(Thread.currentThread().getName() + "线程执行输出: " + i);
        }
        // 为了测试, 我们让每个线程睡眠3秒
        try {
            System.out.println(Thread.currentThread().getName() + "线程进入休眠");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + "线程执行完成");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在主类中, 创建一个线程池对象并创建Runnable任务交给线程池处理

  • 如果只有三个任务, 那么会被三个核心线程同时执行
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);
}
  • 打印结果如下
pool-1-thread-1线程执行输出: 0
pool-1-thread-2线程执行输出: 0
pool-1-thread-3线程执行输出: 0
pool-1-thread-2线程执行输出: 1
pool-1-thread-2线程执行输出: 2
pool-1-thread-2线程执行输出: 3
pool-1-thread-1线程执行输出: 1
pool-1-thread-2线程执行输出: 4
pool-1-thread-3线程执行输出: 1
pool-1-thread-3线程执行输出: 2
pool-1-thread-3线程执行输出: 3
pool-1-thread-1线程执行输出: 2
pool-1-thread-3线程执行输出: 4
pool-1-thread-1线程执行输出: 3
pool-1-thread-1线程执行输出: 4
pool-1-thread-2线程进入休眠
pool-1-thread-3线程进入休眠
pool-1-thread-1线程进入休眠
pool-1-thread-3线程执行完成
pool-1-thread-1线程执行完成
pool-1-thread-2线程执行完成
  • 如果核心线程全部被占用, 那么后面的任务会进入任务队列中排队等待有空闲的核心线程; 由于我们设置的任务队列数是5, 所以进入任务队列的任务数量小于等于5时, 不会创建临时线程
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
}
  • 由于核心线程都在忙, 任务队列也满了, 这时候我们再继续添加任务时, 就会创建临时线程; 因为我们设置的核心线程数是3个, 最大线程数是5个, 所以临时线程最多只会创建两个
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    es.execute(target);
    es.execute(target);
}
  • 核心线程都忙, 任务队列满, 临时线程忙, 此时再继续添加任务就会触发拒绝任务策略
public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService es = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. Runnable任务交给任务线程池处理
    Runnable target = new MyRunnable();
    // 三个Runnable任务会被核心线程执行
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);
    es.execute(target);

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    es.execute(target);
    es.execute(target);

    // 后续任务拒绝任务会被拒绝
    es.execute(target);
}
线程池处理Callable任务

ExecutorService常用方法:

方法名称说明
submit(Callable<T> task) 执行Callable任务,返回未来任务对象(FutureTask)获取线程结果
shutdown()等任务执行完毕后关闭线程池
shutdownNow()立刻关闭,停止正在执行的任务,并返回队列中未执行的任务

注意: submit方法返回的是Future对象, Future对象是FutureTask对象继承的父类

演示代码:

定义一个Callable任务类, 用于计算1到n的和返回

package com.chenyq.d4_threadpool2;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<String> {
    private int n;
    public MyCallable(int n) {
        this.n = n;
    }

    @Override
    public String call() throws Exception {
        int sum = 0;
        for (int i = 1; i <= n ; i++) {
            sum += i;
        }
        // 为了测试, 让每一个线程任务睡眠3秒
        System.out.println(Thread.currentThread().getName() + "线程进入睡眠");
        Thread.sleep(3000);
        System.out.println(Thread.currentThread().getName() + "线程执行完成");
        return Thread.currentThread().getName() + "执行1-" + n + "结果是: " + sum;
    }
}

线程池处理Callable任务时的细节和处理Callable的一样, 这里不再一一赘述, 代码如下

public static void main(String[] args) {
    // 1. 创建一个线程池对象 核心线程为5, 最大线程数5, 临时线程存活6秒, 任务队列最大为5
    ExecutorService pool = new ThreadPoolExecutor(3, 5, 6,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    // 2. 创建Callable任务, 调用submit方法, 交给任务线程池处理, 返回未来线程对象
    // 三个Runnable任务会被核心线程执行
    Future<String> f1 = pool.submit(new MyCallable(10));
    Future<String> f2 = pool.submit(new MyCallable(20));
    Future<String> f3 = pool.submit(new MyCallable(30));

    // 三个核心线程被占满, 会进入任务队列排队等待有空闲的核心线程(我们设置的任务队列最大允许排队5个任务)
    Future<String> f4 = pool.submit(new MyCallable(30));
    Future<String> f5 = pool.submit(new MyCallable(40));
    Future<String> f6 = pool.submit(new MyCallable(50));
    Future<String> f7 = pool.submit(new MyCallable(40));
    Future<String> f8 = pool.submit(new MyCallable(50));

    // 三个核心线程都在忙, 任务队列满, 创建临时线程
    Future<String> f9 = pool.submit(new MyCallable(80));
    Future<String> f10 = pool.submit(new MyCallable(60));

    // 后续任务拒绝任务会被拒绝
    Future<String> f11 = pool.submit(new MyCallable(30));

    // 3. 互获取未来线程的返回结果
    try { // 正常的结果
        System.out.println(f1.get());
        System.out.println(f2.get());
        System.out.println(f3.get());
        System.out.println(f4.get());
        System.out.println(f5.get());
        System.out.println(f6.get());
        System.out.println(f7.get());
        System.out.println(f8.get());
        System.out.println(f9.get());
        System.out.println(f10.get());
        System.out.println(f11.get());
    } catch (Exception e) { // 异常的结果
        e.printStackTrace();
    }
}

方式二: Executors工具类创建线程池

Executors得到线程池对象的常用方法如下(工具类的方法基本都是静态方法):

Executors:线程池的工具类通过调用方法返回不同类型的线程池对象。

方法名称说明
ExecutorService newCachedThreadPool()线程数量随着任务增加而增加,如果线程任务执行完毕且空闲了一段时间则会被回收掉。
ExecutorService newFixedThreadPool(int nThreads)创建固定线程数量的线程池,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程替代它(保证线程存活数量)。
ExecutorService newSingleThreadExecutor ()创建只有一个线程的线程池对象,如果该线程出现异常而结束,那么线程池会补充一个新线程。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)创建一个线程池,可以实现在给定的延迟后运行任务,或者定期执行任务。

注意:Executors的底层其实也是基于线程池的实现类ThreadPoolExecutor创建线程池对象的。

例如下面代码当我们通过newFixedThreadPool方法创建线程池对象时

ExecutorService pool = Executors.newFixedThreadPool(3);

它的源码是创建了一个核心线程3, 最大线程3, 意味着没有临时线程, 所以临时线程存活时间也为0, 源码如下

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

演示代码:

我们演示使用newFixedThreadPool(int nThreads)方法创建线程池.

在主类中创建Runnable任务提交到线程池

public static void main(String[] args) {
    // 1. 创建线程池对象
    ExecutorService pool = Executors.newFixedThreadPool(3);
    // 2. 提交Runnable任务到线程池
    pool.execute(new MyRunnable());
    pool.execute(new MyRunnable());
    pool.execute(new MyRunnable());
    // 提交第四个任务就已经没有多余线程了, 会被拒绝
    pool.execute(new MyRunnable());
}

Executors执行Callable任务

public static void main(String[] args) throws Exception {
    // 1. 创建线程池对象
    ExecutorService pool = Executors.newFixedThreadPool(3);

    // 2. 提交Callable任务
    Future f1 = pool.submit(new MyCallable(10));
    Future f2 = pool.submit(new MyCallable(20));
    Future f3 = pool.submit(new MyCallable(30));

    // 获取任务返回结果
    System.out.println(f1.get());
    System.out.println(f2.get());
    System.out.println(f3.get());
}

Executors使用可能存在的陷阱

大型并发系统环境中使用Executors如果不注意可能会出现系统风险。

方法名称存在问题
ExecutorService newFixedThreadPool(int nThreads)允许创建的线程是固定的, 但是线程允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError )
ExecutorService newSingleThreadExecutor()允许创建的线程是固定的, 允许请求的任务队列长度是无穷大的(Integer.MAX_VALUE),可能出现OOM内存溢出错误(java.lang.OutOfMemoryError )
ExecutorService newCachedThreadPool()创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError )
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)创建的线程数量最大上限是无穷大(Integer.MAX_VALUE), 线程数可能会随着任务1:1增长,也可能出现OOM内存溢出错误 ( java.lang.OutOfMemoryError )

阿里巴巴开发手册中明确说明, 不允许Executors创建线程池

有关Java多线程 - 创建线程池的方法 - ThreadPoolExecutor和Executors的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby - 如何在 Ruby 中顺序创建 PI - 2

    出于纯粹的兴趣,我很好奇如何按顺序创建PI,而不是在过程结果之后生成数字,而是让数字在过程本身生成时显示。如果是这种情况,那么数字可以自行产生,我可以对以前看到的数字实现垃圾收集,从而创建一个无限系列。结果只是在Pi系列之后每秒生成一个数字。这是我通过互联网筛选的结果:这是流行的计算机友好算法,类机器算法:defarccot(x,unity)xpow=unity/xn=1sign=1sum=0loopdoterm=xpow/nbreakifterm==0sum+=sign*(xpow/n)xpow/=x*xn+=2sign=-signendsumenddefcalc_pi(digits

  5. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  6. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  7. Ruby 方法() 方法 - 2

    我想了解Ruby方法methods()是如何工作的。我尝试使用“ruby方法”在Google上搜索,但这不是我需要的。我也看过ruby​​-doc.org,但我没有找到这种方法。你能详细解释一下它是如何工作的或者给我一个链接吗?更新我用methods()方法做了实验,得到了这样的结果:'labrat'代码classFirstdeffirst_instance_mymethodenddefself.first_class_mymethodendendclassSecond使用类#returnsavailablemethodslistforclassandancestorsputsSeco

  8. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  9. ruby-on-rails - 无法使用 Rails 3.2 创建插件? - 2

    我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby​​1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在

  10. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

随机推荐