jjzjj

JUC学习-线程池部分

Appletree24 2023-03-28 原文

自定义线程池

package com.appletree24;  
  
import java.util.ArrayDeque;  
import java.util.Deque;  
import java.util.HashSet;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> {  
            //带超时等待  
//            queue.offer(task,500,TimeUnit.MILLISECONDS);  
        });  
        for (int i = 0; i < 10; i++) {  
            int j = i;  
            threadPool.execute(() -> {  
                try {  
                    Thread.sleep(1000L);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(j);  
            });  
        }  
    }  
}  
  
//策略模式接口 此处使用策略模式是因为在实现拒绝策略时,有许多种拒绝的方式,这些方式如果不使用恰当的模式,就需要大量的if..else来编写  
//且方式数量大于4个,会造成类膨胀的问题,推荐使用混合模式  
//https://www.runoob.com/design-pattern/strategy-pattern.html  
@FunctionalInterface  
interface RejectPolicy<T> {  
    void reject(BlockingQueue<T> queue, T task);  
}  
  
class ThreadPool {  
    //任务队列  
    private BlockingQueue<Runnable> taskQueue;  
    //线程集合  
    private HashSet<Worker> workers = new HashSet<>();  
  
    //线程数  
    private int coreSize;  
  
    //超时时间  
    private long timeout;  
  
    private TimeUnit timeUnit;  
    private RejectPolicy<Runnable> rejectPolicy;  
  
    //执行任务  
    public void execute(Runnable task) {  
        //当任务数未超过核心线程数时,直接交给Worker对象执行  
        //如果超过,则加入阻塞任务队列,暂存起来  
        synchronized (workers) {  
            if (workers.size() < coreSize) {  
                Worker worker = new Worker(task);  
                workers.add(worker);  
                worker.start();  
            } else {  
                //第一种选择死等  
//                taskQueue.put(task);  
                //第二种为超时等待  
                //第三种为消费者放弃任务执行  
                //第四种为主线程抛出异常  
                //第五种让调用者自行执行任务  
                taskQueue.tryPut(rejectPolicy, task);  
            }  
        }  
    }  
  
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {  
        this.coreSize = coreSize;  
        this.timeout = timeout;  
        this.timeUnit = timeUnit;  
        this.taskQueue = new BlockingQueue<>(queueCapcity);  
        this.rejectPolicy = rejectPolicy;  
    }  
  
    class Worker extends Thread {  
        private Runnable task;  
  
        public Worker(Runnable task) {  
            this.task = task;  
        }  
  
        @Override  
        public void run() {  
            //执行任务  
            //1.当传递过来的task不为空,执行任务  
            //2.当task执行完毕,再接着取下一个任务并执行  
            while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {  
                try {  
                    task.run();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                } finally {  
                    task = null;  
                }  
            }  
            synchronized (workers) {  
                workers.remove(this);  
            }  
        }  
    }  
}  
  
class BlockingQueue<T> {  
    //1. 任务队列  
    private final Deque<T> queue = new ArrayDeque<>();  
  
    //2. 锁  
    private final ReentrantLock lock = new ReentrantLock();  
  
    //3. 生产者条件变量  
    private final Condition fullWaitSet = lock.newCondition();  
  
    //4. 消费者条件变量  
    private final Condition emptyWaitSet = lock.newCondition();  
  
    //5. 容量上限  
    private int capcity;  
  
    public BlockingQueue(int capcity) {  
        this.capcity = capcity;  
    }  
  
    //带超时的等待获取  
    public T poll(long timeout, TimeUnit unit) {  
        lock.lock();  
        long nanos = unit.toNanos(timeout);  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    if (nanos <= 0) {  
                        return null;  
                    }  
                    nanos = emptyWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //消费者拿取任务的方法  
    public T take() {  
        lock.lock();  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    emptyWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //阻塞添加  
    public void put(T task) {  
        lock.lock();  
        try {  
            while (queue.size() == capcity) {  
                try {  
                    fullWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //带超时时间的阻塞添加  
    public boolean offer(T task, long timeout, TimeUnit unit) {  
        lock.lock();  
        try {  
            long nanos = unit.toNanos(timeout);  
            while (queue.size() == capcity) {  
                try {  
                    if (nanos <= 0) return false;  
                    nanos = fullWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
  
    //获取当前阻塞队列大小  
    public int size() {  
        lock.lock();  
        try {  
            return queue.size();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {  
        lock.lock();  
        try {  
            //判断队列是否已满  
            if (queue.size() == capcity) {  
                rejectPolicy.reject(this, task);  
            } else {  
                queue.addLast(task);  
                emptyWaitSet.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}

上述的自定义线程池虽然能够执行完毕主线程给予的任务,但任务全部执行结束后,开辟的线程池内核心线程仍然在运行,并没有结束,这是因为目前线程池中的take方法仍然为不会有超时等待的take方法,造成了死等,需要为其加入超时停止的功能。也就是替代take()的poll()

JDK自带线程池

介绍


ThreadPoolExecutor使用int的高三位表示线程池状态,低29位表示线程数量


在ThreadPoolExecutor中,同样也存在拒绝策略。其图结构如下:

其中接口就对应着在自定义线程池中实现的策略模式接口,下面的四个实现类就对应着四种不同的拒绝方式:

利用工具类创建固定大小线程池

利用工具类创建带缓冲的线程池


从源码可以看出,带缓冲的线程池中缓冲队列的使用的是一个名为SynchronousQueue的队列,这个队列的特点如下:队列不具有容量,当没有线程来取时,是无法对其内部放入数据的,例如队列内部已有一个数字1,但此时没有线程取走,则线此队列目前并不能继续存入数据,直到1被取走

利用工具类创建单线程线程池


从源码可以看出,单线程线程池中核心线程数与最大线程数相等,即不存在应急线程。只能解决一个任务
那么这个线程池和我自己创建一个线程的线程池有什么区别呢?区别如下:

ThreadPoolExecutor-submit method

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    Future<String> result = pool.submit(() -> {  
        System.out.println("running");  
        Thread.sleep(1000);  
        return "ok";  
    });  
    System.out.println(result.get());  
}

submit方法可以传入Runnable和Callable类型的参数,并且将线程内部所执行任务的结果返回,用Future包装

ThreadPoolExecutor-invokeAll

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    results.forEach(f -> {  
        try {  
            System.out.printf(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

invokeAll方法可以传入任务的集合,同样的任务的返回值也会以列表形式返回

ThreadPoolExecutor-invokeAny

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    String result = pool.invokeAny(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.awaitTermination(1000, TimeUnit.MILLISECONDS);  
    System.out.println(result);  
}

invokeAny方法同样可以传入任务的集合,只不过最后返回的结果并不是任务的结果集合,而是最早完成的那个任务的结果。

ThreadPoolExecutor-shutdown

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.shutdown();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdown方法会将线程池的状态变为SHUTDOWN

  • 不会接受新任务
  • 但已提交的任务会执行完
  • 此方法不会阻塞调用线程的执行

ThreadPoolExecutor-shutdownNow

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    List<Runnable> runnables = pool.shutdownNow();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdownNow方法会将线程池状态变为STOP

  • 不会接受新任务
  • 会将队列中现有的任务返回
  • 并且用interrupt方法中断正在执行的任务

任务调度线程池

在任务调度线程池加入之前,JDK1.3有一个Timer的工具类可以实现定时功能,但是Timer存在很多缺点,例如Timer的所有定时任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都会影响之后的任务。

public static void main(String[] args) {  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
    pool.schedule(new Runnable() {  
        @Override  
        public void run() {  
            System.out.println("1");  
        }  
    }, 1, TimeUnit.SECONDS);  
    pool.schedule(new Runnable() {  
        @Override  
        public void run() {  
            System.out.println("2");  
        }  
    }, 1, TimeUnit.SECONDS);  
}

上述的两个任务都会在执行完后1s再执行下一个。其实就相当于将Timer中的单线程特点进行了优化,可以用池的形式分配多个线程

public static void main(String[] args) {  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
    pool.scheduleAtFixedRate(new Runnable() {  
        @Override  
        public void run() {  
            System.out.println("1");  
        }  
    }, 1, 1, TimeUnit.SECONDS);  
}

上述任务会每隔1s执行一次,而执行开始的时间则是在被创建完后的1s。但是有个问题就在于,如果任务内部延时大于设定的间隔时间,并且延时大于间隔时间,那么就会等待内部延时结束才进行下一个任务的操作,总的耗时就是任务内的间隔时间

public static void main(String[] args) {  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
    pool.scheduleWithFixedDelay(() -> {  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println("1");  
    }, 1, 1, TimeUnit.SECONDS);  
}

而上述的方法则是会在内部任务完成后才开始进行任务间隔时间的计算,例如上述代码中最后两个任务的执行时间会是3s

处理线程池中的异常

在线程池中执行的任务,如果执行过程中发生了异常,并不会提示错误信息,如果想要进行异常处理。可以有两种方式
第一种方式是使用Callable类型的参数,通过返回值类型Future进行错误信息的打印。
第二种方式就是主动处理异常,在任务代码段中使用try-catch进行异常捕获,最终输出

    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
        Future<Boolean> result = pool.submit(() -> {  
            int a = 1 / 0;  
            return true;  
        });  
//        System.out.println(result.get());  
    }

如果采取上面的编写方式,就算内部出现了数值溢出,终端内也不会有任何提示。
当把被注释掉的代码放开后,再次运行就可以顺利看到内部的报错了。

定时执行任务

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);  
    //在每周四18:00定时执行任务  
    long period = 1000 * 60 * 60 * 24 * 7;  
    LocalDateTime currentTime = LocalDateTime.now();  
    LocalDateTime initialTime = currentTime.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);  
    //如果当前时间星期数大于四,那么就直接变为下周四,而不是继续变为本周四  
    if (currentTime.compareTo(initialTime) > 0) {  
        initialTime = initialTime.plusWeeks(1);  
    }  
    long initialDelay = Duration.between(currentTime, initialTime).toMillis();  
    pool.scheduleAtFixedRate(() -> System.out.println("running"), initialDelay, period, TimeUnit.MILLISECONDS);  
}

代码内容如注释所述。

有关JUC学习-线程池部分的更多相关文章

  1. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  2. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  3. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  4. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  5. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  6. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  7. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  8. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  9. ruby - Ruby 1.9.1 中的 native 线程,对我有什么好处? - 2

    所以,Ruby1.9.1现在是declaredstable.Rails应该与它一起工作,并且正在慢慢地将gem移植到它。它具有native线程和全局解释器锁(GIL)。自从GIL到位后,原生线程是否比1.9.1中的绿色线程有任何优势? 最佳答案 1.9中的线程是原生的,但它们被“放慢了速度”,一次只允许一个线程运行。这是因为如果线程真的并行运行,它会混淆现有代码。优点:IO现在在线程中是异步的。如果一个线程阻塞在IO上,那么另一个线程将继续执行直到IO完成。C扩展可以使用真正的线程。缺点:任何非线程安全的C扩展都可能存在使用Thre

  10. ruby - 我如何学习 ruby​​ 的正则表达式? - 2

    如何学习ruby​​的正则表达式?(对于假人) 最佳答案 http://www.rubular.com/在Ruby中使用正则表达式时是一个很棒的工具,因为它可以立即将结果可视化。 关于ruby-我如何学习ruby​​的正则表达式?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/1881231/

随机推荐